Generic ring buffer and concurrent worker pool for Go, with blocking/non-blocking push/pop, multi-reader support, and event-driven task dispatch.
go get github.com/sonnt85/goring- Generic
RingBuffer[T]— circular buffer implementingio.ReaderWriter-style API - Blocking (
PushWait,PopWait,ReadWait,WriteWait) and non-blocking (TryPush,TryPop) operations - Timeout and context-aware push/pop (
PushWaitTimeOut,ReadWaitTimeOut,WriteWaitTimeOut) RingMultipleReader[T]— lock-free ring buffer supporting multiple independent consumersEventWorker[K]— concurrent worker pool backed by aRingBufferqueueEventWorkerMap[K]— concurrent worker pool backed by a map (dedup by key)- Task submission variants:
Submit,SubmitForce,TrySubmit,SubmitWaitDone,SubmitWithTimeout - Per-task result retrieval, wait-until-finish, and eviction callbacks
- Worker pause/resume and queue enable/disable controls
- Linked-list event queue (
EventLinkedList) - Stats reporting for both ring buffer and worker pool
// Ring buffer
ring := goring.NewRing[int](64)
ring.PushWait(42)
val := ring.PopWait()
// Worker pool (max 4 workers, buffer 100, task TTL 10s, error TTL 30s)
pool := goring.NewEventWorker[string](4, 100, 10*time.Second, 30*time.Second)
pool.EnableWorker()
task, err := pool.Submit("my-task", 5*time.Second,
func() string { return "task-id" },
func(x int) int { return x * 2 }, 21,
)
// Wait for a specific task
retvals, _ := pool.WaitUntilTaskFinishThenDelete("task-id")
// Multiple-reader ring buffer
rmr, _ := goring.NewRingMultipleReader[string](256, 4)
consumer, _ := rmr.NewConsumer()
go rmr.Write("hello")
val2 := consumer.Get()NewRing[T](size int) *RingBuffer[T]— create a new ring bufferPush(c T) error/PushWait(c T)/PushForce(c T)/TryPush(c T) error— write one elementPop() (T, error)/PopWait() T/TryPop() (T, error)— read one elementWrite(p []T)/WriteWait(p []T)/TryWrite(p []T)— write sliceRead(p []T)/ReadWait(p []T)/ReadAll()/ReadAllWait()— read sliceTryRead(p []T) (n int, err error)— non-blocking read; returnsErrAcquireLockif lock is unavailableTryReadAll() ([]T, error)— non-blocking read of all available elementsPushWaitTimeOut(c T, timeout time.Duration) error— push with deadlineReadWaitTimeOut(c []T, timeout time.Duration, ctxs ...context.Context)— read with deadlineWaitUntilNotFull()— block until the buffer has at least one free slotWaitUntilEmpty()— block until the buffer is completely drainedLength() int/Capacity() int/Free() int/IsEmpty() bool/IsFull() boolReset()/Resize()/Copy() []T/Stats()
NewRingMultipleReader[T](size, maxConsumers uint32)— create multi-reader ringWrite(value T)— write (blocks until space is available for all consumers)NewConsumer() (Consumer[T], error)— create an independent consumerConsumer.Get() T— blocking read for that consumerConsumer.Remove()— deregister consumer
NewEventWorker[K](maxWorkers, buffsize int, defaultExpiration, errorTTL time.Duration) *EventWorker[K]Submit/SubmitForce/TrySubmit/SubmitWithTimeout/SubmitWaitDoneGetSavedTask(id K)/GetResultTask(id K)/WaitUntilTaskFinishThenDelete(id K)GetSavedTaskThenDelete(id K) (*Task, bool)— retrieve a saved task and remove it from the storeGetResultTaskThenDelete(id K) ([]interface{}, bool, bool)— retrieve task result values and remove the taskGetNumFinishTask() uint32— return the total number of completed tasksOnEvictedSavedTask(f func(K, *Task))— register a callback invoked when a saved task expires or is evictedOnEvictedFinishTask(f func(*Task))— register a callback invoked each time a task finishes successfullyStopped() bool— return true if the queue is currently emptyWaitUntilAllTaskFinish()/EnableWorker()/DisableWorker()/EnableQueue()/DisableQueue()Stats()/Size()/ConfigMaxWorker(n int)
Same API as EventWorker[K] but uses a map as backing store (newer submission for same key replaces previous).
A generic ordered list with a movable read cursor, designed for cycling through a fixed set of items.
NewEventLinkedList[T]() *EventLinkedList[T]— create an empty linked listNext() (T, error)— advance the cursor forward by one and return the elementNextWait() (T, error)— likeNextbut blocks until the list is non-emptyPrev() (T, error)— move the cursor backward by one and return the elementPrevWait() (T, error)— likePrevbut blocks until the list is non-emptySeek(n int) (T, error)— move the cursor bynsteps (positive or negative) and return the elementSeekWait(n int) (T, error)— likeSeekbut blocks until the list is non-emptyCurrent() (T, error)— return the element at the current cursor position without movingInsert(n int, els ...T) error— insert elements before positionnRemove(n int) error— remove the element at positionnCopy() ([]T, error)— return a snapshot copy of the entire listUpdateNewEventLinkedList(p []T) bool— atomically replace list contents if they differ; broadcasts to waiting goroutines; returns true if changedLength() int/Reset()/String() string
sonnt85 — thanhson.rf@gmail.com
MIT License - see LICENSE for details.