-
Notifications
You must be signed in to change notification settings - Fork 1.5k
feat(firestore): [PQ] #13274
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: feature/fs-pipeline-queries
Are you sure you want to change the base?
feat(firestore): [PQ] #13274
Conversation
|
/gemini review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces significant enhancements to Firestore pipeline queries, including consistency controls (transactions, read options), execution options (explain, index hints), and a Raw stage for extensibility. The changes are well-structured, and the addition of comprehensive integration tests is commendable.
My review focuses on a few key areas. I've identified a critical bug in how request context headers are constructed, which leads to complex and brittle retry logic. I've also noted a regression in observability due to the removal of tracing. Additionally, there are a few suggestions for code cleanup and improving consistency to enhance maintainability.
firestore/pipeline_result.go
Outdated
| ctx := withRequestParamsHeader(it.ctx, reqParamsHeaderVal(client.path())) | ||
| ctx = withResourceHeader(it.ctx, client.path()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a bug here. The context from withRequestParamsHeader on line 282 is being overwritten on line 283 because it.ctx is used again instead of the updated ctx. This is likely the cause of the InvalidArgument error that the new retry logic in this function is trying to solve. After fixing this context chaining, the special-cased retry for InvalidArgument should no longer be necessary and can be removed.
| ctx := withRequestParamsHeader(it.ctx, reqParamsHeaderVal(client.path())) | |
| ctx = withResourceHeader(it.ctx, client.path()) | |
| ctx := withRequestParamsHeader(it.ctx, reqParamsHeaderVal(client.path())) | |
| ctx = withResourceHeader(ctx, client.path()) |
| func (it *streamPipelineResultIterator) next() (_ *PipelineResult, err error) { | ||
| client := it.p.c | ||
|
|
||
| // streamClient is initialized on first next call | ||
| if it.streamClient == nil { | ||
| it.ctx = trace.StartSpan(it.ctx, "cloud.google.com/go/firestore.ExecutePipeline") | ||
| defer func() { | ||
| if errors.Is(err, iterator.Done) { | ||
| trace.EndSpan(it.ctx, nil) | ||
| } else { | ||
| trace.EndSpan(it.ctx, err) | ||
| for i := 0; i < maxExecuteRecvAttempts; i++ { | ||
| if it.streamClient == nil { | ||
| req, reqErr := it.p.toExecutePipelineRequest(it.execSettings) | ||
| if reqErr != nil { | ||
| return nil, reqErr | ||
| } | ||
| }() | ||
| req, err := it.p.toExecutePipelineRequest() | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| ctx := withRequestParamsHeader(it.ctx, reqParamsHeaderVal(client.path())) | ||
|
|
||
| it.streamClient, err = client.c.ExecutePipeline(ctx, req) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| } | ||
| ctx := withRequestParamsHeader(it.ctx, reqParamsHeaderVal(client.path())) | ||
| ctx = withResourceHeader(it.ctx, client.path()) | ||
|
|
||
| // If the current response is nil or all its results have been processed, | ||
| // receive the next response from the stream. | ||
| if it.currResp == nil || it.currRespResultsIdx >= len(it.currResp.GetResults()) { | ||
| var res *pb.ExecutePipelineResponse | ||
| for { | ||
| res, err = it.streamClient.Recv() | ||
| if err == io.EOF { | ||
| return nil, iterator.Done | ||
| } | ||
| it.streamClient, err = client.c.ExecutePipeline(ctx, req) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| if res.GetResults() != nil { | ||
| it.currResp = res | ||
| it.currRespResultsIdx = 0 | ||
| break | ||
| } | ||
|
|
||
| var shouldRetry bool | ||
| if it.currResp == nil || it.currRespResultsIdx >= len(it.currResp.GetResults()) { | ||
| var res *pb.ExecutePipelineResponse | ||
| for { | ||
| res, err = it.streamClient.Recv() | ||
| if err != nil { | ||
| st, ok := status.FromError(err) | ||
| if ok && st.Code() == codes.InvalidArgument && | ||
| strings.Contains(err.Error(), "Please fill in the request header with format x-goog-request-params:project_id=") { | ||
|
|
||
| it.streamClient = nil | ||
| shouldRetry = true | ||
| break | ||
| } | ||
| } | ||
| if err == io.EOF { | ||
| return nil, iterator.Done | ||
| } | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| if res.GetResults() != nil { | ||
| it.currResp = res | ||
| it.currRespResultsIdx = 0 | ||
| it.statsPb = res.GetExplainStats() | ||
| break | ||
| } | ||
| // No results => partial progress; keep receiving | ||
| } | ||
| // No results => partial progress; keep receiving | ||
| // TODO: Set ExplainStats | ||
| } | ||
| } | ||
| if shouldRetry { | ||
| continue | ||
| } | ||
|
|
||
| // Get the next document proto from the current response. | ||
| docProto := it.currResp.GetResults()[it.currRespResultsIdx] | ||
| it.currRespResultsIdx++ | ||
| // Get the next document proto from the current response. | ||
| docProto := it.currResp.GetResults()[it.currRespResultsIdx] | ||
| it.currRespResultsIdx++ | ||
|
|
||
| var docRef *DocumentRef | ||
| if len(docProto.GetName()) != 0 { | ||
| var pathErr error | ||
| docRef, pathErr = pathToDoc(docProto.GetName(), client) | ||
| if pathErr != nil { | ||
| return nil, pathErr | ||
| var docRef *DocumentRef | ||
| if len(docProto.GetName()) != 0 { | ||
| var pathErr error | ||
| docRef, pathErr = pathToDoc(docProto.GetName(), client) | ||
| if pathErr != nil { | ||
| return nil, pathErr | ||
| } | ||
| } | ||
| } | ||
|
|
||
| pr, err := newPipelineResult(docRef, docProto, client, it.currResp.GetExecutionTime()) | ||
| if err != nil { | ||
| return nil, err | ||
| pr, err := newPipelineResult(docRef, docProto, client, it.currResp.GetExecutionTime()) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| return pr, nil | ||
| } | ||
| return pr, nil | ||
| return nil, err | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
firestore/integration_test.go
Outdated
| }, | ||
| } | ||
| copts := append(ti.CallOptions(), option.WithTokenSource(ts)) | ||
| copts := append(ti.CallOptions()) //, option.WithTokenSource(ts)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| // WithIndexMode sets the index mode for the pipeline execution. | ||
| func WithIndexMode(indexMode string) ExecuteOption { | ||
| return newFuncExecuteOption(func(eo *executeSettings) { | ||
| eo.IndexMode = indexMode | ||
| }) | ||
| } | ||
|
|
||
| // Execute executes the pipeline and returns an iterator for streaming the results. | ||
| // TODO: Accept PipelineOptions | ||
| func (p *Pipeline) Execute(ctx context.Context) *PipelineResultIterator { | ||
| func (p *Pipeline) Execute(ctx context.Context, opts ...ExecuteOption) *PipelineResultIterator { | ||
| ctx = withResourceHeader(ctx, p.c.path()) | ||
| ctx = withRequestParamsHeader(ctx, reqParamsHeaderVal(p.c.path())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For consistency with other parts of the code (e.g., newReplaceStage), you could inline the creation of ancestor and collectionID pb.Values into the Args slice literal. This would make the code more concise.
optionsPb, err := optionsToProto(s.options)
if err != nil {
return nil, err
}
return &pb.Pipeline_Stage{
Name: s.name(),
Args: []*pb.Value{
{ValueType: &pb.Value_ReferenceValue{ReferenceValue: s.ancestor}},
{ValueType: &pb.Value_StringValue{StringValue: s.collectionID}},
},
Options: optionsPb,
}, nil| func isRetryablePipelineExecuteErr(err error) bool { | ||
| return false | ||
| } | ||
| func isRetryablePipelineExecuteErr1(err error) bool { | ||
| if err == nil { | ||
| return false | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The function isRetryablePipelineExecuteErr is now stubbed to always return false, and a new function isRetryablePipelineExecuteErr1 has been added which seems to contain the old logic. Neither of these functions appear to be used anymore. This looks like leftover code from refactoring and should be removed to keep the codebase clean.
firestore/pipeline_stage.go
Outdated
| field := typ.Field(i) | ||
| fieldValue := v.Field(i) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This loop iterates over all fields of the options struct, including unexported ones. While it currently works for collectionSettings and collectionGroupSettings, it's fragile. If a struct with unexported fields is used as options in the future, this could lead to unexpected behavior. It's safer to only process exported fields. Consider adding a check like if !field.IsExported() { continue } after getting the field.
Changes in this PR in the order they appear in "Files changed" tab:
feat(firestore): Query profiling #10164.
Also add ConsistencySelector
https://github.com/googleapis/java-firestore/blob/742fab6583c9a6f9c47cf0496124c3c9b05fe0ee/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Pipeline.java#L997-L1021
feat(firestore): [PQ] add all array, string and vector functions #13245 (comment)
https://github.com/googleapis/java-firestore/blob/742fab6583c9a6f9c47cf0496124c3c9b05fe0ee/google-cloud-firestore/src/main/java/com/google/cloud/firestore/ExplainStats.java#L42-L71
Previous pull requests