Worker Pools in Go

Abstract painting of robot workers in a line | Image generated by DALL-E

I am learning the Go programming language and building a small CLI tool for work with it. The tool’s main purpose is to generate a large number of synthetic FHIR resources and insert them into a GCP FHIR store using its REST API.

My secondary goal for the tool is to make it as fast as possible. This is why I decided to make as many of the REST calls in parallel as possible, taking advantage of Go’s built-in concurrency primitives. In the process I learned about goroutines, channels, sync.WaitGroup, and a neat worker pool pattern that is broadly applicable.

Concurrency with goroutines

My first attempt was to make every single HTTP request to the server in parallel. I wrote a function that takes a slice of closures (one closure per HTTP request) and invokes them concurrently using a goroutine for each. It worked!

func ConcurrentDo(actions []func()) {
	done := make(chan bool)

	for _, a := range actions {
		f := func(action func(), done chan<- bool) {
			action()
			done <- true
		} 
		go f(a, done)
	}

	// Wait for all actions to signal completion before returning.
	for i := 0; i < len(actions); i++ {
		<-done
	}
}
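To make this concrete, here is a hypothetical sketch of a call site: one closure per HTTP POST of a FHIR resource. The insertAll helper, the payloads, and the error handling are illustrative rather than the tool’s actual code (assumes bytes, log, and net/http are imported):

func insertAll(payloads [][]byte, url string) {
	actions := make([]func(), 0, len(payloads))
	for _, p := range payloads {
		p := p // explicit per-closure copy (needed before Go 1.22)
		actions = append(actions, func() {
			// application/fhir+json is the standard FHIR content type.
			resp, err := http.Post(url, "application/fhir+json", bytes.NewReader(p))
			if err != nil {
				log.Println(err)
				return
			}
			resp.Body.Close()
		})
	}
	ConcurrentDo(actions)
}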

A few things to note about ConcurrentDo:

  • It is important to pass a as a parameter to the goroutine function so that it is copied. Before Go 1.22, every iteration of a loop reused the same instance of the loop variable a, which meant all closures created in the loop shared that one instance and typically observed its last value. Passing a as an argument makes an explicit per-goroutine copy, as the sketch after this list shows. The language designers came to consider the old behavior a mistake, and Go 1.22 changed for loops so that each iteration gets its own copy of the variable.

  • A channel is used to signal completion, and we wait to receive a signal from every goroutine before returning. In other words, from the caller’s perspective ConcurrentDo behaves synchronously, which makes it simple to reason about while under the hood it operates concurrently.

  • A channel can be passed around as a send-only channel (chan<- bool, as done is above) or a receive-only channel (<-chan bool); the compiler then enforces the direction.
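To see the capture pitfall from the first bullet in isolation, here is a minimal sketch (assuming fmt is imported). On Go versions before 1.22 the first loop would typically print the final value three times, while the second prints 0, 1, 2 in some order:

func captureDemo() {
	done := make(chan bool)

	// Buggy before Go 1.22: every closure shares the same i,
	// so this will likely print 3 three times.
	for i := 0; i < 3; i++ {
		go func() {
			fmt.Println(i)
			done <- true
		}()
	}
	for i := 0; i < 3; i++ {
		<-done
	}

	// Correct on any version: n is a per-goroutine copy of i.
	for i := 0; i < 3; i++ {
		go func(n int) {
			fmt.Println(n)
			done <- true
		}(i)
	}
	for i := 0; i < 3; i++ {
		<-done
	}
}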

Simplify with a WaitGroup

It was quickly pointed out to me that the Go standard library has sync.WaitGroup, which is designed exactly for waiting on a group of goroutines. So I swapped the channel I was using for signaling completion with a WaitGroup, which resulted in simpler and slightly more efficient code.

func ConcurrentDo(actions []func()) {
	var wg sync.WaitGroup
	wg.Add(len(actions))
	for _, a := range actions {
		go func(action func()) {
			defer wg.Done()
			action()
		}(a)
	}
	wg.Wait()
}

An interesting thing to note is that the code uses defer to call wg.Done(). This means that even if the call to action panics, the WaitGroup is still signaled correctly and wg.Wait() does not hang. In practice this is a moot point, since my CLI program does not recover from panics and simply terminates. However, I figured using defer is a good habit to get into.
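If I did want the pool to survive a panicking action, a recover could sit next to the Done. Below is a sketch of how the goroutine body could change, not something my tool actually does (assumes log is imported):

go func(action func()) {
	defer wg.Done() // still runs if action panics
	defer func() {
		if r := recover(); r != nil {
			log.Println("action panicked:", r)
		}
	}()
	action()
}(a)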

Worker Pool Pattern in Go

This worked well until I started to ratchet up the number of actions. Go was more than happy to spawn thousands of goroutines, but the goroutines would start failing: it turned out that my program’s process was running out of its quota for open sockets to make HTTP requests. The default macOS limit for open files per process (which includes sockets) is a meager 256, as reported by ulimit -n. I could have bumped the limit up on my machine, but that would only have been a local fix, so I went with the more sensible choice of limiting the number of concurrent HTTP requests, which works for all users. This is where the worker pool comes into play.

A worker pool is a common concurrency pattern in which a number of tasks are distributed across a fixed set of workers. Each worker picks a task from a queue, completes it, and picks another until the queue is empty. The parallelism factor is now controlled by the number of workers. Below is how the ConcurrentDo function changes to implement this pattern:

func ConcurrentDo(actions []func(), maxWorkers int) {
	workers := maxWorkers
	if len(actions) < maxWorkers {
		workers = len(actions)
	}

	var wg sync.WaitGroup
	wg.Add(workers)

	// Use a buffered channel sized to the number of workers
	// so that at most that many actions are queued at once.
	work := make(chan func(), workers)

	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			// Continuously grab an action from the channel and
			// complete it, until the channel is closed.
			for action := range work {
				action()
			}
		}()
	}

	for _, a := range actions {
		work <- a
	}
	close(work)
	wg.Wait()
}
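With the pool in place, the call site simply picks a parallelism factor. Any value comfortably below the file descriptor limit works; 64 below is an arbitrary illustrative choice:

ConcurrentDo(actions, 64) // at most 64 HTTP requests in flight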

Let’s break down the new ConcurrentDo:

  • Instead of spawning one goroutine per action, the actions are sent over a buffered channel from which the workers in the pool receive. The Go channel is itself the work queue!
  • We create one goroutine per worker (instead of one per action as before). Each goroutine repeatedly picks up an action from the work channel and completes it.
  • We still use a sync.WaitGroup, but it now counts workers rather than actions: each worker signals Done once the work channel has been closed and drained. We could instead count actions, adding one to the group per action and signaling as each one completes; a sketch of that alternative follows this list.
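Here is what that per-action accounting could look like, a sketch of the lines that would change in ConcurrentDo above:

var wg sync.WaitGroup
wg.Add(len(actions)) // one count per action instead of per worker

for i := 0; i < workers; i++ {
	go func() {
		for action := range work {
			action()
			wg.Done() // signal each completed action
		}
	}()
}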

It was a nice surprise to realize that I can use range with a channel and it works as one would naturally expect. The other improvement I made was to call close on the work channel: closing it ends the range loop in each worker once the remaining actions have been drained. Again, I really enjoy how this all fits together in Go.

Finally, the work channel does not have to be buffered to the number of workers: it could be unbuffered, so that each send blocks until a worker is ready to receive, or buffered to hold all actions at once, so that the sending loop never blocks. I felt it was more prudent to use a buffer that matches the number of workers. Once I get a chance I will compare these options more deeply, in particular in terms of performance.
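Concretely, each alternative is a one-line change to how the channel is created (pick one):

work := make(chan func())               // unbuffered: each send waits for a free worker
work := make(chan func(), len(actions)) // fully buffered: the sending loop never blocks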

Final Words

Overall, my experience with Go concurrency has been very pleasant so far. Primitives such as the atomics-based sync.WaitGroup and message-passing-based channels mesh well together and provide a powerful set of building blocks.