Skip to content

Commit b7814df

Browse files
committed
Add SPI to support plugin for custom plan checkers
This adds a new SPI to support plugin custom plan checkers to be provided for validating intermediate, final, and fragment stages of a logical plan. The motivation for this is to allow for a native plan checker that will eagerly validate a plan on the native sidecar to quickly fail the query if there are incompatibilities. Add unit test to verify that a queued query can be validated and fail while queued. This is done by using the new custom plan checker SPI to add plugin that will trigger a failure when validating the plan. See also: #23596 RFC: https://github.com/prestodb/rfcs/blob/main/RFC-0008-plan-checker.md
1 parent 4b0ab66 commit b7814df

File tree

32 files changed

+578
-27
lines changed

32 files changed

+578
-27
lines changed

presto-main/src/main/java/com/facebook/presto/execution/scheduler/SqlQueryScheduler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -594,7 +594,7 @@ private StreamingPlanSection tryCostBasedOptimize(StreamingPlanSection section)
594594
.forEach(currentSubPlan -> {
595595
Optional<PlanFragment> newPlanFragment = performRuntimeOptimizations(currentSubPlan);
596596
if (newPlanFragment.isPresent()) {
597-
planChecker.validatePlanFragment(newPlanFragment.get().getRoot(), session, metadata, warningCollector);
597+
planChecker.validatePlanFragment(newPlanFragment.get(), session, metadata, warningCollector);
598598
oldToNewFragment.put(currentSubPlan.getFragment(), newPlanFragment.get());
599599
}
600600
});

presto-main/src/main/java/com/facebook/presto/server/PluginManager.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import com.facebook.presto.spi.eventlistener.EventListenerFactory;
3737
import com.facebook.presto.spi.function.FunctionNamespaceManagerFactory;
3838
import com.facebook.presto.spi.nodestatus.NodeStatusNotificationProviderFactory;
39+
import com.facebook.presto.spi.plan.PlanCheckerProviderFactory;
3940
import com.facebook.presto.spi.prerequisites.QueryPrerequisitesFactory;
4041
import com.facebook.presto.spi.resourceGroups.ResourceGroupConfigurationManagerFactory;
4142
import com.facebook.presto.spi.security.PasswordAuthenticatorFactory;
@@ -48,6 +49,7 @@
4849
import com.facebook.presto.spi.ttl.NodeTtlFetcherFactory;
4950
import com.facebook.presto.sql.analyzer.AnalyzerProviderManager;
5051
import com.facebook.presto.sql.analyzer.QueryPreparerProviderManager;
52+
import com.facebook.presto.sql.planner.sanity.PlanCheckerProviderManager;
5153
import com.facebook.presto.storage.TempStorageManager;
5254
import com.facebook.presto.tracing.TracerProviderManager;
5355
import com.facebook.presto.ttl.clusterttlprovidermanagers.ClusterTtlProviderManager;
@@ -131,6 +133,7 @@ public class PluginManager
131133
private final AnalyzerProviderManager analyzerProviderManager;
132134
private final QueryPreparerProviderManager queryPreparerProviderManager;
133135
private final NodeStatusNotificationManager nodeStatusNotificationManager;
136+
private final PlanCheckerProviderManager planCheckerProviderManager;
134137

135138
@Inject
136139
public PluginManager(
@@ -152,7 +155,8 @@ public PluginManager(
152155
ClusterTtlProviderManager clusterTtlProviderManager,
153156
HistoryBasedPlanStatisticsManager historyBasedPlanStatisticsManager,
154157
TracerProviderManager tracerProviderManager,
155-
NodeStatusNotificationManager nodeStatusNotificationManager)
158+
NodeStatusNotificationManager nodeStatusNotificationManager,
159+
PlanCheckerProviderManager planCheckerProviderManager)
156160
{
157161
requireNonNull(nodeInfo, "nodeInfo is null");
158162
requireNonNull(config, "config is null");
@@ -184,6 +188,7 @@ public PluginManager(
184188
this.analyzerProviderManager = requireNonNull(analyzerProviderManager, "analyzerProviderManager is null");
185189
this.queryPreparerProviderManager = requireNonNull(queryPreparerProviderManager, "queryPreparerProviderManager is null");
186190
this.nodeStatusNotificationManager = requireNonNull(nodeStatusNotificationManager, "nodeStatusNotificationManager is null");
191+
this.planCheckerProviderManager = requireNonNull(planCheckerProviderManager, "planCheckerProviderManager is null");
187192
}
188193

189194
public void loadPlugins()
@@ -348,6 +353,11 @@ public void installPlugin(Plugin plugin)
348353
log.info("Registering node status notification provider %s", nodeStatusNotificationProviderFactory.getName());
349354
nodeStatusNotificationManager.addNodeStatusNotificationProviderFactory(nodeStatusNotificationProviderFactory);
350355
}
356+
357+
for (PlanCheckerProviderFactory planCheckerProviderFactory : plugin.getPlanCheckerProviderFactories()) {
358+
log.info("Registering plan checker provider factory %s", planCheckerProviderFactory.getName());
359+
planCheckerProviderManager.addPlanCheckerProviderFactory(planCheckerProviderFactory);
360+
}
351361
}
352362

