
Adds a readVectored implementation to S3InputStream. #15581

Open
ahmarsuhail wants to merge 2 commits into apache:main from ahmarsuhail:read-vectored

Conversation

@ahmarsuhail

This is a copy of #14352

Adds a readVectored() implementation to S3InputStream, which ensures that Parquet column chunks are read in parallel.

readVectored() implementations increase TPC-DS benchmark performance by ~10-20%; without it, column chunks are read sequentially, leading to significant IO stalls.
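A minimal, self-contained sketch of the pattern this describes; the FileRange shape and readVectored signature below are illustrative assumptions, not the PR's actual API:

```java
// Illustrative sketch only; the real FileRange and readVectored signatures
// are whatever this PR defines.
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class VectoredReadSketch {
  // A byte range of the file, completed asynchronously once its bytes arrive.
  static final class FileRange {
    final long offset;
    final int length;
    final CompletableFuture<ByteBuffer> data = new CompletableFuture<>();

    FileRange(long offset, int length) {
      this.offset = offset;
      this.length = length;
    }
  }

  // Dispatch one ranged GET per range on a shared pool instead of issuing
  // them back-to-back on the calling thread.
  static void readVectored(List<FileRange> ranges, ExecutorService pool) {
    for (FileRange range : ranges) {
      pool.submit(
          () -> {
            ByteBuffer buffer = ByteBuffer.allocate(range.length);
            // ... issue GET for bytes [offset, offset + length) and fill buffer ...
            range.data.complete(buffer);
          });
    }
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    // One range per Parquet column chunk (offsets and lengths are made up).
    List<FileRange> chunks =
        List.of(new FileRange(4, 1_048_576), new FileRange(2_000_000, 524_288));
    readVectored(chunks, pool);
    chunks.forEach(range -> range.data.join()); // consume as each completes
    pool.shutdown();
  }
}
```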

@ahmarsuhail
Author

@danielcweeks @pvary @geruh

Still working on this PR and need to make a couple of additional changes, as well as benchmark again, but could I please get an initial review? I would really like to get this merged before the next release, as having a readVectored() implementation makes a significant difference to performance. Thank you!

@steveloughran
Contributor

I can concur with the claimed speedup; we've had s3a running with vectored reads for three years now.

@github-actions

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.

```diff
  }

- private ExecutorService executorService() {
+ private ExecutorService executorService(String name, int size) {
```
Contributor


This change doesn't really work: the executor is a static initialization, but you're also passing a name/size, so basically the first call wins. This isn't really a workable update to handling the executor service.
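A boiled-down illustration of the hazard being described here (hypothetical code, not the PR's): with a lazily-initialized static field, every call after the first silently ignores its arguments.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class LazyStaticExecutor {
  private static volatile ExecutorService executor;

  // The first caller's size wins; later callers get the same pool back no
  // matter what name/size they pass.
  private static ExecutorService executorService(String name, int size) {
    if (executor == null) {
      synchronized (LazyStaticExecutor.class) {
        if (executor == null) {
          executor = Executors.newFixedThreadPool(size);
        }
      }
    }
    return executor;
  }
}
```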

Contributor

@danielcweeks Apr 13, 2026


Also, the executor service was recently changed to accommodate credential refresh scheduling, so the service is now more generic, but currently based on the delete threads property. We can probably update this to be more generic along with a more general io threads property. Alternatively, we could have a separate static service explicitly for the S3InputStream, which might be preferable to overloading the FileIO service for multiple operations.
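One possible shape of that alternative, as a sketch; the property name and default below are invented for illustration, not taken from the PR:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class S3InputStreamThreads {
  // Hypothetical dedicated property; the real one would be defined by FileIO.
  static final String READ_THREADS_PROP = "s3.input-stream.read-threads";
  static final int READ_THREADS_DEFAULT = 8;

  // A pool owned by the input stream path only, so vectored reads don't
  // contend with deletes or credential refresh on the shared FileIO service.
  private static final ExecutorService READ_POOL =
      Executors.newFixedThreadPool(READ_THREADS_DEFAULT);

  static ExecutorService readPool() {
    return READ_POOL;
  }
}
```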

```diff
- public FileRange(CompletableFuture<ByteBuffer> byteBuffer, long offset, int length)
-     throws EOFException {
-   Preconditions.checkNotNull(byteBuffer, "byteBuffer can't be null");
+ public FileRange(long offset, int length) {
```
Contributor

@danielcweeks Apr 13, 2026


I'm trying to understand why this is necessary. If we're coalescing ranges, we should probably be using something like Netty's CompositeByteBuf to wrap the file ranges and skipped ranges into a larger write buffer. This would allow us to read the composite ranges together but still populate the buffers directly.

Looking at how we're handling the ranges, the CompositeByteBuf approach probably doesn't make sense, but I have an alternate suggestion.
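For reference, a rough sketch of the shape the CompositeByteBuf idea would have taken with Netty's API (the idea set aside above; none of this is in the PR):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;

class CompositeRangeSketch {
  // Wrap the wanted range buffers plus a throwaway gap buffer into one
  // composite; filling the composite with the coalesced response lands the
  // bytes in the underlying buffers directly.
  static CompositeByteBuf composite(int firstLen, int gapLen, int secondLen) {
    ByteBuf first = Unpooled.buffer(firstLen).writerIndex(firstLen);    // wanted range
    ByteBuf gap = Unpooled.buffer(gapLen).writerIndex(gapLen);          // skipped bytes
    ByteBuf second = Unpooled.buffer(secondLen).writerIndex(secondLen); // wanted range

    return Unpooled.compositeBuffer().addComponents(true, first, gap, second);
  }
}
```

A single setBytes(0, responseBytes) on the returned composite would then populate first and second in place, while gap is simply released afterwards.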

```java
 * @param linkedRanges map to store linked ranges for each coalesced range
 * @return a new list of coalesced ranges
 */
private List<FileRange> coalesce(
```
Contributor


I think we can do this better by creating a wrapper object for the original FileRanges without impacting the existing public api. Rather than creating new FileRanges and inserting them, just create an inner wrapper class (e.g. CoalescedFileRanges) that contains the ordered list of file ranges that should be combined. Then we can dispatch the read requests for the total combined range and populate internal buffers.
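A minimal sketch of what that wrapper could look like; apart from the CoalescedFileRanges name suggested above, every name and signature here is an assumption made only to keep the example self-contained:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class CoalescedFileRanges {
  // Assumed shape of the PR's FileRange, stated here so the sketch compiles.
  interface Range {
    long offset();
    int length();
    CompletableFuture<ByteBuffer> byteBuffer();
  }

  private final List<Range> ranges = new ArrayList<>(); // ordered, ascending offsets
  private long start = -1;
  private long end = -1;

  long offset() { return start; }              // offset of the one combined read
  int length() { return (int) (end - start); } // length of the one combined read

  void add(Range range) {
    if (ranges.isEmpty()) {
      start = range.offset();
    }
    end = range.offset() + range.length();
    ranges.add(range);
  }

  // Slice the combined response back out to the original ranges, so the
  // public FileRange api never has to change.
  void populate(ByteBuffer combined) {
    for (Range range : ranges) {
      ByteBuffer slice = combined.duplicate();
      int from = (int) (range.offset() - start);
      slice.position(from).limit(from + range.length());
      range.byteBuffer().complete(slice.slice());
    }
  }
}
```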

Contributor


Yes, agreed on this, as we did something similar in the Hadoop S3A implementation.

@danielcweeks
Contributor

@ahmarsuhail this looks good in concept, but we need to figure out a better path for the executor service handling and ideally not change the FileRange API.


```diff
-   Preconditions.checkNotNull(byteBuffer, "byteBuffer can't be null");
+ public FileRange(long offset, int length) {
+   Preconditions.checkArgument(
+       length() >= 0, "Invalid length: %s in range (must be >= 0)", length);
```
Contributor


We should use the input length and offset in the preconditions. Someone filed this as a bug: #15926
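A sketch of the suggested fix, validating the constructor's own parameters before any field assignment (the field names and the plain Guava Preconditions import are assumptions):

```java
import com.google.common.base.Preconditions;

public class FileRange {
  private final long offset;
  private final int length;

  public FileRange(long offset, int length) {
    // Check the inputs directly; length()/offset() would read fields that
    // haven't been assigned yet.
    Preconditions.checkArgument(offset >= 0, "Invalid offset: %s in range (must be >= 0)", offset);
    Preconditions.checkArgument(length >= 0, "Invalid length: %s in range (must be >= 0)", length);
    this.offset = offset;
    this.length = length;
  }

  public long offset() { return offset; }
  public int length() { return length; }
}
```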
