Skip to content

Commit 3b92ed6

Browse files
vhsu14AnuragKDwivedi
authored andcommitted
[native] Migrate TaskStatus, TaskUpdateRequest, TaskInfo to Thrift with JSON fields in CPP (prestodb#25079)
## Description This PR depends on prestodb#25020. Changes include: - Use IDL generated by coordinator and make relevant changes to enable thrift for TaskStatus, TaskUpdateRequest, and TaskInfo on native worker. - Remove old pipeline of using python, json, and chevron templates for producing C++ code. ## Motivation and Context We observed that coordinator can spend too much cpu/heap memory on json serde for taskUpdateRequest. ## Impact <!---Describe any public API or user-facing feature change or any performance impact--> ## Test Plan Verifier ## Contributor checklist - [ ] Please make sure your submission complies with our [contributing guide](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md), in particular [code style](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#code-style) and [commit standards](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#commit-standards). - [ ] PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced. - [ ] Documented new properties (with its default value), SQL syntax, functions, or other functionality. - [ ] If release notes are required, they follow the [release notes guidelines](https://github.com/prestodb/presto/wiki/Release-Notes-Guidelines). - [ ] Adequate tests were added if applicable. - [ ] CI passed. ## Release Notes Please follow [release notes guidelines](https://github.com/prestodb/presto/wiki/Release-Notes-Guidelines) and fill in the release notes below. ``` == RELEASE NOTES == General Changes * Improve communication between coordinator and worker with thrift serde. ```
1 parent e9a1938 commit 3b92ed6

30 files changed

+2141
-1369
lines changed

presto-native-execution/presto_cpp/main/TaskResource.cpp

Lines changed: 48 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -208,27 +208,37 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTaskImpl(
208208
const std::vector<std::string>& pathMatch,
209209
const std::function<std::unique_ptr<protocol::TaskInfo>(
210210
const protocol::TaskId& taskId,
211-
const std::string& updateJson,
211+
const std::string& requestBody,
212212
const bool summarize,
213-
long startProcessCpuTime)>& createOrUpdateFunc) {
213+
long startProcessCpuTime,
214+
bool receiveThrift)>& createOrUpdateFunc) {
214215
protocol::TaskId taskId = pathMatch[1];
215216
bool summarize = message->hasQueryParam("summarize");
217+
218+
auto& headers = message->getHeaders();
219+
const auto& acceptHeader = headers.getSingleOrEmpty(proxygen::HTTP_HEADER_ACCEPT);
220+
const auto sendThrift =
221+
acceptHeader.find(http::kMimeTypeApplicationThrift) != std::string::npos;
222+
const auto& contentHeader = headers.getSingleOrEmpty(proxygen::HTTP_HEADER_CONTENT_TYPE);
223+
const auto receiveThrift =
224+
contentHeader.find(http::kMimeTypeApplicationThrift) != std::string::npos;
225+
216226
return new http::CallbackRequestHandler(
217-
[this, taskId, summarize, createOrUpdateFunc](
227+
[this, taskId, summarize, createOrUpdateFunc, sendThrift, receiveThrift](
218228
proxygen::HTTPMessage* /*message*/,
219229
const std::vector<std::unique_ptr<folly::IOBuf>>& body,
220230
proxygen::ResponseHandler* downstream,
221231
std::shared_ptr<http::CallbackRequestHandlerState> handlerState) {
222232
folly::via(
223233
httpSrvCpuExecutor_,
224-
[this, &body, taskId, summarize, createOrUpdateFunc]() {
234+
[this, &body, taskId, summarize, createOrUpdateFunc, receiveThrift]() {
225235
const auto startProcessCpuTimeNs = util::getProcessCpuTimeNs();
226-
std::string updateJson = util::extractMessageBody(body);
236+
std::string requestBody = util::extractMessageBody(body);
227237

228238
std::unique_ptr<protocol::TaskInfo> taskInfo;
229239
try {
230240
taskInfo = createOrUpdateFunc(
231-
taskId, updateJson, summarize, startProcessCpuTimeNs);
241+
taskId, requestBody, summarize, startProcessCpuTimeNs, receiveThrift);
232242
} catch (const velox::VeloxException& e) {
233243
// Creating an empty task, putting errors inside so that next
234244
// status fetch from coordinator will catch the error and well
@@ -243,12 +253,19 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTaskImpl(
243253
throw;
244254
}
245255
}
246-
return json(*taskInfo);
256+
return taskInfo;
247257
})
248258
.via(folly::EventBaseManager::get()->getEventBase())
249-
.thenValue([downstream, handlerState](auto&& taskInfoJson) {
259+
.thenValue([downstream, handlerState, sendThrift](auto taskInfo) {
250260
if (!handlerState->requestExpired()) {
251-
http::sendOkResponse(downstream, taskInfoJson);
261+
if (sendThrift) {
262+
thrift::TaskInfo thriftTaskInfo;
263+
toThrift(*taskInfo, thriftTaskInfo);
264+
http::sendOkThriftResponse(
265+
downstream, thriftWrite(thriftTaskInfo));
266+
} else {
267+
http::sendOkResponse(downstream, json(*taskInfo));
268+
}
252269
}
253270
})
254271
.thenError(
@@ -275,11 +292,12 @@ proxygen::RequestHandler* TaskResource::createOrUpdateBatchTask(
275292
message,
276293
pathMatch,
277294
[&](const protocol::TaskId& taskId,
278-
const std::string& updateJson,
295+
const std::string& requestBody,
279296
const bool summarize,
280-
long startProcessCpuTime) {
297+
long startProcessCpuTime,
298+
bool /*receiveThrift*/) {
281299
protocol::BatchTaskUpdateRequest batchUpdateRequest =
282-
json::parse(updateJson);
300+
json::parse(requestBody);
283301
auto updateRequest = batchUpdateRequest.taskUpdateRequest;
284302
VELOX_USER_CHECK_NOT_NULL(updateRequest.fragment);
285303

@@ -327,16 +345,22 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTask(
327345
message,
328346
pathMatch,
329347
[&](const protocol::TaskId& taskId,
330-
const std::string& updateJson,
348+
const std::string& requestBody,
331349
const bool summarize,
332-
long startProcessCpuTime) {
333-
protocol::TaskUpdateRequest updateRequest = json::parse(updateJson);
350+
long startProcessCpuTime,
351+
bool receiveThrift) {
352+
protocol::TaskUpdateRequest updateRequest;
353+
if (receiveThrift) {
354+
auto thriftTaskUpdateRequest = std::make_shared<thrift::TaskUpdateRequest>();
355+
thriftRead(requestBody, thriftTaskUpdateRequest);
356+
fromThrift(*thriftTaskUpdateRequest, updateRequest);
357+
} else {
358+
updateRequest = json::parse(requestBody);
359+
}
334360
velox::core::PlanFragment planFragment;
335361
std::shared_ptr<velox::core::QueryCtx> queryCtx;
336362
if (updateRequest.fragment) {
337-
auto fragment =
338-
velox::encoding::Base64::decode(*updateRequest.fragment);
339-
protocol::PlanFragment prestoPlan = json::parse(fragment);
363+
protocol::PlanFragment prestoPlan = json::parse(receiveThrift ? *updateRequest.fragment : velox::encoding::Base64::decode(*updateRequest.fragment));
340364

341365
queryCtx =
342366
taskManager_.getQueryContextManager()->findOrCreateQueryCtx(
@@ -510,12 +534,12 @@ proxygen::RequestHandler* TaskResource::getTaskStatus(
510534
auto maxWait = getMaxWait(message);
511535

512536
auto& headers = message->getHeaders();
513-
auto acceptHeader = headers.getSingleOrEmpty(proxygen::HTTP_HEADER_ACCEPT);
514-
auto useThrift =
537+
const auto& acceptHeader = headers.getSingleOrEmpty(proxygen::HTTP_HEADER_ACCEPT);
538+
const auto sendThrift =
515539
acceptHeader.find(http::kMimeTypeApplicationThrift) != std::string::npos;
516540

517541
return new http::CallbackRequestHandler(
518-
[this, useThrift, taskId, currentState, maxWait](
542+
[this, sendThrift, taskId, currentState, maxWait](
519543
proxygen::HTTPMessage* /*message*/,
520544
const std::vector<std::unique_ptr<folly::IOBuf>>& /*body*/,
521545
proxygen::ResponseHandler* downstream,
@@ -525,7 +549,7 @@ proxygen::RequestHandler* TaskResource::getTaskStatus(
525549
httpSrvCpuExecutor_,
526550
[this,
527551
evb,
528-
useThrift,
552+
sendThrift,
529553
taskId,
530554
currentState,
531555
maxWait,
@@ -535,10 +559,10 @@ proxygen::RequestHandler* TaskResource::getTaskStatus(
535559
.getTaskStatus(taskId, currentState, maxWait, handlerState)
536560
.via(evb)
537561
.thenValue(
538-
[useThrift, downstream, taskId, handlerState](
562+
[sendThrift, downstream, taskId, handlerState](
539563
std::unique_ptr<protocol::TaskStatus> taskStatus) {
540564
if (!handlerState->requestExpired()) {
541-
if (useThrift) {
565+
if (sendThrift) {
542566
thrift::TaskStatus thriftTaskStatus;
543567
toThrift(*taskStatus, thriftTaskStatus);
544568
http::sendOkThriftResponse(

presto-native-execution/presto_cpp/main/TaskResource.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ class TaskResource {
7676
const protocol::TaskId&,
7777
const std::string&,
7878
const bool,
79-
long)>& createOrUpdateFunc);
79+
long,
80+
const bool)>& createOrUpdateFunc);
8081

8182
proxygen::RequestHandler* deleteTask(
8283
proxygen::HTTPMessage* message,

presto-native-execution/presto_cpp/main/common/tests/test_json.h

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,14 @@
1414
#pragma once
1515

1616
#include <fstream>
17-
#include <ios>
1817
#include <iosfwd>
18+
#include <boost/filesystem.hpp>
19+
#include <boost/algorithm/string.hpp>
1920

2021
#include "presto_cpp/presto_protocol/core/presto_protocol_core.h"
2122

23+
namespace fs = boost::filesystem;
24+
2225
namespace nlohmann {
2326

2427
// This is required avoid stack overflow when a gtest error printer is invoked.
@@ -48,3 +51,19 @@ inline std::string slurp(const std::string& path) {
4851
buf << input.rdbuf();
4952
return buf.str();
5053
}
54+
55+
inline std::string getDataPath(const std::string& dirUnderFbcode, const std::string& fileName) {
56+
std::string currentPath = fs::current_path().c_str();
57+
if (boost::algorithm::ends_with(currentPath, "fbcode")) {
58+
return currentPath + dirUnderFbcode + fileName;
59+
}
60+
61+
// CLion runs the tests from cmake-build-release/ or cmake-build-debug/
62+
// directory. Hard-coded json files are not copied there and test fails with
63+
// file not found. Fixing the path so that we can trigger these tests from
64+
// CLion.
65+
boost::algorithm::replace_all(currentPath, "cmake-build-release/", "");
66+
boost::algorithm::replace_all(currentPath, "cmake-build-debug/", "");
67+
68+
return currentPath + "/data/" + fileName;
69+
}

presto-native-execution/presto_cpp/main/tests/CMakeLists.txt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@ add_executable(
2222
ServerOperationTest.cpp
2323
SessionPropertiesTest.cpp
2424
TaskManagerTest.cpp
25-
QueryContextManagerTest.cpp)
25+
QueryContextManagerTest.cpp
26+
TaskInfoTest.cpp
27+
TaskStatusTest.cpp
28+
TaskUpdateRequestTest.cpp)
2629

2730
if(DEFINED PRESTO_MEMORY_CHECKER_TYPE AND PRESTO_MEMORY_CHECKER_TYPE STREQUAL
2831
"LINUX_MEMORY_CHECKER")
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
15+
#include <gtest/gtest.h>
16+
#include "presto_cpp/main/thrift/ProtocolToThrift.h"
17+
#include "presto_cpp/presto_protocol/core/Duration.h"
18+
#include "presto_cpp/main/common/tests/test_json.h"
19+
#include "presto_cpp/main/connectors/PrestoToVeloxConnector.h"
20+
21+
using namespace facebook;
22+
using namespace facebook::presto::protocol;
23+
24+
class TaskInfoTest : public ::testing::Test {};
25+
26+
const std::string BASE_DATA_PATH = "/github/presto-trunk/presto-native-execution/presto_cpp/main/tests/data/";
27+
28+
TEST_F(TaskInfoTest, duration) {
29+
double thrift = 0;
30+
facebook::presto::thrift::toThrift(Duration(123, TimeUnit::MILLISECONDS), thrift);
31+
ASSERT_EQ(thrift, 123);
32+
}
33+
34+
TEST_F(TaskInfoTest, binaryMetadataUpdates) {
35+
std::string str = slurp(getDataPath(BASE_DATA_PATH, "MetadataUpdates.json"));
36+
json j = json::parse(str);
37+
registerPrestoToVeloxConnector(std::make_unique<facebook::presto::HivePrestoToVeloxConnector>("hive"));
38+
MetadataUpdates metadataUpdates = j;
39+
std::unique_ptr<std::string> thriftMetadataUpdates = std::make_unique<std::string>();
40+
facebook::presto::thrift::toThrift(metadataUpdates, *thriftMetadataUpdates);
41+
42+
json thriftJson = json::parse(*thriftMetadataUpdates);
43+
ASSERT_EQ(j, thriftJson);
44+
45+
presto::unregisterPrestoToVeloxConnector("hive");
46+
}
47+
48+
TEST_F(TaskInfoTest, taskInfo) {
49+
std::string str = slurp(getDataPath(BASE_DATA_PATH, "TaskInfo.json"));
50+
json j = json::parse(str);
51+
registerPrestoToVeloxConnector(std::make_unique<facebook::presto::HivePrestoToVeloxConnector>("hive"));
52+
TaskInfo taskInfo = j;
53+
facebook::presto::thrift::TaskInfo thriftTaskInfo;
54+
facebook::presto::thrift::toThrift(taskInfo, thriftTaskInfo);
55+
56+
json thriftJson = json::parse(*thriftTaskInfo.metadataUpdates()->metadataUpdates());
57+
ASSERT_EQ(taskInfo.metadataUpdates, thriftJson);
58+
ASSERT_EQ(thriftTaskInfo.needsPlan(), false);
59+
ASSERT_EQ(thriftTaskInfo.outputBuffers()->buffers()->size(), 2);
60+
ASSERT_EQ(thriftTaskInfo.outputBuffers()->buffers()[0].bufferId()->id(), 100);
61+
ASSERT_EQ(thriftTaskInfo.outputBuffers()->buffers()[1].bufferId()->id(), 200);
62+
ASSERT_EQ(thriftTaskInfo.stats()->blockedReasons()->count(facebook::presto::thrift::BlockedReason::WAITING_FOR_MEMORY), 1);
63+
ASSERT_EQ(thriftTaskInfo.stats()->runtimeStats()->metrics()->size(), 2);
64+
ASSERT_EQ(thriftTaskInfo.stats()->runtimeStats()->metrics()["test_metric1"].sum(), 123);
65+
ASSERT_EQ(thriftTaskInfo.stats()->runtimeStats()->metrics()["test_metric2"].name(), "test_metric2");
66+
67+
presto::unregisterPrestoToVeloxConnector("hive");
68+
}
69+
70+
TEST_F(TaskInfoTest, taskId) {
71+
TaskId taskId = "queryId.1.2.3.4";
72+
facebook::presto::thrift::TaskId thriftTaskId;
73+
facebook::presto::thrift::toThrift(taskId, thriftTaskId);
74+
75+
ASSERT_EQ(thriftTaskId.stageExecutionId()->stageId()->queryId(), "queryId");
76+
ASSERT_EQ(thriftTaskId.stageExecutionId()->stageId()->id(), 1);
77+
ASSERT_EQ(thriftTaskId.stageExecutionId()->id(), 2);
78+
ASSERT_EQ(thriftTaskId.id(), 3);
79+
ASSERT_EQ(thriftTaskId.attemptNumber(), 4);
80+
}
81+
82+
83+
TEST_F(TaskInfoTest, operatorStatsEmptyBlockedReason) {
84+
std::string str = slurp(getDataPath(BASE_DATA_PATH, "OperatorStatsEmptyBlockedReason.json"));
85+
json j = json::parse(str);
86+
OperatorStats operatorStats = j;
87+
facebook::presto::thrift::OperatorStats thriftOperatorStats;
88+
facebook::presto::thrift::toThrift(operatorStats, thriftOperatorStats);
89+
90+
ASSERT_EQ(thriftOperatorStats.blockedReason().has_value(), false);
91+
ASSERT_EQ(thriftOperatorStats.blockedWall(), 80);
92+
ASSERT_EQ(thriftOperatorStats.finishCpu(), 1000);
93+
}
94+
95+
TEST_F(TaskInfoTest, operatorStats) {
96+
std::string str = slurp(getDataPath(BASE_DATA_PATH, "OperatorStats.json"));
97+
json j = json::parse(str);
98+
OperatorStats operatorStats = j;
99+
facebook::presto::thrift::OperatorStats thriftOperatorStats;
100+
facebook::presto::thrift::toThrift(operatorStats, thriftOperatorStats);
101+
102+
ASSERT_EQ(thriftOperatorStats.blockedReason(), facebook::presto::thrift::BlockedReason::WAITING_FOR_MEMORY);
103+
}

0 commit comments

Comments
 (0)