Skip to content

azorg/evt

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

31 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

evt

import "github.com/azorg/evt"

evt - simple in-process in-memory event bus based on Go channels

Features

  • Object-oriented lightweight API (only two types: `Bus` and `Sub`)
  • One default event broker (bus) out of the box
  • Two publish methods (directly and buffered to inbox channel)
  • Flush method (including version with timeout)
  • Graceful shutdown

Block diagram of two event ways

  1. Short direct way (may block) `Publish[Ex](topic, message)` -> Each subscriber channel -> Each subscriber

  2. Long buffered way via inbox channel (non-blocking) `PublishInbox[Ex](topic, message)` -> Common inbox channel -> Bus monitor -> Each subscriber channel -> Each subscriber

Examples

// Create new event bus (broker)
// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
bus := evt.New(context.Background(), inboxChannelSize) // *evt.Bus

// Subscribe to event topic
sub := bus.Subscribe("topic", channelSize) // *evt.Sub
...
topic := sub.Topic() // get subscriber topic
...
subscribed := sub.IsSubscribed() // check subscription (bool)

// Wait event
// ^^^^^^^^^^
msg, ok := sub.Wait() // any, bool
if !ok { // subscriber unsubscribed or bus canceled
...
}

// Read from subscriber channel
// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
select {
case msg, ok := <-sub.C(): // any, bool
	if !ok { // subscriber unsubscribed or bus canceled
	...
	}
...
} // select

// Publish event to topic directly (may block)
// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
msg := "hello"
count, err := bus.Publish("topic", msg)
if err != nil {
	if errors.Is(err, context.Canceled) {
		...
	}
}
if count == 0 { // no subscribers, message lost
	...
}
...
msg = "world"
count, err = bus.PublishEx("topic", msg, 3*time.Second) // timeout = 3s
if err != nil {
	if errors.Is(err, context.Canceled) {
		...
	} else if errors.Is(err, evt.ErrTimeout) {
		...
	}
}

// Publish event to topic via inbox channel (buffered, non-blocking)
// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
msg := "hello"
bus.PublishInbox("topic", msg)
...
msg = "world"
bus.PublishInboxEx("topic", msg, 5*time.Second) // timeout = 5s

// Wait until all published messages are delivered (flush)
bus.Flush()
...
err := bus.FlushEx(10*time.Second) // timeout = 10s
if err != nil {
	if errors.Is(err, evt.ErrTimeout) { // timeout
		...
	}
}

// Unsubscribe from event topic
// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
sub.Cancel()

// Graceful shutdown
// ^^^^^^^^^^^^^^^^^
// Cancel bus, unsubscribe all subscribers, cancel goroutines
bus.Cancel()

// Wait until graceful shutdown with timeout (wait goroutines finished)
err := bus.WaitEx(10*time.Second) // timeout = 10s
if err != nil {
	if errors.Is(err, evt.ErrTimeout) { // timeout
		...
	}
}

Index

Constants

Default inbox channel size

const DefaultInboxSize = 1000

Variables

var ErrTimeout = errors.New("timeout")

func Cancel

func Cancel()

Cancel bus, unsubscribe all subscribers, cancel goroutines

func Count

func Count(topic string) int

Get count of event subscribers

func Flush

func Flush()

Wait until all published messages are delivered

func FlushEx

func FlushEx(timeout time.Duration) error

Wait until all published messages are delivered, with timeout

func Publish

func Publish(topic string, msg any) (int, error)

Publish event to topic immediately (may block)

func PublishEx

func PublishEx(topic string, msg any, timeout time.Duration) (int, error)

Publish event to topic immediately with timeout (may block)

func PublishInbox

func PublishInbox(topic string, msg any)

Publish event to topic via inbox channel (non-blocking)

func PublishInboxEx

func PublishInboxEx(topic string, msg any, timeout time.Duration)

Publish event to topic via inbox channel with timeout (non-blocking)

func Topics

func Topics() []string

Get all subscribed topics

func Wait

func Wait()

Wait until graceful shutdown (wait goroutines finished)

func WaitEx

func WaitEx(timeout time.Duration) error

Wait until graceful shutdown with timeout (wait goroutines finished)

type Bus

Event bus (broker)

type Bus struct {
    // contains filtered or unexported fields
}

func DefaultBus

func DefaultBus() *Bus

Get default event bus

func New

func New(ctx context.Context, inboxSize int) *Bus

