Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
e4f2745
the very beginning
maiadegraaf Nov 3, 2023
de7d5f9
still working on it
maiadegraaf Nov 6, 2023
476a99c
Revert "still working on it"
maiadegraaf Nov 6, 2023
c48a094
so close
maiadegraaf Nov 7, 2023
90c981f
preliminary works
maiadegraaf Nov 8, 2023
36a14cd
work on index in reduce
maiadegraaf Nov 10, 2023
b0eed85
Try to add casting to reduce bind
maiadegraaf Nov 14, 2023
d1f5b93
Cast return type to input type, throws error for varchar input
maiadegraaf Nov 15, 2023
05e2803
start testing nested calls
maiadegraaf Nov 16, 2023
5ac1449
true->false
maiadegraaf Nov 29, 2023
f857296
change tests
maiadegraaf Nov 29, 2023
1490bab
Use list_column_format_index in ExecuteReduce
maiadegraaf Dec 1, 2023
5ba0bfb
All tests work!
maiadegraaf Dec 4, 2023
7f6cd32
Nested test issue when a list is NULL
maiadegraaf Dec 6, 2023
5229faa
Add active_rows sel
maiadegraaf Dec 6, 2023
97b3689
format fix
maiadegraaf Dec 6, 2023
3f6eea5
Merge remote-tracking branch 'origin/main' into list_reduce
maiadegraaf Dec 6, 2023
e450986
move file to list/CMakeLists
maiadegraaf Dec 6, 2023
7190b0f
add additional documentation and tests for strings
maiadegraaf Dec 7, 2023
ad70b30
fix nits and rework variables into structs
maiadegraaf Dec 8, 2023
55a2625
Combine ListReduceBind and ListLambdaBind
maiadegraaf Dec 8, 2023
2cf109c
NULL -> nullptr
maiadegraaf Dec 8, 2023
6db5a12
More tests
maiadegraaf Dec 8, 2023
a705251
Add final tests
maiadegraaf Dec 8, 2023
99ed869
add struct to lambda functions
maiadegraaf Dec 8, 2023
341502c
ff
maiadegraaf Dec 11, 2023
dddb7d9
run generate_functions
maiadegraaf Dec 11, 2023
fcad511
nits
maiadegraaf Dec 11, 2023
023629d
simplify odd & even result chunks
maiadegraaf Dec 12, 2023
11bbece
Change active rows vector to a validity mask
maiadegraaf Dec 13, 2023
e2bf36f
Add more benchmarks
maiadegraaf Dec 13, 2023
2591455
only initialize selection vectors once
maiadegraaf Dec 18, 2023
09eb429
Merge branch 'main' into list_reduce
maiadegraaf Dec 18, 2023
093ec0e
Add test for large tables and remove validity mask logic when initial…
maiadegraaf Dec 19, 2023
034039b
rename count variables
maiadegraaf Dec 19, 2023
0a482c8
fix test
maiadegraaf Dec 19, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions benchmark/micro/lambdas/list_reduce/complex_expression.benchmark
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# name: benchmark/micro/lambdas/list_reduce/complex_expression.benchmark
# description: list_reduce with a complex expression
# group: [list_reduce]

name list complex expression
group micro
subgroup lambdas

load
CREATE TABLE tbl AS SELECT range(i, i + 1) || range(i + 1, i + 2) || range(i + 2, i + 3) || range(i + 3, i + 4) || range(i, i + 1) || range(i + 1, i + 2) || range(i + 2, i + 3) || range(i + 3, i + 4) || range(i + 1, i + 2) || range(i + 2, i + 3) AS l FROM range(500000) t(i);

run
SELECT list_reduce(l, (x, y) -> list_reduce(l, (a, b) -> x + y + a + b)) FROM tbl;
13 changes: 13 additions & 0 deletions benchmark/micro/lambdas/list_reduce/list_length_2.benchmark
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# name: benchmark/micro/lambdas/list_reduce/list_length_2.benchmark
# description: list_reduce benchmark on lists of length 2
# group: [list_reduce]

name list length 2
group micro
subgroup lambdas

load
CREATE TABLE tbl AS SELECT range(i, i + 1) || range(i + 1, i + 2) AS l FROM range(5000000) t(i);

