Design Patterns in Golang - Part-5

Published 11-02-2017 00:00:00

Messaging Patterns

Fan-In Messaging Patterns

Fan-In is a messaging pattern used to create a funnel for work amongst workers (clients: source, server: destination).

We can model fan-in using the Go channels.

// Merge different channels in one channel
func Merge(cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup

	out := make(chan int)

	// Start an send goroutine for each input channel in cs. send
	// copies values from c to out until c is closed, then calls wg.Done.
	send := func(c <-chan int) {
		for n := range c {
			out <- n
		}
		wg.Done()
	}

	wg.Add(len(cs))
	for _, c := range cs {
		go send(c)
	}

	// Start a goroutine to close out once all the send goroutines are
	// done.  This must start after the wg.Add call.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

The Merge function converts a list of channels to a single channel by starting a goroutine for each inbound channel that copies the values to the sole outbound channel.

Once all the output goroutines have been started, Merge a goroutine is started to close the main channel.

Fan-Out Messaging Pattern

Fan-Out is a messaging pattern used for distributing work amongst workers (producer: source, consumers: destination).

We can model fan-out using the Go channels.

// Split a channel into n channels that receive messages in a round-robin fashion.
func Split(ch <-chan int, n int) []<-chan int {
	cs := make([]chan int)
	for i := 0; i < n; i++ {
		cs = append(cs, make(chan int))
	}

	// Distributes the work in a round robin fashion among the stated number
	// of channels until the main channel has been closed. In that case, close
	// all channels and return.
	distributeToChannels := func(ch <-chan int, cs []chan<- int) {
		// Close every channel when the execution ends.
		defer func(cs []chan<- int) {
			for _, c := range cs {
				close(c)
			}
		}(cs)

		for {
			for _, c := range cs {
				select {
				case val, ok := <-ch:
					if !ok {
						return
					}

					c <- val
				}
			}
		}
	}

	go distributeToChannels(ch, cs)

	return cs
}

The Split function converts a single channel into a list of channels by using a goroutine to copy received values to channels in the list in a round-robin fashion.

Publish & Subscribe Messaging Pattern

Publish-Subscribe is a messaging pattern used to communicate messages between different components without these components knowing anything about each other’s identity.

It is similar to the Observer behavioral design pattern. The fundamental design principals of both Observer and Publish-Subscribe is the decoupling of those interested in being informed about Event Messages from the informer (Observers or Publishers). Meaning that you don’t have to program the messages to be sent directly to specific receivers.

To accomplish this, an intermediary, called a “message broker” or “event bus”, receives published messages, and then routes them on to subscribers.

There are three components messages, topics, users.

type Message struct {
    // Contents
}


type Subscription struct {
	ch chan<- Message

	Inbox chan Message
}

func (s *Subscription) Publish(msg Message) error {
	if _, ok := <-s.ch; !ok {
		return errors.New("Topic has been closed")
	}

	s.ch <- msg

	return nil
}
type Topic struct {
	Subscribers    []Session
	MessageHistory []Message
}

func (t *Topic) Subscribe(uid uint64) (Subscription, error) {
    // Get session and create one if it's the first

    // Add session to the Topic & MessageHistory

    // Create a subscription
}

func (t *Topic) Unsubscribe(Subscription) error {
	// Implementation
}

func (t *Topic) Delete() error {
	// Implementation
}
type User struct {
    ID uint64
    Name string
}

type Session struct {
    User User
    Timestamp time.Time
}

Improvements

Events can be published in a parallel fashion by utilizing stackless goroutines.

Performance can be improved by dealing with straggler subscribers by using a buffered inbox and you stop sending events once the inbox is full.

Stability Patterns

Circuit Breaker Pattern

Similar to electrical fuses that prevent fires when a circuit that is connected to the electrical grid starts drawing a high amount of power which causes the wires to heat up and combust, the circuit breaker design pattern is a fail-first mechanism that shuts down the circuit, request/response relationship or a service in the case of software development, to prevent bigger failures.

Note: The words “circuit” and “service” are used synonymously throught this document.

Implementation

Below is the implementation of a very simple circuit breaker to illustrate the purpose of the circuit breaker design pattern.

Operation Counter

circuit.Counter is a simple counter that records success and failure states of a circuit along with a timestamp and calculates the consecutive number of failures.

package circuit

import (
	"time"
)

type State int

const (
	UnknownState State = iota
	FailureState
	SuccessState
)

type Counter interface {
	Count(State)
	ConsecutiveFailures() uint32
	LastActivity() time.Time
	Reset()
}

Circuit Breaker

Circuit is wrapped using the circuit.Breaker closure that keeps an internal operation counter. It returns a fast error if the circuit has failed consecutively more than the specified threshold. After a while it retries the request and records it.

Note: Context type is used here to carry deadlines, cancelation signals, and other request-scoped values across API boundaries and between processes.

package circuit

import (
	"context"
	"time"
)

type Circuit func(context.Context) error

func Breaker(c Circuit, failureThreshold uint32) Circuit {
	cnt := NewCounter()

	return func(ctx context) error {
		if cnt.ConsecutiveFailures() >= failureThreshold {
			canRetry := func(cnt Counter) {
				backoffLevel := Cnt.ConsecutiveFailures() - failureThreshold

				// Calculates when should the circuit breaker resume propagating requests
				// to the service
				shouldRetryAt := cnt.LastActivity().Add(time.Seconds * 2 << backoffLevel)

				return time.Now().After(shouldRetryAt)
			}

			if !canRetry(cnt) {
				// Fails fast instead of propagating requests to the circuit since
				// not enough time has passed since the last failure to retry
				return ErrServiceUnavailable
			}
		}

		// Unless the failure threshold is exceeded the wrapped service mimics the
		// old behavior and the difference in behavior is seen after consecutive failures
		if err := c(ctx); err != nil {
			cnt.Count(FailureState)
			return err
		}

		cnt.Count(SuccessState)
		return nil
	}
}
  • sony/gobreaker is a well-tested and intuitive circuit breaker implementation for real-world use cases.

Profiling Patterns

Timing Functions

When optimizing code, sometimes a quick and dirty time measurement is required as opposed to utilizing profiler tools/frameworks to validate assumptions.

Time measurements can be performed by utilizing time package and defer statements.

Implementation

package profile

import (
    "time"
    "log"
)

func Duration(invocation time.Time, name string) {
    elapsed := time.Since(invocation)

    log.Printf("%s lasted %s", name, elapsed)
}

Usage

func BigIntFactorial(x big.Int) *big.Int {
    // Arguments to a defer statement is immediately evaluated and stored.
    // The deferred function receives the pre-evaluated values when its invoked.
    defer profile.Duration(time.Now(), "IntFactorial")

    y := big.NewInt(1)
    for one := big.NewInt(1); x.Sign() > 0; x.Sub(x, one) {
        y.Mul(y, x)
    }

    return x.Set(y)
}