msgbus

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 3, 2024 License: BSD-3-Clause Imports: 4 Imported by: 3

README

msgbus

Build status GoDoc

msgbus is a simple message bus for Go.

Example

// main shows basic msgbus usage: three subscribers spread over two topics,
// each drained by its own goroutine while main publishes messages.
//
// The subscriptions are released explicitly and main then waits for every
// consumer goroutine to finish. Returning right after the last Publish could
// let the program exit before the consumers have printed what they received.
func main() {
    bus := msgbus.New[string]()

    ch1 := bus.Subscribe("topic1")
    ch2 := bus.Subscribe("topic2", msgbus.WithTimeout[string](1*time.Second))
    ch3 := bus.Subscribe("topic2", msgbus.WithSize[string](5))

    // done receives one value per consumer goroutine once its channel has
    // been closed and fully drained. It is buffered so consumers never block
    // on the notification.
    done := make(chan struct{}, 3)

    // consume prints every message received on ch until ch is closed.
    consume := func(name string, ch <-chan string) {
        defer func() { done <- struct{}{} }()
        for v := range ch {
            fmt.Printf("%s: %q\n", name, v)
        }
    }

    go consume("ch1", ch1)
    go consume("ch2", ch2)
    go consume("ch3", ch3)

    bus.Publish("topic1", "msg-A")
    bus.Publish("topic2", "msg-1")
    bus.Publish("topic2", "msg-2")
    bus.Publish("topic1", "msg-B")
    bus.Publish("topic2", "msg-3")

    // Unsubscribe closes each subscription channel, which ends the
    // consumers' range loops.
    bus.Unsubscribe("topic2", ch3)
    bus.Unsubscribe("topic2", ch2)
    bus.Unsubscribe("topic1", ch1)

    // Wait for all three consumers before letting the program exit.
    for i := 0; i < 3; i++ {
        <-done
    }
}

License

  • msgbus is released under the BSD-3-Clause license.

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Bus

type Bus[T any] struct {
	// contains filtered or unexported fields
}

Bus is an in-memory bus, dispatching messages to subscribers.

Example
package main

import (
	"fmt"
	"strings"
	"time"

	"git.sr.ht/~sbinet/msgbus"
)

// main subscribes three channels to two topics, gathers everything each
// channel delivers into its own buffer, and prints the buffers once all
// subscriptions have been closed.
func main() {
	bus := msgbus.New[string]()
	ch1 := bus.Subscribe("topic1", msgbus.WithTimeout[string](1*time.Second))
	ch2 := bus.Subscribe("topic2", msgbus.WithSize[string](5))
	ch3 := bus.Subscribe("topic2")

	// collect drains ch in a background goroutine, appending one line per
	// message to the returned builder. The returned channel is closed once
	// ch has been closed and fully drained, after which the builder may be
	// read safely.
	collect := func(ch <-chan string) (*strings.Builder, <-chan struct{}) {
		buf := new(strings.Builder)
		finished := make(chan struct{})
		go func() {
			defer close(finished)
			for line := range ch {
				buf.WriteString(line + "\n")
			}
		}()
		return buf, finished
	}

	out1, done1 := collect(ch1)
	out2, done2 := collect(ch2)
	out3, done3 := collect(ch3)

	// Publisher for topic1: releases ch1 once every message has been sent.
	go func() {
		defer bus.Unsubscribe("topic1", ch1)

		for _, m := range []string{"t1-m1", "t1-m2", "t1-m3"} {
			bus.Publish("topic1", m)
		}
	}()

	// Publisher for topic2: deferred calls run in reverse order, so ch3 is
	// unsubscribed before ch2.
	go func() {
		defer bus.Unsubscribe("topic2", ch2)
		defer bus.Unsubscribe("topic2", ch3)

		for _, m := range []string{"t2-m1", "t2-m2", "t2-m3", "t2-m4"} {
			bus.Publish("topic2", m)
		}
	}()

	// Block until every collector has seen its channel closed.
	<-done1
	<-done2
	<-done3

	fmt.Printf("chan1:\n%s\n", out1)
	fmt.Printf("chan2:\n%s\n", out2)
	fmt.Printf("chan3:\n%s\n", out3)

}
Output:

chan1:
t1-m1
t1-m2
t1-m3

chan2:
t2-m1
t2-m2
t2-m3
t2-m4

chan3:
t2-m1
t2-m2
t2-m3
t2-m4

func New

func New[T any]() *Bus[T]

New creates a new bus.

func (*Bus[T]) Publish

func (bus *Bus[T]) Publish(topic string, msg T)

Publish sends the provided message to subscriber(s) of the provided topic.

func (*Bus[T]) Subscribe

func (bus *Bus[T]) Subscribe(topic string, opts ...Option[T]) <-chan T

Subscribe creates a channel listening on the provided topic.

func (*Bus[T]) Unsubscribe

func (bus *Bus[T]) Unsubscribe(topic string, ch <-chan T)

Unsubscribe closes the channel previously obtained from Subscribe for the provided topic.

Unsubscribe panics if the provided channel wasn't associated with the provided topic.

type Option

type Option[T any] func(cfg *config[T])

Option configures a Bus channel.

func WithSize

func WithSize[T any](sz int) Option[T]

WithSize configures a Bus channel to use a channel with the provided buffer size.

func WithTimeout

func WithTimeout[T any](dt time.Duration) Option[T]

WithTimeout configures a Bus channel to use the provided duration for a timeout.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL