Smalltalk über Apache Hudi
Über Hudi
Apache Hudi ist eine Data-Lake-Plattform, die die Fähigkeiten einer Datenbank und eines Data Warehouse innerhalb des Data Lake kombiniert. Es ermöglicht Echtzeitanalysen von Streaming-Daten mit minimaler Latenz und ermöglicht Analysen auf Minutenebene. Hudi führt ein neues Framework für die inkrementelle Verarbeitung ein, das die üblicherweise verwendete langsame Batch-Datenverarbeitung ersetzt. Mit Hudi können Benutzer von Funktionen wie Tabellen, Transaktionen, effizienten Upserts und Löschungen, erweiterter Indizierung, Streaming-Datenerfassungsdiensten, Datenclustering, Komprimierungsoptimierungen und Parallelitätssteuerung profitieren. Was Hudi auszeichnet, ist seine Fähigkeit, Daten in Open-Source-Dateiformaten zu verwalten, wodurch es mit verschiedenen Abfrage-Engines wie Apache Spark, Flink, Presto, Hive und mehr kompatibel ist. Es ist ideal für das Streaming von Workloads und unterstützt auch die Erstellung effizienter inkrementeller Batch-Pipelines, um leistungsstarke Analysen zu gewährleisten.
So funktioniert das Hudi-Update
Hudi arbeitet auf der Grundlage der Indizierung. Es erstellt einen Index basierend auf dem Primärschlüssel. Wenn Updates für den Primärschlüssel anstehen, wird nach der Datei gesucht, die den Datensatz für diesen Schlüssel enthält, und eine neue Version der Datei mit aktualisierten Datensätzen erstellt.
Schreibvorgänge
Zuvor kann es hilfreich sein, die 3 verschiedenen Schreiboperationen zu verstehen, die von der Hudi-Datenquelle oder dem Delta-Streamer-Tool bereitgestellt werden, und wie sie am besten genutzt werden können. Diese Operationen können für jeden Commit/Deltacommit, der für die Tabelle ausgegeben wird, ausgewählt/geändert werden.
Erforderliche Eigenschaften für Einfügungen.
- OPERATION_OPTIEREN_SCHLÜSSEL('hoodie.datasource.write.operation') : Bezieht sich auf die Art der Operation, die mit Hudi - UPSERT durchgeführt wird_OPERATION_OPTIEREN_VAL (Vorgabe)MASSE_EINFÜGEN_OPERATION_OPTIEREN_VAL, EINFÜGEN_OPERATION_OPTIEREN_WERT, LÖSCHEN_OPERATION_OPTIEREN_VAL
- PARTITIONSPFAD_FIELD.key() ('hoodie.datasource.write.partitionpath.field') : Bezieht sich auf den Partitionspfad, zu dem der Datensatz gehört.
- VORKOMBINIEREN_FIELD.key() ('hoodie.datasource.write.precombine.field') : Feld, das verwendet wird, um mehrere Datensätze aus dem Datensatzstapel zu deduplizieren, der erfasst wird.
- REKORDSCHLÜSSEL_FELD_OPTIEREN_SCHLÜSSEL (Erforderlich): Feld für den Primärschlüssel(s). Datensatzschlüssel identifizieren einen Datensatz/eine Zeile innerhalb jeder Partition eindeutig.
- hoodie.insert.shuffle.parallelism : Bezieht sich auf die Funkenparallelität, die beim Aufnehmen von Datensätzen in hudi verwendet werden soll
- hoodie.datasource.write.table.type : Bezieht sich auf den Tabellentyp der hudi-Tabelle. Es gibt zwei Tabellentypen in Hudi, nämlich COPY_AUF_SCHREIBEN(Vorgabe) und MERGE_AUF_LESEN.
- TISCH_NAME (»hoodie.table.name«) : Bezieht sich auf den Namen der Hudi-Tabelle.
KÜHE VS MOR
Kopieren auf Schreibtabelle
Datei-Slices in der Copy-On-Write-Tabelle enthalten nur die Basis-/Spaltendatei, und jeder Commit erzeugt neue Versionen der Basisdateien. Mit anderen Worten, wir komprimieren implizit bei jedem Commit, sodass nur spaltenbasierte Daten vorhanden sind. Infolgedessen wird die Schreibverstärkung (Anzahl der geschriebenen Bytes für 1 Byte eingehender Daten) ist viel höher, wenn die Leseverstärkung Null ist. Dies ist eine sehr begehrte Eigenschaft für analytische Workloads, die überwiegend leselastig sind.
Im Folgenden wird veranschaulicht, wie dies konzeptionell funktioniert, wenn Daten in eine Copy-on-Write-Tabelle geschrieben und zwei Abfragen darauf ausgeführt werden.
Empfohlen von LinkedIn
Beim Schreiben von Daten wird bei Aktualisierungen vorhandener Dateigruppen ein neues Slice für diese Dateigruppe erzeugt, das mit dem Stempel Commit-Sofortzeit versehen ist, während Einfügungen eine neue Dateigruppe zuordnen und das erste Slice für diese Dateigruppe schreiben. Diese Datei-Slices und ihre Commit-Zeitpunkte sind oben farbcodiert. SQL-Abfragen, die für eine solche Tabelle ausgeführt werden (z.B.: Anzahl auswählen(*) Zählen der Gesamtdatensätze in dieser Partition), überprüft zunächst die Zeitachse auf den letzten Commit und filtert alle bis auf die neuesten Datei-Slices jeder Dateigruppe. Wie Sie sehen können, werden bei einer alten Abfrage die Dateien des aktuellen Inflight-Commits nicht farblich rosa codiert, sondern bei einer neuen Abfrage, die nach dem Commit gestartet wird, werden die neuen Daten abgerufen. Daher sind Abfragen immun gegen Schreibfehler/partielle Schreibvorgänge und werden nur für festgeschriebene Daten ausgeführt.
Die Absicht von copy on write table ist es, die Art und Weise, wie Tabellen heute verwaltet werden, grundlegend zu verbessern durch
- Erstklassige Unterstützung für die atomare Aktualisierung von Daten auf Dateiebene, anstatt ganze Tabellen/Partitionen neu zu schreiben
- Möglichkeit, inkrementelle Änderungen zu nutzen, im Gegensatz zu verschwenderischen Scans oder dem Herumfummeln mit Heuristiken
- Strenge Kontrolle der Dateigrößen, um eine hervorragende Abfrageleistung zu gewährleisten (Kleine Dateien beeinträchtigen die Abfrageleistung erheblich).
Zusammenführen bei Lesetabelle
Merge on read table ist eine Obermenge von copy on write, in dem Sinne, dass es weiterhin leseoptimierte Abfragen der Tabelle unterstützt, indem nur die Basis-/Spaltendateien in den neuesten Dateisegmenten verfügbar gemacht werden. Darüber hinaus werden eingehende Upserts für jede Dateigruppe in einem zeilenbasierten Delta-Protokoll gespeichert, um Momentaufnahmeabfragen zu unterstützen, indem das Delta-Protokoll während der Abfragezeit auf die neueste Version jeder Datei-ID angewendet wird. Daher versucht dieser Tabellentyp, die Lese- und Schreibverstärkung intelligent auszugleichen, um Daten nahezu in Echtzeit bereitzustellen. Die wichtigste Änderung betrifft hier den Verdichter, der jetzt sorgfältig auswählt, welche Delta-Protokolldateien in ihre spaltenbasierte Basisdatei komprimiert werden müssen, um die Abfrageleistung in Schach zu halten (Größere Delta-Protokolldateien würden längere Zusammenführungszeiten mit Zusammenführungsdaten auf der Abfrageseite verursachen)
Im Folgenden wird veranschaulicht, wie die Tabelle funktioniert, und es werden zwei Arten von Abfragen gezeigt: Momentaufnahmeabfrage und leseoptimierte Abfrage.
In diesem Beispiel passieren viele interessante Dinge, die die Feinheiten des Ansatzes hervorheben.
- Wir haben jetzt Commits etwa alle 1 Minute, was wir mit dem anderen Tabellentyp nicht tun konnten.
- Innerhalb jeder Datei-ID-Gruppe gibt es jetzt eine Delta-Protokolldatei, die eingehende Aktualisierungen der Datensätze enthält, die bereits in den spaltenbasierten Basisdateien vorhanden sind. Im Beispiel enthalten die Deltaprotokolldateien alle Daten von 10:05 bis 10:10 Uhr. Die spaltenbasierten Basisdateien werden wie zuvor mit dem Commit versioniert. Wenn man also nur die Basisdateien betrachtet, dann sieht das Tabellenlayout genau so aus wie eine Kopie auf einer Schreibtabelle.
- Bei einem periodischen Komprimierungsprozess werden diese Änderungen aus dem Deltaprotokoll abgeglichen und eine neue Version der Basisdatei erstellt, genau wie im Beispiel um 10:05 Uhr.
- Es gibt zwei Möglichkeiten, dieselbe zugrunde liegende Tabelle abzufragen: Optimierte Abfrage lesen und Momentaufnahmeabfrage, je nachdem, ob wir die Abfrageleistung oder die Aktualität der Daten ausgewählt haben.
- Die Semantik, wann Daten aus einem Commit für eine Abfrage verfügbar sind, ändert sich für eine leseoptimierte Abfrage auf subtile Weise. Beachten Sie, dass eine solche Abfrage, die um 10:10 Uhr ausgeführt wird, keine Daten nach 10:05 Uhr oben sieht, während eine Momentaufnahmeabfrage immer die aktuellsten Daten sieht.
- Wenn wir die Verdichtung auslösen und was sie entscheidet, um zu komprimieren, halten Sie den Schlüssel zur Lösung dieser schwierigen Probleme in der Hand. Durch die Implementierung einer Komprimierungsstrategie, bei der wir die neuesten Partitionen im Vergleich zu älteren Partitionen aggressiv komprimieren, konnten wir sicherstellen, dass die leseoptimierten Abfragen Daten sehen, die innerhalb von X Minuten auf konsistente Weise veröffentlicht wurden.
Die Absicht der Zusammenführung in der Lesetabelle besteht darin, eine Verarbeitung nahezu in Echtzeit direkt auf DFS zu ermöglichen, im Gegensatz zum Kopieren von Daten auf spezialisierte Systeme, die möglicherweise nicht in der Lage sind, das Datenvolumen zu verarbeiten. Es gibt auch einige sekundäre Nebeneffekte dieser Tabelle, wie z. B. eine reduzierte Schreibverstärkung durch Vermeidung des synchronen Zusammenführens von Daten, d. h. die Menge an Daten, die pro 1 Byte Daten in einem Batch geschrieben werden
Nachteile
- Um HUDI-Upsert zu verwenden, müssen wir nur Aktualisierungen/Datensätze aus der Quelle abrufen.
- HUDI-Upsert ist möglicherweise nicht hilfreich, wenn fast alle Datensätze in der Tabelle aktualisiert werden. Es fügt Indizierungs- und Suchaufwand hinzu.
- Wir können eine Datei nicht direkt löschen oder eine Partition in HUDI löschen, da HUDI alle Dateinamen- und Datensatzschlüssel in seiner Metadatendatei speichert. Diese Vorgänge müssen über HUDI delete ausgeführt werden.
- Auf die Zeitstempelspalte kann von der Luftlinie aus nicht zugegriffen werden. (Der Zugriff kann über Spark erfolgen )
- Die Dateigröße ist im Vergleich zu ORC höher. (Sie können auch auf Metadaten zurückzuführen sein. Wir können sehen, ob wir die Dateigröße durch Komprimierung reduzieren können)
Awesome blog- cc Apache Hudi