Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/core/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"time"

"github.com/firebase/genkit/go/internal"
"github.com/firebase/genkit/go/internal/tracing"
"github.com/firebase/genkit/go/core/tracing"
"github.com/invopop/jsonschema"
)

Expand Down
32 changes: 15 additions & 17 deletions go/core/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
"sync"
"time"

"github.com/firebase/genkit/go/core/tracing"
"github.com/firebase/genkit/go/gtime"
"github.com/firebase/genkit/go/internal"
"github.com/firebase/genkit/go/internal/tracing"
"github.com/google/uuid"
otrace "go.opentelemetry.io/otel/trace"
)
Expand Down Expand Up @@ -251,7 +251,7 @@ func (f *Flow[I, O, S]) action() *Action[*flowInstruction[I], *flowState[I, O],
"outputSchema": inferJSONSchema(o),
}
cback := func(ctx context.Context, inst *flowInstruction[I], cb func(context.Context, S) error) (*flowState[I, O], error) {
tracing.SpanMetaKey.FromContext(ctx).SetAttr("flow:wrapperAction", "true")
tracing.SetCustomMetadataAttr(ctx, "flow:wrapperAction", "true")
return f.runInstruction(ctx, inst, streamingCallback[S](cb))
}
return NewStreamingAction(f.name, ActionTypeFlow, metadata, cback)
Expand Down Expand Up @@ -359,12 +359,11 @@ func (f *Flow[I, O, S]) execute(ctx context.Context, state *flowState[I, O], dis
// TODO(jba): retrieve the JSON-marshaled SpanContext from state.traceContext.
// TODO(jba): add a span link to the context.
output, err := tracing.RunInNewSpan(ctx, fctx.tracingState(), f.name, "flow", true, state.Input, func(ctx context.Context, input I) (O, error) {
spanMeta := tracing.SpanMetaKey.FromContext(ctx)
spanMeta.SetAttr("flow:execution", strconv.Itoa(len(state.Executions)-1))
tracing.SetCustomMetadataAttr(ctx, "flow:execution", strconv.Itoa(len(state.Executions)-1))
// TODO(jba): put labels into span metadata.
spanMeta.SetAttr("flow:name", f.name)
spanMeta.SetAttr("flow:id", state.FlowID)
spanMeta.SetAttr("flow:dispatchType", dispatchType)
tracing.SetCustomMetadataAttr(ctx, "flow:name", f.name)
tracing.SetCustomMetadataAttr(ctx, "flow:id", state.FlowID)
tracing.SetCustomMetadataAttr(ctx, "flow:dispatchType", dispatchType)
rootSpanContext := otrace.SpanContextFromContext(ctx)
traceID := rootSpanContext.TraceID().String()
exec.TraceIDs = append(exec.TraceIDs, traceID)
Expand All @@ -376,15 +375,15 @@ func (f *Flow[I, O, S]) execute(ctx context.Context, state *flowState[I, O], dis
if err != nil {
// TODO(jba): handle InterruptError
internal.Logger(ctx).Error("flow failed",
"path", spanMeta.Path,
"path", tracing.SpanPath(ctx),
"err", err.Error(),
)
writeFlowFailure(ctx, f.name, latency, err)
spanMeta.SetAttr("flow:state", "error")
tracing.SetCustomMetadataAttr(ctx, "flow:state", "error")
} else {
internal.Logger(ctx).Info("flow succeeded", "path", spanMeta.Path)
internal.Logger(ctx).Info("flow succeeded", "path", tracing.SpanPath(ctx))
writeFlowSuccess(ctx, f.name, latency)
spanMeta.SetAttr("flow:state", "done")
tracing.SetCustomMetadataAttr(ctx, "flow:state", "done")

}
// TODO(jba): telemetry
Expand Down Expand Up @@ -485,10 +484,9 @@ func Run[T any](ctx context.Context, name string, f func() (T, error)) (T, error
// as in the js.
return tracing.RunInNewSpan(ctx, fc.tracingState(), name, "flowStep", false, 0, func(ctx context.Context, _ int) (T, error) {
uName := fc.uniqueStepName(name)
spanMeta := tracing.SpanMetaKey.FromContext(ctx)
spanMeta.SetAttr("flow:stepType", "run")
spanMeta.SetAttr("flow:stepName", name)
spanMeta.SetAttr("flow:resolvedStepName", uName)
tracing.SetCustomMetadataAttr(ctx, "flow:stepType", "run")
tracing.SetCustomMetadataAttr(ctx, "flow:stepName", name)
tracing.SetCustomMetadataAttr(ctx, "flow:resolvedStepName", uName)
// Memoize the function call, using the cache in the flowState.
// The locking here prevents corruption of the cache from concurrent access, but doesn't
// prevent two goroutines racing to check the cache and call f. However, that shouldn't
Expand All @@ -503,7 +501,7 @@ func Run[T any](ctx context.Context, name string, f func() (T, error)) (T, error
if err := json.Unmarshal(j, &t); err != nil {
return internal.Zero[T](), err
}
spanMeta.SetAttr("flow:state", "cached")
tracing.SetCustomMetadataAttr(ctx, "flow:state", "cached")
return t, nil
}
t, err := f()
Expand All @@ -517,7 +515,7 @@ func Run[T any](ctx context.Context, name string, f func() (T, error)) (T, error
fs.lock()
fs.cache()[uName] = json.RawMessage(bytes)
fs.unlock()
spanMeta.SetAttr("flow:state", "run")
tracing.SetCustomMetadataAttr(ctx, "flow:state", "run")
return t, nil
})
}
Expand Down
18 changes: 16 additions & 2 deletions go/core/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@ package core

import (
"context"
"crypto/md5"
"fmt"
"log"
"log/slog"
"os"
"path/filepath"
"slices"
"sync"

"github.com/firebase/genkit/go/internal/tracing"
"github.com/firebase/genkit/go/core/tracing"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"golang.org/x/exp/maps"
)
Expand Down Expand Up @@ -58,7 +61,7 @@ func newRegistry() (*registry, error) {
actions: map[string]action{},
traceStores: map[Environment]tracing.Store{},
}
tstore, err := tracing.NewDevStore()
tstore, err := newDevStore()
if err != nil {
return nil, err
}
Expand All @@ -68,6 +71,17 @@ func newRegistry() (*registry, error) {
return r, nil
}

func newDevStore() (tracing.Store, error) {
programName := filepath.Base(os.Args[0])
rootHash := fmt.Sprintf("%02x", md5.Sum([]byte(programName)))
dir := filepath.Join(os.TempDir(), ".genkit", rootHash, "traces")
if err := os.MkdirAll(dir, 0o755); err != nil {
return nil, err
}
// Don't remove the temp directory, for post-mortem debugging.
return tracing.NewFileStore(dir)
}

// An Environment is the execution context in which the program is running.
type Environment string

Expand Down
2 changes: 1 addition & 1 deletion go/core/servers.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
"time"

"github.com/firebase/genkit/go/internal"
"github.com/firebase/genkit/go/internal/tracing"
"github.com/firebase/genkit/go/core/tracing"
"go.opentelemetry.io/otel/trace"
)

Expand Down
2 changes: 1 addition & 1 deletion go/core/servers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"strings"
"testing"

"github.com/firebase/genkit/go/internal/tracing"
"github.com/firebase/genkit/go/core/tracing"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/invopop/jsonschema"
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
30 changes: 10 additions & 20 deletions go/internal/tracing/tracing.go → go/core/tracing/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ package tracing

import (
"context"
"crypto/md5"
"fmt"
"os"
"path/filepath"
"sync"

"github.com/firebase/genkit/go/internal"
Expand Down Expand Up @@ -74,17 +70,6 @@ func (ts *State) AddTraceStoreBatch(tstore Store) (shutdown func(context.Context
return ts.tp.Shutdown
}

func NewDevStore() (Store, error) {
programName := filepath.Base(os.Args[0])
rootHash := fmt.Sprintf("%02x", md5.Sum([]byte(programName)))
dir := filepath.Join(os.TempDir(), ".genkit", rootHash, "traces")
if err := os.MkdirAll(dir, 0o755); err != nil {
return nil, err
}
// Don't remove the temp directory, for post-mortem debugging.
return NewFileStore(dir)
}

// The rest of this file contains code translated from js/common/src/tracing/*.ts.

const (
Expand All @@ -111,7 +96,7 @@ func RunInNewSpan[I, O any](
Input: input,
IsRoot: isRoot,
}
parentSpanMeta := SpanMetaKey.FromContext(ctx)
parentSpanMeta := spanMetaKey.FromContext(ctx)
var parentPath string
if parentSpanMeta != nil {
parentPath = parentSpanMeta.Path
Expand All @@ -126,7 +111,7 @@ func RunInNewSpan[I, O any](
// At the end, copy some of the spanMetadata to the OpenTelemetry span.
defer func() { span.SetAttributes(sm.attributes()...) }()
// Add the spanMetadata to the context, so the function can access it.
ctx = SpanMetaKey.NewContext(ctx, sm)
ctx = spanMetaKey.NewContext(ctx, sm)
// Run the function.
output, err := f(ctx, input)

Expand Down Expand Up @@ -197,10 +182,15 @@ func (sm *spanMetadata) attributes() []attribute.KeyValue {
return kvs
}

// SpanMetaKey is for storing spanMetadatas in a context.
var SpanMetaKey = internal.NewContextKey[*spanMetadata]()
// spanMetaKey is for storing spanMetadatas in a context.
var spanMetaKey = internal.NewContextKey[*spanMetadata]()

// SetCustomMetadataAttr records a key in the current span metadata.
func SetCustomMetadataAttr(ctx context.Context, key, value string) {
SpanMetaKey.FromContext(ctx).SetAttr(key, value)
spanMetaKey.FromContext(ctx).SetAttr(key, value)
}

// SpanPath returns the path as recroding in the current span metadata.
func SpanPath(ctx context.Context) string {
return spanMetaKey.FromContext(ctx).Path
}
File renamed without changes.
2 changes: 1 addition & 1 deletion go/plugins/dotprompt/genkit.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

"github.com/firebase/genkit/go/ai"
"github.com/firebase/genkit/go/core"
"github.com/firebase/genkit/go/internal/tracing"
"github.com/firebase/genkit/go/core/tracing"
)

// ActionInput is the input type of a prompt action.
Expand Down