Monitorare le query continue

Puoi monitorare le query continue di BigQuery utilizzando i seguenti strumenti BigQuery:

A causa della natura di esecuzione prolungata di una query continua di BigQuery, le metriche che vengono solitamente generate al termine di una query SQL potrebbero essere assenti o imprecise.

Utilizzare le visualizzazioni INFORMATION_SCHEMA

Puoi utilizzare una serie di visualizzazioni INFORMATION_SCHEMA per monitorare le query continue e le prenotazioni di query continue.

Visualizza i dettagli del job

Puoi utilizzare la vista JOBS per ottenere i metadati dei job di query continua.

La seguente query restituisce i metadati di tutte le query continue attive. I metadati includono il timestamp della filigrana in uscita, che rappresenta il punto fino al quale la query continua ha elaborato correttamente i dati.

  1. Nella console Google Cloud , vai alla pagina BigQuery.

    Vai a BigQuery

  2. Nell'editor di query, esegui la seguente query:

    SELECT
      start_time,
      job_id,
      user_email,
      query,
      state,
      reservation_id,
      continuous_query_info.output_watermark
    FROM `PROJECT_ID.region-REGION.INFORMATION_SCHEMA.JOBS`
    WHERE
      creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 day)
      AND continuous IS TRUE
      AND state = "RUNNING"
    ORDER BY
      start_time DESC

    Sostituisci quanto segue:

Visualizzare i dettagli dell'assegnazione della prenotazione

Puoi utilizzare le visualizzazioni ASSIGNMENTS e RESERVATIONS per ottenere dettagli sull'assegnazione delle prenotazioni delle query continue.

Restituisce i dettagli dell'assegnazione della prenotazione per le query continue:

  1. Nella console Google Cloud , vai alla pagina BigQuery.

    Vai a BigQuery

  2. Nell'editor di query, esegui la seguente query:

    SELECT
      reservation.reservation_name,
      reservation.slot_capacity
    FROM
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.ASSIGNMENTS`
        AS assignment
    INNER JOIN
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.RESERVATIONS`
        AS reservation
      ON (assignment.reservation_name = reservation.reservation_name)
    WHERE
      assignment.assignee_id = 'PROJECT_ID'
      AND job_type = 'CONTINUOUS';

    Sostituisci quanto segue:

    • ADMIN_PROJECT_ID: l'ID del progetto di amministrazione proprietario della prenotazione.
    • LOCATION: la località della prenotazione.
    • PROJECT_ID: l'ID del progetto assegnato alla prenotazione. Vengono restituite solo le informazioni sulle query continue in esecuzione in questo progetto.

Visualizzare le informazioni sul consumo degli slot

Puoi utilizzare le visualizzazioni ASSIGNMENTS, RESERVATIONS e JOBS_TIMELINE per ottenere informazioni sul consumo degli slot di query continua.

Restituisce informazioni sul consumo di slot per le query continue:

  1. Nella console Google Cloud , vai alla pagina BigQuery.

    Vai a BigQuery

  2. Nell'editor di query, esegui la seguente query:

    SELECT
      jobs.period_start,
      reservation.reservation_name,
      reservation.slot_capacity,
      SUM(jobs.period_slot_ms) / 1000 AS consumed_total_slots
    FROM
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.ASSIGNMENTS`
        AS assignment
    INNER JOIN
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.RESERVATIONS`
        AS reservation
      ON (assignment.reservation_name = reservation.reservation_name)
    INNER JOIN
      `PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.JOBS_TIMELINE` AS jobs
      ON (
        UPPER(CONCAT('ADMIN_PROJECT_ID:LOCATION.', assignment.reservation_name))
        = UPPER(jobs.reservation_id))
    WHERE
      assignment.assignee_id = 'PROJECT_ID'
      AND assignment.job_type = 'CONTINUOUS'
      AND jobs.period_start
        BETWEEN TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
        AND CURRENT_TIMESTAMP()
    GROUP BY 1, 2, 3
    ORDER BY jobs.period_start DESC;

    Sostituisci quanto segue:

    • ADMIN_PROJECT_ID: l'ID del progetto di amministrazione proprietario della prenotazione.
    • LOCATION: la località della prenotazione.
    • PROJECT_ID: l'ID del progetto assegnato alla prenotazione. Vengono restituite solo le informazioni sulle query continue in esecuzione in questo progetto.

