Use async reader for parsing Apache Arrow responses (#2788) #2790

Merged · 1 commit · Apr 24, 2025
Use async reader for parsing Apache Arrow responses (#2788)
(cherry picked from commit 710b937)
JoshMock committed Apr 24, 2025
commit b3212a19655a6d116bc008b160f6a01bf0cb691d
4 changes: 2 additions & 2 deletions docs/reference/client-helpers.md
@@ -475,15 +475,15 @@ Added in `v8.16.0`

ES|QL can return results in multiple binary formats, including [Apache Arrow](https://arrow.apache.org/)'s streaming format. Because it is a very efficient format to read, it can be valuable for performing high-performance in-memory analytics. And, because the response is streamed as batches of records, it can be used to produce aggregations and other calculations on larger-than-memory data sets.

`toArrowReader` returns a [`RecordBatchStreamReader`](https://arrow.apache.org/docs/js/classes/Arrow_dom.RecordBatchReader.md).
`toArrowReader` returns an [`AsyncRecordBatchStreamReader`](https://github.com/apache/arrow/blob/520ae44272d491bbb52eb3c9b84864ed7088f11a/js/src/ipc/reader.ts#L216).

```ts
const reader = await client.helpers
.esql({ query: 'FROM sample_data' })
.toArrowReader()

// print each record as JSON
for (const recordBatch of reader) {
for await (const recordBatch of reader) {
for (const record of recordBatch) {
console.log(record.toJSON())
}
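Not part of the diff, but a minimal sketch of what the updated docs enable: consuming the async reader batch by batch to aggregate results without holding the whole response in memory. The node URL and the numeric `amount` column are illustrative assumptions.

```ts
// Minimal sketch, assuming a local node and an 'amount' column in sample_data.
import { Client } from '@elastic/elasticsearch'

const client = new Client({ node: 'http://localhost:9200' })

async function sumAmounts (): Promise<number> {
  const reader = await client.helpers
    .esql({ query: 'FROM sample_data' })
    .toArrowReader()

  let total = 0
  // record batches arrive asynchronously, so larger-than-memory results can
  // be reduced incrementally instead of materialized as a single table
  for await (const batch of reader) {
    for (const record of batch) {
      const row = record.toJSON() as { amount: number } // assumed column
      total += row.amount
    }
  }
  return total
}
```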
12 changes: 6 additions & 6 deletions src/helpers.ts
@@ -11,7 +11,7 @@ import assert from 'node:assert'
import * as timersPromises from 'node:timers/promises'
import { Readable } from 'node:stream'
import { errors, TransportResult, TransportRequestOptions, TransportRequestOptionsWithMeta } from '@elastic/transport'
import { Table, TypeMap, tableFromIPC, RecordBatchStreamReader } from 'apache-arrow/Arrow.node'
import { Table, TypeMap, tableFromIPC, AsyncRecordBatchStreamReader } from 'apache-arrow/Arrow.node'
import Client from './client'
import * as T from './api/types'
import { Id } from './api/types'
@@ -135,7 +135,7 @@ export interface EsqlColumn {
export interface EsqlHelper {
toRecords: <TDocument>() => Promise<EsqlToRecords<TDocument>>
toArrowTable: () => Promise<Table<TypeMap>>
toArrowReader: () => Promise<RecordBatchStreamReader>
toArrowReader: () => Promise<AsyncRecordBatchStreamReader>
}

export interface EsqlToRecords<TDocument> {
@@ -1000,7 +1000,7 @@ export default class Helpers {
return tableFromIPC(response)
},

async toArrowReader (): Promise<RecordBatchStreamReader> {
async toArrowReader (): Promise<AsyncRecordBatchStreamReader> {
if (metaHeader !== null) {
reqOptions.headers = reqOptions.headers ?? {}
reqOptions.headers['x-elastic-client-meta'] = `${metaHeader as string},h=qa`
@@ -1009,9 +1009,9 @@

params.format = 'arrow'

// @ts-expect-error the return type will be ArrayBuffer when the format is set to 'arrow'
const response: ArrayBuffer = await client.esql.query(params, reqOptions)
return RecordBatchStreamReader.from(response)
// @ts-expect-error response is a Readable when asStream is true
const response: Readable = await client.esql.query(params, reqOptions)
return await AsyncRecordBatchStreamReader.from(Readable.from(response))
}
}

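For context on the change above: when the response is requested as a stream, the helper now hands the resulting Node.js `Readable` to Arrow's async reader instead of buffering an `ArrayBuffer`. A minimal standalone sketch of that wrapping pattern, assuming an Arrow IPC stream file on disk (the path is illustrative):

```ts
// Minimal sketch: wrap any Node.js Readable that yields Arrow IPC stream bytes
// in an async reader and consume it batch by batch, mirroring toArrowReader.
import { createReadStream } from 'node:fs'
import { Readable } from 'node:stream'
import { AsyncRecordBatchStreamReader } from 'apache-arrow/Arrow.node'

async function readBatches (path: string): Promise<void> {
  const stream = createReadStream(path) // any source of Arrow IPC stream bytes
  const reader = await AsyncRecordBatchStreamReader.from(Readable.from(stream))
  for await (const batch of reader) {
    console.log(`record batch with ${batch.numRows} rows`)
  }
}
```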
47 changes: 31 additions & 16 deletions test/unit/helpers/esql.test.ts
@@ -158,17 +158,28 @@ test('ES|QL helper', t => {
t.end()
})

test('toArrowReader', t => {
t.test('Parses a binary response into an Arrow stream reader', async t => {
const binaryContent = '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA='
test('toArrowReader', async t => {
const testRecords = [
{ amount: 4.900000095367432, },
{ amount: 8.199999809265137, },
{ amount: 15.5, },
{ amount: 9.899999618530273, },
{ amount: 13.899999618530273, },
]

// build reusable Arrow table
const table = arrow.tableFromJSON(testRecords)
const rawData = await arrow.RecordBatchStreamWriter.writeAll(table).toUint8Array()

t.test('Parses a binary response into an Arrow stream reader', async t => {
const MockConnection = connection.buildMockConnection({
onRequest (_params) {
return {
body: Buffer.from(binaryContent, 'base64'),
body: Buffer.from(rawData),
statusCode: 200,
headers: {
'content-type': 'application/vnd.elasticsearch+arrow+stream'
'content-type': 'application/vnd.elasticsearch+arrow+stream',
'transfer-encoding': 'chunked'
}
}
}
@@ -182,26 +193,28 @@
const result = await client.helpers.esql({ query: 'FROM sample_data' }).toArrowReader()
t.ok(result.isStream())

const recordBatch = result.next().value
t.same(recordBatch.get(0)?.toJSON(), {
amount: 4.900000095367432,
date: 1729532586965,
})
let count = 0
for await (const recordBatch of result) {
for (const record of recordBatch) {
t.same(record.toJSON(), testRecords[count])
count++
}
}

t.end()
})

t.test('ESQL helper uses correct x-elastic-client-meta helper value', async t => {
const binaryContent = '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA='

const MockConnection = connection.buildMockConnection({
onRequest (params) {
const header = params.headers?.['x-elastic-client-meta'] ?? ''
t.ok(header.includes('h=qa'), `Client meta header does not include ESQL helper value: ${header}`)
return {
body: Buffer.from(binaryContent, 'base64'),
body: Buffer.from(rawData),
statusCode: 200,
headers: {
'content-type': 'application/vnd.elasticsearch+arrow+stream'
'content-type': 'application/vnd.elasticsearch+arrow+stream',
'transfer-encoding': 'chunked'
}
}
}
@@ -240,10 +253,12 @@ test('ES|QL helper', t => {
new arrow.RecordBatch(schema, batch3.data),
])

const rawData = await arrow.RecordBatchStreamWriter.writeAll(table).toUint8Array()

const MockConnection = connection.buildMockConnection({
onRequest (_params) {
return {
body: Buffer.from(arrow.tableToIPC(table, "stream")),
body: Buffer.from(rawData),
statusCode: 200,
headers: {
'content-type': 'application/vnd.elasticsearch+arrow+stream'
Expand All @@ -261,7 +276,7 @@ test('ES|QL helper', t => {
t.ok(result.isStream())

let counter = 0
for (const batch of result) {
for await (const batch of result) {
for (const row of batch) {
counter++
const { id, val } = row.toJSON()
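A side note on the updated tests: the hard-coded base64 fixture is replaced with IPC bytes generated at runtime. A minimal sketch of that fixture-building step, assuming a standalone script with `apache-arrow` installed:

```ts
// Minimal sketch: build Arrow IPC stream bytes from plain JSON records,
// as the updated tests do before serving them from the mock connection.
import * as arrow from 'apache-arrow'

async function buildFixture (): Promise<Uint8Array> {
  const table = arrow.tableFromJSON([
    { amount: 4.9 },
    { amount: 8.2 },
    { amount: 15.5 }
  ])
  return await arrow.RecordBatchStreamWriter.writeAll(table).toUint8Array()
}
```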