core: Adding read vector to range readable interface and adding mapper to parquet stream #13997
Conversation
private static List<ParquetObjectRange> convertRanges(List<ParquetFileRange> ranges) {
  return ranges.stream()
      .map(
this just maps between the internal parquet hadoop range and the new iceberg one.
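A minimal sketch of that mapping, using stand-in record types for the two range classes (the real parquet-hadoop and Iceberg types carry more state, such as the data-read future; names here are illustrative only):

```java
import java.util.List;
import java.util.stream.Collectors;

public class RangeMapping {
  // Stand-in for org.apache.parquet.io.ParquetFileRange (offset/length only)
  public record ParquetFileRange(long offset, int length) {}

  // Stand-in for the new Iceberg-side range type
  public record ParquetObjectRange(long offset, int length) {}

  // Maps each Parquet range to the equivalent Iceberg range, one-to-one
  public static List<ParquetObjectRange> convertRanges(List<ParquetFileRange> ranges) {
    return ranges.stream()
        .map(r -> new ParquetObjectRange(r.offset(), r.length()))
        .collect(Collectors.toList());
  }
}
```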
 * this class is written by @mukundthakur, and taken from
 * /hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java (thank you!).
 */
public final class VectoredReadUtils {
I don't feel like we need this class. There are three things this does, but it should probably be just one. The validateRangeRequest should just be handled in the constructor of the FileRange (we currently don't have any validation there). The sortRangeList is a subset of validateAndSortRanges, which seems duplicative.
I'd suggest moving validateAndSort to the RangeReadable interface as a static utility that can be used by implementors, and avoiding this util class entirely.
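A rough sketch of what that static utility on the interface could look like, assuming a simple FileRange carrying only offset and length (names and validation rules are illustrative, not the actual Iceberg code):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public interface RangeReadableSketch {
  // Stand-in range type; the real FileRange also holds the data future
  record FileRange(long offset, int length) {}

  // Sorts by offset, then rejects invalid and overlapping ranges in one pass,
  // folding the separate validate/sort helpers into a single static utility
  static List<FileRange> validateAndSortRanges(List<FileRange> ranges) {
    List<FileRange> sorted = new ArrayList<>(ranges);
    sorted.sort(Comparator.comparingLong(FileRange::offset));
    long prevEnd = -1;
    for (FileRange range : sorted) {
      if (range.offset() < 0 || range.length() < 0) {
        throw new IllegalArgumentException("Invalid range: " + range);
      }
      if (range.offset() < prevEnd) {
        throw new IllegalArgumentException("Overlapping range: " + range);
      }
      prevEnd = range.offset() + range.length();
    }
    return sorted;
  }
}
```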
private final long offset;
private final int length;

public FileRange(CompletableFuture<ByteBuffer> byteBuffer, long offset, int length) {
Looking at the parquet implementation, I don't think you can pass the byteBuffer future in like this. I believe this is intended to be set by the implementation so that it can be returned to the invoker.
We're not passing the ByteBuffer in here; we're passing a future that completes with a ByteBuffer. We need a way to map the futures in Iceberg to the futures we are setting in Parquet.
So when we call parquetFileRange.setDataReadFuture(future); we need a way of tracking that future in Iceberg, and that's what this gives us.
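A minimal sketch of that hand-off, with a stand-in for the Parquet range type: Iceberg creates the future, registers it with Parquet via setDataReadFuture, and keeps its own reference so it can complete the future once the range's bytes arrive.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

public class FutureHandOff {
  // Stand-in for the relevant part of org.apache.parquet.io.ParquetFileRange
  public static class ParquetFileRange {
    private CompletableFuture<ByteBuffer> dataReadFuture;

    public void setDataReadFuture(CompletableFuture<ByteBuffer> future) {
      this.dataReadFuture = future;
    }

    public CompletableFuture<ByteBuffer> getDataReadFuture() {
      return dataReadFuture;
    }
  }

  // Iceberg side: create the future, register it with Parquet, keep a reference
  public static CompletableFuture<ByteBuffer> track(ParquetFileRange range) {
    CompletableFuture<ByteBuffer> future = new CompletableFuture<>();
    range.setDataReadFuture(future);
    return future; // Iceberg completes this when the read finishes
  }
}
```

Because both sides hold the same CompletableFuture instance, completing it on the Iceberg side is immediately visible to the Parquet reader waiting on it.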
}

@Override
public void readVectored(List<ParquetFileRange> ranges, ByteBufferAllocator allocate)
Can we add some tests at the ParquetIO level to validate this? I know we're adding some in S3FileIO, but it would be good to have this interface tested (even if it's with a mock implementation).
I added testRangeReadableAdapterReadVectored, which does something similar to the tests in S3FileIO but focuses a bit more on checking that the buffers/ranges are being used correctly. I skipped the other operations but can add them in if we want. Let me know.
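For illustration, a mock along these lines can serve ranges from an in-memory byte array and let a test assert that each range's future completes with a buffer of the requested length, filled from the right offset (hypothetical names, not the actual test code in the PR):

```java
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.IntFunction;

public class ReadVectoredMock {
  // Stand-in range with the future slot the adapter is expected to complete
  public record Range(long offset, int length, CompletableFuture<ByteBuffer> future) {}

  // Mock "stream": serves every range straight out of an in-memory byte array,
  // allocating each buffer through the supplied allocator
  public static void readVectored(byte[] data, List<Range> ranges, IntFunction<ByteBuffer> allocate) {
    for (Range range : ranges) {
      ByteBuffer buffer = allocate.apply(range.length());
      buffer.put(data, (int) range.offset(), range.length());
      buffer.flip();
      range.future().complete(buffer);
    }
  }
}
```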
optionsBuilder.withDecryption(fileDecryptionProperties);
}

optionsBuilder.withUseHadoopVectoredIo(true);
There were some efforts to allow Iceberg to work without Hadoop on the classpath.
I'm not sure how far these efforts went, and I'm also not sure how this change will affect that effort.
Could you please help me understand the consequences of always using withUseHadoopVectoredIo?
Thanks,
Peter
For part 1, the effort to reduce the dependencies on Hadoop: I don't think that was ever completed, though I do see a TODO comment about wanting to do it. I'm probably making that effort more complicated since I'm adding 2 new imports from Hadoop, but I don't think that's a big risk.
For part 2: withUseHadoopVectoredIo is used in the file reader in conjunction with readVectoredAvailable(), so always enabling it doesn't change anything unless the stream also supports readVectored.
https://github.com/apache/parquet-java/blob/f50dd6cb4b526cf4b585993c1b69a838cd8151f3/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L1303
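In other words, the option is necessary but not sufficient for the vectored path. A paraphrase of that gating (illustrative only, not the actual ParquetFileReader code):

```java
public class VectoredIoGate {
  // The vectored path is taken only when the option is enabled AND the stream
  // reports support; otherwise the reader falls back to regular range reads,
  // which is why setting the option unconditionally is safe
  public static String chooseReadPath(boolean useHadoopVectoredIo, boolean readVectoredAvailable) {
    if (useHadoopVectoredIo && readVectoredAvailable) {
      return "vectored";
    }
    return "sequential";
  }
}
```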
I think the naming of this option is a little misleading. withUseHadoopVectoredIo doesn't necessarily depend on Hadoop, as @stubz151 mentions, but rather enables the vectored IO behavior in Parquet.
pvary left a comment
It's ok from my side, but I would like to ask someone else to take a look as well.
Core: Adding read vector to range readable interface and adding mapper to parquet stream (apache#13997)
What am I doing
Adding a read vector implementation to the range readable interface. To do this I'm adding methods to check whether it's enabled and providing an interface which one can implement.
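Roughly, the interface change has this shape: a capability check plus an opt-in vectored entry point (method names and the range type here are illustrative, not the exact signatures in the PR):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public interface RangeReadableVectored {
  // Stand-in for the range type carrying the future completed with the bytes
  record FileRange(long offset, int length, CompletableFuture<ByteBuffer> future) {}

  // Implementations that can serve several ranges in one call override both:
  // the check lets callers fall back to sequential reads when unsupported
  default boolean readVectoredAvailable() {
    return false;
  }

  default void readVectored(List<FileRange> ranges) throws IOException {
    throw new UnsupportedOperationException("Vectored reads not supported");
  }
}
```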
#13254
Changes
Testing
Tested with the AAL implementation; it passes correctly with the flag
--conf "spark.sql.iceberg.read.vector.enabled=true" \
Notes
Kept the AAL changes separate to not bloat this PR, but can include them if we want to see a functional implementation of this.