run
SELECT list_reduce(l, (x, y) -> x + y) FROM tbl;
13 changes: 13 additions & 0 deletions benchmark/micro/lambdas/list_reduce/list_length_5000.benchmark
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# name: benchmark/micro/lambdas/list_reduce/list_length_5000.benchmark
# description: list_reduce benchmark on lists of length 5000
# group: [list_reduce]

name list length 5000
group micro
subgroup lambdas

load
CREATE TABLE t2 AS SELECT range(5000) as l FROM range(10000);

run
SELECT list_reduce(l, (x, y) -> x + y) FROM t2;
13 changes: 13 additions & 0 deletions benchmark/micro/lambdas/list_reduce/list_variable_length.benchmark
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# name: benchmark/micro/lambdas/list_reduce/list_variable_length.benchmark
# description: list_reduce on lists of variable length
# group: [list_reduce]

name list variable length
group micro
subgroup lambdas

load
CREATE TABLE tbl AS SELECT range((i * 95823983533) % 100000 + 1) AS l from range(5000) t(i);

run
SELECT list_reduce(l, (x, y) -> x + y) FROM tbl;
19 changes: 19 additions & 0 deletions benchmark/micro/lambdas/list_reduce/varchar.benchmark
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# name: benchmark/micro/lambdas/list_reduce/varchar.benchmark
# description: list_reduce on varchars
# group: [list_reduce]

name list reduce on varchars
group micro
subgroup lambdas

load
CREATE TEMPORARY TABLE strings_temp AS
SELECT ((i * 95823983533) % 100000)::VARCHAR AS s1,
((i * 547892347987) % 1000)::VARCHAR AS s2,
((i * 847892347987) % 100)::VARCHAR AS s3,
FROM range(10000) tbl(i);
CREATE TEMPORARY TABLE concat_strings_tmp AS SELECT 'a' || s1 || repeat(s2, s2::INT) || s2 || 'c' || s3 AS l FROM strings_temp;
CREATE TABLE tbl AS SELECT string_split(l, '1') || string_split(l, '0') || string_split(l, '2') AS l FROM concat_strings_tmp;

