Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions extension/tpch/dbgen/bm_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ static char alpha_num[65] = "0123456789abcdefghijklmnopqrstuvwxyz ABCDEFGHIJKLMN
char *getenv PROTO((const char *name));
#endif
void usage();
void permute_dist(distribution *d, seed_t *seed);
void permute_dist(distribution *d, seed_t *seed, DBGenContext *ctx);

/*
* tpch_env_config: look for a environmental variable setting and return its
Expand Down Expand Up @@ -302,24 +302,23 @@ void read_dist(const char *path, const char *name, distribution *target) {
fprintf(stderr, "Read error on dist '%s'\n", name);
exit(1);
}
target->permute = (long *)NULL;
return;
}

/*
* agg_str(set, count) build an aggregated string from count unique
* selections taken from set
*/
void agg_str(distribution *set, long count, seed_t *seed, char *dest) {
void agg_str(distribution *set, long count, seed_t *seed, char *dest, DBGenContext *ctx) {
distribution *d;
int i;

d = set;
*dest = '\0';

permute_dist(d, seed);
permute_dist(d, seed, ctx);
for (i = 0; i < count; i++) {
strcat(dest, DIST_MEMBER(set, DIST_PERMUTE(d, i)));
strcat(dest, DIST_MEMBER(set, ctx->permute[i]));
strcat(dest, " ");
}
*(dest + (int)strlen(dest) - 1) = '\0';
Expand Down
34 changes: 16 additions & 18 deletions extension/tpch/dbgen/build.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
*/
#include <stdio.h>
#include <string.h>
#include <mutex>
#ifndef VMS
#include <sys/types.h>
#endif
Expand All @@ -20,6 +21,7 @@
#include "dbgen/dsstypes.h"

#include <math.h>

#include "dbgen/rng64.h"

#define LEAP_ADJ(yr, mnth) ((LEAP(yr) && (mnth) >= 2) ? 1 : 0)
Expand Down Expand Up @@ -64,13 +66,12 @@ static void gen_phone(DSS_HUGE ind, char *target, seed_t *seed) {

long mk_cust(DSS_HUGE n_cust, customer_t *c, DBGenContext *ctx) {
DSS_HUGE i;
static int bInit = 0;
static std::once_flag bInit;
static char szFormat[100];

if (!bInit) {
std::call_once (bInit, [&](){
snprintf(szFormat, sizeof(szFormat), C_NAME_FMT, 9, &HUGE_FORMAT[1]);
bInit = 1;
}
});
c->custkey = n_cust;
snprintf(c->name, sizeof(c->name), szFormat, C_NAME_TAG, n_cust);
V_STR(C_ADDR_LEN, &ctx->Seed[C_ADDR_SD], c->address);
Expand Down Expand Up @@ -117,15 +118,13 @@ long mk_order(DSS_HUGE index, order_t *o, DBGenContext *ctx, long upd_num) {
char tmp_str[2];
char **mk_ascdate PROTO((void));
int delta = 1;
static int bInit = 0;
static std::once_flag bInit;
static char szFormat[100];

if (!bInit) {
std::call_once (bInit, [&](){
snprintf(szFormat, sizeof(szFormat), O_CLRK_FMT, 9, &HUGE_FORMAT[1]);
bInit = 1;
}
if (asc_date == NULL)
asc_date = mk_ascdate();
});
mk_sparse(index, &o->okey, (upd_num == 0) ? 0 : 1 + upd_num / (10000 / UPD_PCT));
if (ctx->scale_factor >= 30000)
RANDOM64(o->custkey, O_CKEY_MIN, O_CKEY_MAX, &ctx->Seed[O_CKEY_SD]);
Expand Down Expand Up @@ -216,17 +215,17 @@ long mk_part(DSS_HUGE index, part_t *p, DBGenContext *ctx) {
DSS_HUGE temp;
long snum;
DSS_HUGE brnd;
static int bInit = 0;
static std::once_flag bInit;
static char szFormat[100];
static char szBrandFormat[100];

if (!bInit) {

std::call_once (bInit, [&](){
snprintf(szFormat, sizeof(szFormat), P_MFG_FMT, 1, &HUGE_FORMAT[1]);
snprintf(szBrandFormat, sizeof(szBrandFormat), P_BRND_FMT, 2, &HUGE_FORMAT[1]);
bInit = 1;
}
});
p->partkey = index;
agg_str(&colors, (long)P_NAME_SCL, &ctx->Seed[P_NAME_SD], p->name);
agg_str(&colors, (long)P_NAME_SCL, &ctx->Seed[P_NAME_SD], p->name, ctx);
RANDOM(temp, P_MFG_MIN, P_MFG_MAX, &ctx->Seed[P_MFG_SD]);
snprintf(p->mfgr, sizeof(p->mfgr), szFormat, P_MFG_TAG, temp);
RANDOM(brnd, P_BRND_MIN, P_BRND_MAX, &ctx->Seed[P_BRND_SD]);
Expand All @@ -252,13 +251,12 @@ long mk_part(DSS_HUGE index, part_t *p, DBGenContext *ctx) {

long mk_supp(DSS_HUGE index, supplier_t *s, DBGenContext *ctx) {
DSS_HUGE i, bad_press, noise, offset, type;
static int bInit = 0;
static std::once_flag bInit;
static char szFormat[100];

if (!bInit) {
std::call_once (bInit, [&](){
snprintf(szFormat, sizeof(szFormat), S_NAME_FMT, 9, &HUGE_FORMAT[1]);
bInit = 1;
}
});
s->suppkey = index;
snprintf(s->name, sizeof(s->name), szFormat, S_NAME_TAG, index);
V_STR(S_ADDR_LEN, &ctx->Seed[S_ADDR_SD], s->address);
Expand Down
196 changes: 143 additions & 53 deletions extension/tpch/dbgen/dbgen.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "duckdb/catalog/catalog.hpp"
#include "duckdb/main/appender.hpp"
#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
#include "duckdb/common/thread.hpp"
#endif

#define DECLARER /* EXTERN references get defined here */
Expand Down Expand Up @@ -473,16 +474,97 @@ void skip(int table, int children, DSS_HUGE step, DBGenContext &dbgen_ctx) {
}
}

//! Resolves the catalog entry of every TPC-H table ("<table name><suffix>" inside `schema`) a single time,
//! so that any number of TPCHDataAppender instances can share the lookups.
struct TPCHDBgenParameters {
	TPCHDBgenParameters(ClientContext &context, Catalog &catalog, const string &schema, const string &suffix) {
		// one slot per table id from PART up to and including REGION
		tables.resize(REGION + 1);
		for (size_t table_idx = PART; table_idx <= REGION; table_idx++) {
			auto table_name = get_table_name(table_idx);
			if (table_name.empty()) {
				// ids without a table name keep their default (empty) entry
				continue;
			}
			auto qualified_name = string(table_name) + suffix;
			tables[table_idx] = &catalog.GetEntry<TableCatalogEntry>(context, schema, qualified_name);
		}
	}

	//! Indexed by table id; an entry is empty when get_table_name() returned an empty name for that id.
	vector<optional_ptr<TableCatalogEntry>> tables;
};

class TPCHDataAppender {
public:
TPCHDataAppender(ClientContext &context, TPCHDBgenParameters &parameters, DBGenContext base_context, idx_t flush_count) :
context(context), parameters(parameters) {
dbgen_ctx = base_context;
append_info = duckdb::unique_ptr<tpch_append_information[]>(new tpch_append_information[REGION + 1]);
memset(append_info.get(), 0, sizeof(tpch_append_information) * REGION + 1);
for (size_t i = PART; i <= REGION; i++) {
if (parameters.tables[i]) {
auto &tbl_catalog = *parameters.tables[i];
append_info[i].appender = make_uniq<InternalAppender>(context, tbl_catalog, flush_count);
}
}
}

void GenerateTableData(int table_index, idx_t row_count, idx_t offset) {
gen_tbl(context, table_index, static_cast<DSS_HUGE>(row_count), append_info.get(), &dbgen_ctx, offset);
}

void AppendData(int children, int current_step) {
DSS_HUGE i;
DSS_HUGE rowcnt = 0;
for (i = PART; i <= REGION; i++) {
if (table & (1 << i)) {
if (i < NATION) {
rowcnt = dbgen_ctx.tdefs[i].base * dbgen_ctx.scale_factor;
} else {
rowcnt = dbgen_ctx.tdefs[i].base;
}
if (children > 1 && current_step != -1) {
size_t part_size = std::ceil((double)rowcnt / (double)children);
auto part_offset = part_size * current_step;
auto part_end = part_offset + part_size;
rowcnt = part_end > rowcnt ? rowcnt - part_offset : part_size;
skip(i, children, part_offset, dbgen_ctx);
if (rowcnt > 0) {
// generate part of the table
GenerateTableData((int) i, rowcnt, part_offset);
}
} else {
// generate full table
GenerateTableData((int) i, rowcnt, 0);
}
}
}
}

void Flush() {
// flush any incomplete chunks
for (idx_t i = PART; i <= REGION; i++) {
if (append_info[i].appender) {
append_info[i].appender->Flush();
append_info[i].appender.reset();
}
}
}

private:
ClientContext &context;
TPCHDBgenParameters &parameters;
unique_ptr<tpch_append_information[]> append_info;
DBGenContext dbgen_ctx;
};

//! Thread entry point: runs AppendData() for a single slice (`current_step` out of `children`) on `appender`.
//! NOTE(review): this function is handed to std::thread. Any exception escaping it (e.g. from the appender)
//! makes the runtime call std::terminate instead of reaching the launching thread. Consider capturing a
//! std::exception_ptr and rethrowing after join().
static void ParallelTPCHAppend(TPCHDataAppender *appender, int children, int current_step) {
	TPCHDataAppender &worker = *appender;
	worker.AppendData(children, current_step);
}

void DBGenWrapper::LoadTPCHData(ClientContext &context, double flt_scale, string catalog_name, string schema,
string suffix, int children_p, int current_step) {
string suffix, int children, int current_step) {
if (flt_scale == 0) {
return;
}

// generate the actual data
DSS_HUGE rowcnt = 0;
DSS_HUGE extra;
DSS_HUGE i;
// all tables
table = (1 << CUST) | (1 << SUPP) | (1 << NATION) | (1 << REGION) | (1 << PART_PSUPP) | (1 << ORDER_LINE);
force = 0;
Expand All @@ -495,9 +577,10 @@ void DBGenWrapper::LoadTPCHData(ClientContext &context, double flt_scale, string
set_seeds = 0;
updates = 0;

DBGenContext dbgen_ctx;
d_path = NULL;

tdef *tdefs = dbgen_ctx.tdefs;
DBGenContext base_context;
tdef *tdefs = base_context.tdefs;
tdefs[PART].base = 200000;
tdefs[PSUPP].base = 200000;
tdefs[SUPP].base = 10000;
Expand All @@ -509,18 +592,11 @@ void DBGenWrapper::LoadTPCHData(ClientContext &context, double flt_scale, string
tdefs[NATION].base = NATIONS_MAX;
tdefs[REGION].base = NATIONS_MAX;

children = children_p;
d_path = NULL;

if (current_step >= children) {
return;
}

if (flt_scale < MIN_SCALE) {
int i;
int int_scale;

dbgen_ctx.scale_factor = 1;
base_context.scale_factor = 1;
int_scale = (int)(1000 * flt_scale);
for (i = PART; i < REGION; i++) {
tdefs[i].base = (DSS_HUGE)(int_scale * tdefs[i].base) / 1000;
Expand All @@ -529,58 +605,72 @@ void DBGenWrapper::LoadTPCHData(ClientContext &context, double flt_scale, string
}
}
} else {
dbgen_ctx.scale_factor = (long)flt_scale;
base_context.scale_factor = (long)flt_scale;
}
load_dists(10 * 1024 * 1024, &dbgen_ctx); // 10MiB

if (current_step >= children) {
return;
}

load_dists(10 * 1024 * 1024, &base_context); // 10MiB
/* have to do this after init */
tdefs[NATION].base = nations.count;
tdefs[REGION].base = regions.count;

auto &catalog = Catalog::GetCatalog(context, catalog_name);

auto append_info = duckdb::unique_ptr<tpch_append_information[]>(new tpch_append_information[REGION + 1]);
memset(append_info.get(), 0, sizeof(tpch_append_information) * REGION + 1);
for (size_t i = PART; i <= REGION; i++) {
auto tname = get_table_name(i);
if (!tname.empty()) {
string full_tname = string(tname) + string(suffix);
auto &tbl_catalog = catalog.GetEntry<TableCatalogEntry>(context, schema, full_tname);
append_info[i].appender = make_uniq<InternalAppender>(context, tbl_catalog);
TPCHDBgenParameters parameters(context, catalog, schema, suffix);
bool explicit_partial_generation = children > 1 && current_step != -1;
auto thread_count = TaskScheduler::GetScheduler(context).NumberOfThreads();
if (explicit_partial_generation || thread_count <= 1) {
// if we are doing explicit partial generation the parallelism is managed outside of dbgen
// only generate the chunk we are interested in
TPCHDataAppender appender(context, parameters, base_context, BaseAppender::DEFAULT_FLUSH_COUNT);
appender.AppendData(children, current_step);
appender.Flush();
} else {
// we split into 20 children per scale factor by default
static constexpr idx_t CHILDREN_PER_SCALE_FACTOR = 20;
idx_t child_count;
if (flt_scale < 1) {
child_count = 1;
} else {
child_count = MinValue<idx_t>(static_cast<idx_t>(CHILDREN_PER_SCALE_FACTOR * flt_scale), MAX_CHILDREN);
}
}

for (i = PART; i <= REGION; i++) {
if (table & (1 << i)) {
if (i < NATION) {
rowcnt = tdefs[i].base * dbgen_ctx.scale_factor;
} else {
rowcnt = tdefs[i].base;
idx_t step = 0;
vector<TPCHDataAppender> finished_appenders;
while(step < child_count) {
// launch N threads
vector<TPCHDataAppender> new_appenders;
vector<std::thread> threads;
idx_t launched_step = step;
// initialize the appenders for each thread
// note we prevent the threads themselves from flushing the appenders by specifying a very high flush count here
for(idx_t thr_idx = 0; thr_idx < thread_count && launched_step < child_count; thr_idx++, launched_step++) {
new_appenders.emplace_back(context, parameters, base_context, NumericLimits<int64_t>::Maximum());
}
if (children > 1 && current_step != -1) {
size_t part_size = std::ceil((double)rowcnt / (double)children);
auto part_offset = part_size * current_step;
auto part_end = part_offset + part_size;
rowcnt = part_end > rowcnt ? rowcnt - part_offset : part_size;
skip(i, children, part_offset, dbgen_ctx);
if (rowcnt > 0) {
// generate part of the table
gen_tbl(context, (int)i, rowcnt, append_info.get(), &dbgen_ctx, part_offset);
}
} else {
// generate full table
gen_tbl(context, (int)i, rowcnt, append_info.get(), &dbgen_ctx);
// launch the threads
for(idx_t thr_idx = 0; thr_idx < new_appenders.size(); thr_idx++) {
threads.emplace_back(ParallelTPCHAppend, &new_appenders[thr_idx], child_count, step);
step++;
}
// flush the previous batch of appenders while waiting (if any are there)
// now flush the appenders in-order
for(auto &appender : finished_appenders) {
appender.Flush();
}
finished_appenders.clear();
// wait for all threads to finish
for(auto &thread : threads) {
thread.join();
}
finished_appenders = std::move(new_appenders);
}
}
// flush any incomplete chunks
for (size_t i = PART; i <= REGION; i++) {
if (append_info[i].appender) {
append_info[i].appender->Flush();
append_info[i].appender.reset();
// flush the final batch of appenders
for(auto &appender : finished_appenders) {
appender.Flush();
}
}

cleanup_dists();
}

Expand Down
Loading