
Commit 710b937

Use async reader for parsing Apache Arrow responses (#2788)
1 parent 926b468 commit 710b937

3 files changed, +32 -42 lines changed

‎docs/reference/client-helpers.md

Lines changed: 2 additions & 2 deletions
@@ -619,15 +619,15 @@ Added in `v8.16.0`
 
 ES|QL can return results in multiple binary formats, including [Apache Arrow](https://arrow.apache.org/)'s streaming format. Because it is a very efficient format to read, it can be valuable for performing high-performance in-memory analytics. And, because the response is streamed as batches of records, it can be used to produce aggregations and other calculations on larger-than-memory data sets.
 
-`toArrowReader` returns a [`RecordBatchStreamReader`](https://arrow.apache.org/docs/js/classes/Arrow_dom.RecordBatchReader.md).
+`toArrowReader` returns an [`AsyncRecordBatchStreamReader`](https://github.com/apache/arrow/blob/520ae44272d491bbb52eb3c9b84864ed7088f11a/js/src/ipc/reader.ts#L216).
 
 ```ts
 const reader = await client.helpers
   .esql({ query: 'FROM sample_data' })
   .toArrowReader()
 
 // print each record as JSON
-for (const recordBatch of reader) {
+for await (const recordBatch of reader) {
   for (const record of recordBatch) {
     console.log(record.toJSON())
   }
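
For context, here is a minimal end-to-end sketch of consuming the async reader that `toArrowReader` now returns. The node URL, API key, `sample_data` index, and the `printSampleData` function name are placeholders for the example; only the `for await` iteration over record batches comes from the change above.

```ts
import { Client } from '@elastic/elasticsearch'

// Placeholder connection details; substitute your own cluster and credentials.
const client = new Client({
  node: 'http://localhost:9200',
  auth: { apiKey: 'an-api-key' }
})

async function printSampleData (): Promise<void> {
  // toArrowReader resolves to an async reader, so record batches can be
  // consumed as they arrive instead of after the whole response is buffered.
  const reader = await client.helpers
    .esql({ query: 'FROM sample_data' })
    .toArrowReader()

  for await (const recordBatch of reader) {
    for (const record of recordBatch) {
      console.log(record.toJSON())
    }
  }
}

printSampleData().catch(console.error)
```

Because batches are read as the response streams in, the loop does not require the full result set to fit in memory, which is what makes this useful for the larger-than-memory case described in the docs.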

‎src/helpers.ts

Lines changed: 6 additions & 6 deletions
@@ -11,7 +11,7 @@ import assert from 'node:assert'
 import * as timersPromises from 'node:timers/promises'
 import { Readable } from 'node:stream'
 import { errors, TransportResult, TransportRequestOptions, TransportRequestOptionsWithMeta } from '@elastic/transport'
-import { Table, TypeMap, tableFromIPC, RecordBatchStreamReader } from 'apache-arrow/Arrow.node'
+import { Table, TypeMap, tableFromIPC, AsyncRecordBatchStreamReader } from 'apache-arrow/Arrow.node'
 import Client from './client'
 import * as T from './api/types'
 import { Id } from './api/types'
@@ -135,7 +135,7 @@ export interface EsqlColumn {
 export interface EsqlHelper {
   toRecords: <TDocument>() => Promise<EsqlToRecords<TDocument>>
   toArrowTable: () => Promise<Table<TypeMap>>
-  toArrowReader: () => Promise<RecordBatchStreamReader>
+  toArrowReader: () => Promise<AsyncRecordBatchStreamReader>
 }
 
 export interface EsqlToRecords<TDocument> {
@@ -1000,7 +1000,7 @@ export default class Helpers {
         return tableFromIPC(response)
       },
 
-      async toArrowReader (): Promise<RecordBatchStreamReader> {
+      async toArrowReader (): Promise<AsyncRecordBatchStreamReader> {
         if (metaHeader !== null) {
           reqOptions.headers = reqOptions.headers ?? {}
           reqOptions.headers['x-elastic-client-meta'] = `${metaHeader as string},h=qa`
@@ -1009,9 +1009,9 @@ export default class Helpers {
 
         params.format = 'arrow'
 
-        // @ts-expect-error the return type will be ArrayBuffer when the format is set to 'arrow'
-        const response: ArrayBuffer = await client.esql.query(params, reqOptions)
-        return RecordBatchStreamReader.from(response)
+        // @ts-expect-error response is a Readable when asStream is true
+        const response: Readable = await client.esql.query(params, reqOptions)
+        return await AsyncRecordBatchStreamReader.from(Readable.from(response))
       }
     }
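
As a standalone illustration of the pattern the helper now uses, the sketch below feeds a locally generated Arrow IPC stream through a Node.js `Readable` and into `AsyncRecordBatchStreamReader`. The in-memory table, the record values, and the `demo` function are made up for the example; only the `Readable.from(...)` wrapping and the `AsyncRecordBatchStreamReader.from(...)` call mirror the change above.

```ts
import { Readable } from 'node:stream'
import {
  tableFromJSON,
  RecordBatchStreamWriter,
  AsyncRecordBatchStreamReader
} from 'apache-arrow/Arrow.node'

async function demo (): Promise<void> {
  // Build a small Arrow table and serialize it to the IPC streaming format,
  // standing in for the bytes Elasticsearch returns when params.format = 'arrow'.
  const table = tableFromJSON([{ amount: 4.9 }, { amount: 8.2 }])
  const ipcBytes = await RecordBatchStreamWriter.writeAll(table).toUint8Array()

  // The helper receives the HTTP body as a Readable (asStream) and wraps it
  // in another Readable before handing it to the async reader.
  const body: Readable = Readable.from([Buffer.from(ipcBytes)])
  const reader = await AsyncRecordBatchStreamReader.from(Readable.from(body))

  // Record batches are parsed lazily as the stream is consumed.
  for await (const batch of reader) {
    for (const row of batch) {
      console.log(row.toJSON())
    }
  }
}

demo().catch(console.error)
```

In the helper itself the stream comes from the HTTP response body rather than an in-memory buffer; the consumption side of the reader is the same.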

‎test/unit/helpers/esql.test.ts

Lines changed: 24 additions & 34 deletions
@@ -182,17 +182,28 @@ test('ES|QL helper', t => {
     t.end()
   })
 
-  test('toArrowReader', t => {
-    t.test('Parses a binary response into an Arrow stream reader', async t => {
-      const binaryContent = '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA='
+  test('toArrowReader', async t => {
+    const testRecords = [
+      { amount: 4.900000095367432, },
+      { amount: 8.199999809265137, },
+      { amount: 15.5, },
+      { amount: 9.899999618530273, },
+      { amount: 13.899999618530273, },
+    ]
+
+    // build reusable Arrow table
+    const table = arrow.tableFromJSON(testRecords)
+    const rawData = await arrow.RecordBatchStreamWriter.writeAll(table).toUint8Array()
 
+    t.test('Parses a binary response into an Arrow stream reader', async t => {
       const MockConnection = connection.buildMockConnection({
         onRequest (_params) {
           return {
-            body: Buffer.from(binaryContent, 'base64'),
+            body: Buffer.from(rawData),
             statusCode: 200,
             headers: {
-              'content-type': 'application/vnd.elasticsearch+arrow+stream'
+              'content-type': 'application/vnd.elasticsearch+arrow+stream',
+              'transfer-encoding': 'chunked'
             }
           }
         }
@@ -206,30 +217,8 @@ test('ES|QL helper', t => {
       const result = await client.helpers.esql({ query: 'FROM sample_data' }).toArrowReader()
       t.ok(result.isStream())
 
-      const testRecords = [
-        {
-          amount: 4.900000095367432,
-          date: 1729532586965,
-        },
-        {
-          amount: 8.199999809265137,
-          date: 1729446186965,
-        },
-        {
-          amount: 15.5,
-          date: 1729359786965,
-        },
-        {
-          amount: 9.899999618530273,
-          date: 1729273386965,
-        },
-        {
-          amount: 13.899999618530273,
-          date: 1729186986965,
-        },
-      ]
       let count = 0
-      for (const recordBatch of result) {
+      for await (const recordBatch of result) {
         for (const record of recordBatch) {
           t.same(record.toJSON(), testRecords[count])
           count++
@@ -240,17 +229,16 @@ test('ES|QL helper', t => {
     })
 
     t.test('ESQL helper uses correct x-elastic-client-meta helper value', async t => {
-      const binaryContent = '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA='
-
       const MockConnection = connection.buildMockConnection({
         onRequest (params) {
           const header = params.headers?.['x-elastic-client-meta'] ?? ''
           t.ok(header.includes('h=qa'), `Client meta header does not include ESQL helper value: ${header}`)
           return {
-            body: Buffer.from(binaryContent, 'base64'),
+            body: Buffer.from(rawData),
             statusCode: 200,
             headers: {
-              'content-type': 'application/vnd.elasticsearch+arrow+stream'
+              'content-type': 'application/vnd.elasticsearch+arrow+stream',
+              'transfer-encoding': 'chunked'
             }
           }
         }
@@ -289,10 +277,12 @@ test('ES|QL helper', t => {
         new arrow.RecordBatch(schema, batch3.data),
       ])
 
+      const rawData = await arrow.RecordBatchStreamWriter.writeAll(table).toUint8Array()
+
       const MockConnection = connection.buildMockConnection({
         onRequest (_params) {
           return {
-            body: Buffer.from(arrow.tableToIPC(table, "stream")),
+            body: Buffer.from(rawData),
             statusCode: 200,
             headers: {
               'content-type': 'application/vnd.elasticsearch+arrow+stream'
@@ -310,7 +300,7 @@ test('ES|QL helper', t => {
       t.ok(result.isStream())
 
       let counter = 0
-      for (const batch of result) {
+      for await (const batch of result) {
         for (const row of batch) {
           counter++
           const { id, val } = row.toJSON()
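
The tests above replace a hard-coded base64 payload with fixture bytes generated at runtime. A minimal sketch of that fixture-building step, plus a round-trip check with `tableFromIPC`, might look like the following; the record values and the `buildArrowFixture` name are illustrative, not part of the commit.

```ts
import * as arrow from 'apache-arrow'

async function buildArrowFixture (): Promise<Buffer> {
  const testRecords = [
    { amount: 4.900000095367432 },
    { amount: 8.199999809265137 },
    { amount: 15.5 }
  ]

  // Serialize the records to the Arrow IPC streaming format, the same wire
  // format the mocked Elasticsearch connection returns as the response body.
  const table = arrow.tableFromJSON(testRecords)
  const rawData = await arrow.RecordBatchStreamWriter.writeAll(table).toUint8Array()
  return Buffer.from(rawData)
}

// Round-trip check: the serialized bytes parse back into the same rows.
buildArrowFixture()
  .then(body => {
    const parsed = arrow.tableFromIPC(body)
    for (const row of parsed.toArray()) {
      console.log(row.toJSON())
    }
  })
  .catch(console.error)
```

Generating the payload this way keeps the expected records and the mocked response body in sync, which is what lets the tests drop the hard-coded blob.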
