Memantau kueri berkelanjutan

Anda dapat memantau kueri berkelanjutan BigQuery menggunakan alat BigQuery berikut:

Karena sifat kueri berkelanjutan BigQuery yang berjalan lama, metrik yang biasanya dihasilkan setelah kueri SQL selesai mungkin tidak ada atau tidak akurat.

Menggunakan tampilan INFORMATION_SCHEMA

Anda dapat menggunakan sejumlah tampilan INFORMATION_SCHEMA untuk memantau kueri berkelanjutan dan reservasi kueri berkelanjutan.

Melihat detail tugas

Anda dapat menggunakan tampilan JOBS untuk mendapatkan metadata tugas kueri berkelanjutan.

Kueri berikut menampilkan metadata untuk semua kueri berkelanjutan yang aktif. Metadata mencakup stempel waktu watermark output, yang mewakili titik hingga kueri berkelanjutan berhasil memproses data.

  1. Di Google Cloud konsol, buka halaman BigQuery.

    Buka BigQuery

  2. Di editor kueri, jalankan kueri berikut:

    SELECT
      start_time,
      job_id,
      user_email,
      query,
      state,
      reservation_id,
      continuous_query_info.output_watermark
    FROM `PROJECT_ID.region-REGION.INFORMATION_SCHEMA.JOBS`
    WHERE
      creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 day)
      AND continuous IS TRUE
      AND state = "RUNNING"
    ORDER BY
      start_time DESC

    Ganti kode berikut:

Melihat detail penetapan reservasi

Anda dapat menggunakan tampilan ASSIGNMENTS dan RESERVATIONS untuk mendapatkan detail penetapan reservasi kueri berkelanjutan.

Menampilkan detail penetapan reservasi untuk kueri berkelanjutan:

  1. Di Google Cloud konsol, buka halaman BigQuery.

    Buka BigQuery

  2. Di editor kueri, jalankan kueri berikut:

    SELECT
      reservation.reservation_name,
      reservation.slot_capacity
    FROM
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.ASSIGNMENTS`
        AS assignment
    INNER JOIN
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.RESERVATIONS`
        AS reservation
      ON (assignment.reservation_name = reservation.reservation_name)
    WHERE
      assignment.assignee_id = 'PROJECT_ID'
      AND job_type = 'CONTINUOUS';

    Ganti kode berikut:

    • ADMIN_PROJECT_ID: ID project administrasi yang memiliki pemesanan.
    • LOCATION: lokasi pemesanan.
    • PROJECT_ID: ID project yang ditetapkan ke pemesanan. Hanya informasi tentang kueri berkelanjutan yang berjalan di project ini yang ditampilkan.

Melihat informasi penggunaan slot

Anda dapat menggunakan tampilan ASSIGNMENTS, RESERVATIONS, dan JOBS_TIMELINE untuk mendapatkan informasi penggunaan slot kueri berkelanjutan.

Menampilkan informasi penggunaan slot untuk kueri berkelanjutan:

  1. Di Google Cloud konsol, buka halaman BigQuery.

    Buka BigQuery

  2. Di editor kueri, jalankan kueri berikut:

    SELECT
      jobs.period_start,
      reservation.reservation_name,
      reservation.slot_capacity,
      SUM(jobs.period_slot_ms) / 1000 AS consumed_total_slots
    FROM
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.ASSIGNMENTS`
        AS assignment
    INNER JOIN
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.RESERVATIONS`
        AS reservation
      ON (assignment.reservation_name = reservation.reservation_name)
    INNER JOIN
      `PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.JOBS_TIMELINE` AS jobs
      ON (
        UPPER(CONCAT('ADMIN_PROJECT_ID:LOCATION.', assignment.reservation_name))
        = UPPER(jobs.reservation_id))
    WHERE
      assignment.assignee_id = 'PROJECT_ID'
      AND assignment.job_type = 'CONTINUOUS'
      AND jobs.period_start
        BETWEEN TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
        AND CURRENT_TIMESTAMP()
    GROUP BY 1, 2, 3
    ORDER BY jobs.period_start DESC;

    Ganti kode berikut:

    • ADMIN_PROJECT_ID: ID project administrasi yang memiliki pemesanan.
    • LOCATION: lokasi pemesanan.
    • PROJECT_ID: ID project yang ditetapkan ke pemesanan. Hanya informasi tentang kueri berkelanjutan yang berjalan di project ini yang ditampilkan.

Anda juga dapat memantau pemesanan kueri berkelanjutan menggunakan alat lain seperti Metrics Explorer dan diagram resource administratif. Untuk informasi selengkapnya, lihat Memantau reservasi BigQuery.

Menggunakan grafik eksekusi kueri

