Kontinuierliche Abfragen überwachen

Sie können kontinuierliche BigQuery-Abfragen mit den folgenden BigQuery-Tools überwachen:

Da eine kontinuierliche BigQuery-Abfrage sehr lange andauert, fehlen Messwerte, die normalerweise nach Abschluss einer SQL-Abfrage generiert werden, eventuell nicht oder sind ungenau.

INFORMATION_SCHEMA-Ansichten verwenden

Sie können eine Reihe von INFORMATION_SCHEMA-Ansichten verwenden, um kontinuierliche Abfragen und kontinuierliche Abfragereservierungen zu überwachen.

Auftragsdetails aufrufen

Mit der Ansicht JOBS können Sie Metadaten des kontinuierlichen Abfragejobs abrufen.

Die folgende Abfrage gibt die Metadaten für alle aktiven kontinuierlichen Abfragen zurück. Die Metadaten enthalten den Zeitstempel des Ausgabewasserzeichens, der den Punkt angibt, bis zu dem die kontinuierliche Abfrage Daten erfolgreich verarbeitet hat.

  1. Rufen Sie in der Google Cloud Console die Seite BigQuery auf.

    BigQuery aufrufen

  2. Führen Sie im Abfrageeditor folgende Abfrage aus:

    SELECT
      start_time,
      job_id,
      user_email,
      query,
      state,
      reservation_id,
      continuous_query_info.output_watermark
    FROM `PROJECT_ID.region-REGION.INFORMATION_SCHEMA.JOBS`
    WHERE
      creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 day)
      AND continuous IS TRUE
      AND state = "RUNNING"
    ORDER BY
      start_time DESC

    Ersetzen Sie Folgendes:

    • PROJECT_ID: die ID des Projekts.
    • REGION: ist ein beliebiger Dataset-Regionsname. Beispiel: region-us

Details zur Reservierungszuweisung ansehen

Mit den Ansichten ASSIGNMENTS und RESERVATIONS können Sie Details zur Zuweisung von kontinuierlichen Abfragereservierungen abrufen.

Geben Sie die Details der Reservierungszuweisung für kontinuierliche Abfragen zurück:

  1. Rufen Sie in der Google Cloud Console die Seite BigQuery auf.

    BigQuery aufrufen

  2. Führen Sie im Abfrageeditor folgende Abfrage aus:

    SELECT
      reservation.reservation_name,
      reservation.slot_capacity
    FROM
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.ASSIGNMENTS`
        AS assignment
    INNER JOIN
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.RESERVATIONS`
        AS reservation
      ON (assignment.reservation_name = reservation.reservation_name)
    WHERE
      assignment.assignee_id = 'PROJECT_ID'
      AND job_type = 'CONTINUOUS';

    Ersetzen Sie dabei Folgendes:

    • ADMIN_PROJECT_ID: Die ID des Administrationsprojekts, dem die Reservierung gehört.
    • LOCATION: Der Standort der Reservierung.
    • PROJECT_ID: die ID des Projekts, das der Reservierung zugewiesen ist. Es werden nur Informationen zu kontinuierlichen Abfragen zurückgegeben, die in diesem Projekt ausgeführt werden.

Informationen zum Slotverbrauch aufrufen

Sie können die Ansichten ASSIGNMENTS, RESERVATIONS und JOBS_TIMELINE verwenden, um kontinuierliche Informationen zum Verbrauch von Abfrage-Slots zu erhalten.

