Skip to content

Commit 8cb5acb

Browse files
Chore: review changes; constructor func update
1 parent 22a61a9 commit 8cb5acb

File tree

3 files changed

+32
-37
lines changed

3 files changed

+32
-37
lines changed

sinks/chansink.go

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,39 +9,43 @@ import (
99
"github.com/vladimirvivien/automi/log"
1010
)
1111

12-
// ChanSink sends streamed items to an output channel.
13-
type ChanSink[T any] struct {
14-
output chan T
12+
// ChanSink represents a sink backed by a Go channel.
13+
type ChanSink[IN any, CHAN <-chan IN] struct {
14+
chansink chan IN
1515
input <-chan any
1616
logf api.StreamLogFunc
1717
}
1818

19-
// Chan is the constructor function which returns a new ChanSink.
20-
func Chan[T any](outputChan chan T) *ChanSink[T] {
21-
return &ChanSink[T]{
22-
output: outputChan,
19+
// BufferedChan returns a new ChanSink backed by a buffered channel.
20+
func BufferedChan[IN any, CHAN <-chan IN](bufferSize int) *ChanSink[IN,CHAN] {
21+
return &ChanSink[IN,CHAN]{
22+
chansink: make(chan IN, bufferSize),
2323
logf: log.NoLogFunc,
2424
}
2525
}
2626

27-
// SetInput sets the source for the sink.
28-
func (s *ChanSink[T]) SetInput(in <-chan any) {
27+
// Chan returns a new ChanSink backed by an unbuffered channel.
28+
func Chan[IN any, CHAN <-chan IN]() *ChanSink[IN,CHAN] {
29+
return BufferedChan[IN,CHAN](0)
30+
}
31+
32+
// SetInput sets the input for the sink.
33+
func (s *ChanSink[IN,CHAN]) SetInput(in <-chan any) {
2934
s.input = in
3035
}
3136

32-
// Get returns the output channel used by the sink.
33-
func (s *ChanSink[T]) Get() <-chan T {
34-
return s.output
37+
// Get returns a receive-only channel used by the sink.
38+
func (s *ChanSink[IN,CHAN]) Get() CHAN {
39+
return s.chansink
3540
}
3641

3742
// SetLogFunc sets a logging func for the component.
38-
func (s *ChanSink[T]) SetLogFunc(f api.StreamLogFunc) {
43+
func (s *ChanSink[IN,CHAN]) SetLogFunc(f api.StreamLogFunc) {
3944
s.logf = f
4045
}
4146

4247
// Open starts the sink and returns and waits on the returned
43-
// channel for the sink to be done or an error to be received.
44-
func (s *ChanSink[T]) Open(ctx context.Context) <-chan error {
48+
func (s *ChanSink[IN,CHAN]) Open(ctx context.Context) <-chan error {
4549
result := make(chan error)
4650

4751
s.logf(ctx, log.LogInfo(
@@ -56,7 +60,7 @@ func (s *ChanSink[T]) Open(ctx context.Context) <-chan error {
5660
"Component closing",
5761
slog.String("sink", "Chan"),
5862
))
59-
close(s.output) // Ensure output channel is closed
63+
close(s.chansink) // Ensure output channel is closed
6064
}()
6165

6266
for {
@@ -65,7 +69,7 @@ func (s *ChanSink[T]) Open(ctx context.Context) <-chan error {
6569
if !opened {
6670
return
6771
}
68-
data, ok := item.(T)
72+
data, ok := item.(IN)
6973
if !ok {
7074
s.logf(ctx, log.LogDebug(
7175
"Error: unexpected data type",
@@ -74,7 +78,7 @@ func (s *ChanSink[T]) Open(ctx context.Context) <-chan error {
7478
))
7579
continue
7680
}
77-
s.output <- data
81+
s.chansink <- data
7882
case <-ctx.Done():
7983
return
8084
}

sinks/chansink_test.go

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,9 @@ import (
88
)
99

1010
func TestChanSink_Open(t *testing.T) {
11-
outputChan := make(chan string)
12-
cs := Chan[string](outputChan)
11+
cs := Chan[string]()
1312

14-
inputChan := make(chan interface{})
13+
inputChan := make(chan any)
1514
go func() {
1615
inputChan <- "A"
1716
inputChan <- "B"
@@ -23,7 +22,7 @@ func TestChanSink_Open(t *testing.T) {
2322
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
2423
defer cancel()
2524

26-
sinkErr := cs.Open(ctx)
25+
sink := cs.Open(ctx)
2726

2827
var receivedData []string
2928
doneReceiving := make(chan struct{})
@@ -36,7 +35,7 @@ func TestChanSink_Open(t *testing.T) {
3635
}()
3736

3837
select {
39-
case err := <-sinkErr:
38+
case err := <-sink:
4039
if err != nil {
4140
t.Fatalf("ChanSink.Open() returned an error: %v", err)
4241
}
@@ -54,18 +53,11 @@ func TestChanSink_Open(t *testing.T) {
5453
if !reflect.DeepEqual(expectedData, receivedData) {
5554
t.Errorf("Received data did not match expected data. Got %v, expected %v", receivedData, expectedData)
5655
}
57-
58-
// Check if the sink's output channel was closed by the sink (indirectly via the receiving goroutine)
59-
// This is a bit tricky to test directly for a send-only channel from the sink's perspective.
60-
// However, if the receiving goroutine exited, it implies the channel was closed.
61-
// A more direct test might involve trying to send to cs.Get() after it's supposed to be closed,
62-
// but that's not how this sink is designed to be used.
6356
}
6457

6558
func TestChanSink_Open_ContextCancel(t *testing.T) {
66-
outputChan := make(chan int)
67-
cs := Chan[int](outputChan)
68-
inputChan := make(chan interface{})
59+
cs := Chan[int]()
60+
inputChan := make(chan any)
6961

7062
cs.SetInput(inputChan)
7163

@@ -98,9 +90,8 @@ func TestChanSink_Open_ContextCancel(t *testing.T) {
9890
}
9991

10092
func TestChanSink_Open_WrongDataType(t *testing.T) {
101-
outputChan := make(chan string)
102-
cs := Chan[string](outputChan) // Expecting string
103-
inputChan := make(chan interface{})
93+
cs := Chan[string]() // Expecting string
94+
inputChan := make(chan any)
10495

10596
go func() {
10697
inputChan <- "CorrectType"

sinks/slice.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ func (s *SliceSink[IN, SLICE]) SetLogFunc(f api.StreamLogFunc) {
3939
s.logf = f
4040
}
4141

42-
// Open starts the collector and returns and waits on the returned
43-
// channel for the collector to be done or an error to be received.
42+
// Open opens the sink to start collecting items. It returns a channel
43+
// that will receive an error if the sink encounters an error during setup operation.
4444
func (s *SliceSink[IN, SLICE]) Open(ctx context.Context) <-chan error {
4545
result := make(chan error)
4646

0 commit comments

Comments
 (0)