The chans package provides generic channel operations to help you build concurrent pipelines in Go. It aims to be flexible, unopinionated, and composable, without over-abstracting or taking control away from the developer.
// Given a channel of documents.
docs := make(chan []string, 10)
docs <- []string{"go", "is", "awesome"}
docs <- []string{"cats", "are", "cute"}
close(docs)
// Extract all words from the documents.
words := make(chan string, 10)
chans.Flatten(ctx, words, docs)
close(words)
// Calculate the total byte count of all words.
step := func(acc int, word string) int { return acc + len(word) }
count := chans.Reduce(ctx, words, 0, step)
fmt.Println("byte count =", count)
// byte count = 22
You can find function signatures and usage examples at the links below, or check out the full package documentation.
The golden trio:
- Filter sends values from the input channel to the output if a predicate returns true.
- Map reads values from the input channel, applies a function, and sends the result to the output.
- Reduce combines all values from the input channel into one using a function and returns the result.
Filtering and sampling:
- FilterOut ignores values from the input channel if a predicate returns true, otherwise sends them to the output.
- Drop skips the first N values from the input channel and sends the rest to the output.
- DropWhile skips values from the input channel as long as a predicate returns true, then sends the rest to the output.
- Take sends up to N values from the input channel to the output.
- TakeNth sends every Nth value from the input channel to the output.
- TakeWhile sends values from the input channel to the output while a predicate returns true.
- First returns the first value from the input channel that matches a predicate.
Batching and windowing:
- Chunk groups values from the input channel into fixed-size slices and sends them to the output.
- ChunkBy groups consecutive values from the input channel into slices whenever the key function's result changes.
- Flatten reads slices from the input channel and sends their elements to the output in order.
De-duplication:
- Compact sends values from the input channel to the output, skipping consecutive duplicates.
- CompactBy sends values from the input channel to the output, skipping consecutive duplicates as determined by a custom equality function.
- Distinct sends values from the input channel to the output, skipping all duplicates.
- DistinctBy sends values from the input channel to the output, skipping duplicates as determined by a key function.
Routing:
- Broadcast sends every value from the input channel to all output channels.
- Split sends values from the input channel to output channels in round-robin fashion.
- Partition sends values from the input channel to one of two outputs based on a predicate.
- Merge concurrently sends values from multiple input channels to the output, with no guaranteed order.
- Concat sends values from multiple input channels to the output, processing each input channel in order.
- Drain consumes and discards all values from the input channel.
I think third-party concurrency packages are often too opinionated and try to hide too much complexity. As a result, they end up being inflexible and don't fit a lot of use cases.
For example, here's how you use the Map function from the rill package:
// Concurrency = 3
users := rill.Map(ids, 3, func(id int) (*User, error) {
	return db.GetUser(ctx, id)
})
The code looks simple, but it makes Map pretty opinionated and not very flexible:
- The function is non-blocking and spawns a goroutine. There is no way to change this.
- The function doesn't exit early on error. There is no way to change this.
- The function creates the output channel. There is no way to control its buffering or lifecycle.
- The function can't be canceled.
- The function requires the developer to use a custom Try[T] type for both input and output channels.
- The "N workers" logic is baked in, so you can't use a custom concurrent group implementation.
While this approach works for many developers, I personally don't like it. With chans, my goal was to offer a fairly low-level set of composable channel operations and let developers decide how to use them.
For comparison, here's how you use the chans.Map function:
err := chans.Map(ctx, users, ids, func(id int) (*User, error) {
	return db.GetUser(ctx, id)
})
chans.Map only implements the core mapping logic:
- Reads values from the input channel.
- Calls the mapping function on each value.
- Writes results to the output channel.
- Stops if there's an error or if the context is canceled.
- Does not start any additional goroutines.
You decide the rest:
- Want Map to be non-blocking? Run it in a goroutine.
- Don't want to exit early? Gather the errors instead of returning them.
- Want to buffer the output channel or keep it open? You have full control.
- Need to process input in parallel? Use errgroup.Group, sync.WaitGroup, or any other implementation.
The same applies to other channel operations.
Created by Anton Zhiyanov. Released under the MIT License.