
Powering Apache Pinot ingestion with Hoptimator

In a previous blog, we described a new open-source project, Hoptimator, which helps automate end-to-end multi-hop data pipelines. End-to-end multi-hop refers to Hoptimator's ability to plan pipelines across otherwise unrelated systems, and to wire up all the necessary components for the resulting pipelines to run. These pipelines are complex and touch many different systems, so building them manually can take weeks of effort. With Hoptimator, we can deploy new pipelines in minutes.

We are now using Hoptimator as part of a managed ingestion solution for Apache Pinot. Managed ingestion involves consumer-driven pipelines, which means that the consumer – in this case, Apache Pinot – can itself create and control pipelines as needed. Under the hood, Hoptimator is used to plan, deploy, update, and monitor these pipelines on behalf of the consumer, eliminating manual toil on an ongoing basis.

As the name "Hoptimator" implies, these managed pipelines are fully automated and periodically optimized. All new production Pinot tables at LinkedIn ingest from a Hoptimator-managed pipeline by default, eliminating considerable friction for Pinot's users.

This blog describes how Pinot at LinkedIn leverages Hoptimator for dynamic, fully managed ingestion pipelines.

Apache Pinot before managed ingestion

Apache Pinot is a real-time distributed OLAP datastore, originally developed at LinkedIn, that enables fast analytics queries against real-time data, such as ad click events, page views, or host metrics. Generally, the data in Pinot comes from "nearline" event streams (e.g. Kafka topics), but data can also be pushed to Pinot in bulk from "offline" sources (e.g. HDFS).

Like other derived data platforms at LinkedIn, Pinot can ingest data directly from Apache Kafka. In Pinot's case, the storage nodes (Pinot Servers) themselves are responsible for both ingesting and serving the data. Each Pinot Server consumes a subset of Kafka partitions, deserializes each incoming record, performs lightweight data transformations/aggregations, and periodically commits in-memory segments (containing batches of records) onto disk. This architecture enables Pinot to serve real-time data with very low latency and sub-second data freshness.
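For context, a Pinot real-time table attaches to its upstream Kafka topic through the table's stream configuration. The sketch below is a simplified illustration, expressed as a Python dict; the exact keys vary by Pinot version, and the topic and broker names are placeholders:

```python
# Simplified, illustrative streamConfigs for a Pinot real-time table.
# Topic and broker names are hypothetical placeholders; exact keys vary by Pinot version.
stream_configs = {
    "streamType": "kafka",
    "stream.kafka.topic.name": "AdClickEvent",           # topic the Pinot Servers consume directly
    "stream.kafka.broker.list": "kafka-broker:9092",
    "stream.kafka.consumer.type": "lowlevel",            # each server owns a subset of partitions
    "realtime.segment.flush.threshold.rows": "5000000",  # commit in-memory segments in batches
}
```

Because the servers consume the topic directly, everything about that topic (its schema, partitioning, and volume) shapes the work each Pinot Server must do.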

However, this also means that Pinot Servers are sensitive to the rate, partitioning, and format of incoming records. For example:

  • Pinot may only need to store a few fields in a complex record, but the storage nodes will need to read and deserialize the entire record.
  • Pinot may only need a subset of incoming records, e.g. those matching some predicate, but the storage nodes will need to read and process all of them.
  • Pinot may only need to store per-minute aggregations, but the storage nodes will need to read every record and compute their own per-minute aggregations in memory.
  • Pinot may need to store data across multiple nodes, but the incoming records may be unevenly partitioned (i.e. "hot partitions").
  • Pinot may require records to be keyed on a certain column to enable query optimizations like segment pruning that reduce query fanout.

Ultimately, this means Pinot Servers expend I/O, memory, and CPU well before data is ever stored on disk or queried. As the rate of incoming data increases, we may need to add additional Pinot Servers – even if we have plenty of storage!

Historically, the Online Analytics team has asked data producers to introduce logic specifically for optimizing Pinot ingestion. This may involve re-keying or fixing hot partitions, adding new partitions, or pre-aggregating metrics. Often, it involves creating new stream processing jobs and new Kafka topics just for Pinot. In addition to manually writing this new logic, analytics application developers are then responsible for owning and operating these jobs going forward.

Figure 1. Customer pre-processing jobs prior to Pinot-managed ingestion

What if Pinot could create and manage these data pipelines itself to continuously optimize its own ingestion?

Dynamic pipelines with Hoptimator

Hoptimator was developed to empower data consumers to create and control their own data pipelines. At LinkedIn, Hoptimator powers subscriptions, each of which represents an ongoing request for data. Data consumers can create, modify, and delete subscriptions dynamically via the Subscription API, which leverages Hoptimator to orchestrate end-to-end multi-hop data pipelines that deliver the requested data.

For each subscription, Hoptimator creates a new Flink SQL job, along with all the external resources the job will need to run. This includes Kafka topics, Brooklin CDC datastreams, resource ACLs, etc. As subscriptions are created, Hoptimator automatically de-dupes overlapping "hops" across pipelines, including shared CDC streams. As subscriptions are deleted, Hoptimator automatically garbage-collects orphaned resources. Our internal version of Hoptimator enforces access control policies at each "hop", ensuring data is only delivered to systems that already have explicit access.
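To make this concrete, the snippet below sketches the flavor of Flink SQL such a planned job might run. This is illustrative only: the topic and field names are hypothetical, and the actual generated SQL and connector configuration are more involved.

```python
# Illustrative only: the flavor of Flink SQL a Hoptimator-planned job might run.
# Topic and field names are hypothetical placeholders.
PIPELINE_SQL = """
INSERT INTO `pinot-adclick-optimized`        -- new Kafka topic provisioned for this subscription
SELECT memberId, campaignId, clickTimestamp  -- only the fields the consumer asked for
FROM `AdClickEvent`                          -- the original upstream topic
"""
```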

Pinot-managed ingestion pipelines

We recently launched a new Pinot integration powered by Apache Airflow and Hoptimator. At LinkedIn, we have built a Smart Provisioning capability on Airflow that enables self-service provisioning of Pinot tables in production. Smart Provisioning observes actual query patterns during initial ramping of the table to derive query cost estimates. It then assesses and allocates capacity in order to meet customer requirements like query latency and QPS. With Pinot-managed ingestion, we've extended our Smart Provisioning DAGs to create pre-processing pipelines dynamically by invoking the Subscription API. The Smart Provisioning system uses machine learning to derive both the optimal number of partitions and an optional partitioning key for the re-processed stream. These inputs are used to create or update a subscription to continuously optimize Pinot ingestion for the table.
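As a rough sketch of how this might look (the endpoint, payload fields, and function below are hypothetical, since the internal Subscription API is not publicly documented), a Smart Provisioning task could create or update the subscription along these lines:

```python
from typing import Optional

import requests
from airflow.decorators import task

# Hypothetical endpoint and payload shape; not the actual internal API.
SUBSCRIPTION_API = "https://subscription-api.example.com/v1/subscriptions"

@task
def upsert_pinot_subscription(table_name: str, num_partitions: int,
                              partition_key: Optional[str] = None) -> dict:
    """Create or update the subscription feeding a Pinot table.

    num_partitions and partition_key are the values Smart Provisioning derives
    from observed query patterns; the field names below are illustrative only.
    """
    payload = {
        "name": f"pinot-{table_name}",
        "source": "AdClickEvent",                 # upstream Kafka topic (placeholder)
        "fields": ["memberId", "campaignId", "clickTimestamp"],
        "numPartitions": num_partitions,          # sized to produce optimal Pinot segments
        "partitionKey": partition_key,            # optional; enables segment pruning
    }
    resp = requests.put(f"{SUBSCRIPTION_API}/pinot-{table_name}", json=payload, timeout=30)
    resp.raise_for_status()
    return resp.json()
```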

Figure 2. Example subscription which ingests from a Kafka topic

Now, when a user promotes a Pinot table to prod, a subscription is automatically created for the table. In turn, Hoptimator provisions new Flink jobs, Kafka topics, and ACLs as needed to deliver a re-processed stream optimized for Pinot ingestion. Throughout the process, the end user experience is unchanged, and Pinot’s customers are unaware of the re-processing that is happening behind the scenes!

Unlike before, the new pipeline is specific to the Pinot table asking for it:

  • Only Pinot Servers are authorized to read the data.
  • The incoming records only contain the specific fields the table needs.
  • The number of Kafka partitions is adjusted to produce optimal Pinot segments.
  • The incoming records are re-partitioned evenly across Pinot Servers.
  • If optimal for the use case, the incoming records are re-partitioned to match Pinot’s partitioning column.

Additionally, we can change these attributes dynamically as the Pinot table changes. So far, we've built Airflow DAGs to automatically add Kafka partitions and evolve schemas as needed. In the future, we plan to expand Pinot's managed ingestion by adding more capabilities to Hoptimator, including metric pre-aggregation, column-level transforms (e.g. extracting nested fields), row-level filtering, and the ability to ingest from multiple Kafka topics.
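As one hedged illustration of what such a DAG task might do (the threshold, endpoint, and payload fields are hypothetical, carried over from the earlier sketch), a scheduled check could grow a subscription's partition count when the observed ingestion rate outpaces the current topic:

```python
import requests

# Hypothetical values carried over from the earlier sketch.
SUBSCRIPTION_API = "https://subscription-api.example.com/v1/subscriptions"
MAX_RECORDS_PER_PARTITION_PER_SEC = 5000  # illustrative threshold, not a real LinkedIn setting

def maybe_expand_partitions(subscription_name: str, observed_rate: float) -> None:
    """If the observed ingestion rate has outgrown the current partition count,
    double it and let Hoptimator reconcile the pipeline and Kafka topic."""
    sub = requests.get(f"{SUBSCRIPTION_API}/{subscription_name}", timeout=30).json()
    if observed_rate > sub["numPartitions"] * MAX_RECORDS_PER_PARTITION_PER_SEC:
        sub["numPartitions"] *= 2
        requests.put(f"{SUBSCRIPTION_API}/{subscription_name}", json=sub, timeout=30).raise_for_status()
```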

Consumer-driven ingestion: a new approach to data pipelines

Pinot-managed ingestion represents a shift from producer-driven to consumer-driven data pipelines. The former is about empowering data producers to write, own, and operate data pipelines to deliver data. The latter is about automating pipelines that deliver data to the consumers asking for it.

The resulting pipelines are roughly the same, but the responsibility of writing, owning, and operating these pipelines is allocated completely differently. With the producer-driven model, downstream consumers (like Pinot) must ask data producers to accommodate their specific requests and limitations. In Pinot's case, this has resulted in Pinot-specific logic spread across otherwise unrelated streaming and batch jobs. In many cases, entirely new Pinot-specific jobs have been written, maintained, and operated by upstream teams.

With Pinot-managed ingestion, the analytics platform itself creates and optimizes this logic on behalf of the end user:

Figure 3. Pinot-managed nearline ingestion powered by Hoptimator

The analytics platform now owns and operates all such pipelines. This "consumer-driven" model has some obvious benefits:

  • Pinot's users are no longer involved. This reduces friction for new users as they onboard to Pinot.
  • The analytics platform can manipulate subscriptions dynamically, drastically reducing operator and customer toil as use cases evolve.
  • Pinot ingests only the data it needs, which reduces I/O, CPU, and memory requirements on the Pinot Servers.

The Online Analytics team partnered with the Hoptimator team to co-design the Subscription API that powers Pinot's nearline managed ingestion. Pinot ingestion was Hoptimator's first production use case and was released in August 2024. Since then, Hoptimator has established itself as the Control Plane for Data Planes, and is now onboarding new use cases from various data serving platforms at LinkedIn.

After successfully proving our value proposition with the release of Pinot-managed nearline ingestion, we are now working towards similar integrations with other derived data serving platforms at LinkedIn. Our long-term plan is also to extend Pinot-managed ingestion to support offline data sources like Apache Iceberg tables. Upon detecting changes to the customer's offline data set, Hoptimator can trigger execution of a Pinot build-and-push job to upload segments to Pinot for serving queries. We expect the Hoptimator project to evolve as we expand to these new use cases.

Acknowledgements

It truly took a village to turn the vision of Hoptimator and Pinot-managed ingestion into a reality. This initiative is the culmination of a close collaboration between the Online Analytics and Hoptimator teams over the last two years. Many thanks to all the team members who made this possible, including Ryanne Dolan (tech lead for Hoptimator), Harshil Shukla, Eric Honer, Vaibhav Maheshwari, and Joseph Grogan from the Hoptimator team, as well as Gerardo Viedma Nunez (tech lead for Online Analytics), Sajjad Moradi, Lucy Cheng, Prachi Khobragade, Benson Yuan, and Krishnapriya Raghuveer from Online Analytics.

Thanks a lot to our leadership Naveenkumar Selvaraj, Aditya Toomula, Shraddha Sahay, Ashish Singhai, Lenisha Gandhi, Renu Tewari, Sandeep Singhal and Kartik Paramasivam for their continued support in this journey. Also, thanks to Subbu Subramaniam, Yi Pan, Becket Qin, Sriram Rao and Diego Buthay for their technical guidance in helping to shape Pinot’s Next Gen Ingestion strategy and the Hoptimator vision.

Finally, Pinot-managed ingestion is a product of many passionate discussions with partners and stakeholders across LinkedIn, including Vasudeva Nagaraja, Zarwan Hashem, Abhishek Mendhekar, Weiqing Yang, Manan Chandra, Swathi Koundinya, Anthony Aasta, Jin An, Saipreethi Muthusrinivasan and Boris Shkolnik.