[Bug] [seatunnel-connectors] [seatunnel-connector-spark-clickhouse] Data cannot be imported to all nodes by configuring split_mode and sharding_key #4772


Open
wants to merge 15 commits into
base: dev

wuxizhi777

Purpose of this pull request

#4770

Check list


@TyrantLucifer TyrantLucifer requested a review from Hisoka-X May 17, 2023 16:08

@zhilinli123
Contributor

PTAL : @Hisoka-X

@Hisoka-X
Member

Hisoka-X commented Jun 6, 2023

Good catch! Can you add UT for getShard method? Thanks

@zhilinli123
Contributor

zhilinli123 commented Jun 6, 2023

> Good catch! Can you add UT for getShard method? Thanks

[image] It's not a negative number, right? @Hisoka-X

@Hisoka-X
Member

Hisoka-X commented Jun 6, 2023

> It's not a negative number, right? @Hisoka-X

@zhilinli123 It can't be negative. I suggest comparing the results before and after the change.

@zhilinli123
Contributor

> > It's not a negative number, right? @Hisoka-X
>
> @zhilinli123 It can't be negative. I suggest comparing the results before and after the change.

@Test
 public void testHashValue() {
     String a = "a,b,c,d,e,f";
     for (Object o : Arrays.stream(a.split(",")).toArray()) {
         System.out.println(getShard(o));
     }
 }
 public int getShard(Object shardValue) {
     int offset =
             (int)
                     (HASH_INSTANCE.hash(
                             ByteBuffer.wrap(
                                     shardValue
                                             .toString()
                                             .getBytes(StandardCharsets.UTF_8)),
                             0)
                                     & Long.MAX_VALUE % 6);
     return offset;
 }

//  Before            After
//  1                   -5
//  1                   3
//  1                   -5
//  0                   2
//  0                   4
//  1                   -3
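
A note on why the "Before" column only ever shows 0 or 1: in Java, `%` binds tighter than `&`, so the expression in `getShard` above parses as `hash & (Long.MAX_VALUE % 6)`, and `Long.MAX_VALUE % 6` is 1. The sketch below illustrates this with plain `long` values standing in for the xxHash output; the class and method names are hypothetical and not part of the connector.

```java
public class PrecedenceDemo {
    // Same shape as the expression in getShard above: '%' is evaluated before '&',
    // so this is hash & (Long.MAX_VALUE % shards).
    static long withoutParens(long hash, int shards) {
        return hash & Long.MAX_VALUE % shards;
    }

    // Clear the sign bit first, then take the remainder.
    static long withParens(long hash, int shards) {
        return (hash & Long.MAX_VALUE) % shards;
    }

    public static void main(String[] args) {
        System.out.println(Long.MAX_VALUE % 6); // 1, so the mask above is just the lowest bit
        long[] samples = {0L, 4L, 5L, 7L, -1L, Long.MIN_VALUE}; // stand-ins for hash values
        for (long h : samples) {
            System.out.println(h + ": " + withoutParens(h, 6) + " vs " + withParens(h, 6));
        }
    }
}
```

With six shards, the unparenthesized form can only ever select shard 0 or shard 1, whatever the hash value is.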

@zhilinli123
Contributor

zhilinli123 commented Jun 6, 2023

> > It's not a negative number, right? @Hisoka-X
>
> @zhilinli123 It can't be negative. I suggest comparing the results before and after the change.

    import java.nio.charset.StandardCharsets;

    import net.jpountz.xxhash.XXHash64;
    import net.jpountz.xxhash.XXHashFactory;

    // Class name is illustrative; requires lz4-java (net.jpountz.xxhash) on the classpath.
    public class XxHashModuloDemo {
        public static void main(String[] args) {
            // Create an instance of the XXHash64 algorithm
            XXHash64 hash64 = XXHashFactory.fastestInstance().hash64();

            // Define the input data
            String a = "a,b,c,d,e,f";
            int modulo = 6;
            for (String key : a.split(",")) {
                byte[] input = key.getBytes(StandardCharsets.UTF_8);
                // Calculate the hash value
                long hashValue = hash64.hash(input, 0, input.length, 0);

                // Clear the sign bit, then apply modulo to get a non-negative result
                long nonNegativeResult = (hashValue & Long.MAX_VALUE) % modulo;

                // Print the non-negative result
                System.out.println("Non-negative result: " + nonNegativeResult);
            }
        }
    }

Console:

Non-negative result: 3
Non-negative result: 3
Non-negative result: 3
Non-negative result: 2
Non-negative result: 4
Non-negative result: 5

I have written a simple test demo for this problem. Could you check whether it meets expectations? @Hisoka-X

@Hisoka-X
Member

Hisoka-X commented Jun 7, 2023

@zhilinli123 Any reason for the negative results?

@Carl-Zhou-CN
Member

Carl-Zhou-CN commented Jun 7, 2023

@Hisoka-X, @zhilinli123 My guess is that it may be due to the conversion of a long to an int, which resulted in a negative number.

@zhilinli123
Contributor

> @Hisoka-X, @zhilinli123 My guess is that it may be due to the conversion of a long to an int, which resulted in a negative number.

I posted the execution results above.

@Carl-Zhou-CN
Member

> > @Hisoka-X, @zhilinli123 My guess is that it may be due to the conversion of a long to an int, which resulted in a negative number.
>
> I posted the execution results above.

I am just explaining why the negative numbers appear.
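
To illustrate the guess about the long-to-int conversion: narrowing a `long` to `int` keeps only the low 32 bits, so a non-negative `long` can turn into a negative `int`, and `%` on a negative `int` gives a non-positive remainder. The sketch below uses a hand-picked value rather than a real xxHash output, and the class and method names are hypothetical.

```java
public class NarrowingDemo {
    // Cast first, then modulo: the cast binds tighter than '%' and keeps only the
    // low 32 bits, which may have the int sign bit set.
    static int castThenMod(long value, int shards) {
        return (int) value % shards;
    }

    // Modulo first in long arithmetic, then cast: for a non-negative value the
    // remainder is already in [0, shards) and fits in an int.
    static int modThenCast(long value, int shards) {
        return (int) (value % shards);
    }

    public static void main(String[] args) {
        long value = 1L << 31; // 2147483648: non-negative as a long, Integer.MIN_VALUE as an int
        System.out.println(castThenMod(value, 6)); // -2
        System.out.println(modThenCast(value, 6)); // 2
    }
}
```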

@Carl-Zhou-CN
Member

I think it should be:

    int offset =
            (int) (HASH_INSTANCE.hash(
                                    ByteBuffer.wrap(
                                            shardValue
                                                    .toString()
                                                    .getBytes(StandardCharsets.UTF_8)),
                                    0)
                            & Integer.MAX_VALUE)
                    % 6;

@wuxizhi777
Author

wuxizhi777 commented Jun 8, 2023

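import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import net.jpountz.xxhash.XXHash64;
import net.jpountz.xxhash.XXHashFactory;
import org.junit.jupiter.api.Test; // JUnit 5 assumed

public class test {
    private static final XXHash64 HASH_INSTANCE = XXHashFactory.fastestInstance().hash64();

    public int getShard(Object shardValue) {
        int shardWeightCount = 6;
        // Clear the sign bit and take the remainder as a long before narrowing to int.
        int offset =
                (int)
                        ((HASH_INSTANCE.hash(
                                                ByteBuffer.wrap(
                                                        shardValue
                                                                .toString()
                                                                .getBytes(StandardCharsets.UTF_8)),
                                                0)
                                        & Long.MAX_VALUE)
                                % shardWeightCount);
        return offset;
    }

    @Test
    public void testShared() {
        String a = "a,b,c,d,e,f";
        for (Object o : Arrays.stream(a.split(",")).toArray()) {
            System.out.println(getShard(o));
        }
    }
}

The result is:
3
3
3
2
4
5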

@Carl-Zhou-CN
Member

@wuxizhi777 You're right. Can you help add this test case to this pull request?

Member

@TyrantLucifer TyrantLucifer left a comment

Thank you for your contribution. Please add a unit test.

@github-actions github-actions bot removed the reviewed label Jun 8, 2023
@TyrantLucifer
Member

@Hisoka-X PTAL.

    public void testShared() {
        String a = "a,b,c,d,e,f";
        for (Object o : Arrays.stream(a.split(",")).toArray()) {
            System.out.println(getShard(o));

Member

@Hisoka-X Hisoka-X Jun 12, 2023

Please use Assertions to make sure the results are right. Also, please provide a test case that makes sure every shard can be selected. Tip: we could call getShard with 10000 random string keys and count how often each shard index is returned. I believe the more keys there are, the more balanced the shard indexes will be.

Contributor

@zhilinli123 zhilinli123 Jun 12, 2023

> Please use Assertions to make sure the results are right. Also, please provide a test case that makes sure every shard can be selected. Tip: we could call getShard with 10000 random string keys and count how often each shard index is returned. I believe the more keys there are, the more balanced the shard indexes will be.

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import net.jpountz.xxhash.XXHash64;
import net.jpountz.xxhash.XXHashFactory;

// Class name is illustrative; requires lz4-java (net.jpountz.xxhash) on the classpath.
public class XxHashBalanceDemo {
    public static void main(String[] args) {
        // Create an instance of the XXHash64 algorithm
        XXHash64 hash64 = XXHashFactory.fastestInstance().hash64();

        int modulo = 6;
        Map<Long, Long> resultCount = new HashMap<>();
        for (int i = 1; i <= 100000; i++) {
            // Use a random UUID string as the input key
            byte[] input = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
            // Calculate the hash value
            long hashValue = hash64.hash(input, 0, input.length, 0);

            // Clear the sign bit, then apply modulo to get a non-negative result
            long nonNegativeResult = (hashValue & Long.MAX_VALUE) % modulo;
            // Count how many keys land on each result
            resultCount.merge(nonNegativeResult, 1L, Long::sum);
        }

        long totalResult = 0L;
        for (Map.Entry<Long, Long> entry : resultCount.entrySet()) {
            System.out.println("Key:" + entry.getKey() + " count:" + entry.getValue());
            totalResult += entry.getValue();
        }
        System.out.println("Sum:" + totalResult);

        // Console Result:
        // Key:0 count:16651
        // Key:1 count:16595
        // Key:2 count:16648
        // Key:3 count:16650
        // Key:4 count:16946
        // Key:5 count:16510
        // Sum:100000
    }
}

Looks good, PTAL

Member

Great, please add this test code to the branch. Also, remember to use Assertions.
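
A minimal sketch of what the count check could look like with assertions, kept dependency-free so it runs on its own. Everything here is an assumption for illustration: `hash64` is a stand-in mixing function (FNV-1a plus a MurmurHash3-style finalizer), not xxHash64, and `getShard(Object, int)`, `countShards`, the fixed seed, and the 25% tolerance are hypothetical choices. In the PR's test class the checks would presumably use JUnit `Assertions` against the connector's real `getShard`.

```java
import java.nio.charset.StandardCharsets;
import java.util.Random;

public class ShardBalanceCheck {
    // Stand-in 64-bit hash (FNV-1a followed by a MurmurHash3-style finalizer); NOT xxHash64.
    static long hash64(byte[] data) {
        long h = 0xcbf29ce484222325L;
        for (byte b : data) {
            h ^= (b & 0xff);
            h *= 0x100000001b3L;
        }
        h ^= h >>> 33;
        h *= 0xff51afd7ed558ccdL;
        h ^= h >>> 33;
        h *= 0xc4ceb9fe1a85ec53L;
        h ^= h >>> 33;
        return h;
    }

    // Hypothetical shard function: clear the sign bit, take the remainder, then narrow.
    static int getShard(Object shardValue, int shardCount) {
        byte[] bytes = shardValue.toString().getBytes(StandardCharsets.UTF_8);
        return (int) ((hash64(bytes) & Long.MAX_VALUE) % shardCount);
    }

    // Count how many pseudo-random keys land on each shard index.
    static int[] countShards(int keys, int shardCount, long seed) {
        Random random = new Random(seed); // fixed seed keeps the check repeatable
        int[] counts = new int[shardCount];
        for (int i = 0; i < keys; i++) {
            String key = Long.toHexString(random.nextLong()) + "-" + i;
            counts[getShard(key, shardCount)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int keys = 10000;
        int shardCount = 6;
        int[] counts = countShards(keys, shardCount, 42L);
        int expected = keys / shardCount;
        for (int shard = 0; shard < shardCount; shard++) {
            // Fail if a shard is far from its even share (the tolerance is arbitrary).
            if (Math.abs(counts[shard] - expected) > expected / 4) {
                throw new AssertionError("shard " + shard + " got " + counts[shard]);
            }
            System.out.println("shard " + shard + ": " + counts[shard]);
        }
    }
}
```

Asserting on per-shard counts rather than printing them covers both points raised in the review: a result outside `[0, shardCount)` fails with an array index error, and a shard that is never selected fails the tolerance check.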

Contributor

> Great, please add this test code to the branch. Also, remember to use Assertions.

CC @wuxizhi777

Author

OK, boss, I know.

@1309637127

This causes two problems:
1. SeaTunnel pushes data to only some of the nodes.
We have 6 nodes in the production environment, and only 4 of them have data; the others have none.
2. We expect the Java xxHash to give the same result as ClickHouse's xxHash.
We run further ETL jobs in ClickHouse with cust_no as the sharding key, and we expect all data for the same cust_no to stay on the same node. But when we compared the Java xxHash result with the ClickHouse result, they were not equal.

Below are the Java xxHash and ClickHouse xxHash results for "00000186620663":
java xxHash: -1055457840867779448
java xxHash & Long.MAX_VALUE result: 8167914195986996360
Clickhouse xxHash: 17391286232841772168

With this xxHash result, SeaTunnel pushes the customer to shard 0, while ClickHouse puts the customer in shard 5.
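
One observation on the numbers above, offered as a sketch rather than a fix: the Java value and the ClickHouse value are the same 64 bits, read as signed in Java and as unsigned in ClickHouse. Masking with `Long.MAX_VALUE` clears the top bit, which produces a different number and so can produce a different remainder. The class and method names below are hypothetical. Whether the unsigned remainder matches the shard ClickHouse actually picks also depends on the table's sharding expression and shard weights, which this sketch does not model.

```java
public class UnsignedHashDemo {
    // Remainder of the hash treated as an unsigned 64-bit value.
    static int unsignedShard(long hash, int shardCount) {
        return (int) Long.remainderUnsigned(hash, shardCount);
    }

    // Remainder after clearing the sign bit.
    static int maskedShard(long hash, int shardCount) {
        return (int) ((hash & Long.MAX_VALUE) % shardCount);
    }

    public static void main(String[] args) {
        long javaHash = -1055457840867779448L; // Java xxHash value reported above
        System.out.println(Long.toUnsignedString(javaHash)); // 17391286232841772168
        System.out.println(javaHash & Long.MAX_VALUE);        // 8167914195986996360
        System.out.println(unsignedShard(javaHash, 6));       // 4
        System.out.println(maskedShard(javaHash, 6));         // 2
    }
}
```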

@ic4y ic4y closed this Aug 6, 2023
@ic4y ic4y reopened this Aug 6, 2023
@zhilinli123
Contributor

PTAL: @Hisoka-X

@@ -1,32 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
Member

revert this file


@Test
public void testOptionRule() {
    Assertions.assertNotNull((new ClickhouseSourceFactory()).optionRule());
    Assertions.assertNotNull((new ClickhouseSinkFactory()).optionRule());
    Assertions.assertNotNull((new ClickhouseFileSinkFactory()).optionRule());
}

@Test
public void testShared() {
Member

The test case looks weird: it only proves that the logic of (hashValue & Long.MAX_VALUE) % modulo is right, but it cannot make sure that the getShard method returns the right result. I think the test case should test the getShard method itself.


This pull request has been automatically marked as stale because it has not had recent activity for 120 days. It will be closed in 7 days if no further activity occurs.

@github-actions github-actions bot added the stale label Jul 14, 2025