Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion js/core/src/tracing/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import { AsyncLocalStorage } from 'node:async_hooks';
import { SpanMetadata } from './types.js';

export const spanMetadataAls = new AsyncLocalStorage<SpanMetadata>();
export const pathVariants = new Set<string>();

export const ATTR_PREFIX = 'genkit';
export const SPAN_TYPE_ATTR = ATTR_PREFIX + ':type';
Expand Down Expand Up @@ -77,13 +78,26 @@ export async function runInNewSpan<T>(
if (opts.labels) otSpan.setAttributes(opts.labels);
try {
const parentPath = parentStep?.path || '';
opts.metadata.path = parentPath + '/' + opts.metadata.name;
const stepType =
opts.labels && opts.labels['genkit:type']
? `,t:${opts.labels['genkit:type']}`
: '';
opts.metadata.path = parentPath + `/{${opts.metadata.name}${stepType}}`;

const pathVariantCount = pathVariants.size;
const output = await spanMetadataAls.run(opts.metadata, () =>
fn(opts.metadata, otSpan, isInRoot)
);
if (opts.metadata.state !== 'error') {
opts.metadata.state = 'success';
}

opts.metadata.path = decoratePathWithSubtype(opts.metadata);

if (pathVariantCount == pathVariants.size) {
pathVariants.add(opts.metadata.path);
}

return output;
} catch (e) {
opts.metadata.state = 'error';
Expand Down Expand Up @@ -167,3 +181,23 @@ function getCurrentSpan(): SpanMetadata {
}
return step;
}

function decoratePathWithSubtype(metadata: SpanMetadata): string {
if (!metadata.path) {
return '';
}

const pathComponents = metadata.path.split('}/{');

if (pathComponents.length == 1) {
return metadata.path;
}

const stepSubtype =
metadata.metadata && metadata.metadata['subtype']
? `,s:${metadata.metadata['subtype']}`
: '';
const root = `${pathComponents.slice(0, -1).join('}/{')}}/`;
const decoratedStep = `{${pathComponents.at(-1)?.slice(0, -1)}${stepSubtype}}`;
return root + decoratedStep;
}
4 changes: 4 additions & 0 deletions js/flow/src/flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ import {
runWithActiveContext,
} from './utils.js';

import { pathVariants } from '@genkit-ai/core/tracing';

const streamDelimiter = '\n';

const CREATED_FLOWS = 'genkit__CREATED_FLOWS';
Expand Down Expand Up @@ -449,6 +451,7 @@ export class Flow<
setCustomMetadataAttribute(metadataPrefix('state'), 'done');
telemetry.writeFlowSuccess(
ctx.flow.name,
pathVariants,
performance.now() - startTimeMs
);
return output;
Expand Down Expand Up @@ -479,6 +482,7 @@ export class Flow<
telemetry.recordError(e);
telemetry.writeFlowFailure(
ctx.flow.name,
pathVariants,
performance.now() - startTimeMs,
e
);
Expand Down
49 changes: 48 additions & 1 deletion js/flow/src/telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ const flowCounter = new MetricCounter(_N('requests'), {
valueType: ValueType.INT,
});

const variantCounter = new MetricCounter(_N('variants'), {
description: 'Tracks unique flow variants per flow.',
valueType: ValueType.INT,
});

const flowLatencies = new MetricHistogram(_N('latency'), {
description: 'Latencies when calling Genkit flows.',
valueType: ValueType.DOUBLE,
Expand All @@ -53,18 +58,40 @@ export function recordError(err: any) {
});
}

export function writeFlowSuccess(flowName: string, latencyMs: number) {
export function writeFlowSuccess(
flowName: string,
variants: Set<string>,
latencyMs: number
) {
const dimensions = {
name: flowName,
source: 'ts',
sourceVersion: GENKIT_VERSION,
};
flowCounter.add(1, dimensions);
flowLatencies.record(latencyMs, dimensions);

const relevantVariants = Array.from(variants).filter((variant) =>
variant.includes(flowName)
);

logger.logStructured(`Variants[/${flowName}]`, {
flowName: flowName,
variants: relevantVariants,
});

relevantVariants.forEach((variant) =>
variantCounter.add(1, {
...dimensions,
success: 'success',
variant,
})
);
}

export function writeFlowFailure(
flowName: string,
variants: Set<string>,
latencyMs: number,
err: any
) {
Expand All @@ -77,6 +104,26 @@ export function writeFlowFailure(
};
flowCounter.add(1, dimensions);
flowLatencies.record(latencyMs, dimensions);

const path = spanMetadataAls?.getStore()?.path;
const relevantVariants = Array.from(variants).filter(
(variant) => variant.includes(flowName) && variant !== path
);

// All variants that have succeeded need to be tracked as succeeded.
relevantVariants.forEach((variant) =>
variantCounter.add(1, {
flowName: flowName,
success: 'success',
variant: variant,
})
);

variantCounter.add(1, {
flowName: flowName,
success: 'failure',
variant: path,
});
}

export function logRequest(flowName: string, req: express.Request) {
Expand Down