Skip to content

Commit 030e120

Browse files
committed
[native] Add ability to cancel Tasks with stuck Drivers.
1 parent d849304 commit 030e120

File tree

5 files changed

+57
-5
lines changed

5 files changed

+57
-5
lines changed

presto-native-execution/presto_cpp/main/TaskManager.cpp

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1098,16 +1098,47 @@ bool TaskManager::getStuckOpCalls(
10981098
std::vector<velox::exec::Task::OpCallInfo>& stuckOpCalls) const {
10991099
const auto thresholdDurationMs =
11001100
SystemConfig::instance()->driverStuckOperatorThresholdMs();
1101+
const auto thresholdCancelMs =
1102+
SystemConfig::instance()
1103+
->driverCancelTasksWithStuckOperatorsThresholdMs();
1104+
stuckOpCalls.clear();
1105+
11011106
const std::chrono::milliseconds lockTimeoutMs(thresholdDurationMs);
11021107
auto taskMap = taskMap_.rlock(lockTimeoutMs);
11031108
if (!taskMap) {
11041109
return false;
11051110
}
1106-
for (const auto& [_, prestoTask] : *taskMap) {
1107-
if (prestoTask->task != nullptr &&
1108-
!prestoTask->task->getLongRunningOpCalls(
1109-
lockTimeoutMs, thresholdDurationMs, stuckOpCalls)) {
1110-
deadlockTasks.push_back(prestoTask->task->taskId());
1111+
1112+
for (const auto& [id, prestoTask] : *taskMap) {
1113+
if (prestoTask->task != nullptr) {
1114+
const auto numPrevStuckOps = stuckOpCalls.size();
1115+
if (!prestoTask->task->getLongRunningOpCalls(
1116+
lockTimeoutMs, thresholdDurationMs, stuckOpCalls)) {
1117+
deadlockTasks.push_back(id);
1118+
continue;
1119+
}
1120+
// See if we need to cancel the Task - it should be running, the cancel
1121+
// threshold should be valid and it should have at least one stuck driver
1122+
// that was stuck for enough time.
1123+
if (numPrevStuckOps < stuckOpCalls.size() && thresholdCancelMs != 0 &&
1124+
prestoTask->task->isRunning()) {
1125+
for (auto it = stuckOpCalls.begin() + numPrevStuckOps;
1126+
it != stuckOpCalls.end();
1127+
++it) {
1128+
if (it->durationMs >= thresholdCancelMs) {
1129+
std::stringstream ss;
1130+
ss << "Task " << id
1131+
<< " cancelled due to stuck operator: tid=" << it->tid
1132+
<< " opCall=" << it->opCall
1133+
<< " duration= " << velox::succinctMillis(it->durationMs);
1134+
const std::string msg = ss.str();
1135+
LOG(ERROR) << msg;
1136+
prestoTask->task->setError(msg);
1137+
RECORD_METRIC_VALUE(kCounterNumCancelledTasksByStuckDriver, 1);
1138+
break;
1139+
}
1140+
}
1141+
}
11111142
}
11121143
}
11131144
return true;

presto-native-execution/presto_cpp/main/common/Configs.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,8 @@ SystemConfig::SystemConfig() {
157157
NUM_PROP(kDriverNumCpuThreadsHwMultiplier, 4.0),
158158
BOOL_PROP(kDriverThreadsBatchSchedulingEnabled, false),
159159
NUM_PROP(kDriverStuckOperatorThresholdMs, 30 * 60 * 1000),
160+
NUM_PROP(
161+
kDriverCancelTasksWithStuckOperatorsThresholdMs, 40 * 60 * 1000),
160162
NUM_PROP(kSpillerNumCpuThreadsHwMultiplier, 1.0),
161163
STR_PROP(kSpillerFileCreateConfig, ""),
162164
NONE_PROP(kSpillerSpillPath),
@@ -377,6 +379,12 @@ size_t SystemConfig::driverStuckOperatorThresholdMs() const {
377379
return optionalProperty<size_t>(kDriverStuckOperatorThresholdMs).value();
378380
}
379381

382+
size_t SystemConfig::driverCancelTasksWithStuckOperatorsThresholdMs() const {
383+
return optionalProperty<size_t>(
384+
kDriverCancelTasksWithStuckOperatorsThresholdMs)
385+
.value();
386+
}
387+
380388
double SystemConfig::spillerNumCpuThreadsHwMultiplier() const {
381389
return optionalProperty<double>(kSpillerNumCpuThreadsHwMultiplier).value();
382390
}

presto-native-execution/presto_cpp/main/common/Configs.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,13 @@ class SystemConfig : public ConfigBase {
235235
static constexpr std::string_view kDriverStuckOperatorThresholdMs{
236236
"driver.stuck-operator-threshold-ms"};
237237

238+
/// Cancels a Task once it is detected that at least one of its Operators has
239+
/// been stuck for at least this many milliseconds.
240+
/// Use zero to disable canceling.
241+
static constexpr std::string_view
242+
kDriverCancelTasksWithStuckOperatorsThresholdMs{
243+
"driver.cancel-tasks-with-stuck-operators-threshold-ms"};
244+
238245
/// Floating point number used in calculating how many threads we would use
239246
/// for Spiller CPU executor: hw_concurrency x multiplier.
240247
/// If 0.0 then spilling is disabled.
@@ -705,6 +712,8 @@ class SystemConfig : public ConfigBase {
705712

706713
size_t driverStuckOperatorThresholdMs() const;
707714

715+
size_t driverCancelTasksWithStuckOperatorsThresholdMs() const;
716+
708717
double spillerNumCpuThreadsHwMultiplier() const;
709718

710719
std::string spillerFileCreateConfig() const;

presto-native-execution/presto_cpp/main/common/Counters.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ void registerPrestoMetrics() {
4444
DEFINE_METRIC(kCounterNumZombiePrestoTasks, facebook::velox::StatType::AVG);
4545
DEFINE_METRIC(
4646
kCounterNumTasksWithStuckOperator, facebook::velox::StatType::AVG);
47+
DEFINE_METRIC(
48+
kCounterNumCancelledTasksByStuckDriver, facebook::velox::StatType::COUNT);
4749
DEFINE_METRIC(kCounterNumTasksDeadlock, facebook::velox::StatType::AVG);
4850
DEFINE_METRIC(
4951
kCounterNumTaskManagerLockTimeOut, facebook::velox::StatType::AVG);

presto-native-execution/presto_cpp/main/common/Counters.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ constexpr folly::StringPiece kCounterNumZombiePrestoTasks{
6767
"presto_cpp.num_zombie_presto_tasks"};
6868
constexpr folly::StringPiece kCounterNumTasksWithStuckOperator{
6969
"presto_cpp.num_tasks_with_stuck_operator"};
70+
constexpr folly::StringPiece kCounterNumCancelledTasksByStuckDriver{
71+
"presto_cpp.num_cancelled_tasks_by_stuck_driver"};
7072
constexpr folly::StringPiece kCounterNumTasksDeadlock{
7173
"presto_cpp.num_tasks_deadlock"};
7274
constexpr folly::StringPiece kCounterNumTaskManagerLockTimeOut{

0 commit comments

Comments
 (0)