Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions presto-docs/src/main/sphinx/rest/statement.rst
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,21 @@ Statement Resource
}
}

.. function:: PUT /v1/statement/{queryId}?slug={slug}

:query query: SQL Query to execute
:query queryId: Query identifier to associate with this query
:query slug: Nonce to associate with this query, that will be required for subsequent requests
:reqheader X-Presto-User: User to execute statement on behalf of (optional)
:reqheader X-Presto-Source: Source of query
:reqheader X-Presto-Catalog: Catalog to execute query against
:reqheader X-Presto-Schema: Schema to execute query against

Submits a statement to Presto for execution. This function is
the analogue of the POST, and behaves exactly the same. The
difference is that a query id and slug can be explicitly provided,
instead of Presto generating it.

.. function:: GET /v1/statement/{queryId}/{token}

:query queryId: The query identifier returned from the initial POST to /v1/statement
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
Expand Down Expand Up @@ -99,6 +100,7 @@
import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
import static javax.ws.rs.core.MediaType.TEXT_PLAIN_TYPE;
import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
import static javax.ws.rs.core.Response.Status.CONFLICT;
import static javax.ws.rs.core.Response.Status.NOT_FOUND;

@Path("/")
Expand Down Expand Up @@ -222,6 +224,58 @@ public Response postStatement(
return withCompressionConfiguration(Response.ok(query.getInitialQueryResults(uriInfo, xForwardedProto, xPrestoPrefixUrl, binaryResults)), compressionEnabled).build();
}

/**
* HTTP endpoint for submitting queries to the Presto Coordinator.
* Presto performs lazy execution. The submission of a query returns
* a placeholder for the result set, but the query gets
* scheduled/dispatched only when the client polls for results.
* This endpoint accepts a pre-minted queryId and slug, instead of
* generating it.
*
* @param statement The statement or sql query string submitted
* @param queryId Pre-minted query ID to associate with this query
* @param slug Pre-minted slug to protect this query
* @param xForwardedProto Forwarded protocol (http or https)
* @param servletRequest The http request
* @param uriInfo {@link javax.ws.rs.core.UriInfo}
* @return {@link javax.ws.rs.core.Response} HTTP response code
*/
@PUT
@Path("/v1/statement/{queryId}")
@Produces(APPLICATION_JSON)
public Response putStatement(
String statement,
@PathParam("queryId") QueryId queryId,
@QueryParam("slug") String slug,
@DefaultValue("false") @QueryParam("binaryResults") boolean binaryResults,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was added for fuzzer testing, and no client presently uses this. I'd remove it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure that no client is using it, and I kept it here since the POST supports it. If we want to remove support for this, we should probably raise an issue first and remove it as part of a separate PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I asked around and it looks like we are actually using this flag for our client, so we should keep it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you share more details on the client that's using it? Is it an internal client?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@amitkdutta do you have any details on this client?

@HeaderParam(X_FORWARDED_PROTO) String xForwardedProto,
@HeaderParam(PRESTO_PREFIX_URL) String xPrestoPrefixUrl,
@Context HttpServletRequest servletRequest,
@Context UriInfo uriInfo)
{
if (isNullOrEmpty(statement)) {
throw badRequest(BAD_REQUEST, "SQL statement is empty");
}

abortIfPrefixUrlInvalid(xPrestoPrefixUrl);

// TODO: For future cases we may want to start tracing from client. Then continuation of tracing
// will be needed instead of creating a new trace here.
SessionContext sessionContext = new HttpRequestSessionContext(
servletRequest,
sqlParserOptions,
tracerProviderManager.getTracerProvider(),
Optional.of(sessionPropertyManager));
Query attemptedQuery = new Query(statement, sessionContext, dispatchManager, executingQueryResponseProvider, 0, queryId, slug);
Query query = queries.computeIfAbsent(queryId, unused -> attemptedQuery);

if (attemptedQuery != query && !attemptedQuery.getSlug().equals(query.getSlug()) || query.getLastToken() != 0) {
throw badRequest(CONFLICT, "Query already exists");
}

return withCompressionConfiguration(Response.ok(query.getInitialQueryResults(uriInfo, xForwardedProto, xPrestoPrefixUrl, binaryResults)), compressionEnabled).build();
}

