建立持續查詢

本文件說明如何在 BigQuery 中執行持續查詢

BigQuery 持續查詢是會持續執行的 SQL 陳述式,持續查詢可讓您即時在 BigQuery 中分析傳入的資料,然後將結果匯出至 Bigtable 或 Pub/Sub,或將結果寫入 BigQuery 資料表。

選擇帳戶類型

您可以使用使用者帳戶建立及執行持續查詢工作,也可以使用使用者帳戶建立持續查詢工作,然後使用服務帳戶執行該工作。您必須使用服務帳戶執行持續查詢,將結果匯出至 Pub/Sub 主題。

使用使用者帳戶時,持續性查詢最多可執行兩天。使用服務帳戶時,持續查詢最多可執行 150 天。詳情請參閱「授權」。

所需權限

本節說明建立及執行持續性查詢所需的權限。除了上述提到的身分與存取權管理 (IAM) 角色,您也可以透過自訂角色取得必要權限。

使用使用者帳戶時的權限

本節提供使用者帳戶建立及執行持續查詢時,所需的角色和權限相關資訊。

如要在 BigQuery 中建立工作,使用者帳戶必須具備 bigquery.jobs.create IAM 權限。以下各項 IAM 角色會授予 bigquery.jobs.create 權限:

如要從 BigQuery 資料表匯出資料,使用者帳戶必須具備 bigquery.tables.export IAM 權限。以下各項 IAM 角色都會授予 bigquery.tables.export 權限:

如要更新 BigQuery 資料表中的資料,使用者帳戶必須具備 bigquery.tables.updateData IAM 權限。以下各項 IAM 角色都會授予 bigquery.tables.updateData 權限:

如果使用者帳戶必須啟用持續查詢用途所需的 API,則該帳戶必須具備「服務用量管理員 (roles/serviceusage.serviceUsageAdmin)」角色。

使用服務帳戶時的權限

本節提供有關角色和權限的資訊,這些角色和權限是建立持續查詢的使用者帳戶,以及執行持續查詢的服務帳戶所需的。

使用者帳戶權限

如要在 BigQuery 中建立工作,使用者帳戶必須具備 bigquery.jobs.create IAM 權限。以下各項 IAM 角色都會授予 bigquery.jobs.create 權限:

如要提交使用服務帳戶執行的工作,使用者帳戶必須具備「服務帳戶使用者」(roles/iam.serviceAccountUser) 角色。如果您使用相同的使用者帳戶建立服務帳戶,則該使用者帳戶必須具備「服務帳戶管理員 (roles/iam.serviceAccountAdmin)」角色。如要瞭解如何限制使用者對單一服務帳戶的存取權,而非專案中的所有服務帳戶,請參閱「授予單一角色」一文。

如果使用者帳戶必須啟用持續查詢用途所需的 API,則該帳戶必須具備「服務用量管理員 (roles/serviceusage.serviceUsageAdmin)」角色。

服務帳戶權限

如要從 BigQuery 資料表匯出資料,服務帳戶必須具備 bigquery.tables.export IAM 權限。以下各項 IAM 角色都會授予 bigquery.tables.export 權限:

如要更新 BigQuery 資料表中的資料,服務帳戶必須具備 bigquery.tables.updateData IAM 權限。以下各項 IAM 角色都會授予 bigquery.tables.updateData 權限:

事前準備

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  2. Make sure that billing is enabled for your Google Cloud project.

  3. Enable the BigQuery API.

    Enable the API

建立保留項目

建立 Enterprise 或 Enterprise Plus 版本保留項目,然後使用 CONTINUOUS 工作類型建立保留項目指派。此預留項目可使用自動調整資源配置。有預留限制適用於持續查詢的預留指派。

匯出至 Pub/Sub

您必須具備額外的 API、身分與存取權管理權限和 Google Cloud 資源,才能將資料匯出至 Pub/Sub。詳情請參閱「匯出至 Pub/Sub」。

在 Pub/Sub 訊息中將自訂屬性嵌入為中繼資料

您可以使用 Pub/Sub 屬性提供訊息的其他資訊,例如優先順序、來源、目的地或其他中繼資料。您也可以使用屬性篩選訂閱項目中的訊息

在連續查詢結果中,如果資料欄名為 _ATTRIBUTES,則其值會複製到 Pub/Sub 訊息屬性。_ATTRIBUTES 中提供的欄位會用做屬性鍵。

_ATTRIBUTES 欄必須是 JSON 類型,格式為 ARRAY<STRUCT<STRING, STRING>>STRUCT<STRING>

如需範例,請參閱將資料匯出至 Pub/Sub 主題

匯出至 Bigtable

您必須具備其他 API、IAM 權限和 Google Cloud資源,才能將資料匯出至 Bigtable。詳情請參閱「匯出至 Bigtable」。

