
Flink: fix TableSink anonymous object #7866

Merged
nastra merged 13 commits into apache:master from mumuhhh:master on Jun 26, 2023
Conversation

@mumuhhh (Contributor) commented Jun 20, 2023

Create a table descriptor using TableDescriptor, then call table.insertInto(descriptor). The call fails with the following exception:

This ObjectIdentifier instance refers to an anonymous object, hence it cannot be converted to ObjectPath and cannot be serialized.
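For context, a minimal self-contained sketch of the check behind this message. This uses a hypothetical stand-in class, not the real Flink types; the real check lives in org.apache.flink.table.catalog.ObjectIdentifier.toObjectPath() (quoted later in this thread), where identifiers for anonymous objects carry a null catalog name:

```java
// Hypothetical, simplified stand-in for org.apache.flink.table.catalog.ObjectIdentifier;
// toObjectPath() here returns a plain String instead of Flink's ObjectPath type.
public class AnonymousIdentifierDemo {

    static final class ObjectIdentifier {
        final String catalogName; // null for anonymous objects created via TableDescriptor
        final String databaseName;
        final String objectName;

        ObjectIdentifier(String catalogName, String databaseName, String objectName) {
            this.catalogName = catalogName;
            this.databaseName = databaseName;
            this.objectName = objectName;
        }

        String toObjectPath() {
            // Mirrors the Flink check: anonymous identifiers have no catalog
            // name and therefore cannot be converted to an ObjectPath.
            if (catalogName == null) {
                throw new IllegalStateException(
                    "This ObjectIdentifier instance refers to an anonymous object, "
                        + "hence it cannot be converted to ObjectPath and cannot be serialized.");
            }
            return databaseName + "." + objectName;
        }
    }

    public static void main(String[] args) {
        // A named identifier converts fine.
        ObjectIdentifier named = new ObjectIdentifier("hadoop_test", "test_db", "test");
        System.out.println(named.toObjectPath());

        // An anonymous identifier (as produced by table.insertInto(descriptor)) throws.
        ObjectIdentifier anonymous = new ObjectIdentifier(null, null, "*anonymous_iceberg$2*");
        try {
            anonymous.toObjectPath();
        } catch (IllegalStateException e) {
            System.out.println("threw: " + e.getMessage());
        }
    }
}
```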

@github-actions github-actions Bot added the flink label Jun 20, 2023
@nastra (Contributor) commented Jun 20, 2023

@mumuhhh could you please add some description to the PR, explaining what issue this PR is fixing?

@nastra (Contributor) commented Jun 21, 2023

@mumuhhh could you add a test please that reproduces this issue?

@mumuhhh (Contributor, Author) commented Jun 21, 2023

> @mumuhhh could you add a test please that reproduces this issue?

Table table = tEnv.from(TableDescriptor.forConnector("datagen")
  .schema(Schema.newBuilder()
    .column("f0", DataTypes.STRING())
    .build())
  .build());

TableDescriptor descriptor = TableDescriptor.forConnector("iceberg")
  .schema(Schema.newBuilder()
    .column("f0", DataTypes.STRING())
    .build())
  .option("catalog-name", "hadoop_test")
  .option("catalog-type", "hadoop")
  .option("catalog-database", "test_db")
  .option("catalog-table", "test")
  .option("warehouse", "/tmp/iceberg/warehouse/")
  .build();

table.insertInto(descriptor).execute();

@nastra (Contributor) commented Jun 21, 2023

@mumuhhh sorry for not being clear, what I mean is: Can you please add a unit test as part of this PR to Iceberg that reproduces this?

mumuhhh added 2 commits June 21, 2023 16:33
# Conflicts:
#	flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java
#	flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java
#	flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java
.option("warehouse", warehouseDir.getAbsolutePath())
.build();

Exception exception =
Review comment (Contributor):

please replace this with

Assertions.assertThatThrownBy(() -> table.insertInto(descriptor).execute())
    .isInstanceOf(ValidationException.class)
    .hasMessage("...")

exception.getCause().getMessage());
Assertions.assertThatThrownBy(() -> table.insertInto(descriptor).execute())
    .isInstanceOf(ValidationException.class)
    .hasCause(
Review comment (Contributor):

for cause validation you can do .isInstanceOf(ValidationException.class).cause().isInstanceOf(TableException.class).hasMessage(..)

@mumuhhh (Contributor, Author) commented Jun 21, 2023

Previously, this code reproduced the exception. Since the fix now allows writing to anonymous tables, I removed the exception capture.

.option("warehouse", warehouseDir.getAbsolutePath())
.build();

table.insertInto(descriptor).execute();
Review comment (Contributor):

Right now this test doesn't have any particular assertion other than just executing code that was previously failing.
I see that this isn't failing anymore with your fix, but it's generally good practice to add some assertions here that make sure the write you expect succeeded (independently from whether an exception has been thrown or not).

@nastra nastra requested a review from stevenzwu June 21, 2023 10:34
.build();

table.insertInto(descriptor).execute().await();
Assert.assertTrue(
Review comment (Contributor):

we generally want to avoid plain assertTrue/assertFalse assertions as those are less descriptive. It would be better to have

    table.insertInto(descriptor).execute();
    Awaitility.await()
        .atMost(5, TimeUnit.SECONDS)
        .untilAsserted(
            () ->
                Assertions.assertThat(
                        warehouseDir.toPath().resolve("test_db").resolve("test").toFile())
                    .exists());

This will handle the async nature of the test and also provide a meaningful error msg in case the assertion ever fails

@github-actions github-actions Bot added the build label Jun 21, 2023
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
ObjectPath objectPath = context.getObjectIdentifier().toObjectPath();
ObjectIdentifier objectIdentifier = context.getObjectIdentifier();
Review comment (Contributor):

what is the anonymous class issue? which class is anonymous? do you have the full stack trace?

toObjectPath seems fine to me.

    public ObjectPath toObjectPath() throws TableException {
        if (catalogName == null) {
            throw new TableException(
                    "This ObjectIdentifier instance refers to an anonymous object, "
                            + "hence it cannot be converted to ObjectPath and cannot be serialized.");
        }
        return new ObjectPath(databaseName, objectName);
    }

@mumuhhh (Contributor, Author) commented Jun 23, 2023

Execute the following code:

        File warehouseDir = Files.createTempDir();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();

        TableEnvironment tEnv = TableEnvironment.create(settings);
        Table table =
                tEnv.from(
                        TableDescriptor.forConnector("datagen")
                                .schema(
                                        Schema.newBuilder()
                                                .column("f0", DataTypes.STRING())
                                                .build())
                                .option("number-of-rows", "3")
                                .build());

        TableDescriptor descriptor =
                TableDescriptor.forConnector("iceberg")
                        .schema(Schema.newBuilder().column("f0", DataTypes.STRING()).build())
                        .option("catalog-name", "hadoop_test")
                        .option("catalog-type", "hadoop")
                        .option("catalog-database", "test_db")
                        .option("catalog-table", "test")
                        .option("warehouse", warehouseDir.getAbsolutePath())
                        .build();

        table.insertInto(descriptor).execute();

An exception will occur

org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table '*anonymous_iceberg$2*'.

Table options are:

'catalog-database'='test_db'
'catalog-name'='hadoop_test'
'catalog-table'='test'
'catalog-type'='hadoop'
'connector'='iceberg'
'warehouse'='C:\Users\huawei\AppData\Local\Temp\1687560451312-0'
	at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:262)
	at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:434)
	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:227)
	at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:185)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:285)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:278)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1656)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:782)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:861)
	at org.apache.flink.table.api.internal.TablePipelineImpl.execute(TablePipelineImpl.java:56)
	at com.test.TestFlinkAnonymousTable.testWriteAnonymousTable(TestFlinkAnonymousTable.java:48)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
	at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
	at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
	at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
	at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
	at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker$2.run(TestWorker.java:176)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
	at org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:133)
	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:71)
	at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
	at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
Caused by: org.apache.flink.table.api.TableException: This ObjectIdentifier instance refers to an anonymous object, hence it cannot be converted to ObjectPath and cannot be serialized.
	at org.apache.flink.table.catalog.ObjectIdentifier.toObjectPath(ObjectIdentifier.java:112)
	at org.apache.iceberg.flink.FlinkDynamicTableFactory.createDynamicTableSink(FlinkDynamicTableFactory.java:108)
	at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:259)
	... 63 more

Review comment (Contributor):

Thanks. I misunderstood the error message earlier: I thought it was about a Java anonymous class and Java serialization, but it is actually about an unnamed ObjectIdentifier with a null catalog name. This change makes sense to me now.
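For readers following along, the shape of the fix can be sketched as follows. This is a hypothetical, simplified stand-in, not the actual patch; the real change is in FlinkDynamicTableFactory.createDynamicTableSink, which keeps the ObjectIdentifier instead of eagerly calling toObjectPath():

```java
// Hypothetical sketch of the fix pattern: carry the identifier itself rather
// than eagerly converting it to an ObjectPath, so identifiers with a null
// catalog name (anonymous tables) no longer fail up front.
public class SinkFactoryFixSketch {

    static final class Identifier {
        final String catalogName; // null for anonymous tables
        final String objectName;

        Identifier(String catalogName, String objectName) {
            this.catalogName = catalogName;
            this.objectName = objectName;
        }
    }

    // Before the fix: eager conversion throws for anonymous identifiers.
    static String createSinkBefore(Identifier id) {
        if (id.catalogName == null) {
            throw new IllegalStateException(
                "anonymous object cannot be converted to ObjectPath");
        }
        return "sink:" + id.objectName;
    }

    // After the fix: the identifier is used as-is; the target table is resolved
    // from the connector options ("catalog-name", "catalog-database",
    // "catalog-table") rather than from the identifier's catalog path.
    static String createSinkAfter(Identifier id) {
        return "sink:" + id.objectName;
    }

    public static void main(String[] args) {
        Identifier anonymous = new Identifier(null, "*anonymous_iceberg$2*");
        // Succeeds where createSinkBefore(anonymous) would throw.
        System.out.println(createSinkAfter(anonymous));
    }
}
```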

@nastra (Contributor) commented Jun 26, 2023

thanks for the fix @mumuhhh and thanks for reviewing @stevenzwu

@nastra nastra merged commit 5e0e5e3 into apache:master Jun 26, 2023
rodmeneses pushed a commit to rodmeneses/iceberg that referenced this pull request Feb 19, 2024