/**
* HTTP endpoint for re-processing a failed query
* @param queryId Query Identifier of the query to be retried
Expand Down Expand Up @@ -386,21 +440,34 @@ private static final class Query
private final DispatchManager dispatchManager;
private final ExecutingQueryResponseProvider executingQueryResponseProvider;
private final QueryId queryId;
private final String slug = "x" + randomUUID().toString().toLowerCase(ENGLISH).replace("-", "");
private final String slug;
private final AtomicLong lastToken = new AtomicLong();
private final int retryCount;

@GuardedBy("this")
private ListenableFuture<?> querySubmissionFuture;

public Query(String query, SessionContext sessionContext, DispatchManager dispatchManager, ExecutingQueryResponseProvider executingQueryResponseProvider, int retryCount)
{
this(query, sessionContext, dispatchManager, executingQueryResponseProvider, retryCount, dispatchManager.createQueryId(), createSlug());
}

public Query(
String query,
SessionContext sessionContext,
DispatchManager dispatchManager,
ExecutingQueryResponseProvider executingQueryResponseProvider,
int retryCount,
QueryId queryId,
String slug)
{
this.query = requireNonNull(query, "query is null");
this.sessionContext = requireNonNull(sessionContext, "sessionContext is null");
this.dispatchManager = requireNonNull(dispatchManager, "dispatchManager is null");
this.executingQueryResponseProvider = requireNonNull(executingQueryResponseProvider, "executingQueryResponseProvider is null");
this.queryId = dispatchManager.createQueryId();
this.retryCount = retryCount;
this.queryId = requireNonNull(queryId, "queryId is null");
this.slug = requireNonNull(slug, "slug is null");
}

/**
Expand Down Expand Up @@ -584,6 +651,11 @@ private QueryResults createQueryResults(long token, UriInfo uriInfo, String xFor
dispatchInfo.getWaitingForPrerequisitesTime());
}

private static String createSlug()
{
return "x" + randomUUID().toString().toLowerCase(ENGLISH).replace("-", "");
}

private URI getNextUri(long token, UriInfo uriInfo, String xForwardedProto, String xPrestoPrefixUrl, DispatchInfo dispatchInfo, boolean binaryResults)
{
// if failed, query is complete
Expand Down
110 changes: 110 additions & 0 deletions presto-main/src/test/java/com/facebook/presto/server/TestServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.facebook.airlift.http.client.HttpUriBuilder;
import com.facebook.airlift.http.client.Request;
import com.facebook.airlift.http.client.StatusResponseHandler;
import com.facebook.airlift.http.client.UnexpectedResponseException;
import com.facebook.airlift.http.client.jetty.JettyHttpClient;
import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.testing.Closeables;
Expand All @@ -26,6 +27,7 @@
import com.facebook.presto.common.Page;
import com.facebook.presto.common.block.BlockEncodingManager;
import com.facebook.presto.common.type.TimeZoneNotSupportedException;
import com.facebook.presto.execution.QueryIdGenerator;
import com.facebook.presto.execution.buffer.PagesSerdeFactory;
import com.facebook.presto.server.testing.TestingPrestoServer;
import com.facebook.presto.spi.QueryId;
Expand All @@ -48,9 +50,11 @@

import static com.facebook.airlift.http.client.FullJsonResponseHandler.createFullJsonResponseHandler;
import static com.facebook.airlift.http.client.JsonResponseHandler.createJsonResponseHandler;
import static com.facebook.airlift.http.client.Request.Builder.fromRequest;
import static com.facebook.airlift.http.client.Request.Builder.prepareGet;
import static com.facebook.airlift.http.client.Request.Builder.prepareHead;
import static com.facebook.airlift.http.client.Request.Builder.preparePost;
import static com.facebook.airlift.http.client.Request.Builder.preparePut;
import static com.facebook.airlift.http.client.StaticBodyGenerator.createStaticBodyGenerator;
import static com.facebook.airlift.http.client.StatusResponseHandler.createStatusResponseHandler;
import static com.facebook.airlift.json.JsonCodec.jsonCodec;
Expand Down Expand Up @@ -262,6 +266,103 @@ public void testQuery()
assertEquals(rows, ImmutableList.of(ImmutableList.of("system")));
}

@Test
public void testQueryWithPreMintedQueryIdAndSlug()
{
QueryId queryId = new QueryIdGenerator().createNextQueryId();
String slug = "xxx";
Request request = preparePut()
.setUri(uriFor("/v1/statement/", queryId, slug))
.setBodyGenerator(createStaticBodyGenerator("show catalogs", UTF_8))
.setHeader(PRESTO_USER, "user")
.setHeader(PRESTO_SOURCE, "source")
.setHeader(PRESTO_CATALOG, "catalog")
.setHeader(PRESTO_SCHEMA, "schema")
.build();

QueryResults queryResults = client.execute(request, createJsonResponseHandler(QUERY_RESULTS_CODEC));

// verify slug in nextUri is same as requested
assertEquals(queryResults.getNextUri().getQuery(), "slug=xxx");

// verify nextUri points to requested query id
assertEquals(queryResults.getNextUri().getPath(), format("/v1/statement/queued/%s/1", queryId));

while (queryResults.getNextUri() != null) {
queryResults = client.execute(prepareGet().setUri(queryResults.getNextUri()).build(), createJsonResponseHandler(QUERY_RESULTS_CODEC));
}

if (queryResults.getError() != null) {
fail(queryResults.getError().toString());
}

// verify query id was passed down properly
assertEquals(server.getDispatchManager().getQueryInfo(queryId).getQueryId(), queryId);
}

@Test
public void testPutStatementIdempotency()
{
QueryId queryId = new QueryIdGenerator().createNextQueryId();
Request request = preparePut()
.setUri(uriFor("/v1/statement/", queryId, "slug"))
.setBodyGenerator(createStaticBodyGenerator("show catalogs", UTF_8))
.setHeader(PRESTO_USER, "user")
.setHeader(PRESTO_SOURCE, "source")
.setHeader(PRESTO_CATALOG, "catalog")
.setHeader(PRESTO_SCHEMA, "schema")
.build();

client.execute(request, createJsonResponseHandler(QUERY_RESULTS_CODEC));
// Execute PUT request again should succeed
QueryResults queryResults = client.execute(request, createJsonResponseHandler(QUERY_RESULTS_CODEC));

while (queryResults.getNextUri() != null) {
queryResults = client.execute(prepareGet().setUri(queryResults.getNextUri()).build(), createJsonResponseHandler(QUERY_RESULTS_CODEC));
}
if (queryResults.getError() != null) {
fail(queryResults.getError().toString());
}
}

@Test(expectedExceptions = UnexpectedResponseException.class, expectedExceptionsMessageRegExp = "Expected response code to be \\[.*\\], but was 409")
public void testPutStatementWithDifferentSlugFails()
{
QueryId queryId = new QueryIdGenerator().createNextQueryId();
Request request = preparePut()
.setUri(uriFor("/v1/statement/", queryId, "slug"))
.setBodyGenerator(createStaticBodyGenerator("show catalogs", UTF_8))
.setHeader(PRESTO_USER, "user")
.setHeader(PRESTO_SOURCE, "source")
.setHeader(PRESTO_CATALOG, "catalog")
.setHeader(PRESTO_SCHEMA, "schema")
.build();
client.execute(request, createJsonResponseHandler(QUERY_RESULTS_CODEC));

Request badRequest = fromRequest(request)
.setUri(uriFor("/v1/statement/", queryId, "different_slug"))
.build();
client.execute(badRequest, createJsonResponseHandler(QUERY_RESULTS_CODEC));
}

@Test(expectedExceptions = UnexpectedResponseException.class, expectedExceptionsMessageRegExp = "Expected response code to be \\[.*\\], but was 409")
public void testPutStatementAfterGetFails()
{
QueryId queryId = new QueryIdGenerator().createNextQueryId();
Request request = preparePut()
.setUri(uriFor("/v1/statement/", queryId, "slug"))
.setBodyGenerator(createStaticBodyGenerator("show catalogs", UTF_8))
.setHeader(PRESTO_USER, "user")
.setHeader(PRESTO_SOURCE, "source")
.setHeader(PRESTO_CATALOG, "catalog")
.setHeader(PRESTO_SCHEMA, "schema")
.build();

QueryResults queryResults = client.execute(request, createJsonResponseHandler(QUERY_RESULTS_CODEC));
client.execute(prepareGet().setUri(queryResults.getNextUri()).build(), createJsonResponseHandler(QUERY_RESULTS_CODEC));
client.execute(request, createJsonResponseHandler(QUERY_RESULTS_CODEC));
}

@Test
public void testTransactionSupport()
{
Expand Down Expand Up @@ -327,4 +428,13 @@ public URI uriFor(String path)
{
return HttpUriBuilder.uriBuilderFrom(server.getBaseUrl()).replacePath(path).build();
}

public URI uriFor(String path, QueryId queryId, String slug)
{
return HttpUriBuilder.uriBuilderFrom(server.getBaseUrl())
.replacePath(path)
.appendPath(queryId.getId())
.addParameter("slug", slug)
.build();
}
}
53 changes: 53 additions & 0 deletions presto-openapi/src/main/resources/queued_statement.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,59 @@ paths:
$ref: './schemas.yaml/#/components/schemas/QueryResults'
'400':
description: Bad request
/v1/statement/{queryId}:
put:
summary: Submit a new query
description: Submits a new query to the Presto coordinator, with a pre-minted query id and slug
requestBody:
required: true
content:
text/plain:
schema:
type: string
description: The statement or SQL query string to be submitted
parameters:
- name: queryId
in: path
required: true
schema:
type: string
description: The query id to associate with this query
- name: slug
in: query
required: true
schema:
type: string
description: Nonce to associate with this query, which is required for future requests
- name: binaryResults
in: query
required: false
schema:
type: boolean
description: Whether to return results in binary format
- name: X-Forwarded-Proto
in: header
required: false
schema:
type: string
description: Forwarded protocol (http or https)
- name: Presto-Prefix-URL
in: header
required: false
schema:
type: string
description: Prefix URL for Presto
responses:
'200':
description: Query submitted successfully
content:
application/json:
schema:
$ref: './schemas.yaml/#/components/schemas/QueryResults'
'400':
description: Bad request
'409':
description: Conflict, this query already exists
/v1/statement/queued/retry/{queryId}:
get:
summary: Retry a failed query
Expand Down
Loading