Hive partitioned write: lazy partitioning initialization #11765
Merged
This PR changes the way that partitions are initialized for hive partitioned writes. Previously, all found partitions would be eagerly initialized for each partitioned collection through the methods `SynchronizeLocalMap`, `GrowAppendState` and `GrowPartitions`. This initializes a `ColumnDataCollection` and `DataChunk` for each partition, for each thread. After #10976, we flush the intermediate partitioned states after a set number of rows (500K by default), and then reset the partitioned state. In the current set-up, after the reset happens, the synchronize methods are called and all partitioned states are initialized again. When dealing with many partitions, especially on wider tables, this consumes a lot of memory. We are also doing work that might not be necessary, as not every batch of 500K rows will necessarily contain all partitions.
This PR reworks the initialization of the partitioned states so that, when a new partition is found locally, we initialize the partitioned states for that partition. The rest of the states are left uninitialized. This greatly reduces memory usage in many cases when there are many partitions. This is especially true when the partitions are not randomly spread but co-located, for example when partitioning by dates and the dates are sorted.
Benchmarks
Below are some timings running the same query as in #10976, with `n` replaced by the number of partitions. This time we go up to the total number of unique values in the `CounterID` column (6506). The table itself is wide (~100 columns, with long strings), which makes this partitioning more challenging as well. As shown, the old version generally performs worse with more partitions, and runs `OOM` in both the temp-directory and non-temp-directory cases for the full data set, whereas the temp directory is not necessary in the current implementation.