Anda dapat menggunakan grafik eksekusi kueri untuk mendapatkan insight performa dan statistik umum untuk kueri berkelanjutan. Untuk mengetahui informasi selengkapnya, lihat Melihat insight performa kueri.

Melihat histori tugas

Anda dapat melihat detail tugas kueri berkelanjutan di histori tugas pribadi atau histori tugas project. Untuk informasi selengkapnya, lihat Melihat detail lowongan.

Perhatikan bahwa daftar historis tugas diurutkan berdasarkan waktu mulai tugas, sehingga kueri berkelanjutan yang telah berjalan selama beberapa waktu mungkin tidak dekat dengan awal daftar.

Menggunakan penjelajah tugas administratif

Di penjelajah tugas administratif, filter tugas Anda untuk menampilkan kueri berkelanjutan dengan menetapkan filter Kategori tugas ke Kueri berkelanjutan.

Menggunakan Cloud Monitoring

Anda dapat melihat metrik khusus untuk kueri berkelanjutan BigQuery dengan menggunakan Cloud Monitoring. Untuk informasi selengkapnya, lihat Membuat dasbor, diagram, dan pemberitahuan dan baca tentang metrik yang tersedia untuk visualisasi.

Notifikasi tentang kueri yang gagal

Daripada memeriksa secara rutin apakah kueri berkelanjutan Anda gagal, sebaiknya buat pemberitahuan untuk memberi tahu Anda tentang kegagalan. Salah satu cara untuk melakukannya adalah dengan membuat metrik berbasis log Cloud Logging kustom dengan filter untuk tugas Anda, dan kebijakan pemberitahuan Cloud Monitoring berdasarkan metrik tersebut:

  1. Saat Anda membuat kueri berkelanjutan, gunakan awalan ID tugas kustom. Beberapa kueri berkelanjutan dapat memiliki awalan yang sama. Misalnya, Anda dapat menggunakan awalan prod- untuk menunjukkan kueri produksi.
  2. Di Google Cloud console, buka halaman Log-based Metrics.

    Buka Log-based Metrics

  3. Klik Create metric. Panel Create logs metric akan muncul.

  4. Untuk Jenis metrik, pilih Penghitung.

  5. Di bagian Details, beri nama metrik Anda. Contoh, CUSTOM_JOB_ID_PREFIX-metric.

  6. Di bagian Filter selection, masukkan perintah berikut ke dalam editor Build filter:

    resource.type = "bigquery_project"
    protoPayload.resourceName : "projects/PROJECT_ID/jobs/CUSTOM_JOB_ID_PREFIX"
    severity = ERROR
    

    Ganti kode berikut:

    • PROJECT_ID: nama project Anda.
    • CUSTOM_JOB_ID_PREFIX: nama awalan ID tugas kustom yang Anda tetapkan untuk kueri berkelanjutan.
  7. Klik Create metric.

  8. Di menu navigasi, klik Metrik berbasis log. Metrik yang baru saja Anda buat akan muncul dalam daftar metrik yang ditentukan pengguna.

  9. Di baris metrik, klik More actions, lalu klik Create alert from metric.

  10. Klik Berikutnya. Anda tidak perlu mengubah setelan default di halaman Mode konfigurasi kebijakan.

  11. Klik Berikutnya. Anda tidak perlu mengubah setelan default di halaman Konfigurasi pemicu notifikasi.

  12. Pilih saluran notifikasi dan masukkan nama untuk kebijakan pemberitahuan.

  13. Klik Create policy.

Anda dapat menguji pemberitahuan dengan menjalankan kueri berkelanjutan dengan awalan ID tugas kustom yang Anda pilih, lalu membatalkannya. Mungkin perlu waktu beberapa menit hingga pemberitahuan mencapai saluran notifikasi Anda.

Mencoba lagi kueri yang gagal

Mencoba ulang kueri berkelanjutan yang gagal dapat membantu menghindari situasi saat pipeline berkelanjutan tidak berfungsi selama jangka waktu yang lama atau memerlukan intervensi manusia untuk memulai ulang. Hal-hal penting yang perlu dipertimbangkan saat Anda mencoba lagi kueri berkelanjutan yang gagal meliputi hal berikut:

  • Apakah pemrosesan ulang sejumlah data yang diproses oleh kueri sebelumnya sebelum gagal dapat ditoleransi.
  • Cara menangani pembatasan percobaan ulang atau menggunakan backoff eksponensial.

