Skip to content

Commit 942e648

Browse files
vhsu14facebook-github-bot
authored andcommitted
Migrate TaskStatus, TaskUpdateRequest, TaskInfo to Thrift with JSON fields in CPP (prestodb#25079)
Summary: Pull Request resolved: prestodb#25079 Differential Revision: D72886878
1 parent 8dd2b4c commit 942e648

23 files changed

+2106
-247
lines changed

presto-native-execution/presto_cpp/main/TaskResource.cpp

Lines changed: 47 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -210,25 +210,35 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTaskImpl(
210210
const protocol::TaskId& taskId,
211211
const std::string& updateJson,
212212
const bool summarize,
213-
long startProcessCpuTime)>& createOrUpdateFunc) {
213+
long startProcessCpuTime,
214+
bool receiveThrift)>& createOrUpdateFunc) {
214215
protocol::TaskId taskId = pathMatch[1];
215216
bool summarize = message->hasQueryParam("summarize");
217+
218+
auto& headers = message->getHeaders();
219+
auto acceptHeader = headers.getSingleOrEmpty(proxygen::HTTP_HEADER_ACCEPT);
220+
auto sendThrift =
221+
acceptHeader.find(http::kMimeTypeApplicationThrift) != std::string::npos;
222+
auto contentHeader = headers.getSingleOrEmpty(proxygen::HTTP_HEADER_CONTENT_TYPE);
223+
auto receiveThrift =
224+
contentHeader.find(http::kMimeTypeApplicationThrift) != std::string::npos;
225+
216226
return new http::CallbackRequestHandler(
217-
[this, taskId, summarize, createOrUpdateFunc](
227+
[this, taskId, summarize, createOrUpdateFunc, sendThrift, receiveThrift](
218228
proxygen::HTTPMessage* /*message*/,
219229
const std::vector<std::unique_ptr<folly::IOBuf>>& body,
220230
proxygen::ResponseHandler* downstream,
221231
std::shared_ptr<http::CallbackRequestHandlerState> handlerState) {
222232
folly::via(
223233
httpSrvCpuExecutor_,
224-
[this, &body, taskId, summarize, createOrUpdateFunc]() {
234+
[this, &body, taskId, summarize, createOrUpdateFunc, receiveThrift]() {
225235
const auto startProcessCpuTimeNs = util::getProcessCpuTimeNs();
226236
std::string updateJson = util::extractMessageBody(body);
227237

228238
std::unique_ptr<protocol::TaskInfo> taskInfo;
229239
try {
230240
taskInfo = createOrUpdateFunc(
231-
taskId, updateJson, summarize, startProcessCpuTimeNs);
241+
taskId, updateJson, summarize, startProcessCpuTimeNs, receiveThrift);
232242
} catch (const velox::VeloxException& e) {
233243
// Creating an empty task, putting errors inside so that next
234244
// status fetch from coordinator will catch the error and well
@@ -243,12 +253,19 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTaskImpl(
243253
throw;
244254
}
245255
}
246-
return json(*taskInfo);
256+
return taskInfo;
247257
})
248258
.via(folly::EventBaseManager::get()->getEventBase())
249-
.thenValue([downstream, handlerState](auto&& taskInfoJson) {
259+
.thenValue([downstream, handlerState, sendThrift](std::unique_ptr<protocol::TaskInfo> taskInfo) {
250260
if (!handlerState->requestExpired()) {
251-
http::sendOkResponse(downstream, taskInfoJson);
261+
if (sendThrift) {
262+
thrift::TaskInfo thriftTaskInfo;
263+
toThrift(*taskInfo, thriftTaskInfo);
264+
http::sendOkThriftResponse(
265+
downstream, thriftWrite(thriftTaskInfo));
266+
} else {
267+
http::sendOkResponse(downstream, json(*taskInfo));
268+
}
252269
}
253270
})
254271
.thenError(
@@ -277,7 +294,8 @@ proxygen::RequestHandler* TaskResource::createOrUpdateBatchTask(
277294
[&](const protocol::TaskId& taskId,
278295
const std::string& updateJson,
279296
const bool summarize,
280-
long startProcessCpuTime) {
297+
long startProcessCpuTime,
298+
bool /*receiveThrift*/) {
281299
protocol::BatchTaskUpdateRequest batchUpdateRequest =
282300
json::parse(updateJson);
283301
auto updateRequest = batchUpdateRequest.taskUpdateRequest;
@@ -329,13 +347,25 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTask(
329347
[&](const protocol::TaskId& taskId,
330348
const std::string& updateJson,
331349
const bool summarize,
332-
long startProcessCpuTime) {
333-
protocol::TaskUpdateRequest updateRequest = json::parse(updateJson);
350+
long startProcessCpuTime,
351+
bool receiveThrift) {
352+
protocol::TaskUpdateRequest updateRequest;
353+
if (receiveThrift) {
354+
auto thriftTaskUpdateRequest = std::make_shared<thrift::TaskUpdateRequest>();
355+
thriftRead(updateJson, thriftTaskUpdateRequest);
356+
fromThrift(*thriftTaskUpdateRequest, updateRequest);
357+
} else {
358+
updateRequest = json::parse(updateJson);
359+
}
334360
velox::core::PlanFragment planFragment;
335361
std::shared_ptr<velox::core::QueryCtx> queryCtx;
336362
if (updateRequest.fragment) {
337-
auto fragment =
338-
velox::encoding::Base64::decode(*updateRequest.fragment);
363+
std::string fragment;
364+
if (receiveThrift) {
365+
fragment = *updateRequest.fragment;
366+
} else {
367+
fragment = velox::encoding::Base64::decode(*updateRequest.fragment);
368+
}
339369
protocol::PlanFragment prestoPlan = json::parse(fragment);
340370

341371
queryCtx =
@@ -511,11 +541,11 @@ proxygen::RequestHandler* TaskResource::getTaskStatus(
511541

512542
auto& headers = message->getHeaders();
513543
auto acceptHeader = headers.getSingleOrEmpty(proxygen::HTTP_HEADER_ACCEPT);
514-
auto useThrift =
544+
auto sendThrift =
515545
acceptHeader.find(http::kMimeTypeApplicationThrift) != std::string::npos;
516546

517547
return new http::CallbackRequestHandler(
518-
[this, useThrift, taskId, currentState, maxWait](
548+
[this, sendThrift, taskId, currentState, maxWait](
519549
proxygen::HTTPMessage* /*message*/,
520550
const std::vector<std::unique_ptr<folly::IOBuf>>& /*body*/,
521551
proxygen::ResponseHandler* downstream,
@@ -525,7 +555,7 @@ proxygen::RequestHandler* TaskResource::getTaskStatus(
525555
httpSrvCpuExecutor_,
526556
[this,
527557
evb,
528-
useThrift,
558+
sendThrift,
529559
taskId,
530560
currentState,
531561
maxWait,
@@ -535,10 +565,10 @@ proxygen::RequestHandler* TaskResource::getTaskStatus(
535565
.getTaskStatus(taskId, currentState, maxWait, handlerState)
536566
.via(evb)
537567
.thenValue(
538-
[useThrift, downstream, taskId, handlerState](
568+
[sendThrift, downstream, taskId, handlerState](
539569
std::unique_ptr<protocol::TaskStatus> taskStatus) {
540570
if (!handlerState->requestExpired()) {
541-
if (useThrift) {
571+
if (sendThrift) {
542572
thrift::TaskStatus thriftTaskStatus;
543573
toThrift(*taskStatus, thriftTaskStatus);
544574
http::sendOkThriftResponse(

presto-native-execution/presto_cpp/main/TaskResource.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ class TaskResource {
7676
const protocol::TaskId&,
7777
const std::string&,
7878
const bool,
79-
long)>& createOrUpdateFunc);
79+
long,
80+
const bool)>& createOrUpdateFunc);
8081

8182
proxygen::RequestHandler* deleteTask(
8283
proxygen::HTTPMessage* message,

presto-native-execution/presto_cpp/main/common/tests/test_json.h

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,14 @@
1414
#pragma once
1515

1616
#include <fstream>
17-
#include <ios>
1817
#include <iosfwd>
18+
#include <boost/filesystem.hpp>
19+
#include <boost/algorithm/string.hpp>
1920

2021
#include "presto_cpp/presto_protocol/core/presto_protocol_core.h"
2122

23+
namespace fs = boost::filesystem;
24+
2225
namespace nlohmann {
2326

2427
// This is required avoid stack overflow when a gtest error printer is invoked.
@@ -48,3 +51,19 @@ inline std::string slurp(const std::string& path) {
4851
buf << input.rdbuf();
4952
return buf.str();
5053
}
54+
55+
inline std::string getDataPath(const std::string& dirUnderFbcode, const std::string& fileName) {
56+
std::string currentPath = fs::current_path().c_str();
57+
if (boost::algorithm::ends_with(currentPath, "fbcode")) {
58+
return currentPath + dirUnderFbcode + fileName;
59+
}
60+
61+
// CLion runs the tests from cmake-build-release/ or cmake-build-debug/
62+
// directory. Hard-coded json files are not copied there and test fails with
63+
// file not found. Fixing the path so that we can trigger these tests from
64+
// CLion.
65+
boost::algorithm::replace_all(currentPath, "cmake-build-release/", "");
66+
boost::algorithm::replace_all(currentPath, "cmake-build-debug/", "");
67+
68+
return currentPath + "/data/" + fileName;
69+
}

presto-native-execution/presto_cpp/main/thrift/Makefile

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,21 +10,20 @@
1010
# See the License for the specific language governing permissions and
1111
# limitations under the License.
1212

13-
all: ProtocolToThrift.h ProtocolToThrift.cpp
13+
# all: ProtocolToThrift.h ProtocolToThrift.cpp
1414

15-
ProtocolToThrift.h: ProtocolToThrift-hpp.mustache presto_protocol-to-thrift-json.json
16-
echo "// DO NOT EDIT : This file is generated by presto_protocol-to-thrift-json.py" > ProtocolToThrift.h
17-
chevron -d presto_protocol-to-thrift-json.json ProtocolToThrift-hpp.mustache >> ProtocolToThrift.h
18-
clang-format -style=file -i ProtocolToThrift.h
15+
# ProtocolToThrift.h: ProtocolToThrift-hpp.mustache presto_protocol-to-thrift-json.json
16+
# echo "// DO NOT EDIT : This file is generated by presto_protocol-to-thrift-json.py" > ProtocolToThrift.h
17+
# chevron -d presto_protocol-to-thrift-json.json ProtocolToThrift-hpp.mustache >> ProtocolToThrift.h
18+
# clang-format -style=file -i ProtocolToThrift.h
1919

20-
ProtocolToThrift.cpp: ProtocolToThrift-cpp.mustache presto_protocol-to-thrift-json.json
21-
echo "// DO NOT EDIT : This file is generated by presto_protocol-to-thrift-json.py" > ProtocolToThrift.cpp
22-
chevron -d presto_protocol-to-thrift-json.json ProtocolToThrift-cpp.mustache >> ProtocolToThrift.cpp
23-
clang-format -style=file -i ProtocolToThrift.cpp
20+
# ProtocolToThrift.cpp: ProtocolToThrift-cpp.mustache presto_protocol-to-thrift-json.json
21+
# echo "// DO NOT EDIT : This file is generated by presto_protocol-to-thrift-json.py" > ProtocolToThrift.cpp
22+
# chevron -d presto_protocol-to-thrift-json.json ProtocolToThrift-cpp.mustache >> ProtocolToThrift.cpp
23+
# clang-format -style=file -i ProtocolToThrift.cpp
2424

25-
presto_protocol-to-thrift-json.json: presto_protocol-to-thrift-json.py presto_protocol-to-thrift-json.yml presto_thrift.json ../../presto_protocol/presto_protocol.json
26-
./presto_protocol-to-thrift-json.py presto_thrift.json ../../presto_protocol/presto_protocol.json | jq . > presto_protocol-to-thrift-json.json
25+
# presto_protocol-to-thrift-json.json: presto_protocol-to-thrift-json.py presto_protocol-to-thrift-json.yml presto_thrift.json ../../presto_protocol/presto_protocol.json
26+
# ./presto_protocol-to-thrift-json.py presto_thrift.json ../../presto_protocol/presto_protocol.json | jq . > presto_protocol-to-thrift-json.json
2727

28-
presto_thrift.json: presto_thrift.thrift ./thrift2json.py
29-
./thrift2json.py presto_thrift.thrift | jq . > presto_thrift.json
30-
28+
# presto_thrift.json: presto_thrift.thrift ./thrift2json.py
29+
# ./thrift2json.py presto_thrift.thrift | jq . > presto_thrift.json

0 commit comments

Comments
 (0)