Beginning Concurrency Patterns

Part 3 of Concurrency and Parallelism in Go

Cover image

In this post I will cover some best practices for building concurrent applications in Go using basic concurrency patterns and native primitives. The patterns themselves are applicable to any language, but for these examples we will use Go.

Clone the repository for this post to follow along

1git clone https://github.com/benjivesterby/may2021-triangle-meetup.git

Table Of Contents

Pipeline Pattern

A pipeline pattern1 2 consists of a number of stages that are connected by a series of channels. The first stage is the source of the data, the last stage is the sink of the data. A good example of this would be a data pipeline where the first step mines the data, the subsequent steps cleans the data, and the final step stores the sanitized data in a database.

A diagram showing an example Pipeline Example Pipeline

As the diagram shows, the first stage is the source of the data, the last stage is the sink of the data. Here is a code example showing the pipeline pattern in action.

 1// Example of a pipeline pattern
 2func main() {
 3  // Call each function passing the output of the previous function
 4  // as the input to the next function
 5
 6  d := []data{
 7    // ... data ...
 8  }
 9
10  // Create the pipeline
11  sink(sanitize(source(d)))
12}
13
14func source(in []data) <-chan data {
15  out := make(chan data)
16
17  go func(in []data) {
18    defer close(out)
19
20    for _, d := range data {
21      // Load data into the front of the pipeline
22      out <- d
23    }
24  }(in)
25
26  return out
27}
28
29func sanitize(in <-chan data) <-chan data {
30  out := make(chan data)
31
32  go func(in <-chan data, out chan<-data) {
33    defer close(out)
34    for {
35      d, ok := <- in
36      if !ok {
37        return
38      }
39      // ... Do some work
40
41      // push the data out
42      out <- d
43    }
44  }(in, out)
45
46  return out
47}
48
49func sink(in <-chan data) {
50  for {
51    d, ok := <- in
52    if !ok {
53      return
54    }
55    // ... Process the sanitized data
56  }
57}

For an executable example of this pattern, see the pipeline in the Github project for this post. From the project root folder run the following command to execute the example pipeline.

1go run patterns/pipeline/main.go

Built with a series of Go routines each routine uses a channel of data coming in, and one going out. Each routine is responsible for its own work and pushes the results to the next step in the pipeline. Using this pattern you can create pipelines of any size and complexity.

Back To Top

Fan-Out Pattern

The Fan-Out pattern is a pattern that allows for a number of routines to consume data from a single source. This pattern is useful when you need to load balance a large amount of data processing across multiple routines.

For an executable example of this pattern, see the fan-out in the Github project for this post. From the project root folder run the following command to execute the example fan-out.

1go run patterns/fanout/main.go

A diagram showing an example Fan-Out Example Fan-Out

Here is an example of the fan-out pattern where data is being processed by a set of worker routines.

 1// Example of a fan-out pattern for processing data in parallel
 2func main() {
 3  dataPipeline := make(chan data)
 4
 5  // Create three worker routines
 6  go process(dataPipeline)
 7  go process(dataPipeline)
 8  go process(dataPipeline)
 9
10  // Load the data into the pipeline
11  for _, d := range ReadData() {
12    dataPipeline <- d
13  }
14
15  // Close the pipeline when all data has been read in
16  // NOTE: This will immediately close the application regardless
17  // of whether the workers have completed their processing.
18  // See the section on Best Practices at the end of the post
19  // for more information on how to handle this.
20  close(dataPipeline)
21}
22
23func ReadData() []data {
24  // ...
25}
26
27func process(in <-chan data) {
28  for {
29    d, ok := <- in
30    if !ok {
31      return
32    }
33    // ... Do some work
34  }
35}

In the example above the process function is called three times, each as its own routine. The in channel is passed to each routine and data is read off the channel by the for loop. The in channel is closed when the data is exhausted.

Back To Top

Replicator Pattern

So we have seen parallel processing of data, but a fan-out pattern can also be used to replicate data across multiple routines.

 1// Example of a fan-out replication pattern
 2func main() {
 3  dataPipeline := make(chan data)
 4
 5  // Pass the write-only channels from the three proc calls to the fan-out
 6  go replicate(dataPipeline, proc1(), proc2(), proc3())
 7
 8  // Load the data into the pipeline
 9  for _, d := range ReadData() {
10    dataPipeline <- d
11  }
12}
13
14func proc1() chan<- data {/* ... */}
15func proc2() chan<- data {/* ... */}
16func proc3() chan<- data {/* ... */}
17
18func replicate(in <-chan data, outgoing ...chan<- data) {
19  for {
20    d, ok := <- in // Read from the input channel
21    if !ok {
22      return
23    }
24    
25    // Replicate the data to each of the outgoing channels
26    for _, out := range outgoing {
27      out <- d
28    }
29  }
30}

NOTE: When using the replicator pattern be mindful of the type of data. This is particularly important when using pointers because the replicator is not copying the data, it is passing the pointer.

The above example shows a fan-out pattern where the replicate function is called with three channels through a variadic argument. The in channel provides the base data and it is copied to the outgoing channels.

Back To Top

Type Fan-Out