Informationen zum Slot-Verbrauch für kontinuierliche Abfragen zurückgeben:

  1. Rufen Sie in der Google Cloud Console die Seite BigQuery auf.

    BigQuery aufrufen

  2. Führen Sie im Abfrageeditor folgende Abfrage aus:

    SELECT
      jobs.period_start,
      reservation.reservation_name,
      reservation.slot_capacity,
      SUM(jobs.period_slot_ms) / 1000 AS consumed_total_slots
    FROM
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.ASSIGNMENTS`
        AS assignment
    INNER JOIN
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.RESERVATIONS`
        AS reservation
      ON (assignment.reservation_name = reservation.reservation_name)
    INNER JOIN
      `PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.JOBS_TIMELINE` AS jobs
      ON (
        UPPER(CONCAT('ADMIN_PROJECT_ID:LOCATION.', assignment.reservation_name))
        = UPPER(jobs.reservation_id))
    WHERE
      assignment.assignee_id = 'PROJECT_ID'
      AND assignment.job_type = 'CONTINUOUS'
      AND jobs.period_start
        BETWEEN TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
        AND CURRENT_TIMESTAMP()
    GROUP BY 1, 2, 3
    ORDER BY jobs.period_start DESC;

    Ersetzen Sie dabei Folgendes:

    • ADMIN_PROJECT_ID: Die ID des Administrationsprojekts, dem die Reservierung gehört.
    • LOCATION: Der Standort der Reservierung.
    • PROJECT_ID: die ID des Projekts, das der Reservierung zugewiesen ist. Es werden nur Informationen zu kontinuierlichen Abfragen zurückgegeben, die in diesem Projekt ausgeführt werden.

Sie können kontinuierliche Abfragereservierungen auch mit anderen Tools wie Metrics Explorer und administrativen Ressourcendiagrammen überwachen. Weitere Informationen finden Sie unter BigQuery Reservations überwachen.

Ausführungsgrafik für Abfragen verwenden

Mit der Ausführungsgrafik für Abfragen können Sie Leistungsinformationen und allgemeine Statistiken für eine kontinuierliche Abfrage abrufen. Weitere Informationen finden Sie unter Statistiken zur Abfrageleistung ansehen.

Jobverlauf ansehen

Details zu Jobs mit kontinuierlicher Abfrage können Sie in Ihrem persönlichen Jobverlauf oder im Jobverlauf des Projekts einsehen. Weitere Informationen finden Sie unter Jobdetails ansehen.

Die Verlaufsliste der Jobs wird nach der Startzeit des Jobs sortiert. Daher befinden sich kontinuierliche Abfragen, die bereits seit einer Weile ausgeführt werden, möglicherweise nicht nahe am Anfang der Liste.

Administrativen Job-Explorer verwenden

Filtern Sie im Explorer für administrative Jobs, um fortlaufende Abfragen anzuzeigen. Legen Sie dazu den Filter Jobkategorie auf Fortlaufende Abfrage fest.

Cloud Monitoring verwenden

Mit Cloud Monitoring können Sie Messwerte speziell für kontinuierliche BigQuery-Abfragen aufrufen. Weitere Informationen finden Sie unter Dashboards, Diagramme und Benachrichtigungen erstellen und in der Übersicht der für die Visualisierung verfügbaren Messwerte.

Benachrichtigung bei fehlgeschlagenen Abfragen

