Skip to content
/ lapluma Public

La Pluma 是一个微型、函数式风格的 Go 工具库,旨在提供一套简洁、可组合的数据处理工具。 包含两个核心组件:Iterator 用于串行的数据处理,Pipe 则基于 Go 的 channel 和 context 实现了并发的数据处理。

License

Notifications You must be signed in to change notification settings

muzhy/lapluma

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

22 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

La Pluma: A Lightweight Go Data Streaming Library

La Pluma is a compact, functional-style Go utility library that provides a simple and composable set of data processing tools. It includes two core components: Iterator for sequential data processing and Pipe, which leverages Go's channels and context for concurrent data processing.

README.md

Core Design

The library's core is built around simplicity and composability. By offering a set of orthogonal, single-purpose operations (such as Map, Filter, and Reduce), it enables the construction of clear, readable, and powerful data processing pipelines.

Core Concepts

1. Iterator - Sequential Data Streams

Iterator is a standard iterator interface that defines element-by-element access to a data sequence.

Main Operations:

  • FromSlice(data []E) Iterator[E]: Creates an iterator from a slice.
  • FromMap(data map[K]V) Iterator[Pair[K,V]]: Creates an iterator from a map.
  • Map[E, R](it Iterator[E], handler func(E) R) Iterator[R]: Applies a non-error-prone transformation to each element.
  • Filter[E](it Iterator[E], filter func(E) bool) Iterator[E]: Filters out elements that do not meet a condition.
  • Reduce[E, R](it Iterator[E], handler func(R, E) R, initial R) R: Aggregates the sequence into a single value.
  • Collect[E](it Iterator[E]) []E: Gathers all elements from an iterator into a slice.
  • Iter[E any](it Iterator[E]) iter.Seq[E] and Iter2[K, V any](it Iterator[lapluma.Pair[K, V]]) iter.Seq2[K, V]: Creates an iterator that conforms to the iter package, allowing it to be used with a for-range loop.

Example:

// Create an iterator
data := []int{1, 2, 3, 4, 5}
it := iterator.FromSlice(data)

// Chained operations
result := iterator.Collect(
    iterator.Filter(
        iterator.Map(it, func(x int) int { return x * 2 }),
        func(x int) bool { return x > 5 }
    )
) // [6, 8, 10]

// Or use a for-range loop to process the iterator result directly:
it := iterator.Filter(
    iterator.Map(it, func(x int) int { return x * 2 }),
    func(x int) bool { return x > 5 }
)
for data := range Iter(filteredIt) {
	fmt.Printf("%v", data)
}

The Iterator is designed for sequential processing and is not concurrent-safe. If you require concurrency, convert it to a Pipe by calling pipe.FromIterator.

2. Pipe - Concurrent Data Streams

Pipe is the concurrent version of Iterator. It's built on Go channels, and each operation (e.g., Map, Filter) runs in a separate group of goroutines, forming a processing pipeline.

All Pipe operations are integrated with context.Context for easy timeout control and graceful shutdown.

Main Operations:

  • FromSlice(data []E, ctx context.Context) *Pipe[E]: Creates a concurrent pipe from a slice.
  • FromIterator(it iterator.Iterator[E], ctx context.Context) *Pipe[E]: Creates a concurrent pipe from an iterator.
  • Map, Filter, Reduce, and other functions have the same functionality as their Iterator counterparts, but they execute concurrently.

Example:

ctx := context.Background()

// Create a concurrent pipe
p := pipe.FromSlice([]int{1, 2, 3, 4, 5}, ctx)

// Concurrent processing (3 worker goroutines)
result := pipe.Collect(
    pipe.Filter(
        pipe.Map(p, cpuIntensiveTask, 3), // Concurrency level 3
        func(x int) bool { return x > 10 },
        2, // Concurrency level 2
    )
)

Integration with Standard Iterators

import "iter"

// Convert to a standard iterator
seq := iterator.Iter(myIterator)
for value := range seq {
    // Process the value
}

// Key-value pair iteration
seq2 := iterator.Iter2(mapIterator)
for k, v := range seq2 {
    // Process the key and value
}

Error Handling

La Pluma intentionally simplifies the signatures of core transformation functions, such as Map's handler being func(T) R instead of func(T) (R, error). This is a design choice to treat errors as part of the data stream.

The following two patterns are recommended for handling operations that might fail:

Pattern 1: Pre-filtering

If some data is invalid from the start or doesn't meet the processing criteria, use Filter to remove it before it enters the core processing logic.

// Example: Process only positive numbers
pipe := FromSlice([]int{1, -2, 3, -4}, ctx)
positivePipe := Filter(pipe, func(n int) bool {
    return n > 0
})
// ... subsequent operations will only see {1, 3}

Pattern 2: Using TryMap for Fallible Transformations

When the data transformation itself can fail (e.g., parsing a string, calling an external API), use the TryMap function. Its handler signature is func(T) (R, error). When the handler returns a non-nil error, TryMap automatically skips (discards) that element and continues processing the next one. This allows the pipeline to keep running even when encountering "data-level" errors without being interrupted.

import (
    "strconv"
    "errors"
)

// Example: Convert strings to integers, skip on failure
stringPipe := FromSlice([]string{"1", "two", "3", "four"}, ctx)

// Use TryMap; the handler returns (int, error)
intPipe := TryMap(stringPipe, func(s string) (int, error) {
    i, err := strconv.Atoi(s)
    if err != nil {
        // Return an error, and this element will be discarded
        return 0, errors.New("not a number")
    }
    return i, nil
})

// The final Reduce operation will only process the successfully converted {1, 3}
sum := Reduce(intPipe, func(acc, n int) int { return acc + n }, 0)
// The result of sum is 4

Other

If you need to collect errors during the Map process, consider using Result[T] from util.go as the return value.


Installation

go get github.com/muzhy/lapluma

Running Tests

go test ./...

Q&A

Q: Why use a nested approach instead of a chained one?

A: lapluma is built on generics, which reduces runtime overhead and allows for type-safe checks during compilation. However, Go's generics mechanism does not support type parameters for methods. Therefore, functions like Map cannot be implemented as methods and must use a nested approach. To maintain a consistent interface, all functions are implemented in a nested style.


Future Plans

  • Provide more transformation operations, such as Distinct, Zip, and Peek.
  • Improve the error handling mechanism.
  • Standardize the concurrency control parameters for Pipe.

About

La Pluma 是一个微型、函数式风格的 Go 工具库,旨在提供一套简洁、可组合的数据处理工具。 包含两个核心组件:Iterator 用于串行的数据处理,Pipe 则基于 Go 的 channel 和 context 实现了并发的数据处理。

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages