Skip to content

Commit 72c07cc

Browse files
committed
Accept query id and slug in QueuedStatement
1 parent c111f79 commit 72c07cc

File tree

4 files changed

+252
-2
lines changed

4 files changed

+252
-2
lines changed

presto-docs/src/main/sphinx/rest/statement.rst

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,21 @@ Statement Resource
117117
}
118118
}
119119

120+
.. function:: PUT /v1/statement/{queryId}?slug={slug}
121+
122+
:query query: SQL Query to execute
123+
:query queryId: Query identifier to associate with this query
124+
:query slug: Nonce to associate with this query, that will be required for subsequent requests
125+
:reqheader X-Presto-User: User to execute statement on behalf of (optional)
126+
:reqheader X-Presto-Source: Source of query
127+
:reqheader X-Presto-Catalog: Catalog to execute query against
128+
:reqheader X-Presto-Schema: Schema to execute query against
129+
130+
Submits a statement to Presto for execution. This function is
131+
the analogue of the POST, and behaves exactly the same. The
132+
difference is that a query id and slug can be explicitly provided,
133+
instead of Presto generating it.
134+
120135
.. function:: GET /v1/statement/{queryId}/{token}
121136

122137
:query queryId: The query identifier returned from the initial POST to /v1/statement

presto-main/src/main/java/com/facebook/presto/server/protocol/QueuedStatementResource.java

Lines changed: 74 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import javax.ws.rs.GET;
4949
import javax.ws.rs.HeaderParam;
5050
import javax.ws.rs.POST;
51+
import javax.ws.rs.PUT;
5152
import javax.ws.rs.Path;
5253
import javax.ws.rs.PathParam;
5354
import javax.ws.rs.Produces;
@@ -99,6 +100,7 @@
99100
import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
100101
import static javax.ws.rs.core.MediaType.TEXT_PLAIN_TYPE;
101102
import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
103+
import static javax.ws.rs.core.Response.Status.CONFLICT;
102104
import static javax.ws.rs.core.Response.Status.NOT_FOUND;
103105

104106
@Path("/")
@@ -222,6 +224,58 @@ public Response postStatement(
222224
return withCompressionConfiguration(Response.ok(query.getInitialQueryResults(uriInfo, xForwardedProto, xPrestoPrefixUrl, binaryResults)), compressionEnabled).build();
223225
}
224226

