From b739cc920dd2b37724d6086a05632c6f384b5a17 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E5=B8=8C=E6=99=BA?= Date: Wed, 17 May 2023 16:31:48 +0800 Subject: [PATCH 1/6] =?UTF-8?q?Add=20parentheses=EF=BC=88=EF=BC=89,=20beca?= =?UTF-8?q?use=20'%'=20is=20higher=20than=20'&'?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 1 + .../seatunnel/clickhouse/sink/client/ShardRouter.java | 6 ++++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index d9230f4d1bb..94ad5a8fdfe 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,4 @@ + # Apache SeaTunnel (Incubating) seatunnel logo diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java index 140e40b3b13..edf48530d1c 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java @@ -111,13 +111,15 @@ public Shard getShard(Object shardValue) { } int offset = (int) - (HASH_INSTANCE.hash( + ( + (HASH_INSTANCE.hash( ByteBuffer.wrap( shardValue .toString() .getBytes(StandardCharsets.UTF_8)), 0) - & Long.MAX_VALUE % shardWeightCount); + & Long.MAX_VALUE) + % shardWeightCount); return shards.lowerEntry(offset + 1).getValue(); } From 2fe10919a19e98f76be846bf92874d5e210b964e Mon Sep 17 00:00:00 2001 From: wuxizhi777 Date: Wed, 17 May 2023 18:54:30 +0800 Subject: [PATCH 2/6] =?UTF-8?q?Add=20parentheses=EF=BC=88=EF=BC=89,=20beca?= =?UTF-8?q?use=20'%'=20is=20higher=20than=20'&'?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 1 - 1 file changed, 1 deletion(-) diff --git a/README.md b/README.md index 94ad5a8fdfe..d9230f4d1bb 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,3 @@ - # Apache SeaTunnel (Incubating) seatunnel logo From 175358f32f8a6d233c3e5198d7a447cadb2070d8 Mon Sep 17 00:00:00 2001 From: wuxizhi777 Date: Wed, 17 May 2023 19:17:30 +0800 Subject: [PATCH 3/6] Adjustment format --- .../clickhouse/sink/client/ShardRouter.java | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java index edf48530d1c..dc55bdeb559 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java @@ -111,15 +111,14 @@ public Shard getShard(Object shardValue) { } int offset = (int) - ( - (HASH_INSTANCE.hash( - ByteBuffer.wrap( - shardValue - .toString() - .getBytes(StandardCharsets.UTF_8)), - 0) - & Long.MAX_VALUE) - % shardWeightCount); + ((HASH_INSTANCE.hash( + ByteBuffer.wrap( + shardValue + .toString() + .getBytes(StandardCharsets.UTF_8)), + 0) + & Long.MAX_VALUE) + % shardWeightCount); return shards.lowerEntry(offset + 1).getValue(); } From 5709e821342b55fdc7a2c2b7c3abe2c4eb999c05 Mon Sep 17 00:00:00 2001 From: wuxizhi777 Date: Thu, 8 Jun 2023 14:19:23 +0800 Subject: [PATCH 4/6] test unit for https://github.com/apache/seatunnel/pull/4772 --- .../clickhouse/ClickhouseFactoryTest.java | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java index e6c50b0611a..0b9d840610d 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java @@ -24,7 +24,15 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import net.jpountz.xxhash.XXHash64; +import net.jpountz.xxhash.XXHashFactory; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + public class ClickhouseFactoryTest { + private static final XXHash64 HASH_INSTANCE = XXHashFactory.fastestInstance().hash64(); @Test public void testOptionRule() { @@ -32,4 +40,27 @@ public void testOptionRule() { Assertions.assertNotNull((new ClickhouseSinkFactory()).optionRule()); Assertions.assertNotNull((new ClickhouseFileSinkFactory()).optionRule()); } + + public int getShard(Object shardValue) { + int shardWeightCount = 6; + int offset = + (int) + ((HASH_INSTANCE.hash( + ByteBuffer.wrap( + shardValue + .toString() + .getBytes(StandardCharsets.UTF_8)), + 0) + & Long.MAX_VALUE) + % shardWeightCount); + return offset; + } + + @Test + public void testShared() { + String a = "a,b,c,d,e,f"; + for (Object o : Arrays.stream(a.split(",")).toArray()) { + System.out.println(getShard(o)); + } + } } From a0d36bbb022818ffd88c2adb6dbd38e0054a8217 Mon Sep 17 00:00:00 2001 From: wuxizhi777 Date: Tue, 27 Jun 2023 23:31:47 +0800 Subject: [PATCH 5/6] add UT for getShard --- .idea/vcs.xml | 32 -------------------------------- 1 file changed, 32 deletions(-) delete mode 100644 .idea/vcs.xml diff --git a/.idea/vcs.xml b/.idea/vcs.xml deleted file mode 100644 index 81f2456ebc3..00000000000 --- a/.idea/vcs.xml +++ /dev/null @@ -1,32 +0,0 @@ - - - - - - - - - - \ No newline at end of file From fb0a852d6570c5808ba666a639a8672b23b7a215 Mon Sep 17 00:00:00 2001 From: wuxizhi777 Date: Tue, 27 Jun 2023 23:32:11 +0800 Subject: [PATCH 6/6] add UT for getShard --- .../clickhouse/ClickhouseFactoryTest.java | 55 ++++++++++++------- 1 file changed, 34 insertions(+), 21 deletions(-) diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java index 0b9d840610d..a814d99073b 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java @@ -27,9 +27,10 @@ import net.jpountz.xxhash.XXHash64; import net.jpountz.xxhash.XXHashFactory; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.Arrays; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; public class ClickhouseFactoryTest { private static final XXHash64 HASH_INSTANCE = XXHashFactory.fastestInstance().hash64(); @@ -41,26 +42,38 @@ public void testOptionRule() { Assertions.assertNotNull((new ClickhouseFileSinkFactory()).optionRule()); } - public int getShard(Object shardValue) { - int shardWeightCount = 6; - int offset = - (int) - ((HASH_INSTANCE.hash( - ByteBuffer.wrap( - shardValue - .toString() - .getBytes(StandardCharsets.UTF_8)), - 0) - & Long.MAX_VALUE) - % shardWeightCount); - return offset; - } - @Test public void testShared() { - String a = "a,b,c,d,e,f"; - for (Object o : Arrays.stream(a.split(",")).toArray()) { - System.out.println(getShard(o)); + // Create an instance of the XXHash64 algorithm + XXHashFactory factory = XXHashFactory.fastestInstance(); + XXHash64 hash64 = factory.hash64(); + + // Define your input data + byte[] input; + ArrayList strings = new ArrayList<>(); + + Map resultCount = new HashMap<>(); + for (int i = 1; i <= 1000000; i++) { + input = UUID.randomUUID().toString().getBytes(); + // Calculate the hash value + long hashValue = hash64.hash(input, 0, input.length, 0); + + // Apply modulo operation to get a non-negative result + int modulo = 10; + long nonNegativeResult = (hashValue & Long.MAX_VALUE) % modulo; + Long keyValue = resultCount.get(nonNegativeResult); + + if (keyValue != null) { + resultCount.put(nonNegativeResult, keyValue + 1L); + + } else { + resultCount.put(nonNegativeResult, 1L); + } + } + Long totalResult = 0L; + for (Long key : resultCount.keySet()) { + totalResult += resultCount.get(key); } + Assertions.assertEquals(1000000, totalResult); } }