Adds a readVectored implementation to S3InputStream #15581
Conversation
Still working on this PR and need to make a couple of additional changes as well as benchmark again, but could I please get an initial review? I would really like to get this merged in before the next release, as having a
I can concur with the claimed speedup; we've had s3a running with vectored reads for three years now.
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that's incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.
```diff
   }

-  private ExecutorService executorService() {
+  private ExecutorService executorService(String name, int size) {
```
This change doesn't really work: the field is initialized statically, but you're also passing a name/size, so effectively the first caller's values win. This isn't a workable way to handle the executor service.
Also, the executor service was recently changed to accommodate credential refresh scheduling, so the service is now more generic, but it is currently sized by the delete threads property. We can probably update this to be more generic along with a more general IO threads property. Alternatively, we could have a separate static service explicitly for the S3InputStream, which might be preferable to overloading the FileIO service for multiple operations.
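To make the alternative concrete, here is a minimal sketch of what a separate static service for vectored reads could look like. All names (`VectoredReadExecutor`, the pool sizing) are hypothetical, not the PR's actual code; the point is that the name/size are fixed once at first use rather than fighting with static initialization.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a lazily initialized executor dedicated to
// S3InputStream vectored reads, separate from the shared FileIO service.
public class VectoredReadExecutor {
  private static volatile ExecutorService instance;

  private VectoredReadExecutor() {}

  // Double-checked locking: the first caller's name/size win, which is
  // acceptable here only if callers pass constants, not per-stream values.
  public static ExecutorService get(String name, int size) {
    if (instance == null) {
      synchronized (VectoredReadExecutor.class) {
        if (instance == null) {
          AtomicInteger counter = new AtomicInteger(0);
          ThreadFactory factory = runnable -> {
            Thread t = new Thread(runnable, name + "-" + counter.getAndIncrement());
            t.setDaemon(true); // don't block JVM shutdown
            return t;
          };
          instance = Executors.newFixedThreadPool(size, factory);
        }
      }
    }
    return instance;
  }
}
```

A second call with different arguments returns the same pool, which makes the "first one wins" behavior explicit instead of accidental.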
```diff
-  public FileRange(CompletableFuture<ByteBuffer> byteBuffer, long offset, int length)
-      throws EOFException {
-    Preconditions.checkNotNull(byteBuffer, "byteBuffer can't be null");
+  public FileRange(long offset, int length) {
```
I'm trying to understand why this is necessary. If we're coalescing ranges, we should probably be using something like Netty's CompositeByteBuf to wrap the file ranges and skipped ranges into a larger write buffer. This would allow us to read the composite ranges together but still populate the buffers directly.
Looking at how we're handling the ranges, the CompositeByteBuf approach probably doesn't make sense, but I have an alternate suggestion.
```java
   * @param linkedRanges map to store linked ranges for each coalesced range
   * @return a new list of coalesced ranges
   */
  private List<FileRange> coalesce(
```
I think we can do this better by creating a wrapper object for the original FileRanges without impacting the existing public API. Rather than creating new FileRanges and inserting them, just create an inner wrapper class (e.g. CoalescedFileRanges) that contains the ordered list of file ranges that should be combined. Then we can dispatch the read requests for the total combined range and populate internal buffers.
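A rough sketch of that wrapper idea, assuming a simplified stand-in `FileRange` (offset, length, buffer) rather than the real class in this PR: the wrapper tracks the combined span, and after one read of the whole span it slices the combined buffer back into the per-range buffers, skipping the gaps.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Stand-in for the PR's FileRange; illustrative only.
class FileRange {
  final long offset;
  final int length;
  ByteBuffer buffer; // populated after the coalesced read completes

  FileRange(long offset, int length) {
    this.offset = offset;
    this.length = length;
  }
}

// Hypothetical CoalescedFileRanges: holds the ordered list of original
// ranges that one combined read request will satisfy.
class CoalescedFileRanges {
  private final List<FileRange> ranges = new ArrayList<>();
  private long start = -1;
  private long end = -1;

  // Ranges must be added in ascending offset order.
  void add(FileRange range) {
    if (ranges.isEmpty()) {
      start = range.offset;
    }
    end = range.offset + range.length;
    ranges.add(range);
  }

  long offset() {
    return start;
  }

  int length() {
    return (int) (end - start);
  }

  // After one read of [start, end), hand each underlying range a slice of
  // the combined buffer; bytes in the gaps between ranges are simply ignored.
  void populate(ByteBuffer combined) {
    for (FileRange range : ranges) {
      ByteBuffer slice = combined.duplicate();
      slice.position((int) (range.offset - start));
      slice.limit((int) (range.offset - start) + range.length);
      range.buffer = slice.slice();
    }
  }
}
```

This keeps the public `FileRange` API untouched: coalescing is purely internal bookkeeping, and each caller still ends up with a buffer scoped exactly to the range it asked for.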
Yes, agree on this; we did something similar in the Hadoop S3A implementation.
@ahmarsuhail this looks good in concept, but we need to figure out a better path for the executor service handling and ideally not change the
```diff
-    Preconditions.checkNotNull(byteBuffer, "byteBuffer can't be null");
+  public FileRange(long offset, int length) {
+    Preconditions.checkArgument(
+        length() >= 0, "Invalid length: %s in range (must be >= 0)", length);
```
We should use the input length and offset in the preconditions; someone reported this bug in #15926.
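A small sketch of the corrected validation: check the incoming constructor parameters directly rather than calling `length()` on the object mid-construction. This uses a plain `IllegalArgumentException` instead of Guava's `Preconditions` so it is self-contained; the `FileRange` here is a simplified stand-in for the PR's class.

```java
// Illustrative stand-in for the PR's FileRange with the fixed preconditions.
class FileRange {
  private final long offset;
  private final int length;

  FileRange(long offset, int length) {
    // Validate the parameters, not this.length()/this.offset():
    // the fields have not been assigned yet when these checks run.
    if (offset < 0) {
      throw new IllegalArgumentException(
          "Invalid offset: " + offset + " in range (must be >= 0)");
    }
    if (length < 0) {
      throw new IllegalArgumentException(
          "Invalid length: " + length + " in range (must be >= 0)");
    }
    this.offset = offset;
    this.length = length;
  }

  long offset() {
    return offset;
  }

  int length() {
    return length;
  }
}
```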
This is a copy of #14352
Adds a `readVectored()` implementation to S3InputStream, which ensures that Parquet column chunks are read in parallel. ReadVectored implementations increase TPC-DS benchmark performance by ~10-20%; without this, column chunks are read sequentially, leading to significant IO stalls.
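The parallelism described above can be sketched as follows. This is not the PR's actual S3InputStream code; the reader and the `(offset, length)` range encoding are illustrative stand-ins, with the fetch function standing in for an S3 ranged GET. The key point is that each range becomes an independent async read instead of a sequential loop.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.BiFunction;

// Hypothetical sketch of a vectored reader: one async fetch per range.
class VectoredReader {
  private final ExecutorService pool;
  // Stand-in for a ranged GET: given (offset, length), return those bytes.
  private final BiFunction<Long, Integer, byte[]> fetch;

  VectoredReader(ExecutorService pool, BiFunction<Long, Integer, byte[]> fetch) {
    this.pool = pool;
    this.fetch = fetch;
  }

  // Dispatch every range on the pool so column chunks download in
  // parallel rather than one after another; each future completes as
  // its bytes arrive.
  List<CompletableFuture<ByteBuffer>> readVectored(List<long[]> ranges) {
    List<CompletableFuture<ByteBuffer>> futures = new ArrayList<>(ranges.size());
    for (long[] range : ranges) {
      long offset = range[0];
      int length = (int) range[1];
      futures.add(
          CompletableFuture.supplyAsync(
              () -> ByteBuffer.wrap(fetch.apply(offset, length)), pool));
    }
    return futures;
  }
}
```

In the real implementation the ranges would first be coalesced (as discussed in the review above) so that nearby column chunks share a single request, but the dispatch pattern is the same.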