In this post I will cover some best practices for building concurrent applications in Go using basic concurrency patterns and native primitives. The patterns themselves are applicable to any language, but for these examples we will use Go.
Clone the repository for this post to follow along:
git clone https://github.com/benjivesterby/may2021-triangle-meetup.git
Pipeline Pattern
A pipeline pattern [1][2] consists of a number of stages connected by a series of channels. The first stage is the source of the data and the last stage is the sink. A good example is a data pipeline in which the first step mines the data, the subsequent steps clean it, and the final step stores the sanitized data in a database.
Example Pipeline
As the diagram shows, the first stage is the source of the data, the last stage is the sink of the data. Here is a code example showing the pipeline pattern in action.
// Example of a pipeline pattern
func main() {
	// Call each function passing the output of the previous function
	// as the input to the next function

	d := []data{
		// ... data ...
	}

	// Create the pipeline
	sink(sanitize(source(d)))
}

func source(in []data) <-chan data {
	out := make(chan data)

	go func(in []data) {
		defer close(out)

		// Range over the input slice (not the data type)
		for _, d := range in {
			// Load data into the front of the pipeline
			out <- d
		}
	}(in)

	return out
}

func sanitize(in <-chan data) <-chan data {
	out := make(chan data)

	go func(in <-chan data, out chan<- data) {
		defer close(out)
		for {
			d, ok := <-in
			if !ok {
				return
			}
			// ... Do some work

			// push the data out
			out <- d
		}
	}(in, out)

	return out
}

func sink(in <-chan data) {
	for {
		d, ok := <-in
		if !ok {
			return
		}
		// ... Process the sanitized data
	}
}
For an executable example of this pattern, see the pipeline in the GitHub project for this post. From the project root folder, run the following command to execute the example pipeline.
go run patterns/pipeline/main.go
The pipeline is built from a series of goroutines, each of which reads from an incoming channel and writes to an outgoing one. Each routine is responsible for its own work and pushes its results to the next step in the pipeline. Using this pattern you can create pipelines of any size and complexity.
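To make the stage-to-stage handoff concrete, here is a minimal runnable sketch of the same shape using plain integers. The names generate, square, and collect are hypothetical stand-ins for source, sanitize, and sink and are not taken from the project repository.

```go
package main

import "fmt"

// generate is a hypothetical source stage: it emits each value and
// closes its output channel once the input is exhausted.
func generate(values ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, v := range values {
			out <- v
		}
	}()
	return out
}

// square is a hypothetical middle stage: it reads until the upstream
// channel is closed, then closes its own output.
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- v * v
		}
	}()
	return out
}

// collect is a hypothetical sink: it drains the pipeline into a slice.
func collect(in <-chan int) []int {
	var results []int
	for v := range in {
		results = append(results, v)
	}
	return results
}

func main() {
	fmt.Println(collect(square(generate(1, 2, 3)))) // prints [1 4 9]
}
```

Because every stage closes its output when its input is closed, shutdown flows from the source to the sink without any extra signalling.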
Fan-Out Pattern
The Fan-Out pattern allows a number of routines to consume data from a single source. It is useful when you need to load balance a large amount of data processing across multiple routines.
For an executable example of this pattern, see the fan-out in the GitHub project for this post. From the project root folder, run the following command to execute the example fan-out.
go run patterns/fanout/main.go
Example Fan-Out
Here is an example of the fan-out pattern where data is being processed by a set of worker routines.
// Example of a fan-out pattern for processing data in parallel
func main() {
	dataPipeline := make(chan data)

	// Create three worker routines
	go process(dataPipeline)
	go process(dataPipeline)
	go process(dataPipeline)

	// Load the data into the pipeline
	for _, d := range ReadData() {
		dataPipeline <- d
	}

	// Close the pipeline when all data has been read in
	// NOTE: This will immediately close the application regardless
	// of whether the workers have completed their processing.
	// See the section on Best Practices at the end of the post
	// for more information on how to handle this.
	close(dataPipeline)
}

func ReadData() []data {
	// ...
}

func process(in <-chan data) {
	for {
		d, ok := <-in
		if !ok {
			return
		}
		// ... Do some work
	}
}
In the example above the process function is called three times, each as its own routine. The in channel is passed to each routine and data is read off the channel by the for loop. The in channel is closed when the data is exhausted.
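One way to keep main from exiting before the workers finish is to have each worker report back on a second channel. The sketch below is only an illustration under that assumption: worker, fanOut, and the totals channel are hypothetical names, and integers stand in for the data type.

```go
package main

import "fmt"

// worker is a hypothetical stand-in for process: it sums the values it
// receives and reports its total once the shared channel is closed.
func worker(in <-chan int, totals chan<- int) {
	sum := 0
	for v := range in {
		sum += v
	}
	totals <- sum
}

// fanOut starts n workers on one shared channel, feeds them the values,
// and returns the combined total after every worker has finished.
func fanOut(n int, values []int) int {
	in := make(chan int)
	totals := make(chan int)

	for i := 0; i < n; i++ {
		go worker(in, totals)
	}

	for _, v := range values {
		in <- v
	}
	close(in) // tells every worker that no more data is coming

	// Receiving exactly one total per worker doubles as the wait step.
	total := 0
	for i := 0; i < n; i++ {
		total += <-totals
	}
	return total
}

func main() {
	fmt.Println(fanOut(3, []int{1, 2, 3, 4, 5, 6})) // prints 21
}
```

Which worker receives which value is not deterministic, but each value is delivered to exactly one worker, so the combined total is stable.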
Replicator Pattern
So we have seen parallel processing of data, but a fan-out pattern can also be used to replicate data across multiple routines.
// Example of a fan-out replication pattern
func main() {
	dataPipeline := make(chan data)

	// Pass the send-only channels from the three proc calls to the fan-out
	go replicate(dataPipeline, proc1(), proc2(), proc3())

	// Load the data into the pipeline
	for _, d := range ReadData() {
		dataPipeline <- d
	}

	// Close the pipeline so that replicate can return
	close(dataPipeline)
}

func proc1() chan<- data { /* ... */ }
func proc2() chan<- data { /* ... */ }
func proc3() chan<- data { /* ... */ }

func replicate(in <-chan data, outgoing ...chan<- data) {
	for {
		d, ok := <-in // Read from the input channel
		if !ok {
			return
		}

		// Replicate the data to each of the outgoing channels
		for _, out := range outgoing {
			out <- d
		}
	}
}
NOTE: When using the replicator pattern be mindful of the type of data. This is particularly important when using pointers, because the replicator does not copy the data. It passes the pointer, so every receiver shares the same underlying value.
The above example shows a fan-out pattern where the replicate function is called with three channels through a variadic argument. The in channel provides the base data and it is copied to the outgoing channels.
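The pointer caveat in the note above is easy to demonstrate. The following sketch assumes a hypothetical record type and a replicate variant specialised to *record. It shows that a change made by one receiver is visible to the other.

```go
package main

import "fmt"

// record is a hypothetical payload type used only for this illustration.
type record struct{ Count int }

// replicate follows the replicate function above, specialised to *record.
// It also closes the outgoing channels once the input is drained.
func replicate(in <-chan *record, outgoing ...chan<- *record) {
	for r := range in {
		for _, out := range outgoing {
			out <- r // sends the same pointer, not a copy
		}
	}
	for _, out := range outgoing {
		close(out)
	}
}

// sharedAfterReplicate sends one record to two receivers, lets the first
// receiver mutate it, and returns what the second receiver observes.
func sharedAfterReplicate() int {
	in := make(chan *record)
	a := make(chan *record, 1)
	b := make(chan *record, 1)
	go replicate(in, a, b)

	in <- &record{Count: 1}
	close(in)

	first := <-a
	second := <-b
	first.Count = 42    // a mutation through one receiver...
	return second.Count // ...is visible through the other
}

func main() {
	fmt.Println(sharedAfterReplicate()) // prints 42
}
```

If the receivers must not affect one another, send values instead of pointers, or copy the value before each send.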
Type Fan-Out
The last fan-out pattern we will cover here is the type fan-out pattern. This is particularly useful when dealing with a channel of interface{} types. This pattern allows for data to be directed to appropriate processors based on the type of the data.
// Example of a type fan-out pattern from the GitHub project
// for this post
func TypeFan(in <-chan interface{}) (
	<-chan int,
	<-chan string,
	<-chan []byte,
) {
	ints := make(chan int)
	strings := make(chan string)
	bytes := make(chan []byte)

	go func(
		in <-chan interface{},
		ints chan<- int,
		strings chan<- string,
		bytes chan<- []byte,
	) {
		defer close(ints)
		defer close(strings)
		defer close(bytes)

		for {
			data, ok := <-in
			if !ok {
				return
			}

			// Type Switch on the data coming in and
			// push the data to the appropriate channel
			switch value := data.(type) {
			case nil: // Special case in type switch
				// Do nothing
			case int:
				ints <- value
			case string:
				strings <- value
			case []byte:
				bytes <- value
			default:
				fmt.Printf("%T is an unsupported type\n", data)
			}
		}
	}(in, ints, strings, bytes)

	return ints, strings, bytes
}
This example shows how to accept a channel of empty interface (i.e. interface{}) and use a type switch to determine which channel to send the data to.
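Because the output channels are unbuffered, the consumer has to service all of them. If it reads only one channel, the fan-out routine blocks as soon as a value of another type arrives. The sketch below shows one possible consumer, using a condensed two-type version of TypeFan. The names typeFan and drain are hypothetical and not part of the project.

```go
package main

import "fmt"

// typeFan is a condensed, two-type version of the TypeFan function above.
// Values of any other type are silently dropped in this sketch.
func typeFan(in <-chan interface{}) (<-chan int, <-chan string) {
	ints := make(chan int)
	strs := make(chan string)
	go func() {
		defer close(ints)
		defer close(strs)
		for v := range in {
			switch value := v.(type) {
			case int:
				ints <- value
			case string:
				strs <- value
			}
		}
	}()
	return ints, strs
}

// drain reads both outputs in a single select loop so that neither
// channel can stall the fan-out routine.
func drain(ints <-chan int, strs <-chan string) (int, string) {
	sum, joined := 0, ""
	for ints != nil || strs != nil {
		select {
		case v, ok := <-ints:
			if !ok {
				ints = nil // a nil channel is never selected again
				continue
			}
			sum += v
		case s, ok := <-strs:
			if !ok {
				strs = nil
				continue
			}
			joined += s
		}
	}
	return sum, joined
}

func main() {
	in := make(chan interface{})
	go func() {
		defer close(in)
		for _, v := range []interface{}{1, "a", 2, "b", 3.5} {
			in <- v
		}
	}()
	sum, joined := drain(typeFan(in))
	fmt.Println(sum, joined) // prints 3 ab
}
```

Setting a closed channel to nil removes it from the select, so the loop ends once both outputs have been closed.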
Fan-In / Consolidator Pattern
With a Fan-In pattern, data is read in from multiple channels and consolidated to a single channel as output data [3]. The Fan-In pattern is the opposite of the Fan-Out pattern.
For an executable example of this pattern, see the fan-in in the GitHub project for this post. From the project root folder, run the following command to execute the example fan-in.
go run patterns/fanin/main.go
Example Fan-In
Here is an example of the fan-in pattern where data is being mined by a set of worker routines and it is all being funneled into a single channel.
// Example of a fan-in pattern mining data in
// parallel, but processing it all synchronously
func main() {
	miner1 := mine()
	miner2 := mine()
	miner3 := mine()

	// Take miner data and consolidate it into a single channel
	out := consolidate(miner1, miner2, miner3)

	// Process the data
	for {
		d, ok := <-out
		if !ok {
			return
		}
		// ... Do some work
	}
}

func consolidate(miners ...<-chan data) <-chan data {
	out := make(chan data)

	// Iterate over the miners and start a routine for
	// consuming each one and merging them into the output
	// NOTE: out is never closed in this simplified example, so the
	// consumer loop in main never sees a closed channel.
	for _, in := range miners {
		go func(in <-chan data, out chan<- data) {
			for {
				d, ok := <-in // Pull data from the miner
				if !ok {
					return
				}
				out <- d // Send the data to the output
			}
		}(in, out)
	}

	return out
}

func mine() <-chan data {
	out := make(chan data)

	go func() {
		// ... populate the channel with mined data
	}()

	return out
}
The above example utilizes the fan-in pattern to consolidate incoming data from a set of mock data miners.
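A consumer that loops until the consolidated channel is closed needs something to close it once every input is drained. One common approach, sketched here with hypothetical names (merge for consolidate, emit for mine) and integers in place of the data type, is to count the forwarding routines with a sync.WaitGroup and close the output after the last one returns.

```go
package main

import (
	"fmt"
	"sync"
)

// merge is a hypothetical variant of consolidate that closes its output
// once every input channel has been drained.
func merge(inputs ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup

	wg.Add(len(inputs))
	for _, in := range inputs {
		go func(in <-chan int) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}

	// Close out only after all forwarding routines have returned.
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

// emit is a hypothetical stand-in for mine: it sends its values and
// then closes its channel.
func emit(values ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, v := range values {
			out <- v
		}
	}()
	return out
}

func main() {
	sum := 0
	for v := range merge(emit(1, 2), emit(3, 4), emit(5)) {
		sum += v
	}
	fmt.Println(sum) // prints 15
}
```

The order in which values arrive on the merged channel is not deterministic, which is why the example sums the values rather than printing them one by one.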
Combining and Nesting Patterns
Each of these patterns can be combined to create more complex patterns. This is incredibly useful since the majority of applications will not use just one concurrency pattern.
Here is an example of combining all the patterns into a request response life-cycle. In this example the data comes in from a single source, fans out to multiple pipelines, and then fans back into a single response to the user.
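As a rough code sketch of that life-cycle, the hypothetical handle function below feeds a request into a source channel, fans the values out to a number of workers, fans their results back into one channel, and reduces them to a single response. The doubling step and all names are placeholders chosen for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// handle is a hypothetical request handler that combines a source stage,
// a fan-out to several workers, and a fan-in to a single response.
func handle(request []int, workers int) int {
	// Source: load the request data into the front of the pipeline.
	source := make(chan int)
	go func() {
		defer close(source)
		for _, v := range request {
			source <- v
		}
	}()

	// Fan-out: every worker reads from the shared source channel.
	// Fan-in: every worker writes to the shared results channel.
	results := make(chan int)
	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for v := range source {
				results <- v * 2 // placeholder for real work
			}
		}()
	}

	// Close results once all workers are done so the loop below ends.
	go func() {
		wg.Wait()
		close(results)
	}()

	// Sink: build the single response returned to the caller.
	response := 0
	for v := range results {
		response += v
	}
	return response
}

func main() {
	fmt.Println(handle([]int{1, 2, 3, 4}, 3)) // prints 20
}
```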
When building applications I recommend building diagrams to help conceptualize the concurrent design elements. I really like diagrams.net for this. The process of building out these diagrams can help solidify the final product and make it easier to understand the design. Having the designs made as part of your process will also help sell the design to other engineers and leadership.
BEST PRACTICE: Use Go’s native concurrency primitives when possible.
Best Practices
While it is considered best practice to primarily use the Go concurrency primitives to manage concurrency in your Go applications, there are situations where it becomes necessary to use the sync package to help manage concurrency.
A good example of this is when you need to ensure that all routines are properly exited when implementing something like io.Closer. If, for example, your code spawns N routines and you want to ensure that all of them are properly exited when the Close method is called, you can use the sync.WaitGroup to wait for the routines that are still running to be closed.
The method for doing this is shown below.
// Example of using a wait group to
// ensure all routines are properly exited
type mytype struct {
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
	wgMu   sync.Mutex
}

// DoSomething spawns a go routine
// every time it's called
func (t *mytype) DoSomething() {
	// Increment the waitgroup to ensure
	// Close properly blocks
	t.wgMu.Lock()
	t.wg.Add(1)
	t.wgMu.Unlock()

	go func() {
		// Decrement the waitgroup
		// when the routine exits
		defer t.wg.Done()

		// ... do something
	}()
}

func (t *mytype) Close() error {
	// Cancel the internal context
	t.cancel()

	// Wait for all routines to exit
	t.wgMu.Lock()
	t.wg.Wait()
	t.wgMu.Unlock()
	return nil
}
There are a couple of important elements in the code above. Firstly, it uses a sync.WaitGroup to track the number of routines that are running: Add increments the count and Done decrements it. Secondly, it uses a sync.Mutex so that a call to Add cannot overlap with the call to Wait in Close. The WaitGroup documentation requires that an Add call made while the counter is zero happens before Wait, and holding the mutex around both calls enforces that ordering.
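For a version that can be run end to end, the sketch below fills in the pieces the example leaves out. The runner type, its constructor, and the exited counter are hypothetical additions that exist only to make the behaviour observable. Each routine simply blocks until the context is cancelled.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// runner is a hypothetical type that follows the shape of mytype above.
type runner struct {
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
	wgMu   sync.Mutex
	exited int64 // number of routines that have returned
}

func newRunner() *runner {
	ctx, cancel := context.WithCancel(context.Background())
	return &runner{ctx: ctx, cancel: cancel}
}

// Start spawns a routine that runs until the context is cancelled.
func (r *runner) Start() {
	r.wgMu.Lock()
	r.wg.Add(1)
	r.wgMu.Unlock()

	go func() {
		defer r.wg.Done()
		<-r.ctx.Done() // block until Close cancels the context
		atomic.AddInt64(&r.exited, 1)
	}()
}

// Close cancels the context and blocks until every routine has exited.
func (r *runner) Close() error {
	r.cancel()

	r.wgMu.Lock()
	r.wg.Wait()
	r.wgMu.Unlock()
	return nil
}

// Exited reports how many routines have returned so far.
func (r *runner) Exited() int64 {
	return atomic.LoadInt64(&r.exited)
}

func main() {
	r := newRunner()
	for i := 0; i < 5; i++ {
		r.Start()
	}
	_ = r.Close()
	fmt.Println(r.Exited()) // prints 5
}
```

Because Close does not return until Wait does, the counter is guaranteed to equal the number of started routines by the time it is printed.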
For a very thorough explanation of this, see the article by Leo Lara [4].
For a functional example of a situation where using the sync package is necessary, see the Plex library.
Generics (COMING SOON!)
These patterns become much easier to use with the introduction of generics in Go 1.18, which I will cover in an upcoming post.
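As a small preview of what that might look like, here is one possible sketch of a pipeline stage written once for any element type. It requires Go 1.18 or later. Source and Stage are hypothetical names, not an API from the project or the standard library.

```go
package main

import "fmt"

// Source is a hypothetical generic source stage: it emits the given
// values and then closes its channel.
func Source[T any](values ...T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for _, v := range values {
			out <- v
		}
	}()
	return out
}

// Stage is a hypothetical generic pipeline stage: it applies fn to each
// incoming value and closes its output when the input is closed.
func Stage[In, Out any](in <-chan In, fn func(In) Out) <-chan Out {
	out := make(chan Out)
	go func() {
		defer close(out)
		for v := range in {
			out <- fn(v)
		}
	}()
	return out
}

func main() {
	lengths := Stage(Source("a", "bb", "ccc"), func(s string) int {
		return len(s)
	})
	for n := range lengths {
		fmt.Println(n) // prints 1, 2, and 3 on separate lines
	}
}
```

With type parameters the same stage works for any channel element type, without resorting to interface{} and type switches.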