Composable async & concurrency patterns for Go.
Write pipelines, worker pools, and async utilities without worrying about race conditions, deadlocks, or goroutine leaks.
Install:
go get github.com/arrno/gliter
Import:
import "github.com/arrno/gliter"
Gliter makes it easy to assemble normal business logic into complex async flows.
Instead of spending time debugging goroutines, channels, or leaks, you define your data flow declaratively and let Gliter handle the concurrency patterns.
Compose stages of functions into a branching async pipeline:
gliter.NewPipeline(streamTransactionsFromKafka).
    Stage(
        preprocessFeatures,
    ).
    Fork(
        runFraudModel,
        checkBusinessRules,
    ).
    Merge(aggregateResults).
    Fork(
        sendToAlertSystem,
        storeInDatabase,
    ).
    Run()
Key properties:
- Data always flows downstream from the generator through each stage.
- Side effects (DB writes, API calls, etc.) belong inside stage functions.
- Errors short-circuit the pipeline automatically.
Options:
Add multiple handlers in one stage to fork the pipeline (Fork is an alias for Stage):
gliter.NewPipeline(exampleGen()).
    Fork(
        exampleMid, // branch A
        exampleAlt, // branch B
    ).
    Stage(exampleEnd).
    Run()
👉 Each downstream stage is duplicated for each branch.
For branching without duplicating streams, use WorkPool Stage, Option Stage, or InParallel.
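To make "duplicated for each branch" concrete, here is a minimal standard-library sketch of the cloning behaviour. It does not use Gliter; `forkAll` and its signature are hypothetical names chosen only for illustration.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// forkAll is a hypothetical helper (not part of Gliter): it sends a copy of
// every input record to each branch handler and gathers all branch outputs.
func forkAll(inputs []int, branches ...func(int) int) []int {
	var (
		mu  sync.Mutex
		out []int
		wg  sync.WaitGroup
	)
	for _, branch := range branches {
		// Each branch gets its own channel, so every record is cloned per branch.
		ch := make(chan int)

		wg.Add(1)
		go func(handle func(int) int, in <-chan int) {
			defer wg.Done()
			for v := range in {
				r := handle(v)
				mu.Lock()
				out = append(out, r)
				mu.Unlock()
			}
		}(branch, ch)

		wg.Add(1)
		go func(dst chan<- int) {
			defer wg.Done()
			defer close(dst)
			for _, v := range inputs {
				dst <- v
			}
		}(ch)
	}
	wg.Wait()
	sort.Ints(out) // branch interleaving is nondeterministic, so sort for display
	return out
}

func main() {
	double := func(v int) int { return v * 2 }
	negate := func(v int) int { return -v }
	// 3 inputs x 2 branches = 6 outputs: the stream is duplicated per branch.
	fmt.Println(forkAll([]int{1, 2, 3}, double, negate)) // [-3 -2 -1 2 4 6]
}
```

Because every branch sees every record, output volume grows with the number of branches, which is why the non-cloning stages are the better fit when you only want to spread load.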
A WorkPool stage fans out a handler function into N concurrent workers.
Each record is processed by exactly one worker (no cloning or duplication), then multiplexed onto the single downstream stream.
Configure behavior with options:
- WithBuffer(M) → buffered channel capacity between upstream and workers
- WithRetry(R) → automatic retries on failure
These options allow fine-grained control over throughput, backpressure, and fault tolerance.
gliter.NewPipeline(exampleGen()).
    WorkPool(
        func(item int) (int, error) { return 1 + item, nil },
        3, // numWorkers
        gliter.WithBuffer(6),
        gliter.WithRetry(2),
    ).
    WorkPool(
        func(item int) (int, error) { return 2 + item, nil },
        6, // numWorkers
        gliter.WithBuffer(12),
        gliter.WithRetry(2),
    ).
    Run()
Use MixPool when a worker pool needs distinct handlers but you still want automatic backpressure.
It's a convenience wrapper around WorkPool: pass a slice of handlers and Gliter will fan the stream across them while keeping worker semantics.
gliter.NewPipeline(exampleGen()).
    MixPool(
        []func(int) (int, error){
            func(item int) (int, error) { return item + 1, nil },
            func(item int) (int, error) { return item * 2, nil },
        },
        gliter.WithRetry(1),
    ).
    Run()
Control concurrency when downstream stages overwhelm your DB or API:
gliter.NewPipeline(exampleGen()).
    Fork(exampleMid, exampleMid).
    Fork(exampleMid, exampleMid, exampleMid).
    Throttle(2).
    Stage(exampleEnd).
    Run()
Combine multiple branches into one:
gliter.NewPipeline(exampleGen()).
    Fork(exampleMid, exampleMid).
    Merge(func(items []int) ([]int, error) {
        sum := 0
        for _, item := range items {
            sum += item
        }
        return []int{sum}, nil
    }).
    Stage(exampleEnd).
    Run()
ForkOutIn is a shortcut for "fork, do a little work, then merge" flows.
Under the hood it is just a Fork followed by a Merge, so reserve it for brief, small aggregations.
gliter.NewPipeline(exampleGen()).
    ForkOutIn(
        []func(int) (int, error){
            func(item int) (int, error) { return item + 1, nil },
            func(item int) (int, error) { return item - 1, nil },
        },
        func(items []int) ([]int, error) { return []int{items[0] - items[1]}, nil },
    ).
    Run()
Stick with the dedicated Fork/Merge stages when you need more complex fan-out trees.
Batch records for bulk operations:
func exampleBatch(items []int) ([]int, error) {
    if err := storeToDB(items); err != nil {
        return nil, err
    }
    return items, nil
}

gliter.NewPipeline(exampleGen()).
    Stage(exampleMid).
    Batch(100, exampleBatch).
    Stage(exampleEnd).
    Run()
Route each record to exactly one handler (no cloning):
gliter.NewPipeline(exampleGen()).
    Stage(exampleMid).
    Option(
        func(item int) (int, error) { return 1 + item, nil },
        func(item int) (int, error) { return 2 + item, nil },
        func(item int) (int, error) { return 3 + item, nil },
    ).
    Stage(exampleEnd).
    Run()
Insert a buffer before a slow stage:
gliter.NewPipeline(exampleGen()).
    Stage(exampleMid).
    Buffer(5).
    Stage(exampleEnd).
    Run()
Use the WithContext option for timeout/cancel:
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Second)
defer cancel()

gliter.NewPipeline(
    exampleGen(),
    gliter.WithContext(ctx),
).
    Stage(exampleMid).
    Stage(exampleEnd).
    Run()
Count items processed, either via config:
counts, err := gliter.NewPipeline(
    exampleGen(),
    gliter.WithReturnCount(),
).Run()
Or with a tally channel:
pipeline := gliter.NewPipeline(exampleGen())
tally := pipeline.Tally()
Enable logging for debugging:
- WithLogCount: summary counts
- WithLogEmit: every emission
- WithLogErr: errors
- WithLogAll: all of the above
- WithLogStep: interactive stepper
gliter.NewPipeline(
    exampleGen(),
    gliter.WithLogAll(),
).
    Fork(exampleMid, exampleAlt).
    Stage(exampleEnd).
    Run()
Fan out tasks, run them concurrently, and collect the results in order:
tasks := []func() (string, error){
    func() (string, error) { return "Hello", nil },
    func() (string, error) { return ", ", nil },
    func() (string, error) { return "Async!", nil },
}
results, err := gliter.InParallel(tasks)
Also available: InParallelThrottle for token-bucket concurrency.
Generic worker pools in one line:
results, errors := gliter.NewWorkerPool(3, handler).
    Push(0, 1, 2, 3, 4).
    Close().
    Collect()
Supported WorkerPool options:
- WithRetry
- WithBuffer
See ./examples/worker_pool/main.go for more.
Other helpers:
- ThrottleBy: custom throttling
- TeeBy: channel forking
- ReadOrDone, WriteOrDone, IterDoneAny: consolidate “done” channels
- Multiplex: merge streams
val := gliter.
    List(0, 1, 2, 3, 4).
    Filter(func(i int) bool { return i%2 == 0 }).
    Map(func(val int) int { return val * 2 }).
    Reduce(func(acc *int, val int) { *acc += val }) // 12
Includes Filter, Map, Reduce, Find, Len, Reverse, At, Slice, etc.
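As a rough illustration of how a chainable list with an accumulator-pointer Reduce can be built, here is a standard-library sketch using generics (Go 1.18 or later). The `List` type, `NewList`, and the return type of `Reduce` below are assumptions made for the example; they are not Gliter's actual definitions.

```go
package main

import "fmt"

// List is a hypothetical chainable wrapper around a slice.
type List[T any] struct{ items []T }

// NewList builds a List from the given values.
func NewList[T any](vals ...T) List[T] { return List[T]{items: vals} }

// Filter keeps only the items for which keep returns true.
func (l List[T]) Filter(keep func(T) bool) List[T] {
	out := make([]T, 0, len(l.items))
	for _, v := range l.items {
		if keep(v) {
			out = append(out, v)
		}
	}
	return List[T]{items: out}
}

// Map applies f to every item and returns a new List.
func (l List[T]) Map(f func(T) T) List[T] {
	out := make([]T, len(l.items))
	for i, v := range l.items {
		out[i] = f(v)
	}
	return List[T]{items: out}
}

// Reduce folds the items into a zero-valued accumulator that f mutates
// through a pointer, mirroring the callback shape shown above.
func (l List[T]) Reduce(f func(acc *T, val T)) T {
	var acc T
	for _, v := range l.items {
		f(&acc, v)
	}
	return acc
}

func main() {
	val := NewList(0, 1, 2, 3, 4).
		Filter(func(i int) bool { return i%2 == 0 }). // 0, 2, 4
		Map(func(v int) int { return v * 2 }).        // 0, 4, 8
		Reduce(func(acc *int, v int) { *acc += v })
	fmt.Println(val) // 12
}
```

Each method returns a new List, so the chain never mutates the original slice.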
PRs welcome! 🚀
If something feels missing, broken, or unclear, open an issue or submit a fix.