FuncFrog is a library for performing efficient, parallel, lazy map, reduce, filter and many other operations on slices and other data sequences in a pipeline. The sequence can be set by a variety of generating functions. Every operation can be executed in parallel with minimal copying and locking overhead. There is built-in support for error handling with the Yeet/Snag methods.
The library is easy to use and has a clean, intuitive API.
You can compare its performance against a vanilla for loop on your machine by running cd perf/; make (spoiler: FuncFrog is faster when multithreading).
- Getting Started
- Basic information
- Supported functions list
- Using prefix Pipe to transform Pipe type
- Using ff package to write shortened pipes
- Look for useful functions in Pipies package
- Examples
- Basic example
- Example using Func and Take
- Example using Func and Gen
- Example difference between Take and Gen
- Example using Filter and Map
- Example using Map and Reduce
- Example of Map and Reduce with the underlying array type change
- Example using Sort
- Example of infinite sequence generation
- Example using Range and Map
- Example using Repeat and Map
- Example using Cycle and Filter
- Example using Erase and Collect
- Example of simple error handling
- Example of multiple error handling
- Is this package stable?
- Contributions
- What's next?
To use FuncFrog in your project, run the following command:
go get github.com/koss-null/funcfrog
Then, import the library into your Go code (in most cases you only need the pipe package):
import "github.com/koss-null/funcfrog/pkg/pipe"
You can then use the pipe package to create a pipeline of operations on a slice:
res := pipe.Slice(a).
	Map(func(x int) int { return x * x }).
	Filter(func(x *int) bool { return *x > 100 }).
	Parallel(12).
	Do()
All operations are carefully fenced with interfaces, so feel free to use anything your IDE's autocompletion suggests.
If you want it fast and short, you may use ff:
import "github.com/koss-null/funcfrog/pkg/ff"
res := ff.Map(strArr, strings.ToUpper).Do()
To see some code snippets, check out the Examples.
The Piper (or PiperNoLen for pipes with undetermined lengths) is an interface that represents a lazy-evaluated sequence of data. The Piper interface provides a set of methods that can be used to transform, filter, collect and analyze data in the sequence.
A pipe can be copied at any moment simply by assigning it to a variable. Some methods (such as Take or Gen) turn a PiperNoLen into a Piper, making a wider range of methods available.
The following functions can be used to create a new Pipe (this is what I call the inner representation of a sequence of elements together with the sequence of operations on them):
- 🐸 Slice([]T) Piper: creates a Pipe of a given type T from a slice; the length is known.
- 🐸 Func(func(i int) (T, bool)) PiperNL: creates a Pipe of type T from a function. The function returns the element considered to be at the ith position in the Pipe, as well as a boolean indicating whether the element should be included (true) or skipped (false); the length is unknown.
- 🐸 Fn(func(i int) (T)) PiperNL: creates a Pipe of type T from a function. The function should return the value of the element at the ith position in the Pipe; to be able to skip values, use Func.
- 🐸 FuncP(func(i int) (*T, bool)) PiperNL: creates a Pipe of type T from a function. The function returns a pointer to the element considered to be at the ith position in the Pipe, as well as a boolean indicating whether the element should be included (true) or skipped (false); the length is unknown.
- 🐸 Cycle(data []T) PiperNL: creates a new Pipe that cycles through the elements of the provided slice indefinitely. The length is unknown.
- 🐸 Range(start, end, step T) Piper: creates a new Pipe that generates a sequence of values of type T from start to end (exclusive) with a fixed step between consecutive elements. T can be any numeric type, such as int, float32, or float64. The length is known.
- 🐸 Repeat(x T, n int) Piper: creates a new Pipe that generates a sequence of n values of type T, each equal to x. The length is known.
- 🐸 Take(n int) Piper: for a Func-made Pipe, keeps evaluating the function until n values have been included. Transforms unknown length to known.
- 🐸 Gen(n int) Piper: for a Func-made Pipe, generates the index sequence [0, n) and applies the function to it. Transforms unknown length to known.
- 🐸 Parallel(n int) Pipe: sets the number of goroutines the pipeline is executed on (1 by default). Use it to specify the level of parallelism in the pipeline. Available for unknown length.
- 🐸 Map(fn func(x T) T) Pipe: applies the function fn to every element of the Pipe and returns a new Pipe with the transformed data. Available for unknown length.
- 🐸 Filter(fn func(x *T) bool) Pipe: applies the predicate function fn to every element of the Pipe and returns a new Pipe with only the elements that satisfy the predicate. Available for unknown length.
- 🐸 MapFilter(fn func(T) (T, bool)) Piper[T]: applies the given function to each element of the underlying slice. If the second return value of fn is false, the element is skipped (this may be useful for error handling).
- 🐸 Reduce(fn func(x, y *T) T) *T: applies the binary function fn to the elements of the Pipe and returns a single value that is the result of the reduction. Returns nil if the Pipe was empty before the reduction.
- 🐸 Sum(plus func(x, y *T) T) T: performs a parallel reduce with the associative function plus.
- 🐸 Sort(less func(x, y *T) bool) Pipe: sorts the elements of the Pipe using the provided less function as the comparison function.
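Why does Sum insist on an associative plus? A parallel reduce folds chunks independently and then combines the partial results, which regroups the operations. The sketch below is one common way to do that, not FuncFrog's code; parallelSum is a hypothetical helper. Addition gives the expected 55, while subtraction (not associative) gives 3 instead of the sequential -53.

```go
package main

import (
	"fmt"
	"sync"
)

// parallelSum reduces data with plus using up to n goroutines: every
// goroutine folds one contiguous chunk, then the partial results are folded
// in chunk order. This matches a sequential left fold only when plus is associative.
// The bool is false for empty input (the README's Reduce returns nil there).
func parallelSum[T any](data []T, plus func(x, y *T) T, n int) (T, bool) {
	var zero T
	if len(data) == 0 {
		return zero, false
	}
	if n < 1 {
		n = 1
	}
	chunk := (len(data) + n - 1) / n             // ceiling division
	numChunks := (len(data) + chunk - 1) / chunk // every chunk is non-empty
	partials := make([]T, numChunks)
	var wg sync.WaitGroup
	for k := 0; k < numChunks; k++ {
		lo, hi := k*chunk, (k+1)*chunk
		if hi > len(data) {
			hi = len(data)
		}
		wg.Add(1)
		go func(k, lo, hi int) {
			defer wg.Done()
			acc := data[lo]
			for i := lo + 1; i < hi; i++ {
				acc = plus(&acc, &data[i])
			}
			partials[k] = acc // each goroutine writes only its own slot
		}(k, lo, hi)
	}
	wg.Wait()
	total := partials[0]
	for k := 1; k < numChunks; k++ {
		total = plus(&total, &partials[k])
	}
	return total, true
}

func main() {
	nums := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	sum, _ := parallelSum(nums, func(x, y *int) int { return *x + *y }, 4)
	fmt.Println(sum) // 55
	// Subtraction is not associative: a sequential left fold gives -53,
	// but the chunked evaluation gives a different number.
	diff, _ := parallelSum(nums, func(x, y *int) int { return *x - *y }, 4)
	fmt.Println(diff) // 3
}
```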
- 🐸 Any() T: returns a random element from the Pipe. Available for unknown length.
- 🐸 First() T: returns the first element of the Pipe, or nil if the Pipe is empty. Available for unknown length.
- 🐸 Count() int: returns the number of elements in the Pipe. It does not allocate memory for the elements; it only counts them.
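Counting without allocating and stopping at the first match can both be expressed over a func(i int) (T, bool) source. The sketch below is an illustration only: count and first are hypothetical helpers, and first returns (T, bool) instead of following the exact signature listed above. The calls counter shows that first stops evaluating as soon as it finds a value.

```go
package main

import "fmt"

// count reports how many indices in [0, n) fn marks as included,
// without storing any of the produced values.
func count[T any](fn func(i int) (T, bool), n int) int {
	c := 0
	for i := 0; i < n; i++ {
		if _, ok := fn(i); ok {
			c++
		}
	}
	return c
}

// first evaluates fn lazily and stops at the first included value.
// The bool is false if none of the indices in [0, n) qualifies.
func first[T any](fn func(i int) (T, bool), n int) (T, bool) {
	for i := 0; i < n; i++ {
		if v, ok := fn(i); ok {
			return v, true
		}
	}
	var zero T
	return zero, false
}

func main() {
	calls := 0
	multipleOf7 := func(i int) (int, bool) {
		calls++
		return i, i > 0 && i%7 == 0
	}
	fmt.Println(count(multipleOf7, 100)) // 14
	calls = 0
	v, ok := first(multipleOf7, 100)
	fmt.Println(v, ok, calls) // 7 true 8
}
```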
- 🐸 Do() []T: executes the pipeline and returns the resulting slice of data. Call it at the end of the pipeline to retrieve the final result.
- 🐸 Erase() Pipe[any]: returns a pipe containing all objects of the initial Pipe with their type erased. Basically, for each x it returns any(&x). Use pipe.Collect[T](Piper[any]) Piper[T] to collect it back into some type (or pipe.CollectNL for pipes whose length is not set yet).
- 🐸 pipe.Collect[T](Piper[any]) Piper[T]
- 🐸 pipe.CollectNL[T](PiperNoLen[any]) PiperNoLen[T]
These functions take a Pipe of erased interface{} type (which is pretty useful if you have a lot of type conversions along your pipeline, and can be obtained by calling Erase() on a Pipe). Basically, for each element x in a sequence, Collect returns *(x.(*T)).
- 🐸 Yeti(yeti) Pipe[T]: sets a yeti, an object that collects errors thrown with yeti.Yeet(error) and is used to handle them.
- 🐸 Snag(func(error)) Pipe[T]: sets a function that handles all errors sent with yeti.Yeet(error) to the last yeti object set through the Pipe[T].Yeti(yeti) Pipe[T] method.
Error handling may look pretty unusual at first glance. To get a better intuition for it, check out the examples section.
- 🌱 TBD: Until(fn func(*T) bool): for a Func-made Pipe, evaluates elements one by one until fn returns false. This feature may require some new Pipe interfaces, since it is applicable only in a single-threaded context.
- 🌱 TBD: IsAny() bool: returns true if the Pipe contains any elements, and false otherwise. Available for unknown length.
- 🌱 TBD: MoreThan(n int) bool: returns true if the Pipe contains more than n elements, and false otherwise. Available for unknown length.
- 🌱 TBD: Reverse() *Pipe: reverses the underlying slice.
In addition to the functions described above, the pipe package also provides several utility functions that can be used to create common types of Pipes, such as Range, Repeat, and Cycle. These functions can be useful for creating Pipes of data that follow a certain pattern or sequence.
It is also highly recommended to get familiar with the pipies package, which contains some useful predicates, comparators and accumulators.
You may find that using Erase() is not very convenient, as it forces you to do some pointer conversions. Fortunately, there is another way to convert a pipe type: use the functions from pipe/prefixpipe.go. These functions take a Piper or PiperNoLen as the first parameter and a function to apply as the second, and return a resulting pipe (or the result itself) of the destination type.
- 🐸 pipe.Map(Piper[SrcT], func(x SrcT) DstT) Piper[DstT]: applies a map from one type to another to a Pipe with known length.
- 🐸 pipe.MapNL(PiperNoLen[SrcT], func(x SrcT) DstT) PiperNoLen[DstT]: applies a map from one type to another to a Pipe with unknown length.
- 🐸 pipe.Reduce(Piper[SrcT], func(*DstT, *SrcT) DstT, initVal ...DstT): applies a reduce operation to a Pipe of type SrcT and returns a result of type DstT. initVal is an optional parameter that sets the value used in the first steps of the reduce.
Sometimes you just need to apply a function. Creating a pipe with pipe.Slice and then calling Map looks a little verbose, especially when you need to Map or Reduce from one type to another. The funcfrog/pkg/ff package solves this: it contains shortened Map and Reduce functions that take a slice directly as the first parameter.
- 🐸 ff.Map([]SrcT, func(SrcT) DstT) pipe.Piper[DstT]: applies the given function to a slice and returns a Pipe of the resulting type.
- 🐸 ff.Reduce([]SrcT, func(*DstT, *SrcT) DstT, initVal ...DstT) DstT: applies a reduce operation to a slice and returns the result of type DstT. initVal is an optional parameter that sets the value used in the first steps of the reduce.
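The Reduce signature above, func(*DstT, *SrcT) DstT with an optional initVal, can be read as a left fold whose accumulator has a different type than the elements. The sketch below is a hypothetical reduce helper that assumes a sequential fold starting from initVal[0] when it is given (otherwise from the zero value of Dst). The library's exact rules for initVal may differ.

```go
package main

import "fmt"

// reduce folds src into a value of another type. Assumption for this sketch:
// the accumulator starts at initVal[0] if provided, otherwise at the zero value
// of Dst, and fn is applied left to right.
func reduce[Src, Dst any](src []Src, fn func(*Dst, *Src) Dst, initVal ...Dst) Dst {
	var acc Dst
	if len(initVal) > 0 {
		acc = initVal[0]
	}
	for i := range src {
		acc = fn(&acc, &src[i])
	}
	return acc
}

func main() {
	words := []string{"Hello", "darkness", "my", "old", "friend"}
	addLen := func(acc *int, s *string) int { return *acc + len(*s) }
	fmt.Println(reduce(words, addLen))      // 24
	fmt.Println(reduce(words, addLen, 100)) // 124
}
```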
Some of the functions passed to Map, Filter or Reduce (or other Pipe methods) are pretty common, so the pipies package provides ready-made ones. It also includes a common comparator for integers and floats to use with the Sort method.
res := pipe.Slice(a).
	Map(func(x int) int { return x * x }).
	Map(func(x int) int { return x + 1 }).
	Filter(func(x *int) bool { return *x > 100 }).
	Filter(func(x *int) bool { return *x < 1000 }).
	Parallel(12).
	Do()

p := pipe.Func(func(i int) (v int, b bool) {
	if i < 10 {
		return i * i, true
	}
	return
}).Take(5).Do()
// p will be [0, 1, 4, 9, 16]

p := pipe.Func(func(i int) (v int, b bool) {
	if i < 10 {
		return i * i, true
	}
	return
}).Gen(5).Do()
// p will be [0, 1, 4, 9, 16]
Gen(n) generates a sequence of n elements (the indices [0, n)) and applies the whole pipeline to it afterwards.

p := pipe.Func(func(i int) (v int, b bool) {
	return i, true
}).
	Filter(func(x *int) bool { return (*x)%2 == 0 }).
	Gen(10).
	Do()
// p will be [0, 2, 4, 6, 8]
Take(n) expects the result to contain exactly n elements.
p := pipe.Func(func(i int) (v int, b bool) {
	return i, true
}).
	Filter(func(x *int) bool { return (*x)%2 == 0 }).
	Take(10).
	Do()
// p will be [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Watch out: if the Take value is set carelessly, it may jam the whole pipeline.

// DO NOT DO THIS, IT WILL JAM
p := pipe.Func(func(i int) (v int, b bool) {
	return i, i < 10 // only the first 10 values are not skipped
}).
	Take(11).    // we can never get an 11th value
	Parallel(4). // why not
	Do()
// Do() will try to evaluate the 11th value in 4 goroutines until it reaches the maximum int value

p := pipe.Slice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}).
	Filter(func(x *int) bool { return *x%2 == 0 }).
	Map(func(x int) int { return len(strconv.Itoa(x)) }).
	Do()
// p will be [1, 1, 1, 1, 2]
In this example Reduce is used in its prefix form to be able to convert ints to a string.

p := pipe.Reduce(
	pipe.Slice([]int{1, 2, 3, 4, 5}).
		Map(func(x int) int { return x * x }),
	func(acc *string, y *int) string {
		if *acc == "" {
			return strconv.Itoa(*y)
		}
		return *acc + "-" + strconv.Itoa(*y)
	},
)
// p will be "1-4-9-16-25"
In this example Reduce is used as usual, in its postfix form.
p := pipe.Slice([]string{"Hello", "darkness", "my", "old", "friend"}).
	Map(strings.Title).
	Reduce(func(x, y *string) string {
		return *x + " " + *y
	})
// *p will be "Hello Darkness My Old Friend" (the postfix Reduce returns a pointer)

p := pipe.Slice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9})
strP := pipe.Map(p, func(x int) string { return strconv.Itoa(x) })
result := pipe.Reduce(strP, func(acc *int, s *string) int {
	n, _ := strconv.Atoi(*s)
	return *acc + n
})
// result will be 45

p := pipe.Func(func(i int) (float32, bool) {
	return 100500 - float32(i)*0.9, true
}).
	Map(func(x float32) float32 { return x * x * 0.1 }).
	Gen(100500).                // Sort is only available on pipes with a known length
	Sort(pipies.Less[float32]). // pipies.Less(x, y *T) bool works for integer and float types
	// check out the pipies package to find more useful things
	Parallel(12).
	Do()
// p will contain the elements sorted in ascending order
Here is an example of generating an infinite sequence of Fibonacci numbers:
var fib []chan int
p := pipe.Func(func(i int) (int, bool) {
	if i < 2 {
		fib[i] <- i
		return i, true
	}
	p1 := <-fib[i-1]
	fib[i-1] <- p1
	p2 := <-fib[i-2]
	fib[i-2] <- p2
	fib[i] <- p1 + p2
	return p1 + p2, true
}).Parallel(20)
To generate a specific number of values, you can use the Take or Gen method:
// fill the slice of channels first
fib = make([]chan int, 60)
for i := range fib {
	fib[i] = make(chan int, 1)
}
// do the Take (it turns the PiperNoLen into a Piper, so a new variable is needed)
p60 := p.Take(60)
To accumulate the elements of the Pipe, you can use the Reduce or Sum method:
sum := p60.Sum(func(x, y *int) int { return *x + *y })
// also you can: sum := *p60.Reduce(func(x, y *int) int { return *x + *y })
// sum will be the sum of the first 60 Fibonacci numbers

p := pipe.Range(10, 20, 2).Map(func(x int) int { return x * x }).Do()
// p will be [100, 144, 196, 256, 324]

p := pipe.Repeat("hello", 5).Map(strings.ToUpper).Do()
// p will be ["HELLO", "HELLO", "HELLO", "HELLO", "HELLO"]
Here is an example of how you can call multiple error-returning functions and handle their errors:
func foo() error {
	// <...>
	return nil
}

errs := pipe.Map(
	pipe.Repeat(foo, 50),
	func(f func() error) error { return f() },
).Do()
for _, e := range errs {
	if e != nil {
		log.Println(e)
	}
}

p := pipe.Cycle([]int{1, 2, 3}).Filter(func(x *int) bool { return *x%2 == 0 }).Take(4).Do()
// p will be [2, 2, 2, 2]

p := pipe.Slice([]int{1, 2, 3}).
	Erase().
	Map(func(x any) any {
		i := *(x.(*int))
		return &MyStruct{Weight: i}
	}).
	Filter(func(x *any) bool {
		return (*x).(*MyStruct).Weight > 10
	})
ms := pipe.Collect[MyStruct](p).Parallel(10).Do()

y := pipe.NewYeti()
p := pipe.Range[int](-10, 10, 1).
	Yeti(y). // it's important to set the yeti before yeeting, or the handler will not be called
	MapFilter(func(x int) (int, bool) {
		if x == 0 {
			y.Yeet(errors.New("zero division")) // yeet the error
			return 0, false                      // use MapFilter to filter out this value
		}
		return int(256.0 / float64(x)), true
	}).
	Snag(func(err error) {
		fmt.Println("oopsie-doopsie: ", err)
	}).
	Do()
fmt.Println("p is: ", p)
/////////// output is:
// oopsie-doopsie: zero division
// p is: [-25 -28 -32 -36 -42 -51 -64 -85 -128 -256 256 128 85 64 51 42 36 32 28]
This example demonstrates generating the values 256/i, where i ranges from -10 to 9 (10 is excluded) with a step of 1. To handle division by zero, the library provides an error handling mechanism.
To begin, you need to create an error handler using the pipe.NewYeti() function. Then, register the error handler by calling the Yeti(yeti) method on your pipe object. This registered yeti will be the last error handler used in the pipe chain.
To yeet an error, you can use y.Yeet(error) from the registered yeti object.
To handle the yeeted error, use the Snag(func(error)) method, which sets up an error handling function. You can set up multiple Snag functions, but all of them will consider the last yeti object set with the Yeti(yeti) method.
This is a simple example of how to handle basic errors. Below, you will find a more realistic example of error handling in a real-life scenario.
y1, y2 := pipe.NewYeti(), pipe.NewYeti()
users := pipe.Func(func(i int) (*domain.DomObj, bool) {
	domObj, err := uc.GetUser(i)
	if err != nil {
		y1.Yeet(err)
		return nil, false
	}
	return domObj, true
}).
	Yeti(y1).Snag(handleGetUserErr). // suppose we have some pre-defined handler
	MapFilter(func(do *domain.DomObj) (*domain.DomObj, bool) {
		enriched, err := uc.EnrichUser(do)
		if err != nil {
			y2.Yeet(err) // send the error to y2 so handleEnrichUserErr receives it
			return nil, false
		}
		return enriched, true
	}).
	Yeti(y2).Snag(handleEnrichUserErr).
	Do()
The full working code, with sample handlers and implementations of the use-case functions, can be found at: https://go.dev/play/p/YGtM-OeMWqu.
This example demonstrates how multiple error handling functions can be set up at different stages of the data processing pipeline to handle errors specific to each stage.
Let's break down what is happening here.
In this code fragment, there are two instances of pipe.Yeti created: y1 and y2. These Yeti instances are used to handle errors at different stages of the data processing pipeline.
Within the pipe.Func operation, there are error-handling statements. When calling uc.GetUser(i), if an error occurs, it is yeeted using y1.Yeet(err), and the function returns nil and false to indicate the failure.
The Yeti(y1).Snag(handleGetUserErr) statement sets up an error handling function handleGetUserErr to handle the error thrown by uc.GetUser(i). This function is defined elsewhere and specifies how to handle the error.
After that, the MapFilter operation is performed on the resulting *domain.DomObj. If the uc.EnrichUser(do) operation encounters an error, it returns nil and false to filter out the value.
The Yeti(y2).Snag(handleEnrichUserErr) statement sets up another error handling function handleEnrichUserErr to handle the error thrown by uc.EnrichUser(do).
Finally, the Do() method executes the entire pipeline and assigns the result to the users variable.
Yes, it is finally stable as of v1.0.0! All listed functionality is fully covered by unit tests. Functionality marked as TBD will be implemented as described in this README and covered by unit tests before it is delivered as stable.
If any method signatures change, the major version will be incremented.
I will accept any PRs implementing functionality marked as TBD.
I will also accept any sane unit tests and bug fixes.
You are welcome to open issues and contact me via email.
I hope to provide some roadmap of the project soon.
Feel free to fork, inspire and use!
