監控持續查詢

您可以使用下列 BigQuery 工具監控 BigQuery 持續性查詢

由於 BigQuery 持續性查詢的執行時間很長,因此通常在 SQL 查詢完成時產生的指標可能會缺少或不準確。

使用 INFORMATION_SCHEMA 檢視畫面

您可以使用多個 INFORMATION_SCHEMA 檢視畫面來監控連續查詢和連續查詢保留。

查看工作詳細資料

您可以使用 JOBS 檢視區塊取得持續查詢工作中繼資料。

下列查詢會傳回所有有效的持續查詢中繼資料。這些中繼資料包含輸出水印時間戳記,代表持續性查詢已成功處理資料的時間點。

  1. 前往 Google Cloud 控制台的「BigQuery」頁面。

    前往 BigQuery

  2. 在查詢編輯器中執行以下查詢:

    SELECT
      start_time,
      job_id,
      user_email,
      query,
      state,
      reservation_id,
      continuous_query_info.output_watermark
    FROM `PROJECT_ID.region-REGION.INFORMATION_SCHEMA.JOBS`
    WHERE
      creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 day)
      AND continuous IS TRUE
      AND state = "RUNNING"
    ORDER BY
      start_time DESC

    更改下列內容:

查看預約指派詳細資料

您可以使用 ASSIGNMENTSRESERVATIONS 檢視畫面,取得持續查詢保留作業指派詳細資料。

傳回持續查詢的預留項目指派詳細資料:

  1. 前往 Google Cloud 控制台的「BigQuery」頁面。

    前往 BigQuery

  2. 在查詢編輯器中執行以下查詢:

    SELECT
      reservation.reservation_name,
      reservation.slot_capacity
    FROM
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.ASSIGNMENTS`
        AS assignment
    INNER JOIN
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.RESERVATIONS`
        AS reservation
      ON (assignment.reservation_name = reservation.reservation_name)
    WHERE
      assignment.assignee_id = 'PROJECT_ID'
      AND job_type = 'CONTINUOUS';

    更改下列內容:

    • ADMIN_PROJECT_ID:擁有預留容量的管理專案 ID。
    • LOCATION:預訂的住宿地點。
    • PROJECT_ID:指派給預留項目的專案 ID。只會傳回在這個專案中執行的持續查詢相關資訊。

查看運算單元用量資訊

您可以使用 ASSIGNMENTSRESERVATIONSJOBS_TIMELINE 檢視畫面,取得持續查詢時段使用情形資訊。

傳回持續查詢的空格用量資訊:

  1. 前往 Google Cloud 控制台的「BigQuery」頁面。

    前往 BigQuery

  2. 在查詢編輯器中執行以下查詢:

    SELECT
      jobs.period_start,
      reservation.reservation_name,
      reservation.slot_capacity,
      SUM(jobs.period_slot_ms) / 1000 AS consumed_total_slots
    FROM
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.ASSIGNMENTS`
        AS assignment
    INNER JOIN
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.RESERVATIONS`
        AS reservation
      ON (assignment.reservation_name = reservation.reservation_name)
    INNER JOIN
      `PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.JOBS_TIMELINE` AS jobs
      ON (
        UPPER(CONCAT('ADMIN_PROJECT_ID:LOCATION.', assignment.reservation_name))
        = UPPER(jobs.reservation_id))
    WHERE
      assignment.assignee_id = 'PROJECT_ID'
      AND assignment.job_type = 'CONTINUOUS'
      AND jobs.period_start
        BETWEEN TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
        AND CURRENT_TIMESTAMP()
    GROUP BY 1, 2, 3
    ORDER BY jobs.period_start DESC;

    更改下列內容:

    • ADMIN_PROJECT_ID:擁有預留容量的管理專案 ID。
    • LOCATION:預訂的住宿地點。
    • PROJECT_ID:指派給預留項目的專案 ID。只會傳回在這個專案中執行的持續查詢資訊。

