A lightweight and flexible generic pipeline library for Go. It allows you to define and compose stages and producers that transform or generate data in a pipeline style.
go get github.com/burik666/pipeline

The package provides the following core concepts:
- StageFn — a function that transforms input and optionally calls the next stage:
func(in T, next func(T) (T, error)) (T, error)
- NewStage — creates a stage with access to `next`.
- NewSimpleStage — a stage without `next`, useful for final transformations.
- NewProducer — a stage that emits values instead of consuming input.
- Pipeline — a sequence of stages, created with `New` and executed via `Do` (with input) or `Run` (without explicit input).
- Middleware — intercepts execution of stages (for logging, tracing, metrics).
- WithName — attaches names to stages for easier debugging.
- Reverse result propagation — stages can return values back up the chain instead of (or in addition to) passing them forward. This allows implementing finalizers, cleanup logic, or collecting results after downstream processing.
This example shows the most basic usage of a pipeline:
we define a stage that increments the input and chain it twice.
The input value 5 goes through two stages (+1, +1) and produces 7.
package main
import (
"fmt"
"github.com/burik666/pipeline"
)
func main() {
// inc is a stage: it adds 1 to its input and hands the result to the next stage.
inc := func(in int, next func(int) (int, error)) (int, error) {
return next(in + 1)
}
// Feed 5 through two chained inc stages.
result, err := pipeline.Do(
5,
inc,
inc,
)
if err != nil {
panic(err)
}
fmt.Println(result)
// Output: 7
}

Here we demonstrate a producer stage:
instead of receiving input, it generates a sequence of numbers (0 to 4) and sends them down the pipeline.
Each value is then multiplied by 2 in the next stage.
The final pipeline prints 0, 2, 4, 6, 8.
package main
import (
"fmt"
"github.com/burik666/pipeline"
)
func main() {
// producer takes no input: it pushes 0 through 4 down the pipeline one value
// at a time, prints what each call to next returns, and stops at the first
// error.
producer := func(next func(int) (int, error)) error {
for i := 0; i < 5; i++ {
res, err := next(i)
if err != nil {
return err
}
fmt.Println(res)
}
return nil
}
// mul2 doubles each value before forwarding it to the next stage.
mul2 := func(in int, next func(int) (int, error)) (int, error) {
return next(in * 2)
}
// The producer is the first stage; mul2 follows it.
p := pipeline.New(
pipeline.NewProducer(producer),
pipeline.NewStage(mul2),
)
// Run starts the pipeline without an explicit input; its first result is
// discarded here because the producer already printed every value.
_, err := p.Run()
if err != nil {
panic(err)
}
}
// Output:
// 0
// 2
// 4
// 6
// 8

This example shows how middleware can wrap stages to perform additional logic.
We add names to stages (stage1, stage2) and use middleware to log messages before and after each stage.
This demonstrates how you can implement logging, tracing, or metrics around every pipeline step.
package main
import (
"fmt"
"github.com/burik666/pipeline"
)
func main() {
// inc adds 1 to its input and forwards the result to the next stage.
inc := func(in int, next func(int) (int, error)) (int, error) {
return next(in + 1)
}
// Two inc stages, each given a name that the middleware below reads back
// through opts.Name().
p := pipeline.New(
pipeline.NewStage(inc, pipeline.WithName("stage1")),
pipeline.NewStage(inc, pipeline.WithName("stage2")),
)
// The middleware prints one line before and one line after calling next,
// each labelled with opts.Name(), and returns next's result unchanged.
p.Middleware(func(in int, next func(int) (int, error), opts pipeline.Opts) (int, error) {
fmt.Printf("pre: %s\n", opts.Name())
v, err := next(in)
fmt.Printf("post: %s\n", opts.Name())
return v, err
})
// Start the pipeline with input 0.
res, err := p.Do(0)
if err != nil {
panic(err)
}
fmt.Println(res)
}
// Output:
// pre: stage1
// pre: stage2
// post: stage2
// post: stage1
// 2

This pipeline demonstrates how to combine multiple transformations:
first, the number is doubled, then 3 is added.
Input 4 goes through *2 → +3 and produces 11.
package main
import (
"fmt"
"github.com/burik666/pipeline"
)
func main() {
// double multiplies its input by 2 and forwards the result to the next stage.
double := func(in int, next func(int) (int, error)) (int, error) {
return next(in * 2)
}
// addThree adds 3 to its input and forwards the result to the next stage.
addThree := func(in int, next func(int) (int, error)) (int, error) {
return next(in + 3)
}
// Stages are applied in the order they are passed: double first, then addThree.
result, err := pipeline.Do(4, double, addThree)
if err != nil {
panic(err)
}
fmt.Println(result)
// Output: 11 (4 * 2 + 3)
}

MIT License — see the LICENSE file for details.