Pipeline Example¶
This example demonstrates how to build data processing pipelines using the Pipeline type.
Basic Example¶
package main
import (
"context"
"fmt"
"github.com/logimos/concurrent"
)
func main() {
ctx := context.Background()
input := make(chan int)
pipeline := concurrent.NewPipeline[int](ctx)
// Multiply by 2
pipeline.AddStage(concurrent.Map(func(n int) int {
return n * 2
}))
// Filter even numbers
pipeline.AddStage(concurrent.Filter(func(n int) bool {
return n%2 == 0
}))
output := pipeline.Run(input)
// Send data
go func() {
for i := 1; i <= 10; i++ {
input <- i
}
close(input)
}()
// Process results
for result := range output {
fmt.Println(result)
}
pipeline.Close()
}
Example: Data Transformation Pipeline¶
package main
import (
"context"
"fmt"
"strings"
"github.com/logimos/concurrent"
)
type Data struct {
Text string
Len int
}
func main() {
ctx := context.Background()
input := make(chan string)
pipeline := concurrent.NewPipeline[string](ctx)
// Transform: uppercase
pipeline.AddStage(concurrent.Map(func(s string) string {
return strings.ToUpper(s)
}))
// Filter: keep only long strings
pipeline.AddStage(concurrent.Map(func(s string) Data {
return Data{Text: s, Len: len(s)}
}))
pipeline.AddStage(concurrent.Filter(func(d Data) bool {
return d.Len > 5
}))
output := pipeline.Run(input)
go func() {
input <- "hello"
input <- "world"
input <- "go"
input <- "pipeline"
close(input)
}()
for result := range output {
fmt.Printf("%s (%d chars)\n", result.Text, result.Len)
}
pipeline.Close()
}
Example: Batching Pipeline¶
package main
import (
"context"
"fmt"
"github.com/logimos/concurrent"
)
func main() {
ctx := context.Background()
input := make(chan int)
pipeline := concurrent.NewPipeline[int](ctx)
// Square each number
pipeline.AddStage(concurrent.Map(func(n int) int {
return n * n
}))
// Batch into groups of 5
pipeline.AddStage(concurrent.Batch[int](5))
output := pipeline.Run(input)
go func() {
for i := 1; i <= 12; i++ {
input <- i
}
close(input)
}()
for batch := range output {
fmt.Println(batch)
}
pipeline.Close()
}
Output:
Example: Using Pipeline Builder¶
package main
import (
"context"
"fmt"
"github.com/logimos/concurrent"
)
func main() {
ctx := context.Background()
input := make(chan int)
pipeline := concurrent.NewPipelineBuilder[int](ctx).
AddStage(concurrent.Map(func(n int) int { return n * 2 })).
AddStage(concurrent.Filter(func(n int) bool { return n > 10 })).
AddStage(concurrent.Batch[int](3)).
Build()
output := pipeline.Run(input)
go func() {
for i := 1; i <= 10; i++ {
input <- i
}
close(input)
}()
for batch := range output {
fmt.Println(batch)
}
pipeline.Close()
}