Skip to content

Commit 38ca2ec

Browse files
committed
[native] Propagate table writer concurrency settings
1 parent 189e817 commit 38ca2ec

File tree

2 files changed

+29
-1
lines changed

2 files changed

+29
-1
lines changed

presto-native-execution/presto_cpp/main/common/Configs.cpp

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,8 @@ SystemConfig::SystemConfig() {
139139
BOOL_PROP(kHttpServerBindToNodeInternalAddressOnlyEnabled, false),
140140
NONE_PROP(kDiscoveryUri),
141141
NUM_PROP(kMaxDriversPerTask, 16),
142+
NONE_PROP(kTaskWriterCount),
143+
NONE_PROP(kTaskPartitionedWriterCount),
142144
NUM_PROP(kConcurrentLifespansPerTask, 1),
143145
STR_PROP(kTaskMaxPartialAggregationMemory, "16MB"),
144146
NUM_PROP(kHttpServerNumIoThreadsHwMultiplier, 1.0),
@@ -341,6 +343,14 @@ int32_t SystemConfig::maxDriversPerTask() const {
341343
return optionalProperty<int32_t>(kMaxDriversPerTask).value();
342344
}
343345

346+
folly::Optional<int32_t> SystemConfig::taskWriterCount() const {
347+
return optionalProperty<int32_t>(kTaskWriterCount);
348+
}
349+
350+
folly::Optional<int32_t> SystemConfig::taskPartitionedWriterCount() const {
351+
return optionalProperty<int32_t>(kTaskPartitionedWriterCount);
352+
}
353+
344354
int32_t SystemConfig::concurrentLifespansPerTask() const {
345355
return optionalProperty<int32_t>(kConcurrentLifespansPerTask).value();
346356
}
@@ -876,7 +886,7 @@ void BaseVeloxQueryConfig::updateLoadedValues(
876886
auto systemConfig = SystemConfig::instance();
877887

878888
using namespace velox::core;
879-
const std::unordered_map<std::string, std::string> updatedValues{
889+
std::unordered_map<std::string, std::string> updatedValues{
880890
{QueryConfig::kPrestoArrayAggIgnoreNulls,
881891
bool2String(systemConfig->useLegacyArrayAgg())},
882892
{QueryConfig::kMaxOutputBufferSize,
@@ -890,6 +900,17 @@ void BaseVeloxQueryConfig::updateLoadedValues(
890900
SystemConfig::kTaskMaxPartialAggregationMemory)},
891901
};
892902

903+
auto taskWriterCount = systemConfig->taskWriterCount();
904+
if (taskWriterCount.has_value()) {
905+
updatedValues[QueryConfig::kTaskWriterCount] =
906+
std::to_string(taskWriterCount.value());
907+
}
908+
auto taskPartitionedWriterCount = systemConfig->taskPartitionedWriterCount();
909+
if (taskPartitionedWriterCount.has_value()) {
910+
updatedValues[QueryConfig::kTaskPartitionedWriterCount] =
911+
std::to_string(taskPartitionedWriterCount.value());
912+
}
913+
893914
std::stringstream updated;
894915
for (const auto& pair : updatedValues) {
895916
updated << " " << pair.first << "=" << pair.second << "\n";

presto-native-execution/presto_cpp/main/common/Configs.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,9 @@ class SystemConfig : public ConfigBase {
179179
static constexpr std::string_view kDiscoveryUri{"discovery.uri"};
180180
static constexpr std::string_view kMaxDriversPerTask{
181181
"task.max-drivers-per-task"};
182+
static constexpr std::string_view kTaskWriterCount{"task.writer-count"};
183+
static constexpr std::string_view kTaskPartitionedWriterCount{
184+
"task.partitioned-writer-count"};
182185
static constexpr std::string_view kConcurrentLifespansPerTask{
183186
"task.concurrent-lifespans-per-task"};
184187
static constexpr std::string_view kTaskMaxPartialAggregationMemory{
@@ -695,6 +698,10 @@ class SystemConfig : public ConfigBase {
695698

696699
int32_t maxDriversPerTask() const;
697700

701+
folly::Optional<int32_t> taskWriterCount() const;
702+
703+
folly::Optional<int32_t> taskPartitionedWriterCount() const;
704+
698705
int32_t concurrentLifespansPerTask() const;
699706

700707
double httpServerNumIoThreadsHwMultiplier() const;

0 commit comments

Comments
 (0)