Skip to content

Conversation

@bhshkh
Copy link
Contributor

@bhshkh bhshkh commented Oct 31, 2025

Changes in this PR in the order they appear in "Files changed" tab:

  1. Move PipelineStages integration tests to separate file.
  2. Add ExecuteOptions similar to query run options which was introduced in
    feat(firestore): Query profiling #10164.
    Also add ConsistencySelector
  3. Add raw stage similar to Java
    https://github.com/googleapis/java-firestore/blob/742fab6583c9a6f9c47cf0496124c3c9b05fe0ee/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Pipeline.java#L997-L1021
  4. Remove IsNaN, IsNotNaN, IsNull, IsNotNull, Equivalent as they are no longer supported by backend
  5. Remove examples as commented here
    feat(firestore): [PQ] add all array, string and vector functions #13245 (comment)
  6. ExplainStats - part of point 2 above. GetRawData and GetText implementation similar to Java
    https://github.com/googleapis/java-firestore/blob/742fab6583c9a6f9c47cf0496124c3c9b05fe0ee/google-cloud-firestore/src/main/java/com/google/cloud/firestore/ExplainStats.java#L42-L71
  7. Add retries to Execute until b/456536006 is fixed.

Previous pull requests

@bhshkh
Copy link
Contributor Author

bhshkh commented Oct 31, 2025

/gemini review

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces significant enhancements to Firestore pipeline queries, including consistency controls (transactions, read options), execution options (explain, index hints), and a Raw stage for extensibility. The changes are well-structured, and the addition of comprehensive integration tests is commendable.

My review focuses on a few key areas. I've identified a critical bug in how request context headers are constructed, which leads to complex and brittle retry logic. I've also noted a regression in observability due to the removal of tracing. Additionally, there are a few suggestions for code cleanup and improving consistency to enhance maintainability.

Comment on lines 282 to 283
ctx := withRequestParamsHeader(it.ctx, reqParamsHeaderVal(client.path()))
ctx = withResourceHeader(it.ctx, client.path())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

There is a bug here. The context from withRequestParamsHeader on line 282 is being overwritten on line 283 because it.ctx is used again instead of the updated ctx. This is likely the cause of the InvalidArgument error that the new retry logic in this function is trying to solve. After fixing this context chaining, the special-cased retry for InvalidArgument should no longer be necessary and can be removed.

