Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions extension/parquet/include/column_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,12 @@ class ColumnReader {
template <class CONVERSION, bool HAS_DEFINES, bool CHECKED>
void PlainSkipTemplatedInternal(ByteBuffer &plain_data, const uint8_t *__restrict defines,
const uint64_t num_values, idx_t row_offset = 0) {
if (!HAS_DEFINES && !CHECKED && CONVERSION::PlainConstantSize() > 0) {
plain_data.unsafe_inc(num_values * CONVERSION::PlainConstantSize());
if (!HAS_DEFINES && CONVERSION::PlainConstantSize() > 0) {
if (CHECKED) {
plain_data.inc(num_values * CONVERSION::PlainConstantSize());
} else {
plain_data.unsafe_inc(num_values * CONVERSION::PlainConstantSize());
}
return;
}
for (idx_t row_idx = row_offset; row_idx < row_offset + num_values; row_idx++) {
Expand Down
9 changes: 7 additions & 2 deletions extension/parquet/include/decode_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,17 @@ class ParquetDecodeUtils {
} while (val != 0);
}

template <class T>
template <class T, bool CHECKED = true>
static T VarintDecode(ByteBuffer &buf) {
T result = 0;
uint8_t shift = 0;
while (true) {
auto byte = buf.read<uint8_t>();
uint8_t byte;
if (CHECKED) {
byte = buf.read<uint8_t>();
} else {
byte = buf.unsafe_read<uint8_t>();
}
result |= T(byte & 127) << shift;
if ((byte & 128) == 0) {
break;
Expand Down
50 changes: 26 additions & 24 deletions extension/parquet/include/parquet_rle_bp_decoder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,10 @@ class RleBpDecoder {
literal_count_ -= literal_batch;
values_read += literal_batch;
} else {
if (!NextCounts()) {
if (values_read != batch_size) {
throw std::runtime_error("RLE decode did not find enough values");
}
return;
}
NextCounts();
}
}
if (values_read != batch_size) {
throw std::runtime_error("RLE decode did not find enough values");
}
D_ASSERT(values_read == batch_size);
}

void Skip(uint32_t batch_size) {
Expand All @@ -71,17 +64,10 @@ class RleBpDecoder {
literal_count_ -= literal_batch;
values_skipped += literal_batch;
} else {
if (!NextCounts()) {
if (values_skipped != batch_size) {
throw std::runtime_error("RLE decode did not find enough values");
}
return;
}
NextCounts();
}
}
if (values_skipped != batch_size) {
throw std::runtime_error("RLE decode did not find enough values");
}
D_ASSERT(values_skipped == batch_size);
}

static uint8_t ComputeBitWidth(idx_t val) {
Expand Down Expand Up @@ -110,14 +96,19 @@ class RleBpDecoder {

/// Decodes the next run header from buffer_ and fills literal_count_ or repeat_count_
/// (plus current_value_ for a repeated run).
bool NextCounts() {
template <bool CHECKED>
void NextCountsTemplated() {
// Read the next run's indicator int, it could be a literal or repeated run.
// The int is encoded as a vlq-encoded value.
if (bitpack_pos != 0) {
buffer_.inc(1);
if (CHECKED) {
buffer_.inc(1);
} else {
buffer_.unsafe_inc(1);
}
bitpack_pos = 0;
}
auto indicator_value = ParquetDecodeUtils::VarintDecode<uint32_t>(buffer_);
auto indicator_value = ParquetDecodeUtils::VarintDecode<uint32_t, CHECKED>(buffer_);

// lsb indicates if it is a literal run or repeated run
bool is_literal = indicator_value & 1;
Expand All @@ -127,16 +118,27 @@ class RleBpDecoder {
repeat_count_ = indicator_value >> 1;
// (ARROW-4018) this is not big-endian compatible, lol
current_value_ = 0;
if (CHECKED) {
buffer_.available(byte_encoded_len);
}
for (auto i = 0; i < byte_encoded_len; i++) {
current_value_ |= (buffer_.read<uint8_t>() << (i * 8));
auto next_byte = Load<uint8_t>(buffer_.ptr + i);
current_value_ |= (next_byte << (i * 8));
}
buffer_.unsafe_inc(byte_encoded_len);
// sanity check
if (repeat_count_ > 0 && current_value_ > max_val) {
throw std::runtime_error("Payload value bigger than allowed. Corrupted file?");
}
}
// TODO complain if we run out of buffer
return true;
}

/// Decodes the next run header from buffer_.
///
/// Takes the unchecked fast path (NextCountsTemplated<false>) only when buffer_ holds enough
/// bytes for the worst case. Otherwise every read is bounds-checked (NextCountsTemplated<true>).
void NextCounts() {
	// Worst-case header bytes beyond the repeated-run value (byte_encoded_len):
	//  - 1 byte skipped to re-align after a partially consumed bit-packed byte;
	//  - the varint run indicator.
	// NOTE(review): the sum equals 6 = 1 + 5 and assumes VarintDecode<uint32_t> reads at most
	// 5 bytes — confirm against VarintDecode's shift limit.
	constexpr auto header_slack = sizeof(uint32_t) + 2;
	const bool has_worst_case_bytes = buffer_.check_available(byte_encoded_len + header_slack);
	if (!has_worst_case_bytes) {
		// Near the end of the buffer: fall back to bounds-checked reads.
		NextCountsTemplated<true>();
		return;
	}
	NextCountsTemplated<false>();
}
};
} // namespace duckdb
10 changes: 0 additions & 10 deletions extension/parquet/include/reader/templated_column_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,9 @@ class TemplatedColumnReader : public ColumnReader {
PlainSkipTemplated<VALUE_CONVERSION>(plain_data, defines, num_values);
}

// FIXME: need to profile this to see if it makes sense for primitive types
// (or at what ratio of count / num_values it makes sense)
// void PlainSelect(shared_ptr<ResizeableBuffer> &plain_data, uint8_t *defines, idx_t num_values, Vector &result,
// const SelectionVector &sel, idx_t count) override {
// PlainSelectTemplated<VALUE_TYPE, VALUE_CONVERSION>(*plain_data, defines, num_values, result, sel, count);
// }

// Unconditionally reports direct-filter support for this templated reader.
bool SupportsDirectFilter() const override { return true; }
// bool SupportsDirectSelect() const override {
// return true;
// }
};

template <class PARQUET_PHYSICAL_TYPE, class DUCKDB_PHYSICAL_TYPE,
Expand Down
Loading