Puoi anche monitorare le prenotazioni di query continue utilizzando altri strumenti come Metrics Explorer e i grafici delle risorse amministrative. Per ulteriori informazioni, consulta Monitorare le prenotazioni BigQuery.

Utilizzare il grafico di esecuzione delle query

Puoi utilizzare il grafico di esecuzione delle query per ottenere informazioni sulle prestazioni e statistiche generali per una query continua. Per ulteriori informazioni, consulta Visualizzare gli insight sul rendimento delle query.

Visualizzare la cronologia dei job

Puoi visualizzare i dettagli dei job di query continua nella cronologia dei job personale o nel cronologia dei job del progetto. Per saperne di più, consulta Visualizzare i dettagli del job.

Tieni presente che l'elenco storico dei job è ordinato in base all'ora di inizio del job, pertanto le query continue in esecuzione da un po' di tempo potrebbero non trovarsi vicino all'inizio dell'elenco.

Utilizzare lo strumento di esplorazione dei job amministrativi

Nell'esploratore dei job amministrativi, filtra i job per mostrare le query continue impostando il filtro Categoria job su Query continua.

Usa Cloud Monitoring

Puoi visualizzare le metriche specifiche per le query continue di BigQuery utilizzando Cloud Monitoring. Per saperne di più, consulta Creare dashboard, grafici e avvisi e scopri le metriche disponibili per la visualizzazione.

Avviso sulle query non riuscite

Anziché controllare regolarmente se le query continue non sono riuscite, può essere utile creare un avviso che ti invii una notifica in caso di errore. Un modo per farlo è creare una metrica basata su log di Cloud Logging personalizzata con un filtro per i job e un criterio di avviso di Cloud Monitoring basato su questa metrica:

  1. Quando crei una query continua, utilizza un prefisso ID job personalizzato. Più query continue possono condividere lo stesso prefisso. Ad esempio, potresti utilizzare il prefisso prod- per indicare una query di produzione.
  2. Nella Google Cloud console, vai alla pagina Metriche basate su log.

    Vai a Metriche basate su log

  3. Fai clic su Crea metrica. Viene visualizzato il riquadro Crea metrica di log.

  4. Per Tipo di metrica, seleziona Contatore.

  5. Nella sezione Dettagli, assegna un nome alla metrica. Ad esempio, CUSTOM_JOB_ID_PREFIX-metric.

  6. Nella sezione Selezione filtro, inserisci quanto segue nell'editor Crea filtro:

    resource.type = "bigquery_project"
    protoPayload.resourceName : "projects/PROJECT_ID/jobs/CUSTOM_JOB_ID_PREFIX"
    severity = ERROR
    

    Sostituisci quanto segue:

  7. Fai clic su Crea metrica.

  8. Nel menu di navigazione, fai clic su Metriche basate su log. La metrica appena creata viene visualizzata nell'elenco delle metriche definite dall'utente.

  9. Nella riga della metrica, fai clic su Altre azioni e poi su Crea avviso da metrica.

  10. Fai clic su Avanti. Non è necessario modificare le impostazioni predefinite nella pagina Modalità di configurazione dei criteri.

  11. Fai clic su Avanti. Non è necessario modificare le impostazioni predefinite nella pagina Configura attivatore di avvisi.

  12. Seleziona i canali di notifica e inserisci un nome per il criterio di avviso.

  13. Fai clic su Crea criterio.

Puoi testare l'avviso eseguendo una query continua con il prefisso ID job personalizzato selezionato e poi annullandola. Potrebbero essere necessari alcuni minuti prima che l'avviso raggiunga il tuo canale di notifica.

Riprova le query non riuscite

Riprovare una query continua non riuscita potrebbe contribuire a evitare situazioni in cui una pipeline continua è inattiva per un periodo di tempo prolungato o richiede l'intervento umano per il riavvio. Di seguito sono riportati alcuni aspetti importanti da considerare quando ritenti una query continua non riuscita:

  • Se è tollerabile la rielaborazione di una certa quantità di dati elaborati dalla query precedente prima del fallimento.
  • Come gestire il limite dei tentativi o l'utilizzo del backoff esponenziale.

