Core: Add position deletes metadata table #6365
Conversation
Force-pushed f1ff895 to a99a078.
static PartitionSpec transformSpec(Schema metadataTableSchema, PartitionSpec spec) {
  PartitionSpec.Builder identitySpecBuilder =
      PartitionSpec.builderFor(metadataTableSchema).checkConflicts(false);
      PartitionSpec.builderFor(metadataTableSchema)
Before this change, predicate pushdown would give the PositionDeletes scan tasks the wrong partition field id and spec id, so they would not work in the DeleteFile read.
This only happens in corner cases like dropped partition fields (where the auto-generated field ids are no longer correct). Added a test for this in TestMetadataTableScansWithPartitionEvolution.
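The fix above boils down to re-mapping every partition field to an identity transform over the metadata table schema while preserving the original field ids, instead of letting the builder auto-generate fresh ids (which drift once partition fields are dropped). A minimal standalone sketch of that idea, using a simplified hypothetical `Field` record rather than Iceberg's real `PartitionSpec`/`PartitionField` classes:

```java
import java.util.List;
import java.util.stream.Collectors;

public class TransformSpecSketch {
    // Hypothetical, simplified stand-in for Iceberg's PartitionField.
    record Field(int fieldId, int sourceId, String name, String transform) {}

    // Rewrite every partition field as an identity transform, preserving the
    // ORIGINAL field ids and names rather than auto-generating new ids.
    static List<Field> toIdentitySpec(List<Field> spec) {
        return spec.stream()
            .map(f -> new Field(f.fieldId(), f.sourceId(), f.name(), "identity"))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Simulate an evolved spec: field id 1001 survived a dropped field 1000,
        // so naive re-numbering from scratch would assign it a wrong id.
        List<Field> spec = List.of(new Field(1001, 3, "data_bucket", "bucket[16]"));
        List<Field> identity = toIdentitySpec(spec);
        System.out.println(identity.get(0).fieldId());   // preserved original id
        System.out.println(identity.get(0).transform()); // identity
    }
}
```

The key point is that `checkConflicts(false)` in the real builder allows reusing the original ids even when they no longer line up with a freshly generated sequence.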
SPEC_ID.fieldId(),
PARTITION_COLUMN_ID);
PARTITION_COLUMN_ID,
POSITION_DELETE_TABLE_PARTITION_FIELD_ID,
Why do we need a Position_Delete version of Spec_ID and File_Path? Shouldn't we be able to use the original metadata columns for these?
Yeah, it's true, we could do that. I was going back and forth on whether we wanted that or not, as these are 'main' columns with a proper name, versus hidden columns (whose names start with _); in other words, they are not the exact same column. I'm open to it.
Do we even have to make them metadata columns then? I thought they would be just regular columns in a table. I don't think they should be added to META_COLUMNS. I think metadata columns should be only about columns we can project on demand. That's why we did not add changelog columns here.
Let me also think about reserving field IDs for them. It is a similar yet different use case compared to changelog columns, as there is no changelog table as such.
Removed them from the list of metadata columns. That was left over from an earlier change where I was trying to re-use the existing Spark RowReader, which checked that a projected column must be either part of the schema or a metadata column.
Kept the field ids in this file to reserve them and avoid conflicts with the "row" struct of this table, which has the data table schema.
}

@Test
public void testBasicSplitPlanningDeleteFiles() {
Do we store split offsets of Delete files? If so should we be checking the splitting on those boundaries?
I think not, but I could be wrong; I don't see it being set in the DeleteFile builder: https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/FileMetadata.java#L38
Hmm, I guess we have decided they will be smaller than data files, so I guess it doesn't matter.
I don't see a reason why we wouldn't store that. I think it was overlooked.
@szehon-ho, can we do that in a follow-up PR? Not urgent, at least we should create an issue.
RussellSpitzer left a comment:
I think this is really close. I have a few remaining questions, but just minor issues.
I'll have time to take a look tomorrow too.
public static class PositionDeletesTableScan
    extends AbstractTableScan<
        BatchScan, org.apache.iceberg.ScanTask, ScanTaskGroup<org.apache.iceberg.ScanTask>>
Can we give ScanTask in this class a more specific name and then avoid this qualified import?
  this.table = table;
}

protected Table table() {
Removed this, as I now inherit from BaseMetadataTable.
Force-pushed f2b5ff1 to fb6faab.
PartitionSpec spec = transformedSpecs.get(entry.file().specId());
String specString = PartitionSpecParser.toJson(spec);
return new PositionDeleteScanTask(
    entry.file().copy(),
I spent some time on this. If we iterate through the tasks without making a copy here, it corrupts the older tasks already iterated through (setting all their file values to the latest task's!). See TestMetadataTableScans::testPositionDeletesUnpartitioned and run without this copy.
It looks like the underlying AvroIterable reuses containers, so I add copy() here to avoid this problem. Maybe I can use copyWithoutStats()?
That would be the correct thing to do here; if you look at ManifestEntries.java, that's how it works, because you need the full list of tasks, not just an iterator over them.
To be clear, you need copyWithoutStats here since you aren't using the metrics past this point. Just to save a bit of memory
I think your other option is to just not return a parallel iterable and rely on callers to know they need to copy.
Thanks, yeah, it was something in the ParallelIterable that messed it up. Anyway, I changed it to copyWithoutStats; it'll make things easier for the caller. I was definitely stumped on this for half a day; glad I added the extra test and saw this.
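The container-reuse pitfall discussed here is easy to reproduce outside of Iceberg. The sketch below is a standalone illustration (not Iceberg code): an iterator hands back the same mutable object on every `next()`, the way Avro readers reuse records for efficiency, so collecting the elements without a per-element copy leaves every collected entry pointing at the last value.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ReusedContainerDemo {
    static class Entry {
        String filePath; // mutated in place on each next()
        Entry copy() { Entry e = new Entry(); e.filePath = this.filePath; return e; }
    }

    // Iterator that reuses one shared container, mimicking AvroIterable.
    static Iterator<Entry> reusingIterator(List<String> paths) {
        Entry shared = new Entry();
        Iterator<String> it = paths.iterator();
        return new Iterator<>() {
            public boolean hasNext() { return it.hasNext(); }
            public Entry next() { shared.filePath = it.next(); return shared; } // same object every time
        };
    }

    public static void main(String[] args) {
        List<String> paths = List.of("a.parquet", "b.parquet");

        // Without a copy: every collected entry aliases the shared container,
        // so earlier entries appear "corrupted" to the latest file.
        List<Entry> broken = new ArrayList<>();
        reusingIterator(paths).forEachRemaining(broken::add);
        System.out.println(broken.get(0).filePath); // b.parquet

        // Copying each element (analogous to copyWithoutStats() in the PR)
        // detaches it from the shared container.
        List<Entry> safe = new ArrayList<>();
        reusingIterator(paths).forEachRemaining(e -> safe.add(e.copy()));
        System.out.println(safe.get(0).filePath); // a.parquet
    }
}
```

This is also why doing the copy inside the scan, rather than relying on callers, is the friendlier API choice: the aliasing bug only surfaces when someone materializes the iterable.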
aokolnychyi left a comment:
I did another round. I'll need to check transformSpec with fresh eyes. Getting close, though. Thanks, @szehon-ho!
Types.LongType.get(),
"Commit snapshot ID");

public static final int POSITION_DELETE_TABLE_PARTITION_FIELD_ID = Integer.MAX_VALUE - 107;
If I understand correctly, the table schema will include these 3 columns in addition to columns in delete files. It is not bad to reserve some IDs but have we thought about keeping the table schema limited to the content of delete files and supporting already existing _spec_id, _partition, _file metadata columns? Values for metadata columns will be only projected on demand, just like we can do that for regular tables.
It seems cleaner to me and shouldn't be hard to do since we will have a dedicated reader.
Thinking on this a little bit: one problem is that this table is partitioned by "partition", and if I remember correctly there's some complication in partitioning by a hidden metadata column. So I ended up making all of them actual columns.
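For context on the reserved-ID discussion: the diff quotes `// IDs Integer.MAX_VALUE - (1-100) are used for metadata columns`, and the new position-delete field id is `Integer.MAX_VALUE - 107`, i.e. deliberately outside that band. A small standalone check of that invariant (the constant name is from the diff; the band check is just illustrative, not Iceberg's actual validation code):

```java
public class ReservedIdsCheck {
    // From the quoted diff.
    static final int POSITION_DELETE_TABLE_PARTITION_FIELD_ID = Integer.MAX_VALUE - 107;

    // Metadata columns occupy Integer.MAX_VALUE - 1 .. Integer.MAX_VALUE - 100.
    static boolean inMetadataColumnBand(int id) {
        return id >= Integer.MAX_VALUE - 100 && id <= Integer.MAX_VALUE - 1;
    }

    public static void main(String[] args) {
        // The reserved position-delete id sits just below the metadata-column band,
        // so it cannot collide with on-demand metadata columns, while still being
        // far above any realistic field id in the data table's "row" struct.
        System.out.println(inMetadataColumnBand(POSITION_DELETE_TABLE_PARTITION_FIELD_ID)); // false
    }
}
```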
m -> {
  // Filter partitions
  CloseableIterable<ManifestEntry<DeleteFile>> deleteFileEntries =
      ManifestFiles.readDeleteManifest(m, tableOps().io(), transformedSpecs)
Do we need to pass a projection while reading delete manifests?
I assume you mean with/without stats? Fixed. It took me a little while to figure out; I needed to add a DELETE_SCAN_COLUMNS here that includes content, versus the base scan's SCAN_COLUMNS, which does not, since I filter on the position-delete file content.
protected static final List<String> DELETE_SCAN_COLUMNS =
    ImmutableList.of(
        "snapshot_id",
        "content",
This has to differ from the data file's SCAN_COLUMNS by including content, which is used later in the scan to filter.
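To make the projection difference concrete, here is a standalone sketch of the constant being discussed. Only `snapshot_id` and `content` are visible in the quoted diff; the remaining column names are plausible assumptions about the manifest-entry projection, not a verbatim copy of the PR's list.

```java
import java.util.List;

public class DeleteScanColumnsSketch {
    // Sketch of the delete-manifest projection: it must include "content"
    // (data vs position-delete vs equality-delete) so the scan can later
    // filter down to position deletes only. Columns after "content" are
    // assumed for illustration.
    static final List<String> DELETE_SCAN_COLUMNS =
        List.of("snapshot_id", "content", "file_path", "file_format", "partition");

    public static void main(String[] args) {
        System.out.println(DELETE_SCAN_COLUMNS.contains("content")); // true
    }
}
```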
Force-pushed aeabee3 to e053f26.
Rebased and squashed commits.
Force-pushed 62e620f to e2c03aa.
Force-pushed 3623d15 to c326236.
Force-pushed b84e94c to 93d07ef.
private MetadataColumns() {}

// IDs Integer.MAX_VALUE - (1-100) are used for metadata columns
public static final int FILE_PATH_COLUMN_ID = Integer.MAX_VALUE - 1;
Types.NestedField.optional(
    MetadataColumns.DELETE_FILE_ROW_FIELD_ID,
    "row",
    table().schema().asStruct(),
Once we add support to engines, we will have to test schema evolution.
Yeah, good point, will need to check this.
Force-pushed 29bd430 to b4e641a.
List<ScanTask> tasks = Lists.newArrayList(scan.planFiles());

Assert.assertEquals(
Added new tests for ScanMetrics. These cover the number of manifests skipped and read. Somehow the existing ManifestReader code we invoke does not update the metrics for the number of delete files skipped and read (at least in the code path we use): https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/ManifestReader.java#L228
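The two counters those tests assert on can be sketched in isolation: during planning, each delete manifest is either pruned by the partition filter (skipped) or opened (read). This is a simplified standalone model, not Iceberg's actual ScanMetrics class; `Manifest` and `planWithMetrics` are illustrative names.

```java
import java.util.List;
import java.util.function.Predicate;

public class ScanMetricsSketch {
    // Hypothetical, simplified manifest: just a path and one partition value.
    record Manifest(String path, int partitionValue) {}

    // Returns {skippedManifests, readManifests}: a manifest whose partition
    // summary fails the filter is skipped without being opened.
    static int[] planWithMetrics(List<Manifest> manifests, Predicate<Manifest> partitionFilter) {
        int skipped = 0;
        int read = 0;
        for (Manifest m : manifests) {
            if (partitionFilter.test(m)) {
                read++;     // manifest matches: open and read its delete entries
            } else {
                skipped++;  // manifest pruned by the partition filter
            }
        }
        return new int[] {skipped, read};
    }

    public static void main(String[] args) {
        List<Manifest> manifests =
            List.of(new Manifest("m1.avro", 0), new Manifest("m2.avro", 1));
        int[] metrics = planWithMetrics(manifests, m -> m.partitionValue() == 1);
        System.out.println(metrics[0]); // 1 skipped
        System.out.println(metrics[1]); // 1 read
    }
}
```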
aokolnychyi left a comment:
LGTM. Great work, @szehon-ho! I left some optional comments. Feel free to merge whenever you are ready.
Force-pushed ebf59ba to 767474a.
Thanks, @szehon-ho! Thanks for reviewing, @RussellSpitzer!
This breaks up PR #4812, and is just the part that adds the PositionDeletesTable metadata table.
It is now based on @aokolnychyi's newly-added BatchScan interface (#5922), added for this purpose so the scan is free to not return FileScanTask. It returns a custom ScanTask that scans DeleteFiles rather than DataFiles.
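The type relationship the description refers to can be sketched in isolation: because the scan abstraction is generic in its task type, a position-deletes scan can plan a delete-file task rather than a `FileScanTask`. The interface names below mirror Iceberg's, but these are simplified stand-ins for illustration, not the real API.

```java
import java.util.List;

public class BatchScanSketch {
    interface ScanTask {}

    // In the real API, FileScanTask reads DataFiles.
    interface FileScanTask extends ScanTask {}

    // Simplified stand-in for the PR's task that reads a DeleteFile instead.
    record PositionDeleteScanTask(String deleteFilePath) implements ScanTask {}

    // A scan generic in its task type: the position-deletes table's scan can
    // plan PositionDeleteScanTask without being forced into FileScanTask.
    interface BatchScan<T extends ScanTask> {
        List<T> planFiles();
    }

    public static void main(String[] args) {
        BatchScan<PositionDeleteScanTask> scan =
            () -> List.of(new PositionDeleteScanTask("pos-deletes-00000.parquet"));
        ScanTask task = scan.planFiles().get(0);
        System.out.println(task instanceof FileScanTask);           // false
        System.out.println(task instanceof PositionDeleteScanTask); // true
    }
}
```

This is the design point of #5922: decoupling the scan's result type from `FileScanTask` is what makes a metadata table over delete files possible without contorting the data-file task model.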