Skip to content

Commit 2ae424c

Browse files
authored
fix(reflection): correctly/consistently handle errors from reflection api (#3972)
1 parent 101cf3d commit 2ae424c

File tree

5 files changed

+104
-20
lines changed

5 files changed

+104
-20
lines changed

‎genkit-tools/common/src/manager/manager.ts‎

Lines changed: 74 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,13 @@ import EventEmitter from 'events';
2020
import * as fsSync from 'fs';
2121
import fs from 'fs/promises';
2222
import path from 'path';
23+
import { GenkitError } from '../types';
2324
import {
2425
RunActionResponseSchema,
2526
type Action,
2627
type RunActionResponse,
2728
} from '../types/action';
2829
import * as apis from '../types/apis';
29-
import type { GenkitError } from '../types/error';
3030
import type { TraceData } from '../types/trace';
3131
import { logger } from '../utils/logger';
3232
import {
@@ -228,7 +228,12 @@ export class RuntimeManager {
228228
responseType: 'stream',
229229
}
230230
)
231-
.catch(this.httpErrorHandler);
231+
.catch((err) =>
232+
this.handleStreamError(
233+
err,
234+
`Error running action key='${input.key}'.`
235+
)
236+
);
232237
let genkitVersion: string;
233238
if (response.headers['x-genkit-version']) {
234239
genkitVersion = response.headers['x-genkit-version'];
@@ -302,7 +307,10 @@ export class RuntimeManager {
302307
responseType: 'stream', // Use stream to get early headers
303308
})
304309
.catch((err) =>
305-
this.httpErrorHandler(err, `Error running action key='${input.key}'.`)
310+
this.handleStreamError(
311+
err,
312+
`Error running action key='${input.key}'.`
313+
)
306314
);
307315

308316
const traceId = response.headers['x-genkit-trace-id'];
@@ -735,15 +743,23 @@ export class RuntimeManager {
735743
/**
736744
* Handles an HTTP error.
737745
*/
738-
private httpErrorHandler(error: AxiosError, message?: string): any {
746+
private httpErrorHandler(error: AxiosError, message?: string): never {
739747
const newError = new GenkitToolsError(message || 'Internal Error');
740748

741749
if (error.response) {
742-
if ((error.response?.data as any).message) {
743-
newError.message = (error.response?.data as any).message;
744-
}
745750
// we got a non-200 response; copy the payload and rethrow
746751
newError.data = error.response.data as GenkitError;
752+
newError.stack = (error.response?.data as any).message;
753+
if ((error.response?.data as any).message) {
754+
newError.data.data = {
755+
...newError.data.data,
756+
genkitErrorMessage: message,
757+
genkitErrorDetails: {
758+
stack: (error.response?.data as any).message,
759+
traceId: (error.response?.data as any).traceId,
760+
},
761+
};
762+
}
747763
throw newError;
748764
}
749765

@@ -753,6 +769,57 @@ export class RuntimeManager {
753769
});
754770
}
755771

772+
/**
773+
* Handles a stream error by reading the stream and then calling httpErrorHandler.
774+
*/
775+
private async handleStreamError(
776+
error: AxiosError,
777+
message: string
778+
): Promise<never> {
779+
if (
780+
error.response &&
781+
error.config?.responseType === 'stream' &&
782+
(error.response.data as any).on
783+
) {
784+
try {
785+
const body = await this.streamToString(error.response.data);
786+
try {
787+
error.response.data = JSON.parse(body);
788+
} catch (e) {
789+
error.response.data = {
790+
message: body || 'Unknown error',
791+
};
792+
}
793+
} catch (e) {
794+
// If stream reading fails, we must replace the stream object with a safe error object
795+
// to prevent circular structure errors during JSON serialization.
796+
error.response.data = {
797+
message: 'Failed to read error response stream',
798+
details: String(e),
799+
};
800+
}
801+
}
802+
this.httpErrorHandler(error, message);
803+
}
804+
805+
/**
806+
* Helper to convert a stream to string.
807+
*/
808+
private streamToString(stream: any): Promise<string> {
809+
return new Promise((resolve, reject) => {
810+
let buffer = '';
811+
stream.on('data', (chunk: Buffer) => {
812+
buffer += chunk.toString();
813+
});
814+
stream.on('end', () => {
815+
resolve(buffer);
816+
});
817+
stream.on('error', (err: Error) => {
818+
reject(err);
819+
});
820+
});
821+
}
822+
756823
/**
757824
* Performs health checks on all runtimes.
758825
*/

‎go/genkit/reflection.go‎

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -467,7 +467,15 @@ func handleRunAction(g *Genkit, activeActions *activeActionsMap) func(w http.Res
467467
refErr.Details.TraceID = &resp.Telemetry.TraceID
468468
}
469469

470-
json.NewEncoder(w).Encode(errorResponse{Error: refErr})
470+
reflectErr, err := json.Marshal(refErr)
471+
if err != nil {
472+
logger.FromContext(ctx).Error("writing output", "err", err)
473+
return nil
474+
}
475+
_, err = fmt.Fprintf(w, "{\"error\": %s }", reflectErr)
476+
if err != nil {
477+
return err
478+
}
471479
return nil
472480
}
473481

@@ -477,10 +485,17 @@ func handleRunAction(g *Genkit, activeActions *activeActionsMap) func(w http.Res
477485
errorResponse.Details.TraceID = &resp.Telemetry.TraceID
478486
}
479487

480-
if !headersSent {
481-
w.WriteHeader(errorResponse.Code)
488+
reflectErr, err := json.Marshal(errorResponse)
489+
if err != nil {
490+
logger.FromContext(ctx).Error("writing output", "err", err)
491+
return nil
482492
}
483-
return writeJSON(ctx, w, errorResponse)
493+
494+
_, err = fmt.Fprintf(w, "{\"error\": %s }", reflectErr)
495+
if err != nil {
496+
return err
497+
}
498+
return nil
484499
}
485500

486501
// Success case
@@ -491,7 +506,13 @@ func handleRunAction(g *Genkit, activeActions *activeActionsMap) func(w http.Res
491506
Result: resp.Result,
492507
Telemetry: telemetry{TraceID: resp.Telemetry.TraceID},
493508
}
494-
json.NewEncoder(w).Encode(finalResponse)
509+
data, err := json.Marshal(finalResponse)
510+
if err != nil {
511+
logger.FromContext(ctx).Error("writing output", "err", err)
512+
return nil
513+
}
514+
515+
w.Write(data)
495516
} else {
496517
// For non-streaming, headers were already sent via telemetry callback
497518
// Response already includes telemetry.traceId in body

‎go/genkit/reflection_test.go‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,12 +180,12 @@ func TestServeMux(t *testing.T) {
180180
{
181181
name: "invalid action key",
182182
body: `{"key": "/custom/test/invalid", "input": 3}`,
183-
wantStatus: http.StatusNotFound,
183+
wantStatus: http.StatusOK,
184184
},
185185
{
186186
name: "invalid input type",
187187
body: `{"key": "/custom/test/inc", "input": "not a number"}`,
188-
wantStatus: http.StatusBadRequest,
188+
wantStatus: http.StatusOK,
189189
},
190190
}
191191

‎go/samples/basic-gemini/main.go‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func main() {
5050
},
5151
}),
5252
ai.WithStreaming(cb),
53-
ai.WithOutputSchemaName("joke"),
53+
ai.WithOutputSchemaName("Joke"),
5454
ai.WithPrompt(`Tell short jokes about %s`, input))
5555
if err != nil {
5656
return "", err

‎js/core/src/reflection.ts‎

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -379,11 +379,7 @@ export class ReflectionServer {
379379
};
380380

381381
// Headers may have been sent already (via onTraceStart), so check before setting status
382-
if (!res.headersSent) {
383-
res.status(500).json(errorResponse);
384-
} else {
385-
res.end(JSON.stringify(errorResponse));
386-
}
382+
res.status(200).end(JSON.stringify({ error: errorResponse }));
387383
});
388384

389385
this.port = await this.findPort();

0 commit comments

Comments
 (0)