The last fan-out pattern we will cover here is the type fan-out pattern . This is particularly useful when dealing with a channel of interface{} types. This pattern allows for data to be directed to appropriate processors based on the type of the data.

 1// Example of a type fan-out pattern from the github project
 2// for this post
 3func TypeFan(in <-chan interface{}) (
 4  <-chan int,
 5  <-chan string,
 6  <-chan []byte,
 7  ) {
 8  ints := make(chan int)
 9  strings := make(chan string)
10  bytes := make(chan []byte)
11
12  go func(
13    in <-chan interface{},
14    ints chan<- int,
15    strings chan<- string,
16    bytes chan<- []byte,
17    ) {
18      defer close(ints)
19      defer close(strings)
20      defer close(bytes)
21
22      for {
23        data, ok := <-in
24        if !ok {
25          return
26        }
27
28        // Type Switch on the data coming in and
29        // push the data to the appropriate channel
30        switch value := data.(type) {
31        case nil: // Special case in type switch
32          // Do nothing
33        case int:
34          ints <- value
35        case string:
36          strings <- value
37        case []byte:
38          bytes <- value
39        default:
40          fmt.Printf("%T is an unsupported type", data)
41        }
42      }
43  }(in, ints, strings, bytes)
44
45  return ints, strings, bytes
46}

This example shows how to accept a channel of empty interface (i.e. interface{}) and use a type-switch to determine which channel to send the data to.

Back To Top

Fan-In / Consolidator Pattern

With a Fan-In pattern, data is read in from multiple channels and consolidated to a single channel as output data.3 The Fan-In pattern is the opposite of the Fan-Out pattern.

For an executable example of this pattern, see the fan-in in the Github project for this post. From the project root folder run the following command to execute the example fan-in.

1go run patterns/fanin/main.go

A diagram showing an example Fan-In Example Fan-In

Here is an example of the fan-in pattern where data is being mined by a set of worker routines and it is all being funneled into a single channel.

 1// Example of a fan-in pattern mining data in
 2// parallel, but processing it all synchronously
 3func main() {
 4  miner1 := mine()
 5  miner2 := mine()
 6  miner3 := mine()
 7
 8  // Take miner data and consolidate it into a single channel
 9  out := consolidate(miner1, miner2, miner3)
10
11  // Process the data
12  for {
13    data, ok := <- out
14    if !ok {
15      return
16    }
17    // ... Do some work
18  }
19}
20
21func consolidate(miners ...<-chan data) <-chan data {
22  out := make(chan data)
23
24  // Iterate over the miners and start a routine for
25  // consuming each one and merging them into the output
26  for _, in := range miners {
27    go func(in <-chan data, out chan<- data) {
28      for {
29        d, ok := <-in // Pull data from the miner
30        if !ok {
31          return
32        }
33        out <- d // Send the data to the output
34      }
35    }(in, out)
36  }
37
38  return out
39}
40
41func mine() <-chan data {
42  out := make(chan data)
43
44  go func() {
45    // ... populate the channel with mined data
46  }()
47
48  return out
49}

The above example utilizes the fan-in pattern to consolidate incoming data from a set of mock data miners.

Back To Top

Combining and Nesting Patterns

Each of these patterns can be combined to create more complex patterns. This is incredibly useful since the majority of applications will not use just one concurrency pattern.

Here is an example of combining all the patterns into a request response life-cycle. In this example the data comes in from a single source, fans out to multiple pipelines, and then fans back into a single response to the user.

A diagram showing an example of a request-response life-cycle

When building applications I recommend building diagrams to help conceptualize the concurrent design elements. I really like diagrams.net for this. The process of building out these diagrams can help solidify the final product and make it easier to understand the design. Having the designs made as part of your process will also help sell the design to other engineers and leadership.

Back To Top


BEST PRACTICE:
Use Go’s native concurrency primitives when possible

Best Practices

While it is considered best practice to primarily use the Go concurrency primitives to manage concurrency in your Go applications, there are situations where it becomes necessary to use the sync package to help manage concurrency.

A good example of this is when you need to ensure that all routines are properly exited when implementing something like io.Closer. If, for example, your code spawns N routines and you want to ensure that all of them are properly exited when the Close method is called, you can use the sync.WaitGroup to wait for the routines that are still running to be closed.

The method for doing this is shown below.

 1// Example of using a wait group to
 2// ensure all routines are properly exited
 3type mytype struct {
 4  ctx context.Context
 5  cancel context.CancelFunc
 6  wg sync.WaitGroup
 7  wgMu sync.Mutex
 8}
 9
10// DoSomething spawns a go routine
11// every time it's called
12func (t *mytype) DoSomething() {
13  // Increment the waitgroup to ensure
14  // Close properly blocks
15  t.wgMu.Lock()
16  t.wg.Add(1)
17  t.wgMu.Unlock()
18
19  go func() {
20    // Decrement the waitgroup
21    // when the routine exits
22    defer t.wg.Done()
23
24    // ... do something
25  }()
26}
27
28func (t *mytype) Close() error {
29  // Cancel the internal context
30  t.cancel()
31
32  // Wait for all routines to exit
33  t.wgMu.Lock()
34  t.wg.Wait()
35  t.wgMu.Unlock()
36  return nil
37}

There are a couple of important elements in the code above. Firstly it uses a sync.WaitGroup to increment and decrement the number of routines that are running. Secondly it uses a sync.Mutex to ensure that only one routine is modifying the sync.WaitGroup at a time (a mutex is not necessary for the .Wait() method).

Click here to read a very thourough explanation of this by Leo Lara. 4

For a functional example of a situation where using the sync package is necessary, see the Plex library.

Back To Top

Generics (COMING SOON!)

These patterns become much easier to use with the introduction of generics in Go 1.18 which I will cover in an upcoming post.

Back To Top


  1. Pipelines - Concurrency in Go by Katherine Cox-Buday - Page 100  ↩︎

  2. Go Concurrency Patterns: Pipelines and cancellation  ↩︎

  3. Multiplexing - Concurrency in Go by Katherine Cox-Buday - Page 117  ↩︎

  4. Closing a Go channel written by several goroutines  ↩︎

CC BY-NC-ND 4.0