Skip to content

Releases: centrifugal/centrifuge

v0.18.1

28 Aug 11:09
af3115f

Choose a tag to compare

  • Add history reverse field to client protocol and handle it.

v0.18.0

28 Aug 09:19
98f6388

Choose a tag to compare

This release has several backward incompatible changes. Changes should affect the small number of Centrifuge library users, and it should be possible to adapt only by slightly modifying server-side code. Follow release notes carefully.

  • Breaking change. Client history API behavior changed. Now history call does not return all publications by default (#196). See an advice how you can handle this change in a backwards compatible way below.
  • Breaking change. Redis STREAM data structure now used by default to keep a publication history in Redis Engine (#195). Previously Centrifuge Redis Engine used LIST data structure by default. See RedisBrokerConfig.UseLists option to turn on previous behavior. Redis streams is a recommended way though, support for LIST data structure may be eventually removed.
  • Breaking change. Transport.Encoding removed. It turns out that this option is not really useful for the Centrifuge library and can only cause confusion.
  • Breaking change. Change Node.History behavior – Unrecoverable Position error now returned based on the wrong epoch only.
  • Breaking change. Remove deprecated seq/gen fields - see #197. Those were deprecated for a long time.
  • Breaking change. Client.Connect now does not return an error – this allows Centrifuge to automatically send a proper Disconnect push towards the connection.
  • Breaking change. WithClientWhitelist renamed to WithDisconnectClientWhitelist.
  • Much faster JSON client protocol. Expect at least 4x speedup for small messages JSON encoding/decoding. For large messages the difference can be even bigger. This is possible due to using code generation for encoding and a faster library for JSON decoding in centrifugal/protocol package. See centrifugal/protocol#8.
  • Message broadcast allocates less - see #193. Can be noticeable when broadcasting messages to large number of active subscribers. The side effect of this change is that Transport implementations should now have Write and WriteMany methods.
  • Centrifuge now uses official Protobuf library for Go with planetscale/vtprotobuf code generator instead of gogo/protobuf library which is not maintained these days anymore. The performance of Protobuf marshaling/unmarshaling is comparable.
  • New Config.UseSingleFlight option added. The option can help to reduce the load on Broker and Presence manager during massive reconnect and history synchronization scenarios.
  • WebSocket subprotocol is now can be used for switching to Protobuf protocol (#194). This will help to avoid adding ?format=protobuf in WebSocket connection URL.
  • OnTransportWrite callback added to inject custom logic before actual write to a client connection.
  • OnNodeInfoSend callback added to attach custom data to Node control frame.
  • Client.Info method which returns a copy of the connection info (set by Credentials).
  • Node.History now supports iteration in reversed order (#201).
  • Client.Refresh and Node.Refresh methods added to prolong/expire connections (useful for unidirectional transports).
  • GRPC unidirectional transport example improvements

Regarding client history API change. So previously when a client called history it received all publications in a stream by default. In Centrifuge v0.18.0 it will only receive current stream top position (offset and epoch) without any publications.

To mimic previous behavior you can use code like this:

node.OnConnect(func(client *centrifuge.Client) {
    client.OnHistory(func(e centrifuge.HistoryEvent, cb centrifuge.HistoryCallback) {
        if e.Filter.Limit == 0 {
            result, err := node.History(e.Channel,
                centrifuge.WithSince(e.Filter.Since),
                centrifuge.WithLimit(centrifuge.NoLimit),
            )
            if err != nil {
                cb(centrifuge.HistoryReply{}, err)
                return
            }
            cb(centrifuge.HistoryReply{Result: &result}, nil)
            return
        }
        cb(centrifuge.HistoryReply{}, nil)
    })
})

I.e. explicitly handle zero limit and return all publications in response (using centrifuge.NoLimit). Then upgrade clients to use recent Centrifuge clients (will be released soon after Centrifuge v0.18.0) which allow setting limit explicitly and remove this custom logic eventually from the server code.

gorelease -base v0.17.1 -version v0.18.0
github.com/centrifugal/centrifuge
---------------------------------
Incompatible changes:
- (*Client).Connect: changed from func(ConnectRequest) error to func(ConnectRequest)
- CompatibilityFlags: removed
- EncodingType: removed
- EncodingTypeBinary: removed
- EncodingTypeJSON: removed
- NodeInfo: old is comparable, new is not
- RedisBrokerConfig.UseStreams: removed
- Transport.Write: changed from func(...[]byte) error to func([]byte) error
- Transport.WriteMany: added
- TransportInfo.Encoding, method set of Transport: removed
- TransportInfo.Encoding: removed
- UseSeqGen: removed
- WithClientWhitelist: removed
Compatible changes:
- (*Client).Info: added
- (*Client).Refresh: added
- (*Hub).NumSubscriptions: added
- (*Hub).UserConnections: added
- (*Node).OnNodeInfoSend: added
- (*Node).OnTransportWrite: added
- (*Node).Refresh: added
- Config.UseSingleFlight: added
- ConnectEvent.Channels: added
- DisconnectChannelLimit: added
- HistoryFilter.Reverse: added
- HistoryOptions.Reverse: added
- NodeInfo.Data: added
- NodeInfo.NumSubs: added
- NodeInfoSendHandler: added
- NodeInfoSendReply: added
- RedisBrokerConfig.UseLists: added
- RefreshOption: added
- RefreshOptions: added
- SubscribeOptions.RecoverSince: added
- TransportWriteEvent: added
- TransportWriteHandler: added
- WithDisconnectClient: added
- WithDisconnectClientWhitelist: added
- WithRecoverSince: added
- WithRefreshClient: added
- WithRefreshExpireAt: added
- WithRefreshExpired: added
- WithRefreshInfo: added
- WithReverse: added

v0.18.0 is a valid semantic version for this release.

v0.17.1

23 Jun 08:25
60485bf

Choose a tag to compare

  • Possibility to bypass broker when publishing - see #198
  • Added more tests

v0.17.0

10 Apr 10:03
dd26c36

Choose a tag to compare

  • Remove unidirectional WebSocket support from builtin WebSocket transport. This was done to not force transport implementation details and to reduce code in a library core. It's still possible to build unidirectional WebSocket transport - just not with a builtin WebsocketHandler, see example.
  • Introduce node.Notify method. Notify allows sending an asynchronous notification to all other node (or to a single specific node). Unlike Survey it does not wait for any response
  • Since option of history call renamed to WithSince for consistency with other option names
gorelease -base v0.16.0 -version v0.17.0
github.com/centrifugal/centrifuge
---------------------------------
Incompatible changes:
- Since: removed
- WebsocketConfig.Unidirectional: removed
Compatible changes:
- (*Node).Notify: added
- (*Node).OnNotification: added
- NotificationEvent: added
- NotificationHandler: added
- WithSince: added

v0.16.0

22 Mar 18:59
168343a

Choose a tag to compare

This release is huge. The list of changes may look scary - but most changes should be pretty straightforward to adopt.

Highlights:

  • Support for unidirectional clients, this opens a road to more adoption of Centrifuge for cases where bidirectional communication is not really needed. Unidirectional support is still a subject to change in future versions as soon as more feedback appears – for now Centrifuge has examples for GRPC, EventSource(SSE), Fetch Streams, Unidirectional WebSocket transports. The beauty here is that you don't need to use any Centrifuge client library to receive real-time updates - just use native browser APIs or GRPC generated code with simple decoding step.
  • The introduction of unidirectional transport required to change Transport interface a bit. The important thing is that it's now a responsibility of Transport.Write to properly encode data to JSON-streaming or Protobuf length-delimited format
  • Centrifuge now uses same-origin policy by default when checking incoming WebSocket or SockJS request due to security considerations (prevent CSRF attack), previously default check allowed all connections. If you want to mimic previous behavior then pass custom check functions to handler configurations – see example below.
  • New Subscribe method of Node - to subscribe user to server-side channels cluster-wide - see example that demonstrates new API
  • Engine interface removed - now Centrifuge only has separate Broker and PresenceManager entities. This changes how you should set up Redis to scale Nodes - see updated Redis example - it's now required to provide Redis Broker and Redis Presence Manager separately
  • Trace level logging (to see all protocol frames, obviously mostly suitable for development)
  • WithResubscribe option of unsubscribe removed - it never worked properly in client libraries and had no clearly defined semantics
  • It is now possible to return custom data in Subscribe result or in Subscribe Push
  • Broker.PublishControl method signature changed - it now accepts shardKey string argument, this is not used at the moment but can be used later if we will need an order of control messages
  • PresenceManager.AddPresence signature changed - now presence expiration time is an option of PresenceManager itself
  • Field version of ConnectResult is now omitted from JSON if empty
  • Server-side subscriptions now trigger unsubscribe event (with ServerSide boolean flag set to true)
  • Slightly faster Redis history decoding - commit
  • Hub container now shards connections and subscriptions - this can reduce lock contention significantly in some workloads thus reducing operation latency. See #184
  • Various example improvements

That's what you have to do if you want to accept all connections without same-origin check introduced in v0.16.0 (can lead to CSRF vulnerability in some situations):

wsHandler := centrifuge.NewWebsocketHandler(node, centrifuge.WebsocketConfig{
    CheckOrigin: func(r *http.Request) bool {
        return true
    },
})

sockjsHandler := centrifuge.NewSockjsHandler(node, centrifuge.SockjsConfig{
    CheckOrigin: func(r *http.Request) bool {
        return true
    },
    WebsocketCheckOrigin: func(r *http.Request) bool {
        return true
    },  
})

All changes:

$ gorelease -base v0.15.0 -version v0.16.0
github.com/centrifugal/centrifuge
---------------------------------
Incompatible changes:
- (*Client).Subscribe: changed from func(string) error to func(string, ...SubscribeOption) error
- (*Client).Unsubscribe: changed from func(string, ...UnsubscribeOption) error to func(string) error
- (*Node).SetEngine: removed
- Broker.PublishControl: changed from func([]byte, string) error to func([]byte, string, string) error
- Config.ClientPresenceExpireInterval: removed
- Engine: removed
- LogLevelDebug: value changed from 1 to 2
- LogLevelError: value changed from 3 to 4
- LogLevelInfo: value changed from 2 to 3
- MemoryEngine: removed
- MemoryEngineConfig: removed
- NewMemoryEngine: removed
- NewRedisEngine: removed
- PresenceManager.AddPresence: changed from func(string, string, *ClientInfo, time.Duration) error to func(string, string, *ClientInfo) error
- RedisEngine: removed
- RedisEngineConfig: removed
- RedisShardConfig.ClusterAddrs: removed
- RedisShardConfig.Host: removed
- RedisShardConfig.Port: removed
- RedisShardConfig.Prefix: removed
- RedisShardConfig.PubSubNumWorkers: removed
- RedisShardConfig.SentinelAddrs: removed
- Transport.Write: changed from func([]byte) error to func(...[]byte) error
- TransportInfo.DisabledPushFlags: added
- TransportInfo.Unidirectional: added
- UnsubscribeOptions.Resubscribe: removed
- WithResubscribe: removed
Compatible changes:
- (*Client).Connect: added
- (*Node).Subscribe: added
- ConnectRequest: added
- DefaultRedisBrokerPrefix: added
- DefaultRedisConnectTimeout: added
- DefaultRedisPresenceManagerPrefix: added
- DefaultRedisPresenceTTL: added
- DefaultRedisReadTimeout: added
- DefaultRedisWriteTimeout: added
- LogLevelTrace: added
- MemoryBroker: added
- MemoryBrokerConfig: added
- MemoryPresenceManager: added
- MemoryPresenceManagerConfig: added
- NewMemoryBroker: added
- NewMemoryPresenceManager: added
- NewRedisBroker: added
- NewRedisPresenceManager: added
- NewRedisShard: added
- PushFlagConnect: added
- PushFlagDisconnect: added
- PushFlagJoin: added
- PushFlagLeave: added
- PushFlagMessage: added
- PushFlagPublication: added
- PushFlagSubscribe: added
- PushFlagUnsubscribe: added
- RedisBroker: added
- RedisBrokerConfig: added
- RedisPresenceManager: added
- RedisPresenceManagerConfig: added
- RedisShard: added
- RedisShardConfig.Address: added
- RedisShardConfig.ClusterAddresses: added
- RedisShardConfig.SentinelAddresses: added
- SubscribeOption: added
- SubscribeOptions.Data: added
- SubscribeRequest: added
- UnsubscribeEvent.ServerSide: added
- WebsocketConfig.Unidirectional: added
- WithChannelInfo: added
- WithExpireAt: added
- WithJoinLeave: added
- WithPosition: added
- WithPresence: added
- WithRecover: added
- WithSubscribeClient: added
- WithSubscribeData: added
- WithUnsubscribeClient: added

v0.16.0 is a valid semantic version for this release.

v0.15.0

25 Jan 19:42

Choose a tag to compare

  • Add Node.Survey method – it allows gathering results from all running nodes. It's possible to define your own survey handlers. See example. Keep in mind that Survey does not scale very well as number of Centrifuge Nodes grows. Though it has reasonably good performance to perform rare tasks even with relatively large number of nodes – see benchmark in pull request
  • The main motivation of adding Node.Survey was attempt to remove Broker.Channels method – which is not supported by most of existing PUB/SUB brokers and does not work in Redis cluster. Broker.Channels now removed, but it can be replaced with survey if needed
  • Improve clustering - node will now send a SHUTDOWN message so other nodes have a chance to realize that node left cluster almost immediately
  • Signature of Since history call option changed – it now accepts a pointer to StreamPosition. This change simplifies a code to construct history call
  • Added SubscribeOptions.Position boolean flag to enable positioning in channel stream. Positioning means that Centrifuge will check that the client did not miss any message from PUB/SUB system, as soon as loss detected client will be disconnected with Insufficient State reason. This is very similar to what Recover: true option did, but Position: true does not enable recovery. As soon as Position flag enabled Centrifuge will expose top stream StreamPosition information to a client in Subscribe Reply
  • Added possibility to iterate over a channel history stream from client side. See an example that demonstrates this
  • New Config options: HistoryMaxPublicationLimit and RecoveryMaxPublicationLimit to control maximum number of publications to return during history call or recovery process. See Centrifuge documentation for detailed description
  • New example that shows Centrifuge integration with Tarantool. Tarantool engine implementation can outperform Redis (up to 5-10x for presence and history operations), though while example contains a full-featured fast Engine implementation – it's still just an example at the moment and have not been tested in production environment
  • New blog post in Centrifugo blog where we introduce Centrifuge library
  • Most examples now do not use jQuery which was replaced by vanilla JS
$ gorelease -base v0.14.2 -version v0.15.0
github.com/centrifugal/centrifuge
---------------------------------
Incompatible changes:
- (*MemoryEngine).Channels: removed
- (*MemoryEngine).PublishControl: changed from func([]byte) error to func([]byte, string) error
- (*Node).Channels: removed
- (*RedisEngine).Channels: removed
- (*RedisEngine).PublishControl: changed from func([]byte) error to func([]byte, string) error
- Broker.Channels, method set of Engine: removed
- Broker.Channels: removed
- Broker.PublishControl: changed from func([]byte) error to func([]byte, string) error
- BrokerEventHandler.HandlePublication: changed from func(string, *Publication) error to func(string, *Publication, StreamPosition) error
- Since: changed from func(StreamPosition) HistoryOption to func(*StreamPosition) HistoryOption
Compatible changes:
- (*Node).ID: added
- (*Node).OnSurvey: added
- (*Node).Survey: added
- Config.HistoryMaxPublicationLimit: added
- Config.RecoveryMaxPublicationLimit: added
- ErrorUnrecoverablePosition: added
- HistoryEvent.Filter: added
- SubscribeOptions.Position: added
- SurveyCallback: added
- SurveyEvent: added
- SurveyHandler: added
- SurveyReply: added
- SurveyResult: added

v0.15.0 is a valid semantic version for this release.

v0.14.2

28 Dec 17:26
b4d3739

Choose a tag to compare

  • fix concurrent map access which could result in runtime crash when using presence feature.

v0.14.1

13 Nov 07:31
cb40100

Choose a tag to compare

  • remove unused history_full metric, add history action metric to track all history calls.

v0.14.0

07 Nov 14:11
bc633f1

Choose a tag to compare

  • Add possibility to disconnect user with custom Disconnect object, and with client ID whitelist.
  • Thus fixing non-working WithReconnect option when calling node.Disconnect method.
  • No error returned from client.Disconnect method anymore. It was always nil before.

Here is what changed since v0.13.0:

gorelease -base v0.13.0 -version v0.14.0
github.com/centrifugal/centrifuge
---------------------------------
Incompatible changes:
- (*Client).Disconnect: changed from func(*Disconnect) error to func(*Disconnect)
- DisconnectOptions.Reconnect: removed
- DisconnectOptions: old is comparable, new is not
- WithReconnect: removed
Compatible changes:
- DisconnectOptions.ClientWhitelist: added
- DisconnectOptions.Disconnect: added
- WithClientWhitelist: added
- WithDisconnect: added

v0.14.0 is a valid semantic version for this release.

v0.13.0

30 Oct 18:36
47dfbae

Choose a tag to compare

This release solves two important issues from v1.0.0 library milestone. It has API changes, though as always it's possible to implement the same as before, and adapting new version should be pretty straightforward.

  • #163 Provide a way to add concurrent processing of protocol commands. Before this change protocol commands could only be processed one by one. The obvious drawback in this case is that one slow RPC could result into stopping other requests from being processed thus affecting overall latency. This required changing client handler API and use asynchronous callback style API for returning replies from event handlers. This approach while not being very idiomatic allows using whatever concurrency strategy developer wants without losing the possibility to control event order.
  • #161 Eliminating ChannelOptionsFunc – now all channel options can be provided when calling Publish operation (history size and TTL) or by returning from event handlers inside SubscribeReply (enabling channel presence, join/leave messages, recovery in a channel). This means that channel options can now be controlled per-connection (not only per channel as before). For example if you need admin connection to subscribe to channel but not participate in channel presence – you are able to not enable presence for that connection.
  • Server-side subscriptions now set over Subscriptions map (instead of Channels). Again – subscribe options can be set with per-connection resolution.
  • Change signature of Publish method in Broker interface – method now accepts []byte data instead of *Publication.
  • Function options for Unsubbscribe and Disconnect methods now have boolean argument.
  • History functional option WithNoLimit removed – use WithLimit(centrifuge.NoLimit) instead.
  • Config option ClientUserConnectionLimit renamed to UserConnectionLimit. If UserConnectionLimit set then now connection will be disconnected with DisconnectConnectionLimit instead of returning a LimitExceeded error.

Since API changes are pretty big, let's look at example program and how to adapt it from v0.12.0 to v0.13.0.

The program based on v0.12.0 API:

package main

import (
	"context"

	"github.com/centrifugal/centrifuge"
)

func main() {
	cfg := centrifuge.DefaultConfig
	cfg.ChannelOptionsFunc = func(channel string) (centrifuge.ChannelOptions, bool, error) {
		return centrifuge.ChannelOptions{
			Presence:        true,
			JoinLeave:       true,
			HistorySize:     100,
			HistoryLifetime: 300,
			HistoryRecover:  true,
		}, true, nil
	}

	node, _ := centrifuge.New(cfg)

	node.OnConnecting(func(ctx context.Context, e centrifuge.ConnectEvent) (centrifuge.ConnectReply, error) {
		return centrifuge.ConnectReply{
			Credentials: &centrifuge.Credentials{UserID: "42"},
			// Subscribe to a server-side channel.
			Channels: []string{"news"},
		}, nil
	})

	node.OnConnect(func(c *centrifuge.Client) {
		println("client connected")
	})

	node.OnSubscribe(func(c *centrifuge.Client, e centrifuge.SubscribeEvent) (centrifuge.SubscribeReply, error) {
		return centrifuge.SubscribeReply{}, nil
	})

	node.OnPublish(func(c *centrifuge.Client, e centrifuge.PublishEvent) (centrifuge.PublishReply, error) {
		return centrifuge.PublishReply{}, nil
	})

	node.OnDisconnect(func(c *centrifuge.Client, e centrifuge.DisconnectEvent) {
		println("client disconnected")
	})

	_ = node.Run()
}

With v0.13.0 the same program becomes:

package main

import (
	"context"
	"time"

	"github.com/centrifugal/centrifuge"
)

func main() {
	node, _ := centrifuge.New(centrifuge.DefaultConfig)

	node.OnConnecting(func(ctx context.Context, e centrifuge.ConnectEvent) (centrifuge.ConnectReply, error) {
		return centrifuge.ConnectReply{
			Credentials: &centrifuge.Credentials{UserID: "42"},
			// Subscribe to a server-side channel.
			Subscriptions: map[string]centrifuge.SubscribeOptions{
				"news": {Presence: true, JoinLeave: true, Recover: true},
			},
		}, nil
	})

	node.OnConnect(func(client *centrifuge.Client) {
		println("client connected")

		client.OnSubscribe(func(e centrifuge.SubscribeEvent, cb centrifuge.SubscribeCallback) {
			cb(centrifuge.SubscribeReply{
				Options: centrifuge.SubscribeOptions{
					Presence:  true,
					JoinLeave: true,
					Recover:   true,
				},
			}, nil)
		})

		client.OnPublish(func(e centrifuge.PublishEvent, cb centrifuge.PublishCallback) {
			// BTW you can publish here explicitly using node.Publish method – see Result
			// field of PublishReply and chat_json example.
			cb(centrifuge.PublishReply{
				Options: centrifuge.PublishOptions{
					HistorySize: 100,
					HistoryTTL:  5 * time.Minute,
				},
			}, nil)
		})

		client.OnDisconnect(func(e centrifuge.DisconnectEvent) {
			println("client disconnected")
		})
	})

	_ = node.Run()
}

As you can see there are three important changes:

  1. You should now set up event handlers inside node.OnConnect closure
  2. Event handlers now have callback argument that you should call with corresponding Reply as soon as you have it
  3. For server-side subscriptions you should now return Subscriptions field in ConnectReply which is map[string]SubscribeOptions instead of []string slice.

See new example that demonstrates concurrency using bounded semaphore.

Note that every feature enabled for a channel increases resource usage on a server. You should only enable presence, recovery, join/leave features and maintaining history in channels where this is necessary.

See also updated Tips and tricks section in a README – it now contains information about connection life cycle and event handler concurrency.