@@ -210,25 +210,35 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTaskImpl(
210
210
const protocol::TaskId& taskId,
211
211
const std::string& updateJson,
212
212
const bool summarize,
213
- long startProcessCpuTime)>& createOrUpdateFunc) {
213
+ long startProcessCpuTime,
214
+ bool receiveThrift)>& createOrUpdateFunc) {
214
215
protocol::TaskId taskId = pathMatch[1 ];
215
216
bool summarize = message->hasQueryParam (" summarize" );
217
+
218
+ auto & headers = message->getHeaders ();
219
+ auto acceptHeader = headers.getSingleOrEmpty (proxygen::HTTP_HEADER_ACCEPT);
220
+ auto sendThrift =
221
+ acceptHeader.find (http::kMimeTypeApplicationThrift ) != std::string::npos;
222
+ auto contentHeader = headers.getSingleOrEmpty (proxygen::HTTP_HEADER_CONTENT_TYPE);
223
+ auto receiveThrift =
224
+ contentHeader.find (http::kMimeTypeApplicationThrift ) != std::string::npos;
225
+
216
226
return new http::CallbackRequestHandler (
217
- [this , taskId, summarize, createOrUpdateFunc](
227
+ [this , taskId, summarize, createOrUpdateFunc, sendThrift, receiveThrift ](
218
228
proxygen::HTTPMessage* /* message*/ ,
219
229
const std::vector<std::unique_ptr<folly::IOBuf>>& body,
220
230
proxygen::ResponseHandler* downstream,
221
231
std::shared_ptr<http::CallbackRequestHandlerState> handlerState) {
222
232
folly::via (
223
233
httpSrvCpuExecutor_,
224
- [this , &body, taskId, summarize, createOrUpdateFunc]() {
234
+ [this , &body, taskId, summarize, createOrUpdateFunc, receiveThrift ]() {
225
235
const auto startProcessCpuTimeNs = util::getProcessCpuTimeNs ();
226
236
std::string updateJson = util::extractMessageBody (body);
227
237
228
238
std::unique_ptr<protocol::TaskInfo> taskInfo;
229
239
try {
230
240
taskInfo = createOrUpdateFunc (
231
- taskId, updateJson, summarize, startProcessCpuTimeNs);
241
+ taskId, updateJson, summarize, startProcessCpuTimeNs, receiveThrift );
232
242
} catch (const velox::VeloxException& e) {
233
243
// Creating an empty task, putting errors inside so that next
234
244
// status fetch from coordinator will catch the error and well
@@ -243,12 +253,19 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTaskImpl(
243
253
throw ;
244
254
}
245
255
}
246
- return json (* taskInfo) ;
256
+ return taskInfo;
247
257
})
248
258
.via (folly::EventBaseManager::get ()->getEventBase ())
249
- .thenValue ([downstream, handlerState]( auto && taskInfoJson ) {
259
+ .thenValue ([downstream, handlerState, sendThrift](std::unique_ptr<protocol::TaskInfo> taskInfo ) {
250
260
if (!handlerState->requestExpired ()) {
251
- http::sendOkResponse (downstream, taskInfoJson);
261
+ if (sendThrift) {
262
+ thrift::TaskInfo thriftTaskInfo;
263
+ toThrift (*taskInfo, thriftTaskInfo);
264
+ http::sendOkThriftResponse (
265
+ downstream, thriftWrite (thriftTaskInfo));
266
+ } else {
267
+ http::sendOkResponse (downstream, json (*taskInfo));
268
+ }
252
269
}
253
270
})
254
271
.thenError (
@@ -277,7 +294,8 @@ proxygen::RequestHandler* TaskResource::createOrUpdateBatchTask(
277
294
[&](const protocol::TaskId& taskId,
278
295
const std::string& updateJson,
279
296
const bool summarize,
280
- long startProcessCpuTime) {
297
+ long startProcessCpuTime,
298
+ bool /* receiveThrift*/ ) {
281
299
protocol::BatchTaskUpdateRequest batchUpdateRequest =
282
300
json::parse (updateJson);
283
301
auto updateRequest = batchUpdateRequest.taskUpdateRequest ;
@@ -329,13 +347,25 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTask(
329
347
[&](const protocol::TaskId& taskId,
330
348
const std::string& updateJson,
331
349
const bool summarize,
332
- long startProcessCpuTime) {
333
- protocol::TaskUpdateRequest updateRequest = json::parse (updateJson);
350
+ long startProcessCpuTime,
351
+ bool receiveThrift) {
352
+ protocol::TaskUpdateRequest updateRequest;
353
+ if (receiveThrift) {
354
+ auto thriftTaskUpdateRequest = std::make_shared<thrift::TaskUpdateRequest>();
355
+ thriftRead (updateJson, thriftTaskUpdateRequest);
356
+ fromThrift (*thriftTaskUpdateRequest, updateRequest);
357
+ } else {
358
+ updateRequest = json::parse (updateJson);
359
+ }
334
360
velox::core::PlanFragment planFragment;
335
361
std::shared_ptr<velox::core::QueryCtx> queryCtx;
336
362
if (updateRequest.fragment ) {
337
- auto fragment =
338
- velox::encoding::Base64::decode (*updateRequest.fragment );
363
+ std::string fragment;
364
+ if (receiveThrift) {
365
+ fragment = *updateRequest.fragment ;
366
+ } else {
367
+ fragment = velox::encoding::Base64::decode (*updateRequest.fragment );
368
+ }
339
369
protocol::PlanFragment prestoPlan = json::parse (fragment);
340
370
341
371
queryCtx =
@@ -511,11 +541,11 @@ proxygen::RequestHandler* TaskResource::getTaskStatus(
511
541
512
542
auto & headers = message->getHeaders ();
513
543
auto acceptHeader = headers.getSingleOrEmpty (proxygen::HTTP_HEADER_ACCEPT);
514
- auto useThrift =
544
+ auto sendThrift =
515
545
acceptHeader.find (http::kMimeTypeApplicationThrift ) != std::string::npos;
516
546
517
547
return new http::CallbackRequestHandler (
518
- [this , useThrift , taskId, currentState, maxWait](
548
+ [this , sendThrift , taskId, currentState, maxWait](
519
549
proxygen::HTTPMessage* /* message*/ ,
520
550
const std::vector<std::unique_ptr<folly::IOBuf>>& /* body*/ ,
521
551
proxygen::ResponseHandler* downstream,
@@ -525,7 +555,7 @@ proxygen::RequestHandler* TaskResource::getTaskStatus(
525
555
httpSrvCpuExecutor_,
526
556
[this ,
527
557
evb,
528
- useThrift ,
558
+ sendThrift ,
529
559
taskId,
530
560
currentState,
531
561
maxWait,
@@ -535,10 +565,10 @@ proxygen::RequestHandler* TaskResource::getTaskStatus(
535
565
.getTaskStatus (taskId, currentState, maxWait, handlerState)
536
566
.via (evb)
537
567
.thenValue (
538
- [useThrift , downstream, taskId, handlerState](
568
+ [sendThrift , downstream, taskId, handlerState](
539
569
std::unique_ptr<protocol::TaskStatus> taskStatus) {
540
570
if (!handlerState->requestExpired ()) {
541
- if (useThrift ) {
571
+ if (sendThrift ) {
542
572
thrift::TaskStatus thriftTaskStatus;
543
573
toThrift (*taskStatus, thriftTaskStatus);
544
574
http::sendOkThriftResponse (
0 commit comments