Skip to content

Commit 7d2211d

Browse files
authored
Fix: Block stream read process would be terminated by empty block with zero rows (#1104)
* support skip empty block while reading rows from stream * remove unrelated comments
1 parent d59283f commit 7d2211d

File tree

2 files changed

+117
-1
lines changed

2 files changed

+117
-1
lines changed

clickhouse_rows.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ next:
5252
r.err = err
5353
return false
5454
}
55-
goto next
5655
case block := <-r.stream:
5756
if block == nil {
5857
return false
@@ -63,6 +62,7 @@ next:
6362
}
6463
r.row, r.block = 0, block
6564
}
65+
goto next
6666
}
6767
r.row++
6868
return r.row <= r.block.Rows()

clickhouse_rows_test.go

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
package clickhouse
2+
3+
import (
4+
"github.com/ClickHouse/clickhouse-go/v2/lib/proto"
5+
"github.com/stretchr/testify/assert"
6+
"strconv"
7+
"testing"
8+
)
9+
10+
func TestReadWithEmptyBlock(t *testing.T) {
11+
blockInitFunc := func() *proto.Block {
12+
retVal := &proto.Block{
13+
Packet: 0,
14+
Columns: nil,
15+
Timezone: nil,
16+
}
17+
retVal.AddColumn("col1", ("Int64"))
18+
retVal.AddColumn("col2", ("String"))
19+
return retVal
20+
}
21+
22+
testCases := map[string]struct {
23+
actual func() rows
24+
expected int
25+
}{
26+
"none empty": {
27+
func() rows {
28+
firstBlock := blockInitFunc()
29+
firstBlock.Append(int64(0), strconv.Itoa(0))
30+
blockChan := make(chan *proto.Block)
31+
go func() {
32+
for i := 1; i < 10; i++ {
33+
block := blockInitFunc()
34+
block.Append(int64(i), strconv.Itoa(i))
35+
blockChan <- block
36+
}
37+
close(blockChan)
38+
}()
39+
return rows{
40+
err: nil,
41+
row: 0,
42+
block: firstBlock,
43+
totals: nil,
44+
errors: nil,
45+
stream: blockChan,
46+
columns: nil,
47+
structMap: nil,
48+
}
49+
},
50+
10,
51+
},
52+
"all empty": {
53+
func() rows {
54+
firstBlock := blockInitFunc()
55+
blockChan := make(chan *proto.Block)
56+
go func() {
57+
for i := 1; i < 10; i++ {
58+
block := blockInitFunc()
59+
blockChan <- block
60+
}
61+
close(blockChan)
62+
}()
63+
return rows{
64+
err: nil,
65+
row: 0,
66+
block: firstBlock,
67+
totals: nil,
68+
errors: nil,
69+
stream: blockChan,
70+
columns: nil,
71+
structMap: nil,
72+
}
73+
},
74+
0,
75+
},
76+
"some empty": {
77+
func() rows {
78+
firstBlock := blockInitFunc()
79+
blockChan := make(chan *proto.Block)
80+
go func() {
81+
for i := 1; i < 10; i++ {
82+
block := blockInitFunc()
83+
if i%2 == 0 {
84+
block.Append(int64(i), strconv.Itoa(i))
85+
}
86+
blockChan <- block
87+
}
88+
close(blockChan)
89+
}()
90+
return rows{
91+
err: nil,
92+
row: 0,
93+
block: firstBlock,
94+
totals: nil,
95+
errors: nil,
96+
stream: blockChan,
97+
columns: nil,
98+
structMap: nil,
99+
}
100+
},
101+
4,
102+
},
103+
}
104+
105+
for name, testCase := range testCases {
106+
t.Run(name, func(t *testing.T) {
107+
actual := testCase.actual()
108+
109+
rowCnt := 0
110+
for actual.Next() {
111+
rowCnt++
112+
}
113+
assert.Equal(t, testCase.expected, rowCnt)
114+
})
115+
}
116+
}

0 commit comments

Comments
 (0)