From 02c294f2233889ba3b7254e6ede8c20c29c23ef8 Mon Sep 17 00:00:00 2001 From: prithvip Date: Thu, 8 Aug 2024 13:57:44 -0700 Subject: [PATCH] Accept query id and slug in QueuedStatement --- .../src/main/sphinx/rest/statement.rst | 15 +++ .../protocol/QueuedStatementResource.java | 76 +++++++++++- .../facebook/presto/server/TestServer.java | 110 ++++++++++++++++++ .../src/main/resources/queued_statement.yaml | 53 +++++++++ 4 files changed, 252 insertions(+), 2 deletions(-) diff --git a/presto-docs/src/main/sphinx/rest/statement.rst b/presto-docs/src/main/sphinx/rest/statement.rst index 21854ff1e8625..c7408a9eed023 100644 --- a/presto-docs/src/main/sphinx/rest/statement.rst +++ b/presto-docs/src/main/sphinx/rest/statement.rst @@ -117,6 +117,21 @@ Statement Resource } } +.. function:: PUT /v1/statement/{queryId}?slug={slug} + + :query query: SQL Query to execute + :query queryId: Query identifier to associate with this query + :query slug: Nonce to associate with this query, which will be required for subsequent requests + :reqheader X-Presto-User: User to execute statement on behalf of (optional) + :reqheader X-Presto-Source: Source of query + :reqheader X-Presto-Catalog: Catalog to execute query against + :reqheader X-Presto-Schema: Schema to execute query against + + Submits a statement to Presto for execution. This function is + the analogue of the POST, and behaves exactly the same. The + difference is that a query id and slug can be explicitly provided, + instead of Presto generating them. + .. 
function:: GET /v1/statement/{queryId}/{token} :query queryId: The query identifier returned from the initial POST to /v1/statement diff --git a/presto-main/src/main/java/com/facebook/presto/server/protocol/QueuedStatementResource.java b/presto-main/src/main/java/com/facebook/presto/server/protocol/QueuedStatementResource.java index ffbb26691398e..82b4b55cbb459 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/protocol/QueuedStatementResource.java +++ b/presto-main/src/main/java/com/facebook/presto/server/protocol/QueuedStatementResource.java @@ -48,6 +48,7 @@ import javax.ws.rs.GET; import javax.ws.rs.HeaderParam; import javax.ws.rs.POST; +import javax.ws.rs.PUT; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; @@ -99,6 +100,7 @@ import static javax.ws.rs.core.MediaType.APPLICATION_JSON; import static javax.ws.rs.core.MediaType.TEXT_PLAIN_TYPE; import static javax.ws.rs.core.Response.Status.BAD_REQUEST; +import static javax.ws.rs.core.Response.Status.CONFLICT; import static javax.ws.rs.core.Response.Status.NOT_FOUND; @Path("/") @@ -222,6 +224,58 @@ public Response postStatement( return withCompressionConfiguration(Response.ok(query.getInitialQueryResults(uriInfo, xForwardedProto, xPrestoPrefixUrl, binaryResults)), compressionEnabled).build(); } + /** + * HTTP endpoint for submitting queries to the Presto Coordinator. + * Presto performs lazy execution. The submission of a query returns + * a placeholder for the result set, but the query gets + * scheduled/dispatched only when the client polls for results. + * This endpoint accepts a pre-minted queryId and slug, instead of + * generating it. 
+ * + * @param statement The statement or sql query string submitted + * @param queryId Pre-minted query ID to associate with this query + * @param slug Pre-minted slug to protect this query + * @param xForwardedProto Forwarded protocol (http or https) + * @param servletRequest The http request + * @param uriInfo {@link javax.ws.rs.core.UriInfo} + * @return {@link javax.ws.rs.core.Response} HTTP response code + */ + @PUT + @Path("/v1/statement/{queryId}") + @Produces(APPLICATION_JSON) + public Response putStatement( + String statement, + @PathParam("queryId") QueryId queryId, + @QueryParam("slug") String slug, + @DefaultValue("false") @QueryParam("binaryResults") boolean binaryResults, + @HeaderParam(X_FORWARDED_PROTO) String xForwardedProto, + @HeaderParam(PRESTO_PREFIX_URL) String xPrestoPrefixUrl, + @Context HttpServletRequest servletRequest, + @Context UriInfo uriInfo) + { + if (isNullOrEmpty(statement)) { + throw badRequest(BAD_REQUEST, "SQL statement is empty"); + } + + abortIfPrefixUrlInvalid(xPrestoPrefixUrl); + + // TODO: For future cases we may want to start tracing from client. Then continuation of tracing + // will be needed instead of creating a new trace here. 
+ SessionContext sessionContext = new HttpRequestSessionContext( + servletRequest, + sqlParserOptions, + tracerProviderManager.getTracerProvider(), + Optional.of(sessionPropertyManager)); + Query attemptedQuery = new Query(statement, sessionContext, dispatchManager, executingQueryResponseProvider, 0, queryId, slug); + Query query = queries.computeIfAbsent(queryId, unused -> attemptedQuery); + + if (attemptedQuery != query && !attemptedQuery.getSlug().equals(query.getSlug()) || query.getLastToken() != 0) { + throw badRequest(CONFLICT, "Query already exists"); + } + + return withCompressionConfiguration(Response.ok(query.getInitialQueryResults(uriInfo, xForwardedProto, xPrestoPrefixUrl, binaryResults)), compressionEnabled).build(); + } + /** * HTTP endpoint for re-processing a failed query * @param queryId Query Identifier of the query to be retried @@ -386,7 +440,7 @@ private static final class Query private final DispatchManager dispatchManager; private final ExecutingQueryResponseProvider executingQueryResponseProvider; private final QueryId queryId; - private final String slug = "x" + randomUUID().toString().toLowerCase(ENGLISH).replace("-", ""); + private final String slug; private final AtomicLong lastToken = new AtomicLong(); private final int retryCount; @@ -394,13 +448,26 @@ private static final class Query private ListenableFuture querySubmissionFuture; public Query(String query, SessionContext sessionContext, DispatchManager dispatchManager, ExecutingQueryResponseProvider executingQueryResponseProvider, int retryCount) + { + this(query, sessionContext, dispatchManager, executingQueryResponseProvider, retryCount, dispatchManager.createQueryId(), createSlug()); + } + + public Query( + String query, + SessionContext sessionContext, + DispatchManager dispatchManager, + ExecutingQueryResponseProvider executingQueryResponseProvider, + int retryCount, + QueryId queryId, + String slug) { this.query = requireNonNull(query, "query is null"); this.sessionContext = 
requireNonNull(sessionContext, "sessionContext is null"); this.dispatchManager = requireNonNull(dispatchManager, "dispatchManager is null"); this.executingQueryResponseProvider = requireNonNull(executingQueryResponseProvider, "executingQueryResponseProvider is null"); - this.queryId = dispatchManager.createQueryId(); this.retryCount = retryCount; + this.queryId = requireNonNull(queryId, "queryId is null"); + this.slug = requireNonNull(slug, "slug is null"); } /** @@ -584,6 +651,11 @@ private QueryResults createQueryResults(long token, UriInfo uriInfo, String xFor dispatchInfo.getWaitingForPrerequisitesTime()); } + private static String createSlug() + { + return "x" + randomUUID().toString().toLowerCase(ENGLISH).replace("-", ""); + } + private URI getNextUri(long token, UriInfo uriInfo, String xForwardedProto, String xPrestoPrefixUrl, DispatchInfo dispatchInfo, boolean binaryResults) { // if failed, query is complete diff --git a/presto-main/src/test/java/com/facebook/presto/server/TestServer.java b/presto-main/src/test/java/com/facebook/presto/server/TestServer.java index 34391051d679f..020c90f844663 100644 --- a/presto-main/src/test/java/com/facebook/presto/server/TestServer.java +++ b/presto-main/src/test/java/com/facebook/presto/server/TestServer.java @@ -18,6 +18,7 @@ import com.facebook.airlift.http.client.HttpUriBuilder; import com.facebook.airlift.http.client.Request; import com.facebook.airlift.http.client.StatusResponseHandler; +import com.facebook.airlift.http.client.UnexpectedResponseException; import com.facebook.airlift.http.client.jetty.JettyHttpClient; import com.facebook.airlift.json.JsonCodec; import com.facebook.airlift.testing.Closeables; @@ -26,6 +27,7 @@ import com.facebook.presto.common.Page; import com.facebook.presto.common.block.BlockEncodingManager; import com.facebook.presto.common.type.TimeZoneNotSupportedException; +import com.facebook.presto.execution.QueryIdGenerator; import com.facebook.presto.execution.buffer.PagesSerdeFactory; 
import com.facebook.presto.server.testing.TestingPrestoServer; import com.facebook.presto.spi.QueryId; @@ -48,9 +50,11 @@ import static com.facebook.airlift.http.client.FullJsonResponseHandler.createFullJsonResponseHandler; import static com.facebook.airlift.http.client.JsonResponseHandler.createJsonResponseHandler; +import static com.facebook.airlift.http.client.Request.Builder.fromRequest; import static com.facebook.airlift.http.client.Request.Builder.prepareGet; import static com.facebook.airlift.http.client.Request.Builder.prepareHead; import static com.facebook.airlift.http.client.Request.Builder.preparePost; +import static com.facebook.airlift.http.client.Request.Builder.preparePut; import static com.facebook.airlift.http.client.StaticBodyGenerator.createStaticBodyGenerator; import static com.facebook.airlift.http.client.StatusResponseHandler.createStatusResponseHandler; import static com.facebook.airlift.json.JsonCodec.jsonCodec; @@ -262,6 +266,103 @@ public void testQuery() assertEquals(rows, ImmutableList.of(ImmutableList.of("system"))); } + @Test + public void testQueryWithPreMintedQueryIdAndSlug() + { + QueryId queryId = new QueryIdGenerator().createNextQueryId(); + String slug = "xxx"; + Request request = preparePut() + .setUri(uriFor("/v1/statement/", queryId, slug)) + .setBodyGenerator(createStaticBodyGenerator("show catalogs", UTF_8)) + .setHeader(PRESTO_USER, "user") + .setHeader(PRESTO_SOURCE, "source") + .setHeader(PRESTO_CATALOG, "catalog") + .setHeader(PRESTO_SCHEMA, "schema") + .build(); + + QueryResults queryResults = client.execute(request, createJsonResponseHandler(QUERY_RESULTS_CODEC)); + + // verify slug in nextUri is same as requested + assertEquals(queryResults.getNextUri().getQuery(), "slug=xxx"); + + // verify nextUri points to requested query id + assertEquals(queryResults.getNextUri().getPath(), format("/v1/statement/queued/%s/1", queryId)); + + while (queryResults.getNextUri() != null) { + queryResults = 
client.execute(prepareGet().setUri(queryResults.getNextUri()).build(), createJsonResponseHandler(QUERY_RESULTS_CODEC)); + } + + if (queryResults.getError() != null) { + fail(queryResults.getError().toString()); + } + + // verify query id was passed down properly + assertEquals(server.getDispatchManager().getQueryInfo(queryId).getQueryId(), queryId); + } + + @Test + public void testPutStatementIdempotency() + { + QueryId queryId = new QueryIdGenerator().createNextQueryId(); + Request request = preparePut() + .setUri(uriFor("/v1/statement/", queryId, "slug")) + .setBodyGenerator(createStaticBodyGenerator("show catalogs", UTF_8)) + .setHeader(PRESTO_USER, "user") + .setHeader(PRESTO_SOURCE, "source") + .setHeader(PRESTO_CATALOG, "catalog") + .setHeader(PRESTO_SCHEMA, "schema") + .build(); + + client.execute(request, createJsonResponseHandler(QUERY_RESULTS_CODEC)); + // Execute PUT request again should succeed + QueryResults queryResults = client.execute(request, createJsonResponseHandler(QUERY_RESULTS_CODEC)); + + while (queryResults.getNextUri() != null) { + queryResults = client.execute(prepareGet().setUri(queryResults.getNextUri()).build(), createJsonResponseHandler(QUERY_RESULTS_CODEC)); + } + if (queryResults.getError() != null) { + fail(queryResults.getError().toString()); + } + } + + @Test(expectedExceptions = UnexpectedResponseException.class, expectedExceptionsMessageRegExp = "Expected response code to be \\[.*\\], but was 409") + public void testPutStatementWithDifferentSlugFails() + { + QueryId queryId = new QueryIdGenerator().createNextQueryId(); + Request request = preparePut() + .setUri(uriFor("/v1/statement/", queryId, "slug")) + .setBodyGenerator(createStaticBodyGenerator("show catalogs", UTF_8)) + .setHeader(PRESTO_USER, "user") + .setHeader(PRESTO_SOURCE, "source") + .setHeader(PRESTO_CATALOG, "catalog") + .setHeader(PRESTO_SCHEMA, "schema") + .build(); + client.execute(request, createJsonResponseHandler(QUERY_RESULTS_CODEC)); + + Request badRequest 
= fromRequest(request) + .setUri(uriFor("/v1/statement/", queryId, "different_slug")) + .build(); + client.execute(badRequest, createJsonResponseHandler(QUERY_RESULTS_CODEC)); + } + + @Test(expectedExceptions = UnexpectedResponseException.class, expectedExceptionsMessageRegExp = "Expected response code to be \\[.*\\], but was 409") + public void testPutStatementAfterGetFails() + { + QueryId queryId = new QueryIdGenerator().createNextQueryId(); + Request request = preparePut() + .setUri(uriFor("/v1/statement/", queryId, "slug")) + .setBodyGenerator(createStaticBodyGenerator("show catalogs", UTF_8)) + .setHeader(PRESTO_USER, "user") + .setHeader(PRESTO_SOURCE, "source") + .setHeader(PRESTO_CATALOG, "catalog") + .setHeader(PRESTO_SCHEMA, "schema") + .build(); + + QueryResults queryResults = client.execute(request, createJsonResponseHandler(QUERY_RESULTS_CODEC)); + client.execute(prepareGet().setUri(queryResults.getNextUri()).build(), createJsonResponseHandler(QUERY_RESULTS_CODEC)); + client.execute(request, createJsonResponseHandler(QUERY_RESULTS_CODEC)); + } + @Test public void testTransactionSupport() { @@ -327,4 +428,13 @@ public URI uriFor(String path) { return HttpUriBuilder.uriBuilderFrom(server.getBaseUrl()).replacePath(path).build(); } + + public URI uriFor(String path, QueryId queryId, String slug) + { + return HttpUriBuilder.uriBuilderFrom(server.getBaseUrl()) + .replacePath(path) + .appendPath(queryId.getId()) + .addParameter("slug", slug) + .build(); + } } diff --git a/presto-openapi/src/main/resources/queued_statement.yaml b/presto-openapi/src/main/resources/queued_statement.yaml index dde93a083d146..0e8fc295e8515 100644 --- a/presto-openapi/src/main/resources/queued_statement.yaml +++ b/presto-openapi/src/main/resources/queued_statement.yaml @@ -46,6 +46,59 @@ paths: $ref: './schemas.yaml/#/components/schemas/QueryResults' '400': description: Bad request + /v1/statement/{queryId}: + put: + summary: Submit a new query + description: Submits a new query to 
the Presto coordinator, with a pre-minted query id and slug + requestBody: + required: true + content: + text/plain: + schema: + type: string + description: The statement or SQL query string to be submitted + parameters: + - name: queryId + in: path + required: true + schema: + type: string + description: The query id to associate with this query + - name: slug + in: query + required: true + schema: + type: string + description: Nonce to associate with this query, which is required for future requests + - name: binaryResults + in: query + required: false + schema: + type: boolean + description: Whether to return results in binary format + - name: X-Forwarded-Proto + in: header + required: false + schema: + type: string + description: Forwarded protocol (http or https) + - name: Presto-Prefix-URL + in: header + required: false + schema: + type: string + description: Prefix URL for Presto + responses: + '200': + description: Query submitted successfully + content: + application/json: + schema: + $ref: './schemas.yaml/#/components/schemas/QueryResults' + '400': + description: Bad request + '409': + description: Conflict, this query already exists /v1/statement/queued/retry/{queryId}: get: summary: Retry a failed query