
[Bug] aggregation options are not verified when executing DDL #5079

Open
@qingfei1994

Description


Search before asking

  • I searched in the issues and found nothing similar.

Paimon version

master: 7f01330

Compute Engine

Flink

Minimal reproduce step

Basically, execute a test case in PreAggregationITCase.java.
If you run a DDL like the one below, nothing happens and it completes successfully, even though the aggregate function 'avg' is not in the classpath.

    @Test
    public void testWrongOption() {
        // The aggregate function 'avg' configured for field 'a' does not exist,
        // but this DDL still succeeds without any validation error.
        batchSql(
                "CREATE TABLE IF NOT EXISTS T4 ("
                        + "j INT, k INT, "
                        + "a INT, "
                        + "b INT, "
                        + "PRIMARY KEY (j, k) NOT ENFORCED)"
                        + " WITH ('merge-engine'='aggregation', "
                        + "'fields.a.aggregate-function'='avg', "
                        + "'fields.b.aggregate-function'='last_non_null_value'"
                        + ");");
    }

You won't get an error until you execute an INSERT like this:

    @Test
    public void testWrongOption() {
        // Same DDL as above; the failure only surfaces once data is written.
        batchSql(
                "CREATE TABLE IF NOT EXISTS T4 ("
                        + "j INT, k INT, "
                        + "a INT, "
                        + "b INT, "
                        + "PRIMARY KEY (j, k) NOT ENFORCED)"
                        + " WITH ('merge-engine'='aggregation', "
                        + "'fields.a.aggregate-function'='avg', "
                        + "'fields.b.aggregate-function'='last_non_null_value'"
                        + ");");
        batchSql("INSERT INTO T4 VALUES (1, 2, 1, 2)");
    }

The error message is as follows:

Caused by: org.apache.paimon.factories.FactoryException: Could not find any factory for identifier 'avg' that implements 'org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory' in the classpath.

Available factory identifiers are:

custom
bool_and
bool_or
collect
first_non_null_value
first_not_null_value
first_value
hll_sketch
last_non_null_value
last_value
listagg
max
merge_map
min
nested_update
primary-key
product
rbm32
rbm64
sum
theta_sketch
	at org.apache.paimon.factories.FactoryUtil.discoverFactory(FactoryUtil.java:66)
	at org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory.create(FieldAggregatorFactory.java:38)
	at org.apache.paimon.mergetree.compact.aggregate.AggregateMergeFunction$Factory.create(AggregateMergeFunction.java:145)
	at org.apache.paimon.mergetree.compact.MergeFunctionFactory.create(MergeFunctionFactory.java:30)
	at org.apache.paimon.operation.KeyValueFileStoreWrite.createWriter(KeyValueFileStoreWrite.java:222)
	at org.apache.paimon.operation.KeyValueFileStoreWrite.createWriter(KeyValueFileStoreWrite.java:95)
	at org.apache.paimon.operation.AbstractFileStoreWrite.createWriterContainer(AbstractFileStoreWrite.java:445)
	at org.apache.paimon.operation.AbstractFileStoreWrite.lambda$getWriterWrapper$5(AbstractFileStoreWrite.java:407)
	at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
	at org.apache.paimon.operation.AbstractFileStoreWrite.getWriterWrapper(AbstractFileStoreWrite.java:406)
	at org.apache.paimon.operation.AbstractFileStoreWrite.write(AbstractFileStoreWrite.java:157)
	at org.apache.paimon.table.sink.TableWriteImpl.writeAndReturn(TableWriteImpl.java:187)
	at org.apache.paimon.flink.sink.StoreSinkWriteImpl.write(StoreSinkWriteImpl.java:195)
	at org.apache.paimon.flink.sink.DynamicBucketRowWriteOperator.processElement(DynamicBucketRowWriteOperator.java:54)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
	at java.lang.Thread.run(Thread.java:748)

What doesn't meet your expectations?

I think Paimon is supposed to validate the options and throw an exception when executing a DDL with wrong options, instead of throwing the exception only when the INSERT is executed.
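A possible direction, sketched below under assumptions: eagerly resolve every 'fields.<field>.aggregate-function' option at schema creation time using the same factory discovery the write path uses (the stack trace shows the lookup currently only happens inside KeyValueFileStoreWrite.createWriter, i.e. when the first record is written). The helper class, its method name, and the exact discoverFactory signature are assumptions for illustration only; FactoryUtil, FactoryException, and FieldAggregatorFactory are the names that appear in the stack trace above.

    // Hedged sketch only: validate aggregate-function options when the DDL is executed.
    // The class and method names here are hypothetical; the discoverFactory signature
    // (ClassLoader, Class, String) is assumed from the stack trace, not verified.
    import java.util.Map;

    import org.apache.paimon.factories.FactoryException;
    import org.apache.paimon.factories.FactoryUtil;
    import org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory;

    public class AggregateOptionValidation {

        private static final String PREFIX = "fields.";
        private static final String SUFFIX = ".aggregate-function";

        /** Throws if any configured aggregate function cannot be discovered in the classpath. */
        public static void validateAggregateFunctions(Map<String, String> tableOptions) {
            for (Map.Entry<String, String> entry : tableOptions.entrySet()) {
                String key = entry.getKey();
                if (key.startsWith(PREFIX) && key.endsWith(SUFFIX)) {
                    String identifier = entry.getValue();
                    try {
                        // Same discovery the write path performs via FieldAggregatorFactory.create.
                        FactoryUtil.discoverFactory(
                                AggregateOptionValidation.class.getClassLoader(),
                                FieldAggregatorFactory.class,
                                identifier);
                    } catch (FactoryException e) {
                        throw new IllegalArgumentException(
                                "Unknown aggregate function '"
                                        + identifier
                                        + "' configured by option '"
                                        + key
                                        + "'.",
                                e);
                    }
                }
            }
        }
    }

If a check like this were called from the schema creation path, the failure shown above would surface as a DDL-time error, which is the behavior this issue asks for.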

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
