Flink: fix TableSink anonymous object #7866
Conversation
@mumuhhh could you please add some description to the PR, explaining what issue this PR is fixing?

@mumuhhh could you add a test please that reproduces this issue?

@mumuhhh sorry for not being clear, what I mean is: can you please add a unit test as part of this PR to Iceberg that reproduces this?
# Conflicts:
#	flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java
#	flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java
#	flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java
```java
        .option("warehouse", warehouseDir.getAbsolutePath())
        .build();

Exception exception =
```
please replace this with

```java
Assertions.assertThatThrownBy(() -> table.insertInto(descriptor).execute())
    .isInstanceOf(ValidationException.class)
    .hasMessage("...")
```
```java
        exception.getCause().getMessage());
Assertions.assertThatThrownBy(() -> table.insertInto(descriptor).execute())
    .isInstanceOf(ValidationException.class)
    .hasCause(
```
for cause validation you can do `.isInstanceOf(ValidationException.class).cause().isInstanceOf(TableException.class).hasMessage(...)`
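The suggested chain can be illustrated with a plain-JDK sketch of what it actually checks: the outer exception type, the cause type, and the cause message. The exception classes below are stand-ins, not the real Flink `ValidationException`/`TableException`.

```java
public class CauseChainSketch {
    // Stand-in exception types, hypothetical, only mirroring the names used in the review.
    static class ValidationException extends RuntimeException {
        ValidationException(String msg, Throwable cause) { super(msg, cause); }
    }

    static class TableException extends RuntimeException {
        TableException(String msg) { super(msg); }
    }

    public static void main(String[] args) {
        Throwable thrown = new ValidationException(
                "Unable to create a sink",
                new TableException("refers to an anonymous object"));

        // assertThatThrownBy(...).isInstanceOf(ValidationException.class)
        if (!(thrown instanceof ValidationException)) throw new AssertionError("wrong type");
        // .cause().isInstanceOf(TableException.class)
        if (!(thrown.getCause() instanceof TableException)) throw new AssertionError("wrong cause");
        // .hasMessage(...) / .hasMessageContaining(...)
        if (!thrown.getCause().getMessage().contains("anonymous")) throw new AssertionError("wrong message");

        System.out.println("cause chain verified");
    }
}
```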
Previously, this was the code that reproduced the exception. Since I fixed writing to anonymous tables, I removed the exception capture.
```java
        .option("warehouse", warehouseDir.getAbsolutePath())
        .build();

table.insertInto(descriptor).execute();
```
Right now this test doesn't have any particular assertion other than just executing code that was previously failing.
I see that this isn't failing anymore with your fix, but it's generally good practice to add some assertions here that make sure the write you expect succeeded (independently of whether an exception has been thrown or not).
```java
        .build();

table.insertInto(descriptor).execute().await();
Assert.assertTrue(
```
we generally want to avoid plain assertTrue/assertFalse assertions as those are less descriptive. It would be better to have

```java
table.insertInto(descriptor).execute();
Awaitility.await()
    .atMost(5, TimeUnit.SECONDS)
    .untilAsserted(
        () ->
            Assertions.assertThat(
                    warehouseDir.toPath().resolve("test_db").resolve("test").toFile())
                .exists());
```

This will handle the async nature of the test and also provide a meaningful error message in case the assertion ever fails.
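The Awaitility call above can be illustrated with a plain-JDK sketch of the same poll-until-deadline pattern: keep checking a condition until it holds or a deadline passes, and fail with a descriptive error. The timeout and poll interval below are illustrative values, not Awaitility defaults.

```java
import java.nio.file.Files;
import java.nio.file.Path;

public class AwaitSketch {
    // Poll until the path exists or the deadline passes; fail with a descriptive message.
    static void awaitExists(Path path, long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (!Files.exists(path)) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError(
                        "path did not appear within " + timeoutMillis + " ms: " + path);
            }
            Thread.sleep(50); // poll interval
        }
    }

    public static void main(String[] args) throws Exception {
        // The file is created up front here, so the wait returns immediately;
        // in the real test the file only appears once the async write finishes.
        Path tmp = Files.createTempFile("await-sketch", ".tmp");
        awaitExists(tmp, 5000);
        System.out.println("path exists: " + Files.exists(tmp));
        Files.deleteIfExists(tmp);
    }
}
```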
```java
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
    ObjectPath objectPath = context.getObjectIdentifier().toObjectPath();
    ObjectIdentifier objectIdentifier = context.getObjectIdentifier();
```
what is the anonymous class issue? which class is anonymous? do you have the full stack trace?
toObjectPath seems fine to me.

```java
public ObjectPath toObjectPath() throws TableException {
    if (catalogName == null) {
        throw new TableException(
                "This ObjectIdentifier instance refers to an anonymous object, "
                        + "hence it cannot be converted to ObjectPath and cannot be serialized.");
    }
    return new ObjectPath(databaseName, objectName);
}
```
Execute the following code:

```java
File warehouseDir = Files.createTempDir();
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
Table table =
    tEnv.from(
        TableDescriptor.forConnector("datagen")
            .schema(
                Schema.newBuilder()
                    .column("f0", DataTypes.STRING())
                    .build())
            .option("number-of-rows", "3")
            .build());
TableDescriptor descriptor =
    TableDescriptor.forConnector("iceberg")
        .schema(Schema.newBuilder().column("f0", DataTypes.STRING()).build())
        .option("catalog-name", "hadoop_test")
        .option("catalog-type", "hadoop")
        .option("catalog-database", "test_db")
        .option("catalog-table", "test")
        .option("warehouse", warehouseDir.getAbsolutePath())
        .build();
table.insertInto(descriptor).execute();
```

The following exception will occur:
```
org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table '*anonymous_iceberg$2*'.
Table options are:
'catalog-database'='test_db'
'catalog-name'='hadoop_test'
'catalog-table'='test'
'catalog-type'='hadoop'
'connector'='iceberg'
'warehouse'='C:\Users\huawei\AppData\Local\Temp\1687560451312-0'
    at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:262)
    at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:434)
    at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:227)
    at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:185)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.map(TraversableLike.scala:285)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:278)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1656)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:782)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:861)
    at org.apache.flink.table.api.internal.TablePipelineImpl.execute(TablePipelineImpl.java:56)
    at com.test.TestFlinkAnonymousTable.testWriteAnonymousTable(TestFlinkAnonymousTable.java:48)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
    at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
    at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
    at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
    at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
    at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
    at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker$2.run(TestWorker.java:176)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
    at org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
    at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:133)
    at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:71)
    at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
    at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
Caused by: org.apache.flink.table.api.TableException: This ObjectIdentifier instance refers to an anonymous object, hence it cannot be converted to ObjectPath and cannot be serialized.
    at org.apache.flink.table.catalog.ObjectIdentifier.toObjectPath(ObjectIdentifier.java:112)
    at org.apache.iceberg.flink.FlinkDynamicTableFactory.createDynamicTableSink(FlinkDynamicTableFactory.java:108)
    at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:259)
    ... 63 more
```
thanks. I misunderstood the error message earlier. I thought it was about a Java anonymous class and Java serialization, but it is actually the unnamed ObjectIdentifier with a null catalog name. this change makes sense to me now.
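The failure mode can be summed up with a minimal stand-in sketch (these are not the real Flink classes): an anonymous `ObjectIdentifier` carries a null catalog name, so any code path that eagerly calls `toObjectPath()` throws, while reading the database and object names directly stays safe.

```java
public class AnonymousIdentifierSketch {
    // Hypothetical stand-in for Flink's ObjectIdentifier, mirroring the
    // null-catalog check quoted from toObjectPath() above.
    static final class Identifier {
        final String catalogName;   // null for anonymous (inline) tables
        final String databaseName;
        final String objectName;

        Identifier(String catalog, String database, String object) {
            this.catalogName = catalog;
            this.databaseName = database;
            this.objectName = object;
        }

        String toObjectPath() {
            if (catalogName == null) {
                throw new IllegalStateException(
                        "anonymous object cannot be converted to ObjectPath");
            }
            return databaseName + "." + objectName;
        }
    }

    public static void main(String[] args) {
        Identifier anonymous = new Identifier(null, "default", "*anonymous_iceberg$2*");
        try {
            anonymous.toObjectPath(); // throws: null catalog name
        } catch (IllegalStateException e) {
            System.out.println("toObjectPath failed: " + e.getMessage());
        }
        // Keeping the identifier itself (as the fix does) avoids the eager conversion:
        System.out.println("object name: " + anonymous.objectName);
    }
}
```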
thanks for the fix @mumuhhh and thanks for reviewing @stevenzwu
Create a table descriptor using `TableDescriptor`. When calling `table.insertInto(descriptor)`, an error is reported. The exception is as follows: `This ObjectIdentifier instance refers to an anonymous object, hence it cannot be converted to ObjectPath and cannot be serialized.`