Go Concurrency Pattern: Pipeline

Go provides concurrency primitives that make it easy to write concurrent code. In this post we look at how to build pipelines using channels and goroutines.

A pipeline is a pattern in which a complicated task is broken down into smaller subtasks, and the output of each subtask becomes the input of the next.

In Go, this model can use channels and goroutines to process tasks concurrently. A pipeline is a series of stages connected by channels, where each stage is a function that receives a channel and returns another channel to be consumed by the next function.
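
The shape of such a stage function can be sketched as follows. This is a minimal illustration, not part of the example later in this post: the `square` stage and the integer values are hypothetical and chosen only to show a function that takes a channel and returns a channel.

```go
package main

import "fmt"

// square is a hypothetical stage: it receives ints on an inbound channel
// and returns an outbound channel that carries their squares.
func square(inbound <-chan int) <-chan int {
	outbound := make(chan int)
	go func() {
		// Close the outbound channel once the inbound channel is drained.
		defer close(outbound)
		for n := range inbound {
			outbound <- n * n
		}
	}()
	return outbound
}

func main() {
	// A small source that feeds 1, 2, 3 into the pipeline.
	source := make(chan int)
	go func() {
		defer close(source)
		for i := 1; i <= 3; i++ {
			source <- i
		}
	}()

	// Stages compose because each one takes a channel and returns a channel.
	for v := range square(square(source)) {
		fmt.Println(v)
	}
}
```

Because the input and output types match, the same stage can be chained with itself or with any other stage of the same shape.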

Each stage in the pipeline can have any number of inbound and outbound channels. Inbound channels receive values as input, and outbound channels send values as output. The exceptions are the first and last stages: the first stage has only outbound channels, and the last stage has only inbound channels.
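
As an illustration of a stage with more than one inbound channel, here is a rough sketch of a stage that merges several inbound channels into one outbound channel. The names `merge` and `emit` are hypothetical helpers introduced only for this sketch, and `sync.WaitGroup` is one possible way to know when every inbound channel has been drained.

```go
package main

import (
	"fmt"
	"sync"
)

// emit is a hypothetical first stage that sends the given values and then closes its channel.
func emit(values ...string) <-chan string {
	outbound := make(chan string)
	go func() {
		defer close(outbound)
		for _, v := range values {
			outbound <- v
		}
	}()
	return outbound
}

// merge is a hypothetical stage with many inbound channels and one outbound channel.
func merge(inbounds ...<-chan string) <-chan string {
	outbound := make(chan string)
	var wg sync.WaitGroup
	wg.Add(len(inbounds))

	// One goroutine per inbound channel copies its values to the outbound channel.
	for _, inbound := range inbounds {
		go func(ch <-chan string) {
			defer wg.Done()
			for v := range ch {
				outbound <- v
			}
		}(inbound)
	}

	// Close the outbound channel only after every inbound channel is drained.
	go func() {
		wg.Wait()
		close(outbound)
	}()
	return outbound
}

func main() {
	// The order of the printed values is not guaranteed.
	for v := range merge(emit("a", "b"), emit("c")) {
		fmt.Println(v)
	}
}
```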

A pipeline needs the following parts:

  • Generator or Producer: the first stage, responsible for producing the input for the pipeline.
  • Stages: where the actual processing is performed, usually producing new values.
  • Canceller or Consumer: the last stage, which receives the final values and ends processing in the pipeline.

Let’s go over a pipeline example:

package main

import (
	"fmt"
	"strconv"
)

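// greetPerson is the first stage: it sends a greeting for each person
// on the returned channel and closes the channel when done.
func greetPerson(people ...string) <-chan string {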
	outbound := make(chan string)
	go func() {
		for _, person := range people {
			outbound <- "Hello " + person + "!"
		}
		close(outbound)
	}()
	return outbound
}

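// assignWork is the second stage: it appends a work item (the length of
// the greeting) to each string received from the inbound channel.
func assignWork(inbound <-chan string) <-chan string {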
	outbound := make(chan string)
	go func() {
		for person := range inbound {
			work := len(person)
			outbound <- person + " You have to work on " + strconv.Itoa(work)
		}
		close(outbound)
	}()
	return outbound
}

func main() {
	greetedPeople := greetPerson("John", "Dan", "Michael")
	work := assignWork(greetedPeople)

	for task := range work {
		fmt.Println(task)
	}
}

The output of the above program looks like this:

Hello John! You have to work on 11
Hello Dan! You have to work on 10
Hello Michael! You have to work on 14
Program exited.


The first stage, greetPerson, is a function that converts a list of strings to a channel. It starts a goroutine that wraps each person's name in a greeting, sends it on the channel, and closes the channel once all values have been sent.

The second stage, assignWork, receives the greeting strings from a channel and emits each one with a work item appended (in this example, simply the length of the greeting string). Once the inbound channel is closed and this stage has sent all values to the outbound channel, it closes the outbound channel.

The main function sets up the pipeline and runs the last stage. It receives values from the second stage, assignWork, and prints each one until the outbound channel from the second stage is closed.
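
The last stage here reads until the channel is closed. If it stopped reading early instead, the upstream goroutines would stay blocked on their sends, so the last stage may also need a way to tell earlier stages to stop. A minimal sketch of one common approach, assuming a hypothetical `count` generator and a `done` channel that are not part of the example above:

```go
package main

import "fmt"

// count is a hypothetical generator that emits 1, 2, 3, ... until done is closed.
func count(done <-chan struct{}) <-chan int {
	outbound := make(chan int)
	go func() {
		defer close(outbound)
		for i := 1; ; i++ {
			select {
			case outbound <- i: // a receiver took the value
			case <-done: // the consumer asked us to stop
				return
			}
		}
	}()
	return outbound
}

func main() {
	done := make(chan struct{})
	numbers := count(done)

	// The consumer only wants the first three values.
	for n := range numbers {
		fmt.Println(n)
		if n == 3 {
			break
		}
	}

	// Closing done unblocks the generator so its goroutine can exit.
	close(done)
}
```

Closing a channel is visible to every goroutine that receives from it, which is why a single `close(done)` could stop any number of stages that select on the same channel.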

Pipeline is a well-known concurrency pattern, and Go lets us implement it with the primitives it provides, such as goroutines and channels.

Vindeep Chaudhari

Senior Software Engineer in Backend at PLG Works