The fan-out/fan-in pattern is a
The pattern consists of two main stages: fan-out and fan-in.
Fan-out: In the fan-out stage, a single task is divided into multiple smaller subtasks, which are then executed concurrently. Each subtask can be assigned to a separate goroutine (lightweight concurrent thread in Go) to run in parallel. This stage distributes the workload across multiple goroutines, allowing for parallel processing.
Fan-in: In the fan-in stage, the results or outputs from all the concurrently executing subtasks are collected and combined into a single result. This stage waits for all the subtasks to complete and aggregates their results. The fan-in stage can also handle synchronization and coordination between the goroutines to ensure that all results are collected before proceeding.
The fan-out/fan-in pattern is typically implemented using a combination of goroutines and channels in Go. Each subtask is assigned to a goroutine, and channels are used to pass data between the goroutines. The fan-in stage waits for the completion of all subtasks by coordinating through synchronization primitives like WaitGroups or using a channel to signal the completion of each subtask.
Let’s take an example, we have a task of doubling numbers. We create multiple worker goroutines (numWorkers
) to concurrently process the jobs. The jobs are sent to the jobs
channel, and the results are received from the results
channel.
The main goroutine produces the jobs by sending them to the jobs
channel. The worker goroutines receive the jobs from the channel, perform the task (doubling the number), and send the results to the results
channel.
The fan-in stage is coordinated using a WaitGroup
. The main goroutine waits for the completion of all jobs by calling the Wait()
method. Once all jobs are completed, it closes the results
channel. The main goroutine then processes the results by receiving them from the results
channel.
Below is the example that demonstrates the fan-out/fan-in pattern in Go:
package mainimport ("fmt""sync")// worker performs the task on jobs received and sends results to the results channel.func worker(id int, jobs <-chan int, results chan<- int) {for job := range jobs {// Perform the task hereresult := job * 2results <- result}}func main() {numJobs := 10numWorkers := 4// Create buffered channels for jobs and resultsjobs := make(chan int, numJobs)results := make(chan int, numJobs)// Fan-out: Create worker goroutinesfor w := 1; w <= numWorkers; w++ {go worker(w, jobs, results)}// Produce jobsfor j := 1; j <= numJobs; j++ {jobs <- j}close(jobs) // Close the jobs channel to signal no more jobs will be sent// Fan-in: Collect resultsvar wg sync.WaitGroupwg.Add(numJobs) // Set WaitGroup counter to the number of jobs// Launch a goroutine to wait for all jobs to finishgo func() {wg.Wait() // Wait for all jobs to be doneclose(results) // Close the results channel after all jobs are processed}()// Process resultsfor result := range results {fmt.Println("Result:", result)wg.Done() // Decrease the WaitGroup counter as each result is processed}}
Here is a line-by-line explanation of the above code:
Lines 9–15: Defines a function worker
which takes three parameters:
id
: This is the worker’s identifier.jobs
: This is a read-only channel from which the worker gets jobs.results
: This is a write-only channel where the worker sends results.The worker continuously receives jobs from the jobs
channel, processes them (doubling the job value), and sends the results to the results
channel.
Lines 18–19: Defines the number of jobs and the number of worker goroutines.
Lines 22–23: Two buffered channels: jobs
and results
are created. Buffered channels have a capacity which prevents blocking in cases where the channel’s counterpart (read or write) isn’t ready.
Lines 26–28: A loop is used to create numWorkers
goroutines, each invoking the worker
function with the worker’s ID and the jobs
and results
channels.
Lines 31–34: A loop produces numJobs
jobs by sending job numbers (1 through numJobs
) to the jobs
channel. After sending all jobs, the channel is closed to signal that no more jobs will be sent.
Lines 37–38: A sync.WaitGroup
named wg
is created to wait for all jobs to be completed. The counter is initially set to the number of jobs.
Lines 41–44: A goroutine is launched to wait for the completion of all jobs. It waits using wg.Wait()
until the counter reaches zero, then it closes the results
channel to signal that no more results will be sent.
Lines 47–50: Enters a loop to process results
from the results channel. It prints each result and calls wg.Done()
to decrement the WaitGroup
counter for each processed result. This loop continues until the results
channel is closed, and all results have been processed, at which point the program exits.
Free Resources