Skip to content
/ pubsub Public

A simple and generic publish/subscribe package for Go.

License

Notifications You must be signed in to change notification settings

mdawar/pubsub

Repository files navigation

pubsub

Go Reference Go Report Card Tests

pubsub is a simple and generic topic-based publish/subscribe package for in-process communication in Go.

Installation

go get -u github.com/mdawar/pubsub

Import

import "github.com/mdawar/pubsub"

Usage

Create a Broker

// Create a broker and specify the topics type and the message payloads type.
broker := pubsub.NewBroker[string, string]()

// The message payload type can be any type.
events := pubsub.NewBroker[string, Event]()

Subscriptions

A subscription is simply a channel, everything that you know about channels can be applied to subscriptions.

// Create a subscription to a topic.
// A subscription is a channel that receives messages published to the topic.
// By default the channel is unbuffered (capacity = 0).
sub1 := broker.Subscribe("events")

// A subscription can be created to multiple topics.
// The channel will receive all the messages published to all the specified topics.
sub2 := broker.Subscribe("events", "actions", "testing")

// Create a subscription with a buffered channel.
// The channel capacity is specified as the first parameter.
sub3 := broker.SubscribeWithCapacity(10, "events")

// Unsubscribe from a specific topic or topics.
// The channel will still receive messages for the other topics.
broker.Unsubscribe(sub2, "actions", "testing")

// Unsubscribe from all topics.
// The channel will not be closed, it will only stop receiving messages.
// Note: Specifying all the topics is much more efficient if performance is critical.
broker.Unsubscribe(sub2)

Publishing Messages

// Publish a message with the specified payload.
// The payload can be of any type that is specified when creating the broker.
//
// The message will be sent concurrently to the subscribers, ensuring that a slow
// consumer won't affect the other subscribers.
//
// This call will block until all the subscription channels receive the message
// or until the context is canceled.
//
// A nil return value indicates that all the subscribers received the message.
broker.Publish(context.TODO(), "events", "Sample message")
// Publish a message and timeout after 1 second.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// In this case, Publish will deliver the message to subscribers that are
// ready and will wait for the others for up to the timeout duration.
err := broker.Publish(ctx, "events", "Sample message")
// The error is not nil if the context was canceled or the deadline exceeded.
if err != nil {
  if errors.Is(err, context.DeadlineExceeded) {
    // Timed out.
  } else if errors.Is(err, context.Canceled) {
    // Canceled.
  }
}
// Non blocking publish (Fire and forget).
//
// The message is sent sequentially to the subscribers that are ready to receive it
// and the others are skipped.
//
// Note: Message delivery is not guaranteed.
broker.TryPublish("events", "A message that may not be delivered")

// Buffered subscriptions can be used for guaranteed delivery with a non-blocking publish.
//
// Publish will still block if any subscription's channel buffer is full, or any of the
// subscriptions is an unbuffered channel.
sub := broker.SubscribeWithCapacity(1, "events")
broker.Publish(context.TODO(), "events", "Guaranteed delivery message")

Messages

sub := broker.Subscribe("events")

// Receive a message from the channel.
msg := <-sub

// The topic that the message was published on.
msg.Topic
// The payload that was published using Publish() or TryPublish().
msg.Payload

Topics

// Get a slice of all the topics registered on the broker.
// Note: The order of the topics is not guaranteed.
topics := broker.Topics()

// Get the total number of topics.
topicsCount := broker.NumTopics()

// Get the subscribers count on a specific topic.
count := broker.Subscribers("events")

Tests

go test -race -cover
# If you have "just" installed.
just test
# Or using make.
make test

Benchmarks

go test -bench .
# Or Using "just".
just benchmark
# Or using make.
make benchmark

About

A simple and generic publish/subscribe package for Go.

Topics

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages