Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 8f051e9

Browse files
committedFeb 1, 2025·
Update stream method names
1 parent 8723247 commit 8f051e9

File tree

4 files changed

+22
-22
lines changed

4 files changed

+22
-22
lines changed
 

‎stream/stream.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func From(src api.Source) *Stream {
5656
// return s
5757
// }
5858

59-
func (s *Stream) Flow(nodes ...api.Operator) *Stream {
59+
func (s *Stream) Run(nodes ...api.Operator) *Stream {
6060
s.nodes = nodes
6161
return s
6262
}

‎stream/stream_exec_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ func TestStreamWithOpertors(t *testing.T) {
2525
source := sources.Slice([]string{"hello", "world"})
2626
sink := sinks.Slice[string]()
2727

28-
strm := From(source).Flow(
28+
strm := From(source).Run(
2929
exec.Map[string, string](func(ctx context.Context, in string) string {
3030
return strings.ToUpper(in)
3131
}),
@@ -49,7 +49,7 @@ func TestStreamWithOpertors(t *testing.T) {
4949
source := sources.Slice([]string{"HELLO", "WORLD", "HOW", "ARE", "YOU"})
5050
sink := sinks.Slice[string]()
5151

52-
strm := From(source).Flow(
52+
strm := From(source).Run(
5353
exec.Filter[string](func(ctx context.Context, in string) bool {
5454
return !strings.Contains(in, "O")
5555
}),
@@ -74,7 +74,7 @@ func TestStreamWithOpertors(t *testing.T) {
7474
source := sources.Slice([]string{"HELLO", "WORLD", "HOW", "ARE", "YOU"})
7575
sink := sinks.Slice[api.StreamItem[string]]()
7676

77-
strm := From(source).Flow(
77+
strm := From(source).Run(
7878
exec.Filter(func(ctx context.Context, in string) bool {
7979
return !strings.Contains(in, "O")
8080
}),
@@ -102,7 +102,7 @@ func TestStreamWithOpertors(t *testing.T) {
102102
source := sources.Slice([]string{"HELLO", "WORLD", "HOW", "ARE", "YOU"})
103103
sink := sinks.Slice[api.StreamItem[string]]()
104104

105-
strm := From(source).Flow(
105+
strm := From(source).Run(
106106
exec.Execute[string, any](func(ctx context.Context, in string) any {
107107
if strings.Contains(in, "O") {
108108
return errors.New("unsupported word")
@@ -133,7 +133,7 @@ func TestStreamWithOpertors(t *testing.T) {
133133
source := sources.Slice([]string{"HELLO", "WORLD", "HOW", "ARE", "YOU"})
134134
sink := sinks.Slice[api.StreamItem[string]]()
135135

136-
strm := From(source).Flow(
136+
strm := From(source).Run(
137137
exec.Execute[string, any](func(ctx context.Context, in string) any {
138138
if strings.Contains(in, "O") {
139139
return api.Error("unsupported word")
@@ -164,7 +164,7 @@ func TestStreamWithOpertors(t *testing.T) {
164164
source := sources.Slice([]string{"HELLO", "WORLD", "HOW", "ARE", "YOU"})
165165
sink := sinks.Slice[any]()
166166

167-
strm := From(source).Flow(
167+
strm := From(source).Run(
168168
exec.Execute[string, any](func(ctx context.Context, in string) any {
169169
if strings.Contains(in, "O") {
170170
return api.ErrorWithItem("unsupported data", api.StreamItem[string]{Item: in})

‎stream/stream_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func TestStreamInitGraph(t *testing.T) {
4242
return in
4343
}
4444

45-
strm := From(src).Flow(
45+
strm := From(src).Run(
4646
exec.Execute(op1),
4747
exec.Execute(op2),
4848
).Into(snk)
@@ -88,7 +88,7 @@ func TestStreamOpenWithOp(t *testing.T) {
8888
return runeCount
8989
}
9090

91-
strm := From(src).Flow(
91+
strm := From(src).Run(
9292
exec.Execute(op1),
9393
exec.Execute(op2),
9494
).Into(snk)

‎stream/stream_window_test.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func TestStreamWindow_GroupByIndex(t *testing.T) {
3535
{"request", "/i/b", "00:AA:22:DD", "accepted"},
3636
})
3737

38-
strm := From(src).Flow(
38+
strm := From(src).Run(
3939
// Window(with incoming type T), will output []T
4040
window.All[[]string](),
4141
// GroupByIndex receives []T, output map[T][][]T
@@ -79,7 +79,7 @@ func TestStreamWindow_GroupByStructField(t *testing.T) {
7979
{Event: "response", Src: "/i/d", Device: "00:BB:22:DD", Result: "served"},
8080
})
8181

82-
strm := From(src).Flow(
82+
strm := From(src).Run(
8383
// Window(with incoming type T), will output []T
8484
window.All[log](),
8585
// GroupByStructField receives []T, output map[any][]T
@@ -121,7 +121,7 @@ func TestStreamWindow_GroupByMapKey(t *testing.T) {
121121
{"Event": "response", "Src": "/i/d", "Device": "00:BB:22:DD", "Result": "served"},
122122
})
123123

124-
strm := From(src).Flow(
124+
strm := From(src).Run(
125125
// Window(with incoming type T), will output []T
126126
window.Size[map[string]string](4),
127127
// GroupByMap receives []T, output map[any][]T
@@ -157,7 +157,7 @@ func TestStreamWindow_SumByIndex(t *testing.T) {
157157
{79, 104, 724, 4, 2},
158158
})
159159

160-
strm := From(src).Flow(
160+
strm := From(src).Run(
161161
// Window(with incoming type T), will output []T
162162
window.All[[]int](),
163163
// SumByIndex receives []T, outputs sum of values float64
@@ -196,7 +196,7 @@ func TestStreamWindow_SumByStructField(t *testing.T) {
196196
{"Memphis", "plane", "propeller", 48},
197197
})
198198

199-
strm := From(src).Flow(
199+
strm := From(src).Run(
200200
// Window(with incoming type T), will output []T
201201
window.All[vehicle](),
202202
// SumByStructField receives []T, outputs sum of values float64
@@ -232,7 +232,7 @@ func TestStreamWindow_SumByMapKey(t *testing.T) {
232232
{"vehicle": 5},
233233
})
234234

235-
strm := From(src).Flow(
235+
strm := From(src).Run(
236236
// Window(with incoming type T), will output []T
237237
window.All[map[string]int](),
238238
// SumByStructField receives []T, outputs sum of values float64
@@ -262,7 +262,7 @@ func TestStreamWindow_SumByMapKey(t *testing.T) {
262262
func TestStreamWindow_SumAll1D(t *testing.T) {
263263
src := sources.Slice([]float32{10.0, 70.0, 20.0, 40.0, 60, 90, 0, 80, 30})
264264

265-
strm := From(src).Flow(
265+
strm := From(src).Run(
266266
// Window(with incoming type T), will output []T
267267
window.All[float32](),
268268
// SumByStructField receives []T, outputs sum of values float64
@@ -296,7 +296,7 @@ func TestStreamWindow_SumAll2D(t *testing.T) {
296296
{0, 80, 30},
297297
})
298298

299-
strm := From(src).Flow(
299+
strm := From(src).Run(
300300
// Window(with incoming type T), will output []T
301301
window.All[[]int](),
302302
// SumByStructField receives []T, outputs sum of values float64
@@ -326,7 +326,7 @@ func TestStreamWindow_SumAll2D(t *testing.T) {
326326
func TestStreamWindow_SortSlice(t *testing.T) {
327327
src := sources.Slice([]string{"Spirit", "Voyager", "BigFoot", "Enola", "Memphis"})
328328

329-
strm := From(src).Flow(
329+
strm := From(src).Run(
330330
// Window(with incoming type T), will output []T
331331
window.All[string](),
332332
// SortSlice receives []T, outputs sum of values float64
@@ -360,7 +360,7 @@ func TestStreamWindow_SortSliceByIndex(t *testing.T) {
360360
{"Memphis", "plane", "propeller"},
361361
})
362362

363-
strm := From(src).Flow(
363+
strm := From(src).Run(
364364
// Window(with incoming type T), will output []T
365365
window.All[[]string](),
366366
// SortSlice receives []T, outputs sum of values float64
@@ -404,7 +404,7 @@ func TestStreamWindow_SortSliceByStructField(t *testing.T) {
404404
{"Memphis", "plane", "propeller", 48},
405405
})
406406

407-
strm := From(src).Flow(
407+
strm := From(src).Run(
408408
// Window(with incoming type T), will output []T
409409
window.All[vehicle](),
410410
// SortSlice receives []T, outputs sum of values float64
@@ -443,7 +443,7 @@ func TestStreamWindow_SortByMapKey(t *testing.T) {
443443
{"Vehicle": "Memphis", "Kind": "plane", "Engine": "propeller"},
444444
})
445445

446-
strm := From(src).Flow(
446+
strm := From(src).Run(
447447
// Window(with incoming type T), will output []T
448448
window.All[map[string]string](),
449449
// SortSlice receives []T, outputs sum of values float64
@@ -482,7 +482,7 @@ func TestStreamWindow_SortWithFunc(t *testing.T) {
482482
"Memphis",
483483
})
484484

485-
strm := From(src).Flow(
485+
strm := From(src).Run(
486486
// Window(with incoming type T), will output []T
487487
window.All[string](),
488488
// SortSlice receives []T, outputs sum of values float64

0 commit comments

Comments
 (0)
Please sign in to comment.