Description
Search before asking
- I searched in the issues and found nothing similar.
Paimon version
master : [7f01330]
Compute Engine
Flink
Minimal reproduce step
Add and run the following test case in PreAggregationITCase.java.
If you run a DDL statement like this one, it succeeds without any error, even though no aggregate function with the identifier 'avg' is available on the classpath.
@Test
public void testWrongOption() {
// 'avg' is not an available aggregate function, yet this DDL succeeds.
batchSql(
"CREATE TABLE IF NOT EXISTS T4 ("
+ "j INT, k INT, "
+ "a INT, "
+ "b INT,"
+ "PRIMARY KEY (j,k) NOT ENFORCED)"
+ " WITH ('merge-engine'='aggregation', "
+ "'fields.a.aggregate-function'='avg', "
+ "'fields.b.aggregate-function'='last_non_null_value'"
+ ");");
}
The error is only raised once you execute an INSERT, as in this test:
@Test
public void testWrongOption() {
// Same DDL as above; it still succeeds, and only the INSERT below fails.
batchSql(
"CREATE TABLE IF NOT EXISTS T4 ("
+ "j INT, k INT, "
+ "a INT, "
+ "b INT,"
+ "PRIMARY KEY (j,k) NOT ENFORCED)"
+ " WITH ('merge-engine'='aggregation', "
+ "'fields.a.aggregate-function'='avg', "
+ "'fields.b.aggregate-function'='last_non_null_value'"
+ ");");
batchSql("INSERT INTO T4 VALUES (1, 2, 1, 2)");
}
The INSERT fails with the following error:
Caused by: org.apache.paimon.factories.FactoryException: Could not find any factory for identifier 'avg' that implements 'org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory' in the classpath.
Available factory identifiers are:
custom
bool_and
bool_or
collect
first_non_null_value
first_not_null_value
first_value
hll_sketch
last_non_null_value
last_value
listagg
max
merge_map
min
nested_update
primary-key
product
rbm32
rbm64
sum
theta_sketch
at org.apache.paimon.factories.FactoryUtil.discoverFactory(FactoryUtil.java:66)
at org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory.create(FieldAggregatorFactory.java:38)
at org.apache.paimon.mergetree.compact.aggregate.AggregateMergeFunction$Factory.create(AggregateMergeFunction.java:145)
at org.apache.paimon.mergetree.compact.MergeFunctionFactory.create(MergeFunctionFactory.java:30)
at org.apache.paimon.operation.KeyValueFileStoreWrite.createWriter(KeyValueFileStoreWrite.java:222)
at org.apache.paimon.operation.KeyValueFileStoreWrite.createWriter(KeyValueFileStoreWrite.java:95)
at org.apache.paimon.operation.AbstractFileStoreWrite.createWriterContainer(AbstractFileStoreWrite.java:445)
at org.apache.paimon.operation.AbstractFileStoreWrite.lambda$getWriterWrapper$5(AbstractFileStoreWrite.java:407)
at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
at org.apache.paimon.operation.AbstractFileStoreWrite.getWriterWrapper(AbstractFileStoreWrite.java:406)
at org.apache.paimon.operation.AbstractFileStoreWrite.write(AbstractFileStoreWrite.java:157)
at org.apache.paimon.table.sink.TableWriteImpl.writeAndReturn(TableWriteImpl.java:187)
at org.apache.paimon.flink.sink.StoreSinkWriteImpl.write(StoreSinkWriteImpl.java:195)
at org.apache.paimon.flink.sink.DynamicBucketRowWriteOperator.processElement(DynamicBucketRowWriteOperator.java:54)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:748)
What doesn't meet your expectations?
I expect the table options to be validated when the DDL is executed, so that a CREATE TABLE statement with an invalid aggregate function fails immediately, instead of the error surfacing only later when data is inserted.
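The expected behavior amounts to checking every `fields.<field>.aggregate-function` option at table-creation time. Below is a minimal, hypothetical sketch of such a check in plain Java. `AggregateFunctionValidator` and `validate` are illustrative names, not Paimon APIs, and the identifier set is a hard-coded copy of the "Available factory identifiers" list from the error above; an actual fix would presumably reuse the factory discovery Paimon already performs at write time (`FactoryUtil.discoverFactory` in the stack trace) rather than a fixed list.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical DDL-time check that rejects unknown 'fields.<name>.aggregate-function' values. */
public class AggregateFunctionValidator {

    // Assumption: snapshot of the identifiers listed in the FactoryException, not live discovery.
    static final Set<String> KNOWN_AGGREGATORS = new HashSet<>(Arrays.asList(
            "custom", "bool_and", "bool_or", "collect", "first_non_null_value",
            "first_not_null_value", "first_value", "hll_sketch", "last_non_null_value",
            "last_value", "listagg", "max", "merge_map", "min", "nested_update",
            "primary-key", "product", "rbm32", "rbm64", "sum", "theta_sketch"));

    // Matches keys such as "fields.a.aggregate-function" and captures the field name ("a").
    private static final Pattern AGG_KEY = Pattern.compile("fields\\.(.+)\\.aggregate-function");

    /** Throws IllegalArgumentException naming every field whose aggregate function is unknown. */
    public static void validate(Map<String, String> options) {
        Set<String> errors = new TreeSet<>(); // sorted, so the message is deterministic
        for (Map.Entry<String, String> e : options.entrySet()) {
            Matcher m = AGG_KEY.matcher(e.getKey());
            if (m.matches() && !KNOWN_AGGREGATORS.contains(e.getValue())) {
                errors.add("field '" + m.group(1) + "': unknown aggregate function '"
                        + e.getValue() + "'");
            }
        }
        if (!errors.isEmpty()) {
            throw new IllegalArgumentException(String.join("; ", errors)
                    + ". Available: " + new TreeSet<>(KNOWN_AGGREGATORS));
        }
    }

    public static void main(String[] args) {
        // The same options as the T4 DDL in the reproduction steps (Java 8 compatible).
        Map<String, String> options = new HashMap<>();
        options.put("merge-engine", "aggregation");
        options.put("fields.a.aggregate-function", "avg");
        options.put("fields.b.aggregate-function", "last_non_null_value");
        try {
            validate(options);
            System.out.println("options are valid");
        } catch (IllegalArgumentException ex) {
            System.out.println("CREATE TABLE rejected: " + ex.getMessage());
        }
    }
}
```

Run against the T4 options, the sketch rejects field `a` (`avg`) and accepts field `b`. It collects all invalid fields before throwing, so a DDL with several typos reports them in one error instead of one per attempt.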
Anything else?
No response
Are you willing to submit a PR?
- I'm willing to submit a PR!