A generic, high-performance, dynamic-capacity channel for Go.
This library provides an advanced channel implementation that offers more flexibility than Go's native channels. It's designed for scenarios requiring an unbounded buffer with the ability to apply backpressure dynamically, preventing uncontrolled memory growth.
- Generic & Type-Safe: Built with Go 1.18+ generics for full type safety.
- Effectively Unbounded Buffer: Uses a linked list as its internal buffer, so it can keep growing for as long as memory is available.
- Dynamic Capacity & Backpressure: You can set a "soft" capacity at runtime. When the number of buffered items reaches this capacity, the channel applies backpressure on the input, slowing down producers until consumers catch up.
- Graceful & Immediate Shutdown:
  - Close(): Gracefully closes the input, processes all buffered items, and then closes the output.
  - Shutdown(): Immediately closes the channel and discards all buffered items.
- High Performance: Utilizes a sync.Pool to reuse internal buffer nodes, reducing GC pressure in high-throughput applications.
- StopChan Utility: Includes a robust utility for coordinating the graceful shutdown of multiple goroutines.
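
To make the buffering and backpressure ideas above concrete, here is a rough, standard-library-only sketch of how a channel of this kind could be built. It is not this library's source code: `DynChan`, `NewDynChan`, `node`, and `pump` are hypothetical names, and the capacity is fixed at construction to keep the sketch short, whereas the library lets you change it at runtime.

```go
package main

import (
	"fmt"
	"sync"
)

// node is one link in the internal FIFO linked list (hypothetical type).
type node[T any] struct {
	val  T
	next *node[T]
}

// DynChan is a simplified, hypothetical stand-in for a dynamic-capacity channel.
type DynChan[T any] struct {
	in, out  chan T
	capacity int       // <= 0 means unbounded
	pool     sync.Pool // recycles *node[T] values to reduce allocations
}

// NewDynChan starts the pump goroutine that moves items from in to out.
func NewDynChan[T any](capacity int) *DynChan[T] {
	c := &DynChan[T]{in: make(chan T), out: make(chan T), capacity: capacity}
	c.pool.New = func() any { return new(node[T]) }
	go c.pump()
	return c
}

func (c *DynChan[T]) pump() {
	defer close(c.out)
	var head, tail *node[T]
	n := 0
	in := c.in
	for in != nil || head != nil {
		// Backpressure: a nil channel never becomes ready in a select,
		// so producers block while the buffer is at capacity.
		recv := in
		if c.capacity > 0 && n >= c.capacity {
			recv = nil
		}
		// Only offer an item to consumers when the buffer is non-empty.
		var send chan T
		var next T
		if head != nil {
			send, next = c.out, head.val
		}
		select {
		case v, ok := <-recv:
			if !ok {
				in = nil // input closed: drain what is left, then exit
				continue
			}
			nd := c.pool.Get().(*node[T])
			nd.val, nd.next = v, nil
			if tail == nil {
				head = nd
			} else {
				tail.next = nd
			}
			tail = nd
			n++
		case send <- next:
			nd := head
			head = nd.next
			if head == nil {
				tail = nil
			}
			var zero T
			nd.val, nd.next = zero, nil // clear before returning to the pool
			c.pool.Put(nd)
			n--
		}
	}
}

func main() {
	c := NewDynChan[int](0) // unbounded: sends succeed with no consumer yet
	for i := 0; i < 1000; i++ {
		c.in <- i
	}
	close(c.in)
	sum := 0
	for v := range c.out {
		sum += v
	}
	fmt.Println(sum) // 499500
}
```

The key design choice in this sketch is using nil channels inside `select` to switch the receive and send cases on and off, which avoids any explicit locking around the buffer.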
    go get github.com/qjpcpu/channel

Create a channel, send some data, and receive it. The Close() method ensures that the consumer can exit its loop gracefully.

    package main

    import (
        "fmt"

        "github.com/qjpcpu/channel"
    )

    func main() {
        ch := channel.New[int]()

        // Producer
        go func() {
            defer ch.Close() // Close the input when done
            for i := 0; i < 5; i++ {
                ch.In() <- i
                fmt.Printf("Sent: %d\n", i)
            }
        }()

        // Consumer
        for val := range ch.Out() {
            fmt.Printf("Received: %d\n", val)
        }

        // Wait until the channel is fully drained and closed
        <-ch.Done()
        fmt.Println("Channel is fully drained.")
    }

You can set a capacity to prevent the internal buffer from growing indefinitely. When the buffer is full, writes to In() will block until there is space.
    package main

    import (
        "fmt"
        "time"

        "github.com/qjpcpu/channel"
    )

    func main() {
        // Create a channel with a capacity of 2
        ch := channel.New[int]().SetCap(2)

        // Send 2 items, which will fill the buffer
        ch.In() <- 1
        ch.In() <- 2
        fmt.Printf("Buffer length is now: %d\n", ch.Len()) // Outputs: 2

        // This next send will block until an item is consumed
        select {
        case ch.In() <- 3:
            fmt.Println("This should not be printed immediately.")
        case <-time.After(100 * time.Millisecond):
            fmt.Println("ch.In() is blocked as expected.")
        }

        // Consume one item
        val := <-ch.Out()
        fmt.Printf("Consumed: %d\n", val)
        fmt.Printf("Buffer length is now: %d\n", ch.Len()) // Outputs: 1

        // Now we can send another item without blocking
        ch.In() <- 3
        fmt.Printf("Buffer length is now: %d\n", ch.Len()) // Outputs: 2
    }

- New[T any]() Channel[T]: Creates and returns a new dynamic channel.
- In() chan<- T: Returns the write-only input channel.
- Out() <-chan T: Returns the read-only output channel.
- Len() int64: Returns the current number of items in the buffer.
- Cap() int64: Returns the current capacity. 0 or less means unbounded.
- SetCap(c int64) Channel[T]: Sets the capacity. Can be changed at any time.
- Close(): Initiates a graceful shutdown. Closes the In() channel and processes all buffered items.
- Done() <-chan struct{}: Returns a channel that is closed when Close() has been called and all items have been sent to Out().
- Shutdown(): Initiates an immediate shutdown. Closes the channel and discards all buffered data.
StopChan is a helper for coordinating the graceful shutdown of multiple goroutines. It combines a sync.WaitGroup with a signal channel.
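
Combining a sync.WaitGroup with a signal channel could look roughly like the sketch below. It uses only the standard library and is not the library's actual implementation: the `stopper` type, `newStopper`, and `runWorkers` are hypothetical names chosen for illustration, and the method set simply mirrors the Add/Done/C/Stop calls used in this README.

```go
package main

import (
	"fmt"
	"sync"
)

// stopper is a hypothetical stand-in for a StopChan-like helper.
type stopper struct {
	wg   sync.WaitGroup
	once sync.Once
	c    chan struct{}
}

func newStopper() *stopper { return &stopper{c: make(chan struct{})} }

func (s *stopper) Add(n int)          { s.wg.Add(n) }
func (s *stopper) Done()              { s.wg.Done() }
func (s *stopper) C() <-chan struct{} { return s.c }

// Stop broadcasts the stop signal by closing the channel (at most once),
// then waits until every registered worker has called Done.
func (s *stopper) Stop() {
	s.once.Do(func() { close(s.c) })
	s.wg.Wait()
}

// runWorkers starts n workers that wait for the stop signal and
// returns how many of them observed it before exiting.
func runWorkers(n int) int {
	s := newStopper()
	var mu sync.Mutex
	stopped := 0
	for i := 0; i < n; i++ {
		s.Add(1) // register before the goroutine starts
		go func() {
			defer s.Done()
			<-s.C() // block until Stop closes the signal channel
			mu.Lock()
			stopped++
			mu.Unlock()
		}()
	}
	s.Stop() // returns only after all workers have called Done
	return stopped
}

func main() {
	fmt.Println(runWorkers(3)) // 3
}
```

Closing the channel, rather than sending on it, lets a single signal reach any number of workers, and the sync.Once guard makes a second Stop call harmless in this sketch.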
    package main

    import (
        "fmt"
        "time"

        "github.com/qjpcpu/channel"
    )

    func worker(id int, sc channel.StopChan) {
        defer sc.Done() // Unregister when exiting
        fmt.Printf("Worker %d started\n", id)
        for {
            select {
            case <-time.After(500 * time.Millisecond):
                fmt.Printf("Worker %d is doing work...\n", id)
            case <-sc.C():
                // Stop signal received
                fmt.Printf("Worker %d is stopping.\n", id)
                return
            }
        }
    }

    func main() {
        stopController := channel.NewStopChan()

        // Start a few workers
        for i := 1; i <= 3; i++ {
            // Register the worker before its goroutine starts,
            // so Stop() cannot run ahead of the registration
            stopController.Add(1)
            go worker(i, stopController)
        }

        // Run for a while
        time.Sleep(2 * time.Second)

        // Trigger the shutdown
        fmt.Println("Main: Sending stop signal...")
        stopController.Stop() // This will block until all workers call Done()
        fmt.Println("Main: All workers have stopped. Exiting.")
    }

Contributions are welcome! Please feel free to submit a pull request.
This project is licensed under the MIT License.