Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions js/ai/src/telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import {
MetricCounter,
MetricHistogram,
} from '@genkit-ai/core/metrics';
import { spanMetadataAls } from '@genkit-ai/core/tracing';
import { spanMetadataAls, traceMetadataAls } from '@genkit-ai/core/tracing';
import { ValueType } from '@opentelemetry/api';
import { createHash } from 'crypto';
import { GenerateOptions } from './generate.js';
Expand Down Expand Up @@ -133,6 +133,7 @@ const generateActionOutputAudio = new MetricCounter(

type SharedDimensions = {
modelName?: string;
flowName?: string;
path?: string;
temperature?: number;
topK?: number;
Expand All @@ -154,6 +155,7 @@ export function recordGenerateActionMetrics(
topK: input.config?.topK,
topP: input.config?.topP,
maxOutputTokens: input.config?.maxOutputTokens,
flowName: traceMetadataAls?.getStore()?.flowName,
path: spanMetadataAls?.getStore()?.path,
latencyMs: opts.response?.latencyMs,
err: opts.err,
Expand All @@ -167,8 +169,9 @@ export function recordGenerateActionInputLogs(
options: GenerateOptions,
input: GenerateRequest
) {
const flowName = traceMetadataAls?.getStore()?.flowName;
const path = spanMetadataAls?.getStore()?.path;
const sharedMetadata = { model, path };
const sharedMetadata = { model, path, flowName };
logger.logStructured(`Config[${path}, ${model}]`, {
...sharedMetadata,
temperature: options.config?.temperature,
Expand Down Expand Up @@ -202,8 +205,9 @@ export function recordGenerateActionOutputLogs(
options: GenerateOptions,
output: GenerateResponseData
) {
const flowName = traceMetadataAls?.getStore()?.flowName;
const path = spanMetadataAls?.getStore()?.path;
const sharedMetadata = { model, path };
const sharedMetadata = { model, path, flowName };
const candidates = output.candidates.length;
output.candidates.forEach((cand, candIdx) => {
const parts = cand.message.content.length;
Expand Down Expand Up @@ -315,6 +319,7 @@ function doRecordGenerateActionMetrics(
modelName: string,
usage: GenerationUsage,
dimensions: {
flowName?: string;
path?: string;
temperature?: number;
maxOutputTokens?: number;
Expand All @@ -328,6 +333,7 @@ function doRecordGenerateActionMetrics(
) {
const shared: SharedDimensions = {
modelName: modelName,
flowName: dimensions.flowName,
path: dimensions.path,
temperature: dimensions.temperature,
topK: dimensions.topK,
Expand Down
8 changes: 8 additions & 0 deletions js/core/src/telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ import {
MetricCounter,
MetricHistogram,
} from './metrics.js';
import {
spanMetadataAls,
traceMetadataAls,
} from './tracing/instrumentation.js';

/**
* Wraps the declared metrics in a Genkit-specific, internal namespace.
Expand All @@ -41,6 +45,8 @@ const actionLatencies = new MetricHistogram(_N('latency'), {
export function writeActionSuccess(actionName: string, latencyMs: number) {
const dimensions = {
name: actionName,
flowName: traceMetadataAls?.getStore()?.flowName,
path: spanMetadataAls?.getStore()?.path,
source: 'ts',
sourceVersion: GENKIT_VERSION,
};
Expand All @@ -55,6 +61,8 @@ export function writeActionFailure(
) {
const dimensions = {
name: actionName,
flowName: traceMetadataAls?.getStore()?.flowName,
path: spanMetadataAls?.getStore()?.path,
source: 'ts',
sourceVersion: GENKIT_VERSION,
error: err?.name,
Expand Down
3 changes: 3 additions & 0 deletions js/core/src/tracing/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ export async function newTrace<T>(
paths: new Set<PathMetadata>(),
timestamp: performance.now(),
};
if (opts.labels && opts.labels[SPAN_TYPE_ATTR] === 'flow') {
traceMetadata.flowName = opts.name;
}
return await traceMetadataAls.run(traceMetadata, () =>
runInNewSpan(
{
Expand Down
1 change: 1 addition & 0 deletions js/core/src/tracing/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ export const PathMetadataSchema = z.object({
export type PathMetadata = z.infer<typeof PathMetadataSchema>;

export const TraceMetadataSchema = z.object({
flowName: z.string().optional(),
paths: z.set(PathMetadataSchema).optional(),
timestamp: z.number(),
});
Expand Down
21 changes: 16 additions & 5 deletions js/flow/src/telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ export function writeFlowSuccess(flowName: string, latencyMs: number) {

const paths = traceMetadataAls.getStore()?.paths || new Set<PathMetadata>();
if (paths) {
const pathDimensions = {
flowName: flowName,
source: 'ts',
sourceVersion: GENKIT_VERSION,
};
const relevantPaths = Array.from(paths).filter((meta) =>
meta.path.includes(flowName)
);
Expand All @@ -90,13 +95,13 @@ export function writeFlowSuccess(flowName: string, latencyMs: number) {

relevantPaths.forEach((p) => {
pathCounter.add(1, {
...dimensions,
...pathDimensions,
success: 'success',
path: p.path,
});

pathLatencies.record(p.latency, {
...dimensions,
...pathDimensions,
path: p.path,
});
});
Expand Down Expand Up @@ -130,23 +135,29 @@ export function writeFlowFailure(
paths: relevantPaths.map((p) => p.path),
});

const pathDimensions = {
flowName: flowName,
source: 'ts',
sourceVersion: GENKIT_VERSION,
};
// All paths that have succeeded need to be tracked as succeeded.
relevantPaths.forEach((p) => {
pathCounter.add(1, {
flowName: flowName,
...pathDimensions,
success: 'success',
path: p.path,
});

pathLatencies.record(p.latency, {
...dimensions,
...pathDimensions,
path: p.path,
});
});

pathCounter.add(1, {
flowName: flowName,
...pathDimensions,
success: 'failure',
error: err.name,
path: failPath,
});
}
Expand Down
68 changes: 66 additions & 2 deletions js/plugins/google-cloud/tests/metrics_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,71 @@ describe('GoogleCloudMetrics', () => {
assert.ok(requestCounter.attributes.sourceVersion);
});

it('writes flow label to action metrics when running inside flow', async () => {
const testAction = createAction('testAction');
const flow = createFlow('flowNameLabelTestFlow', async () => {
return await runAction(testAction);
});

await runFlow(flow);

const requestCounter = await getCounterMetric('genkit/action/requests');
const latencyHistogram = await getHistogramMetric('genkit/action/latency');
assert.equal(requestCounter.attributes.flowName, 'flowNameLabelTestFlow');
assert.equal(latencyHistogram.attributes.flowName, 'flowNameLabelTestFlow');
});

it('writes flow label to generate metrics when running inside flow', async () => {
const testModel = createModel('testModel', async () => {
return {
candidates: [
{
index: 0,
finishReason: 'stop',
message: {
role: 'user',
content: [
{
text: 'response',
},
],
},
},
],
usage: {
inputTokens: 10,
outputTokens: 14,
inputCharacters: 8,
outputCharacters: 16,
inputImages: 1,
outputImages: 3,
},
};
});
const flow = createFlow('testFlow', async () => {
return await generate({
model: testModel,
prompt: 'test prompt',
});
});

await runFlow(flow);

const metrics = [
await getCounterMetric('genkit/ai/generate/requests'),
await getCounterMetric('genkit/ai/generate/input/tokens'),
await getCounterMetric('genkit/ai/generate/output/tokens'),
await getCounterMetric('genkit/ai/generate/input/characters'),
await getCounterMetric('genkit/ai/generate/output/characters'),
await getCounterMetric('genkit/ai/generate/input/images'),
await getCounterMetric('genkit/ai/generate/output/images'),
await getHistogramMetric('genkit/ai/generate/latency'),
];
for (metric of metrics) {
assert.equal(metric.attributes.flowName, 'testFlow');
}
});

it('writes flow paths metrics', async () => {
const flow = createFlow('pathTestFlow', async () => {
const step1Result = await run('step1', async () => {
Expand All @@ -308,15 +373,14 @@ describe('GoogleCloudMetrics', () => {
]);
const pathCounterPoints = await getCounterDataPoints(
'genkit/flow/path/requests'
).then((points) =>
points.filter((point) => point.attributes.name === 'pathTestFlow')
);
const paths = new Set(
pathCounterPoints.map((point) => point.attributes.path)
);
assert.deepEqual(paths, expectedPaths);
pathCounterPoints.forEach((point) => {
assert.equal(point.value, 1);
assert.equal(point.attributes.flowName, 'pathTestFlow');
assert.equal(point.attributes.source, 'ts');
assert.equal(point.attributes.success, 'success');
assert.ok(point.attributes.sourceVersion);
Expand Down