Skip to content

feat: Add ChanSink for sending stream items to a Go channel (attempt 2) #41

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 1, 2025

Conversation

vladimirvivien
Copy link
Owner

@vladimirvivien vladimirvivien commented May 25, 2025

The ChanSink allows streaming data from an Automi pipeline to be collected into a channel. Items in the channel can be accessed as shown below:

src := sources.Slice([][]string{ ... }
snk := sinks.Chan[[]string]()
strm := From(src)
strm.WithLogSink(sinks.Func(testutil.LogSinkFunc(t)))
strm.Into(snk)

...
var receivedData [][]string
for data := range snk.Get() {
    receivedData = append(receivedData, data)
}

See documentation for detail.

Fixes #42

…ckage.

The `ChanSink` allows streaming data from an Automi pipeline directly into a
Go channel.

Key features:
- Generic type `ChanSink[T any]` for flexibility with different data types.
- Constructor `Chan[T any](outputChan chan T)` to initialize the sink with a
  channel you provide.
- Gracefully handles input channel closure and context cancellation.

A corresponding test file, `sinks/chansink_test.go`, has been added with
test cases covering:
- Basic data flow from input to output channel.
- Context cancellation behavior.
- Handling of items with incorrect data types.
This commit addresses a timeout issue in `TestChanSink_Open` caused by the
`ChanSink` not closing its output channel.

The `Open` method in `sinks/chansink.go` has been modified to include
`defer close(s.output)` in its processing goroutine. This ensures that
the output channel is closed when the input channel is exhausted or the
context is cancelled, allowing downstream consumers to terminate correctly.

I've reviewed the code and performed checks on the
`ChanSink` implementation and its tests, and found no issues.
The `TestChanSink_Open` test, along with other tests for `ChanSink`,
passes reliably.

Original commit message parts:

Add ChanSink to sinks package

This commit introduces a new sink type, `ChanSink`, to the `sinks` package.
The `ChanSink` allows streaming data from an Automi pipeline directly into a
Go channel.

Key features:
- Generic type `ChanSink[T any]` for flexibility with different data types.
- Constructor `Chan[T any](outputChan chan T)` to initialize the sink with a
  user-provided channel.
- `SetInput`, `Open`, `Get`, and `SetLogFunc` methods adhering to the
  existing sink pattern.
- Handles type assertion, ensuring only data of the expected type is sent to
  the output channel.
- Gracefully handles input channel closure and context cancellation.

A corresponding test file, `sinks/chansink_test.go`, has been added with
test cases covering:
- Basic data flow from input to output channel.
- Context cancellation behavior.
- Handling of items with incorrect data types.

Note: Running tests for the entire package currently
results in a timeout, suggesting a pre-existing issue or inefficiency in
the broader test suite when the race detector is enabled. This issue is
separate from the functionality of the newly added ChanSink.
@vladimirvivien vladimirvivien requested a review from Copilot May 31, 2025 13:05
Copy link

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

Adds a new ChanSink component for streaming pipeline items into a Go channel, complete with generics support, context cancellation, and tests.

  • Introduces ChanSink[T any] and constructor Chan[T any](outputChan chan T).
  • Implements streaming logic with type assertions, context handling, and channel closure.
  • Adds unit tests covering normal flow, context cancellation, and wrong-data-type scenarios.

Reviewed Changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.

File Description
sinks/chansink.go Implements the generic ChanSink with Open, SetInput, Get, and logging.
sinks/chansink_test.go Adds three test functions validating basic flow, context cancellation, and type errors.
go.mod Removes the explicit toolchain directive.
Comments suppressed due to low confidence (2)

sinks/chansink_test.go:26

  • [nitpick] The name sinkErr implies a single error value, but this is actually a channel of errors. Consider renaming to sinkErrCh to clarify its type.
sinkErr := cs.Open(ctx)

sinks/chansink_test.go:58

  • Rather than inferring closure indirectly, add an explicit assertion to verify cs.Get() is closed, e.g.:
if _, ok := <-cs.Get(); ok {
    t.Error("expected output channel to be closed")
}
// Check if the sink's output channel was closed by the sink (indirectly via the receiving goroutine)

@vladimirvivien vladimirvivien force-pushed the channel-sink branch 2 times, most recently from d7319ad to 874868c Compare May 31, 2025 21:41
@vladimirvivien vladimirvivien merged commit b01a80e into main Jun 1, 2025
2 checks passed
@vladimirvivien vladimirvivien deleted the channel-sink branch June 1, 2025 04:39
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Introduce a Channel Sink
1 participant