Add New Source
Here is how the new plugin system works. Each data source needs to implement a single method:
type Sourcer interface {
	Generate(*Flow) *Dataset
}
Simple enough?
Here is an example implementation for the Cassandra data source.
func (s *CassandraSource) Generate(f *flow.Flow) *flow.Dataset {
	return s.genShardInfos(f).RoundRobin(s.Concurrency).Mapper(MapperReadShard)
}
genShardInfos(f) generates an initial dataset containing a list of CassandraShardInfo objects.
The CassandraShardInfo objects are distributed across a small number of executors, set by s.Concurrency. Each executor takes one CassandraShardInfo object at a time, connects to the source, and reads the corresponding Cassandra shard.
Because CassandraShardInfo objects are sent to remote executors, they must be serializable and deserializable.
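To make the serialization requirement concrete, here is a minimal sketch of a round trip through bytes. It assumes Go's standard encoding/gob as the wire format, which this page does not confirm. ShardInfo and its fields are hypothetical stand-ins for CassandraShardInfo, not the real struct.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// ShardInfo is a hypothetical stand-in for CassandraShardInfo.
// gob only encodes exported fields, so everything a remote
// executor needs to read its shard must be exported.
type ShardInfo struct {
	Hosts      []string
	Keyspace   string
	Table      string
	StartToken int64
	EndToken   int64
}

// encode turns a ShardInfo into bytes that can be shipped to an executor.
func encode(s ShardInfo) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(s); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decode rebuilds a ShardInfo on the receiving side.
func decode(data []byte) (ShardInfo, error) {
	var s ShardInfo
	err := gob.NewDecoder(bytes.NewReader(data)).Decode(&s)
	return s, err
}

func main() {
	in := ShardInfo{
		Hosts:      []string{"10.0.0.1"},
		Keyspace:   "ks",
		Table:      "events",
		StartToken: -100,
		EndToken:   100,
	}
	data, err := encode(in)
	if err != nil {
		panic(err)
	}
	out, err := decode(data)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", out)
}
```

A struct holding only plain values like these survives the round trip. A struct holding a live connection or an unexported field would not, so an object of this kind should describe what to read, not hold the reader itself.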
With this simple Sourcer interface, an actual data source can be implemented in pure Go in any way you want.
- MapperReadShard is a pure Go function.
- The CassandraShardInfo object should be serializable and deserializable.
- Data partitioning is determined by the number of CassandraShardInfo objects generated from genShardInfos(f). One CassandraShardInfo object corresponds to one data partition.
See cassandra_source.go for the example.
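The relationship between shard objects, partitions, and executors can be sketched in plain Go. This is an illustration only and not how Gleam or cassandra_source.go is implemented. ShardInfo, genShards, and roundRobin are hypothetical names, and the integer range stands in for whatever key or token range a real source would split.

```go
package main

import "fmt"

// ShardInfo is a hypothetical stand-in for CassandraShardInfo.
// Each value describes one data partition as a half-open range [Start, End).
type ShardInfo struct {
	Start, End int64
}

// genShards splits [lo, hi) into n contiguous ranges, so n decides the
// number of partitions. It assumes n > 0 and hi >= lo.
func genShards(lo, hi int64, n int) []ShardInfo {
	shards := make([]ShardInfo, 0, n)
	width := (hi - lo) / int64(n)
	for i := 0; i < n; i++ {
		start := lo + int64(i)*width
		end := start + width
		if i == n-1 {
			end = hi // the last shard absorbs any remainder
		}
		shards = append(shards, ShardInfo{Start: start, End: end})
	}
	return shards
}

// roundRobin deals the shards out to the executors in turn.
// It assumes executors > 0.
func roundRobin(shards []ShardInfo, executors int) [][]ShardInfo {
	buckets := make([][]ShardInfo, executors)
	for i, s := range shards {
		buckets[i%executors] = append(buckets[i%executors], s)
	}
	return buckets
}

func main() {
	shards := genShards(0, 1000, 8) // 8 shard objects, so 8 partitions
	for i, b := range roundRobin(shards, 3) {
		fmt.Printf("executor %d reads %d shards\n", i, len(b))
	}
}
```

In this sketch the partition count and the executor count are chosen independently: generating more shard objects gives finer partitions, while the concurrency setting only controls how many readers work through them.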