Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 18 additions & 21 deletions core/src/main/java/org/apache/iceberg/BaseFilesTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@

package org.apache.iceberg;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import java.util.Map;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ManifestEvaluator;
import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types.StructType;
Expand Down Expand Up @@ -92,7 +94,16 @@ public TableScan appendsAfter(long fromSnapshotId) {
protected CloseableIterable<FileScanTask> planFiles(TableOperations ops, Snapshot snapshot, Expression rowFilter,
boolean ignoreResiduals, boolean caseSensitive,
boolean colStats) {
CloseableIterable<ManifestFile> filtered = filterManifests(manifests(), rowFilter, caseSensitive);
Map<Integer, PartitionSpec> specsById = table().specs();
Comment thread
aokolnychyi marked this conversation as resolved.

LoadingCache<Integer, ManifestEvaluator> evalCache = Caffeine.newBuilder().build(specId -> {
PartitionSpec spec = specsById.get(specId);
PartitionSpec transformedSpec = transformSpec(fileSchema, spec);
return ManifestEvaluator.forRowFilter(rowFilter, transformedSpec, caseSensitive);
});

CloseableIterable<ManifestFile> filteredManifests = CloseableIterable.filter(manifests(),
manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));

String schemaString = SchemaParser.toJson(schema());
String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
Expand All @@ -103,28 +114,14 @@ protected CloseableIterable<FileScanTask> planFiles(TableOperations ops, Snapsho
// This data task needs to use the table schema, which may not include a partition schema to avoid having an
// empty struct in the schema for unpartitioned tables. Some engines, like Spark, can't handle empty structs in
// all cases.
return CloseableIterable.transform(filtered, manifest ->
new ManifestReadTask(ops.io(), ops.current().specsById(),
manifest, schema(), schemaString, specString, residuals));
return CloseableIterable.transform(filteredManifests, manifest ->
new ManifestReadTask(table(), manifest, schema(), schemaString, specString, residuals));
}

/**
* Returns an iterable of manifest files to explore for this Files metadata table scan
*/
protected abstract CloseableIterable<ManifestFile> manifests();

private CloseableIterable<ManifestFile> filterManifests(CloseableIterable<ManifestFile> manifests,
Expression rowFilter,
boolean caseSensitive) {
// use an inclusive projection to remove the partition name prefix and filter out any non-partition expressions
PartitionSpec spec = transformSpec(fileSchema, table().spec(), PARTITION_FIELD_PREFIX);
Expression partitionFilter = Projections.inclusive(spec, caseSensitive).project(rowFilter);

ManifestEvaluator manifestEval = ManifestEvaluator.forPartitionFilter(
partitionFilter, table().spec(), caseSensitive);

return CloseableIterable.filter(manifests, manifestEval::eval);
}
}

static class ManifestReadTask extends BaseFileScanTask implements DataTask {
Expand All @@ -133,11 +130,11 @@ static class ManifestReadTask extends BaseFileScanTask implements DataTask {
private final ManifestFile manifest;
private final Schema schema;

ManifestReadTask(FileIO io, Map<Integer, PartitionSpec> specsById, ManifestFile manifest,
ManifestReadTask(Table table, ManifestFile manifest,
Schema schema, String schemaString, String specString, ResidualEvaluator residuals) {
super(DataFiles.fromManifest(manifest), null, schemaString, specString, residuals);
this.io = io;
this.specsById = specsById;
this.io = table.io();
this.specsById = Maps.newHashMap(table.specs());
this.manifest = manifest;
this.schema = schema;
}
Expand Down
10 changes: 4 additions & 6 deletions core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
* deserialization.
*/
abstract class BaseMetadataTable implements Table, HasTableOperations, Serializable {
protected static final String PARTITION_FIELD_PREFIX = "partition.";
private final PartitionSpec spec = PartitionSpec.unpartitioned();
private final SortOrder sortOrder = SortOrder.unsorted();
private final TableOperations ops;
Expand All @@ -52,18 +51,17 @@ protected BaseMetadataTable(TableOperations ops, Table table, String name) {
* This method transforms the table's partition spec to a spec that is used to rewrite the user-provided filter
* expression against the given metadata table.
* <p>
* The resulting partition spec maps $partitionPrefix.X fields to partition X using an identity partition transform.
* The resulting partition spec maps partition.X fields to partition X using an identity partition transform.
* When this spec is used to project an expression for the given metadata table, the projection will remove
* predicates for non-partition fields (not in the spec) and will remove the "$partitionPrefix." prefix from fields.
* predicates for non-partition fields (not in the spec) and will remove the "partition." prefix from fields.
*
* @param metadataTableSchema schema of the metadata table
* @param spec spec on which the metadata table schema is based
* @param partitionPrefix prefix to remove from each field in the partition spec
* @return a spec used to rewrite the metadata table filters to partition filters using an inclusive projection
*/
static PartitionSpec transformSpec(Schema metadataTableSchema, PartitionSpec spec, String partitionPrefix) {
static PartitionSpec transformSpec(Schema metadataTableSchema, PartitionSpec spec) {
PartitionSpec.Builder identitySpecBuilder = PartitionSpec.builderFor(metadataTableSchema).checkConflicts(false);
spec.fields().forEach(pf -> identitySpecBuilder.identity(partitionPrefix + pf.name(), pf.name()));
spec.fields().forEach(pf -> identitySpecBuilder.add(pf.fieldId(), pf.name(), "identity"));
Copy link
Copy Markdown
Member Author

@szehon-ho szehon-ho Apr 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This uses the field id to do the transform from metadata table schema to table schema to catch partition field rename cases (old way was to manual remove 'partition.' from the field name). From the suggestion: #4520 (comment)

return identitySpecBuilder.build();
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/PartitionsTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ static CloseableIterable<FileScanTask> planFiles(StaticTableScan scan) {

// use an inclusive projection to remove the partition name prefix and filter out any non-partition expressions
Expression partitionFilter = Projections
.inclusive(transformSpec(scan.schema(), table.spec(), PARTITION_FIELD_PREFIX), caseSensitive)
.inclusive(transformSpec(scan.schema(), table.spec()), caseSensitive)
.project(scan.filter());

ManifestGroup manifestGroup = new ManifestGroup(table.io(), snapshot.dataManifests(), snapshot.deleteManifests())
Expand Down
Loading