Salah satu pendekatan yang dapat dilakukan untuk mengotomatiskan percobaan ulang kueri adalah sebagai berikut:

  1. Buat sink Cloud Logging berdasarkan filter penyertaan yang cocok dengan kriteria berikut untuk merutekan log ke topik Pub/Sub:

    resource.type = "bigquery_project"
    protoPayload.resourceName : "projects/PROJECT_ID/jobs/CUSTOM_JOB_ID_PREFIX"
    severity = ERROR
    

    Ganti kode berikut:

    • PROJECT_ID: nama project Anda.
    • CUSTOM_JOB_ID_PREFIX: nama awalan ID tugas kustom yang Anda tetapkan untuk kueri berkelanjutan.
  2. Buat fungsi Cloud Run yang dipicu sebagai respons terhadap log penerimaan Pub/Sub yang cocok dengan filter Anda.

    Fungsi Cloud Run dapat menerima payload data dari pesan Pub/Sub dan mencoba memulai kueri berkelanjutan baru menggunakan sintaksis SQL yang sama dengan kueri yang gagal, tetapi di awal tepat setelah tugas sebelumnya dihentikan.

Misalnya, Anda dapat menggunakan fungsi yang mirip dengan berikut:

Python

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di Panduan memulai BigQuery menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat Dokumentasi referensi BigQuery Python API.

Untuk melakukan autentikasi ke BigQuery, siapkan Kredensial Default Aplikasi. Untuk informasi selengkapnya, lihat Menyiapkan autentikasi untuk library klien.

import base64
import json
import logging
import re
import uuid

import google.auth
import google.auth.transport.requests
import requests


def retry_continuous_query(event, context):
    logging.info("Cloud Function started.")

    if "data" not in event:
        logging.info("No data in Pub/Sub message.")
        return

    try:
        # Decode and parse the Pub/Sub message data
        log_entry = json.loads(base64.b64decode(event["data"]).decode("utf-8"))

        # Extract the SQL query and other necessary data
        proto_payload = log_entry.get("protoPayload", {})
        metadata = proto_payload.get("metadata", {})
        job_change = metadata.get("jobChange", {})
        job = job_change.get("job", {})
        job_config = job.get("jobConfig", {})
        query_config = job_config.get("queryConfig", {})
        sql_query = query_config.get("query")
        job_stats = job.get("jobStats", {})
        end_timestamp = job_stats.get("endTime")
        failed_job_id = job.get("jobName")

        # Check if required fields are missing
        if not all([sql_query, failed_job_id, end_timestamp]):
            logging.error("Required fields missing from log entry.")
            return

        logging.info(f"Retrying failed job: {failed_job_id}")

        # Adjust the timestamp in the SQL query
        timestamp_match = re.search(
            r"\s*TIMESTAMP\(('.*?')\)(\s*\+ INTERVAL 1 MICROSECOND)?", sql_query
        )

        if timestamp_match:
            original_timestamp = timestamp_match.group(1)
            new_timestamp = f"'{end_timestamp}'"
            sql_query = sql_query.replace(original_timestamp, new_timestamp)
        elif "CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE" in sql_query:
            new_timestamp = f"TIMESTAMP('{end_timestamp}') + INTERVAL 1 MICROSECOND"
            sql_query = sql_query.replace(
                "CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE", new_timestamp
            )

        # Get access token
        credentials, project = google.auth.default(
            scopes=["https://www.googleapis.com/auth/cloud-platform"]
        )
        request = google.auth.transport.requests.Request()
        credentials.refresh(request)
        access_token = credentials.token

        # API endpoint
        url = f"https://bigquery.googleapis.com/bigquery/v2/projects/{project}/jobs"

        # Request headers
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        }

        # Generate a random UUID
        random_suffix = str(uuid.uuid4())[:8]  # Take the first 8 characters of the UUID

        # Combine the prefix and random suffix
        job_id = f"CUSTOM_JOB_ID_PREFIX{random_suffix}"

        # Request payload
        data = {
            "configuration": {
                "query": {
                    "query": sql_query,
                    "useLegacySql": False,
                    "continuous": True,
                    "connectionProperties": [
                        {"key": "service_account", "value": "SERVICE_ACCOUNT"}
                    ],
                    # ... other query parameters ...
                },
                "labels": {"bqux_job_id_prefix": "CUSTOM_JOB_ID_PREFIX"},
            },
            "jobReference": {
                "projectId": project,
                "jobId": job_id,  # Use the generated job ID here
            },
        }

        # Make the API request
        response = requests.post(url, headers=headers, json=data)

        # Handle the response
        if response.status_code == 200:
            logging.info("Query job successfully created.")
        else:
            logging.error(f"Error creating query job: {response.text}")

    except Exception as e:
        logging.error(
            f"Error processing log entry or retrying query: {e}", exc_info=True
        )

    logging.info("Cloud Function finished.")

Langkah berikutnya