-
Notifications
You must be signed in to change notification settings - Fork 3.2k
Spark 3.3: Automatically set Arrow properties for read performance #6550
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,7 @@ | |
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.function.Function; | ||
| import org.apache.arrow.vector.NullCheckingForGet; | ||
| import org.apache.iceberg.Schema; | ||
| import org.apache.iceberg.arrow.vectorized.VectorizedReaderBuilder; | ||
| import org.apache.iceberg.data.DeleteFilter; | ||
|
|
@@ -29,16 +30,43 @@ | |
| import org.apache.iceberg.relocated.com.google.common.collect.Maps; | ||
| import org.apache.parquet.schema.MessageType; | ||
| import org.apache.spark.sql.catalyst.InternalRow; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| public class VectorizedSparkParquetReaders { | ||
|
|
||
| private static final Logger LOG = LoggerFactory.getLogger(VectorizedSparkParquetReaders.class); | ||
| private static final String ENABLE_UNSAFE_MEMORY_ACCESS = "arrow.enable_unsafe_memory_access"; | ||
| private static final String ENABLE_UNSAFE_MEMORY_ACCESS_ENV = "ARROW_ENABLE_UNSAFE_MEMORY_ACCESS"; | ||
| private static final String ENABLE_NULL_CHECK_FOR_GET = "arrow.enable_null_check_for_get"; | ||
| private static final String ENABLE_NULL_CHECK_FOR_GET_ENV = "ARROW_ENABLE_NULL_CHECK_FOR_GET"; | ||
|
|
||
| /* | ||
| * Applies the Arrow system-property defaults once, when this class is initialized. | ||
| * Best-effort: any Exception is logged as a warning rather than propagated out of the | ||
| * static initializer. | ||
| * NOTE(review): presumably this has to run before Arrow's BoundsChecking and | ||
| * NullCheckingForGet classes are first referenced, otherwise the values are read too | ||
| * early - confirm against Arrow. | ||
| */ static { | ||
| try { | ||
| enableUnsafeMemoryAccess(); | ||
| disableNullCheckForGet(); | ||
| } catch (Exception e) { | ||
| LOG.warn("Couldn't set Arrow properties, which may impact read performance", e); | ||
| } | ||
| } | ||
|
|
||
| /* Private no-op constructor: instances cannot be created from outside this class. */ private VectorizedSparkParquetReaders() {} | ||
|
|
||
| /** | ||
| * Builds a {@link ColumnarBatchReader} by delegating to the four-argument overload, passing | ||
| * a newly created empty map ({@code Maps.newHashMap()}) as the last argument. | ||
| * | ||
| * @param expectedSchema forwarded unchanged to the delegate overload | ||
| * @param fileSchema forwarded unchanged to the delegate overload | ||
| * @param setArrowValidityVector forwarded unchanged to the delegate overload | ||
| * @return the reader produced by the delegate overload | ||
| * @deprecated will be removed in 1.3.0, use {@link #buildReader(Schema, MessageType, Map, | ||
| * DeleteFilter)} instead. | ||
| */ | ||
| @Deprecated | ||
| public static ColumnarBatchReader buildReader( | ||
| Schema expectedSchema, MessageType fileSchema, boolean setArrowValidityVector) { | ||
| return buildReader(expectedSchema, fileSchema, setArrowValidityVector, Maps.newHashMap()); | ||
| } | ||
|
|
||
| /** | ||
| * @deprecated will be removed in 1.3.0, use {@link #buildReader(Schema, MessageType, Map, | ||
| * DeleteFilter)} instead. | ||
| */ | ||
| @Deprecated | ||
| public static ColumnarBatchReader buildReader( | ||
| Schema expectedSchema, | ||
| MessageType fileSchema, | ||
|
|
@@ -56,6 +84,11 @@ public static ColumnarBatchReader buildReader( | |
| ColumnarBatchReader::new)); | ||
| } | ||
|
|
||
| /** | ||
| * @deprecated will be removed in 1.3.0, use {@link #buildReader(Schema, MessageType, Map, | ||
| * DeleteFilter)} instead. | ||
| */ | ||
| @Deprecated | ||
| public static ColumnarBatchReader buildReader( | ||
| Schema expectedSchema, | ||
| MessageType fileSchema, | ||
|
|
@@ -75,6 +108,57 @@ public static ColumnarBatchReader buildReader( | |
| deleteFilter)); | ||
| } | ||
|
|
||
| public static ColumnarBatchReader buildReader( | ||
| Schema expectedSchema, | ||
| MessageType fileSchema, | ||
| Map<Integer, ?> idToConstant, | ||
| DeleteFilter<InternalRow> deleteFilter) { | ||
| return (ColumnarBatchReader) | ||
| TypeWithSchemaVisitor.visit( | ||
| expectedSchema.asStruct(), | ||
| fileSchema, | ||
| new ReaderBuilder( | ||
| expectedSchema, | ||
| fileSchema, | ||
| NullCheckingForGet.NULL_CHECKING_ENABLED, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Minor point: since we are going to read/write this system property everywhere, I guess there is no point in passing this parameter around. It is always good to minimize parameters. I'm OK with fixing it or not, though.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I did that initially too, but I had to change more places because it is used in tests. So I gave up and reverted.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yeah, it is going to change a lot of places. I'm OK with handling it in another PR or something.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Agreed. I'll double check.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I took another look. We should be able to get rid of this property once the deprecated methods are removed. We can then switch to setting the Arrow property in tests. |
||
| idToConstant, | ||
| ColumnarBatchReader::new, | ||
| deleteFilter)); | ||
| } | ||
|
|
||
| // sets arrow.enable_unsafe_memory_access=true to avoid costly index bounds checks (see BoundsChecking in Arrow) | ||
| // unless the system property or the ARROW_ENABLE_UNSAFE_MEMORY_ACCESS env variable already supplies a value | ||
| private static void enableUnsafeMemoryAccess() { | ||
| String value = confValue(ENABLE_UNSAFE_MEMORY_ACCESS, ENABLE_UNSAFE_MEMORY_ACCESS_ENV); | ||
| if (value == null) { | ||
| LOG.info("Enabling {}", ENABLE_UNSAFE_MEMORY_ACCESS); | ||
| System.setProperty(ENABLE_UNSAFE_MEMORY_ACCESS, "true"); | ||
| } else { | ||
| LOG.info("Unsafe memory access was configured explicitly: {}", value); | ||
| } | ||
| } | ||
|
|
||
| // sets arrow.enable_null_check_for_get=false to drop expensive null checks on every get call in favor of Iceberg nullability | ||
| // (see NullCheckingForGet in Arrow) unless the system property or the ARROW_ENABLE_NULL_CHECK_FOR_GET env variable is set | ||
| private static void disableNullCheckForGet() { | ||
| String value = confValue(ENABLE_NULL_CHECK_FOR_GET, ENABLE_NULL_CHECK_FOR_GET_ENV); | ||
| if (value == null) { | ||
| LOG.info("Disabling {}", ENABLE_NULL_CHECK_FOR_GET); | ||
| System.setProperty(ENABLE_NULL_CHECK_FOR_GET, "false"); | ||
| } else { | ||
| LOG.info("Null checking for get calls was configured explicitly: {}", value); | ||
| } | ||
| } | ||
|
|
||
| /* | ||
| * Looks up a setting: returns the JVM system property named propName when it is non-null, | ||
| * otherwise the environment variable named envName (null when neither is defined). | ||
| */ private static String confValue(String propName, String envName) { | ||
| String propValue = System.getProperty(propName); | ||
| if (propValue != null) { | ||
| return propValue; | ||
| } | ||
|
|
||
| return System.getenv(envName); | ||
| } | ||
|
|
||
| private static class ReaderBuilder extends VectorizedReaderBuilder { | ||
| private final DeleteFilter<InternalRow> deleteFilter; | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is added to make sure `NullCheckingForGet.NULL_CHECKING_ENABLED` is referenced after we set system properties in this class. Otherwise, Arrow would read and cache them earlier and our logic would have no effect. See `BoundsChecking` and `NullCheckingForGet` in Arrow for details.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We never had specific Iceberg configs for this behavior. We always relied on system properties for Arrow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
In our internal Spark fork, we have the arrow config settings configured in the executor JVM args. I can confirm that things have been running fine with them in our prod env for the last 2+ years. So, +1.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's nice to hear, @samarthjain! All of our tests also set these properties so it seems fairly safe.