7 changes: 0 additions & 7 deletions spark/v3.3/build.gradle
@@ -105,13 +105,6 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
}

tasks.withType(Test) {
// For vectorized reads
// Allow unsafe memory access to avoid the costly check arrow does to check if index is within bounds
systemProperty("arrow.enable_unsafe_memory_access", "true")
// Disable expensive null check for every get(index) call.
// Iceberg manages nullability checks itself instead of relying on arrow.
systemProperty("arrow.enable_null_check_for_get", "false")

// Vectorized reads need more memory
maxHeapSize '2560m'
}
@@ -53,12 +53,6 @@ public class VectorizedReadDictionaryEncodedFlatParquetDataBenchmark
public void setupBenchmark() {
setupSpark(true);
appendData();
// Allow unsafe memory access to avoid the costly check arrow does to check if index is within
// bounds
System.setProperty("arrow.enable_unsafe_memory_access", "true");
// Disable expensive null check for every get(index) call.
// Iceberg manages nullability checks itself instead of relying on arrow.
System.setProperty("arrow.enable_null_check_for_get", "false");
}

@Override
@@ -65,12 +65,6 @@ public class VectorizedReadFlatParquetDataBenchmark extends IcebergSourceBenchmark
public void setupBenchmark() {
setupSpark();
appendData();
// Allow unsafe memory access to avoid the costly check arrow does to check if index is within
// bounds
System.setProperty("arrow.enable_unsafe_memory_access", "true");
// Disable expensive null check for every get(index) call.
// Iceberg manages nullability checks itself instead of relying on arrow.
System.setProperty("arrow.enable_null_check_for_get", "false");
}

@TearDown
@@ -21,6 +21,7 @@
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.arrow.vector.NullCheckingForGet;
import org.apache.iceberg.Schema;
import org.apache.iceberg.arrow.vectorized.VectorizedReaderBuilder;
import org.apache.iceberg.data.DeleteFilter;
@@ -29,16 +30,43 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.parquet.schema.MessageType;
import org.apache.spark.sql.catalyst.InternalRow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class VectorizedSparkParquetReaders {

private static final Logger LOG = LoggerFactory.getLogger(VectorizedSparkParquetReaders.class);
private static final String ENABLE_UNSAFE_MEMORY_ACCESS = "arrow.enable_unsafe_memory_access";
private static final String ENABLE_UNSAFE_MEMORY_ACCESS_ENV = "ARROW_ENABLE_UNSAFE_MEMORY_ACCESS";
private static final String ENABLE_NULL_CHECK_FOR_GET = "arrow.enable_null_check_for_get";
private static final String ENABLE_NULL_CHECK_FOR_GET_ENV = "ARROW_ENABLE_NULL_CHECK_FOR_GET";

static {
try {
enableUnsafeMemoryAccess();
disableNullCheckForGet();
} catch (Exception e) {
LOG.warn("Couldn't set Arrow properties, which may impact read performance", e);
}
}

private VectorizedSparkParquetReaders() {}

/**
* @deprecated will be removed in 1.3.0, use {@link #buildReader(Schema, MessageType, Map,
* DeleteFilter)} instead.
*/
@Deprecated
public static ColumnarBatchReader buildReader(
Schema expectedSchema, MessageType fileSchema, boolean setArrowValidityVector) {
return buildReader(expectedSchema, fileSchema, setArrowValidityVector, Maps.newHashMap());
}

/**
* @deprecated will be removed in 1.3.0, use {@link #buildReader(Schema, MessageType, Map,
* DeleteFilter)} instead.
*/
@Deprecated
public static ColumnarBatchReader buildReader(
Schema expectedSchema,
MessageType fileSchema,
@@ -56,6 +84,11 @@ public static ColumnarBatchReader buildReader(
ColumnarBatchReader::new));
}

/**
* @deprecated will be removed in 1.3.0, use {@link #buildReader(Schema, MessageType, Map,
* DeleteFilter)} instead.
*/
@Deprecated
public static ColumnarBatchReader buildReader(
Schema expectedSchema,
MessageType fileSchema,
@@ -75,6 +108,57 @@ public static ColumnarBatchReader buildReader(
deleteFilter));
}

public static ColumnarBatchReader buildReader(
Contributor Author:

This method is added to make sure NullCheckingForGet.NULL_CHECKING_ENABLED is referenced after we set the system properties in this class. Otherwise, Arrow would capture them earlier and our logic would have no effect. See BoundsChecking and NullCheckingForGet in Arrow for details.
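To illustrate why the reference order matters, here is a minimal standalone sketch. PropertyHolder is a hypothetical stand-in for Arrow's NullCheckingForGet, not Arrow's actual code: a static final field captures the system property once, when the class is first loaded, so any setProperty call after that first reference is a no-op.

class PropertyHolder {
  // Read once, at class-initialization time, like Arrow's NullCheckingForGet.
  static final boolean NULL_CHECKING_ENABLED =
      !"false".equals(System.getProperty("arrow.enable_null_check_for_get"));
}

public class LoadOrderDemo {
  public static void main(String[] args) {
    // Set before the first reference: the class has not loaded yet, so this takes effect.
    System.setProperty("arrow.enable_null_check_for_get", "false");
    System.out.println(PropertyHolder.NULL_CHECKING_ENABLED); // prints false

    // Set after the class has loaded: the static final field is already initialized.
    System.setProperty("arrow.enable_null_check_for_get", "true");
    System.out.println(PropertyHolder.NULL_CHECKING_ENABLED); // still prints false
  }
}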

Contributor Author:

We never had specific Iceberg configs for this behavior. We always relied on system properties for Arrow.

Collaborator:

LGTM.

In our internal Spark fork, we have the Arrow settings configured in the executor JVM args. I can confirm that things have been running fine with them in our prod env for the last 2+ years. So, +1.
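For reference, such executor JVM args might look like the following (a sketch assuming a standard Spark deployment; the thread does not show the exact setup):

spark.executor.extraJavaOptions=-Darrow.enable_unsafe_memory_access=true -Darrow.enable_null_check_for_get=false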

Contributor Author:

That's nice to hear, @samarthjain! All of our tests also set these properties, so it seems fairly safe.

Schema expectedSchema,
MessageType fileSchema,
Map<Integer, ?> idToConstant,
DeleteFilter<InternalRow> deleteFilter) {
return (ColumnarBatchReader)
TypeWithSchemaVisitor.visit(
expectedSchema.asStruct(),
fileSchema,
new ReaderBuilder(
expectedSchema,
fileSchema,
NullCheckingForGet.NULL_CHECKING_ENABLED,
Contributor:

Minor point: since we are going to read/write this system property everywhere, I guess there is no point in passing this parameter around. It's always good to minimize parameters. I'm OK with fixing it or not, though.

Contributor Author:

I did that initially too, but I had to change more places because it is used in tests. So I gave up and reverted.

Contributor:

Yeah, it is going to change a lot of places. I'm OK with handling it in another PR.

Contributor Author:

Agreed. I'll double-check.

Contributor Author:

I took another look. We should be able to get rid of this parameter once the deprecated methods are removed. We can then switch to setting the Arrow property in tests.
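For example, a test-level setting would take the same shape as the Gradle configuration removed from spark/v3.3/build.gradle earlier in this PR (a sketch, not part of this change):

tasks.withType(Test) {
  systemProperty("arrow.enable_null_check_for_get", "false")
}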

idToConstant,
ColumnarBatchReader::new,
deleteFilter));
}

// enables unsafe memory access to avoid costly checks to see if index is within bounds
// as long as it is not configured explicitly (see BoundsChecking in Arrow)
private static void enableUnsafeMemoryAccess() {
String value = confValue(ENABLE_UNSAFE_MEMORY_ACCESS, ENABLE_UNSAFE_MEMORY_ACCESS_ENV);
if (value == null) {
LOG.info("Enabling {}", ENABLE_UNSAFE_MEMORY_ACCESS);
System.setProperty(ENABLE_UNSAFE_MEMORY_ACCESS, "true");
} else {
LOG.info("Unsafe memory access was configured explicitly: {}", value);
}
}

// disables expensive null checks for every get call in favor of Iceberg nullability
// as long as it is not configured explicitly (see NullCheckingForGet in Arrow)
private static void disableNullCheckForGet() {
String value = confValue(ENABLE_NULL_CHECK_FOR_GET, ENABLE_NULL_CHECK_FOR_GET_ENV);
if (value == null) {
LOG.info("Disabling {}", ENABLE_NULL_CHECK_FOR_GET);
System.setProperty(ENABLE_NULL_CHECK_FOR_GET, "false");
} else {
LOG.info("Null checking for get calls was configured explicitly: {}", value);
}
}

private static String confValue(String propName, String envName) {
String propValue = System.getProperty(propName);
if (propValue != null) {
return propValue;
}

return System.getenv(envName);
}

private static class ReaderBuilder extends VectorizedReaderBuilder {
private final DeleteFilter<InternalRow> deleteFilter;

@@ -20,7 +20,6 @@

import java.util.Map;
import java.util.Set;
import org.apache.arrow.vector.NullCheckingForGet;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.ScanTask;
@@ -88,11 +87,7 @@ private CloseableIterable<ColumnarBatch> newParquetIterable(
.createBatchedReaderFunc(
fileSchema ->
VectorizedSparkParquetReaders.buildReader(
requiredSchema,
fileSchema, /* setArrowValidityVector */
NullCheckingForGet.NULL_CHECKING_ENABLED,
idToConstant,
deleteFilter))
requiredSchema, fileSchema, idToConstant, deleteFilter))
.recordsPerBatch(batchSize)
.filter(residual)
.caseSensitive(caseSensitive())