Skip to content

Commit d8f548b

Browse files
YuweiXiao and Yuwei Xiao authored
Support parallel DuckDB threads for Postgres table scan (#762)
Currently, we use a single DuckDB thread for a Postgres table scan, even though multiple Postgres workers are initialized. This leads to a performance bottleneck when scanning large amounts of data. This PR parallelizes the conversion from Postgres tuples to DuckDB data chunks.

Below are benchmark results on a 5GB TPCH lineitem table.

- Benchmark query: `select * from lineitem order by 1 limit 1`
- Other GUC setups: `duckdb.max_workers_per_postgres_scan` = 2

| Threads (`duckdb.threads_for_postgres_scan`) | Cost (seconds) |
|---|---|
| 1 | 15.8 |
| 2 | 8.7 |
| 4 | 5.8 |

---------

Co-authored-by: Yuwei Xiao <qianzhen@leadincloud.com>
1 parent 81439c8 commit d8f548b

14 files changed

+328
-28
lines changed

include/pgduckdb/pg/relations.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ bool TupleIsNull(TupleTableSlot *slot);
2222

2323
void SlotGetAllAttrs(TupleTableSlot *slot);
2424

25+
TupleTableSlot *ExecStoreMinimalTupleUnsafe(MinimalTuple minmal_tuple, TupleTableSlot *slot, bool shouldFree);
26+
2527
double EstimateRelSize(Relation rel);
2628

2729
Oid GetRelidFromSchemaAndTable(const char *, const char *);

include/pgduckdb/pgduckdb_guc.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ extern bool duckdb_allow_community_extensions;
1515
extern bool duckdb_allow_unsigned_extensions;
1616
extern bool duckdb_autoinstall_known_extensions;
1717
extern bool duckdb_autoload_known_extensions;
18+
extern int duckdb_threads_for_postgres_scan;
1819
extern int duckdb_max_workers_per_postgres_scan;
1920
extern char *duckdb_postgres_role;
2021
extern char *duckdb_motherduck_session_hint;

include/pgduckdb/pgduckdb_types.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,5 +37,7 @@ duckdb::Value ConvertPostgresParameterToDuckValue(Datum value, Oid postgres_type
3737
void ConvertPostgresToDuckValue(Oid attr_type, Datum value, duckdb::Vector &result, uint64_t offset);
3838
bool ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, uint64_t col);
3939
void InsertTupleIntoChunk(duckdb::DataChunk &output, PostgresScanLocalState &scan_local_state, TupleTableSlot *slot);
40+
void InsertTuplesIntoChunk(duckdb::DataChunk &output, PostgresScanLocalState &scan_local_state, TupleTableSlot **slots,
41+
int num_slots);
4042

4143
} // namespace pgduckdb