Create new event bus (broker)

ctx - cancel context
inboxSize - inbox channel size

func (*Bus) Cancel

func (bus *Bus) Cancel()

Cancel bus, unsubscribe all subscribers, cancel goroutines

func (*Bus) Count

func (bus *Bus) Count(topic string) int

Get count of event subscribers

func (*Bus) Flush

func (bus *Bus) Flush()

Wait until all published messages are delivered

func (*Bus) FlushEx

func (bus *Bus) FlushEx(timeout time.Duration) error

Wait until all published messages are delivered, with timeout

func (*Bus) Publish

func (bus *Bus) Publish(topic string, msg any) (count int, err error)

Publish event to topic immediately (may block)

topic - event topic
msg - message (event payload)

count - actual number of topic subscribers
err - nil or context.Canceled

func (*Bus) PublishEx

func (bus *Bus) PublishEx(topic string, msg any, timeout time.Duration) (count int, err error)

Publish event to topic immediately with timeout (may block)

topic - event topic
msg - message (event payload)
timeout - timeout of write to each subscriber channel

count - actual number of topic subscribers
err - nil or context.Canceled or ErrTimeout

func (*Bus) PublishInbox

func (bus *Bus) PublishInbox(topic string, msg any)

Publish event to topic via inbox channel (non-blocking)

topic - event topic
msg - message (event payload)

func (*Bus) PublishInboxEx

func (bus *Bus) PublishInboxEx(topic string, msg any, timeout time.Duration)

Publish event to topic via inbox channel with timeout (non-blocking)

topic - event topic
msg - message (event payload)
timeout - timeout of write to each subscriber channel

func (*Bus) Subscribe

func (bus *Bus) Subscribe(topic string, size int) *Sub

Subscribe to event topic

topic - event topic
size - channel size of subscriber

func (*Bus) Topics

func (bus *Bus) Topics() []string

Get all subscribed topics

func (*Bus) Wait

func (bus *Bus) Wait()

Wait until graceful shutdown (wait goroutines finished)

func (*Bus) WaitEx

func (bus *Bus) WaitEx(timeout time.Duration) error

Wait until graceful shutdown with timeout (wait goroutines finished)

type BusInterface

Event bus interface

type BusInterface interface {
    // Subscribe to topic event
    Subscribe(topic string, size int) *Sub

    // Get all subscribed topics
    Topics() []string

    // Get count of event subscribers
    Count(topic string) int

    // Publish event to topic immediately (may block)
    Publish(topic string, msg any) (
        count int, err error)

    // Publish event to topic immediately with timeout (may block)
    PublishEx(topic string, msg any, timeout time.Duration) (
        count int, err error)

    // Publish event to topic via inbox channel (non-blocking)
    PublishInbox(topic string, msg any)

    // Publish event to topic via inbox channel with timeout (non-blocking)
    PublishInboxEx(topic string, msg any, timeout time.Duration)

    // Wait until all published messages are delivered
    Flush()

    // Wait until all published messages are delivered, with timeout
    FlushEx(timeout time.Duration) error

    // Cancel bus, unsubscribe all subscribers, cancel goroutines
    Cancel()

    // Wait until graceful shutdown (wait goroutines finished)
    Wait()

    // Wait until graceful shutdown with timeout (wait goroutines finished)
    WaitEx(timeout time.Duration) error
}

type Sub

Subscriber handler

type Sub struct {
    // contains filtered or unexported fields
}

func Subscribe

func Subscribe(topic string, size int) *Sub

Subscribe to event topic

func (*Sub) C

func (sub *Sub) C() <-chan any

Get subscriber channel

func (*Sub) Cancel

func (sub *Sub) Cancel()

Unsubscribe from event topic

func (*Sub) IsSubscribed

func (sub *Sub) IsSubscribed() bool

Check subscription

func (*Sub) Topic

func (sub *Sub) Topic() string

Return subscriber topic

func (*Sub) Wait

func (sub *Sub) Wait() (msg any, ok bool)

Wait event (read from channel)

type SubInterface

Subscriber interface

type SubInterface interface {
    Topic() string            // return subscriber topic
    IsSubscribed() bool       // check subscription
    C() <-chan any            // get subscriber channel
    Wait() (msg any, ok bool) // wait event (read from channel)
    Cancel()                  // unsubscribe from topic
}

Generated by gomarkdoc

About

Simple in-process in-memory event bus based on Go channels

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors