What is the fan-out/fan-in pattern in Golang?

The fan-out/fan-in pattern is a concurrencyConcurrency refers to the simultaneous execution of multiple tasks or processes, enabling better resource utilization and program efficiency. design pattern commonly used in Go for parallelizing and coordinating concurrent tasks. It is particularly useful when you have a time-consuming task that can be divided into smaller subtasks that can be executed concurrently.

How it works

The pattern consists of two main stages: fan-out and fan-in.

  1. Fan-out: In the fan-out stage, a single task is divided into multiple smaller subtasks, which are then executed concurrently. Each subtask can be assigned to a separate goroutine (lightweight concurrent thread in Go) to run in parallel. This stage distributes the workload across multiple goroutines, allowing for parallel processing.

  2. Fan-in: In the fan-in stage, the results or outputs from all the concurrently executing subtasks are collected and combined into a single result. This stage waits for all the subtasks to complete and aggregates their results. The fan-in stage can also handle synchronization and coordination between the goroutines to ensure that all results are collected before proceeding.

Fan-out/fan-in
Fan-out/fan-in

How to implement

The fan-out/fan-in pattern is typically implemented using a combination of goroutines and channels in Go. Each subtask is assigned to a goroutine, and channels are used to pass data between the goroutines. The fan-in stage waits for the completion of all subtasks by coordinating through synchronization primitives like WaitGroups or using a channel to signal the completion of each subtask.

Let’s take an example, we have a task of doubling numbers. We create multiple worker goroutines (numWorkers) to concurrently process the jobs. The jobs are sent to the jobs channel, and the results are received from the results channel.

The main goroutine produces the jobs by sending them to the jobs channel. The worker goroutines receive the jobs from the channel, perform the task (doubling the number), and send the results to the results channel.

The fan-in stage is coordinated using a WaitGroup. The main goroutine waits for the completion of all jobs by calling the Wait() method. Once all jobs are completed, it closes the results channel. The main goroutine then processes the results by receiving them from the results channel.

Code

Below is the example that demonstrates the fan-out/fan-in pattern in Go:

package main
import (
"fmt"
"sync"
)
// worker performs the task on jobs received and sends results to the results channel.
func worker(id int, jobs <-chan int, results chan<- int) {
for job := range jobs {
// Perform the task here
result := job * 2
results <- result
}
}
func main() {
numJobs := 10
numWorkers := 4
// Create buffered channels for jobs and results
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
// Fan-out: Create worker goroutines
for w := 1; w <= numWorkers; w++ {
go worker(w, jobs, results)
}
// Produce jobs
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs) // Close the jobs channel to signal no more jobs will be sent
// Fan-in: Collect results
var wg sync.WaitGroup
wg.Add(numJobs) // Set WaitGroup counter to the number of jobs
// Launch a goroutine to wait for all jobs to finish
go func() {
wg.Wait() // Wait for all jobs to be done
close(results) // Close the results channel after all jobs are processed
}()
// Process results
for result := range results {
fmt.Println("Result:", result)
wg.Done() // Decrease the WaitGroup counter as each result is processed
}
}
Fan-out/fan-in pattern example

Explanation

Here is a line-by-line explanation of the above code:

  • Lines 9–15: Defines a function worker which takes three parameters:

    • id: This is the worker’s identifier.
    • jobs: This is a read-only channel from which the worker gets jobs.
    • results: This is a write-only channel where the worker sends results.

    The worker continuously receives jobs from the jobs channel, processes them (doubling the job value), and sends the results to the results channel.

  • Lines 18–19: Defines the number of jobs and the number of worker goroutines.

  • Lines 22–23: Two buffered channels: jobs and results are created. Buffered channels have a capacity which prevents blocking in cases where the channel’s counterpart (read or write) isn’t ready.

  • Lines 26–28: A loop is used to create numWorkers goroutines, each invoking the worker function with the worker’s ID and the jobs and results channels.

  • Lines 31–34: A loop produces numJobs jobs by sending job numbers (1 through numJobs) to the jobs channel. After sending all jobs, the channel is closed to signal that no more jobs will be sent.

  • Lines 37–38: A sync.WaitGroup named wg is created to wait for all jobs to be completed. The counter is initially set to the number of jobs.

  • Lines 41–44: A goroutine is launched to wait for the completion of all jobs. It waits using wg.Wait() until the counter reaches zero, then it closes the results channel to signal that no more results will be sent.

  • Lines 47–50: Enters a loop to process results from the results channel. It prints each result and calls wg.Done() to decrement the WaitGroup counter for each processed result. This loop continues until the results channel is closed, and all results have been processed, at which point the program exits.

Copyright ©2024 Educative, Inc. All rights reserved