將資料寫入 BigQuery 資料表

您可以使用 INSERT 陳述式,將資料寫入 BigQuery 資料表。

使用 AI 函式

您需要額外的 API、IAM 權限和 Google Cloud資源,才能在持續查詢中使用支援的 AI 函式。如需詳細資訊,請根據您的用途參閱下列任一主題:

在持續查詢中使用 AI 函式時,請考量查詢輸出內容是否會維持在函式的配額範圍內。如果超出配額,您可能必須個別處理未處理的記錄。

指定起點

您必須在持續性查詢的 FROM 子句中使用 APPENDS 函式,���定���處理的最���資料。舉例來說,APPENDS(TABLE my_table, CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) 會指示 BigQuery 處理在持續查詢開始前最多 10 分鐘內新增至資料表 my_table 的資料。新增至 my_table 的資料會在收到時處理。資料處理作業不會有延遲時間。在持續查詢中使用 APPENDS 函式時,請勿為其提供 end_timestamp 引數。

以下範例說明如何在查詢接收串流計程車資訊的 BigQuery 表格時,使用 APPENDS 函式,從特定時間點開始執行持續查詢:

EXPORT DATA
  OPTIONS (format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS (
  SELECT
    TO_JSON_STRING(STRUCT(ride_id,
        timestamp,
        latitude,
        longitude)) AS message
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE
    ride_status = 'enroute');

指定早於目前時間的起始時間

如果您想處理目前時間點之前的資料,可以使用 APPENDS 函式指定查詢的較早起始點。您指定的起點必須落在所選資料表的時間回溯期內。時間回溯期預設為過去七天。

如要納入時間回溯期間以外的資料,請使用標準查詢,插入或匯出特定時間點之前的資料,然後從該時間點開始執行持續查詢。

範例

以下範例說明如何從 BigQuery 表格載入舊資料,該表格會接收計程車行程資訊,直到特定時間點為止,然後從舊資料的截止時間開始執行持續查詢。

  1. 執行標準查詢,將資料回填至特定時間點:

    INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
    SELECT
      timestamp,
      meter_reading,
      ride_status,
      passenger_count,
      ST_Distance(
        ST_GeogPoint(pickup_longitude, pickup_latitude),
        ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
        SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
    FROM `myproject.real_time_taxi_streaming.taxirides`
      -- Include all data inserted into the table up to this point in time.
      -- This timestamp must be within the time travel window.
      FOR SYSTEM_TIME AS OF '2025-01-01 00:00:00 UTC'
    WHERE
      ride_status = 'dropoff';
  2. 從查詢停止的時間點開始執行持續查詢:

    INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
    SELECT
      timestamp,
      meter_reading,
      ride_status,
      passenger_count,
      ST_Distance(
        ST_GeogPoint(pickup_longitude, pickup_latitude),
        ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
        SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
    FROM
      APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
        -- Configure the APPENDS TVF start_timestamp to start processing
        -- data right where the batch query left off.
        -- This timestamp must be within the time travel window.
        TIMESTAMP '2025-01-01 00:00:00 UTC' + INTERVAL 1 MICROSECOND)
    WHERE
      ride_status = 'dropoff';

使用使用者帳戶執行持續查詢

本節說明如何使用使用者帳戶執行持續查詢。持續性查詢執行後,您可以關閉 Google Cloud 主控台、終端機視窗或應用程式,而不會中斷查詢執行作業。使用者帳戶執行的持續性查詢最多可執行兩天,之後就會自動停止。如要繼續處理傳入的新資料,請啟動新的持續查詢並指定起始點。如要自動執行這項程序,請參閱「重試失敗的查詢」一文。

請按照下列步驟執行持續查詢:

主控台

  1. 前往 Google Cloud 控制台的「BigQuery」頁面。

    前往 BigQuery

  2. 在查詢編輯器中,按一下 「更多」

    1. 在「Choose query mode」部分,選擇「Continuous query」
    2. 按一下「確認」。
    3. 選用步驟:如要控制查詢執行的時間長度,請按一下「查詢設定」,然後以毫秒為單位設定「工作逾時時間」
  3. 在查詢編輯器中,輸入持續查詢的 SQL 陳述式。SQL 陳述式只能包含支援的運算

  4. 按一下「執行」

bq

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 在 Cloud Shell 中,使用 bq query 指令搭配 --continuous 旗標,執行持續查詢:

    bq query --use_legacy_sql=false --continuous=true
    'QUERY'

    QUERY 替換為持續查詢的 SQL 陳述式。SQL 陳述式只能包含支援的作業。您可以使用 --job_timeout_ms 標記控制查詢執行的時間長度。

API

呼叫 jobs.insert 方法執行持續查詢。您必須在傳入的 Job 資源JobConfigurationQuery 中,將 continuous 欄位設為 true。您可以選擇設定 jobTimeoutMs 欄位,藉此控制查詢執行的時間長度。

curl --request POST \
  "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \
  --header "Authorization: Bearer $(gcloud auth print-access-token)" \
  --header "Content-Type: application/json; charset=utf-8" \
  --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true}}}' \
  --compressed

更改下列內容:

  • PROJECT_ID:您的專案 ID。
  • QUERY:持續性查詢的 SQL 陳述式。SQL 陳述式只能包含支援的運算

使用服務帳戶執行持續查詢

本節說明如何使用服務帳戶執行持續查詢。���續性查詢執行後,您可以關閉 Google Cloud 主控台、終端機視窗或應用程式,而不會中斷查詢執行作業。使用服務帳戶執行的持續查詢最多可執行 150 天,然後自動停止。如要繼續處理傳入的新資料,請啟動新的持續查詢並指定起始點。如要自動執行這項程序,請參閱「重試失敗的查詢」一文。

如要使用服務帳戶執行持續查詢,請按照下列步驟操作:

主控台

  1. 建立服務帳戶
  2. 授予服務帳戶所需的權限
  3. 前往 Google Cloud 控制台的「BigQuery」頁面。

    前往 BigQuery

  4. 在查詢編輯器中,按一下「更多」

  5. 在「Choose query mode」部分,選擇「Continuous query」

  6. 按一下「確認」。

  7. 在查詢編輯器中,依序點選「更多」「查詢設定」。

  8. 在「持續查詢」專區中,使用「服務帳戶」方塊選取您建立的服務帳戶。

  9. 選用步驟:如要控管查詢執行時間,請以毫秒為單位設定工作逾時時間

  10. 按一下 [儲存]

  11. 在查詢編輯器中,輸入持續查詢的 SQL 陳述式。SQL 陳述式只能包含支援的運算

  12. 按一下「執行」

bq

  1. 建立服務帳戶
  2. 授予服務帳戶所需的權限
  3. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  4. 在指令列上,使用 bq query 指令搭配下列標記,執行持續查詢:

    • --continuous 旗標設為 true,即可讓查詢持續執行。
    • 使用 --connection_property 標記指定要使用的服務帳戶。
    • 選用步驟:設定 --job_timeout_ms 旗標,限制查詢執行時間。
    bq query --project_id=PROJECT_ID --use_legacy_sql=false \
    --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \
    'QUERY'

    更改下列內容:

    • PROJECT_ID:您的專案 ID。
    • SERVICE_ACCOUNT_EMAIL:服務帳戶電子郵件地址。您可以從 Google Cloud 控制台的「服務帳戶」頁面取得服務帳戶電子郵件地址。
    • QUERY:持續性查詢的 SQL 陳述式。SQL 陳述式只能包含支援的運算

API

  1. 建立服務帳戶
  2. 授予服務帳戶所需的權限
  3. 呼叫 jobs.insert 方法執行持續查詢。在您傳入的 Job 資源中,設定 JobConfigurationQuery 資源的下列欄位:

    • continuous 欄位設為 true,即可讓查詢持續執行。
    • 使用 connectionProperties 欄位指定要使用的服務帳戶。

    您可以選擇在 JobConfiguration 資源中設定 jobTimeoutMs 欄位,藉此控制查詢執行的時間長度。

    curl --request POST \
      "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \
      --header "Authorization: Bearer $(gcloud auth print-access-token)" \
      --header "Content-Type: application/json; charset=utf-8" \
      --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":[{"key":"service_account","value":"SERVICE_ACCOUNT_EMAIL"}]}}}' \
      --compressed

    更改下列內容:

    • PROJECT_ID:您的專案 ID。
    • QUERY:持續性查詢的 SQL 陳述式。SQL 陳述式只能包含支援的運算
    • SERVICE_ACCOUNT_EMAIL:服務帳戶電子郵件地址。您可以在 Google Cloud 控制台的「服務帳戶」頁面中取得服務帳戶電子郵件地址。

