Documentation
¶
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Bus ¶
type Bus[T any] struct { // contains filtered or unexported fields }
Bus is an in-memory bus, dispatching messages to subscribers.
Example ¶
package main
import (
"fmt"
"strings"
"time"
"git.sr.ht/~sbinet/msgbus"
)
func main() {
bus := msgbus.New[string]()
ch1 := bus.Subscribe("topic1", msgbus.WithTimeout[string](1*time.Second))
ch2 := bus.Subscribe("topic2", msgbus.WithSize[string](5))
ch3 := bus.Subscribe("topic2")
msg1 := new(strings.Builder)
msg2 := new(strings.Builder)
msg3 := new(strings.Builder)
done1 := make(chan struct{})
done2 := make(chan struct{})
done3 := make(chan struct{})
go func() {
defer close(done1)
for v := range ch1 {
msg1.WriteString(v + "\n")
}
}()
go func() {
defer close(done2)
for v := range ch2 {
msg2.WriteString(v + "\n")
}
}()
go func() {
defer close(done3)
for v := range ch3 {
msg3.WriteString(v + "\n")
}
}()
go func() {
defer bus.Unsubscribe("topic1", ch1)
bus.Publish("topic1", "t1-m1")
bus.Publish("topic1", "t1-m2")
bus.Publish("topic1", "t1-m3")
}()
go func() {
defer bus.Unsubscribe("topic2", ch2)
defer bus.Unsubscribe("topic2", ch3)
bus.Publish("topic2", "t2-m1")
bus.Publish("topic2", "t2-m2")
bus.Publish("topic2", "t2-m3")
bus.Publish("topic2", "t2-m4")
}()
<-done1
<-done2
<-done3
fmt.Printf("chan1:\n%s\n", msg1)
fmt.Printf("chan2:\n%s\n", msg2)
fmt.Printf("chan3:\n%s\n", msg3)
}
Output: chan1: t1-m1 t1-m2 t1-m3 chan2: t2-m1 t2-m2 t2-m3 t2-m4 chan3: t2-m1 t2-m2 t2-m3 t2-m4
func (*Bus[T]) Unsubscribe ¶
Unsubscribe closes the previous channel subscription for the provided topic.
Unsubscribe panics if the provided channel wasn't associated with the provided topic.
Click to show internal directories.
Click to hide internal directories.