25
25
import com .facebook .presto .common .type .IntegerType ;
26
26
import com .facebook .presto .common .type .MapType ;
27
27
import com .facebook .presto .common .type .Type ;
28
+ import com .facebook .presto .common .type .TypeManager ;
28
29
import com .facebook .presto .common .type .VarcharType ;
30
+ import com .facebook .presto .sessionpropertyproviders .JavaWorkerSessionPropertyProvider ;
29
31
import com .facebook .presto .spi .ConnectorId ;
32
+ import com .facebook .presto .spi .NodeManager ;
30
33
import com .facebook .presto .spi .PrestoException ;
31
34
import com .facebook .presto .spi .session .PropertyMetadata ;
35
+ import com .facebook .presto .spi .session .SessionPropertyContext ;
36
+ import com .facebook .presto .spi .session .WorkerSessionPropertyProvider ;
37
+ import com .facebook .presto .spi .session .WorkerSessionPropertyProviderFactory ;
38
+ import com .facebook .presto .spiller .NodeSpillConfig ;
39
+ import com .facebook .presto .sql .analyzer .FeaturesConfig ;
40
+ import com .facebook .presto .sql .analyzer .JavaFeaturesConfig ;
32
41
import com .facebook .presto .sql .planner .ParameterRewriter ;
33
42
import com .facebook .presto .sql .tree .Expression ;
34
43
import com .facebook .presto .sql .tree .ExpressionTreeRewriter ;
35
44
import com .facebook .presto .sql .tree .NodeRef ;
36
45
import com .facebook .presto .sql .tree .Parameter ;
46
+ import com .google .common .base .Suppliers ;
37
47
import com .google .common .collect .ImmutableList ;
48
+ import com .google .common .collect .ImmutableMap ;
38
49
import com .google .common .collect .Maps ;
39
50
40
51
import javax .annotation .Nullable ;
47
58
import java .util .TreeMap ;
48
59
import java .util .concurrent .ConcurrentHashMap ;
49
60
import java .util .concurrent .ConcurrentMap ;
61
+ import java .util .function .Supplier ;
50
62
51
63
import static com .facebook .presto .common .type .TypeUtils .writeNativeValue ;
52
64
import static com .facebook .presto .spi .StandardErrorCode .INVALID_SESSION_PROPERTY ;
53
65
import static com .facebook .presto .sql .planner .ExpressionInterpreter .evaluateConstantExpression ;
54
66
import static com .google .common .base .MoreObjects .firstNonNull ;
55
67
import static com .google .common .base .Preconditions .checkState ;
68
+ import static com .google .common .collect .ImmutableList .toImmutableList ;
56
69
import static java .lang .String .format ;
57
70
import static java .util .Objects .requireNonNull ;
71
+ import static java .util .concurrent .TimeUnit .HOURS ;
58
72
59
73
public final class SessionPropertyManager
60
74
{
61
75
private static final JsonCodecFactory JSON_CODEC_FACTORY = new JsonCodecFactory ();
62
76
private final ConcurrentMap <String , PropertyMetadata <?>> systemSessionProperties = new ConcurrentHashMap <>();
63
77
private final ConcurrentMap <ConnectorId , Map <String , PropertyMetadata <?>>> connectorSessionProperties = new ConcurrentHashMap <>();
78
+ private final Map <String , WorkerSessionPropertyProvider > workerSessionPropertyProviders ;
79
+ private final Map <String , WorkerSessionPropertyProviderFactory > workerSessionPropertyProviderFactories = new ConcurrentHashMap <>();
80
+ private final Supplier <Map <String , PropertyMetadata <?>>> memoizedWorkerSessionProperties ;
81
+ private final Optional <NodeManager > nodeManager ;
82
+ private final Optional <TypeManager > functionAndTypeManager ;
64
83
65
- public SessionPropertyManager ()
84
+ @ Inject
85
+ public SessionPropertyManager (
86
+ SystemSessionProperties systemSessionProperties ,
87
+ Map <String , WorkerSessionPropertyProvider > workerSessionPropertyProviders ,
88
+ FunctionAndTypeManager functionAndTypeManager ,
89
+ NodeManager nodeManager )
66
90
{
67
- this (new SystemSessionProperties ( ));
91
+ this (systemSessionProperties . getSessionProperties (), workerSessionPropertyProviders , Optional . ofNullable ( functionAndTypeManager ), Optional . ofNullable ( nodeManager ));
68
92
}
69
93
70
- @ Inject
71
- public SessionPropertyManager (SystemSessionProperties systemSessionProperties )
94
+ public SessionPropertyManager (
95
+ List <PropertyMetadata <?>> sessionProperties ,
96
+ Map <String , WorkerSessionPropertyProvider > workerSessionPropertyProviders ,
97
+ Optional <TypeManager > functionAndTypeManager ,
98
+ Optional <NodeManager > nodeManager )
99
+ {
100
+ this .nodeManager = requireNonNull (nodeManager , "nodeManager is null" );
101
+ this .functionAndTypeManager = requireNonNull (functionAndTypeManager , "functionAndTypeManager is null" );
102
+ this .memoizedWorkerSessionProperties = Suppliers .memoizeWithExpiration (this ::getWorkerSessionProperties ,
103
+ 1 , HOURS );
104
+ this .workerSessionPropertyProviders = new ConcurrentHashMap <>(workerSessionPropertyProviders );
105
+ addSystemSessionProperties (sessionProperties );
106
+ }
107
+
108
+ public static SessionPropertyManager createTestingSessionPropertyManager ()
109
+ {
110
+ return createTestingSessionPropertyManager (new SystemSessionProperties ().getSessionProperties (), new JavaFeaturesConfig (), new NodeSpillConfig ());
111
+ }
112
+
113
+ public static SessionPropertyManager createTestingSessionPropertyManager (SystemSessionProperties systemSessionProperties )
114
+ {
115
+ return createTestingSessionPropertyManager (systemSessionProperties .getSessionProperties (), new JavaFeaturesConfig (), new NodeSpillConfig ());
116
+ }
117
+
118
+ public static SessionPropertyManager createTestingSessionPropertyManager (List <PropertyMetadata <?>> sessionProperties )
119
+ {
120
+ return createTestingSessionPropertyManager (sessionProperties , new JavaFeaturesConfig (), new NodeSpillConfig ());
121
+ }
122
+
123
+ public static SessionPropertyManager createTestingSessionPropertyManager (
124
+ List <PropertyMetadata <?>> sessionProperties ,
125
+ JavaFeaturesConfig javaFeaturesConfig ,
126
+ NodeSpillConfig nodeSpillConfig )
72
127
{
73
- this (systemSessionProperties .getSessionProperties ());
128
+ return new SessionPropertyManager (
129
+ sessionProperties ,
130
+ ImmutableMap .of (
131
+ "java-worker" ,
132
+ new JavaWorkerSessionPropertyProvider (
133
+ new FeaturesConfig (),
134
+ javaFeaturesConfig ,
135
+ nodeSpillConfig )),
136
+ Optional .empty (),
137
+ Optional .empty ());
138
+ }
139
+
140
+ public void loadSessionPropertyProvider (String sessionPropertyProviderName )
141
+ {
142
+ WorkerSessionPropertyProviderFactory factory = workerSessionPropertyProviderFactories .get (sessionPropertyProviderName );
143
+ checkState (factory != null , "No factory for session property provider : " + sessionPropertyProviderName );
144
+ WorkerSessionPropertyProvider sessionPropertyProvider = factory .create (new SessionPropertyContext (functionAndTypeManager , nodeManager ));
145
+ if (workerSessionPropertyProviders .putIfAbsent (sessionPropertyProviderName , sessionPropertyProvider ) != null ) {
146
+ throw new IllegalArgumentException ("System session property provider is already registered for property provider : " + sessionPropertyProviderName );
147
+ }
74
148
}
75
149
76
- public SessionPropertyManager ( List < PropertyMetadata <?>> systemSessionProperties )
150
+ public void loadSessionPropertyProviders ( )
77
151
{
78
- addSystemSessionProperties (systemSessionProperties );
152
+ for (String sessionPropertyProviderName : workerSessionPropertyProviderFactories .keySet ()) {
153
+ loadSessionPropertyProvider (sessionPropertyProviderName );
154
+ }
79
155
}
80
156
81
157
public void addSystemSessionProperties (List <PropertyMetadata <?>> systemSessionProperties )
@@ -108,7 +184,9 @@ public void removeConnectorSessionProperties(ConnectorId connectorId)
108
184
public Optional <PropertyMetadata <?>> getSystemSessionPropertyMetadata (String name )
109
185
{
110
186
requireNonNull (name , "name is null" );
111
-
187
+ if (systemSessionProperties .get (name ) == null ) {
188
+ return Optional .ofNullable (memoizedWorkerSessionProperties .get ().get (name ));
189
+ }
112
190
return Optional .ofNullable (systemSessionProperties .get (name ));
113
191
}
114
192
@@ -124,6 +202,20 @@ public Optional<PropertyMetadata<?>> getConnectorSessionPropertyMetadata(Connect
124
202
return Optional .ofNullable (properties .get (propertyName ));
125
203
}
126
204
205
+ private Map <String , PropertyMetadata <?>> getWorkerSessionProperties ()
206
+ {
207
+ List <PropertyMetadata <?>> workerSessionPropertiesList = workerSessionPropertyProviders .values ().stream ()
208
+ .flatMap (manager -> manager .getSessionProperties ().stream ())
209
+ .collect (toImmutableList ());
210
+ Map <String , PropertyMetadata <?>> workerSessionProperties = new ConcurrentHashMap <>();
211
+ workerSessionPropertiesList .forEach (sessionProperty -> {
212
+ requireNonNull (sessionProperty , "sessionProperty is null" );
213
+ // TODO: Implement fail fast in case of duplicate entries.
214
+ workerSessionProperties .put (sessionProperty .getName (), sessionProperty );
215
+ });
216
+ return workerSessionProperties ;
217
+ }
218
+
127
219
public List <SessionPropertyValue > getAllSessionProperties (Session session , Map <String , ConnectorId > catalogs )
128
220
{
129
221
requireNonNull (session , "session is null" );
@@ -165,6 +257,19 @@ public List<SessionPropertyValue> getAllSessionProperties(Session session, Map<S
165
257
}
166
258
}
167
259
260
+ for (PropertyMetadata <?> property : new TreeMap <>(memoizedWorkerSessionProperties .get ()).values ()) {
261
+ String defaultValue = firstNonNull (property .getDefaultValue (), "" ).toString ();
262
+ String value = systemProperties .getOrDefault (property .getName (), defaultValue );
263
+ sessionPropertyValues .add (new SessionPropertyValue (
264
+ value ,
265
+ defaultValue ,
266
+ property .getName (),
267
+ Optional .empty (),
268
+ property .getName (),
269
+ property .getDescription (),
270
+ property .getSqlType ().getDisplayName (),
271
+ property .isHidden ()));
272
+ }
168
273
return sessionPropertyValues .build ();
169
274
}
170
275
0 commit comments