227+
/**
228+
* HTTP endpoint for submitting queries to the Presto Coordinator.
229+
* Presto performs lazy execution. The submission of a query returns
230+
* a placeholder for the result set, but the query gets
231+
* scheduled/dispatched only when the client polls for results.
232+
* This endpoint accepts a pre-minted queryId and slug, instead of
233+
* generating it.
234+
*
235+
* @param statement The statement or sql query string submitted
236+
* @param queryId Pre-minted query ID to associate with this query
237+
* @param slug Pre-minted slug to protect this query
238+
* @param xForwardedProto Forwarded protocol (http or https)
239+
* @param servletRequest The http request
240+
* @param uriInfo {@link javax.ws.rs.core.UriInfo}
241+
* @return {@link javax.ws.rs.core.Response} HTTP response code
242+
*/
243+
@PUT
244+
@Path("/v1/statement/{queryId}")
245+
@Produces(APPLICATION_JSON)
246+
public Response putStatement(
247+
String statement,
248+
@PathParam("queryId") QueryId queryId,
249+
@QueryParam("slug") String slug,
250+
@DefaultValue("false") @QueryParam("binaryResults") boolean binaryResults,
251+
@HeaderParam(X_FORWARDED_PROTO) String xForwardedProto,
252+
@HeaderParam(PRESTO_PREFIX_URL) String xPrestoPrefixUrl,
253+
@Context HttpServletRequest servletRequest,
254+
@Context UriInfo uriInfo)
255+
{
256+
if (isNullOrEmpty(statement)) {
257+
throw badRequest(BAD_REQUEST, "SQL statement is empty");
258+
}
259+
260+
abortIfPrefixUrlInvalid(xPrestoPrefixUrl);
261+
262+
// TODO: For future cases we may want to start tracing from client. Then continuation of tracing
263+
// will be needed instead of creating a new trace here.
264+
SessionContext sessionContext = new HttpRequestSessionContext(
265+
servletRequest,
266+
sqlParserOptions,
267+
tracerProviderManager.getTracerProvider(),
268+
Optional.of(sessionPropertyManager));
269+
Query attemptedQuery = new Query(statement, sessionContext, dispatchManager, executingQueryResponseProvider, 0, queryId, slug);
270+
Query query = queries.computeIfAbsent(queryId, unused -> attemptedQuery);
271+
272+
if (attemptedQuery != query && !attemptedQuery.getSlug().equals(query.getSlug()) || query.getLastToken() != 0) {
273+
throw badRequest(CONFLICT, "Query already exists");
274+
}
275+
276+
return withCompressionConfiguration(Response.ok(query.getInitialQueryResults(uriInfo, xForwardedProto, xPrestoPrefixUrl, binaryResults)), compressionEnabled).build();
277+
}
278+
225279
/**
226280
* HTTP endpoint for re-processing a failed query
227281
* @param queryId Query Identifier of the query to be retried
@@ -386,21 +440,34 @@ private static final class Query
386440
private final DispatchManager dispatchManager;
387441
private final ExecutingQueryResponseProvider executingQueryResponseProvider;
388442
private final QueryId queryId;
389-
private final String slug = "x" + randomUUID().toString().toLowerCase(ENGLISH).replace("-", "");
443+
private final String slug;
390444
private final AtomicLong lastToken = new AtomicLong();
391445
private final int retryCount;
392446

393447
@GuardedBy("this")
394448
private ListenableFuture<?> querySubmissionFuture;
395449

396450
public Query(String query, SessionContext sessionContext, DispatchManager dispatchManager, ExecutingQueryResponseProvider executingQueryResponseProvider, int retryCount)
451+
{
452+
this(query, sessionContext, dispatchManager, executingQueryResponseProvider, retryCount, dispatchManager.createQueryId(), createSlug());
453+
}
454+
455+
public Query(
456+
String query,
457+
SessionContext sessionContext,
458+
DispatchManager dispatchManager,
459+
ExecutingQueryResponseProvider executingQueryResponseProvider,
460+
int retryCount,
461+
QueryId queryId,
462+
String slug)
397463
{
398464
this.query = requireNonNull(query, "query is null");
399465
this.sessionContext = requireNonNull(sessionContext, "sessionContext is null");
400466
this.dispatchManager = requireNonNull(dispatchManager, "dispatchManager is null");
401467
this.executingQueryResponseProvider = requireNonNull(executingQueryResponseProvider, "executingQueryResponseProvider is null");
402-
this.queryId = dispatchManager.createQueryId();
403468
this.retryCount = retryCount;
469+
this.queryId = requireNonNull(queryId, "queryId is null");
470+
this.slug = requireNonNull(slug, "slug is null");
404471
}
405472

406473
/**
@@ -584,6 +651,11 @@ private QueryResults createQueryResults(long token, UriInfo uriInfo, String xFor
584651
dispatchInfo.getWaitingForPrerequisitesTime());
585652
}
586653

654+
private static String createSlug()
655+
{
656+
return "x" + randomUUID().toString().toLowerCase(ENGLISH).replace("-", "");
657+
}
658+
587659
private URI getNextUri(long token, UriInfo uriInfo, String xForwardedProto, String xPrestoPrefixUrl, DispatchInfo dispatchInfo, boolean binaryResults)
588660
{
589661
// if failed, query is complete

presto-main/src/test/java/com/facebook/presto/server/TestServer.java

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.facebook.airlift.http.client.HttpUriBuilder;
1919
import com.facebook.airlift.http.client.Request;
2020
import com.facebook.airlift.http.client.StatusResponseHandler;
21+
import com.facebook.airlift.http.client.UnexpectedResponseException;
2122
import com.facebook.airlift.http.client.jetty.JettyHttpClient;
2223
import com.facebook.airlift.json.JsonCodec;
2324
import com.facebook.airlift.testing.Closeables;
@@ -26,6 +27,7 @@
2627
import com.facebook.presto.common.Page;
2728
import com.facebook.presto.common.block.BlockEncodingManager;
2829
import com.facebook.presto.common.type.TimeZoneNotSupportedException;
30+
import com.facebook.presto.execution.QueryIdGenerator;
2931
import com.facebook.presto.execution.buffer.PagesSerdeFactory;
3032
import com.facebook.presto.server.testing.TestingPrestoServer;
3133
import com.facebook.presto.spi.QueryId;
@@ -48,9 +50,11 @@
4850

4951
import static com.facebook.airlift.http.client.FullJsonResponseHandler.createFullJsonResponseHandler;
5052
import static com.facebook.airlift.http.client.JsonResponseHandler.createJsonResponseHandler;
53+
import static com.facebook.airlift.http.client.Request.Builder.fromRequest;
5154
import static com.facebook.airlift.http.client.Request.Builder.prepareGet;
5255
import static com.facebook.airlift.http.client.Request.Builder.prepareHead;
5356
import static com.facebook.airlift.http.client.Request.Builder.preparePost;
57+
import static com.facebook.airlift.http.client.Request.Builder.preparePut;
5458
import static com.facebook.airlift.http.client.StaticBodyGenerator.createStaticBodyGenerator;
5559
import static com.facebook.airlift.http.client.StatusResponseHandler.createStatusResponseHandler;
5660
import static com.facebook.airlift.json.JsonCodec.jsonCodec;
@@ -262,6 +266,103 @@ public void testQuery()
262266
assertEquals(rows, ImmutableList.of(ImmutableList.of("system")));
263267
}
264268

269+
@Test
270+
public void testQueryWithPreMintedQueryIdAndSlug()
271+
{
272+
QueryId queryId = new QueryIdGenerator().createNextQueryId();
273+
String slug = "xxx";
274+
Request request = preparePut()
275+
.setUri(uriFor("/v1/statement/", queryId, slug))
276+
.setBodyGenerator(createStaticBodyGenerator("show catalogs", UTF_8))
277+
.setHeader(PRESTO_USER, "user")
278+
.setHeader(PRESTO_SOURCE, "source")
279+
.setHeader(PRESTO_CATALOG, "catalog")
280+
.setHeader(PRESTO_SCHEMA, "schema")
281+
.build();
282+
283+
QueryResults queryResults = client.execute(request, createJsonResponseHandler(QUERY_RESULTS_CODEC));
284+
285+
// verify slug in nextUri is same as requested
286+
assertEquals(queryResults.getNextUri().getQuery(), "slug=xxx");
287+
288+
// verify nextUri points to requested query id
289+
assertEquals(queryResults.getNextUri().getPath(), format("/v1/statement/queued/%s/1", queryId));
290+
291+
while (queryResults.getNextUri() != null) {
292+
queryResults = client.execute(prepareGet().setUri(queryResults.getNextUri()).build(), createJsonResponseHandler(QUERY_RESULTS_CODEC));
293+
}
294+
295+
if (queryResults.getError() != null) {
296+
fail(queryResults.getError().toString());
297+
}
298+
299+
// verify query id was passed down properly
300+
assertEquals(server.getDispatchManager().getQueryInfo(queryId).getQueryId(), queryId);
301+
}
302+
303+
@Test
304+
public void testPutStatementIdempotency()
305+
{
306+
QueryId queryId = new QueryIdGenerator().createNextQueryId();
307+
Request request = preparePut()
308+
.setUri(uriFor("/v1/statement/", queryId, "slug"))
309+
.setBodyGenerator(createStaticBodyGenerator("show catalogs", UTF_8))
310+
.setHeader(PRESTO_USER, "user")
311+
.setHeader(PRESTO_SOURCE, "source")
312+
.setHeader(PRESTO_CATALOG, "catalog")
313+
.setHeader(PRESTO_SCHEMA, "schema")
314+
.build();
315+
316+
client.execute(request, createJsonResponseHandler(QUERY_RESULTS_CODEC));
317+
// Execute PUT request again should succeed
318+
QueryResults queryResults = client.execute(request, createJsonResponseHandler(QUERY_RESULTS_CODEC));
319+
320+
while (queryResults.getNextUri() != null) {
321+
queryResults = client.execute(prepareGet().setUri(queryResults.getNextUri()).build(), createJsonResponseHandler(QUERY_RESULTS_CODEC));
322+
}
323+
if (queryResults.getError() != null) {
324+
fail(queryResults.getError().toString());
325+
}
326+
}
327+
328+
@Test(expectedExceptions = UnexpectedResponseException.class, expectedExceptionsMessageRegExp = "Expected response code to be \\[.*\\], but was 409")
329+
public void testPutStatementWithDifferentSlugFails()
330+
{
331+
QueryId queryId = new QueryIdGenerator().createNextQueryId();
332+
Request request = preparePut()
333+
.setUri(uriFor("/v1/statement/", queryId, "slug"))
334+
.setBodyGenerator(createStaticBodyGenerator("show catalogs", UTF_8))
335+
.setHeader(PRESTO_USER, "user")
336+
.setHeader(PRESTO_SOURCE, "source")
337+
.setHeader(PRESTO_CATALOG, "catalog")
338+
.setHeader(PRESTO_SCHEMA, "schema")
339+
.build();
340+
client.execute(request, createJsonResponseHandler(QUERY_RESULTS_CODEC));
341+
342+
Request badRequest = fromRequest(request)
343+
.setUri(uriFor("/v1/statement/", queryId, "different_slug"))
344+
.build();
345+
client.execute(badRequest, createJsonResponseHandler(QUERY_RESULTS_CODEC));
346+
}
347+
348+
@Test(expectedExceptions = UnexpectedResponseException.class, expectedExceptionsMessageRegExp = "Expected response code to be \\[.*\\], but was 409")
349+
public void testPutStatementAfterGetFails()
350+
{
351+
QueryId queryId = new QueryIdGenerator().createNextQueryId();
352+
Request request = preparePut()
353+
.setUri(uriFor("/v1/statement/", queryId, "slug"))
354+
.setBodyGenerator(createStaticBodyGenerator("show catalogs", UTF_8))
355+
.setHeader(PRESTO_USER, "user")
356+
.setHeader(PRESTO_SOURCE, "source")
357+
.setHeader(PRESTO_CATALOG, "catalog")
358+
.setHeader(PRESTO_SCHEMA, "schema")
359+
.build();
360+
361+
QueryResults queryResults = client.execute(request, createJsonResponseHandler(QUERY_RESULTS_CODEC));
362+
client.execute(prepareGet().setUri(queryResults.getNextUri()).build(), createJsonResponseHandler(QUERY_RESULTS_CODEC));
363+
client.execute(request, createJsonResponseHandler(QUERY_RESULTS_CODEC));
364+
}
365+
265366
@Test
266367
public void testTransactionSupport()
267368
{
@@ -327,4 +428,13 @@ public URI uriFor(String path)
327428
{
328429
return HttpUriBuilder.uriBuilderFrom(server.getBaseUrl()).replacePath(path).build();
329430
}
431+
432+
public URI uriFor(String path, QueryId queryId, String slug)
433+
{
434+
return HttpUriBuilder.uriBuilderFrom(server.getBaseUrl())
435+
.replacePath(path)
436+
.appendPath(queryId.getId())
437+
.addParameter("slug", slug)
438+
.build();
439+
}
330440
}

presto-openapi/src/main/resources/queued_statement.yaml

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,59 @@ paths:
4646
$ref: './schemas.yaml/#/components/schemas/QueryResults'
4747
'400':
4848
description: Bad request
49+
/v1/statement/{queryId}:
50+
put:
51+
summary: Submit a new query
52+
description: Submits a new query to the Presto coordinator, with a pre-minted query id and slug
53+
requestBody:
54+
required: true
55+
content:
56+
text/plain:
57+
schema:
58+
type: string
59+
description: The statement or SQL query string to be submitted
60+
parameters:
61+
- name: queryId
62+
in: path
63+
required: true
64+
schema:
65+
type: string
66+
description: The query id to associate with this query
67+
- name: slug
68+
in: query
69+
required: true
70+
schema:
71+
type: string
72+
description: Nonce to associate with this query, which is required for future requests
73+
- name: binaryResults
74+
in: query
75+
required: false
76+
schema:
77+
type: boolean
78+
description: Whether to return results in binary format
79+
- name: X-Forwarded-Proto
80+
in: header
81+
required: false
82+
schema:
83+
type: string
84+
description: Forwarded protocol (http or https)
85+
- name: Presto-Prefix-URL
86+
in: header
87+
required: false
88+
schema:
89+
type: string
90+
description: Prefix URL for Presto
91+
responses:
92+
'200':
93+
description: Query submitted successfully
94+
content:
95+
application/json:
96+
schema:
97+
$ref: './schemas.yaml/#/components/schemas/QueryResults'
98+
'400':
99+
description: Bad request
100+
'409':
101+
description: Conflict, this query already exists
49102
/v1/statement/queued/retry/{queryId}:
50103
get:
51104
summary: Retry a failed query

0 commit comments

Comments
 (0)