您也可以使用其他工具監控持續查詢保留作業,例如 Metrics Explorer管理資源圖表。詳情請參閱「監控 BigQuery 預留空間」。

使用查詢執行圖表

您可以使用查詢執行圖,取得持續查詢的效能深入分析和一般統計資料。詳情請參閱「查看查詢效能深入分析」。

查看工作記錄

您可以在個人工作記錄或專案的工作記錄中查看持續查詢工作詳細資料。詳情請參閱「查看工作詳細資料」。

請注意,工作歷史記錄清單會依工作開始時間排序,因此執行一段時間的連續查詢可能不會出現在清單的開頭。

使用管理工作探索工具

在管理工作探索器中,將工作類別篩選器設為連續查詢篩選工作以顯示連續查詢。

使用 Cloud Monitoring

您可以使用 Cloud Monitoring 查看 BigQuery 持續查詢專屬的指標。如需更多資訊,請參閱「建立資訊��頁、圖表和快訊」一文,並瞭解可用於視覺化呈現的指標

針對失敗的查詢發出快訊

您可以建立快訊,在持續查詢失敗時通知您,而非定期檢查查詢是否失敗。方法之一是建立自訂 Cloud Logging 記錄指標,並為工作建立篩選器,以及根據該指標建立 Cloud Monitoring 警告政策

  1. 建立持續查詢時,請使用自訂工作 ID 前置字串。多個持續性查詢可以共用相同的前置字串。舉例來說,您可以使用前置字串 prod- 表示正式版查詢。
  2. 在 Google Cloud 控制台中,前往「記錄指標」頁面。

    前往「Logs-based Metrics」(記錄指標)

  3. 按一下「建立指標」,畫面上會顯示「Create logs metric」面板。

  4. 在「Metric type」(指標類型) 部分,選取「Counter」(計數器)

  5. 在「Details」部分,為指標命名。例如:CUSTOM_JOB_ID_PREFIX-metric

  6. 在「Filter selection」部分,在「Build filter」編輯器中輸入以下內容:

    resource.type = "bigquery_project"
    protoPayload.resourceName : "projects/PROJECT_ID/jobs/CUSTOM_JOB_ID_PREFIX"
    severity = ERROR
    

    更改下列內容:

  7. 點選「建立指標」

  8. 在導覽選單中,按一下「以記錄為準的評估指標」。您剛建立的指標會顯示在使用者定義指標清單中。

  9. 在指標的資料列中,依序按一下 「更多動作」和「運用指標建立快訊」

  10. 點選「下一步」。您不需要變更「政策設定模式」頁面上的預設設定。

  11. 點選「下一步」。您不需要變更「Configure alert trigger」(設定快訊觸發條件)頁面上的預設設定。

  12. 選取通知管道,然後輸入快訊政策的名稱。

  13. 按一下「建立政策」

您可以使用所選自訂作業 ID 前置字元執行持續查詢,然後取消查詢,以便測試警示。警示可能需要幾分鐘的時間才能傳送至通知管道。

重試失敗的查詢

重試失敗的持續查詢,有助於避免持續管道長時間停機,或需要人為介入才能重新啟動。重試失敗的持續性查詢時,請考量以下重要事項:

  • 是否可容許重新處理先前查詢失敗前處理的部分資料。
  • 如何處理重試限制或使用指數輪詢。

自動重試查詢的其中一種方法如下:

  1. 根據符合下列條件的納入篩選器,建立 Cloud Logging 接收器,將記錄轉送至 Pub/Sub 主題:

    resource.type = "bigquery_project"
    protoPayload.resourceName : "projects/PROJECT_ID/jobs/CUSTOM_JOB_ID_PREFIX"
    severity = ERROR
    

    更改下列內容:

  2. 建立 Cloud Run 函式,當 Pub/Sub 接收符合篩選條件的記錄時,系統就會觸發此函式。

    Cloud Run 函式可接受 Pub/Sub 訊息的資料酬載,並嘗試使用與失敗查詢相同的 SQL 語法,在先前工作停止後立即開始新的持續性查詢。

