Messaging Patterns
Fan-In Messaging Pattern
Fan-In is a messaging pattern used to create a funnel for work amongst workers (clients: source, server: destination).
We can model fan-in using Go channels.
// Merge merges several channels into one channel.
func Merge(cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup

	out := make(chan int)

	// Start a send goroutine for each input channel in cs. send
	// copies values from c to out until c is closed, then calls wg.Done.
	send := func(c <-chan int) {
		for n := range c {
			out <- n
		}
		wg.Done()
	}

	wg.Add(len(cs))
	for _, c := range cs {
		go send(c)
	}

	// Start a goroutine to close out once all the send goroutines are
	// done. This must start after the wg.Add call.
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}
The Merge function converts a list of channels to a single channel by starting a goroutine for each inbound channel that copies the values to the sole outbound channel.
Once all the send goroutines have been started, Merge starts one more goroutine that closes the outbound channel after every inbound channel has been closed and drained.
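As a rough illustration of how Merge might be used, the sketch below funnels three producers into one consumer. The helpers `produce` and `sumMerged` are hypothetical names introduced only for this example; Merge is repeated so the program is self-contained.

```go
package main

import (
	"fmt"
	"sync"
)

// Merge is the fan-in function from above, repeated so this sketch compiles on its own.
func Merge(cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	send := func(c <-chan int) {
		defer wg.Done()
		for n := range c {
			out <- n
		}
	}

	wg.Add(len(cs))
	for _, c := range cs {
		go send(c)
	}

	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// produce is a hypothetical helper: it emits vals on a new channel and then closes it.
func produce(vals ...int) <-chan int {
	c := make(chan int)
	go func() {
		defer close(c)
		for _, v := range vals {
			c <- v
		}
	}()
	return c
}

// sumMerged drains the merged channel and adds up every value it receives.
func sumMerged(cs ...<-chan int) int {
	sum := 0
	for n := range Merge(cs...) {
		sum += n
	}
	return sum
}

func main() {
	// The arrival order of values is not deterministic, but the total is.
	fmt.Println(sumMerged(produce(1, 2, 3), produce(10, 20), produce(100))) // 136
}
```

The consumer only ranges over one channel, and the loop ends by itself because Merge closes the outbound channel once every producer is done.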
Fan-Out Messaging Pattern
Fan-Out is a messaging pattern used for distributing work amongst workers (producer: source, consumers: destination).
We can model fan-out using Go channels.
// Split a channel into n channels that receive messages in a round-robin fashion.
// n must be at least 1.
func Split(ch <-chan int, n int) []<-chan int {
	// cs holds the bidirectional channels used for sending and closing; out
	// exposes the same channels as receive-only, because Go does not convert
	// a []chan int to a []<-chan int implicitly.
	cs := make([]chan int, 0, n)
	out := make([]<-chan int, 0, n)
	for i := 0; i < n; i++ {
		c := make(chan int)
		cs = append(cs, c)
		out = append(out, c)
	}

	// Distributes the work in a round robin fashion among the stated number
	// of channels until the main channel has been closed. In that case, close
	// all channels and return.
	distributeToChannels := func(ch <-chan int, cs []chan int) {
		// Close every channel when the execution ends.
		defer func() {
			for _, c := range cs {
				close(c)
			}
		}()

		for {
			for _, c := range cs {
				val, ok := <-ch
				if !ok {
					return
				}
				c <- val
			}
		}
	}

	go distributeToChannels(ch, cs)

	return out
}
The Split function converts a single channel into a list of channels by using a goroutine to copy received values to channels in the list in a round-robin fashion.
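To make the round-robin distribution concrete, here is a small sketch that pushes ten values through Split and counts how many each of three workers receives. `countPerWorker` is a hypothetical helper for this example, and Split is written in a compact form that assumes n is at least 1.

```go
package main

import (
	"fmt"
	"sync"
)

// Split fans ch out to n channels in round-robin order (n must be at least 1).
func Split(ch <-chan int, n int) []<-chan int {
	cs := make([]chan int, n)
	out := make([]<-chan int, n)
	for i := range cs {
		cs[i] = make(chan int)
		out[i] = cs[i]
	}

	go func() {
		// Close every worker channel once the source is exhausted.
		defer func() {
			for _, c := range cs {
				close(c)
			}
		}()
		for {
			for _, c := range cs {
				val, ok := <-ch
				if !ok {
					return
				}
				c <- val
			}
		}
	}()
	return out
}

// countPerWorker is a hypothetical helper: it sends the values 0..total-1
// through Split and reports how many values each of the n workers received.
func countPerWorker(total, n int) []int {
	src := make(chan int)
	go func() {
		defer close(src)
		for i := 0; i < total; i++ {
			src <- i
		}
	}()

	counts := make([]int, n)
	var wg sync.WaitGroup
	for i, c := range Split(src, n) {
		wg.Add(1)
		go func(i int, c <-chan int) {
			defer wg.Done()
			for range c {
				counts[i]++ // each worker writes only to its own slot
			}
		}(i, c)
	}
	wg.Wait()
	return counts
}

func main() {
	fmt.Println(countPerWorker(10, 3)) // [4 3 3]
}
```

Because the distributing goroutine sends to the workers strictly in turn, one slow worker stalls the others; this is a property of the round-robin design rather than of the example.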
Publish & Subscribe Messaging Pattern
Publish-Subscribe is a messaging pattern used to communicate messages between different components without these components knowing anything about each other’s identity.
It is similar to the Observer behavioral design pattern.
The fundamental design principle of both Observer and Publish-Subscribe is the decoupling of those interested in being informed about event messages (observers or subscribers) from the informer (subjects or publishers).
This means that you don’t have to program the messages to be sent directly to specific receivers.
To accomplish this, an intermediary, called a “message broker” or “event bus”, receives published messages, and then routes them on to subscribers.
There are three components: messages, topics, and users.
type Message struct {
	// Contents
}

type Subscription struct {
	ch chan<- Message
	// closed is an assumed addition: the Topic closes it when it is deleted,
	// and never closes ch itself, so Publish can detect a closed topic.
	closed <-chan struct{}
	Inbox  chan Message
}

func (s *Subscription) Publish(msg Message) error {
	// A send-only channel cannot be received from to test whether it is
	// closed, so the closed signal is checked alongside the send instead.
	select {
	case <-s.closed:
		return errors.New("topic has been closed")
	case s.ch <- msg:
		return nil
	}
}

type Topic struct {
	Subscribers    []Session
	MessageHistory []Message
}

func (t *Topic) Subscribe(uid uint64) (Subscription, error) {
	// Get session and create one if it's the first
	// Add session to the Topic & MessageHistory
	// Create a subscription
}

func (t *Topic) Unsubscribe(Subscription) error {
	// Implementation
}

func (t *Topic) Delete() error {
	// Implementation
}

type User struct {
	ID   uint64
	Name string
}

type Session struct {
	User      User
	Timestamp time.Time
}
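The routing role of the message broker described above can be sketched in a few lines. This is only an illustration under stated assumptions: `Broker`, `NewBroker`, the string payloads and the inbox size of 8 are all hypothetical and are not part of the types shown above.

```go
package main

import (
	"fmt"
	"sync"
)

// Broker is a hypothetical in-memory event bus that routes messages by topic name.
type Broker struct {
	mu     sync.RWMutex
	topics map[string][]chan string
}

// NewBroker returns a broker with no topics.
func NewBroker() *Broker {
	return &Broker{topics: make(map[string][]chan string)}
}

// Subscribe registers a new buffered inbox for topic and returns its receive side.
func (b *Broker) Subscribe(topic string) <-chan string {
	inbox := make(chan string, 8) // buffer size chosen arbitrarily for the sketch
	b.mu.Lock()
	b.topics[topic] = append(b.topics[topic], inbox)
	b.mu.Unlock()
	return inbox
}

// Publish delivers msg to every inbox subscribed to topic and returns the
// number of subscribers. It blocks if a subscriber's inbox is full.
func (b *Broker) Publish(topic, msg string) int {
	b.mu.RLock()
	defer b.mu.RUnlock()
	for _, inbox := range b.topics[topic] {
		inbox <- msg
	}
	return len(b.topics[topic])
}

func main() {
	b := NewBroker()
	news := b.Subscribe("news")
	sports := b.Subscribe("sports")

	fmt.Println(b.Publish("news", "hello")) // 1
	fmt.Println(<-news)                     // hello
	fmt.Println(len(sports))                // 0: nothing was published to "sports"
}
```

The publisher only names a topic and never sees the subscribers' channels, which is the decoupling the pattern is after.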
Improvements
Events can be published in parallel by delivering to each subscriber in its own goroutine.
Straggler subscribers can be handled by giving each subscriber a buffered inbox and no longer sending events to a subscriber once its inbox is full, so that one slow subscriber does not block the publisher.
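One way to avoid blocking on a straggler is a non-blocking send on a buffered inbox. The sketch below is an assumption about how that could look, not the author's implementation; `Subscriber` and `Broadcast` are hypothetical names, and delivery here is sequential rather than parallel.

```go
package main

import "fmt"

// Message is a placeholder payload type for this sketch.
type Message struct {
	Body string
}

// Subscriber is a hypothetical type that owns a buffered inbox.
type Subscriber struct {
	Inbox chan Message
}

// Broadcast tries to deliver msg to every subscriber without blocking and
// returns how many subscribers missed it because their inbox was full.
func Broadcast(subs []*Subscriber, msg Message) (dropped int) {
	for _, s := range subs {
		select {
		case s.Inbox <- msg:
			// Delivered: there was room in the buffer.
		default:
			// Inbox full: skip this subscriber instead of waiting for it.
			dropped++
		}
	}
	return dropped
}

func main() {
	fast := &Subscriber{Inbox: make(chan Message, 2)}
	slow := &Subscriber{Inbox: make(chan Message, 1)}
	subs := []*Subscriber{fast, slow}

	fmt.Println(Broadcast(subs, Message{Body: "first"}))  // 0
	fmt.Println(Broadcast(subs, Message{Body: "second"})) // 1: slow's inbox is already full
}
```

Dropping is a trade-off: the publisher stays responsive, but a slow subscriber silently loses events unless the drop count is surfaced somewhere.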
Stability Patterns
Circuit Breaker Pattern
Similar to an electrical fuse, which prevents fires by breaking a circuit that starts drawing so much power from the electrical grid that the wires heat up and combust, the circuit breaker design pattern is a fail-fast mechanism that shuts down the circuit (in software development, a request/response relationship or a service) to prevent bigger failures.
Note: The words “circuit” and “service” are used synonymously throughout this document.
Implementation
Below is the implementation of a very simple circuit breaker to illustrate the purpose of the circuit breaker design pattern.
Operation Counter
circuit.Counter is a simple counter that records the success and failure states of a circuit along with a timestamp and calculates the number of consecutive failures.
package circuit

import (
	"time"
)

type State int

const (
	UnknownState State = iota
	FailureState
	SuccessState
)

type Counter interface {
	Count(State)
	ConsecutiveFailures() uint32
	LastActivity() time.Time
	Reset()
}
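The interface above leaves the implementation open. A minimal sketch of one possible implementation follows, assuming that a success ends the failure streak and that every recorded state updates the timestamp; the `counter` struct and its mutex are assumptions made for this example, and it is placed in `package main` only so it runs on its own.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type State int

const (
	UnknownState State = iota
	FailureState
	SuccessState
)

type Counter interface {
	Count(State)
	ConsecutiveFailures() uint32
	LastActivity() time.Time
	Reset()
}

// counter is a hypothetical, mutex-guarded implementation of Counter.
type counter struct {
	mu           sync.Mutex
	failures     uint32
	lastActivity time.Time
}

// NewCounter returns a counter with no recorded activity.
func NewCounter() Counter { return &counter{} }

// Count records a state and the time at which it was observed.
func (c *counter) Count(s State) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.lastActivity = time.Now()
	switch s {
	case FailureState:
		c.failures++
	case SuccessState:
		c.failures = 0 // a success ends the failure streak
	}
}

func (c *counter) ConsecutiveFailures() uint32 {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.failures
}

func (c *counter) LastActivity() time.Time {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.lastActivity
}

// Reset clears both the failure streak and the timestamp.
func (c *counter) Reset() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.failures = 0
	c.lastActivity = time.Time{}
}

func main() {
	cnt := NewCounter()
	cnt.Count(FailureState)
	cnt.Count(FailureState)
	fmt.Println(cnt.ConsecutiveFailures()) // 2
	cnt.Count(SuccessState)
	fmt.Println(cnt.ConsecutiveFailures()) // 0
}
```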
Circuit Breaker
A circuit is wrapped using the circuit.Breaker closure, which keeps an internal operation counter.
It fails fast with an error once the number of consecutive failures reaches the specified threshold.
After a back-off period that grows with each further failure, it lets a request through again and records the result.
Note: The context.Context type is used here to carry deadlines, cancelation signals, and other request-scoped values across API boundaries and between processes.
package circuit

import (
	"context"
	"errors"
	"time"
)

// ErrServiceUnavailable is returned while the breaker is failing fast.
// (The error text is illustrative.)
var ErrServiceUnavailable = errors.New("circuit: service unavailable")

type Circuit func(context.Context) error

// Breaker wraps c with a circuit breaker. NewCounter is assumed to return a
// Counter implementation defined elsewhere in this package.
func Breaker(c Circuit, failureThreshold uint32) Circuit {
	cnt := NewCounter()

	return func(ctx context.Context) error {
		if cnt.ConsecutiveFailures() >= failureThreshold {
			canRetry := func(cnt Counter) bool {
				backoffLevel := cnt.ConsecutiveFailures() - failureThreshold

				// Calculates when the circuit breaker should resume propagating
				// requests to the service
				shouldRetryAt := cnt.LastActivity().Add(time.Second * 2 << backoffLevel)

				return time.Now().After(shouldRetryAt)
			}

			if !canRetry(cnt) {
				// Fails fast instead of propagating requests to the circuit since
				// not enough time has passed since the last failure to retry
				return ErrServiceUnavailable
			}
		}

		// Unless the failure threshold is exceeded the wrapped service mimics the
		// old behavior and the difference in behavior is seen after consecutive failures
		if err := c(ctx); err != nil {
			cnt.Count(FailureState)
			return err
		}

		cnt.Count(SuccessState)
		return nil
	}
}
Related Works
- sony/gobreaker is a well-tested and intuitive circuit breaker implementation for real-world use cases.
Profiling Patterns
Timing Functions
When optimizing code, sometimes a quick and dirty time measurement is required as opposed to utilizing profiler tools/frameworks to validate assumptions.
Time measurements can be performed by utilizing the time package and defer statements.
Implementation
package profile

import (
	"log"
	"time"
)

func Duration(invocation time.Time, name string) {
	elapsed := time.Since(invocation)

	log.Printf("%s lasted %s", name, elapsed)
}
Usage
func BigIntFactorial(x *big.Int) *big.Int {
	// Arguments to a defer statement are immediately evaluated and stored.
	// The deferred function receives the pre-evaluated values when it's invoked.
	defer profile.Duration(time.Now(), "BigIntFactorial")

	// Work on a copy so that the caller's value is left untouched.
	n := new(big.Int).Set(x)
	y := big.NewInt(1)
	for one := big.NewInt(1); n.Sign() > 0; n.Sub(n, one) {
		y.Mul(y, n)
	}

	return y
}
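For a complete program that can be run directly, the sketch below inlines the Duration helper and times a simple factorial. `Factorial` is a hypothetical function written for this example; the helper is copied into `package main` only to keep the program self-contained.

```go
package main

import (
	"fmt"
	"log"
	"math/big"
	"time"
)

// Duration logs how much time has passed since invocation.
func Duration(invocation time.Time, name string) {
	log.Printf("%s lasted %s", name, time.Since(invocation))
}

// Factorial computes n! and logs how long the computation took.
func Factorial(n int64) *big.Int {
	// time.Now() is evaluated here, when the defer statement executes;
	// Duration itself runs only when Factorial returns.
	defer Duration(time.Now(), "Factorial")

	result := big.NewInt(1)
	for i := int64(2); i <= n; i++ {
		result.Mul(result, big.NewInt(i))
	}
	return result
}

func main() {
	fmt.Println(Factorial(20)) // 2432902008176640000
}
```

The timing line is written by the log package to standard error, so it does not mix with the program's regular output on standard output.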