1717
1818#include < cstdint>
1919#include < cstring>
20+ #include < limits>
2021#include < memory>
2122
23+ #include " arrow/io/memory.h"
2224#include " arrow/result.h"
2325#include " arrow/util/logging_internal.h"
2426#include " arrow/util/macros.h"
2527
2628#include " generated/parquet_types.h"
2729
2830#include " parquet/bloom_filter.h"
31+ #include " parquet/encryption/encryption_internal.h"
32+ #include " parquet/encryption/internal_file_decryptor.h"
2933#include " parquet/exception.h"
3034#include " parquet/thrift_internal.h"
3135#include " parquet/xxhasher.h"
3236
3337namespace parquet {
38+ namespace {
39+
40+ constexpr int32_t kCiphertextLengthSize = 4 ;
41+
42+ // / Parse the 4-byte little-endian length prefix and return the total ciphertext size,
43+ // / including the 4-byte length field itself.
44+ int64_t ParseCiphertextTotalLength (const uint8_t * data, int64_t length) {
45+ if (length < kCiphertextLengthSize ) {
46+ throw ParquetException (" Ciphertext length buffer is too small" );
47+ }
48+ uint32_t buffer_size =
49+ (static_cast <uint32_t >(data[3 ]) << 24 ) | (static_cast <uint32_t >(data[2 ]) << 16 ) |
50+ (static_cast <uint32_t >(data[1 ]) << 8 ) | (static_cast <uint32_t >(data[0 ]));
51+ return static_cast <int64_t >(buffer_size) + kCiphertextLengthSize ;
52+ }
53+
54+ void CheckBloomFilterShortRead (int64_t expected, int64_t actual,
55+ std::string_view context) {
56+ if (ARROW_PREDICT_FALSE (actual < expected)) {
57+ std::stringstream ss;
58+ ss << context << " read failed: expected " ;
59+ ss << expected << " bytes, got " << actual;
60+ throw ParquetException (ss.str ());
61+ }
62+ }
63+
64+ } // namespace
65+
3466constexpr uint32_t BlockSplitBloomFilter::SALT[kBitsSetPerBlock ];
3567
3668BlockSplitBloomFilter::BlockSplitBloomFilter (::arrow::MemoryPool* pool)
@@ -75,10 +107,11 @@ void BlockSplitBloomFilter::Init(const uint8_t* bitset, uint32_t num_bytes) {
75107 this ->hasher_ = std::make_unique<XxHasher>();
76108}
77109
78- static constexpr uint32_t kBloomFilterHeaderSizeGuess = 256 ;
110+ namespace {
111+
112+ constexpr uint32_t kBloomFilterHeaderSizeGuess = 256 ;
79113
80- static ::arrow::Status ValidateBloomFilterHeader (
81- const format::BloomFilterHeader& header) {
114+ ::arrow::Status ValidateBloomFilterHeader (const format::BloomFilterHeader& header) {
82115 if (!header.algorithm .__isset .BLOCK ) {
83116 return ::arrow::Status::Invalid (
84117 " Unsupported Bloom filter algorithm: " , header.algorithm , " ." );
@@ -104,6 +137,122 @@ static ::arrow::Status ValidateBloomFilterHeader(
104137 return ::arrow::Status::OK ();
105138}
106139
140+ BlockSplitBloomFilter DeserializeEncryptedFromStream (
141+ const ReaderProperties& properties, ArrowInputStream* input,
142+ std::optional<int64_t > bloom_filter_length, Decryptor* decryptor,
143+ int16_t row_group_ordinal, int16_t column_ordinal) {
144+ ThriftDeserializer deserializer (properties);
145+ format::BloomFilterHeader header;
146+
147+ // Read the length-prefixed ciphertext for the header.
148+ PARQUET_ASSIGN_OR_THROW (auto length_buf, input->Read (kCiphertextLengthSize ));
149+ CheckBloomFilterShortRead (kCiphertextLengthSize , length_buf->size (),
150+ " Bloom filter header length" );
151+
152+ const int64_t header_cipher_total_len =
153+ ParseCiphertextTotalLength (length_buf->data (), length_buf->size ());
154+ if (ARROW_PREDICT_FALSE (header_cipher_total_len >
155+ std::numeric_limits<int32_t >::max ())) {
156+ throw ParquetException (" Bloom filter header ciphertext length overflows int32" );
157+ }
158+ if (bloom_filter_length && header_cipher_total_len > *bloom_filter_length) {
159+ throw ParquetException (
160+ " Bloom filter length less than encrypted bloom filter header length" );
161+ }
162+
163+ // Read the full header ciphertext and decrypt the Thrift header.
164+ auto header_cipher_buf =
165+ AllocateBuffer (properties.memory_pool (), header_cipher_total_len);
166+ std::memcpy (header_cipher_buf->mutable_data (), length_buf->data (),
167+ kCiphertextLengthSize );
168+ const int64_t header_cipher_remaining = header_cipher_total_len - kCiphertextLengthSize ;
169+ PARQUET_ASSIGN_OR_THROW (auto read_size, input->Read (header_cipher_remaining,
170+ header_cipher_buf->mutable_data () +
171+ kCiphertextLengthSize ));
172+ CheckBloomFilterShortRead (header_cipher_remaining, read_size, " Bloom filter header" );
173+
174+ // Bloom filter header and bitset are separate encrypted modules with different AADs.
175+ UpdateDecryptor (decryptor, row_group_ordinal, column_ordinal,
176+ encryption::kBloomFilterHeader );
177+ auto header_cipher_len = static_cast <uint32_t >(header_cipher_total_len);
178+ try {
179+ deserializer.DeserializeMessage (header_cipher_buf->data (), &header_cipher_len,
180+ &header, decryptor);
181+ } catch (std::exception& e) {
182+ std::stringstream ss;
183+ ss << " Deserializing bloom filter header failed.\n " << e.what ();
184+ throw ParquetException (ss.str ());
185+ }
186+ if (ARROW_PREDICT_FALSE (header_cipher_len != header_cipher_total_len)) {
187+ std::stringstream ss;
188+ ss << " Encrypted bloom filter header length mismatch: expected "
189+ << header_cipher_total_len << " bytes, got " << header_cipher_len;
190+ throw ParquetException (ss.str ());
191+ }
192+ PARQUET_THROW_NOT_OK (ValidateBloomFilterHeader (header));
193+
194+ const int32_t bloom_filter_size = header.numBytes ;
195+ UpdateDecryptor (decryptor, row_group_ordinal, column_ordinal,
196+ encryption::kBloomFilterBitset );
197+ const int32_t bitset_cipher_len = decryptor->CiphertextLength (bloom_filter_size);
198+ const int64_t total_cipher_len =
199+ header_cipher_total_len + static_cast <int64_t >(bitset_cipher_len);
200+ if (bloom_filter_length && *bloom_filter_length != total_cipher_len) {
201+ std::stringstream ss;
202+ ss << " Bloom filter length (" << bloom_filter_length.value ()
203+ << " ) does not match the actual bloom filter (size: " << total_cipher_len << " )." ;
204+ throw ParquetException (ss.str ());
205+ }
206+
207+ // Read and decrypt the bitset bytes.
208+ PARQUET_ASSIGN_OR_THROW (auto bitset_cipher_buf, input->Read (bitset_cipher_len));
209+ CheckBloomFilterShortRead (bitset_cipher_len, bitset_cipher_buf->size (),
210+ " Bloom filter bitset" );
211+
212+ const int32_t bitset_plain_len =
213+ decryptor->PlaintextLength (static_cast <int32_t >(bitset_cipher_len));
214+ if (ARROW_PREDICT_FALSE (bitset_plain_len != bloom_filter_size)) {
215+ throw ParquetException (" Bloom filter bitset size does not match header" );
216+ }
217+
218+ auto bitset_plain_buf = AllocateBuffer (properties.memory_pool (), bitset_plain_len);
219+ int32_t decrypted_len =
220+ decryptor->Decrypt (bitset_cipher_buf->span_as <const uint8_t >(),
221+ bitset_plain_buf->mutable_span_as <uint8_t >());
222+ if (ARROW_PREDICT_FALSE (decrypted_len != bitset_plain_len)) {
223+ throw ParquetException (" Bloom filter bitset decryption failed" );
224+ }
225+
226+ // Initialize the bloom filter from the decrypted bitset.
227+ BlockSplitBloomFilter bloom_filter (properties.memory_pool ());
228+ bloom_filter.Init (bitset_plain_buf->data (), bloom_filter_size);
229+ return bloom_filter;
230+ }
231+
232+ } // namespace
233+
234+ BlockSplitBloomFilter BlockSplitBloomFilter::DeserializeEncrypted (
235+ const ReaderProperties& properties, ArrowInputStream* input,
236+ std::optional<int64_t > bloom_filter_length, Decryptor* decryptor,
237+ int16_t row_group_ordinal, int16_t column_ordinal) {
238+ if (decryptor == nullptr ) {
239+ throw ParquetException (" Bloom filter decryptor must be provided" );
240+ }
241+
242+ // Read the full Bloom filter payload up front when the total length is known.
243+ if (bloom_filter_length.has_value ()) {
244+ PARQUET_ASSIGN_OR_THROW (auto bloom_filter_buf, input->Read (*bloom_filter_length));
245+ CheckBloomFilterShortRead (*bloom_filter_length, bloom_filter_buf->size (),
246+ " Bloom filter" );
247+ ::arrow::io::BufferReader reader (bloom_filter_buf);
248+ return DeserializeEncryptedFromStream (properties, &reader, bloom_filter_length,
249+ decryptor, row_group_ordinal, column_ordinal);
250+ }
251+
252+ return DeserializeEncryptedFromStream (properties, input, bloom_filter_length, decryptor,
253+ row_group_ordinal, column_ordinal);
254+ }
255+
107256BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize (
108257 const ReaderProperties& properties, ArrowInputStream* input,
109258 std::optional<int64_t > bloom_filter_length) {
@@ -126,8 +275,7 @@ BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(
126275 // This gets used, then set by DeserializeThriftMsg
127276 uint32_t header_size = static_cast <uint32_t >(header_buf->size ());
128277 try {
129- deserializer.DeserializeMessage (reinterpret_cast <const uint8_t *>(header_buf->data ()),
130- &header_size, &header);
278+ deserializer.DeserializeMessage (header_buf->data (), &header_size, &header);
131279 DCHECK_LE (header_size, header_buf->size ());
132280 } catch (std::exception& e) {
133281 std::stringstream ss;
@@ -166,9 +314,7 @@ BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(
166314 PARQUET_ASSIGN_OR_THROW (
167315 auto read_size, input->Read (required_read_size,
168316 buffer->mutable_data () + bloom_filter_bytes_in_header));
169- if (ARROW_PREDICT_FALSE (read_size < required_read_size)) {
170- throw ParquetException (" Bloom Filter read failed: not enough data" );
171- }
317+ CheckBloomFilterShortRead (required_read_size, read_size, " Bloom filter" );
172318 BlockSplitBloomFilter bloom_filter (properties.memory_pool ());
173319 bloom_filter.Init (buffer->data (), bloom_filter_size);
174320 return bloom_filter;
0 commit comments