Un possibile approccio per automatizzare il nuovo tentativo di query è il seguente:

  1. Crea un sink di Cloud Logging basato su un filtro di inclusione che corrisponda ai seguenti criteri per instradare i log a un argomento Pub/Sub:

    resource.type = "bigquery_project"
    protoPayload.resourceName : "projects/PROJECT_ID/jobs/CUSTOM_JOB_ID_PREFIX"
    severity = ERROR
    

    Sostituisci quanto segue:

  2. Crea una funzione Cloud Run che viene attivata in risposta alla ricezione di log Pub/Sub corrispondenti al tuo filtro.

    La funzione Cloud Run potrebbe accettare il payload dei dati dal messaggio Pub/Sub e tentare di avviare una nuova query continua utilizzando la stessa sintassi SQL della query non riuscita, ma all'inizio, subito dopo l'interruzione del job precedente.

Ad esempio, puoi utilizzare una funzione simile alla seguente:

Python

Prima di provare questo esempio, segui le istruzioni di configurazione Python riportate nella guida rapida all'utilizzo di BigQuery con le librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API BigQuery Python.

Per autenticarti in BigQuery, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configurare l'autenticazione per le librerie client.

import base64
import json
import logging
import re
import uuid

import google.auth
import google.auth.transport.requests
import requests


def retry_continuous_query(event, context):
    logging.info("Cloud Function started.")

    if "data" not in event:
        logging.info("No data in Pub/Sub message.")
        return

    try:
        # Decode and parse the Pub/Sub message data
        log_entry = json.loads(base64.b64decode(event["data"]).decode("utf-8"))

        # Extract the SQL query and other necessary data
        proto_payload = log_entry.get("protoPayload", {})
        metadata = proto_payload.get("metadata", {})
        job_change = metadata.get("jobChange", {})
        job = job_change.get("job", {})
        job_config = job.get("jobConfig", {})
        query_config = job_config.get("queryConfig", {})
        sql_query = query_config.get("query")
        job_stats = job.get("jobStats", {})
        end_timestamp = job_stats.get("endTime")
        failed_job_id = job.get("jobName")

        # Check if required fields are missing
        if not all([sql_query, failed_job_id, end_timestamp]):
            logging.error("Required fields missing from log entry.")
            return

        logging.info(f"Retrying failed job: {failed_job_id}")

        # Adjust the timestamp in the SQL query
        timestamp_match = re.search(
            r"\s*TIMESTAMP\(('.*?')\)(\s*\+ INTERVAL 1 MICROSECOND)?", sql_query
        )

        if timestamp_match:
            original_timestamp = timestamp_match.group(1)
            new_timestamp = f"'{end_timestamp}'"
            sql_query = sql_query.replace(original_timestamp, new_timestamp)
        elif "CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE" in sql_query:
            new_timestamp = f"TIMESTAMP('{end_timestamp}') + INTERVAL 1 MICROSECOND"
            sql_query = sql_query.replace(
                "CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE", new_timestamp
            )

        # Get access token
        credentials, project = google.auth.default(
            scopes=["https://www.googleapis.com/auth/cloud-platform"]
        )
        request = google.auth.transport.requests.Request()
        credentials.refresh(request)
        access_token = credentials.token

        # API endpoint
        url = f"https://bigquery.googleapis.com/bigquery/v2/projects/{project}/jobs"

        # Request headers
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        }

        # Generate a random UUID
        random_suffix = str(uuid.uuid4())[:8]  # Take the first 8 characters of the UUID

        # Combine the prefix and random suffix
        job_id = f"CUSTOM_JOB_ID_PREFIX{random_suffix}"

        # Request payload
        data = {
            "configuration": {
                "query": {
                    "query": sql_query,
                    "useLegacySql": False,
                    "continuous": True,
                    "connectionProperties": [
                        {"key": "service_account", "value": "SERVICE_ACCOUNT"}
                    ],
                    # ... other query parameters ...
                },
                "labels": {"bqux_job_id_prefix": "CUSTOM_JOB_ID_PREFIX"},
            },
            "jobReference": {
                "projectId": project,
                "jobId": job_id,  # Use the generated job ID here
            },
        }

        # Make the API request
        response = requests.post(url, headers=headers, json=data)

        # Handle the response
        if response.status_code == 200:
            logging.info("Query job successfully created.")
        else:
            logging.error(f"Error creating query job: {response.text}")

    except Exception as e:
        logging.error(
            f"Error processing log entry or retrying query: {e}", exc_info=True
        )

    logging.info("Cloud Function finished.")

Passaggi successivi