Anstatt regelmäßig zu prüfen, ob Ihre fortlaufenden Abfragen fehlgeschlagen sind, kann es hilfreich sein, eine Benachrichtigung zu erstellen, die Sie über einen Fehler informiert. Eine Möglichkeit besteht darin, einen benutzerdefinierten logbasierten Cloud Logging-Messwert mit einem Filter für Ihre Jobs und eine Cloud Monitoring-Benachrichtigungsrichtlinie auf Grundlage dieses Messwerts zu erstellen:

  1. Verwenden Sie beim Erstellen einer fortlaufenden Abfrage ein benutzerdefiniertes Job-ID-Präfix. Mehrere fortlaufende Abfragen können dasselbe Präfix haben. Sie können beispielsweise das Präfix prod- verwenden, um eine Produktionsabfrage anzugeben.
  2. Rufen Sie in der Google Cloud Console die Seite Logbasierte Messwerte auf.

    Zu „Logbasierte Messwerte”

  3. Klicken Sie auf Messwert erstellen. Der Bereich Logmesswert erstellen wird angezeigt.

  4. Wählen Sie als Messwerttyp die Option Zähler aus.

  5. Geben Sie im Bereich Details einen Namen für den Messwert ein. Beispiel: CUSTOM_JOB_ID_PREFIX-metric

  6. Geben Sie im Bereich Filterauswahl im Editor Filter erstellen Folgendes ein:

    resource.type = "bigquery_project"
    protoPayload.resourceName : "projects/PROJECT_ID/jobs/CUSTOM_JOB_ID_PREFIX"
    severity = ERROR
    

    Ersetzen Sie Folgendes:

  7. Klicken Sie auf Messwert erstellen.

  8. Klicken Sie im Navigationsmenü auf Protokollbasierte Messwerte. Der Messwert, den Sie gerade erstellt haben, wird in der Liste der benutzerdefinierten Messwerte angezeigt.

  9. Klicken Sie in der Zeile des Messwerts auf  Weitere Aktionen und dann auf Benachrichtigung aus Messwert erstellen.

  10. Klicken Sie auf Weiter. Sie müssen die Standardeinstellungen auf der Seite Richtlinienkonfigurationsmodus nicht ändern.

  11. Klicken Sie auf Weiter. Sie müssen die Standardeinstellungen auf der Seite Benachrichtigungstrigger konfigurieren nicht ändern.

  12. Wählen Sie Ihre Benachrichtigungskanäle aus und geben Sie einen Namen für die Benachrichtigungsrichtlinie ein.

  13. Klicken Sie auf Richtlinie erstellen.

Sie können die Benachrichtigung testen, indem Sie eine kontinuierliche Abfrage mit dem von Ihnen ausgewählten benutzerdefinierten Job-ID-Präfix ausführen und dann abbrechen. Es kann einige Minuten dauern, bis die Benachrichtigung Ihren Benachrichtigungskanal erreicht.

Fehlgeschlagene Abfragen wiederholen

Wenn Sie eine fehlgeschlagene kontinuierliche Abfrage noch einmal versuchen, können Sie Situationen vermeiden, in denen eine kontinuierliche Pipeline über einen längeren Zeitraum ausfällt oder ein manueller Neustart erforderlich ist. Beachten Sie Folgendes, wenn Sie eine fehlgeschlagene fortlaufende Abfrage noch einmal ausführen:

  • Ob die erneute Verarbeitung einer bestimmten Menge von Daten, die von der vorherigen Abfrage verarbeitet wurden, bevor sie fehlgeschlagen ist, tolerabel ist.
  • Wie Sie Wiederholungsversuche einschränken oder exponentielles Backoff verwenden.

Eine mögliche Vorgehensweise zur Automatisierung des Wiederholungsversuchs einer Abfrage ist die folgende:

  1. Erstellen Sie eine Cloud Logging-Senke basierend auf einem Inklusionsfilter, der den folgenden Kriterien entspricht, um Protokolle an ein Pub/Sub-Thema weiterzuleiten:

    resource.type = "bigquery_project"
    protoPayload.resourceName : "projects/PROJECT_ID/jobs/CUSTOM_JOB_ID_PREFIX"
    severity = ERROR
    

    Ersetzen Sie Folgendes:

  2. Erstellen Sie eine Cloud Run-Funktion, die als Reaktion auf Pub/Sub-Empfangsprotokolle ausgelöst wird, die Ihrem Filter entsprechen.

    Die Cloud Run-Funktion kann die Datennutzlast aus der Pub/Sub-Nachricht akzeptieren und versuchen, eine neue kontinuierliche Abfrage mit derselben SQL-Syntax wie die fehlgeschlagene Abfrage zu starten, aber erst nach dem Beenden des vorherigen Jobs.

Sie können beispielsweise eine Funktion wie die folgende verwenden:

