Skip to content

Commit 5d2c3f0

Browse files
committed
Accept query id and slug in QueuedStatement
1 parent 90d1c0d commit 5d2c3f0

File tree

2 files changed

+111
-2
lines changed

2 files changed

+111
-2
lines changed

presto-main/src/main/java/com/facebook/presto/server/protocol/QueuedStatementResource.java

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,53 @@ public Response postStatement(
227227
return withCompressionConfiguration(Response.ok(query.getInitialQueryResults(uriInfo, xForwardedProto, xPrestoPrefixUrl, binaryResults)), compressionEnabled).build();
228228
}
229229

230+
/**
231+
* HTTP endpoint for submitting queries to the Presto Coordinator.
232+
* Presto performs lazy execution. The submission of a query returns
233+
* a placeholder for the result set, but the query gets
234+
* scheduled/dispatched only when the client polls for results.
235+
* This endpoint accepts a pre-minted queryId and slug, instead of
236+
* generating it.
237+
* @param statement The statement or sql query string submitted
238+
* @param queryId Pre-minted query ID to associate with this query
239+
* @param slug Pre-minted slug to protect this query
240+
* @param xForwardedProto Forwarded protocol (http or https)
241+
* @param servletRequest The http request
242+
* @param uriInfo {@link javax.ws.rs.core.UriInfo}
243+
* @return {@link javax.ws.rs.core.Response} HTTP response code
244+
*/
245+
@POST
246+
@Path("/v1/statement/queued/{queryId}")
247+
@Produces(APPLICATION_JSON)
248+
public Response postStatement(
249+
String statement,
250+
@PathParam("queryId") QueryId queryId,
251+
@QueryParam("slug") String slug,
252+
@DefaultValue("false") @QueryParam("binaryResults") boolean binaryResults,
253+
@HeaderParam(X_FORWARDED_PROTO) String xForwardedProto,
254+
@HeaderParam(PRESTO_PREFIX_URL) String xPrestoPrefixUrl,
255+
@Context HttpServletRequest servletRequest,
256+
@Context UriInfo uriInfo)
257+
{
258+
if (isNullOrEmpty(statement)) {
259+
throw badRequest(BAD_REQUEST, "SQL statement is empty");
260+
}
261+
262+
abortIfPrefixUrlInvalid(xPrestoPrefixUrl);
263+
264+
// TODO: For future cases we may want to start tracing from client. Then continuation of tracing
265+
// will be needed instead of creating a new trace here.
266+
SessionContext sessionContext = new HttpRequestSessionContext(
267+
servletRequest,
268+
sqlParserOptions,
269+
tracerProviderManager.getTracerProvider(),
270+
Optional.of(sessionPropertyManager));
271+
Query query = new Query(statement, sessionContext, dispatchManager, queryResultsProvider, 0, queryId, slug);
272+
queries.put(query.getQueryId(), query);
273+
274+
return withCompressionConfiguration(Response.ok(query.getInitialQueryResults(uriInfo, xForwardedProto, xPrestoPrefixUrl, binaryResults)), compressionEnabled).build();
275+
}
276+
230277
/**
231278
* HTTP endpoint for re-processing a failed query
232279
* @param queryId Query Identifier of the query to be retried
@@ -455,21 +502,34 @@ private static final class Query
455502
private final DispatchManager dispatchManager;
456503
private final LocalQueryProvider queryProvider;
457504
private final QueryId queryId;
458-
private final String slug = "x" + randomUUID().toString().toLowerCase(ENGLISH).replace("-", "");
505+
private final String slug;
459506
private final AtomicLong lastToken = new AtomicLong();
460507
private final int retryCount;
461508

462509
@GuardedBy("this")
463510
private ListenableFuture<?> querySubmissionFuture;
464511

465512
public Query(String query, SessionContext sessionContext, DispatchManager dispatchManager, LocalQueryProvider queryResultsProvider, int retryCount)
513+
{
514+
this(query, sessionContext, dispatchManager, queryResultsProvider, retryCount, dispatchManager.createQueryId(), createSlug());
515+
}
516+
517+
public Query(
518+
String query,
519+
SessionContext sessionContext,
520+
DispatchManager dispatchManager,
521+
LocalQueryProvider queryResultsProvider,
522+
int retryCount,
523+
QueryId queryId,
524+
String slug)
466525
{
467526
this.query = requireNonNull(query, "query is null");
468527
this.sessionContext = requireNonNull(sessionContext, "sessionContext is null");
469528
this.dispatchManager = requireNonNull(dispatchManager, "dispatchManager is null");
470529
this.queryProvider = requireNonNull(queryResultsProvider, "queryExecutor is null");
471-
this.queryId = dispatchManager.createQueryId();
472530
this.retryCount = retryCount;
531+
this.queryId = requireNonNull(queryId, "queryId is null");
532+
this.slug = requireNonNull(slug, "slug is null");
473533
}
474534

475535
/**
@@ -640,6 +700,11 @@ private QueryResults createQueryResults(long token, UriInfo uriInfo, String xFor
640700
dispatchInfo.getWaitingForPrerequisitesTime());
641701
}
642702

703+
private static String createSlug()
704+
{
705+
return "x" + randomUUID().toString().toLowerCase(ENGLISH).replace("-", "");
706+
}
707+
643708
private URI getNextUri(long token, UriInfo uriInfo, String xForwardedProto, String xPrestoPrefixUrl, DispatchInfo dispatchInfo, boolean binaryResults)
644709
{
645710
// if failed, query is complete

presto-main/src/test/java/com/facebook/presto/server/TestServer.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.facebook.presto.common.Page;
2727
import com.facebook.presto.common.block.BlockEncodingManager;
2828
import com.facebook.presto.common.type.TimeZoneNotSupportedException;
29+
import com.facebook.presto.execution.QueryIdGenerator;
2930
import com.facebook.presto.execution.buffer.PagesSerdeFactory;
3031
import com.facebook.presto.server.testing.TestingPrestoServer;
3132
import com.facebook.presto.spi.QueryId;
@@ -262,6 +263,40 @@ public void testQuery()
262263
assertEquals(rows, ImmutableList.of(ImmutableList.of("system")));
263264
}
264265

266+
@Test
267+
public void testQueryWithPreMintedQueryIdAndSlug()
268+
{
269+
QueryId queryId = new QueryIdGenerator().createNextQueryId();
270+
String slug = "xxx";
271+
Request request = preparePost()
272+
.setUri(uriFor("/v1/statement/queued/", queryId, slug))
273+
.setBodyGenerator(createStaticBodyGenerator("show catalogs", UTF_8))
274+
.setHeader(PRESTO_USER, "user")
275+
.setHeader(PRESTO_SOURCE, "source")
276+
.setHeader(PRESTO_CATALOG, "catalog")
277+
.setHeader(PRESTO_SCHEMA, "schema")
278+
.build();
279+
280+
QueryResults queryResults = client.execute(request, createJsonResponseHandler(QUERY_RESULTS_CODEC));
281+
282+
// verify slug in nextUri is same as requested
283+
assertEquals(queryResults.getNextUri().getQuery(), "slug=xxx");
284+
285+
// verify nextUri points to requested query id
286+
assertEquals(queryResults.getNextUri().getPath(), format("/v1/statement/queued/%s/1", queryId));
287+
288+
while (queryResults.getNextUri() != null) {
289+
queryResults = client.execute(prepareGet().setUri(queryResults.getNextUri()).build(), createJsonResponseHandler(QUERY_RESULTS_CODEC));
290+
}
291+
292+
if (queryResults.getError() != null) {
293+
fail(queryResults.getError().toString());
294+
}
295+
296+
// verify query id was passed down properly
297+
assertEquals(server.getDispatchManager().getQueryInfo(queryId).getQueryId(), queryId);
298+
}
299+
265300
@Test
266301
public void testTransactionSupport()
267302
{
@@ -327,4 +362,13 @@ public URI uriFor(String path)
327362
{
328363
return HttpUriBuilder.uriBuilderFrom(server.getBaseUrl()).replacePath(path).build();
329364
}
365+
366+
public URI uriFor(String path, QueryId queryId, String slug)
367+
{
368+
return HttpUriBuilder.uriBuilderFrom(server.getBaseUrl())
369+
.replacePath(path)
370+
.appendPath(queryId.getId())
371+
.addParameter("slug", slug)
372+
.build();
373+
}
330374
}

0 commit comments

Comments
 (0)