Skip to content

Commit 3ecf1ca

Browse files
authored
GH-48334: [C++][Parquet] Support reading encrypted bloom filters (#49334)
### Rationale for this change Reading bloom filters from encrypted Parquet files previously raised an exception. This change implements encrypted bloom filter deserialization by decrypting the Thrift header (module id 8) and bitset (module id 9) separately, and adds the necessary validation and tests. ### What changes are included in this PR? - Wire metadata decryptor creation into the bloom filter reader - Add BlockSplitBloomFilter::DeserializeEncrypted(...) for encrypted bloom filters - Remove the fuzzer workaround that swallowed encrypted bloom filter exceptions ### Are these changes tested? Yes. ### Are there any user-facing changes? Yes. * GitHub Issue: #48334 Authored-by: fenfeng9 <[email protected]> Signed-off-by: Gang Wu <[email protected]>
1 parent 2a526c1 commit 3ecf1ca

File tree

7 files changed

+336
-50
lines changed

7 files changed

+336
-50
lines changed

cpp/src/parquet/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,7 @@ add_parquet_test(arrow-metadata-test SOURCES arrow/arrow_metadata_test.cc
416416
if(PARQUET_REQUIRE_ENCRYPTION)
417417
add_parquet_test(encryption-test
418418
SOURCES
419+
encryption/bloom_filter_encryption_test.cc
419420
encryption/encryption_internal_test.cc
420421
encryption/write_configurations_test.cc
421422
encryption/read_configurations_test.cc

cpp/src/parquet/arrow/fuzz_internal.cc

Lines changed: 23 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,23 @@ Status FuzzReadPageIndex(RowGroupPageIndexReader* reader, const SchemaDescriptor
155155
return st;
156156
}
157157

158+
Status FuzzReadBloomFilter(RowGroupBloomFilterReader* reader, int column,
159+
std::uniform_int_distribution<uint64_t>& hash_dist,
160+
std::default_random_engine& rng) {
161+
Status st;
162+
BEGIN_PARQUET_CATCH_EXCEPTIONS
163+
std::unique_ptr<BloomFilter> bloom;
164+
bloom = reader->GetColumnBloomFilter(column);
165+
// If the column has a bloom filter, find a bunch of random hashes
166+
if (bloom != nullptr) {
167+
for (int k = 0; k < 100; ++k) {
168+
bloom->FindHash(hash_dist(rng));
169+
}
170+
}
171+
END_PARQUET_CATCH_EXCEPTIONS
172+
return st;
173+
}
174+
158175
ReaderProperties MakeFuzzReaderProperties(MemoryPool* pool) {
159176
FileDecryptionProperties::Builder builder;
160177
builder.key_retriever(MakeKeyRetriever());
@@ -207,31 +224,12 @@ Status FuzzReader(const uint8_t* data, int64_t size) {
207224
}
208225
{
209226
// Read and decode bloom filters
210-
try {
211-
auto& bloom_reader = pq_file_reader->GetBloomFilterReader();
212-
std::uniform_int_distribution<uint64_t> hash_dist;
213-
for (int i = 0; i < num_row_groups; ++i) {
214-
auto bloom_rg = bloom_reader.RowGroup(i);
215-
for (int j = 0; j < num_columns; ++j) {
216-
std::unique_ptr<BloomFilter> bloom;
217-
bloom = bloom_rg->GetColumnBloomFilter(j);
218-
// If the column has a bloom filter, find a bunch of random hashes
219-
if (bloom != nullptr) {
220-
for (int k = 0; k < 100; ++k) {
221-
bloom->FindHash(hash_dist(rng));
222-
}
223-
}
224-
}
225-
}
226-
} catch (const ParquetException& exc) {
227-
// XXX we just want to ignore encrypted bloom filters and validate the
228-
// rest of the file; there is no better way of doing this until GH-46597
229-
// is done.
230-
// (also see GH-48334 for reading encrypted bloom filters)
231-
if (std::string_view(exc.what())
232-
.find("BloomFilter decryption is not yet supported") ==
233-
std::string_view::npos) {
234-
throw;
227+
auto& bloom_reader = pq_file_reader->GetBloomFilterReader();
228+
std::uniform_int_distribution<uint64_t> hash_dist;
229+
for (int i = 0; i < num_row_groups; ++i) {
230+
auto bloom_rg = bloom_reader.RowGroup(i);
231+
for (int j = 0; j < num_columns; ++j) {
232+
st &= FuzzReadBloomFilter(bloom_rg.get(), j, hash_dist, rng);
235233
}
236234
}
237235
}

cpp/src/parquet/bloom_filter.cc

Lines changed: 154 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,52 @@
1717

1818
#include <cstdint>
1919
#include <cstring>
20+
#include <limits>
2021
#include <memory>
2122

23+
#include "arrow/io/memory.h"
2224
#include "arrow/result.h"
2325
#include "arrow/util/logging_internal.h"
2426
#include "arrow/util/macros.h"
2527

2628
#include "generated/parquet_types.h"
2729

2830
#include "parquet/bloom_filter.h"
31+
#include "parquet/encryption/encryption_internal.h"
32+
#include "parquet/encryption/internal_file_decryptor.h"
2933
#include "parquet/exception.h"
3034
#include "parquet/thrift_internal.h"
3135
#include "parquet/xxhasher.h"
3236

3337
namespace parquet {
38+
namespace {
39+
40+
constexpr int32_t kCiphertextLengthSize = 4;
41+
42+
/// Parse the 4-byte little-endian length prefix and return the total ciphertext size,
43+
/// including the 4-byte length field itself.
44+
int64_t ParseCiphertextTotalLength(const uint8_t* data, int64_t length) {
45+
if (length < kCiphertextLengthSize) {
46+
throw ParquetException("Ciphertext length buffer is too small");
47+
}
48+
uint32_t buffer_size =
49+
(static_cast<uint32_t>(data[3]) << 24) | (static_cast<uint32_t>(data[2]) << 16) |
50+
(static_cast<uint32_t>(data[1]) << 8) | (static_cast<uint32_t>(data[0]));
51+
return static_cast<int64_t>(buffer_size) + kCiphertextLengthSize;
52+
}
53+
54+
void CheckBloomFilterShortRead(int64_t expected, int64_t actual,
55+
std::string_view context) {
56+
if (ARROW_PREDICT_FALSE(actual < expected)) {
57+
std::stringstream ss;
58+
ss << context << " read failed: expected ";
59+
ss << expected << " bytes, got " << actual;
60+
throw ParquetException(ss.str());
61+
}
62+
}
63+
64+
} // namespace
65+
3466
constexpr uint32_t BlockSplitBloomFilter::SALT[kBitsSetPerBlock];
3567

3668
BlockSplitBloomFilter::BlockSplitBloomFilter(::arrow::MemoryPool* pool)
@@ -75,10 +107,11 @@ void BlockSplitBloomFilter::Init(const uint8_t* bitset, uint32_t num_bytes) {
75107
this->hasher_ = std::make_unique<XxHasher>();
76108
}
77109

78-
static constexpr uint32_t kBloomFilterHeaderSizeGuess = 256;
110+
namespace {
111+
112+
constexpr uint32_t kBloomFilterHeaderSizeGuess = 256;
79113

80-
static ::arrow::Status ValidateBloomFilterHeader(
81-
const format::BloomFilterHeader& header) {
114+
::arrow::Status ValidateBloomFilterHeader(const format::BloomFilterHeader& header) {
82115
if (!header.algorithm.__isset.BLOCK) {
83116
return ::arrow::Status::Invalid(
84117
"Unsupported Bloom filter algorithm: ", header.algorithm, ".");
@@ -104,6 +137,122 @@ static ::arrow::Status ValidateBloomFilterHeader(
104137
return ::arrow::Status::OK();
105138
}
106139

140+
BlockSplitBloomFilter DeserializeEncryptedFromStream(
141+
const ReaderProperties& properties, ArrowInputStream* input,
142+
std::optional<int64_t> bloom_filter_length, Decryptor* decryptor,
143+
int16_t row_group_ordinal, int16_t column_ordinal) {
144+
ThriftDeserializer deserializer(properties);
145+
format::BloomFilterHeader header;
146+
147+
// Read the length-prefixed ciphertext for the header.
148+
PARQUET_ASSIGN_OR_THROW(auto length_buf, input->Read(kCiphertextLengthSize));
149+
CheckBloomFilterShortRead(kCiphertextLengthSize, length_buf->size(),
150+
"Bloom filter header length");
151+
152+
const int64_t header_cipher_total_len =
153+
ParseCiphertextTotalLength(length_buf->data(), length_buf->size());
154+
if (ARROW_PREDICT_FALSE(header_cipher_total_len >
155+
std::numeric_limits<int32_t>::max())) {
156+
throw ParquetException("Bloom filter header ciphertext length overflows int32");
157+
}
158+
if (bloom_filter_length && header_cipher_total_len > *bloom_filter_length) {
159+
throw ParquetException(
160+
"Bloom filter length less than encrypted bloom filter header length");
161+
}
162+
163+
// Read the full header ciphertext and decrypt the Thrift header.
164+
auto header_cipher_buf =
165+
AllocateBuffer(properties.memory_pool(), header_cipher_total_len);
166+
std::memcpy(header_cipher_buf->mutable_data(), length_buf->data(),
167+
kCiphertextLengthSize);
168+
const int64_t header_cipher_remaining = header_cipher_total_len - kCiphertextLengthSize;
169+
PARQUET_ASSIGN_OR_THROW(auto read_size, input->Read(header_cipher_remaining,
170+
header_cipher_buf->mutable_data() +
171+
kCiphertextLengthSize));
172+
CheckBloomFilterShortRead(header_cipher_remaining, read_size, "Bloom filter header");
173+
174+
// Bloom filter header and bitset are separate encrypted modules with different AADs.
175+
UpdateDecryptor(decryptor, row_group_ordinal, column_ordinal,
176+
encryption::kBloomFilterHeader);
177+
auto header_cipher_len = static_cast<uint32_t>(header_cipher_total_len);
178+
try {
179+
deserializer.DeserializeMessage(header_cipher_buf->data(), &header_cipher_len,
180+
&header, decryptor);
181+
} catch (std::exception& e) {
182+
std::stringstream ss;
183+
ss << "Deserializing bloom filter header failed.\n" << e.what();
184+
throw ParquetException(ss.str());
185+
}
186+
if (ARROW_PREDICT_FALSE(header_cipher_len != header_cipher_total_len)) {
187+
std::stringstream ss;
188+
ss << "Encrypted bloom filter header length mismatch: expected "
189+
<< header_cipher_total_len << " bytes, got " << header_cipher_len;
190+
throw ParquetException(ss.str());
191+
}
192+
PARQUET_THROW_NOT_OK(ValidateBloomFilterHeader(header));
193+
194+
const int32_t bloom_filter_size = header.numBytes;
195+
UpdateDecryptor(decryptor, row_group_ordinal, column_ordinal,
196+
encryption::kBloomFilterBitset);
197+
const int32_t bitset_cipher_len = decryptor->CiphertextLength(bloom_filter_size);
198+
const int64_t total_cipher_len =
199+
header_cipher_total_len + static_cast<int64_t>(bitset_cipher_len);
200+
if (bloom_filter_length && *bloom_filter_length != total_cipher_len) {
201+
std::stringstream ss;
202+
ss << "Bloom filter length (" << bloom_filter_length.value()
203+
<< ") does not match the actual bloom filter (size: " << total_cipher_len << ").";
204+
throw ParquetException(ss.str());
205+
}
206+
207+
// Read and decrypt the bitset bytes.
208+
PARQUET_ASSIGN_OR_THROW(auto bitset_cipher_buf, input->Read(bitset_cipher_len));
209+
CheckBloomFilterShortRead(bitset_cipher_len, bitset_cipher_buf->size(),
210+
"Bloom filter bitset");
211+
212+
const int32_t bitset_plain_len =
213+
decryptor->PlaintextLength(static_cast<int32_t>(bitset_cipher_len));
214+
if (ARROW_PREDICT_FALSE(bitset_plain_len != bloom_filter_size)) {
215+
throw ParquetException("Bloom filter bitset size does not match header");
216+
}
217+
218+
auto bitset_plain_buf = AllocateBuffer(properties.memory_pool(), bitset_plain_len);
219+
int32_t decrypted_len =
220+
decryptor->Decrypt(bitset_cipher_buf->span_as<const uint8_t>(),
221+
bitset_plain_buf->mutable_span_as<uint8_t>());
222+
if (ARROW_PREDICT_FALSE(decrypted_len != bitset_plain_len)) {
223+
throw ParquetException("Bloom filter bitset decryption failed");
224+
}
225+
226+
// Initialize the bloom filter from the decrypted bitset.
227+
BlockSplitBloomFilter bloom_filter(properties.memory_pool());
228+
bloom_filter.Init(bitset_plain_buf->data(), bloom_filter_size);
229+
return bloom_filter;
230+
}
231+
232+
} // namespace
233+
234+
BlockSplitBloomFilter BlockSplitBloomFilter::DeserializeEncrypted(
235+
const ReaderProperties& properties, ArrowInputStream* input,
236+
std::optional<int64_t> bloom_filter_length, Decryptor* decryptor,
237+
int16_t row_group_ordinal, int16_t column_ordinal) {
238+
if (decryptor == nullptr) {
239+
throw ParquetException("Bloom filter decryptor must be provided");
240+
}
241+
242+
// Read the full Bloom filter payload up front when the total length is known.
243+
if (bloom_filter_length.has_value()) {
244+
PARQUET_ASSIGN_OR_THROW(auto bloom_filter_buf, input->Read(*bloom_filter_length));
245+
CheckBloomFilterShortRead(*bloom_filter_length, bloom_filter_buf->size(),
246+
"Bloom filter");
247+
::arrow::io::BufferReader reader(bloom_filter_buf);
248+
return DeserializeEncryptedFromStream(properties, &reader, bloom_filter_length,
249+
decryptor, row_group_ordinal, column_ordinal);
250+
}
251+
252+
return DeserializeEncryptedFromStream(properties, input, bloom_filter_length, decryptor,
253+
row_group_ordinal, column_ordinal);
254+
}
255+
107256
BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(
108257
const ReaderProperties& properties, ArrowInputStream* input,
109258
std::optional<int64_t> bloom_filter_length) {
@@ -126,8 +275,7 @@ BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(
126275
// This gets used, then set by DeserializeThriftMsg
127276
uint32_t header_size = static_cast<uint32_t>(header_buf->size());
128277
try {
129-
deserializer.DeserializeMessage(reinterpret_cast<const uint8_t*>(header_buf->data()),
130-
&header_size, &header);
278+
deserializer.DeserializeMessage(header_buf->data(), &header_size, &header);
131279
DCHECK_LE(header_size, header_buf->size());
132280
} catch (std::exception& e) {
133281
std::stringstream ss;
@@ -166,9 +314,7 @@ BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(
166314
PARQUET_ASSIGN_OR_THROW(
167315
auto read_size, input->Read(required_read_size,
168316
buffer->mutable_data() + bloom_filter_bytes_in_header));
169-
if (ARROW_PREDICT_FALSE(read_size < required_read_size)) {
170-
throw ParquetException("Bloom Filter read failed: not enough data");
171-
}
317+
CheckBloomFilterShortRead(required_read_size, read_size, "Bloom filter");
172318
BlockSplitBloomFilter bloom_filter(properties.memory_pool());
173319
bloom_filter.Init(buffer->data(), bloom_filter_size);
174320
return bloom_filter;

cpp/src/parquet/bloom_filter.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
#include "arrow/util/bit_util.h"
2525
#include "arrow/util/logging.h"
26+
#include "parquet/encryption/type_fwd.h"
2627
#include "parquet/hasher.h"
2728
#include "parquet/platform.h"
2829
#include "parquet/types.h"
@@ -328,6 +329,24 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public BloomFilter {
328329
const ReaderProperties& properties, ArrowInputStream* input_stream,
329330
std::optional<int64_t> bloom_filter_length = std::nullopt);
330331

332+
/// Deserialize an encrypted Bloom filter from an input stream.
333+
///
334+
/// The same metadata decryptor is used for both the serialized header and bitset,
335+
/// while switching module AADs between the two encrypted modules.
336+
///
337+
/// @param properties The parquet reader properties.
338+
/// @param input_stream The input stream from which to construct the bloom filter.
339+
/// @param bloom_filter_length The length of the serialized bloom filter including
340+
/// header.
341+
/// @param decryptor Decryptor for encrypted Bloom filter modules.
342+
/// @param row_group_ordinal Ordinal of the row group containing this Bloom filter.
343+
/// @param column_ordinal Ordinal of the column containing this Bloom filter.
344+
/// @return The BlockSplitBloomFilter.
345+
static BlockSplitBloomFilter DeserializeEncrypted(
346+
const ReaderProperties& properties, ArrowInputStream* input_stream,
347+
std::optional<int64_t> bloom_filter_length, Decryptor* decryptor,
348+
int16_t row_group_ordinal, int16_t column_ordinal);
349+
331350
private:
332351
inline void InsertHashImpl(uint64_t hash);
333352

0 commit comments

Comments
 (0)