rule

package
v0.0.0-...-b9c99d5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 20, 2025 License: MIT Imports: 20 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func IsDataUnmarshalError

func IsDataUnmarshalError(err error) bool

func IsDispatcherNotFound

func IsDispatcherNotFound(err error) bool

func IsMatcherNotFound

func IsMatcherNotFound(err error) bool

func IsNotExistsVal

func IsNotExistsVal(val interface{}) bool

IsNotExistsVal reports whether val is a non-existent value, i.e., a value generated by NewNotExistsVal.

func IsTransformerNotFound

func IsTransformerNotFound(err error) bool

Types

type Dispatcher

type Dispatcher interface {
	io.Closer
	Dispatch(ctx context.Context, event *EventExt) error
}

type EventExt

type EventExt struct {
	*v1.EventExt
}

func CloneEventExt

func CloneEventExt(evt *EventExt) *EventExt

func NewEventExt

func NewEventExt(evt *v1.Event, retry v1.RetryStrategy) (*EventExt, error)

NewEventExt returns an EventExt for evt. When the ID of the event is zero, the event is reassigned a unique ID under a source.

func NewEventExtFromBytes

func NewEventExtFromBytes(b []byte) (*EventExt, error)

func (*EventExt) GetFieldByPath

func (e *EventExt) GetFieldByPath(path []string) (interface{}, error)

GetFieldByPath gets an internal field value by path, e.g. ["data", "source"] -> e.Event.Data.source. If the path does not exist, the function returns the value generated by NewNotExistsVal so that a missing field can be distinguished from nil. If the event data cannot be parsed, the function returns a *dataUnmarshalError error, which can be detected using the function IsDataUnmarshalError. All numbers are returned as float64. To avoid losing precision, id is returned as a string.

func (*EventExt) Key

func (e *EventExt) Key() string

func (*EventExt) ValidateEventData

func (e *EventExt) ValidateEventData(validator *gojsonschema.Schema) error

ValidateEventData checks the data field against the schema definition.

func (*EventExt) Value

func (e *EventExt) Value() []byte

type Executor

type Executor interface {
	Matcher
	io.Closer
	Dispatch(context.Context, *EventExt) error
	Transform(ctx context.Context, event *EventExt) ([]*EventExt, error)
	Update(context.Context, *Rule) error
	IsFilterPatternEqual(filterPattern string) bool
	IsTargetsEqual(Targets []*Target) bool
}

type Matcher

type Matcher interface {
	Pattern(ctx context.Context, event *EventExt) (bool, error)
}

type NewDispatcherFunc

type NewDispatcherFunc func(ctx context.Context, logger log.Logger, Target *Target) (Dispatcher, error)

type NewExecutorFunc

type NewExecutorFunc func(context.Context, log.Logger, *Rule, ...Option) (Executor, error)

type NewMatcherFunc

type NewMatcherFunc func(ctx context.Context, logger log.Logger, pattern map[string]interface{}) (Matcher, error)

type NewTransformerFunc

type NewTransformerFunc func(ctx context.Context, logger log.Logger, Target *Target) (Transformer, error)

type NotExistsValT

type NotExistsValT struct{}

func NewNotExistsVal

func NewNotExistsVal() *NotExistsValT

NewNotExistsVal generates a value that represents a value that does not exist.

type Option

type Option func(*options)

Option is a functional option for configuring the executor.

func WithExecuteDuration

func WithExecuteDuration(c metric.Float64Histogram) Option

WithExecuteDuration configures the executor with an executed-duration histogram.

func WithExecuteTotal

func WithExecuteTotal(c metric.Int64Counter) Option

WithExecuteTotal configures the executor with an executed-total counter.

func WithTransformParallelism

func WithTransformParallelism(parallelism int) Option

WithTransformParallelism sets the parallelism for transforming events.

type Rule

type Rule struct {
	Name    string
	BusName string
	Status  v1.RuleStatus
	Pattern string
	Targets []*Target
}

type Rules

type Rules interface {
	GetExecutors(busName string) (map[string]Executor, error)
}

func NewRules

func NewRules(
	logger log.Logger,
	reflector informer.Reflector,
	nef NewExecutorFunc,
	ExecOpts ...Option,
) (Rules, func(), error)

type RulesManager

type RulesManager interface {
	GetExecutors() (map[string]Executor, error)
	UpdateRules(ctx context.Context, rs []*Rule) error
	CleanupRules() error
}

type Target

type Target struct {
	ID            uint64
	Type          string
	Params        []*TargetParam
	RetryStrategy v1.RetryStrategy
}

type TargetParam

type TargetParam struct {
	Key      string
	Form     string
	Value    string
	Template *string
}

type Transformer

type Transformer interface {
	Transform(ctx context.Context, event *EventExt) (*EventExt, error)
}

Directories

Path Synopsis