Gliter ✨

License: MIT

Composable async & concurrency patterns for Go.
Write pipelines, worker pools, and async utilities without worrying about race conditions, deadlocks, or goroutine leaks.


Quick Start

Install:

go get github.com/arrno/gliter

Import:

import "github.com/arrno/gliter"

Overview

Gliter makes it easy to assemble normal business logic into complex async flows.

Instead of spending time debugging goroutines, channels, or leaks, you define your data flow declaratively and let Gliter handle the concurrency patterns.


Pipeline

Compose stages of functions into a branching async pipeline:

gliter.NewPipeline(streamTransactionsFromKafka).
    Stage(
        preprocessFeatures,
    ).
    Fork(
        runFraudModel,
        checkBusinessRules,
    ).
    Merge(aggregateResults).
    Fork(
        sendToAlertSystem,
        storeInDatabase,
    ).
    Run()

Key properties:

  • Data always flows downstream from the generator through each stage.
  • Side effects (DB writes, API calls, etc.) belong inside stage functions.
  • Errors short-circuit the pipeline automatically.
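
The properties above can be illustrated without the library. The following is a minimal sketch, not Gliter's implementation: it uses a plain sequential loop with no concurrency. The stage shape `func(T) (T, error)` matches the handlers shown elsewhere in this README; the generator shape `func() (T, bool, error)` and the names `countTo`, `runSequential`, `double`, and `rejectSix` are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// countTo returns a generator that yields 1..n.
// The (value, more, error) return shape is an assumption for this sketch.
func countTo(n int) func() (int, bool, error) {
	i := 0
	return func() (int, bool, error) {
		if i >= n {
			return 0, false, nil
		}
		i++
		return i, true, nil
	}
}

// runSequential pulls values from gen and passes each one through the
// stages in order. It stops at the first error (the short-circuit).
func runSequential(gen func() (int, bool, error), stages ...func(int) (int, error)) ([]int, error) {
	var out []int
	for {
		v, ok, err := gen()
		if err != nil {
			return out, err
		}
		if !ok {
			return out, nil
		}
		for _, stage := range stages {
			if v, err = stage(v); err != nil {
				return out, err
			}
		}
		out = append(out, v)
	}
}

// double is a pure transformation stage.
func double(v int) (int, error) { return v * 2, nil }

// rejectSix fails on the value 6 to trigger the short-circuit.
func rejectSix(v int) (int, error) {
	if v == 6 {
		return 0, errors.New("six is not allowed")
	}
	return v, nil
}

func main() {
	out, err := runSequential(countTo(5), double, rejectSix)
	fmt.Println(out, err) // [2 4] six is not allowed
}
```

The third record fails in `rejectSix`, so records four and five are never pulled from the generator.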


Fork

Add multiple handlers in one stage to fork the pipeline (Fork is an alias for Stage):

gliter.NewPipeline(exampleGen()).
    Fork(
        exampleMid, // branch A
        exampleAlt, // branch B
    ).
    Stage(exampleEnd).
    Run()

👉 Each downstream stage is duplicated for each branch.
⚠️ If processing pointers, clone before mutating downstream.
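
The pointer warning above can be sketched as follows. `Record`, `clone`, `branchA`, and `branchB` are hypothetical names for illustration; the point is only that each branch handler copies the shared value before changing it, so the other branch never observes the mutation.

```go
package main

import "fmt"

// Record is a hypothetical payload type used only in this sketch.
type Record struct {
	ID   int
	Tags []string
}

// clone returns a deep copy so a branch can mutate it safely.
func clone(r *Record) *Record {
	c := *r
	c.Tags = append([]string(nil), r.Tags...) // copy the backing array too
	return &c
}

// branchA tags a private copy instead of the shared record.
func branchA(r *Record) (*Record, error) {
	c := clone(r)
	c.Tags = append(c.Tags, "A")
	return c, nil
}

// branchB does the same with its own copy.
func branchB(r *Record) (*Record, error) {
	c := clone(r)
	c.Tags = append(c.Tags, "B")
	return c, nil
}

func main() {
	shared := &Record{ID: 1, Tags: []string{"raw"}}
	a, _ := branchA(shared)
	b, _ := branchB(shared)
	fmt.Println(shared.Tags, a.Tags, b.Tags) // [raw] [raw A] [raw B]
}
```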

For branching without duplicating streams, use WorkPool Stage, Option Stage, or InParallel.


WorkPool Stage

Fans out a handler function into N concurrent workers.
Each record is processed by exactly one worker (no cloning or duplication), then multiplexed onto the single downstream stream.

Configure behavior with options:

  • WithBuffer(M) → buffered channel capacity between upstream and workers
  • WithRetry(R) → automatic retries on failure

Together, these options give fine-grained control over throughput, backpressure, and fault tolerance.

gliter.NewPipeline(exampleGen()).
    WorkPool(
        func(item int) (int, error) { return 1 + item, nil },
        3, // numWorkers
        gliter.WithBuffer(6),
        gliter.WithRetry(2),
    ).
    WorkPool(
        func(item int) (int, error) { return 2 + item, nil },
        6, // numWorkers
        gliter.WithBuffer(12),
        gliter.WithRetry(2),
    ).
    Run()
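
For readers who want to see what a stage like this saves them from writing, here is a standard-library sketch of the same idea: N workers read from one buffered channel, each record is handled by exactly one worker, and a retry wrapper re-invokes a failing handler. It is an illustration under stated assumptions, not Gliter's source. `workPool` and `withRetry` are hypothetical names, and results are gathered into a sorted slice only to make the output deterministic.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// withRetry wraps fn so it is attempted up to 1+retries times.
func withRetry(fn func(int) (int, error), retries int) func(int) (int, error) {
	return func(v int) (out int, err error) {
		for attempt := 0; attempt <= retries; attempt++ {
			if out, err = fn(v); err == nil {
				return out, nil
			}
		}
		return 0, err
	}
}

// workPool runs fn on numWorkers goroutines. Each input is received by
// exactly one worker; results are gathered into one slice.
func workPool(in []int, numWorkers, buffer int, fn func(int) (int, error)) ([]int, error) {
	jobs := make(chan int, buffer)
	var (
		mu       sync.Mutex
		wg       sync.WaitGroup
		results  []int
		firstErr error
	)
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range jobs {
				out, err := fn(v)
				mu.Lock()
				if err != nil {
					if firstErr == nil {
						firstErr = err
					}
				} else {
					results = append(results, out)
				}
				mu.Unlock()
			}
		}()
	}
	for _, v := range in {
		jobs <- v // blocks once the buffer is full: backpressure
	}
	close(jobs)
	wg.Wait()
	sort.Ints(results) // completion order varies between runs
	return results, firstErr
}

func main() {
	addOne := func(v int) (int, error) { return v + 1, nil }
	out, err := workPool([]int{1, 2, 3, 4, 5}, 3, 6, withRetry(addOne, 2))
	fmt.Println(out, err) // [2 3 4 5 6] <nil>
}
```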

MixPool

Use when a worker pool needs distinct handlers but you still want automatic backpressure.
It's a convenience wrapper around WorkPool: pass a slice of handlers and Gliter will fan the stream across them while keeping worker semantics.

gliter.NewPipeline(exampleGen()).
    MixPool(
        []func(int) (int, error){
            func(item int) (int, error) { return item + 1, nil },
            func(item int) (int, error) { return item * 2, nil },
        },
        gliter.WithRetry(1),
    ).
    ).
    Run()

Throttle

Control concurrency when downstream stages overwhelm your DB or API:

gliter.NewPipeline(exampleGen()).
    Fork(exampleMid, exampleMid).
    Fork(exampleMid, exampleMid, exampleMid).
    Throttle(2).
    Stage(exampleEnd).
    Run()
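
How Gliter caps concurrency internally is not described here. As a general illustration of the idea, the sketch below limits the number of tasks running at once with a buffered channel used as a counting semaphore. `runThrottled` is a hypothetical helper, and the peak counter exists only so the limit can be observed.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runThrottled starts one goroutine per task but lets at most limit of
// them execute at the same time. It returns the peak concurrency seen.
func runThrottled(tasks []func(), limit int) int64 {
	sem := make(chan struct{}, limit) // counting semaphore
	var wg sync.WaitGroup
	var active, peak int64
	for _, task := range tasks {
		wg.Add(1)
		go func(task func()) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot
			defer func() { <-sem }() // release it when done
			n := atomic.AddInt64(&active, 1)
			for { // record the highest concurrency observed
				p := atomic.LoadInt64(&peak)
				if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
					break
				}
			}
			task()
			atomic.AddInt64(&active, -1)
		}(task)
	}
	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	tasks := make([]func(), 10)
	for i := range tasks {
		tasks[i] = func() { time.Sleep(time.Millisecond) }
	}
	peak := runThrottled(tasks, 2)
	fmt.Println(peak <= 2) // true
}
```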

Merge

Combine multiple branches into one:

gliter.NewPipeline(exampleGen()).
    Fork(exampleMid, exampleMid).
    Merge(func(items []int) ([]int, error) {
        sum := 0
        for _, item := range items {
            sum += item
        }
        return []int{sum}, nil
    }).
    Stage(exampleEnd).
    Run()

ForkOutIn

Shortcut for "fork, do a little work, then merge" flows.
Under the hood it's just a Fork followed by a Merge, so it is best suited to small, quick aggregations.

gliter.NewPipeline(exampleGen()).
    ForkOutIn(
        []func(int) (int, error){
            func(item int) (int, error) { return item + 1, nil },
            func(item int) (int, error) { return item - 1, nil },
        },
        func(items []int) ([]int, error) { return []int{items[0] - items[1]}, nil },
    ).
    Run()

Stick with the dedicated Fork/Merge stages when you need more complex fan-out trees.
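
The fork-then-merge flow can be traced by hand with a sequential sketch. It assumes the merge function receives one result per branch for each upstream record, in handler order; that ordering is an assumption made for illustration, and `forkThenMerge` is a hypothetical helper rather than part of the library.

```go
package main

import "fmt"

// forkThenMerge applies every branch to each input item, then hands the
// per-branch results (in branch order) to merge.
func forkThenMerge(
	in []int,
	branches []func(int) (int, error),
	merge func([]int) ([]int, error),
) ([]int, error) {
	var out []int
	for _, item := range in {
		results := make([]int, 0, len(branches))
		for _, branch := range branches {
			r, err := branch(item)
			if err != nil {
				return nil, err
			}
			results = append(results, r)
		}
		merged, err := merge(results)
		if err != nil {
			return nil, err
		}
		out = append(out, merged...)
	}
	return out, nil
}

func main() {
	branches := []func(int) (int, error){
		func(item int) (int, error) { return item + 1, nil },
		func(item int) (int, error) { return item - 1, nil },
	}
	diff := func(items []int) ([]int, error) {
		return []int{items[0] - items[1]}, nil
	}
	out, err := forkThenMerge([]int{1, 2, 3}, branches, diff)
	fmt.Println(out, err) // [2 2 2] <nil>
}
```

With these two handlers, every record merges to `(item + 1) - (item - 1) = 2`.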


Batch

Batch records for bulk operations:

func exampleBatch(items []int) ([]int, error) {
    if err := storeToDB(items); err != nil {
        return nil, err
    }
    return items, nil
}

gliter.NewPipeline(exampleGen()).
    Stage(exampleMid).
    Batch(100, exampleBatch).
    Stage(exampleEnd).
    Run()
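
To make the grouping concrete, the sketch below splits a slice into batches of a fixed size. It assumes a final, smaller batch is emitted when the stream ends before a batch fills; whether Gliter flushes partial batches this way is not stated here. `batch` is a hypothetical helper.

```go
package main

import "fmt"

// batch groups items into slices of at most size elements.
// The last batch may be shorter. A non-positive size yields no batches.
func batch(items []int, size int) [][]int {
	if size <= 0 {
		return nil
	}
	var out [][]int
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		out = append(out, items[start:end])
	}
	return out
}

func main() {
	fmt.Println(batch([]int{1, 2, 3, 4, 5, 6, 7}, 3)) // [[1 2 3] [4 5 6] [7]]
}
```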

Option

Route each record to exactly one handler (no cloning):

gliter.NewPipeline(exampleGen()).
    Stage(exampleMid).
    Option(
        func(item int) (int, error) { return 1 + item, nil },
        func(item int) (int, error) { return 2 + item, nil },
        func(item int) (int, error) { return 3 + item, nil },
    ).
    Stage(exampleEnd).
    Run()

Buffer

Insert a buffer before a slow stage:

gliter.NewPipeline(exampleGen()).
    Stage(exampleMid).
    Buffer(5).
    Stage(exampleEnd).
    Run()
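
The effect of a buffer can be seen with a plain Go channel: a producer can run ahead of a slow consumer by up to the channel's capacity before it has to wait. This is a standard-library sketch of that behavior, not Gliter's internals, and `fill` is a hypothetical helper.

```go
package main

import "fmt"

// fill reports how many sends a channel of the given capacity accepts
// before a send would block. No receiver is attached.
func fill(capacity int) int {
	ch := make(chan int, capacity)
	sent := 0
	for {
		select {
		case ch <- sent:
			sent++
		default: // the send would block: the buffer is full
			return sent
		}
	}
}

func main() {
	fmt.Println(fill(0), fill(5)) // 0 5
}
```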

Context / Cancel

Use the WithContext option for timeout/cancel:

ctx, cancel := context.WithTimeout(context.Background(), 200*time.Second)
defer cancel()

gliter.NewPipeline(
    exampleGen(),
    gliter.WithContext(ctx),
).
    Stage(exampleMid).
    Stage(exampleEnd).
    Run()
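
For comparison, this is roughly what cancellation looks like when written by hand with the standard library: the producing goroutine selects on `ctx.Done()` so it exits instead of leaking once the consumer stops. `generate` and `sumUntil` are hypothetical names; the sketch shows the general pattern, not how Gliter wires the context internally.

```go
package main

import (
	"context"
	"fmt"
)

// generate emits 0, 1, 2, ... until ctx is cancelled, then closes out.
func generate(ctx context.Context) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; ; i++ {
			select {
			case <-ctx.Done():
				return
			case out <- i:
			}
		}
	}()
	return out
}

// sumUntil adds values up to and including limit, then cancels the
// generator and waits for it to shut down.
func sumUntil(limit int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	out := generate(ctx)
	sum := 0
	for v := range out {
		sum += v
		if v == limit {
			cancel()
			break
		}
	}
	for range out {
		// Drain until the generator observes cancellation and closes out.
	}
	return sum
}

func main() {
	fmt.Println(sumUntil(3)) // 6
}
```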

Count / Tally

Count items processed, either via config:

counts, err := gliter.NewPipeline(
    exampleGen(),
    gliter.WithReturnCount(),
).Run()

Or with a tally channel:

pipeline := gliter.NewPipeline(exampleGen())
tally := pipeline.Tally()

Insight

Enable logging for debugging:

  • WithLogCount — summary counts
  • WithLogEmit — every emission
  • WithLogErr — errors
  • WithLogAll — all above
  • WithLogStep — interactive stepper

gliter.NewPipeline(
    exampleGen(),
    gliter.WithLogAll(),
).
    Fork(exampleMid, exampleAlt).
    Stage(exampleEnd).
    Run()

InParallel

Fan-out tasks, run concurrently, and collect results in order:

tasks := []func() (string, error){
    func() (string, error) { return "Hello", nil },
    func() (string, error) { return ", ", nil },
    func() (string, error) { return "Async!", nil },
}

results, err := gliter.InParallel(tasks)

Also available: InParallelThrottle for token-bucket concurrency.
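
The "collect results in order" guarantee is commonly achieved by giving each task its own slot in a pre-sized slice. The sketch below shows that technique with the standard library only; `inParallel` is a hypothetical stand-in, not the library's implementation.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// inParallel runs every task on its own goroutine and returns the
// results in task order. It returns the first error by task index.
func inParallel(tasks []func() (string, error)) ([]string, error) {
	results := make([]string, len(tasks))
	errs := make([]error, len(tasks))
	var wg sync.WaitGroup
	for i, task := range tasks {
		wg.Add(1)
		go func(i int, task func() (string, error)) {
			defer wg.Done()
			// Each goroutine writes only to its own index, so no lock is needed.
			results[i], errs[i] = task()
		}(i, task)
	}
	wg.Wait()
	for _, err := range errs {
		if err != nil {
			return nil, err
		}
	}
	return results, nil
}

func main() {
	tasks := []func() (string, error){
		func() (string, error) { return "Hello", nil },
		func() (string, error) { return ", ", nil },
		func() (string, error) { return "Async!", nil },
	}
	results, err := inParallel(tasks)
	fmt.Println(strings.Join(results, ""), err) // Hello, Async! <nil>
}
```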


Worker Pool (standalone)

Generic worker pools in one line:

results, errs := gliter.NewWorkerPool(3, handler).
    Push(0, 1, 2, 3, 4).
    Close().
    Collect()

Supported WorkerPool Options:

  • WithRetry
  • WithBuffer

See ./examples/worker_pool/main.go for more.


Misc Utilities

  • ThrottleBy — custom throttling
  • TeeBy — channel forking
  • ReadOrDone, WriteOrDone, IterDone
  • Any — consolidate “done” channels
  • Multiplex — merge streams
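
As an illustration of what merging streams involves, here is a standard-library fan-in sketch. The signatures of Gliter's own utilities are not shown in this README, so `fanIn`, `emit`, and `collectSorted` are hypothetical helpers and should not be read as the library's API.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// fanIn forwards every value from the input channels onto one output
// channel and closes it once all inputs are closed.
func fanIn[T any](inputs ...<-chan T) <-chan T {
	out := make(chan T)
	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		go func(in <-chan T) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// emit returns an already-closed channel preloaded with vals.
func emit(vals ...int) <-chan int {
	ch := make(chan int, len(vals))
	for _, v := range vals {
		ch <- v
	}
	close(ch)
	return ch
}

// collectSorted drains ch and sorts the values, because arrival order
// across merged streams is not deterministic.
func collectSorted(ch <-chan int) []int {
	var out []int
	for v := range ch {
		out = append(out, v)
	}
	sort.Ints(out)
	return out
}

func main() {
	merged := fanIn(emit(1, 2), emit(3, 4))
	fmt.Println(collectSorted(merged)) // [1 2 3 4]
}
```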

List Type (sync helpers)

val := gliter.
    List(0, 1, 2, 3, 4).
    Filter(func(i int) bool { return i%2 == 0 }).
    Map(func(val int) int { return val * 2 }).
    Reduce(func(acc *int, val int) { *acc += val }) // 12

Includes Filter, Map, Reduce, Find, Len, Reverse, At, Slice, etc.
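
The `// 12` in the chain above can be checked by hand: filtering keeps 0, 2, 4; mapping gives 0, 4, 8; and their sum is 12. The sketch below reproduces the same steps with plain generic functions. `filter`, `mapEach`, and `reduce` are hypothetical helpers; the return type of Gliter's own Reduce is not shown here, so returning the accumulator by value is an assumption.

```go
package main

import "fmt"

// filter keeps the elements for which keep returns true.
func filter[T any](in []T, keep func(T) bool) []T {
	var out []T
	for _, v := range in {
		if keep(v) {
			out = append(out, v)
		}
	}
	return out
}

// mapEach applies f to every element.
func mapEach[T any](in []T, f func(T) T) []T {
	out := make([]T, 0, len(in))
	for _, v := range in {
		out = append(out, f(v))
	}
	return out
}

// reduce folds the slice into an accumulator that starts at the zero value.
func reduce[T any](in []T, f func(acc *T, val T)) T {
	var acc T
	for _, v := range in {
		f(&acc, v)
	}
	return acc
}

func main() {
	evens := filter([]int{0, 1, 2, 3, 4}, func(i int) bool { return i%2 == 0 })
	doubled := mapEach(evens, func(v int) int { return v * 2 })
	sum := reduce(doubled, func(acc *int, v int) { *acc += v })
	fmt.Println(sum) // 12
}
```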


Contributing

PRs welcome! 🚀
If something feels missing, broken, or unclear, open an issue or submit a fix.

About

Go-lang-iter tools aim to make it easier to compose complex async systems in Go.
