Core: Files table predicate filtering wrong for tables with evolved partition specs #4520
Conversation
Thanks to @aokolnychyi for finding it in #4382 (comment) and before. FYI @RussellSpitzer @rdblue, this seems like a bad issue for tables with changed partition specs, as the files table may return fewer results when a filter is applied. It's present in the 0.12 and 0.13 releases, and I think it's a good candidate for the 0.13.2 release. Sorry I did not check this case initially.
```diff
-      Expression rowFilter,
-      boolean caseSensitive) {
+  private boolean filter(ManifestFile manifestFile, Expression rowFilter, boolean caseSensitive) {
```
Instead of giving up if the manifest spec ID does not match the current table spec, I think we should use the manifest spec to project the row filter and push down whatever we can. Remember that the partition type will be a union of all partition columns ever used in the table.
Another problem with the current implementation is that we call transformSpec and create a manifest evaluator for every manifest file. That's expensive. We usually use LoadingCache to reuse manifest evaluators.
Also, why not use ManifestEvaluator.forRowFilter instead of doing the projection ourselves?
Will something like this work if we modify planFiles?
```java
Map<Integer, PartitionSpec> specsById = ops.current().specsById();

// Build one manifest evaluator per spec ID and reuse it across manifest files.
LoadingCache<Integer, ManifestEvaluator> evalCache = Caffeine.newBuilder().build(specId -> {
  PartitionSpec spec = specsById.get(specId);
  PartitionSpec transformedSpec = transformSpec(fileSchema, spec, PARTITION_FIELD_PREFIX);
  return ManifestEvaluator.forRowFilter(rowFilter, transformedSpec, caseSensitive);
});

CloseableIterable<ManifestFile> filtered = CloseableIterable.filter(
    manifests(),
    manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));
```
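(In the sketch above, `Caffeine.newBuilder().build(loader)` returns a `LoadingCache` whose `get` computes and memoizes the evaluator per spec ID, so each spec's evaluator is built at most once rather than once per manifest.)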
Hi @aokolnychyi, yeah, ideally we would try harder to filter even on older specs; I was just looking to fix the problem earlier.

The problem I run into here is when a field is renamed (i.e., TestMetadataTablesWithPartitionEvolution.testFilesMetadataTable). In this case, the transformSpec() method tries to bind the expression with the old spec on the fileSchema, but it can't find the old field anymore. As you know, the fileSchema is derived from Partitioning.partitionType, which has the union of all spec fields, but, I'm guessing, not the old names.
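To make the rename case concrete, here is an illustrative sketch (the field names mirror the test's error, but the snippet itself is hypothetical):

```java
// Illustrative sketch; assumes `table` is an Iceberg Table whose current spec
// has a partition field named "category_bucket_8".
table.updateSpec()
    .renameField("category_bucket_8", "category_bucket_8_another_name")
    .commit();

// Partitioning.partitionType(table) unions partition fields across all specs
// by field ID and keeps only the latest name for each ID. transformSpec() then
// binds "partition." + pf.name() using the old spec's field name, which no
// longer exists in the metadata table schema, producing the failure below.
```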
Stack trace:
Cannot find source column: partition.category_bucket_8
java.lang.IllegalArgumentException: Cannot find source column: partition.category_bucket_8
at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(Preconditions.java:220)
at org.apache.iceberg.PartitionSpec$Builder.findSourceColumn(PartitionSpec.java:397)
at org.apache.iceberg.PartitionSpec$Builder.identity(PartitionSpec.java:402)
at org.apache.iceberg.BaseMetadataTable.lambda$transformSpec$0(BaseMetadataTable.java:66)
at org.apache.iceberg.relocated.com.google.common.collect.ImmutableList.forEach(ImmutableList.java:422)
at org.apache.iceberg.BaseMetadataTable.transformSpec(BaseMetadataTable.java:66)
at org.apache.iceberg.BaseFilesTable$BaseFilesTableScan.lambda$planFiles$0(BaseFilesTable.java:103)
at com.github.benmanes.caffeine.cache.LocalLoadingCache.lambda$newMappingFunction$2(LocalLoadingCache.java:141)
at com.github.benmanes.caffeine.cache.UnboundedLocalCache.lambda$computeIfAbsent$2(UnboundedLocalCache.java:238)
at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705)
at com.github.benmanes.caffeine.cache.UnboundedLocalCache.computeIfAbsent(UnboundedLocalCache.java:234)
at com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
at com.github.benmanes.caffeine.cache.LocalLoadingCache.get(LocalLoadingCache.java:54)
at org.apache.iceberg.BaseFilesTable$BaseFilesTableScan.lambda$planFiles$1(BaseFilesTable.java:109)
at org.apache.iceberg.io.CloseableIterable$3.shouldKeep(CloseableIterable.java:82)
at org.apache.iceberg.io.FilterIterator.advance(FilterIterator.java:67)
at org.apache.iceberg.io.FilterIterator.hasNext(FilterIterator.java:50)
at org.apache.iceberg.io.CloseableIterable$4$1.hasNext(CloseableIterable.java:108)
at org.apache.iceberg.relocated.com.google.common.collect.Iterators.addAll(Iterators.java:366)
at org.apache.iceberg.relocated.com.google.common.collect.Lists.newArrayList(Lists.java:147)
at org.apache.iceberg.relocated.com.google.common.collect.Lists.newArrayList(Lists.java:133)
at org.apache.iceberg.spark.source.SparkBatchQueryScan.files(SparkBatchQueryScan.java:115)
at org.apache.iceberg.spark.source.SparkBatchQueryScan.tasks(SparkBatchQueryScan.java:128)
at org.apache.iceberg.spark.source.SparkScan.toBatch(SparkScan.java:108)
at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.batch$lzycompute(BatchScanExec.scala:40)
at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.batch(BatchScanExec.scala:40)
at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.partitions$lzycompute(BatchScanExec.scala:52)
at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.partitions(BatchScanExec.scala:52)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase.supportsColumnar(DataSourceV2ScanExecBase.scala:93)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase.supportsColumnar$(DataSourceV2ScanExecBase.scala:92)
at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.supportsColumnar(BatchScanExec.scala:35)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy.apply(DataSourceV2Strategy.scala:123)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:68)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:68)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:68)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:68)
at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:468)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$2(QueryExecution.scala:157)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:196)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:196)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:157)
at org.apache.spark.sql.execution.QueryExecution.withCteMap(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:150)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:150)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$2(QueryExecution.scala:170)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:196)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:196)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:170)
at org.apache.spark.sql.execution.QueryExecution.withCteMap(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:163)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:101)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
at org.apache.spark.sql.Dataset.collectAsList(Dataset.scala:2982)
at org.apache.iceberg.spark.source.TestMetadataTablesWithPartitionEvolution.assertPartitions(TestMetadataTablesWithPartitionEvolution.java:328)
at org.apache.iceberg.spark.source.TestMetadataTablesWithPartitionEvolution.testFilesMetadataTable(TestMetadataTablesWithPartitionEvolution.java:185)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runners.Suite.runChild(Suite.java:128)
at org.junit.runners.Suite.runChild(Suite.java:27)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
at org.gradle.api.internal.tasks.testing.worker.TestWorker$2.run(TestWorker.java:176)
at org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
at org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:133)
at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:71)
at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
It looks like a bug in transformSpec that assumes old partition fields are never renamed. If we switch to using column IDs, will it help?
```java
static PartitionSpec transformSpec(Schema metadataTableSchema, PartitionSpec spec) {
  PartitionSpec.Builder identitySpecBuilder =
      PartitionSpec.builderFor(metadataTableSchema).checkConflicts(false);
  // Bind each partition field by its field ID rather than by name.
  spec.fields().forEach(pf -> identitySpecBuilder.add(pf.fieldId(), pf.name(), "identity"));
  return identitySpecBuilder.build();
}
```
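(For context on why this would help: the builder's add(int sourceId, String name, String transform) resolves the source column by field ID rather than by name, and the partition struct in the metadata table schema reuses the partition field IDs, so the lookup should survive renames.)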
```java
formatVersion()
1
},
{ "testhive", SparkCatalog.class.getName(),
```
Just adding two more runs; it should not take too long, and I think it will make this test much more deterministic (right now it randomly picks a version).
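Roughly what I mean, as a hypothetical sketch (the catalog name and impl come from the diff above; the parameter shape is illustrative, not the test's exact signature):

```java
import org.apache.iceberg.spark.SparkCatalog;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

// Run the test once per format version instead of picking one at random.
@RunWith(Parameterized.class)
public class TestMetadataTablesWithPartitionEvolution {
  @Parameterized.Parameters(name = "catalog = {0}, impl = {1}, formatVersion = {2}")
  public static Object[][] parameters() {
    return new Object[][] {
        { "testhive", SparkCatalog.class.getName(), 1 },  // one run pinned to format v1
        { "testhive", SparkCatalog.class.getName(), 2 },  // and one pinned to format v2
    };
  }

  // ... constructor and test methods elided ...
}
```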
```diff
 static PartitionSpec transformSpec(Schema metadataTableSchema, PartitionSpec spec) {
   PartitionSpec.Builder identitySpecBuilder = PartitionSpec.builderFor(metadataTableSchema).checkConflicts(false);
-  spec.fields().forEach(pf -> identitySpecBuilder.identity(partitionPrefix + pf.name(), pf.name()));
+  spec.fields().forEach(pf -> identitySpecBuilder.add(pf.fieldId(), pf.name(), "identity"));
```
This uses the field ID to do the transform from the metadata table schema to the table schema, to catch partition field rename cases (the old way was to manually strip 'partition.' from the field name). From the suggestion: #4520 (comment)
@rdblue would you also want to take a look at this if you have some time (as it's a bit tricky code)? Thanks
Adding an observation: the case where results are wrongly filtered out is limited to V2 tables where partition fields are removed (i.e., see the initial description). In tests of any other partition spec modification (in V1, where fields are never removed, or in V2 tables where partition fields are modified or added but not removed), we get an IndexOutOfBoundsException in the ManifestEvaluator if we try to query files written by a different partition spec than the current one. Still worth fixing; just making a note about the impact.
I merged this one to unblock one local change I have. @RussellSpitzer @rdblue, if you could still review, it would be great.
Thanks all, I'll also look at merging this to the 0.13.x branch for the 0.13.2 release (as the metadata tables hierarchy is different in master). @rdblue let me know if this is not the right branch.
The change #2926 introduced partition predicate pushdown for the "files" metadata table. However, it may be incorrect when the partition spec evolves and data is written under both specs. The projectionFilter is constructed by projecting the expression onto the current table's partition spec and evaluating it against each manifest file.

Say, for example, we have a table whose former partition spec is "data" and whose new partition spec is "id", with manifests written under both specs.

A query like "select * from my_table.files where files.partition.id=1" projects successfully against the current partition spec "id", producing a ManifestEvaluator that looks for the value 1. That evaluator is applied to all manifest files, even those written under the old partition spec "data". It erroneously filters the old manifests on whether data is 1, and correctly filters the new manifests on whether id is 1, giving wrong results.
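As a rough reproduction sketch (the table name, column names, and helper shape are illustrative; it assumes a SparkSession with an Iceberg catalog configured):

```java
import java.util.List;
import org.apache.iceberg.Table;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Illustrative only: assumes `table` is a format v2 table with columns
// id (bigint) and data (string), initially partitioned by data (spec 0).
// (In v1, removed partition fields become void transforms instead.)
static List<Row> reproduce(SparkSession spark, Table table) {
  // ... write data files under spec 0 (partitioned by data) ...

  // Evolve the spec: drop data, partition by id instead (spec 1).
  table.updateSpec()
      .removeField("data")
      .addField("id")
      .commit();

  // ... write data files under spec 1 (partitioned by id) ...

  // The filter is projected onto the current spec ("id") only, and the
  // resulting ManifestEvaluator is applied to every manifest, including
  // those written under spec 0, so matching spec-0 files can be pruned.
  return spark.sql(
      "SELECT * FROM db.my_table.files WHERE partition.id = 1").collectAsList();
}
```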
Release Note: Over-aggressive filtering of results occurs only in V2 tables where partition fields are removed, as in the example, and replaced by equivalent ones, with data across both specs. In all other cases I have tested, an IndexOutOfBoundsException occurs (i.e., in tables where partition fields are added or renamed but not removed, or in V1, where partition fields are never removed but rather replaced by a void transform). This should fix both cases.