Golang Concurrency Pattern: worker pool

A worker pool is a concurrency pattern in which a fixed number of workers runs parallely to execute the multiple tasks present in a queue.

In golang, we use goroutines and channels to build this pattern. A worker is defined by goroutine and a queue is defined by channel. You can find more information about goroutines and channels in the following blog posts:

The defined group of workers will pull the tasks from the queue and start executing them. When any worker is done with the execution of the current task it pulls the next task from the queue. This process continues until the queue is empty.

This will significantly reduce the execution time as instead of a single process, multiple processes will run in parallel.

Let us see a comparison of the example using the worker pool and without using the worker pool.
We will solve the following problem in the first part without using the worker pool.
For better demonstration, we will be adding a one-second sleep time between job start and job end.

package main

import (
   "fmt"
   "time"
)

func main() {
   startTime := time.Now()
   totalJobs := 5

   for i := 1; i <= totalJobs; i++ {
       processJobs(i)
   }
   fmt.Println("Total time ", time.Since(startTime))
}

func processJobs(i int) {
   fmt.Println("Started  job", i)
   time.Sleep(time.Second)
   fmt.Println("Finished job", i)
}

Output:

Started  job 1
Finished job 1
Started  job 2
Finished job 2
Started  job 3
Finished job 3
Started  job 4
Finished job 4
Started  job 5
Finished job 5
Total time  5.004637107s

Now let's try to solve the same problem using a worker pool.

package main

import (
   "fmt"
   "sync"
   "time"
)

func main() {
   startTime := time.Now()
   totalJobs := 5
   jobs := make(chan int, totalJobs)
   var wg sync.WaitGroup

   for w := 1; w <= 2; w++ {
       wg.Add(1)
       go worker(w, jobs, &wg)
   }

   for job := 1; job <= totalJobs; job++ {
       jobs <- job
   }

   close(jobs)
   wg.Wait()
   fmt.Println("Total time ", time.Since(startTime))
}

func worker(w int, jobs chan int, wg *sync.WaitGroup) {
   defer wg.Done()

   for job := range jobs {
       processJobs(w, job)
   }
}

func processJobs(w int, job int) {
   fmt.Println("Worker", w, "started  job", job)
   time.Sleep(time.Second)
   fmt.Println("Worker", w, "finished job", job)
}

In the above code, we created channel jobs for sending the work/job to workers. Then we started 2 workers. Initially, they are in a blocked state because they didn't receive any jobs yet. We sent 5 jobs to them in the next loop and closed the channel.

We have used the wait groups from the sync package to wait until the execution of all the goroutines is completed.
Let's see the output of the above code.

Worker 1 started  job 2
Worker 2 started  job 1
Worker 2 finished job 1
Worker 2 started  job 3
Worker 1 finished job 2
Worker 1 started  job 4
Worker 2 finished job 3
Worker 2 started  job 5
Worker 1 finished job 4
Worker 2 finished job 5
Total time  3.003438183s

As you can see from the output of both the code, the time taken by the code with the worker pool is less than the time taken by the code without the worker pool.

The above problem was simple but for the problems where we need to process a lot of data, the worker pool plays an important role in reducing the execution time and improving the performance of the process.

Vaibhav Dighe

Vaibhav Dighe

Software Engineer at PLG Works
Pune, maharashtra