|
72 | 72 | import com.google.inject.Injector;
|
73 | 73 | import com.google.inject.Module;
|
74 | 74 | import com.google.inject.Provides;
|
| 75 | +import io.airlift.units.DataSize; |
75 | 76 | import io.airlift.units.Duration;
|
76 | 77 | import org.testng.annotations.DataProvider;
|
77 | 78 | import org.testng.annotations.Test;
|
|
90 | 91 | import javax.ws.rs.core.MediaType;
|
91 | 92 | import javax.ws.rs.core.UriInfo;
|
92 | 93 |
|
| 94 | +import java.lang.reflect.Method; |
93 | 95 | import java.net.URI;
|
94 | 96 | import java.util.HashMap;
|
95 | 97 | import java.util.Map;
|
@@ -222,6 +224,66 @@ public void testHTTPRemoteTaskSize()
|
222 | 224 | assertTrue(httpRemoteTaskFactory.getTaskUpdateRequestSize() > 0);
|
223 | 225 | }
|
224 | 226 |
|
| 227 | + @Test(timeOut = 50000) |
| 228 | + public void testHTTPRemoteBadTaskSize() |
| 229 | + throws Exception |
| 230 | + { |
| 231 | + AtomicLong lastActivityNanos = new AtomicLong(System.nanoTime()); |
| 232 | + TestingTaskResource testingTaskResource = new TestingTaskResource(lastActivityNanos, FailureScenario.NO_FAILURE); |
| 233 | + boolean useThriftEncoding = false; |
| 234 | + DataSize maxDataSize = DataSize.succinctBytes(1024); |
| 235 | + InternalCommunicationConfig internalCommunicationConfig = new InternalCommunicationConfig() |
| 236 | + .setThriftTransportEnabled(useThriftEncoding) |
| 237 | + .setMaxTaskUpdateSize(maxDataSize); |
| 238 | + |
| 239 | + HttpRemoteTaskFactory httpRemoteTaskFactory = createHttpRemoteTaskFactory(testingTaskResource, useThriftEncoding, internalCommunicationConfig); |
| 240 | + |
| 241 | + RemoteTask remoteTask = createRemoteTask(httpRemoteTaskFactory); |
| 242 | + testingTaskResource.setInitialTaskInfo(remoteTask.getTaskInfo()); |
| 243 | + remoteTask.start(); |
| 244 | + waitUntilIdle(lastActivityNanos); |
| 245 | + httpRemoteTaskFactory.stop(); |
| 246 | + |
| 247 | + assertTrue(remoteTask.getTaskStatus().getState().isDone(), format("TaskStatus is not in a done state: %s", remoteTask.getTaskStatus())); |
| 248 | + assertEquals(getOnlyElement(remoteTask.getTaskStatus().getFailures()).getMessage(), "TaskUpdate size of 1.97kB has exceeded the limit of 1kB"); |
| 249 | + } |
| 250 | + |
| 251 | + @Test(dataProvider = "getUpdateSize") |
| 252 | + public void testGetExceededTaskUpdateSizeListMessage(int updateSizeInBytes, int maxDataSizeInBytes, |
| 253 | + String expectedMessage) throws Exception |
| 254 | + { |
| 255 | + AtomicLong lastActivityNanos = new AtomicLong(System.nanoTime()); |
| 256 | + TestingTaskResource testingTaskResource = new TestingTaskResource(lastActivityNanos, FailureScenario.NO_FAILURE); |
| 257 | + boolean useThriftEncoding = false; |
| 258 | + DataSize maxDataSize = DataSize.succinctBytes(maxDataSizeInBytes); |
| 259 | + InternalCommunicationConfig internalCommunicationConfig = new InternalCommunicationConfig() |
| 260 | + .setThriftTransportEnabled(useThriftEncoding) |
| 261 | + .setMaxTaskUpdateSize(maxDataSize); |
| 262 | + HttpRemoteTaskFactory httpRemoteTaskFactory = createHttpRemoteTaskFactory(testingTaskResource, useThriftEncoding, internalCommunicationConfig); |
| 263 | + RemoteTask remoteTask = createRemoteTask(httpRemoteTaskFactory); |
| 264 | + |
| 265 | + Method targetMethod = HttpRemoteTask.class.getDeclaredMethod("getExceededTaskUpdateSizeMessage", new Class[]{byte[].class}); |
| 266 | + targetMethod.setAccessible(true); |
| 267 | + byte[] taskUpdateRequestJson = new byte[updateSizeInBytes]; |
| 268 | + String message = (String) targetMethod.invoke(remoteTask, new Object[]{taskUpdateRequestJson}); |
| 269 | + assertEquals(message, expectedMessage); |
| 270 | + } |
| 271 | + |
| 272 | + @DataProvider(name = "getUpdateSize") |
| 273 | + protected Object[][] getUpdateSize() |
| 274 | + { |
| 275 | + return new Object[][] { |
| 276 | + {2000, 1000, "TaskUpdate size of 1.95kB has exceeded the limit of 1000B"}, |
| 277 | + {2000, 1024, "TaskUpdate size of 1.95kB has exceeded the limit of 1kB"}, |
| 278 | + {5000, 4 * 1024, "TaskUpdate size of 4.88kB has exceeded the limit of 4kB"}, |
| 279 | + {2 * 1024, 1024, "TaskUpdate size of 2kB has exceeded the limit of 1kB"}, |
| 280 | + {1024 * 1024, 512 * 1024, "TaskUpdate size of 1MB has exceeded the limit of 512kB"}, |
| 281 | + {16 * 1024 * 1024, 8 * 1024 * 1024, "TaskUpdate size of 16MB has exceeded the limit of 8MB"}, |
| 282 | + {485 * 1000 * 1000, 1024 * 1024 * 512, "TaskUpdate size of 462.53MB has exceeded the limit of 512MB"}, |
| 283 | + {1024 * 1024 * 1024, 1024 * 1024 * 512, "TaskUpdate size of 1GB has exceeded the limit of 512MB"}, |
| 284 | + {860492511, 524288000, "TaskUpdate size of 820.63MB has exceeded the limit of 500MB"}}; |
| 285 | + } |
| 286 | + |
225 | 287 | private void runTest(FailureScenario failureScenario, boolean useThriftEncoding)
|
226 | 288 | throws Exception
|
227 | 289 | {
|
@@ -272,6 +334,13 @@ private RemoteTask createRemoteTask(HttpRemoteTaskFactory httpRemoteTaskFactory)
|
272 | 334 |
|
273 | 335 | private static HttpRemoteTaskFactory createHttpRemoteTaskFactory(TestingTaskResource testingTaskResource, boolean useThriftEncoding)
|
274 | 336 | throws Exception
|
| 337 | + { |
| 338 | + InternalCommunicationConfig internalCommunicationConfig = new InternalCommunicationConfig().setThriftTransportEnabled(useThriftEncoding); |
| 339 | + return createHttpRemoteTaskFactory(testingTaskResource, useThriftEncoding, internalCommunicationConfig); |
| 340 | + } |
| 341 | + |
| 342 | + private static HttpRemoteTaskFactory createHttpRemoteTaskFactory(TestingTaskResource testingTaskResource, boolean useThriftEncoding, InternalCommunicationConfig internalCommunicationConfig) |
| 343 | + throws Exception |
275 | 344 | {
|
276 | 345 | Bootstrap app = new Bootstrap(
|
277 | 346 | new JsonModule(),
|
@@ -347,7 +416,7 @@ private HttpRemoteTaskFactory createHttpRemoteTaskFactory(
|
347 | 416 | metadataUpdatesJsonCodec,
|
348 | 417 | metadataUpdatesSmileCodec,
|
349 | 418 | new RemoteTaskStats(),
|
350 |
| - new InternalCommunicationConfig().setThriftTransportEnabled(useThriftEncoding), |
| 419 | + internalCommunicationConfig, |
351 | 420 | createTestMetadataManager(),
|
352 | 421 | new TestQueryManager(),
|
353 | 422 | new HandleResolver(),
|
|
0 commit comments