Documentation
¶
Index ¶
- func DamperFlow(ctx *engine.Context) engine.Flow
- func DumpFlow(ctx *engine.Context) engine.Flow
- func FileInlet(ctx *engine.Context) engine.Inlet
- func FileOutlet(ctx *engine.Context) engine.Outlet
- func FlattenFlow(ctx *engine.Context) engine.Flow
- func InjectFlow(ctx *engine.Context) engine.Flow
- func MergeFlow(ctx *engine.Context) engine.Flow
- func NewCSVDecoder(conf engine.DecoderConfig) engine.Decoder
- func NewCSVEncoder(c engine.EncoderConfig) engine.Encoder
- func NewJSONDecoder(c engine.DecoderConfig) engine.Decoder
- func NewJSONEncoder(c engine.EncoderConfig) engine.Encoder
- func SelectFlow(ctx *engine.Context) engine.Flow
- func StringFields(r engine.Record) []string
- func StringFieldsWithFormat(r engine.Record, tf *engine.Timeformatter, decimal int) []string
- func UpdateFlow(ctx *engine.Context) engine.Flow
- type CSVDecoder
- type CSVEncoder
- type JSONDecoder
- type JSONEncoder
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DamperFlow ¶
Example ¶
package main
import (
"time"
"github.com/OutOfBedlam/tine/engine"
_ "github.com/OutOfBedlam/tine/plugins/base"
)
func main() {
recipe := `
name="pipeline-1"
[log]
path = "-"
level = "warn"
no_color = true
timeformat = "no-time-for-test"
[[inlets.file]]
data = [
"a,100",
"b,200",
"c,300",
"d,400",
]
[[flows.damper]]
buffer_limit = 2
[[flows.select]]
includes = ["#_ts", "*"]
[[outlets.file]]
path = "-"
format = "json"
`
seq := int64(0)
engine.Now = func() time.Time { seq++; return time.Unix(1721954797+seq, 0) }
pipe, err := engine.New(engine.WithConfig(recipe))
if err != nil {
panic(err)
}
err = pipe.Run()
if err != nil {
panic(err)
}
}
Output: {"0":"a","1":"100","_ts":1721954798} {"0":"b","1":"200","_ts":1721954799} {"0":"c","1":"300","_ts":1721954800} {"0":"d","1":"400","_ts":1721954801}
func DumpFlow ¶
Example ¶
package main
import (
"github.com/OutOfBedlam/tine/engine"
_ "github.com/OutOfBedlam/tine/plugins/base"
)
func main() {
recipe := `
name="pipeline-1"
[log]
path = "-"
level = "warn"
no_color = true
timeformat = "no-time-for-test"
[[inlets.file]]
data = [
"a,100",
"b,200",
]
[[inlets.file.flows.dump]]
level = "warn"
[[flows.dump]]
level = "error"
[[outlets.file]]
path = "-"
format = "json"
`
pipe, err := engine.New(engine.WithConfig(recipe))
if err != nil {
panic(err)
}
err = pipe.Run()
if err != nil {
panic(err)
}
}
Output: no-time-for-test WRN pipeline pipeline-1 flow-dump rec=1/2 0=a 1=100 no-time-for-test WRN pipeline pipeline-1 flow-dump rec=2/2 0=b 1=200 no-time-for-test ERR pipeline pipeline-1 flow-dump rec=1/2 0=a 1=100 no-time-for-test ERR pipeline pipeline-1 flow-dump rec=2/2 0=b 1=200 {"0":"a","1":"100"} {"0":"b","1":"200"}
func FileInlet ¶
Example ¶
package main
import (
"time"
"github.com/OutOfBedlam/tine/engine"
_ "github.com/OutOfBedlam/tine/plugins/base"
)
func main() {
dsl := `
[[inlets.file]]
data = [
"a,1",
"b,2",
"c,3",
]
format = "csv"
[[outlets.file]]
path = "-"
format = "csv"
`
// Make the output timestamp deterministic, so we can compare it
// This line is required only for testing
engine.Now = func() time.Time { return time.Unix(1721954797, 0) }
// Create a new pipeline
pipeline, err := engine.New(engine.WithConfig(dsl))
if err != nil {
panic(err)
}
if err := pipeline.Run(); err != nil {
panic(err)
}
}
Output: a,1 b,2 c,3
Example (Fields) ¶
package main
import (
"time"
"github.com/OutOfBedlam/tine/engine"
_ "github.com/OutOfBedlam/tine/plugins/base"
)
func main() {
dsl := `
[[inlets.file]]
data = [
"1,key1,1722642405,1.234",
"2,key2,1722642406,2.345",
]
format = "csv"
fields = ["line", "name", "time", "value"]
[[outlets.file]]
format = "json"
`
// Make the output timestamp deterministic, so we can compare it
// This line is required only for testing
engine.Now = func() time.Time { return time.Unix(1721954797, 0) }
// Create a new pipeline
pipeline, err := engine.New(engine.WithConfig(dsl))
if err != nil {
panic(err)
}
if err := pipeline.Run(); err != nil {
panic(err)
}
}
Output: {"line":"1","name":"key1","time":"1722642405","value":"1.234"} {"line":"2","name":"key2","time":"1722642406","value":"2.345"}
Example (File) ¶
package main
import (
"time"
"github.com/OutOfBedlam/tine/engine"
_ "github.com/OutOfBedlam/tine/plugins/base"
)
func main() {
dsl := `
[[inlets.file]]
path = "testdata/testdata.csv"
format = "csv"
fields = ["line", "name", "time", "value"]
types = ["int", "string", "time", "float"]
[[outlets.file]]
path = "-"
format = "json"
decimal = 2
`
// Make the output timestamp deterministic, so we can compare it
// This line is required only for testing
engine.Now = func() time.Time { return time.Unix(1721954797, 0) }
// Create a new pipeline
pipeline, err := engine.New(engine.WithConfig(dsl))
if err != nil {
panic(err)
}
if err := pipeline.Run(); err != nil {
panic(err)
}
}
Output: {"line":1,"name":"key1","time":1722642405,"value":1.23} {"line":2,"name":"key2","time":1722642406,"value":2.35}
func FileOutlet ¶
Example ¶
package main
import (
"time"
"github.com/OutOfBedlam/tine/engine"
_ "github.com/OutOfBedlam/tine/plugins/base"
)
func main() {
dsl := `
[[inlets.file]]
data = [
"a,1",
"b,2",
"c,3",
]
format = "csv"
[[outlets.file]]
path = "-"
format = "csv"
`
// Make the output timestamp deterministic, so we can compare it
// This line is required only for testing
engine.Now = func() time.Time { return time.Unix(1721954797, 0) }
// Create a new pipeline
pipeline, err := engine.New(engine.WithConfig(dsl))
if err != nil {
panic(err)
}
if err := pipeline.Run(); err != nil {
panic(err)
}
}
Output: a,1 b,2 c,3
func FlattenFlow ¶
Example ¶
package main
import (
"time"
"github.com/OutOfBedlam/tine/engine"
_ "github.com/OutOfBedlam/tine/plugins/base"
)
func main() {
dsl := `
[[inlets.file]]
data = [
"a,1",
"b,2",
"c,3",
]
format = "csv"
[[flows.flatten]]
name_infix = "::"
[[flows.select]]
includes = ["**"]
[[outlets.file]]
path = "-"
format = "csv"
`
// Make the output timestamp deterministic, so we can compare it
// This line is required only for testing
count := int64(0)
engine.Now = func() time.Time { count++; return time.Unix(1721954797+count, 0) }
// Create a new pipeline
pipeline, err := engine.New(engine.WithConfig(dsl))
if err != nil {
panic(err)
}
// Run the pipeline
if err := pipeline.Run(); err != nil {
panic(err)
}
}
Output: 1721954798,file::0,a 1721954798,file::1,1 1721954799,file::0,b 1721954799,file::1,2 1721954800,file::0,c 1721954800,file::1,3
func InjectFlow ¶
Example ¶
package main
import (
"os"
"time"
"github.com/OutOfBedlam/tine/engine"
_ "github.com/OutOfBedlam/tine/plugins/args"
_ "github.com/OutOfBedlam/tine/plugins/base"
)
func main() {
// This example demonstrates how to use the exec inlet to run a command and
dsl := `
[[inlets.args]]
[[flows.select]]
includes = ["**"]
[[flows.inject]]
id = "here"
[[outlets.file]]
path = "-"
format = "json"
`
// Make the output timestamp deterministic, so we can compare it
// This line is required only for testing
engine.Now = func() time.Time { return time.Unix(1721954797, 0) }
// Build pipeline
pipeline, err := engine.New(engine.WithConfig(dsl))
if err != nil {
panic(err)
}
pipeline.Context().Inject("here", func(r []engine.Record) ([]engine.Record, error) {
for i, rec := range r {
r[i] = rec.AppendOrReplace(engine.NewField("msg", "hello world - here updated"))
}
return r, nil
})
// Simulate the command line arguments
os.Args = []string{"command", "command-arg", "--", "msg=hello world"}
// Run the pipeline
if err := pipeline.Run(); err != nil {
panic(err)
}
}
Output: {"_in":"args","_ts":1721954797,"msg":"hello world - here updated"}
func MergeFlow ¶
Example ¶
package main
import (
"time"
"github.com/OutOfBedlam/tine/engine"
_ "github.com/OutOfBedlam/tine/plugins/base"
_ "github.com/OutOfBedlam/tine/plugins/exec"
_ "github.com/OutOfBedlam/tine/plugins/psutil"
)
func main() {
// This example demonstrates how to use the merge flow.
dsl := `
[[inlets.file]]
data = [
"a,1",
]
format = "csv"
[[inlets.exec]]
commands = ["echo", "hello world"]
count = 1
trim_space = true
ignore_error = true
[[flows.merge]]
wait_limit = "1s"
[[outlets.file]]
path = "-"
format = "json"
`
// Make the output time deterministic. so we can compare it.
// This line is not needed in production code.
engine.Now = func() time.Time { return time.Unix(1721954797, 0) }
// Create a new engine.
pipeline, err := engine.New(engine.WithConfig(dsl))
if err != nil {
panic(err)
}
// Run the engine.
if err := pipeline.Run(); err != nil {
panic(err)
}
}
Output: {"_ts":1721954797,"exec_stdout":"hello world","file_0":"a","file_1":"1"}
func NewCSVDecoder ¶
func NewCSVDecoder(conf engine.DecoderConfig) engine.Decoder
func NewCSVEncoder ¶
func NewCSVEncoder(c engine.EncoderConfig) engine.Encoder
func NewJSONDecoder ¶
func NewJSONDecoder(c engine.DecoderConfig) engine.Decoder
func NewJSONEncoder ¶
func NewJSONEncoder(c engine.EncoderConfig) engine.Encoder
func SelectFlow ¶
Example ¶
package main
import (
"time"
"github.com/OutOfBedlam/tine/engine"
_ "github.com/OutOfBedlam/tine/plugins/base"
)
func main() {
dsl := `
[[inlets.file]]
data = [
"a,1",
"b,2",
"c,3",
]
format = "csv"
[[flows.select]]
includes = ["**", "not-exist", "#_ts", "1"]
[[outlets.file]]
path = "-"
format = "csv"
`
// Make the output timestamp deterministic, so we can compare it
// This line is required only for testing
count := int64(0)
engine.Now = func() time.Time { count++; return time.Unix(1721954797+count, 0) }
// Create a new pipeline
pipeline, err := engine.New(engine.WithConfig(dsl))
if err != nil {
panic(err)
}
// Run the pipeline
if err := pipeline.Run(); err != nil {
panic(err)
}
}
Output: file,1721954798,a,1,,1721954798,1 file,1721954799,b,2,,1721954799,2 file,1721954800,c,3,,1721954800,3
Example (Tag) ¶
package main
import (
"time"
"github.com/OutOfBedlam/tine/engine"
_ "github.com/OutOfBedlam/tine/plugins/base"
)
func main() {
dsl := `
[[inlets.file]]
data = [
"a,1",
"b,2",
"c,3",
]
format = "csv"
fields = ["area", "ival"]
types = ["string", "int"]
[[flows.select]]
includes = ["#_in", "#non_exist", "*"]
[[outlets.file]]
format = "json"
`
// Make the output timestamp deterministic, so we can compare it
// This line is required only for testing
count := int64(0)
engine.Now = func() time.Time { count++; return time.Unix(1721954797+count, 0) }
// Create a new pipeline
pipeline, err := engine.New(engine.WithConfig(dsl))
if err != nil {
panic(err)
}
// Run the pipeline
if err := pipeline.Run(); err != nil {
panic(err)
}
}
Output: {"_in":"file","area":"a","ival":1,"non_exist":null} {"_in":"file","area":"b","ival":2,"non_exist":null} {"_in":"file","area":"c","ival":3,"non_exist":null}
func StringFields ¶
func StringFieldsWithFormat ¶
func UpdateFlow ¶
Example ¶
package main
import (
"time"
"github.com/OutOfBedlam/tine/engine"
_ "github.com/OutOfBedlam/tine/plugins/base"
)
func main() {
dsl := `
[[inlets.file]]
data = [
"James,1,1.23,true",
"Jane,2,2.34,false",
"Scott,3,3.45,true",
]
fields = ["my_name", "my_int", "my_float", "flag"]
format = "csv"
[[flows.update]]
set = [
{ field = "my_name", name = "new_name" },
{ field = "my_int", value = 10 },
{ field = "my_float", value = 9.87, name = "new_float" },
{ field = "flag", value = true, name = "new_flag" },
{ tag = "_in", value = "mine" },
]
[[flows.select]]
includes = ["#_in", "*"]
[[outlets.file]]
format = "json"
`
// Make the output timestamp deterministic, so we can compare it
// This line is required only for testing
engine.Now = func() time.Time { return time.Unix(1721954797, 0) }
// Create a new pipeline
pipeline, err := engine.New(engine.WithConfig(dsl))
if err != nil {
panic(err)
}
// Run the pipeline
if err := pipeline.Run(); err != nil {
panic(err)
}
}
Output: {"_in":"mine","my_int":"10","new_flag":true,"new_float":9.87,"new_name":"James"} {"_in":"mine","my_int":"10","new_flag":true,"new_float":9.87,"new_name":"Jane"} {"_in":"mine","my_int":"10","new_flag":true,"new_float":9.87,"new_name":"Scott"}
Types ¶
type CSVDecoder ¶
type CSVDecoder struct {
engine.DecoderConfig
// contains filtered or unexported fields
}
type CSVEncoder ¶
type CSVEncoder struct {
engine.EncoderConfig
// contains filtered or unexported fields
}
Example ¶
package main
import (
"time"
"github.com/OutOfBedlam/tine/engine"
_ "github.com/OutOfBedlam/tine/plugins/base"
)
func main() {
dsl := `
[[inlets.file]]
data = [
"a,1,1.234,true,2024/08/09 16:01:02",
"b,2,2.345,false,2024/08/09 16:03:04",
"c,3,3.456,true,2024/08/09 16:05:06",
]
format = "csv"
timeformat = "2006/01/02 15:04:05"
tz = "UTC"
fields = ["area","ival","fval","bval","tval"]
types = ["string", "int", "float", "bool", "time"]
[[flows.select]]
includes = ["#*", "ival", "area", "ival", "fval", "bval", "tval"]
[[outlets.file]]
path = "-"
format = "csv"
`
// Mock the current time
count := int64(0)
engine.Now = func() time.Time { count++; return time.Unix(1721954797+count, 0) }
// Create a new pipeline
pipeline, err := engine.New(engine.WithConfig(dsl))
if err != nil {
panic(err)
}
if err := pipeline.Run(); err != nil {
panic(err)
}
}
Output: file,1721954798,1,a,1,1.234,true,1723219262 file,1721954799,2,b,2,2.345,false,1723219384 file,1721954800,3,c,3,3.456,true,1723219506
type JSONDecoder ¶
type JSONDecoder struct {
engine.DecoderConfig
// contains filtered or unexported fields
}
Example ¶
package main
import (
"time"
"github.com/OutOfBedlam/tine/engine"
_ "github.com/OutOfBedlam/tine/plugins/base"
)
func main() {
dsl := `
[[inlets.file]]
path = "testdata/testdata.json"
format = "json"
[[flows.select]]
includes = ["#*", "area", "bval", "ival", "fval", "time"]
[[outlets.file]]
path = "-"
format = "json"
decimal = 2
`
// Make the output timestamp deterministic, so we can compare it
// This line is required only for testing
engine.Now = func() time.Time { return time.Unix(1721954797, 0) }
// Create a new pipeline
pipeline, err := engine.New(engine.WithConfig(dsl))
if err != nil {
panic(err)
}
if err := pipeline.Run(); err != nil {
panic(err)
}
}
Output: {"_in":"file","_ts":1721954797,"area":"a","bval":true,"fval":1.23,"ival":1.00,"time":"2020-01-01T00:00:00Z"} {"_in":"file","_ts":1721954797,"area":"b","bval":true,"fval":2.35,"ival":2.00,"time":"2020-01-02T00:00:00Z"}
Example (Data) ¶
package main
import (
"time"
"github.com/OutOfBedlam/tine/engine"
_ "github.com/OutOfBedlam/tine/plugins/base"
)
func main() {
dsl := `
[[inlets.file]]
data = [
'{"area": "a","ival": 1,"fval": 1.234,"time": "2020-01-01T00:00:00Z","bval": true}',
'{"area": "b","ival": 2,"fval": 2.345,"time": "2020-01-02T00:00:00Z","bval": true}',
]
format = "json"
[[flows.select]]
includes = ["#*", "area", "bval", "ival", "fval", "time"]
[[outlets.file]]
path = "-"
format = "json"
decimal = 2
`
// Make the output timestamp deterministic, so we can compare it
// This line is required only for testing
engine.Now = func() time.Time { return time.Unix(1721954797, 0) }
// Create a new pipeline
pipeline, err := engine.New(engine.WithConfig(dsl))
if err != nil {
panic(err)
}
if err := pipeline.Run(); err != nil {
panic(err)
}
}
Output: {"_in":"file","_ts":1721954797,"area":"a","bval":true,"fval":1.23,"ival":1.00,"time":"2020-01-01T00:00:00Z"} {"_in":"file","_ts":1721954797,"area":"b","bval":true,"fval":2.35,"ival":2.00,"time":"2020-01-02T00:00:00Z"}
type JSONEncoder ¶
type JSONEncoder struct {
engine.EncoderConfig
// contains filtered or unexported fields
}
Example ¶
package main
import (
"time"
"github.com/OutOfBedlam/tine/engine"
_ "github.com/OutOfBedlam/tine/plugins/base"
)
func main() {
dsl := `
[[inlets.file]]
data = [
"a,1,1.2345",
"b,2,2.3456",
"c,3,3.4567",
]
format = "csv"
fields = ["area", "ival", "fval"]
types = ["string", "int", "float"]
[[flows.select]]
includes = ["#*", "area", "ival", "fval"]
[[outlets.file]]
path = "-"
format = "json"
`
// Mock the current time
count := int64(0)
engine.Now = func() time.Time { count++; return time.Unix(1721954797+count, 0) }
// Create a new pipeline
pipeline, err := engine.New(engine.WithConfig(dsl))
if err != nil {
panic(err)
}
if err := pipeline.Run(); err != nil {
panic(err)
}
}
Output: {"_in":"file","_ts":1721954798,"area":"a","fval":1.2345,"ival":1} {"_in":"file","_ts":1721954799,"area":"b","fval":2.3456,"ival":2} {"_in":"file","_ts":1721954800,"area":"c","fval":3.4567,"ival":3}
Example (Decimal) ¶
package main
import (
"time"
"github.com/OutOfBedlam/tine/engine"
_ "github.com/OutOfBedlam/tine/plugins/base"
)
func main() {
dsl := `
[[inlets.file]]
data = [
"a,1,1.2345",
"b,2,2.3456",
"c,3,3.4567",
]
format = "csv"
fields = ["area", "ival", "fval"]
types = ["string", "int", "float"]
[[flows.select]]
includes = ["#*", "area", "ival", "fval"]
[[outlets.file]]
path = "-"
format = "json"
decimal = 2
`
// Mock the current time
count := int64(0)
engine.Now = func() time.Time { count++; return time.Unix(1721954797+count, 0) }
// Create a new pipeline
pipeline, err := engine.New(engine.WithConfig(dsl))
if err != nil {
panic(err)
}
if err := pipeline.Run(); err != nil {
panic(err)
}
}
Output: {"_in":"file","_ts":1721954798,"area":"a","fval":1.23,"ival":1} {"_in":"file","_ts":1721954799,"area":"b","fval":2.35,"ival":2} {"_in":"file","_ts":1721954800,"area":"c","fval":3.46,"ival":3}
Click to show internal directories.
Click to hide internal directories.