
Conversation

@Mytherin
Collaborator

This PR enables intra-pipeline parallelism: parallel execution within a single pipeline, achieved by splitting up the scans and creating individual tasks. Data sources can now implement an optional method, ParallelScanInfo, that allows them to specify how the scan should be split up into separate tasks.

```cpp
void ParallelScanInfo(ClientContext &context, std::function<void(unique_ptr<OperatorTaskInfo>)> callback) override;
```

Data sources can subclass OperatorTaskInfo to provide per-task information on how to scan the data source; see physical_table_scan.cpp for an example. Currently this splitting is only done for regular table scans, and the table is split into chunks of 100 vectors (102,400 tuples).
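To make the mechanism concrete, here is a minimal, hypothetical sketch of the pattern. The types, names and signatures are simplified stand-ins rather than DuckDB's actual classes, and a vector size of 1,024 tuples is assumed: a table scan emits one OperatorTaskInfo per range of 100 vectors, and the executor creates one scan task per emitted object.

```cpp
// Hypothetical sketch only -- simplified stand-ins, not DuckDB's actual interface.
#include <algorithm>
#include <cstdint>
#include <functional>
#include <memory>

// Assumed sizes: 1,024 tuples per vector, 100 vectors per task (102,400 tuples).
constexpr uint64_t VECTOR_SIZE = 1024;
constexpr uint64_t VECTORS_PER_TASK = 100;

// Simplified stand-in for the OperatorTaskInfo base class.
struct OperatorTaskInfo {
    virtual ~OperatorTaskInfo() = default;
};

// Per-task scan range for a plain table scan (hypothetical subclass).
struct TableScanTaskInfo : public OperatorTaskInfo {
    uint64_t start_row;
    uint64_t end_row;
    TableScanTaskInfo(uint64_t start, uint64_t end) : start_row(start), end_row(end) {
    }
};

// Split a table of total_rows rows into fixed-size ranges and emit one
// OperatorTaskInfo per range; one scan task is created per emitted object.
void ParallelScanInfo(uint64_t total_rows,
                      const std::function<void(std::unique_ptr<OperatorTaskInfo>)> &callback) {
    const uint64_t rows_per_task = VECTOR_SIZE * VECTORS_PER_TASK;
    for (uint64_t start = 0; start < total_rows; start += rows_per_task) {
        uint64_t end = std::min(start + rows_per_task, total_rows);
        callback(std::make_unique<TableScanTaskInfo>(start, end));
    }
}
```

As the commit messages below describe, the emitted task info is then made available to the GetChunk method through the TaskContext, so each task only scans its own slice of the source.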

Sinks also need to explicitly implement parallelism for this to function. For now, parallelism has only been implemented in the PhysicalHashJoin, PhysicalHashAggregate and PhysicalSimpleAggregate. Note that the parallelism in the PhysicalHashAggregate is currently very poor, involving a simple global lock over the hash table. This will be fixed in the future.
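As an illustration of the global-lock approach mentioned above, here is a hypothetical sketch; the DataChunk, hash table and Sink signature are stand-ins, not DuckDB's actual interface. Every parallel task pushes its chunks into a shared hash table guarded by a single mutex, which is correct but serializes the sink.

```cpp
// Hypothetical sketch of the "global lock" sink pattern; stand-in types only.
#include <mutex>

struct DataChunk { /* a batch of up to 1024 tuples */ };
struct AggregateHashTable {
    void AddChunk(DataChunk &chunk) { /* group the chunk and update aggregates */ }
};

class GloballyLockedAggregateSink {
public:
    // Called concurrently by every scan task. The mutex serializes all inserts
    // into the shared hash table: trivially correct, but the sink becomes the
    // bottleneck, which limits speedups on aggregate-heavy queries.
    void Sink(DataChunk &input) {
        std::lock_guard<std::mutex> guard(lock);
        ht.AddChunk(input);
    }

private:
    std::mutex lock;
    AggregateHashTable ht;
};
```

The usual way to remove this bottleneck is to give each thread its own (or a partitioned) hash table and merge them at the end, rather than contending on one lock.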

Other sinks also need to be extended to support parallel execution. I think the following sinks are the most interesting for now: ORDER BY, WINDOW, INSERT, DELETE, UPDATE, TOP N and the other joins (blockwise nested-loop join, piecewise merge join).

Current TPC-H Results

To continue the trend, this PR results in the following parallel speed-up for TPC-H:

Progress has been made, but there is more work to be done :) Particularly in the aggregate HTs.

| Query | Single Threaded | 4 Threads | Speedup |
|-------|-----------------|-----------|---------|
| Q01 | 1.17 | 0.54 | 2.16x |
| Q02 | 0.20 | 0.12 | 1.66x |
| Q03 | 0.43 | 0.16 | 2.77x |
| Q04 | 0.50 | 0.26 | 1.93x |
| Q05 | 0.60 | 0.17 | 3.63x |
| Q06 | 0.20 | 0.06 | 3.36x |
| Q07 | 1.02 | 0.38 | 2.66x |
| Q08 | 0.56 | 0.16 | 3.55x |
| Q09 | 4.20 | 1.36 | 3.10x |
| Q10 | 0.83 | 0.35 | 2.37x |
| Q11 | 0.07 | 0.03 | 2.80x |
| Q12 | 0.62 | 0.25 | 2.51x |
| Q13 | 0.88 | 0.43 | 2.07x |
| Q14 | 0.24 | 0.09 | 2.87x |
| Q15 | 0.42 | 0.13 | 3.28x |
| Q16 | 0.23 | 0.20 | 1.12x |
| Q17 | 0.67 | 0.39 | 1.72x |
| Q18 | 1.63 | 1.37 | 1.19x |
| Q19 | 1.13 | 0.36 | 3.10x |
| Q20 | 0.27 | 0.13 | 2.13x |
| Q21 | 1.88 | 1.20 | 1.56x |
| Q22 | 0.18 | 0.11 | 1.64x |

Mytherin and others added 30 commits July 14, 2020 13:01
…tor to allow executor to work exclusively on its own query
…ptional interface ParallelScanInfo that allows them to emit OperatorTaskInfo objects. For every object they emit a task is created. The OperatorTaskInfo is then accessible within the GetChunk method through the TaskContext object. This allows all different scan types to be partitioned, including e.g. scanning an aggregate HT or reading a parquet/CSV file.
… querygraph of a specific tree in the log file
…o be launched for every vector, and run this on every query with EnableQueryVerification enabled
@Mytherin Mytherin merged commit de6073e into master Jul 23, 2020
@Alex-Monahan
Contributor

I looked through the commit history of the 0.2.0 release, and this is not included, correct?

@Mytherin
Collaborator Author

Not yet, no :) We wanted to give it some time to cool down before putting it in a release, as it is a pretty big change to the execution engine.

@Alex-Monahan
Contributor

We appreciate your focus on reliability and your caution here! Still very excited about this. I assume you're thinking of having a benchmarking bonanza after you implement it? :-)

@Mytherin
Collaborator Author

We want to do a few more rounds of optimizations before starting the benchmarking bonanza :) But this is indeed on the agenda.

@Mytherin Mytherin deleted the parallelpipelines branch July 24, 2020 15:01