建立自訂工作 ID

每個查詢工作都會指派工作 ID,您可以使用該 ID 搜尋及管理工作。根據預設,系統會隨機產生工作 ID。如要透過工作記錄工作探索器,更輕鬆地搜尋持續查詢的工作 ID,您可以指派自訂工作 ID 前置字串:

  1. 前往 Google Cloud 控制台的「BigQuery」頁面。

    前往 BigQuery

  2. 在查詢編輯器中,按一下「更多」

  3. 在「Choose query mode」部分,選擇「Continuous query」

  4. 按一下「確認」。

  5. 在查詢編輯器中,依序點選「更多」>「查詢設定」

  6. 在「自訂工作 ID 前置字串」部分,輸入自訂名稱前置字串。

  7. 按一下 [儲存]

範例

以下 SQL 範例說明持續查詢的常見用途。

將資料匯出至 Pub/Sub 主題

以下範例顯示持續性查詢,可從接收計程車乘車資訊串流的 BigQuery 資料表篩選資料,並將資料以訊息屬性即時發布至 Pub/Sub 主題:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude)) AS message,
    TO_JSON(
      STRUCT(
        CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
);

將資料匯出至 Bigtable 資料表

以下範例顯示持續查詢,可從接收串流計程車行程資訊的 BigQuery 資料表篩選資料,並即時將資料匯出至 Bigtable 資料表:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_BIGTABLE',
    truncate = TRUE,
    overwrite = TRUE,
    uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides')
AS (
  SELECT
    CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey,
    STRUCT(
      timestamp,
      latitude,
      longitude,
      meter_reading,
      ride_status,
      passenger_count) AS features
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
);

將資料寫入 BigQuery 資料表

下列範例顯示持續查詢,可從接收串流計程車行程資訊的 BigQuery 資料表中篩選及轉換資料,然後即時將資料寫入另一個 BigQuery 表格。這樣一來,資料就能用於進一步的下游分析。

INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
SELECT
  timestamp,
  meter_reading,
  ride_status,
  passenger_count,
  ST_Distance(
    ST_GeogPoint(pickup_longitude, pickup_latitude),
    ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
    SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
FROM
  APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
    -- Configure the APPENDS TVF start_timestamp to specify when you want to
    -- start processing data using your continuous query.
    -- This example starts processing at 10 minutes before the current time.
    CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
WHERE
  ride_status = 'dropoff';

使用 Vertex AI 模型處理資料

以下範例顯示持續性查詢,該查詢會使用 Vertex AI 模型,根據乘客目前的緯度和經度為他們產生廣告,然後將結果即時匯出至 Pub/Sub 主題:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude,
        prompt,
        ml_generate_text_llm_result)) AS message
  FROM
    ML.GENERATE_TEXT(
      MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`,
      (
        SELECT
          timestamp,
          ride_id,
          latitude,
          longitude,
          CONCAT(
            'Generate an ad based on the current latitude of ',
            latitude,
            ' and longitude of ',
            longitude) AS prompt
        FROM
          APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
            -- Configure the APPENDS TVF start_timestamp to specify when you
            -- want to start processing data using your continuous query.
            -- This example starts processing at 10 minutes before the current time.
            CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
        WHERE ride_status = 'enroute'
      ),
      STRUCT(
        50 AS max_output_tokens,
        1.0 AS temperature,
        40 AS top_k,
        1.0 AS top_p,
        TRUE AS flatten_json_output))
      AS ml_output
);

修改持續查詢的 SQL

在持續查詢工作執行期間,您無法更新在持續查詢中使用的 SQL。您必須取消持續查詢工作、修改 SQL,然後從原始持續查詢工作停止的時間���開始啟動新的持續查詢工作。

如要修改持續查詢中使用的 SQL,請按照下列步驟操作:

  1. 查看要更新的持續性查詢工作詳細資料,並記下工作 ID。
  2. 盡可能暫停上游資料的收集作業。如果無法執行這項操作,重新啟動持續查詢時,可能會出現資料重複的情形。
  3. 取消要修改的持續查詢
  4. 使用 INFORMATION_SCHEMA JOBS 檢視畫面,取得原始持續查詢工作的 end_time 值:

    SELECT end_time
    FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
    WHERE
      EXTRACT(DATE FROM creation_time) = current_date()
    AND error_result.reason = 'stopped'
    AND job_id = 'JOB_ID';

    更改下列內容:

    • PROJECT_ID:您的專案 ID。
    • REGION:專案使用的區域。
    • JOB_ID:您在步驟 1 中指定的持續查詢工作 ID。
  5. 修改持續查詢 SQL 陳述式,使用您在步驟 5 中擷取的 end_time 值做為起始點,從特定時間點開始執行持續查詢

  6. 修改持續查詢 SQL 陳述式,反映所需變更。

  7. 執行經過修改的持續查詢。

取消持續查詢

您可以取消持續查詢工作,方法和取消其他工作相同。工作取消後,查詢可能需要最多一分鐘的時間才會停止執行。

如果您取消查詢後再重新啟動,重新啟動的查詢會像���新的���������詢一���運作。重新啟動的查詢不會開始處理先前作業停止的資料,也無法參照先前查詢的結果。請參閱「從特定時間點開始執行持續查詢」一文。

監控查詢並處理錯誤

連續查詢可能會因資料不一致、結構定義變更、服務暫時中斷或維護等因素而中斷。雖然 BigQuery 會處理某些暫時性錯誤,但改善工作彈性的方法包括:

後續步驟