353363
public void installCoordinatorPlugin(CoordinatorPlugin plugin)

presto-main/src/main/java/com/facebook/presto/server/PrestoServer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import com.facebook.presto.server.security.ServerSecurityModule;
4848
import com.facebook.presto.sql.analyzer.FeaturesConfig;
4949
import com.facebook.presto.sql.parser.SqlParserOptions;
50+
import com.facebook.presto.sql.planner.sanity.PlanCheckerProviderManager;
5051
import com.facebook.presto.storage.TempStorageManager;
5152
import com.facebook.presto.storage.TempStorageModule;
5253
import com.facebook.presto.tracing.TracerProviderManager;
@@ -177,6 +178,7 @@ public void run()
177178
injector.getInstance(TracerProviderManager.class).loadTracerProvider();
178179
injector.getInstance(NodeStatusNotificationManager.class).loadNodeStatusNotificationProvider();
179180
injector.getInstance(GracefulShutdownHandler.class).loadNodeStatusNotification();
181+
injector.getInstance(PlanCheckerProviderManager.class).loadPlanCheckerProviders();
180182
startAssociatedProcesses(injector);
181183

182184
injector.getInstance(Announcer.class).start();

presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,8 @@
146146
import com.facebook.presto.spi.PageSorter;
147147
import com.facebook.presto.spi.analyzer.ViewDefinition;
148148
import com.facebook.presto.spi.function.SqlInvokedFunction;
149+
import com.facebook.presto.spi.plan.SimplePlanFragment;
150+
import com.facebook.presto.spi.plan.SimplePlanFragmentSerde;
149151
import com.facebook.presto.spi.relation.DeterminismEvaluator;
150152
import com.facebook.presto.spi.relation.DomainTranslator;
151153
import com.facebook.presto.spi.relation.PredicateCompiler;
@@ -200,7 +202,9 @@
200202
import com.facebook.presto.sql.planner.NodePartitioningManager;
201203
import com.facebook.presto.sql.planner.PartitioningProviderManager;
202204
import com.facebook.presto.sql.planner.PlanFragment;
205+
import com.facebook.presto.sql.planner.plan.JsonCodecSimplePlanFragmentSerde;
203206
import com.facebook.presto.sql.planner.sanity.PlanChecker;
207+
import com.facebook.presto.sql.planner.sanity.PlanCheckerProviderManager;
204208
import com.facebook.presto.sql.relational.RowExpressionDeterminismEvaluator;
205209
import com.facebook.presto.sql.relational.RowExpressionDomainTranslator;
206210
import com.facebook.presto.sql.tree.Expression;
@@ -625,6 +629,8 @@ public ListeningExecutorService createResourceManagerExecutor(ResourceManagerCon
625629
// plan
626630
jsonBinder(binder).addKeySerializerBinding(VariableReferenceExpression.class).to(VariableReferenceExpressionSerializer.class);
627631
jsonBinder(binder).addKeyDeserializerBinding(VariableReferenceExpression.class).to(VariableReferenceExpressionDeserializer.class);
632+
jsonCodecBinder(binder).bindJsonCodec(SimplePlanFragment.class);
633+
binder.bind(SimplePlanFragmentSerde.class).to(JsonCodecSimplePlanFragmentSerde.class).in(Scopes.SINGLETON);
628634

629635
// history statistics
630636
configBinder(binder).bindConfig(HistoryBasedOptimizationConfig.class);
@@ -785,6 +791,8 @@ public ListeningExecutorService createResourceManagerExecutor(ResourceManagerCon
785791
//Optional Status Detector
786792
newOptionalBinder(binder, NodeStatusService.class);
787793
binder.bind(NodeStatusNotificationManager.class).in(Scopes.SINGLETON);
794+
795+
binder.bind(PlanCheckerProviderManager.class).in(Scopes.SINGLETON);
788796
}
789797

790798
@Provides

presto-main/src/main/java/com/facebook/presto/server/testing/TestingPrestoServer.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import com.facebook.presto.sql.planner.ConnectorPlanOptimizerManager;
7575
import com.facebook.presto.sql.planner.NodePartitioningManager;
7676
import com.facebook.presto.sql.planner.Plan;
77+
import com.facebook.presto.sql.planner.sanity.PlanCheckerProviderManager;
7778
import com.facebook.presto.storage.TempStorageManager;
7879
import com.facebook.presto.testing.ProcedureTester;
7980
import com.facebook.presto.testing.TestingAccessControlManager;
@@ -173,6 +174,7 @@ public class TestingPrestoServer
173174
private final boolean nodeSchedulerIncludeCoordinator;
174175
private final ServerInfoResource serverInfoResource;
175176
private final ResourceManagerClusterStateProvider clusterStateProvider;
177+
private final PlanCheckerProviderManager planCheckerProviderManager;
176178

177179
public static class TestShutdownAction
178180
implements ShutdownAction
@@ -379,6 +381,7 @@ public TestingPrestoServer(
379381
statsCalculator = injector.getInstance(StatsCalculator.class);
380382
eventListenerManager = ((TestingEventListenerManager) injector.getInstance(EventListenerManager.class));
381383
clusterStateProvider = null;
384+
planCheckerProviderManager = injector.getInstance(PlanCheckerProviderManager.class);
382385
}
383386
else if (resourceManager) {
384387
dispatchManager = null;
@@ -390,6 +393,7 @@ else if (resourceManager) {
390393
statsCalculator = null;
391394
eventListenerManager = ((TestingEventListenerManager) injector.getInstance(EventListenerManager.class));
392395
clusterStateProvider = injector.getInstance(ResourceManagerClusterStateProvider.class);
396+
planCheckerProviderManager = null;
393397
}
394398
else if (coordinatorSidecar) {
395399
dispatchManager = null;
@@ -401,6 +405,7 @@ else if (coordinatorSidecar) {
401405
statsCalculator = null;
402406
eventListenerManager = null;
403407
clusterStateProvider = null;
408+
planCheckerProviderManager = null;
404409
}
405410
else if (catalogServer) {
406411
dispatchManager = null;
@@ -412,6 +417,7 @@ else if (catalogServer) {
412417
statsCalculator = null;
413418
eventListenerManager = null;
414419
clusterStateProvider = null;
420+
planCheckerProviderManager = null;
415421
}
416422
else {
417423
dispatchManager = null;
@@ -423,6 +429,7 @@ else if (catalogServer) {
423429
statsCalculator = null;
424430
eventListenerManager = null;
425431
clusterStateProvider = null;
432+
planCheckerProviderManager = null;
426433
}
427434
localMemoryManager = injector.getInstance(LocalMemoryManager.class);
428435
nodeManager = injector.getInstance(InternalNodeManager.class);
@@ -662,6 +669,11 @@ public ClusterMemoryManager getClusterMemoryManager()
662669
return (ClusterMemoryManager) clusterMemoryManager;
663670
}
664671

672+
public PlanCheckerProviderManager getPlanCheckerProviderManager()
673+
{
674+
return planCheckerProviderManager;
675+
}
676+
665677
public GracefulShutdownHandler getGracefulShutdownHandler()
666678
{
667679
return gracefulShutdownHandler;

presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,6 @@ private SubPlan buildFragment(PlanNode root, FragmentProperties properties, Plan
140140
properties.getPartitionedSources());
141141

142142
Set<VariableReferenceExpression> fragmentVariableTypes = extractOutputVariables(root);
143-
planChecker.validatePlanFragment(root, session, metadata, warningCollector);
144-
145143
Set<PlanNodeId> tableWriterNodeIds = PlanFragmenterUtils.getTableWriterNodeIds(root);
146144
boolean outputTableWriterFragment = tableWriterNodeIds.stream().anyMatch(outputTableWriterNodeIds::contains);
147145
if (outputTableWriterFragment) {
@@ -164,6 +162,8 @@ private SubPlan buildFragment(PlanNode root, FragmentProperties properties, Plan
164162
Optional.of(statsAndCosts.getForSubplan(root)),
165163
Optional.of(jsonFragmentPlan(root, fragmentVariableTypes, statsAndCosts.getForSubplan(root), metadata.getFunctionAndTypeManager(), session)));
166164

165+
planChecker.validatePlanFragment(fragment, session, metadata, warningCollector);
166+
167167
return new SubPlan(fragment, properties.getChildren());
168168
}
169169

presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.facebook.presto.sql.planner.BasePlanFragmenter.FragmentProperties;
3030
import com.facebook.presto.sql.planner.plan.SimplePlanRewriter;
3131
import com.facebook.presto.sql.planner.sanity.PlanChecker;
32+
import com.facebook.presto.sql.planner.sanity.PlanCheckerProviderManager;
3233
import com.google.common.collect.ImmutableList;
3334

3435
import javax.inject.Inject;
@@ -54,13 +55,13 @@ public class PlanFragmenter
5455
private final PlanChecker singleNodePlanChecker;
5556

5657
@Inject
57-
public PlanFragmenter(Metadata metadata, NodePartitioningManager nodePartitioningManager, QueryManagerConfig queryManagerConfig, FeaturesConfig featuresConfig)
58+
public PlanFragmenter(Metadata metadata, NodePartitioningManager nodePartitioningManager, QueryManagerConfig queryManagerConfig, FeaturesConfig featuresConfig, PlanCheckerProviderManager planCheckerProviderManager)
5859
{
5960
this.metadata = requireNonNull(metadata, "metadata is null");
6061
this.nodePartitioningManager = requireNonNull(nodePartitioningManager, "nodePartitioningManager is null");
6162
this.config = requireNonNull(queryManagerConfig, "queryManagerConfig is null");
62-
this.distributedPlanChecker = new PlanChecker(requireNonNull(featuresConfig, "featuresConfig is null"), false);
63-
this.singleNodePlanChecker = new PlanChecker(requireNonNull(featuresConfig, "featuresConfig is null"), true);
63+
this.distributedPlanChecker = new PlanChecker(requireNonNull(featuresConfig, "featuresConfig is null"), false, planCheckerProviderManager);
64+
this.singleNodePlanChecker = new PlanChecker(requireNonNull(featuresConfig, "featuresConfig is null"), true, planCheckerProviderManager);
6465
}
6566

6667
public SubPlan createSubPlans(Session session, Plan plan, boolean forceSingleNode, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector)
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.sql.planner.plan;
15+
16+
import com.facebook.airlift.json.JsonCodec;
17+
import com.facebook.presto.spi.plan.SimplePlanFragment;
18+
import com.facebook.presto.spi.plan.SimplePlanFragmentSerde;
19+
import com.google.inject.Inject;
20+
21+
import java.nio.charset.StandardCharsets;
22+
23+
import static java.util.Objects.requireNonNull;
24+
25+
public class JsonCodecSimplePlanFragmentSerde
26+
implements SimplePlanFragmentSerde
27+
{
28+
private final JsonCodec<SimplePlanFragment> codec;
29+
30+
@Inject
31+
public JsonCodecSimplePlanFragmentSerde(JsonCodec<SimplePlanFragment> codec)
32+
{
33+
this.codec = requireNonNull(codec, "SimplePlanFragment JSON codec is null");
34+
}
35+
36+
@Override
37+
public String serialize(SimplePlanFragment planFragment)
38+
{
39+
return new String(codec.toBytes(planFragment), StandardCharsets.UTF_8);
40+
}
41+
42+
@Override
43+
public SimplePlanFragment deserialize(String value)
44+
{
45+
return codec.fromBytes(value.getBytes(StandardCharsets.UTF_8));
46+
}
47+
}

presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/PlanChecker.java

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,28 +16,35 @@
1616
import com.facebook.presto.Session;
1717
import com.facebook.presto.metadata.Metadata;
1818
import com.facebook.presto.spi.WarningCollector;
19+
import com.facebook.presto.spi.plan.PlanCheckerProvider;
1920
import com.facebook.presto.spi.plan.PlanNode;
21+
import com.facebook.presto.spi.plan.SimplePlanFragment;
2022
import com.facebook.presto.sql.analyzer.FeaturesConfig;
23+
import com.facebook.presto.sql.planner.PlanFragment;
2124
import com.google.common.collect.ImmutableListMultimap;
2225
import com.google.common.collect.Multimap;
2326

2427
import javax.inject.Inject;
2528

29+
import static java.util.Objects.requireNonNull;
30+
2631
/**
2732
* Perform checks on the plan that may generate warnings or errors.
2833
*/
2934
public final class PlanChecker
3035
{
3136
private final Multimap<Stage, Checker> checkers;
37+
private final PlanCheckerProviderManager planCheckerProviderManager;
3238

3339
@Inject
34-
public PlanChecker(FeaturesConfig featuresConfig)
40+
public PlanChecker(FeaturesConfig featuresConfig, PlanCheckerProviderManager planCheckerProviderManager)
3541
{
36-
this(featuresConfig, false);
42+
this(featuresConfig, false, planCheckerProviderManager);
3743
}
3844

39-
public PlanChecker(FeaturesConfig featuresConfig, boolean forceSingleNode)
45+
public PlanChecker(FeaturesConfig featuresConfig, boolean forceSingleNode, PlanCheckerProviderManager planCheckerProviderManager)
4046
{
47+
this.planCheckerProviderManager = requireNonNull(planCheckerProviderManager, "planCheckerProviderManager is null");
4148
ImmutableListMultimap.Builder<Stage, Checker> builder = ImmutableListMultimap.builder();
4249
builder.putAll(
4350
Stage.INTERMEDIATE,
@@ -78,25 +85,58 @@ public PlanChecker(FeaturesConfig featuresConfig, boolean forceSingleNode)
7885
public void validateFinalPlan(PlanNode planNode, Session session, Metadata metadata, WarningCollector warningCollector)
7986
{
8087
checkers.get(Stage.FINAL).forEach(checker -> checker.validate(planNode, session, metadata, warningCollector));
88+
for (PlanCheckerProvider provider : planCheckerProviderManager.getPlanCheckerProviders()) {
89+
for (com.facebook.presto.spi.plan.PlanChecker checker : provider.getFinalPlanCheckers()) {
90+
checker.validate(planNode, warningCollector);
91+
}
92+
}
8193
}
8294

8395
public void validateIntermediatePlan(PlanNode planNode, Session session, Metadata metadata, WarningCollector warningCollector)
8496
{
8597
checkers.get(Stage.INTERMEDIATE).forEach(checker -> checker.validate(planNode, session, metadata, warningCollector));
98+
for (PlanCheckerProvider provider : planCheckerProviderManager.getPlanCheckerProviders()) {
99+
for (com.facebook.presto.spi.plan.PlanChecker checker : provider.getIntermediatePlanCheckers()) {
100+
checker.validate(planNode, warningCollector);
101+
}
102+
}
86103
}
87104

88-
public void validatePlanFragment(PlanNode planNode, Session session, Metadata metadata, WarningCollector warningCollector)
105+
public void validatePlanFragment(PlanFragment planFragment, Session session, Metadata metadata, WarningCollector warningCollector)
89106
{
90-
checkers.get(Stage.FRAGMENT).forEach(checker -> checker.validate(planNode, session, metadata, warningCollector));
107+
checkers.get(Stage.FRAGMENT).forEach(checker -> checker.validateFragment(planFragment, session, metadata, warningCollector));
108+
for (PlanCheckerProvider provider : planCheckerProviderManager.getPlanCheckerProviders()) {
109+
for (com.facebook.presto.spi.plan.PlanChecker checker : provider.getFragmentPlanCheckers()) {
110+
checker.validateFragment(toSimplePlanFragment(planFragment), warningCollector);
111+
}
112+
}
91113
}
92114

93115
public interface Checker
94116
{
95117
void validate(PlanNode planNode, Session session, Metadata metadata, WarningCollector warningCollector);
118+
119+
default void validateFragment(PlanFragment planFragment, Session session, Metadata metadata, WarningCollector warningCollector)
120+
{
121+
validate(planFragment.getRoot(), session, metadata, warningCollector);
122+
}
96123
}
97124

98125
private enum Stage
99126
{
100127
INTERMEDIATE, FINAL, FRAGMENT
101128
}
129+
130+
private static SimplePlanFragment toSimplePlanFragment(PlanFragment planFragment)
131+
{
132+
return new SimplePlanFragment(
133+
planFragment.getId(),
134+
planFragment.getRoot(),
135+
planFragment.getVariables(),
136+
planFragment.getPartitioning(),
137+
planFragment.getTableScanSchedulingOrder(),
138+
planFragment.getPartitioningScheme(),
139+
planFragment.getStageExecutionDescriptor(),
140+
planFragment.isOutputTableWriterFragment());
141+
}
102142
}

0 commit comments

Comments
 (0)