include/pgduckdb/scan/postgres_scan.hpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@ struct PostgresScanGlobalState : public duckdb::GlobalTableFunctionState {
1818
~PostgresScanGlobalState();
1919
idx_t
2020
MaxThreads() const override {
21-
return 1;
21+
return max_threads;
2222
}
2323
void ConstructTableScanQuery(const duckdb::TableFunctionInitInput &input);
24+
bool RegisterLocalState();
25+
void UnregisterLocalState();
2426

2527
private:
2628
int ExtractQueryFilters(duckdb::TableFilter *filter, const char *column_name, duckdb::string &filters,
@@ -35,18 +37,22 @@ struct PostgresScanGlobalState : public duckdb::GlobalTableFunctionState {
3537
bool count_tuples_only;
3638
duckdb::vector<AttrNumber> output_columns;
3739
std::atomic<std::uint32_t> total_row_count;
40+
std::atomic<std::int32_t> registered_local_states;
3841
std::ostringstream scan_query;
3942
duckdb::shared_ptr<PostgresTableReader> table_reader_global_state;
4043
MemoryContext duckdb_scan_memory_ctx;
44+
idx_t max_threads;
4145
};
4246

4347
// Local State
44-
48+
#define LOCAL_STATE_SLOT_BATCH_SIZE 32
4549
struct PostgresScanLocalState : public duckdb::LocalTableFunctionState {
4650
PostgresScanLocalState(PostgresScanGlobalState *global_state);
4751
~PostgresScanLocalState() override;
4852

4953
PostgresScanGlobalState *global_state;
54+
TupleTableSlot *slots[LOCAL_STATE_SLOT_BATCH_SIZE];
55+
std::vector<uint8_t> minimal_tuple_buffer[LOCAL_STATE_SLOT_BATCH_SIZE];
5056

5157
size_t output_vector_size;
5258
bool exhausted_scan;

include/pgduckdb/scan/postgres_table_reader.hpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
#include "pgduckdb/pg/declarations.hpp"
44

5+
#include <vector>
6+
57
#include "pgduckdb/utility/cpp_only_file.hpp" // Must be last include.
68

79
namespace pgduckdb {
@@ -13,6 +15,12 @@ class PostgresTableReader {
1315
TupleTableSlot *GetNextTuple();
1416
void Init(const char *table_scan_query, bool count_tuples_only);
1517
void Cleanup();
18+
bool GetNextMinimalWorkerTuple(std::vector<uint8_t> &minimal_tuple_buffer);
19+
TupleTableSlot *InitTupleSlot();
20+
// Accessor: returns the current value of the nworkers_launched member without modifying the reader.
// NOTE(review): presumably the number of Postgres parallel workers actually started for this scan —
// confirm against where nworkers_launched is assigned.
int
21+
NumWorkersLaunched() const {
22+
return nworkers_launched;
23+
}
1624

1725
private:
1826
PostgresTableReader(const PostgresTableReader &) = delete;

src/pg/relations.cpp

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,18 @@ TupleIsNull(TupleTableSlot *slot) {
6666

6767
void
6868
SlotGetAllAttrs(TupleTableSlot *slot) {
69-
PostgresFunctionGuard(slot_getallattrs, slot);
69+
// It is safe to call slot_getallattrs directly without the PostgresFunctionGuard because the function doesn't
70+
// perform any memory allocations. Assertions or errors are guaranteed not to occur for minimal slots.
71+
slot_getallattrs(slot);
72+
}
73+
74+
// Forwards its three arguments unchanged to Postgres' ::ExecStoreMinimalTuple and returns its result,
// deliberately bypassing PostgresFunctionGuard ("Unsafe" refers to that missing guard).
TupleTableSlot *
75+
ExecStoreMinimalTupleUnsafe(MinimalTuple minmal_tuple, TupleTableSlot *slot, bool shouldFree) {
76+
// Rationale for skipping the guard: calling ExecStoreMinimalTuple directly is considered safe as long as the
77+
// slot does not own the tuple it currently holds, i.e., TTS_SHOULDFREE(slot) is false. In that case the call is
78+
// not expected to allocate in (or free from) a memory context, and the only error it can raise is that the slot
79+
// is not a minimal slot. That would be an obvious programming error, so it is ignored here.
// NOTE(review): these guarantees come from the Postgres implementation, not from this file — confirm when
// upgrading Postgres.
80+
return ::ExecStoreMinimalTuple(minmal_tuple, slot, shouldFree);
7081
}
7182

7283
Relation

src/pgduckdb_detoast.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ ToastFetchDatum(struct varlena *attr) {
134134
return result;
135135
}
136136

137+
// This function is thread-safe and does not utilize the PostgreSQL memory context.
137138
Datum
138139
DetoastPostgresDatum(struct varlena *attr, bool *should_free) {
139140
struct varlena *toasted_value = nullptr;

src/pgduckdb_guc.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ bool duckdb_force_execution = false;
113113
bool duckdb_unsafe_allow_mixed_transactions = false;
114114
bool duckdb_convert_unsupported_numeric_to_double = false;
115115
bool duckdb_log_pg_explain = false;
116+
int duckdb_threads_for_postgres_scan = 2;
116117
int duckdb_max_workers_per_postgres_scan = 2;
117118
char *duckdb_motherduck_session_hint = strdup("");
118119
char *duckdb_postgres_role = strdup("");
@@ -146,9 +147,12 @@ InitGUC() {
146147
DefineCustomVariable("duckdb.log_pg_explain", "Logs the EXPLAIN plan of a Postgres scan at the NOTICE log level",
147148
&duckdb_log_pg_explain);
148149

150+
DefineCustomVariable("duckdb.threads_for_postgres_scan",
151+
"Maximum number of DuckDB threads used for a single Postgres scan",
152+
&duckdb_threads_for_postgres_scan, 1, MAX_PARALLEL_WORKER_LIMIT);
149153
DefineCustomVariable("duckdb.max_workers_per_postgres_scan",
150154
"Maximum number of PostgreSQL workers used for a single Postgres scan",
151-
&pgduckdb::duckdb_max_workers_per_postgres_scan, 0, MAX_PARALLEL_WORKER_LIMIT);
155+
&duckdb_max_workers_per_postgres_scan, 0, MAX_PARALLEL_WORKER_LIMIT);
152156

153157
DefineCustomVariable("duckdb.postgres_role",
154158
"Which postgres role should be allowed to use DuckDB execution, use the secrets and create "

src/pgduckdb_types.cpp

Lines changed: 79 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include "pgduckdb/pgduckdb_utils.hpp"
1212
#include "pgduckdb/pgduckdb_metadata_cache.hpp"
1313
#include "pgduckdb/scan/postgres_scan.hpp"
14+
#include "pgduckdb/pg/memory.hpp"
1415
#include "pgduckdb/pg/types.hpp"
1516

1617
extern "C" {
@@ -355,8 +356,8 @@ ConvertTimestampTzDatum(const duckdb::Value &value) {
355356
if (!ValidTimestampOrTimestampTz(rawValue))
356357
throw duckdb::OutOfRangeException(
357358
"The TimestampTz value should be between min and max value (%s <-> %s)",
358-
duckdb::Timestamp::ToString(static_cast<duckdb::timestamp_t>(PGDUCKDB_MIN_TIMESTAMP_VALUE)),
359-
duckdb::Timestamp::ToString(static_cast<duckdb::timestamp_t>(PGDUCKDB_MAX_TIMESTAMP_VALUE)));
359+
duckdb::Timestamp::ToString(static_cast<duckdb::timestamp_tz_t>(PGDUCKDB_MIN_TIMESTAMP_VALUE)),
360+
duckdb::Timestamp::ToString(static_cast<duckdb::timestamp_tz_t>(PGDUCKDB_MAX_TIMESTAMP_VALUE)));
360361

361362
return TimestampTzGetDatum(rawValue - pgduckdb::PGDUCKDB_DUCK_TIMESTAMP_OFFSET);
362363
}
@@ -1984,6 +1985,82 @@ InsertTupleIntoChunk(duckdb::DataChunk &output, PostgresScanLocalState &scan_loc
19841985
scan_global_state->total_row_count++;
19851986
}
19861987

1988+
/*
1989+
* Returns true if the given type can be converted from a Postgres datum to a DuckDB value
1990+
* without requiring any Postgres-specific functions or memory allocations (such as palloc).
1991+
*/
1992+
static bool
1993+
IsThreadSafeTypeForPostgresToDuckDB(Oid attr_type, duckdb::LogicalTypeId duckdb_type) {
1994+
if (duckdb_type == duckdb::LogicalTypeId::VARCHAR) {
1995+
return attr_type != JSONBOID;
1996+
}
1997+
if (duckdb_type == duckdb::LogicalTypeId::LIST || duckdb_type == duckdb::LogicalTypeId::BIT) {
1998+
return false;
1999+
}
2000+
2001+
return true;
2002+
}
2003+
2004+
/*
2005+
* Insert batch of tuples into chunk. This function is thread-safe and is meant for multi-threaded scans.
2006+
*
2007+
* Global lock & PG memory context are handled for unsafe types, e.g., JSONB/LIST/VARBIT.
2008+
*/
2009+
void
2010+
InsertTuplesIntoChunk(duckdb::DataChunk &output, PostgresScanLocalState &scan_local_state, TupleTableSlot **slots,
2011+
int num_slots) {
2012+
if (num_slots == 0) {
2013+
return;
2014+
}
2015+
2016+
auto scan_global_state = scan_local_state.global_state;
2017+
int natts = slots[0]->tts_tupleDescriptor->natts;
2018+
D_ASSERT(!scan_global_state->count_tuples_only);
2019+
2020+
for (int duckdb_output_index = 0; duckdb_output_index < natts; duckdb_output_index++) {
2021+
auto &result = output.data[duckdb_output_index];
2022+
auto attr = slots[0]->tts_tupleDescriptor->attrs[duckdb_output_index];
2023+
bool is_safe_type = IsThreadSafeTypeForPostgresToDuckDB(attr.atttypid, result.GetType().id());
2024+
2025+
std::unique_ptr<std::lock_guard<std::recursive_mutex>> lock_guard;
2026+
MemoryContext old_ctx = NULL;
2027+
if (!is_safe_type) {
2028+
lock_guard = std::make_unique<std::lock_guard<std::recursive_mutex>>(GlobalProcessLock::GetLock());
2029+
old_ctx = pg::MemoryContextSwitchTo(scan_global_state->duckdb_scan_memory_ctx);
2030+
}
2031+
2032+
for (int row = 0; row < num_slots; row++) {
2033+
if (slots[row]->tts_isnull[duckdb_output_index]) {
2034+
auto &array_mask = duckdb::FlatVector::Validity(result);
2035+
array_mask.SetInvalid(scan_local_state.output_vector_size + row);
2036+
} else {
2037+
if (attr.attlen == -1) {
2038+
bool should_free = false;
2039+
Datum detoasted_value = DetoastPostgresDatum(
2040+
reinterpret_cast<varlena *>(slots[row]->tts_values[duckdb_output_index]), &should_free);
2041+
ConvertPostgresToDuckValue(attr.atttypid, detoasted_value, result,
2042+
scan_local_state.output_vector_size + row);
2043+
if (should_free) {
2044+
duckdb_free(reinterpret_cast<void *>(detoasted_value));
2045+
}
2046+
} else {
2047+
ConvertPostgresToDuckValue(attr.atttypid, slots[row]->tts_values[duckdb_output_index], result,
2048+
scan_local_state.output_vector_size + row);
2049+
}
2050+
}
2051+
}
2052+
2053+
if (!is_safe_type) {
2054+
pg::MemoryContextSwitchTo(old_ctx);
2055+
pg::MemoryContextReset(scan_global_state->duckdb_scan_memory_ctx);
2056+
// Lock will be automatically unlocked when lock_guard goes out of scope
2057+
}
2058+
}
2059+
2060+
scan_local_state.output_vector_size += num_slots;
2061+
scan_global_state->total_row_count += num_slots;
2062+
}
2063+
19872064
NumericVar
19882065
FromNumeric(Numeric num) {
19892066
NumericVar dest;

0 commit comments

Comments
 (0)