API Reference
Complete API documentation for the concurrent package.
Package Overview
Package concurrent provides concurrency helpers for Go applications.
Types
Pool
Worker pool for concurrent job processing.
Methods:
- NewPool[T, R](n int, fn func(context.Context, T) (R, error)) *Pool[T, R]
- Run(ctx context.Context, jobs <-chan T) <-chan R
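The following is a minimal sketch of how a pool with these two signatures might behave, not the package's implementation. It assumes that n workers read from jobs until the channel is closed, that results arrive in no particular order, and that results of failed jobs are dropped; the reference does not state how errors are reported. The names workerPool, newWorkerPool, and runPoolDemo are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"sync"
)

// workerPool is a hypothetical stand-in for Pool[T, R].
type workerPool[T, R any] struct {
	n  int
	fn func(context.Context, T) (R, error)
}

func newWorkerPool[T, R any](n int, fn func(context.Context, T) (R, error)) *workerPool[T, R] {
	return &workerPool[T, R]{n: n, fn: fn}
}

// Run starts n workers that read jobs until the channel is closed.
// A worker also stops if ctx is cancelled while it is sending a result.
// Assumption: results of failed jobs are dropped.
func (p *workerPool[T, R]) Run(ctx context.Context, jobs <-chan T) <-chan R {
	out := make(chan R)
	var wg sync.WaitGroup
	for i := 0; i < p.n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobs {
				res, err := p.fn(ctx, job)
				if err != nil {
					continue
				}
				select {
				case out <- res:
				case <-ctx.Done():
					return
				}
			}
		}()
	}
	// Close the output once every worker has finished.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// runPoolDemo squares 1..5 on three workers and returns the sorted results.
func runPoolDemo() []int {
	jobs := make(chan int)
	go func() {
		defer close(jobs)
		for i := 1; i <= 5; i++ {
			jobs <- i
		}
	}()
	pool := newWorkerPool(3, func(_ context.Context, v int) (int, error) {
		return v * v, nil
	})
	var results []int
	for r := range pool.Run(context.Background(), jobs) {
		results = append(results, r)
	}
	sort.Ints(results) // output order is not deterministic, so sort for display
	return results
}

func main() {
	fmt.Println(runPoolDemo())
}
```

Closing the jobs channel is what lets the workers exit and the result channel close, so the caller can simply range over the output.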
Pipeline
Composable data processing pipeline.
Methods:
- NewPipeline[T](ctx context.Context) *Pipeline[T]
- AddStage(stage Stage[T, T]) *Pipeline[T]
- Run(input <-chan T) <-chan T
- Close()
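To illustrate how stages might compose, here is a sketch under a stated assumption: a stage is taken to be a function from an input channel to an output channel. The reference does not show the definition of Stage, so the stage type, the pipeline struct, and the mapStage helper below are all hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// stage is an assumed shape for Stage[T, T].
type stage[T any] func(ctx context.Context, in <-chan T) <-chan T

// pipeline is a hypothetical stand-in for Pipeline[T].
type pipeline[T any] struct {
	ctx    context.Context
	stages []stage[T]
}

// AddStage appends a stage and returns the pipeline so calls can be chained.
func (p *pipeline[T]) AddStage(s stage[T]) *pipeline[T] {
	p.stages = append(p.stages, s)
	return p
}

// Run wires each stage's output channel into the next stage's input.
func (p *pipeline[T]) Run(input <-chan T) <-chan T {
	out := input
	for _, s := range p.stages {
		out = s(p.ctx, out)
	}
	return out
}

// mapStage builds a stage that applies fn to every item.
func mapStage[T any](fn func(T) T) stage[T] {
	return func(ctx context.Context, in <-chan T) <-chan T {
		out := make(chan T)
		go func() {
			defer close(out)
			for v := range in {
				select {
				case out <- fn(v):
				case <-ctx.Done():
					return
				}
			}
		}()
		return out
	}
}

// runPipeline sends 1, 2, 3 through an "add one" stage and a "times ten" stage.
func runPipeline() []int {
	in := make(chan int, 3)
	for _, v := range []int{1, 2, 3} {
		in <- v
	}
	close(in)

	p := &pipeline[int]{ctx: context.Background()}
	p.AddStage(mapStage(func(v int) int { return v + 1 })).
		AddStage(mapStage(func(v int) int { return v * 10 }))

	var got []int
	for v := range p.Run(in) {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(runPipeline()) // prints [20 30 40]
}
```

Because each stage in this sketch runs on a single goroutine, item order is preserved from input to output.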
PipelineBuilder
Fluent builder for pipelines.
Methods:
- NewPipelineBuilder[T](ctx context.Context) *PipelineBuilder[T]
- AddStage(stage Stage[T, T]) *PipelineBuilder[T]
- Build() *Pipeline[T]
RateLimiter
Token bucket rate limiter.
Methods:
- NewRateLimiter(limit int, interval time.Duration) *RateLimiter
- Allow() bool
- Wait(ctx context.Context) error
- Refill()
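As a rough illustration of the token bucket idea behind Allow and Refill, the sketch below keeps a counter of available tokens. It assumes that Refill restores the bucket to its full limit and that something calls it once per interval; the timer and the Wait method are left out so the behavior is deterministic. The tokenBucket type and allowSequence helper are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// tokenBucket is a hypothetical sketch, not the package's RateLimiter.
type tokenBucket struct {
	mu     sync.Mutex
	limit  int
	tokens int
}

func newTokenBucket(limit int) *tokenBucket {
	return &tokenBucket{limit: limit, tokens: limit}
}

// Allow consumes one token if one is available and reports whether it did.
func (b *tokenBucket) Allow() bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.tokens == 0 {
		return false
	}
	b.tokens--
	return true
}

// Refill restores the bucket to its full limit.
// Assumption: a ticker or the caller invokes this once per interval.
func (b *tokenBucket) Refill() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.tokens = b.limit
}

// allowSequence drains a bucket of two tokens, then refills it.
func allowSequence() []bool {
	b := newTokenBucket(2)
	seq := []bool{b.Allow(), b.Allow(), b.Allow()}
	b.Refill()
	return append(seq, b.Allow())
}

func main() {
	fmt.Println(allowSequence()) // prints [true true false true]
}
```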
BurstRateLimit
Burst-capable rate limiter.
Methods:
- NewBurstRateLimit(limit int, interval time.Duration, burst int) *BurstRateLimit
- Allow() bool
- Wait(ctx context.Context) error
- Refill()
RetryConfig
type RetryConfig struct {
    MaxRetries int
    BaseDelay  time.Duration
    MaxDelay   time.Duration
    Multiplier float64
    Jitter     bool
}
Configuration for retry behavior.
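One plausible reading of these fields is that the delay before retry k is BaseDelay multiplied by Multiplier k times, capped at MaxDelay. The sketch below computes that schedule. The delayFor helper and the field values are hypothetical, the interpretation is an assumption rather than documented behavior, and Jitter is ignored to keep the output deterministic.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// RetryConfig mirrors the struct documented above.
type RetryConfig struct {
	MaxRetries int
	BaseDelay  time.Duration
	MaxDelay   time.Duration
	Multiplier float64
	Jitter     bool
}

// delayFor is a hypothetical helper: BaseDelay * Multiplier^attempt, capped at MaxDelay.
// Jitter is not applied here.
func delayFor(cfg RetryConfig, attempt int) time.Duration {
	d := float64(cfg.BaseDelay) * math.Pow(cfg.Multiplier, float64(attempt))
	if d > float64(cfg.MaxDelay) {
		return cfg.MaxDelay
	}
	return time.Duration(d)
}

// delaysMillis lists the delay, in milliseconds, before each retry.
func delaysMillis() []int64 {
	// Illustrative values only; the package's defaults are not documented here.
	cfg := RetryConfig{
		MaxRetries: 5,
		BaseDelay:  100 * time.Millisecond,
		MaxDelay:   time.Second,
		Multiplier: 2,
	}
	var out []int64
	for i := 0; i < cfg.MaxRetries; i++ {
		out = append(out, delayFor(cfg, i).Milliseconds())
	}
	return out
}

func main() {
	fmt.Println(delaysMillis()) // prints [100 200 400 800 1000]
}
```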
CircuitBreaker
Circuit breaker for failure handling.
Methods:
- NewCircuitBreaker(failureThreshold int, resetTimeout time.Duration) *CircuitBreaker
- Execute(ctx context.Context, fn func() error) error
- State() CircuitState
CircuitState
Circuit breaker state.
Constants:
- StateClosed
- StateOpen
- StateHalfOpen
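The three states usually form a small state machine: closed until a failure threshold is reached, open until a reset timeout elapses, then half-open for a single trial call. The sketch below shows that common pattern. It is an assumption about how the transitions work, not the package's code; it omits the context parameter and locking, and it uses a hypothetical injectable clock so the demo does not depend on real time.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

type circuitState int

const (
	stateClosed circuitState = iota
	stateOpen
	stateHalfOpen
)

var errOpen = errors.New("circuit open")

// breaker is a hypothetical, single-goroutine sketch (no locking).
type breaker struct {
	threshold int
	timeout   time.Duration
	failures  int
	state     circuitState
	openedAt  time.Time
	now       func() time.Time // injectable clock so the demo is deterministic
}

// Execute runs fn unless the circuit is open and the timeout has not elapsed.
func (b *breaker) Execute(fn func() error) error {
	if b.state == stateOpen {
		if b.now().Sub(b.openedAt) < b.timeout {
			return errOpen
		}
		b.state = stateHalfOpen // timeout elapsed: allow one trial call
	}
	err := fn()
	if err == nil {
		b.state, b.failures = stateClosed, 0
		return nil
	}
	b.failures++
	if b.state == stateHalfOpen || b.failures >= b.threshold {
		b.state, b.openedAt = stateOpen, b.now()
	}
	return err
}

// demoStates records the state after each of four calls.
func demoStates() []circuitState {
	clock := time.Unix(0, 0)
	b := &breaker{threshold: 2, timeout: time.Second, now: func() time.Time { return clock }}
	fail := func() error { return errors.New("boom") }
	ok := func() error { return nil }

	var seen []circuitState
	b.Execute(fail)
	seen = append(seen, b.state) // closed: one failure, below threshold
	b.Execute(fail)
	seen = append(seen, b.state) // open: threshold reached
	b.Execute(ok)
	seen = append(seen, b.state) // still open: call rejected without running
	clock = clock.Add(2 * time.Second)
	b.Execute(ok)
	seen = append(seen, b.state) // closed: trial call succeeded
	return seen
}

func main() {
	fmt.Println(demoStates()) // prints [0 1 1 0]
}
```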
Functions
MapConcurrent
func MapConcurrent[T any, R any](
    ctx context.Context,
    in []T,
    n int,
    fn func(context.Context, T) (R, error),
) ([]R, error)
Processes a slice concurrently with bounded parallelism.
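Bounded parallelism over a slice is commonly built with a buffered channel used as a semaphore. The sketch below follows the signature above and assumes that results keep the input order and that the first error observed cancels the remaining work; neither behavior is confirmed by this reference. The mapConcurrent and squares names are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// mapConcurrent is a hypothetical sketch with the same signature as MapConcurrent.
// Assumptions: results keep input order and the first error observed is returned.
func mapConcurrent[T any, R any](
	ctx context.Context,
	in []T,
	n int,
	fn func(context.Context, T) (R, error),
) ([]R, error) {
	if n < 1 {
		n = 1 // guard so the semaphore below always has capacity
	}
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	out := make([]R, len(in))
	sem := make(chan struct{}, n) // at most n calls to fn run at once
	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
	)

loop:
	for i, item := range in {
		select {
		case sem <- struct{}{}: // acquire a slot
		case <-ctx.Done():
			break loop // stop launching work after cancellation
		}
		wg.Add(1)
		go func(i int, item T) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot
			r, err := fn(ctx, item)
			if err != nil {
				once.Do(func() {
					firstErr = err
					cancel()
				})
				return
			}
			out[i] = r // each goroutine writes its own index
		}(i, item)
	}
	wg.Wait()

	if firstErr != nil {
		return nil, firstErr
	}
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	return out, nil
}

// squares maps four integers with a parallelism of two.
func squares() ([]int, error) {
	return mapConcurrent(context.Background(), []int{1, 2, 3, 4}, 2,
		func(_ context.Context, v int) (int, error) { return v * v, nil })
}

func main() {
	fmt.Println(squares()) // prints [1 4 9 16] <nil>
}
```

Writing each result to its own index avoids a mutex around the output slice and keeps results aligned with the input.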
FanOut
func FanOut[T any, R any](
    ctx context.Context,
    input <-chan T,
    workers int,
    fn func(context.Context, T) (R, error),
) <-chan R
Distributes work to multiple workers.
FanIn
Merges multiple input channels.
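The reference gives no signature for FanIn, so the sketch below assumes a variadic form that takes a context and any number of receive-only channels. It shows the usual merge pattern: one forwarding goroutine per input, with the output closed once all inputs are drained. The fanIn, emit, and mergedSum names are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// fanIn is a hypothetical sketch; its signature is an assumption.
func fanIn[T any](ctx context.Context, inputs ...<-chan T) <-chan T {
	out := make(chan T)
	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		go func(in <-chan T) {
			defer wg.Done()
			for v := range in {
				select {
				case out <- v:
				case <-ctx.Done():
					return
				}
			}
		}(in)
	}
	// Close the merged channel once every input has been drained.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// emit returns a closed, pre-filled channel holding vals.
func emit(vals ...int) <-chan int {
	ch := make(chan int, len(vals))
	for _, v := range vals {
		ch <- v
	}
	close(ch)
	return ch
}

// mergedSum merges two channels and returns how many items arrived and their sum.
func mergedSum() (count, sum int) {
	for v := range fanIn(context.Background(), emit(1, 2), emit(3, 4, 5)) {
		count++
		sum += v
	}
	return count, sum
}

func main() {
	fmt.Println(mergedSum()) // prints 5 15
}
```

The order of merged items is not deterministic, which is why the demo checks only the count and the sum.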
FanOutFanIn
func FanOutFanIn[T any, R any](
    ctx context.Context,
    input <-chan T,
    workers int,
    fn func(context.Context, T) (R, error),
) <-chan R
Combines fan-out and fan-in.
RoundRobin
func RoundRobin[T any, R any](
    ctx context.Context,
    input <-chan T,
    workers int,
    fn func(context.Context, T) (R, error),
) <-chan R
Distributes work in round-robin fashion.
RateLimit
func RateLimit[T any](
    ctx context.Context,
    input <-chan T,
    limit int,
    interval time.Duration,
) <-chan T
Rate limits a channel of items.
Retry
Executes a function with retry logic.
RetryWithBackoff
func RetryWithBackoff[T any](
    ctx context.Context,
    item T,
    fn RetryableFunc[T],
    maxRetries int,
    baseDelay time.Duration,
) error
Retries with exponential backoff.
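A retry loop with exponential backoff can be sketched as follows. Several details are assumptions: RetryableFunc[T] is taken to be a function of a context and an item that returns an error, the delay doubles after each failure, and maxRetries counts retries after the initial attempt. The retryableFunc, retryWithBackoff, and attemptsUntilSuccess names are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryableFunc is an assumed shape for RetryableFunc[T];
// the reference does not show its definition.
type retryableFunc[T any] func(ctx context.Context, item T) error

// retryWithBackoff calls fn up to maxRetries+1 times, doubling the delay
// between attempts and returning early if ctx is cancelled while waiting.
func retryWithBackoff[T any](
	ctx context.Context,
	item T,
	fn retryableFunc[T],
	maxRetries int,
	baseDelay time.Duration,
) error {
	delay := baseDelay
	var err error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if err = fn(ctx, item); err == nil {
			return nil
		}
		if attempt == maxRetries {
			break // no retries left, so do not wait again
		}
		timer := time.NewTimer(delay)
		select {
		case <-timer.C:
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		}
		delay *= 2
	}
	return err
}

// attemptsUntilSuccess runs a function that fails twice and then succeeds.
func attemptsUntilSuccess() (int, error) {
	calls := 0
	err := retryWithBackoff[string](context.Background(), "job-1",
		func(_ context.Context, _ string) error {
			calls++
			if calls < 3 {
				return errors.New("transient failure")
			}
			return nil
		}, 5, time.Millisecond)
	return calls, err
}

func main() {
	fmt.Println(attemptsUntilSuccess()) // prints 3 <nil>
}
```

Waiting on a timer inside a select, rather than calling time.Sleep, lets the loop stop promptly when the context is cancelled.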
RetryForever
func RetryForever[T any](
    ctx context.Context,
    item T,
    fn RetryableFunc[T],
    baseDelay time.Duration,
) error
Retries indefinitely until success.
WithRetry
Wraps a function with retry logic.
DefaultRetryConfig
Returns default retry configuration.
NewRetryableError
Creates a retryable error.
IsRetryable
Checks if an error is retryable.
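Since no signatures are listed for NewRetryableError or IsRetryable, the sketch below shows one conventional way such a pair could work in Go: a wrapper error type detected with errors.As. The retryableError type and both function shapes are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// retryableError is an assumed shape for RetryableError:
// a wrapper that marks the inner error as safe to retry.
type retryableError struct{ err error }

func (e *retryableError) Error() string { return "retryable: " + e.err.Error() }

// Unwrap exposes the inner error to errors.Is and errors.As.
func (e *retryableError) Unwrap() error { return e.err }

func newRetryableError(err error) error { return &retryableError{err: err} }

// isRetryable reports whether any error in err's chain is a *retryableError.
func isRetryable(err error) bool {
	var re *retryableError
	return errors.As(err, &re)
}

// classify checks a wrapped retryable error and a plain error.
func classify() (wrapped, plain bool) {
	base := errors.New("connection reset")
	wrappedErr := fmt.Errorf("fetch failed: %w", newRetryableError(base))
	return isRetryable(wrappedErr), isRetryable(base)
}

func main() {
	fmt.Println(classify()) // prints true false
}
```

Using errors.As means the marker is still found after callers add their own context with %w.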
Pipeline Stages
Map
Applies a function to each item.
Filter
Filters items based on a predicate.
Batch
Batches items into slices.
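A batching stage can be sketched as a goroutine that buffers items and emits a slice whenever the buffer is full. The signature below (an input channel plus a batch size) is an assumption, as is the choice to flush a final partial batch when the input closes. The batch and batchSizes names are hypothetical.

```go
package main

import "fmt"

// batch is a hypothetical sketch; its signature is an assumption.
// It expects size > 0.
func batch[T any](in <-chan T, size int) <-chan []T {
	out := make(chan []T)
	go func() {
		defer close(out)
		buf := make([]T, 0, size)
		for v := range in {
			buf = append(buf, v)
			if len(buf) == size {
				out <- buf
				buf = make([]T, 0, size) // fresh slice so emitted batches are not overwritten
			}
		}
		if len(buf) > 0 {
			out <- buf // flush the final partial batch
		}
	}()
	return out
}

// batchSizes sends seven items through batches of three and records each batch length.
func batchSizes() []int {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 0; i < 7; i++ {
			in <- i
		}
	}()
	var sizes []int
	for b := range batch(in, 3) {
		sizes = append(sizes, len(b))
	}
	return sizes
}

func main() {
	fmt.Println(batchSizes()) // prints [3 3 1]
}
```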
Unbatch
Unbatches slices into individual items.
Tee
Splits input to multiple outputs.
Merge
Merges multiple inputs into one output.
Type Aliases
Stage
A pipeline stage transformation function.
RetryableFunc
A function that can be retried.
RetryableError
An error that indicates retryability.