例如,您可以使用類似以下的函式:

Python

在嘗試這個範例之前,請先按照 BigQuery 快速入門:使用用戶端程式庫中的 Python 設定說明進行操作。詳情請參閱 BigQuery Python API 參考說明文件

如要向 BigQuery 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證機制」。

import base64
import json
import logging
import re
import uuid

import google.auth
import google.auth.transport.requests
import requests


def retry_continuous_query(event, context):
    logging.info("Cloud Function started.")

    if "data" not in event:
        logging.info("No data in Pub/Sub message.")
        return

    try:
        # Decode and parse the Pub/Sub message data
        log_entry = json.loads(base64.b64decode(event["data"]).decode("utf-8"))

        # Extract the SQL query and other necessary data
        proto_payload = log_entry.get("protoPayload", {})
        metadata = proto_payload.get("metadata", {})
        job_change = metadata.get("jobChange", {})
        job = job_change.get("job", {})
        job_config = job.get("jobConfig", {})
        query_config = job_config.get("queryConfig", {})
        sql_query = query_config.get("query")
        job_stats = job.get("jobStats", {})
        end_timestamp = job_stats.get("endTime")
        failed_job_id = job.get("jobName")

        # Check if required fields are missing
        if not all([sql_query, failed_job_id, end_timestamp]):
            logging.error("Required fields missing from log entry.")
            return

        logging.info(f"Retrying failed job: {failed_job_id}")

        # Adjust the timestamp in the SQL query
        timestamp_match = re.search(
            r"\s*TIMESTAMP\(('.*?')\)(\s*\+ INTERVAL 1 MICROSECOND)?", sql_query
        )

        if timestamp_match:
            original_timestamp = timestamp_match.group(1)
            new_timestamp = f"'{end_timestamp}'"
            sql_query = sql_query.replace(original_timestamp, new_timestamp)
        elif "CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE" in sql_query:
            new_timestamp = f"TIMESTAMP('{end_timestamp}') + INTERVAL 1 MICROSECOND"
            sql_query = sql_query.replace(
                "CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE", new_timestamp
            )

        # Get access token
        credentials, project = google.auth.default(
            scopes=["https://www.googleapis.com/auth/cloud-platform"]
        )
        request = google.auth.transport.requests.Request()
        credentials.refresh(request)
        access_token = credentials.token

        # API endpoint
        url = f"https://bigquery.googleapis.com/bigquery/v2/projects/{project}/jobs"

        # Request headers
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        }

        # Generate a random UUID
        random_suffix = str(uuid.uuid4())[:8]  # Take the first 8 characters of the UUID

        # Combine the prefix and random suffix
        job_id = f"CUSTOM_JOB_ID_PREFIX{random_suffix}"

        # Request payload
        data = {
            "configuration": {
                "query": {
                    "query": sql_query,
                    "useLegacySql": False,
                    "continuous": True,
                    "connectionProperties": [
                        {"key": "service_account", "value": "SERVICE_ACCOUNT"}
                    ],
                    # ... other query parameters ...
                },
                "labels": {"bqux_job_id_prefix": "CUSTOM_JOB_ID_PREFIX"},
            },
            "jobReference": {
                "projectId": project,
                "jobId": job_id,  # Use the generated job ID here
            },
        }

        # Make the API request
        response = requests.post(url, headers=headers, json=data)

        # Handle the response
        if response.status_code == 200:
            logging.info("Query job successfully created.")
        else:
            logging.error(f"Error creating query job: {response.text}")

    except Exception as e:
        logging.error(
            f"Error processing log entry or retrying query: {e}", exc_info=True
        )

    logging.info("Cloud Function finished.")

後續步驟