@jdpgrailsdev
Contributor

What

  • Separate the handling of validation failures from the coercer to allow for IOC/metric tracking when validation fails and values are "munged"

How

  • Modify the validation method interface of the ValueCoercer to return a ValidationResult
  • Use the ValidationResult to perform the "munging" in a separate singleton that is also responsible for tracking metrics related to the munging
  • Introduce a separate singleton to accumulate the metrics
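A minimal sketch of the shape described in the list above, not the actual CDK code. Only the names ValueCoercer, ValidationResult, ShouldNullify, ValidationResultHandler and MetricTracker come from this PR; Value, Reason, Valid, ShouldTruncate, the metric names and every method signature here are simplified assumptions for illustration.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Simplified stand-in for EnrichedAirbyteValue (assumption, not the CDK type).
data class Value(val content: String?, val changes: List<String> = emptyList())

// Hypothetical stand-in for AirbyteRecordMessageMetaChange.Reason.
enum class Reason { DESTINATION_FIELD_SIZE_LIMITATION }

// The coercer only reports what should happen; it no longer mutates the value.
sealed interface ValidationResult {
    object Valid : ValidationResult
    data class ShouldNullify(val reason: Reason) : ValidationResult
    data class ShouldTruncate(val truncated: String, val reason: Reason) : ValidationResult
}

fun interface ValueCoercer {
    fun validate(value: Value): ValidationResult
}

// Accumulates counters; the metric names used below are made up for this sketch.
class MetricTracker {
    private val metrics = ConcurrentHashMap<String, Double>()

    fun add(name: String, amount: Double = 1.0) {
        metrics.merge(name, amount) { a, b -> a + b }
    }

    fun get(name: String): Double = metrics[name] ?: 0.0
}

// Applies the "munging" and records a metric each time a value is changed.
class ValidationResultHandler(private val tracker: MetricTracker) {
    fun handle(value: Value, result: ValidationResult): Value =
        when (result) {
            is ValidationResult.Valid -> value
            is ValidationResult.ShouldNullify -> {
                tracker.add("nullified")
                value.copy(content = null, changes = value.changes + "NULLED:${result.reason}")
            }
            is ValidationResult.ShouldTruncate -> {
                tracker.add("truncated")
                value.copy(
                    content = result.truncated,
                    changes = value.changes + "TRUNCATED:${result.reason}",
                )
            }
        }
}
```

With this split, a coercer stays a pure decision function, and the handler is the only place where values are altered and counted.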

N.B. This is a breaking interface change: each destination that uses the new data flow must be updated to match the new interface. It is not a breaking change from an end-user point of view. Also note that I did not remove the nullify and truncate methods from EnrichedAirbyteValue, as those are still used in non-data-flow CDK code. There is probably a missing abstraction/consolidation piece that would avoid having to pipe the result of coercer.validate() into validationResultHandler.handle. If we like this approach, I can work on that next in this PR and introduce one layer that the converter can call to get the fully validated and munged result in one call.
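One possible shape for that single layer, as a hedged sketch only. Validator, ResultHandler, ValidatingCoercer and validateAndHandle are hypothetical names, and the generic parameters V and R stand in for EnrichedAirbyteValue and ValidationResult.

```kotlin
// Hypothetical, simplified interfaces: V stands in for EnrichedAirbyteValue,
// R for ValidationResult.
fun interface Validator<V, R> {
    fun validate(value: V): R
}

fun interface ResultHandler<V, R> {
    fun handle(value: V, result: R): V
}

// Single entry point for converters: validate, then apply the handler's munging.
class ValidatingCoercer<V, R>(
    private val validator: Validator<V, R>,
    private val handler: ResultHandler<V, R>,
) {
    fun validateAndHandle(value: V): V = handler.handle(value, validator.validate(value))
}

// Toy usage: the "result" is a Boolean saying whether the string is too long,
// and the handler truncates when it is.
fun truncatingCoercer(maxLength: Int): ValidatingCoercer<String, Boolean> =
    ValidatingCoercer<String, Boolean>(
        Validator<String, Boolean> { s -> s.length > maxLength },
        ResultHandler<String, Boolean> { s, tooLong -> if (tooLong) s.take(maxLength) else s },
    )
```

A converter would then depend on one object and call validateAndHandle instead of chaining the two calls itself.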

Review guide

  1. ValueCoercer.kt
  2. ValidationResultHandler.kt
  3. MetricTracker.kt
  4. JsonConverter.kt/ProtobufConverter.kt

Can this PR be safely reverted and rolled back?

  • YES 💚
  • NO ❌
@github-actions
Contributor

github-actions bot commented Oct 31, 2025

Note

Detected that there are differences in the Gradle dependencies.

@Singleton
class MetricTracker {

private val metrics: MutableMap<String, Double> = ConcurrentHashMap()
Contributor

Given that we're tracking a set of known metrics and we should always publish 0, should we just make this a class instead?

Contributor Author

@gosusnp Maybe... I hadn't gotten that far yet. We should either (a) declare all of the metrics in the tracker, or in some other class that the tracker can access, and initialize them to 0 on creation, or (b) make the caller initialize them. Option (a) seems to make more sense and roughly follows what we do in the platform with the metric registry.

Contributor Author

@gosusnp I took a swing at adding a metric registry (name to be determined) that handles the initialization to 0 for each known metric.
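A rough sketch of that idea, assuming an enum as the registry of known metrics. KnownMetric, its entries, the metric name strings, add and snapshot are all hypothetical and are not the names used in the PR.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical registry of known metrics; the real set is defined in the PR.
enum class KnownMetric(val metricName: String) {
    NULLED_VALUES("nulled_value_count"),
    TRUNCATED_VALUES("truncated_value_count"),
}

class MetricTracker {
    // Seed every known metric with 0.0 so a snapshot always reports all of them,
    // even when nothing was munged.
    private val metrics = ConcurrentHashMap<String, Double>().apply {
        KnownMetric.values().forEach { put(it.metricName, 0.0) }
    }

    fun add(metric: KnownMetric, amount: Double = 1.0) {
        metrics.merge(metric.metricName, amount) { a, b -> a + b }
    }

    // Immutable copy of the current values.
    fun snapshot(): Map<String, Double> = metrics.toMap()
}
```

Because callers can only pass a KnownMetric, an unknown metric name cannot be recorded by accident, and every known metric is published with at least 0.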

- * value.nullify(
- * AirbyteRecordMessageMetaChange.Reason.DESTINATION_FIELD_SIZE_LIMITATION
- * )
+ * ShouldNullify(AirbyteRecordMessageMetaChange.Reason.DESTINATION_FIELD_SIZE_LIMITATION)
Contributor

The previous was doing nullify but shouldn't this be truncate instead?

Contributor Author

@gosusnp Not sure why the previous was doing nullify. If we want to change this, we should do this as a separate change after we get whatever refactor we need in place to make it easy to track/revert the change.

}

- override fun validate(value: EnrichedAirbyteValue): EnrichedAirbyteValue {
+ override fun validate(value: EnrichedAirbyteValue): ValidationResult =
Contributor

Is this the only place where we can mutate the value and introduce meta.changes? I thought this might happen in .map as well; the example I have in mind is bad timestamps.
If that is the case, shouldn't the ValidationResult be more of an envelope around the value, with the corresponding mutation tracked?

Contributor Author

@gosusnp As far as I can tell, the answer is yes in the new "data flow" CDK. However, those methods (nullify and truncate) live on EnrichedAirbyteValue, so we may find, once we get in there, that destinations using the new data flow CDK call them from other places. Over-exposing those methods seems to be a mistake, so in a way this change encapsulates them and makes it harder for code to call them outside of the coercer/validation framework.

@jdpgrailsdev
Contributor Author

@gosusnp @subodh1810 I have added a change to the StateStatsEnricher and DestinationMessage that should include the additional stats on state messages, using the new interfaces/objects in this PR. Please take a look.
