Fan Out/In¶
Fan-out/fan-in patterns distribute work across multiple workers and merge their results into a single stream. The concurrent package provides several variants, each with a different distribution strategy.
Overview¶
- FanOut: Distributes work from a single input channel to multiple workers
- FanIn: Merges multiple input channels into a single output channel
- FanOutFanIn: Combines both patterns for parallel processing
- RoundRobin: Distributes work in round-robin fashion
FanOut¶
Distributes work from a single input channel to multiple workers, each processing items concurrently.
Example¶
package main

import (
	"context"
	"fmt"

	"github.com/logimos/concurrent"
)

func main() {
	ctx := context.Background()
	input := make(chan int)

	// Process with 5 workers
	output := concurrent.FanOut(ctx, input, 5, func(ctx context.Context, n int) (int, error) {
		return n * n, nil
	})

	// Send work
	go func() {
		for i := 1; i <= 10; i++ {
			input <- i
		}
		close(input)
	}()

	// Collect results
	for result := range output {
		fmt.Println(result)
	}
}
API¶
func FanOut[T any, R any](ctx context.Context, input <-chan T, workers int, fn func(context.Context, T) (R, error)) <-chan R
Parameters:
- ctx: Context for cancellation
- input: Input channel of work items
- workers: Number of worker goroutines
- fn: Processing function
Returns: Output channel of results
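To make the mechanics concrete, here is a minimal standard-library sketch of a function with the same shape as the signature above. The names fanOutSketch and squares are hypothetical, and this is not the package's actual implementation. In particular, dropping failed items is an assumption based on the error-handling notes on this page.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"sync"
)

// fanOutSketch is a hypothetical stand-in with the same shape as the
// documented FanOut signature. It is not the library's implementation.
func fanOutSketch[T any, R any](ctx context.Context, input <-chan T, workers int, fn func(context.Context, T) (R, error)) <-chan R {
	out := make(chan R)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Every worker receives from the same channel, so whichever
			// worker is idle takes the next item.
			for item := range input {
				res, err := fn(ctx, item)
				if err != nil {
					continue // assumed behavior: failed items are dropped
				}
				select {
				case out <- res:
				case <-ctx.Done():
					return
				}
			}
		}()
	}
	// Close the output once every worker has returned.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// squares runs fanOutSketch over nums and returns the sorted results.
func squares(nums []int, workers int) []int {
	input := make(chan int)
	go func() {
		defer close(input)
		for _, n := range nums {
			input <- n
		}
	}()
	out := fanOutSketch(context.Background(), input, workers, func(_ context.Context, n int) (int, error) {
		return n * n, nil
	})
	var got []int
	for r := range out {
		got = append(got, r)
	}
	sort.Ints(got) // completion order is not deterministic
	return got
}

func main() {
	fmt.Println(squares([]int{1, 2, 3, 4}, 3)) // prints [1 4 9 16]
}
```

The results are sorted before printing only because workers finish in an unpredictable order. A caller that needs input order preserved would have to carry an index alongside each item.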
FanIn¶
Merges multiple input channels into a single output channel.
Example¶
ch1 := make(chan int)
ch2 := make(chan int)
ch3 := make(chan int)

output := concurrent.FanIn(ctx, ch1, ch2, ch3)

// Send to channels
go func() { ch1 <- 1; close(ch1) }()
go func() { ch2 <- 2; close(ch2) }()
go func() { ch3 <- 3; close(ch3) }()

// Collect merged results
for result := range output {
	fmt.Println(result) // Order may vary
}
API¶
Parameters:
- ctx: Context for cancellation
- inputs: Variable number of input channels
Returns: Merged output channel
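The merge step can be sketched with the standard library alone. The helper below, fanInSketch, is a hypothetical illustration of the technique (one forwarding goroutine per input plus a WaitGroup that closes the output). Its signature is an assumption and may not match the package's FanIn.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// fanInSketch is a hypothetical merge helper; the library's FanIn may differ.
func fanInSketch[T any](ctx context.Context, inputs ...<-chan T) <-chan T {
	out := make(chan T)
	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		// One forwarding goroutine per input channel.
		go func(in <-chan T) {
			defer wg.Done()
			for v := range in {
				select {
				case out <- v:
				case <-ctx.Done():
					return
				}
			}
		}(in)
	}
	// The output closes only after every input has been drained.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// mergedSum feeds each slice through its own channel and sums the merged stream.
func mergedSum(sources ...[]int) int {
	chans := make([]<-chan int, 0, len(sources))
	for _, src := range sources {
		ch := make(chan int)
		chans = append(chans, ch)
		go func(src []int, ch chan<- int) {
			defer close(ch)
			for _, v := range src {
				ch <- v
			}
		}(src, ch)
	}
	sum := 0
	for v := range fanInSketch(context.Background(), chans...) {
		sum += v
	}
	return sum
}

func main() {
	fmt.Println(mergedSum([]int{1, 2}, []int{3}, []int{4, 5})) // prints 15
}
```

Summing the merged values sidesteps the ordering question: the total is the same no matter which input delivers first.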
FanOutFanIn¶
Combines fan-out and fan-in for parallel processing with result merging.
Example¶
input := make(chan string)

// Process with 3 workers and merge results
output := concurrent.FanOutFanIn(ctx, input, 3, func(ctx context.Context, s string) (string, error) {
	return strings.ToUpper(s), nil
})

go func() {
	input <- "hello"
	input <- "world"
	close(input)
}()

for result := range output {
	fmt.Println(result)
}
API¶
func FanOutFanIn[T any, R any](ctx context.Context, input <-chan T, workers int, fn func(context.Context, T) (R, error)) <-chan R
RoundRobin¶
Distributes work in round-robin fashion to multiple workers.
Example¶
input := make(chan int)

// Distribute evenly across 4 workers
output := concurrent.RoundRobin(ctx, input, 4, func(ctx context.Context, n int) (int, error) {
	return n * 2, nil
})

go func() {
	for i := 1; i <= 8; i++ {
		input <- i
	}
	close(input)
}()

for result := range output {
	fmt.Println(result)
}
API¶
func RoundRobin[T any, R any](ctx context.Context, input <-chan T, workers int, fn func(context.Context, T) (R, error)) <-chan R
Note: RoundRobin assigns items to workers in a fixed rotation, so work is distributed evenly across workers. FanOut instead hands each item to whichever worker becomes available first.
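The difference in dispatch can be illustrated with a small standard-library sketch. The function roundRobinCounts is hypothetical and only demonstrates rotation-based assignment (item i goes to worker i mod workers). It makes no claim about how the package implements RoundRobin internally.

```go
package main

import (
	"fmt"
	"sync"
)

// roundRobinCounts sends items 0..n-1 to workers in strict rotation and
// reports how many items each worker handled.
func roundRobinCounts(n, workers int) []int {
	queues := make([]chan int, workers)
	counts := make([]int, workers)
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		queues[w] = make(chan int)
		wg.Add(1)
		go func(w int) {
			defer wg.Done()
			for range queues[w] {
				counts[w]++ // each worker writes only its own slot
			}
		}(w)
	}
	for i := 0; i < n; i++ {
		queues[i%workers] <- i // item i always goes to worker i mod workers
	}
	for _, q := range queues {
		close(q)
	}
	wg.Wait()
	return counts
}

func main() {
	fmt.Println(roundRobinCounts(8, 4)) // prints [2 2 2 2]
}
```

With a shared input channel, by contrast, the per-worker counts depend on scheduling and on how long each item takes, so a slow worker simply handles fewer items.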
Use Cases¶
FanOut¶
- When you want concurrent processing and don't care which worker handles which item
- Processing independent tasks from a single source
FanIn¶
- Merging results from multiple processing pipelines
- Combining outputs from different data sources
FanOutFanIn¶
- Parallel processing with automatic result merging
- Simplifies the pattern when you need both fan-out and fan-in
RoundRobin¶
- When you need even work distribution
- Load balancing across workers
Best Practices¶
- Close input channels: Always close input channels when done sending
- Consume output: Read from output channels until closed
- Handle errors: Errors returned by processing functions are silently dropped; carry them inside the result type if you need them
- Use context: Pass contexts with appropriate timeouts
- Worker count: Choose worker count based on CPU cores and I/O characteristics
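Several of these practices can be sketched together using only the standard library. The helpers below are hypothetical: workerCount is an illustrative heuristic (the multiplier of 4 for I/O-bound work is a guess, not a recommendation from the package), and produce shows a sender that always closes its channel and stops early when the context expires.

```go
package main

import (
	"context"
	"fmt"
	"runtime"
	"time"
)

// workerCount is a hypothetical heuristic, not part of the library.
func workerCount(ioBound bool) int {
	n := runtime.NumCPU()
	if ioBound {
		return n * 4 // illustrative multiplier; measure before settling on one
	}
	return n
}

// produce sends nums on a new channel and always closes it, stopping early
// if ctx is cancelled before every item has been sent.
func produce(ctx context.Context, nums []int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for _, n := range nums {
			select {
			case ch <- n:
			case <-ctx.Done():
				return
			}
		}
	}()
	return ch
}

// sentWithinMillis drains the producer until its channel is closed and
// returns how many items arrived before the timeout.
func sentWithinMillis(ms int, nums []int) int {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(ms)*time.Millisecond)
	defer cancel()
	count := 0
	for range produce(ctx, nums) {
		count++
	}
	return count
}

func main() {
	fmt.Println(sentWithinMillis(1000, []int{1, 2, 3}))  // prints 3
	fmt.Println(workerCount(true) >= workerCount(false)) // prints true
}
```

Because the consumer ranges over the channel until it is closed, and the producer closes it on every exit path, neither goroutine is left blocked when the timeout fires.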
Error Handling¶
By default, errors from processing functions are dropped. To surface them, return a result type that carries the error, and always return nil as the function's own error:
type Result struct {
	Value int
	Error error
}

output := concurrent.FanOut(ctx, input, 5, func(ctx context.Context, n int) (Result, error) {
	// process stands in for your own function returning (int, error)
	result, err := process(n)
	// Return nil as the outer error so the Result is not dropped
	return Result{Value: result, Error: err}, nil
})

for r := range output {
	if r.Error != nil {
		fmt.Printf("Error: %v\n", r.Error)
	} else {
		fmt.Printf("Success: %d\n", r.Value)
	}
}
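The same idea can be made reusable with a generic wrapper, sketched here with the standard library only. Result[R], withResult, halve, and tryHalve are hypothetical names introduced for illustration and are not part of the package. The adapter turns any processing function into one that never returns an outer error, so nothing is dropped.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Result pairs a value with the error produced while computing it.
type Result[R any] struct {
	Value R
	Err   error
}

// withResult is a hypothetical adapter: it wraps fn so the error travels
// inside the returned value and the outer error is always nil.
func withResult[T any, R any](fn func(context.Context, T) (R, error)) func(context.Context, T) (Result[R], error) {
	return func(ctx context.Context, item T) (Result[R], error) {
		v, err := fn(ctx, item)
		return Result[R]{Value: v, Err: err}, nil
	}
}

// halve is a sample processing function that rejects odd input.
func halve(_ context.Context, n int) (int, error) {
	if n%2 != 0 {
		return 0, errors.New("odd input")
	}
	return n / 2, nil
}

// tryHalve runs the wrapped function once and reports the value and
// whether the underlying call failed.
func tryHalve(n int) (int, bool) {
	r, _ := withResult(halve)(context.Background(), n)
	return r.Value, r.Err != nil
}

func main() {
	fmt.Println(tryHalve(8)) // prints 4 false
	fmt.Println(tryHalve(3)) // prints 0 true
}
```

A wrapped function such as withResult(halve) has the same shape as the fn parameter in the signatures on this page, with Result[int] as the result type.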