run
SELECT list_reduce(l, (a, b) -> a || b) FROM tbl;
3 changes: 3 additions & 0 deletions src/core_functions/function_list.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ static StaticFunctionDefinition internal_functions[] = {
DUCKDB_SCALAR_FUNCTION_ALIAS(ArrayFilterFun),
DUCKDB_SCALAR_FUNCTION_SET_ALIAS(ArrayGradeUpFun),
DUCKDB_SCALAR_FUNCTION_SET(ArrayInnerProductFun),
DUCKDB_SCALAR_FUNCTION_ALIAS(ArrayReduceFun),
DUCKDB_SCALAR_FUNCTION_SET_ALIAS(ArrayReverseSortFun),
DUCKDB_SCALAR_FUNCTION_SET_ALIAS(ArraySliceFun),
DUCKDB_SCALAR_FUNCTION_SET_ALIAS(ArraySortFun),
Expand Down Expand Up @@ -218,6 +219,7 @@ static StaticFunctionDefinition internal_functions[] = {
DUCKDB_SCALAR_FUNCTION_SET(ListGradeUpFun),
DUCKDB_SCALAR_FUNCTION_SET(ListInnerProductFun),
DUCKDB_SCALAR_FUNCTION_ALIAS(ListPackFun),
DUCKDB_SCALAR_FUNCTION(ListReduceFun),
DUCKDB_SCALAR_FUNCTION_SET(ListReverseSortFun),
DUCKDB_SCALAR_FUNCTION_SET(ListSliceFun),
DUCKDB_SCALAR_FUNCTION_SET(ListSortFun),
Expand Down Expand Up @@ -276,6 +278,7 @@ static StaticFunctionDefinition internal_functions[] = {
DUCKDB_SCALAR_FUNCTION(RadiansFun),
DUCKDB_SCALAR_FUNCTION(RandomFun),
DUCKDB_SCALAR_FUNCTION_SET(ListRangeFun),
DUCKDB_SCALAR_FUNCTION_ALIAS(ReduceFun),
DUCKDB_SCALAR_FUNCTION_SET_ALIAS(RegexpSplitToArrayFun),
DUCKDB_AGGREGATE_FUNCTION(RegrAvgxFun),
DUCKDB_AGGREGATE_FUNCTION(RegrAvgyFun),
Expand Down
117 changes: 55 additions & 62 deletions src/core_functions/lambda_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,6 @@ namespace duckdb {
// Helper functions
//===--------------------------------------------------------------------===//

//! LambdaColumnInfo bundles one input vector with the state used to slice it, so that a lambda
//! expression can be executed on up to STANDARD_VECTOR_SIZE list child elements at a time.
struct LambdaColumnInfo {
	explicit LambdaColumnInfo(Vector &vector) : vector(vector), sel(STANDARD_VECTOR_SIZE) {
	}

	//! Non-owning reference to the input vector (taken from the function arguments)
	reference<Vector> vector;
	//! Selection vector with capacity STANDARD_VECTOR_SIZE, used to slice the input vector
	SelectionVector sel;
	//! The input vector converted to the unified vector format
	UnifiedVectorFormat format;
};

//! LambdaExecuteInfo holds information for executing the lambda expression on an input chunk and
//! a resulting lambda chunk.
struct LambdaExecuteInfo {
Expand Down Expand Up @@ -173,9 +160,9 @@ struct ListFilterFunctor {
}
};

vector<LambdaColumnInfo> GetColumnInfo(DataChunk &args, const idx_t row_count) {
vector<LambdaFunctions::ColumnInfo> LambdaFunctions::GetColumnInfo(DataChunk &args, const idx_t row_count) {

vector<LambdaColumnInfo> data;
vector<ColumnInfo> data;
// skip the input list and then insert all remaining input vectors
for (idx_t i = 1; i < args.ColumnCount(); i++) {
data.emplace_back(args.data[i]);
Expand All @@ -184,9 +171,10 @@ vector<LambdaColumnInfo> GetColumnInfo(DataChunk &args, const idx_t row_count) {
return data;
}

vector<reference<LambdaColumnInfo>> GetInconstantColumnInfo(vector<LambdaColumnInfo> &data) {
vector<reference<LambdaFunctions::ColumnInfo>>
LambdaFunctions::GetInconstantColumnInfo(vector<LambdaFunctions::ColumnInfo> &data) {

vector<reference<LambdaColumnInfo>> inconstant_info;
vector<reference<ColumnInfo>> inconstant_info;
for (auto &entry : data) {
if (entry.vector.get().GetVectorType() != VectorType::CONSTANT_VECTOR) {
inconstant_info.push_back(entry);
Expand All @@ -195,8 +183,8 @@ vector<reference<LambdaColumnInfo>> GetInconstantColumnInfo(vector<LambdaColumnI
return inconstant_info;
}

void ExecuteExpression(const idx_t elem_cnt, const LambdaColumnInfo &column_info,
const vector<LambdaColumnInfo> &column_infos, const Vector &index_vector,
void ExecuteExpression(const idx_t elem_cnt, const LambdaFunctions::ColumnInfo &column_info,
const vector<LambdaFunctions::ColumnInfo> &column_infos, const Vector &index_vector,
LambdaExecuteInfo &info) {

info.input_chunk.SetCardinality(elem_cnt);
Expand All @@ -219,7 +207,7 @@ void ExecuteExpression(const idx_t elem_cnt, const LambdaColumnInfo &column_info
for (idx_t i = 0; i < column_infos.size(); i++) {

if (column_infos[i].vector.get().GetVectorType() == VectorType::CONSTANT_VECTOR) {
// only reference constant vectors
// only reference constant vectors
info.input_chunk.data[i + slice_offset].Reference(column_infos[i].vector);

} else {
Expand Down Expand Up @@ -279,63 +267,57 @@ LogicalType LambdaFunctions::BindBinaryLambda(const idx_t parameter_idx, const L
}
}

//! Returns the type of lambda parameter `parameter_idx` for a lambda that accepts up to three parameters.
//! Parameters 0 and 1 take the list's child type, and parameter 2 is typed BIGINT.
//! NOTE(review): parameter 2 is presumably the element index; confirm against the caller.
//! Throws a BinderException for any higher parameter index.
LogicalType LambdaFunctions::BindTernaryLambda(const idx_t parameter_idx, const LogicalType &list_child_type) {
	if (parameter_idx == 0 || parameter_idx == 1) {
		return list_child_type;
	}
	if (parameter_idx == 2) {
		return LogicalType::BIGINT;
	}
	throw BinderException("This lambda function only supports up to three lambda parameters!");
}

template <class FUNCTION_FUNCTOR>
void ExecuteLambda(DataChunk &args, ExpressionState &state, Vector &result) {

auto row_count = args.size();
Vector &list_column = args.data[0];

result.SetVectorType(VectorType::FLAT_VECTOR);
auto result_entries = FlatVector::GetData<list_entry_t>(result);
auto &result_validity = FlatVector::Validity(result);

if (list_column.GetType().id() == LogicalTypeId::SQLNULL) {
result_validity.SetInvalid(0);
bool result_is_null = false;
LambdaFunctions::LambdaInfo info(args, state, result, result_is_null);
if (result_is_null) {
return;
}

// get the lambda expression
auto &func_expr = state.expr.Cast<BoundFunctionExpression>();
auto &bind_info = func_expr.bind_info->Cast<ListLambdaBindData>();
auto &lambda_expr = bind_info.lambda_expr;
bool has_side_effects = lambda_expr->HasSideEffects();

// get the list column entries
UnifiedVectorFormat list_column_format;
list_column.ToUnifiedFormat(row_count, list_column_format);
auto list_entries = UnifiedVectorFormat::GetData<list_entry_t>(list_column_format);
auto result_entries = FlatVector::GetData<list_entry_t>(result);
auto inconstant_column_infos = LambdaFunctions::GetInconstantColumnInfo(info.column_infos);

// special-handling for the child_vector
auto child_vector_size = ListVector::GetListSize(list_column);
auto &child_vector = ListVector::GetEntry(list_column);
LambdaColumnInfo child_info(child_vector);
child_vector.ToUnifiedFormat(child_vector_size, child_info.format);

// get the lambda column data for all other input vectors
auto column_infos = GetColumnInfo(args, row_count);
auto inconstant_column_infos = GetInconstantColumnInfo(column_infos);
auto child_vector_size = ListVector::GetListSize(args.data[0]);
LambdaFunctions::ColumnInfo child_info(*info.child_vector);
info.child_vector->ToUnifiedFormat(child_vector_size, child_info.format);

// get the expression executor
LambdaExecuteInfo execute_info(state.GetContext(), *lambda_expr, args, bind_info.has_index, child_vector);
LambdaExecuteInfo execute_info(state.GetContext(), *info.lambda_expr, args, info.has_index, *info.child_vector);

// get list_filter specific info
ListFilterInfo list_filter_info;
FUNCTION_FUNCTOR::ReserveNewLengths(list_filter_info.entry_lengths, row_count);
FUNCTION_FUNCTOR::ReserveNewLengths(list_filter_info.entry_lengths, info.row_count);

// additional index vector
Vector index_vector(LogicalType::BIGINT);

// loop over the child entries and create chunks to be executed by the expression executor
idx_t elem_cnt = 0;
idx_t offset = 0;
for (idx_t row_idx = 0; row_idx < row_count; row_idx++) {
for (idx_t row_idx = 0; row_idx < info.row_count; row_idx++) {

auto list_idx = list_column_format.sel->get_index(row_idx);
const auto &list_entry = list_entries[list_idx];
auto list_idx = info.list_column_format.sel->get_index(row_idx);
const auto &list_entry = info.list_entries[list_idx];

// set the result to NULL for this row
if (!list_column_format.validity.RowIsValid(list_idx)) {
result_validity.SetInvalid(row_idx);
if (!info.list_column_format.validity.RowIsValid(list_idx)) {
info.result_validity->SetInvalid(row_idx);
FUNCTION_FUNCTOR::PushEmptyList(list_filter_info.entry_lengths);
continue;
}
Expand All @@ -354,22 +336,23 @@ void ExecuteLambda(DataChunk &args, ExpressionState &state, Vector &result) {
if (elem_cnt == STANDARD_VECTOR_SIZE) {

execute_info.lambda_chunk.Reset();
ExecuteExpression(elem_cnt, child_info, column_infos, index_vector, execute_info);
ExecuteExpression(elem_cnt, child_info, info.column_infos, index_vector, execute_info);
auto &lambda_vector = execute_info.lambda_chunk.data[0];

FUNCTION_FUNCTOR::AppendResult(result, lambda_vector, elem_cnt, result_entries, list_filter_info,
execute_info);
elem_cnt = 0;
}

// FIXME: reuse same selection vector for inconstant rows
// adjust indexes for slicing
child_info.sel.set_index(elem_cnt, list_entry.offset + child_idx);
for (auto &entry : inconstant_column_infos) {
entry.get().sel.set_index(elem_cnt, row_idx);
}

// set the index vector
if (bind_info.has_index) {
if (info.has_index) {
index_vector.SetValue(elem_cnt, Value::BIGINT(child_idx + 1));
}

Expand All @@ -378,20 +361,19 @@ void ExecuteLambda(DataChunk &args, ExpressionState &state, Vector &result) {
}

execute_info.lambda_chunk.Reset();
ExecuteExpression(elem_cnt, child_info, column_infos, index_vector, execute_info);
ExecuteExpression(elem_cnt, child_info, info.column_infos, index_vector, execute_info);
auto &lambda_vector = execute_info.lambda_chunk.data[0];

FUNCTION_FUNCTOR::AppendResult(result, lambda_vector, elem_cnt, result_entries, list_filter_info, execute_info);

if (args.AllConstant() && !has_side_effects) {
if (info.is_all_constant && !info.has_side_effects) {
result.SetVectorType(VectorType::CONSTANT_VECTOR);
}
}

unique_ptr<FunctionData> LambdaFunctions::ListLambdaBind(ClientContext &context, ScalarFunction &bound_function,
vector<unique_ptr<Expression>> &arguments,
const bool has_index) {

unique_ptr<FunctionData> LambdaFunctions::ListLambdaPrepareBind(vector<unique_ptr<Expression>> &arguments,
ClientContext &context,
ScalarFunction &bound_function) {
// NULL list parameter
if (arguments[0]->return_type.id() == LogicalTypeId::SQLNULL) {
bound_function.arguments[0] = LogicalType::SQLNULL;
Expand All @@ -405,10 +387,21 @@ unique_ptr<FunctionData> LambdaFunctions::ListLambdaBind(ClientContext &context,

arguments[0] = BoundCastExpression::AddArrayCastToList(context, std::move(arguments[0]));
D_ASSERT(arguments[0]->return_type.id() == LogicalTypeId::LIST);
return nullptr;
}

//! Binds a list lambda function. It first runs the shared preparation step (ListLambdaPrepareBind).
//! If that step already produced bind data, that data is returned unchanged. Otherwise, ownership of
//! the bound lambda expression (argument 1) moves into a new ListLambdaBindData, together with the
//! function's return type and the `has_index` flag.
unique_ptr<FunctionData> LambdaFunctions::ListLambdaBind(ClientContext &context, ScalarFunction &bound_function,
                                                         vector<unique_ptr<Expression>> &arguments,
                                                         const bool has_index) {
	auto prepared = ListLambdaPrepareBind(arguments, context, bound_function);
	if (prepared) {
		// the preparation step short-circuited binding
		return prepared;
	}

	// steal the lambda expression from the bound lambda argument and store it in the bind data
	auto &lambda_argument = arguments[1]->Cast<BoundLambdaExpression>();
	return make_uniq<ListLambdaBindData>(bound_function.return_type, std::move(lambda_argument.lambda_expr),
	                                     has_index);
}

Expand Down
1 change: 1 addition & 0 deletions src/core_functions/scalar/list/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ add_library_unity(
list_distance.cpp
list_cosine_similarity.cpp
list_inner_product.cpp
list_reduce.cpp
list_transform.cpp
list_value.cpp
range.cpp)
Expand Down
8 changes: 8 additions & 0 deletions src/core_functions/scalar/list/functions.json
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,14 @@
"type": "scalar_function",
"aliases": ["array_filter", "filter"]
},
{
"name": "list_reduce",
"parameters": "list,lambda",
"description": "Returns a single value that is the result of applying the lambda function to each element of the input list, starting with the first element and then repeatedly applying the lambda function to the result of the previous application and the next element of the list.",
"example": "list_reduce([1, 2, 3], (x, y) -> x + y)",
"type": "scalar_function",
"aliases": ["array_reduce", "reduce"]
},
{
"name": "generate_series",
"parameters": "start,stop,step",
Expand Down
Loading