建立持續查詢
本文件說明如何在 BigQuery 中執行持續查詢。
BigQuery 持續查詢是會持續執行的 SQL 陳述式,持續查詢可讓您即時在 BigQuery 中分析傳入的資料,然後將結果匯出至 Bigtable 或 Pub/Sub,或將結果寫入 BigQuery 資料表。
選擇帳戶類型
您可以使用使用者帳戶建立及執行持續查詢工作,也可以使用使用者帳戶建立持續查詢工作,然後使用服務帳戶執行該工作。您必須使用服務帳戶執行持續查詢,將結果匯出至 Pub/Sub 主題。
使用使用者帳戶時,持續性查詢最多可執行兩天。使用服務帳戶時,持續查詢最多可執行 150 天。詳情請參閱「授權」。
所需權限
本節說明建立及執行持續性查詢所需的權限。除了上述提到的身分與存取權管理 (IAM) 角色,您也可以透過自訂角色取得必要權限。
使用使用者帳戶時的權限
本節提供使用者帳戶建立及執行持續查詢時,所需的角色和權限相關資訊。
如要在 BigQuery 中建立工作,使用者帳戶必須具備 bigquery.jobs.create
IAM 權限。以下各項 IAM 角色會授予 bigquery.jobs.create
權限:
- BigQuery 使用者 (
roles/bigquery.user
) - BigQuery 作业使用者 (
roles/bigquery.jobUser
) - BigQuery 管理員 (
roles/bigquery.admin
)
如要從 BigQuery 資料表匯出資料,使用者帳戶必須具備 bigquery.tables.export
IAM 權限。以下各項 IAM 角色都會授予 bigquery.tables.export
權限:
- BigQuery 資料檢視器 (
roles/bigquery.dataViewer
) - BigQuery 資料編輯器 (
roles/bigquery.dataEditor
) - BigQuery 資料擁有者 (
roles/bigquery.dataOwner
) - BigQuery 管理員 (
roles/bigquery.admin
)
如要更新 BigQuery 資料表中的資料,使用者帳戶必須具備 bigquery.tables.updateData
IAM 權限。以下各項 IAM 角色都會授予 bigquery.tables.updateData
權限:
- BigQuery 資料編輯器 (
roles/bigquery.dataEditor
) - BigQuery 資料擁有者 (
roles/bigquery.dataOwner
) - BigQuery 管理員 (
roles/bigquery.admin
)
如果使用者帳戶必須啟用持續查詢用途所需的 API,則該帳戶必須具備「服務用量管理員 (roles/serviceusage.serviceUsageAdmin
)」角色。
使用服務帳戶時的權限
本節提供有關角色和權限的資訊,這些角色和權限是建立持續查詢的使用者帳戶,以及執行持續查詢的服務帳戶所需的。
使用者帳戶權限
如要在 BigQuery 中建立工作,使用者帳戶必須具備 bigquery.jobs.create
IAM 權限。以下各項 IAM 角色都會授予 bigquery.jobs.create
權限:
- BigQuery 使用者 (
roles/bigquery.user
) - BigQuery 作业使用者 (
roles/bigquery.jobUser
) - BigQuery 管理員 (
roles/bigquery.admin
)
如要提交使用服務帳戶執行的工作,使用者帳戶必須具備「服務帳戶使用者」(roles/iam.serviceAccountUser
) 角色。如果您使用相同的使用者帳戶建立服務帳戶,則該使用者帳戶必須具備「服務帳戶管理員 (roles/iam.serviceAccountAdmin
)」角色。如要瞭解如何限制使用者對單一服務帳戶的存取權,而非專案中的所有服務帳戶,請參閱「授予單一角色」一文。
如果使用者帳戶必須啟用持續查詢用途所需的 API,則該帳戶必須具備「服務用量管理員 (roles/serviceusage.serviceUsageAdmin
)」角色。
服務帳戶權限
如要從 BigQuery 資料表匯出資料,服務帳戶必須具備 bigquery.tables.export
IAM 權限。以下各項 IAM 角色都會授予 bigquery.tables.export
權限:
- BigQuery 資料檢視器 (
roles/bigquery.dataViewer
) - BigQuery 資料編輯器 (
roles/bigquery.dataEditor
) - BigQuery 資料擁有者 (
roles/bigquery.dataOwner
) - BigQuery 管理員 (
roles/bigquery.admin
)
bigquery.tables.updateData
IAM 權限。以下各項 IAM 角色都會授予 bigquery.tables.updateData
權限:
- BigQuery 資料編輯器 (
roles/bigquery.dataEditor
) - BigQuery 資料擁有者 (
roles/bigquery.dataOwner
) - BigQuery 管理員 (
roles/bigquery.admin
)
事前準備
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the BigQuery API.
建立保留項目
建立 Enterprise 或 Enterprise Plus 版本保留項目,然後使用 CONTINUOUS
工作類型建立保留項目指派。此預留項目可使用自動調整資源配置。有預留限制適用於持續查詢的預留指派。
匯出至 Pub/Sub
您必須具備額外的 API、身分與存取權管理權限和 Google Cloud 資源,才能將資料匯出至 Pub/Sub。詳情請參閱「匯出至 Pub/Sub」。
在 Pub/Sub 訊息中將自訂屬性嵌入為中繼資料
您可以使用 Pub/Sub 屬性提供訊息的其他資訊,例如優先順序、來源、目的地或其他中繼資料。您也可以使用屬性篩選訂閱項目中的訊息。
在連續查詢結果中,如果資料欄名為 _ATTRIBUTES
,則其值會複製到 Pub/Sub 訊息屬性。_ATTRIBUTES
中提供的欄位會用做屬性鍵。
_ATTRIBUTES
欄必須是 JSON
類型,格式為 ARRAY<STRUCT<STRING, STRING>>
或 STRUCT<STRING>
。
如需範例,請參閱將資料匯出至 Pub/Sub 主題。
匯出至 Bigtable
您必須具備其他 API、IAM 權限和 Google Cloud資源,才能將資料匯出至 Bigtable。詳情請參閱「匯出至 Bigtable」。
將資料寫入 BigQuery 資料表
您可以使用 INSERT
陳述式,將資料寫入 BigQuery 資料表。
使用 AI 函式
您需要額外的 API、IAM 權限和 Google Cloud資源,才能在持續查詢中使用支援的 AI 函式。如需詳細資訊,請根據您的用途參閱下列任一主題:
- 使用
ML.GENERATE_TEXT
函式產生文字 - 使用
ML.GENERATE_EMBEDDING
函式產生文字嵌入 - 使用
ML.UNDERSTAND_TEXT
函式瞭解文字 - 使用
ML.TRANSLATE
函式翻譯文字
在持續查詢中使用 AI 函式時,請考量查詢輸出內容是否會維持在函式的配額範圍內。如果超出配額,您可能必須個別處理未處理的記錄。
指定起點
您必須在持續性查詢的 FROM
子句中使用 APPENDS
函式,���定���處理的最���資料。舉例來說,APPENDS(TABLE my_table, CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
會指示 BigQuery 處理在持續查詢開始前最多 10 分鐘內新增至資料表 my_table
的資料。新增至 my_table
的資料會在收到時處理。資料處理作業不會有延遲時間。在持續查詢中使用 APPENDS
函式時,請勿為其提供 end_timestamp
引數。
以下範例說明如何在查詢接收串流計程車資訊的 BigQuery 表格時,使用 APPENDS
函式,從特定時間點開始執行持續查詢:
EXPORT DATA OPTIONS (format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING(STRUCT(ride_id, timestamp, latitude, longitude)) AS message FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to specify when you want to -- start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'enroute');
指定早於目前時間的起始時間
如果您想處理目前時間點之前的資料,可以使用 APPENDS
函式指定查詢的較早起始點。您指定的起點必須落在所選資料表的時間回溯期內。時間回溯期預設為過去七天。
如要納入時間回溯期間以外的資料,請使用標準查詢,插入或匯出特定時間點之前的資料,然後從該時間點開始執行持續查詢。
範例
以下範例說明如何從 BigQuery 表格載入舊資料,該表格會接收計程車行程資訊,直到特定時間點為止,然後從舊資料的截止時間開始執行持續查詢。
執行標準查詢,將資料回填至特定時間點:
INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides` SELECT timestamp, meter_reading, ride_status, passenger_count, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance, SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger FROM `myproject.real_time_taxi_streaming.taxirides` -- Include all data inserted into the table up to this point in time. -- This timestamp must be within the time travel window. FOR SYSTEM_TIME AS OF '2025-01-01 00:00:00 UTC' WHERE ride_status = 'dropoff';
從查詢停止的時間點開始執行持續查詢:
INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides` SELECT timestamp, meter_reading, ride_status, passenger_count, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance, SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to start processing -- data right where the batch query left off. -- This timestamp must be within the time travel window. TIMESTAMP '2025-01-01 00:00:00 UTC' + INTERVAL 1 MICROSECOND) WHERE ride_status = 'dropoff';
使用使用者帳戶執行持續查詢
本節說明如何使用使用者帳戶執行持續查詢。持續性查詢執行後,您可以關閉 Google Cloud 主控台、終端機視窗或應用程式,而不會中斷查詢執行作業。使用者帳戶執行的持續性查詢最多可執行兩天,之後就會自動停止。如要繼續處理傳入的新資料,請啟動新的持續查詢並指定起始點。如要自動執行這項程序,請參閱「重試失敗的查詢」一文。
請按照下列步驟執行持續查詢:
主控台
前往 Google Cloud 控制台的「BigQuery」頁面。
在查詢編輯器中,按一下
「更多」。- 在「Choose query mode」部分,選擇「Continuous query」。
- 按一下「確認」。
- 選用步驟:如要控制查詢執行的時間長度,請按一下「查詢設定」,然後以毫秒為單位設定「工作逾時時間」。
在查詢編輯器中,輸入持續查詢的 SQL 陳述式。SQL 陳述式只能包含支援的運算。
按一下「執行」。
bq
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
在 Cloud Shell 中,使用
bq query
指令搭配--continuous
旗標,執行持續查詢:bq query --use_legacy_sql=false --continuous=true 'QUERY'
將
QUERY
替換為持續查詢的 SQL 陳述式。SQL 陳述式只能包含支援的作業。您可以使用--job_timeout_ms
標記控制查詢執行的時間長度。
API
呼叫 jobs.insert
方法執行持續查詢。您必須在傳入的 Job
資源的 JobConfigurationQuery
中,將 continuous
欄位設為 true
。您可以選擇設定 jobTimeoutMs
欄位,藉此控制查詢執行的時間長度。
curl --request POST \ "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \ --header "Authorization: Bearer $(gcloud auth print-access-token)" \ --header "Content-Type: application/json; charset=utf-8" \ --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true}}}' \ --compressed
更改下列內容:
PROJECT_ID
:您的專案 ID。QUERY
:持續性查詢的 SQL 陳述式。SQL 陳述式只能包含支援的運算。
使用服務帳戶執行持續查詢
本節說明如何使用服務帳戶執行持續查詢。���續性查詢執行後,您可以關閉 Google Cloud 主控台、終端機視窗或應用程式,而不會中斷查詢執行作業。使用服務帳戶執行的持續查詢最多可執行 150 天,然後自動停止。如要繼續處理傳入的新資料,請啟動新的持續查詢並指定起始點。如要自動執行這項程序,請參閱「重試失敗的查詢」一文。
如要使用服務帳戶執行持續查詢,請按照下列步驟操作:
主控台
bq
- 建立服務帳戶。
- 授予服務帳戶所需的權限。
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
在指令列上,使用
bq query
指令搭配下列標記,執行持續查詢:- 將
--continuous
旗標設為true
,即可讓查詢持續執行。 - 使用
--connection_property
標記指定要使用的服務帳戶。 - 選用步驟:設定
--job_timeout_ms
旗標,限制查詢執行時間。
bq query --project_id=PROJECT_ID --use_legacy_sql=false \ --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \ 'QUERY'
更改下列內容:
- 將
API
- 建立服務帳戶。
- 授予服務帳戶所需的權限。
呼叫
jobs.insert
方法執行持續查詢。在您傳入的Job
資源中,設定JobConfigurationQuery
資源的下列欄位:- 將
continuous
欄位設為true
,即可讓查詢持續執行。 - 使用
connectionProperties
欄位指定要使用的服務帳戶。
您可以選擇在
JobConfiguration
資源中設定jobTimeoutMs
欄位,藉此控制查詢執行的時間長度。curl --request POST \ "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \ --header "Authorization: Bearer $(gcloud auth print-access-token)" \ --header "Content-Type: application/json; charset=utf-8" \ --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":[{"key":"service_account","value":"SERVICE_ACCOUNT_EMAIL"}]}}}' \ --compressed
更改下列內容:
- 將
建立自訂工作 ID
每個查詢工作都會指派工作 ID,您可以使用該 ID 搜尋及管理工作。根據預設,系統會隨機產生工作 ID。如要透過工作記錄或工作探索器,更輕鬆地搜尋持續查詢的工作 ID,您可以指派自訂工作 ID 前置字串:
前往 Google Cloud 控制台的「BigQuery」頁面。
在查詢編輯器中,按一下「更多」。
在「Choose query mode」部分,選擇「Continuous query」。
按一下「確認」。
在查詢編輯器中,依序點選「更多」>「查詢設定」。
在「自訂工作 ID 前置字串」部分,輸入自訂名稱前置字串。
按一下 [儲存]。
範例
以下 SQL 範例說明持續查詢的常見用途。
將資料匯出至 Pub/Sub 主題
以下範例顯示持續性查詢,可從接收計程車乘車資訊串流的 BigQuery 資料表篩選資料,並將資料以訊息屬性即時發布至 Pub/Sub 主題:
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude)) AS message, TO_JSON( STRUCT( CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`, -- Configure the APPENDS TVF start_timestamp to specify when you want to -- start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'enroute' );
將資料匯出至 Bigtable 資料表
以下範例顯示持續查詢,可從接收串流計程車行程資訊的 BigQuery 資料表篩選資料,並即時將資料匯出至 Bigtable 資料表:
EXPORT DATA OPTIONS ( format = 'CLOUD_BIGTABLE', truncate = TRUE, overwrite = TRUE, uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides') AS ( SELECT CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey, STRUCT( timestamp, latitude, longitude, meter_reading, ride_status, passenger_count) AS features FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to specify when you want to -- start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'enroute' );
將資料寫入 BigQuery 資料表
下列範例顯示持續查詢,可從接收串流計程車行程資訊的 BigQuery 資料表中篩選及轉換資料,然後即時將資料寫入另一個 BigQuery 表格。這樣一來,資料就能用於進一步的下游分析。
INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides` SELECT timestamp, meter_reading, ride_status, passenger_count, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance, SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to specify when you want to -- start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'dropoff';
使用 Vertex AI 模型處理資料
以下範例顯示持續性查詢,該查詢會使用 Vertex AI 模型,根據乘客目前的緯度和經度為他們產生廣告,然後將結果即時匯出至 Pub/Sub 主題:
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude, prompt, ml_generate_text_llm_result)) AS message FROM ML.GENERATE_TEXT( MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`, ( SELECT timestamp, ride_id, latitude, longitude, CONCAT( 'Generate an ad based on the current latitude of ', latitude, ' and longitude of ', longitude) AS prompt FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to specify when you -- want to start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'enroute' ), STRUCT( 50 AS max_output_tokens, 1.0 AS temperature, 40 AS top_k, 1.0 AS top_p, TRUE AS flatten_json_output)) AS ml_output );
修改持續查詢的 SQL
在持續查詢工作執行期間,您無法更新在持續查詢中使用的 SQL。您必須取消持續查詢工作、修改 SQL,然後從原始持續查詢工作停止的時間���開始啟動新的持續查詢工作。
如要修改持續查詢中使用的 SQL,請按照下列步驟操作:
- 查看要更新的持續性查詢工作詳細資料,並記下工作 ID。
- 盡可能暫停上游資料的收集作業。如果無法執行這項操作,重新啟動持續查詢時,可能會出現資料重複的情形。
- 取消要修改的持續查詢。
使用
INFORMATION_SCHEMA
JOBS
檢視畫面,取得原始持續查詢工作的end_time
值:SELECT end_time FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT WHERE EXTRACT(DATE FROM creation_time) = current_date() AND error_result.reason = 'stopped' AND job_id = 'JOB_ID';
更改下列內容:
PROJECT_ID
:您的專案 ID。REGION
:專案使用的區域。JOB_ID
:您在步驟 1 中指定的持續查詢工作 ID。
修改持續查詢 SQL 陳述式,使用您在步驟 5 中擷取的
end_time
值做為起始點,從特定時間點開始執行持續查詢。修改持續查詢 SQL 陳述式,反映所需變更。
執行經過修改的持續查詢。
取消持續查詢
您可以取消持續查詢工作,方法和取消其他工作相同。工作取消後,查詢可能需要最多一分鐘的時間才會停止執行。
如果您取消查詢後再重新啟動,重新啟動的查詢會像���新的���������詢一���運作。重新啟動的查詢不會開始處理先前作業停止的資料,也無法參照先前查詢的結果。請參閱「從特定時間點開始執行持續查詢」一文。
監控查詢並處理錯誤
連續查詢可能會因資料不一致、結構定義變更、服務暫時中斷或維護等因素而中斷。雖然 BigQuery 會處理某些暫時性錯誤,但改善工作彈性的方法包括: