Utilizzare il metastore BigLake con Dataproc

Questo documento spiega come utilizzare il metastore BigLake con Dataproc su Compute Engine. Questa connessione fornisce un singolo metastore condiviso che funziona su motori software open source, come Apache Spark o Apache Flink.

Prima di iniziare

  1. Abilita la fatturazione per il tuo Google Cloud progetto. Scopri come verificare se la fatturazione è attivata in un progetto.
  2. Abilita le API BigQuery e Dataproc.

    Abilita le API

  3. (Facoltativo) Scopri come funziona il metastore BigLake e perché dovresti usarlo.

Ruoli obbligatori

Per ottenere le autorizzazioni necessarie per utilizzare Spark o Flink e Dataproc con BigLake Metastore come archivio dei metadati, chiedi all'amministratore di concederti i seguenti ruoli IAM:

Per saperne di più sulla concessione dei ruoli, consulta Gestire l'accesso a progetti, cartelle e organizzazioni.

Potresti anche riuscire a ottenere le autorizzazioni richieste tramite i ruoli personalizzati o altri ruoli predefiniti.

Flusso di lavoro generale

Per utilizzare Dataproc su Compute Engine con il metastore BigLake, segui questi passaggi generali:

  1. Crea un cluster Dataproc o configurane uno esistente.
  2. Connettiti al tuo motore software open source preferito, ad esempio Spark o Flink.
  3. Utilizza un file JAR per installare il plug-in del catalogo Apache Iceberg sul cluster.
  4. Crea e gestisci le risorse del metastore BigLake in base alle tue esigenze, a seconda del motore software open source in uso.
  5. In BigQuery, accedi e utilizza le risorse del metastore BigLake.

Collegare il metastore BigLake a Spark

Le istruzioni riportate di seguito mostrano come connettere Dataproc al metastore BigLake utilizzando Spark SQL interattivo.

Scaricare il plug-in del catalogo Iceberg

Per collegare il metastore BigLake a Dataproc e Spark, devi utilizzare il file jar del plug-in del catalogo Iceberg del metastore BigLake.

Questo file è incluso per impostazione predefinita nella versione 2.2 dell'immagine Dataproc. Se i tuoi cluster Dataproc non hanno accesso diretto a internet, devi scaricare il plug-in e caricarlo in un bucket Cloud Storage a cui il tuo cluster Dataproc può accedere.

Scarica il plug-in del catalogo Iceberg di BigLake Metastore.

Configura un cluster Dataproc

Prima di connetterti al metastore BigLake, devi configurare un cluster Dataproc.

A questo scopo, puoi creare un nuovo cluster o utilizzarne uno esistente. Successivamente, utilizza questo cluster per eseguire Spark SQL interattivo e gestire le risorse del metastore BigLake.

  • Nella subnet della regione in cui viene creato il cluster deve essere attivo l'accesso privato Google (PGA). Per impostazione predefinita, le VM del cluster Dataproc create con un'immagine di versione 2.2 (predefinita) o successiva hanno solo indirizzi IP interni. Per consentire alle VM del cluster di comunicare con le API di Google, abilita l'accesso privato Google sulla subnet della rete default (o sul nome della rete specificato dall'utente, se applicabile) nella regione in cui viene creato il cluster.

  • Se vuoi eseguire l'esempio dell'interfaccia web di Zeppelin in questa guida, devi utilizzare o creare un cluster Dataproc con il componente facoltativo di Zeppelin abilitato.

Nuovo cluster

Per creare un nuovo cluster Dataproc, esegui il seguente comando gcloud dataproc clusters create. Questa configurazione contiene le impostazioni necessarie per utilizzare il metastore BigLake.

gcloud dataproc clusters create CLUSTER_NAME \
  --project=PROJECT_ID \
  --region=LOCATION \
  --optional-components=ZEPPELIN \
  --enable-component-gateway \
  --single-node

Sostituisci quanto segue:

  • CLUSTER_NAME: un nome per il tuo cluster Dataproc.
  • PROJECT_ID: l'ID del Google Cloud progetto in cui stai creando il cluster.
  • LOCATION: la Google Cloud regione in cui stai creando il cluster.

Cluster esistente

Per configurare un cluster esistente, aggiungi il seguente runtime Iceberg Spark al cluster.

org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1

Puoi aggiungere il runtime utilizzando una delle seguenti opzioni:

Invia un job Spark

Per inviare un job Spark, utilizza uno dei seguenti metodi:

Interfaccia a riga di comando gcloud

gcloud dataproc jobs submit spark-sql \
--project=PROJECT_ID \
--cluster=CLUSTER_NAME \
--region==REGION \
--jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
--properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \
spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \
spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \
spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \
spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
--execute="SPARK_SQL_COMMAND"

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del Google Cloud progetto che contiene il cluster Dataproc.
  • CLUSTER_NAME: il nome del cluster Dataproc che utilizzi per eseguire il job Spark SQL.
  • REGION: la regione Compute Engine in cui si trova il cluster.
  • LOCATION: la posizione delle risorse BigQuery.
  • CATALOG_NAME: il nome del catalogo Spark che utilizzi con il tuo job SQL.
  • WAREHOUSE_DIRECTORY: la cartella Cloud Storage contenente il data warehouse. Questo valore inizia con gs://.
  • SPARK_SQL_COMMAND: la query Spark SQL che vuoi eseguire. Questa query include i comandi per creare le risorse. Ad esempio, per creare un spazio dei nomi e una tabella.

Interactive Spark

Connettiti a Spark e installa il plug-in del catalogo

Per installare il plug-in del catalogo per il metastore BigLake, connettiti al tuo cluster Dataproc utilizzando SSH.

  1. Nella Google Cloud console, vai alla pagina Istanze VM.
  2. Per connetterti a un'istanza VM Dataproc, fai clic su SSH nell'elenco delle istanze della macchina virtuale. L'output è simile al seguente:

    Connected, host fingerprint: ssh-rsa ...
    Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ...
    ...
    example-cluster@cluster-1-m:~$
    
  3. Nel terminale, esegui il seguente comando di inizializzazione del metastore BigLake:

    spark-sql \
    --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
    --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
    --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
    --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \
    --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

    Sostituisci quanto segue:

    • CATALOG_NAME: il nome del catalogo Spark che utilizzi con il tuo job SQL.
    • PROJECT_ID: l' Google Cloud ID progetto del catalogo del metastore BigLake a cui è collegato il tuo catalogo Spark.
    • LOCATION: la Google Cloud posizione del metastore BigLake.
    • WAREHOUSE_DIRECTORY: la cartella Cloud Storage contenente il data warehouse. Questo valore inizia con gs://.

    Dopo aver eseguito la connessione a un cluster, il terminale Spark visualizza il prompt spark-sql.

    spark-sql (default)>
    

Gestire le risorse del metastore BigLake

Ora sei connesso al metastore BigLake. Puoi visualizzare le risorse esistenti o crearne di nuove in base ai metadati archiviati nel metastore BigLake.

Ad esempio, prova a eseguire i seguenti comandi nella sessione interattiva di Spark SQL per creare uno spazio dei nomi e una tabella Iceberg.

  • Utilizza il catalogo personalizzato Iceberg:

    USE `CATALOG_NAME`;
  • Crea uno spazio dei nomi:

    CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;
  • Utilizza lo spazio dei nomi creato:

    USE NAMESPACE_NAME;
  • Crea una tabella Iceberg:

    CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;
  • Per inserire una riga di tabella:

    INSERT INTO TABLE_NAME VALUES (1, "first row");
  • Aggiungi una colonna della tabella:

    ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);
  • Visualizza i metadati della tabella:

    DESCRIBE EXTENDED TABLE_NAME;
  • Elenca le tabelle nello spazio dei nomi:

    SHOW TABLES;

Notebook Zeppelin

  1. Nella Google Cloud console, vai alla pagina Cluster Dataproc.

    Vai ai cluster Dataproc

  2. Fai clic sul nome del cluster che vuoi utilizzare.

    Viene visualizzata la pagina Dettagli cluster.

  3. Nel menu di navigazione, fai clic su Interfacce web.

  4. In Gateway dei componenti, fai clic su Zeppelin. Viene visualizzata la pagina del notebook Zeppelin.

  5. Nel menu di navigazione, fai clic su Notebook e poi su +Crea nuova nota.

  6. Nella finestra di dialogo, inserisci il nome di un notebook. Lascia selezionato Spark come interprete predefinito.

  7. Fai clic su Crea. Viene creato un nuovo notebook.

  8. Nel notebook, fai clic sul menu delle impostazioni e poi su Interprete.

  9. Nel campo Cerca interpreti, cerca Spark.

  10. Fai clic su Modifica.

  11. Nel campo Spark.jars, inserisci l'URI del jar Spark.

    https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar
    
  12. Fai clic su Salva.

  13. Fai clic su OK.

  14. Copia il seguente codice PySpark nel notebook Zeppelin.

    %pyspark
    from pyspark.sql import SparkSession
    spark = SparkSession.builder \
    .appName("BigLake Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    spark.sql("select version()").show()
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;")
    spark.sql("DESCRIBE TABLE_NAME;").show()

    Sostituisci quanto segue:

    • CATALOG_NAME: il nome del catalogo Spark da utilizzare per il job SQL.
    • PROJECT_ID: l'ID del Google Cloud progetto che contiene il cluster Dataproc.
    • WAREHOUSE_DIRECTORY: la cartella Cloud Storage contenente il data warehouse. Questo valore inizia con gs://.
    • NAMESPACE_NAME: il nome dello spazio dei nomi che fa riferimento alla tabella Spark.
    • WAREHOUSE_DIRECTORY: l'URI della cartella Cloud Storage in cui è archiviato il data warehouse.
    • TABLE_NAME: un nome per la tabella Spark.
  15. Fai clic sull'icona di esecuzione o premi Shift-Enter per eseguire il codice. Al termine del job, viene visualizzato il messaggio di stato "Job Spark completato" e l'output mostra i contenuti della tabella:

Le istruzioni riportate di seguito mostrano come connettere Dataproc al metastore BigLake utilizzando il client Flink SQL.

Per collegare il metastore BigLake a Flink:

  1. Crea un cluster Dataproc con il componente Flink facoltativo abilitato e assicurati di utilizzare Dataproc 2.2 o versioni successive.
  2. Nella Google Cloud console, vai alla pagina Istanze VM.

    Vai a Istanze VM

  3. Nell'elenco delle istanze della macchina virtuale, fai clic su SSH per connetterti a un'istanza VM Dataproc.

  4. Configura il plug-in del catalogo personalizzato Iceberg per il metastore BigLake:

    FLINK_VERSION=1.17
    ICEBERG_VERSION=1.5.2
    
    cd /usr/lib/flink
    
    sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib
    
    sudo gcloud storage cp gs://spark-lib/bigquery/iceberg-bigquery-catalog-${ICEBERG_VERSION}-1.0.1-beta.jar lib/
  5. Avvia la sessione Flink su YARN:

    HADOOP_CLASSPATH=`hadoop classpath`
    
    sudo bin/yarn-session.sh -nm flink-dataproc -d
    
    sudo bin/sql-client.sh embedded \
      -s yarn-session
  6. Crea un catalogo in Flink:

    CREATE CATALOG CATALOG_NAME WITH (
      'type'='iceberg',
      'warehouse'='WAREHOUSE_DIRECTORY',
      'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog',
      'gcp_project'='PROJECT_ID',
      'gcp_location'='LOCATION'
    );

    Sostituisci quanto segue:

    • CATALOG_NAME: l'identificatore del catalogo Flink, collegato a un catalogo del metastore BigLake.
    • WAREHOUSE_DIRECTORY: il percorso di base per la directory del magazzino (la cartella Cloud Storage in cui Flink crea i file). Questo valore inizia con gs://.
    • PROJECT_ID: l'ID progetto del catalogo del metastore BigLake a cui si collega il catalogo Flink.
    • LOCATION: la posizione delle risorse BigQuery.

La sessione Flink è ora collegata al metastore BigLake e puoi eseguire comandi Flink SQL.

Ora che hai eseguito la connessione al metastore BigLake, puoi creare e visualizzare le risorse in base ai metadati archiviati nel metastore BigLake.

Ad esempio, prova a eseguire i seguenti comandi nella sessione Flink SQL interattiva per creare un database e una tabella Iceberg.

  1. Utilizza il catalogo personalizzato Iceberg:

    USE CATALOG CATALOG_NAME;

    Sostituisci CATALOG_NAME con il tuo ID catalogo Flink.

  2. Crea un database, che crea un set di dati in BigQuery:

    CREATE DATABASE IF NOT EXISTS DATABASE_NAME;

    Sostituisci DATABASE_NAME con il nome del nuovo database.

  3. Utilizza il database che hai creato:

    USE DATABASE_NAME;
  4. Crea una tabella Iceberg. Di seguito viene creata un'esempio di tabella delle vendite:

    CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME (
        order_number BIGINT,
        price        DECIMAL(32,2),
        buyer        ROW<first_name STRING, last_name STRING>,
        order_time   TIMESTAMP(3)
    );

    Sostituisci ICEBERG_TABLE_NAME con un nome per la nuova tabella.

  5. Visualizza i metadati della tabella:

    DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
  6. Elenca le tabelle nel database:

    SHOW TABLES;

Importa i dati nella tabella

Dopo aver creato una tabella Iceberg nella sezione precedente, puoi utilizzare Flink DataGen come origine dati per importare dati in tempo reale nella tabella. I passaggi che seguono sono un esempio di questo workow:

  1. Crea una tabella temporanea utilizzando DataGen:

    CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME
    WITH (
        'connector' = 'datagen',
        'rows-per-second' = '10',
        'fields.order_number.kind' = 'sequence',
        'fields.order_number.start' = '1',
        'fields.order_number.end' = '1000000',
        'fields.price.min' = '0',
        'fields.price.max' = '10000',
        'fields.buyer.first_name.length' = '10',
        'fields.buyer.last_name.length' = '10'
    )
    LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);

    Sostituisci quanto segue:

    • DATABASE_NAME: il nome del database in cui memorizzare la tabella temporanea.
    • TEMP_TABLE_NAME: un nome per la tabella temporanea.
    • ICEBERG_TABLE_NAME: il nome della tabella Iceberg creata nella sezione precedente.
  2. Imposta il parallelismo su 1:

    SET 'parallelism.default' = '1';
  3. Imposta l'intervallo di checkpoint:

    SET 'execution.checkpointing.interval' = '10second';
  4. Imposta il checkpoint:

    SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
  5. Avvia il job di streaming in tempo reale:

    INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;

    L'output è simile al seguente:

    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 0de23327237ad8a811d37748acd9c10b
    
  6. Per controllare lo stato del job di streaming:

    1. Nella Google Cloud console, vai alla pagina Cluster.

      Vai a Cluster

    2. Seleziona il cluster.

    3. Fai clic sulla scheda Interfacce web.

    4. Fai clic sul link YARN ResourceManager.

    5. Nell'interfaccia YARN ResourceManager, individua la sessione Flink e fai clic sul link ApplicationMaster in Tracking UI.

    6. Nella colonna Stato, verifica che lo stato del job sia In esecuzione.

  7. Esegui query sui dati in streaming nel client Flink SQL:

    SELECT * FROM ICEBERG_TABLE_NAME
    /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/
    ORDER BY order_time desc
    LIMIT 20;
  8. Esegui query sui dati in streaming in BigQuery:

    SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME`
    ORDER BY order_time desc
    LIMIT 20;
  9. Termina il job di streaming nel client Flink SQL:

    STOP JOB 'JOB_ID';

    Sostituisci JOB_ID con l'ID job visualizzato nell'output quando hai creato il job di streaming.

Passaggi successivi