Suggested change
ctx := withRequestParamsHeader(it.ctx, reqParamsHeaderVal(client.path()))
ctx = withResourceHeader(it.ctx, client.path())
ctx := withRequestParamsHeader(it.ctx, reqParamsHeaderVal(client.path()))
ctx = withResourceHeader(ctx, client.path())
Comment on lines 272 to 345
func (it *streamPipelineResultIterator) next() (_ *PipelineResult, err error) {
client := it.p.c

// streamClient is initialized on first next call
if it.streamClient == nil {
it.ctx = trace.StartSpan(it.ctx, "cloud.google.com/go/firestore.ExecutePipeline")
defer func() {
if errors.Is(err, iterator.Done) {
trace.EndSpan(it.ctx, nil)
} else {
trace.EndSpan(it.ctx, err)
for i := 0; i < maxExecuteRecvAttempts; i++ {
if it.streamClient == nil {
req, reqErr := it.p.toExecutePipelineRequest(it.execSettings)
if reqErr != nil {
return nil, reqErr
}
}()
req, err := it.p.toExecutePipelineRequest()
if err != nil {
return nil, err
}

ctx := withRequestParamsHeader(it.ctx, reqParamsHeaderVal(client.path()))

it.streamClient, err = client.c.ExecutePipeline(ctx, req)
if err != nil {
return nil, err
}
}
ctx := withRequestParamsHeader(it.ctx, reqParamsHeaderVal(client.path()))
ctx = withResourceHeader(it.ctx, client.path())

// If the current response is nil or all its results have been processed,
// receive the next response from the stream.
if it.currResp == nil || it.currRespResultsIdx >= len(it.currResp.GetResults()) {
var res *pb.ExecutePipelineResponse
for {
res, err = it.streamClient.Recv()
if err == io.EOF {
return nil, iterator.Done
}
it.streamClient, err = client.c.ExecutePipeline(ctx, req)
if err != nil {
return nil, err
}
if res.GetResults() != nil {
it.currResp = res
it.currRespResultsIdx = 0
break
}

var shouldRetry bool
if it.currResp == nil || it.currRespResultsIdx >= len(it.currResp.GetResults()) {
var res *pb.ExecutePipelineResponse
for {
res, err = it.streamClient.Recv()
if err != nil {
st, ok := status.FromError(err)
if ok && st.Code() == codes.InvalidArgument &&
strings.Contains(err.Error(), "Please fill in the request header with format x-goog-request-params:project_id=") {

it.streamClient = nil
shouldRetry = true
break
}
}
if err == io.EOF {
return nil, iterator.Done
}
if err != nil {
return nil, err
}
if res.GetResults() != nil {
it.currResp = res
it.currRespResultsIdx = 0
it.statsPb = res.GetExplainStats()
break
}
// No results => partial progress; keep receiving
}
// No results => partial progress; keep receiving
// TODO: Set ExplainStats
}
}
if shouldRetry {
continue
}

// Get the next document proto from the current response.
docProto := it.currResp.GetResults()[it.currRespResultsIdx]
it.currRespResultsIdx++
// Get the next document proto from the current response.
docProto := it.currResp.GetResults()[it.currRespResultsIdx]
it.currRespResultsIdx++

var docRef *DocumentRef
if len(docProto.GetName()) != 0 {
var pathErr error
docRef, pathErr = pathToDoc(docProto.GetName(), client)
if pathErr != nil {
return nil, pathErr
var docRef *DocumentRef
if len(docProto.GetName()) != 0 {
var pathErr error
docRef, pathErr = pathToDoc(docProto.GetName(), client)
if pathErr != nil {
return nil, pathErr
}
}
}

pr, err := newPipelineResult(docRef, docProto, client, it.currResp.GetExecutionTime())
if err != nil {
return nil, err
pr, err := newPipelineResult(docRef, docProto, client, it.currResp.GetExecutionTime())
if err != nil {
return nil, err
}
return pr, nil
}
return pr, nil
return nil, err
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The tracing span that was present in the previous version of this function has been removed. This is a regression in observability for pipeline query execution. Please re-introduce the tracing, ensuring it correctly wraps the ExecutePipeline RPC call.

},
}
copts := append(ti.CallOptions(), option.WithTokenSource(ts))
copts := append(ti.CallOptions()) //, option.WithTokenSource(ts))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This line contains commented-out code. If this is a temporary change for debugging, please revert it or clean it up before merging.

Suggested change
copts := append(ti.CallOptions()) //, option.WithTokenSource(ts))
copts := append(ti.CallOptions())
Comment on lines +104 to 114
// WithIndexMode sets the index mode for the pipeline execution.
func WithIndexMode(indexMode string) ExecuteOption {
return newFuncExecuteOption(func(eo *executeSettings) {
eo.IndexMode = indexMode
})
}

// Execute executes the pipeline and returns an iterator for streaming the results.
// TODO: Accept PipelineOptions
func (p *Pipeline) Execute(ctx context.Context) *PipelineResultIterator {
func (p *Pipeline) Execute(ctx context.Context, opts ...ExecuteOption) *PipelineResultIterator {
ctx = withResourceHeader(ctx, p.c.path())
ctx = withRequestParamsHeader(ctx, reqParamsHeaderVal(p.c.path()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For consistency with other parts of the code (e.g., newReplaceStage), you could inline the creation of ancestor and collectionID pb.Values into the Args slice literal. This would make the code more concise.

optionsPb, err := optionsToProto(s.options)
if err != nil {
	return nil, err
}
return &pb.Pipeline_Stage{
	Name: s.name(),
	Args: []*pb.Value{
		{ValueType: &pb.Value_ReferenceValue{ReferenceValue: s.ancestor}},
		{ValueType: &pb.Value_StringValue{StringValue: s.collectionID}},
	},
	Options: optionsPb,
}, nil
Comment on lines 1139 to 1145
func isRetryablePipelineExecuteErr(err error) bool {
return false
}
func isRetryablePipelineExecuteErr1(err error) bool {
if err == nil {
return false
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The function isRetryablePipelineExecuteErr is now stubbed to always return false, and a new function isRetryablePipelineExecuteErr1 has been added which seems to contain the old logic. Neither of these functions appear to be used anymore. This looks like leftover code from refactoring and should be removed to keep the codebase clean.

Comment on lines 622 to 623
field := typ.Field(i)
fieldValue := v.Field(i)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This loop iterates over all fields of the options struct, including unexported ones. While it currently works for collectionSettings and collectionGroupSettings, it's fragile. If a struct with unexported fields is used as options in the future, this could lead to unexpected behavior. It's safer to only process exported fields. Consider adding a check like if !field.IsExported() { continue } after getting the field.

@bhshkh bhshkh changed the title Feat/fspq consistency raw Oct 31, 2025
@bhshkh bhshkh changed the title feat(firestore): Oct 31, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

1 participant