Python

Bevor Sie dieses Beispiel anwenden, folgen Sie den Schritten zur Einrichtung von Python in der BigQuery-Kurzanleitung zur Verwendung von Clientbibliotheken. Weitere Angaben finden Sie in der Referenzdokumentation zur BigQuery Python API.

Richten Sie zur Authentifizierung bei BigQuery die Standardanmeldedaten für Anwendungen ein. Weitere Informationen finden Sie unter Authentifizierung für Clientbibliotheken einrichten.

import base64
import json
import logging
import re
import uuid

import google.auth
import google.auth.transport.requests
import requests


def retry_continuous_query(event, context):
    logging.info("Cloud Function started.")

    if "data" not in event:
        logging.info("No data in Pub/Sub message.")
        return

    try:
        # Decode and parse the Pub/Sub message data
        log_entry = json.loads(base64.b64decode(event["data"]).decode("utf-8"))

        # Extract the SQL query and other necessary data
        proto_payload = log_entry.get("protoPayload", {})
        metadata = proto_payload.get("metadata", {})
        job_change = metadata.get("jobChange", {})
        job = job_change.get("job", {})
        job_config = job.get("jobConfig", {})
        query_config = job_config.get("queryConfig", {})
        sql_query = query_config.get("query")
        job_stats = job.get("jobStats", {})
        end_timestamp = job_stats.get("endTime")
        failed_job_id = job.get("jobName")

        # Check if required fields are missing
        if not all([sql_query, failed_job_id, end_timestamp]):
            logging.error("Required fields missing from log entry.")
            return

        logging.info(f"Retrying failed job: {failed_job_id}")

        # Adjust the timestamp in the SQL query
        timestamp_match = re.search(
            r"\s*TIMESTAMP\(('.*?')\)(\s*\+ INTERVAL 1 MICROSECOND)?", sql_query
        )

        if timestamp_match:
            original_timestamp = timestamp_match.group(1)
            new_timestamp = f"'{end_timestamp}'"
            sql_query = sql_query.replace(original_timestamp, new_timestamp)
        elif "CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE" in sql_query:
            new_timestamp = f"TIMESTAMP('{end_timestamp}') + INTERVAL 1 MICROSECOND"
            sql_query = sql_query.replace(
                "CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE", new_timestamp
            )

        # Get access token
        credentials, project = google.auth.default(
            scopes=["https://www.googleapis.com/auth/cloud-platform"]
        )
        request = google.auth.transport.requests.Request()
        credentials.refresh(request)
        access_token = credentials.token

        # API endpoint
        url = f"https://bigquery.googleapis.com/bigquery/v2/projects/{project}/jobs"

        # Request headers
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        }

        # Generate a random UUID
        random_suffix = str(uuid.uuid4())[:8]  # Take the first 8 characters of the UUID

        # Combine the prefix and random suffix
        job_id = f"CUSTOM_JOB_ID_PREFIX{random_suffix}"

        # Request payload
        data = {
            "configuration": {
                "query": {
                    "query": sql_query,
                    "useLegacySql": False,
                    "continuous": True,
                    "connectionProperties": [
                        {"key": "service_account", "value": "SERVICE_ACCOUNT"}
                    ],
                    # ... other query parameters ...
                },
                "labels": {"bqux_job_id_prefix": "CUSTOM_JOB_ID_PREFIX"},
            },
            "jobReference": {
                "projectId": project,
                "jobId": job_id,  # Use the generated job ID here
            },
        }

        # Make the API request
        response = requests.post(url, headers=headers, json=data)

        # Handle the response
        if response.status_code == 200:
            logging.info("Query job successfully created.")
        else:
            logging.error(f"Error creating query job: {response.text}")

    except Exception as e:
        logging.error(
            f"Error processing log entry or retrying query: {e}", exc_info=True
        )

    logging.info("Cloud Function finished.")

Nächste Schritte