Skip to content

Commit 9f54207

Browse files
lukmanulhakkeemebyhr
authored andcommitted
Fix aggregate queries with labels column in prometheus
Remove the map to Block conversion of MetricHeader to resolve the issue with aggregate queries with labels column in Prometheus Co-authored-by: Yuya Ebihara <ebyhry@gmail.com>
1 parent e4c7e05 commit 9f54207

File tree

4 files changed

+27
-18
lines changed

4 files changed

+27
-18
lines changed

presto-prometheus/src/main/java/com/facebook/presto/plugin/prometheus/PrometheusRecordCursor.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ private Object getFieldValue(int field)
270270
int columnIndex = fieldToColumnIndex[field];
271271
switch (columnIndex) {
272272
case 0:
273-
return fields.getLabels();
273+
return getBlockFromMap(columnHandles.get(columnIndex).getColumnType(), fields.getLabels());
274274
case 1:
275275
return fields.getTimestamp();
276276
case 2:
@@ -284,13 +284,12 @@ private void checkFieldType(int field, Type expected)
284284
Type actual = getType(field);
285285
checkArgument(actual.equals(expected), "Expected field %s to be type %s but is %s", field, expected, actual);
286286
}
287-
288287
private List<PrometheusStandardizedRow> prometheusResultsInStandardizedForm(List<PrometheusMetricResult> results)
289288
{
290289
return results.stream().map(result ->
291290
result.getTimeSeriesValues().getValues().stream().map(prometheusTimeSeriesValue ->
292291
new PrometheusStandardizedRow(
293-
getBlockFromMap(columnHandles.get(0).getColumnType(), metricHeaderToMap(result.getMetricHeader())),
292+
result.getMetricHeader(),
294293
prometheusTimeSeriesValue.getTimestamp(),
295294
Double.parseDouble(prometheusTimeSeriesValue.getValue())))
296295
.collect(Collectors.toList()))

presto-prometheus/src/main/java/com/facebook/presto/plugin/prometheus/PrometheusStandardizedRow.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,26 +13,25 @@
1313
*/
1414
package com.facebook.presto.plugin.prometheus;
1515

16-
import com.facebook.presto.common.block.Block;
17-
1816
import java.time.Instant;
17+
import java.util.Map;
1918

2019
import static java.util.Objects.requireNonNull;
2120

2221
public class PrometheusStandardizedRow
2322
{
24-
private final Block labels;
23+
private final Map<String, String> labels;
2524
private final Instant timestamp;
2625
private final Double value;
2726

28-
public PrometheusStandardizedRow(Block labels, Instant timestamp, Double value)
27+
public PrometheusStandardizedRow(Map<String, String> labels, Instant timestamp, Double value)
2928
{
3029
this.labels = requireNonNull(labels, "labels is null");
3130
this.timestamp = requireNonNull(timestamp, "timestamp is null");
3231
this.value = requireNonNull(value, "value is null");
3332
}
3433

35-
public Block getLabels()
34+
public Map<String, String> getLabels()
3635
{
3736
return labels;
3837
}

presto-prometheus/src/test/java/com/facebook/presto/plugin/prometheus/TestPrometheusMetricsIntegration.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,4 +118,13 @@ public void testPushDown()
118118
MaterializedResult results = runner.execute(session, "SELECT * FROM prometheus.default.up WHERE timestamp > (NOW() - INTERVAL '15' SECOND)").toTestTypes();
119119
assertEquals(results.getRowCount(), 1);
120120
}
121+
@Test(priority = 3, dependsOnMethods = "testConfirmMetricAvailableAndCheckUp")
122+
public void testCountQuery()
123+
{
124+
MaterializedResult countResult = runner.execute(session, "SELECT COUNT(*) FROM prometheus.default.up").toTestTypes();
125+
assertEquals(countResult.getRowCount(), 1);
126+
MaterializedRow countRow = countResult.getMaterializedRows().get(0);
127+
long countValue = (Long) countRow.getField(0);
128+
assert countValue >= 1 : "Expected COUNT(*) to be >= 1, but got " + countValue;
129+
}
121130
}

presto-prometheus/src/test/java/com/facebook/presto/plugin/prometheus/TestPrometheusRecordSet.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import static com.facebook.presto.plugin.prometheus.PrometheusRecordCursor.getBlockFromMap;
3737
import static com.facebook.presto.plugin.prometheus.PrometheusRecordCursor.getMapFromBlock;
3838
import static com.facebook.presto.plugin.prometheus.TestPrometheusTable.TYPE_MANAGER;
39+
import static com.google.common.collect.ImmutableMap.toImmutableMap;
3940
import static java.time.Instant.ofEpochMilli;
4041
import static org.testng.Assert.assertEquals;
4142
import static org.testng.Assert.assertFalse;
@@ -64,27 +65,28 @@ public void testCursorSimple()
6465
List<PrometheusStandardizedRow> actual = new ArrayList<>();
6566
while (cursor.advanceNextPosition()) {
6667
actual.add(new PrometheusStandardizedRow(
67-
(Block) cursor.getObject(0),
68+
getMapFromBlock(varcharMapType, (Block) cursor.getObject(0)).entrySet().stream()
69+
.collect(toImmutableMap(entry -> (String) entry.getKey(), entry -> (String) entry.getValue())),
6870
((Instant) cursor.getObject(1)),
6971
cursor.getDouble(2)));
7072
assertFalse(cursor.isNull(0));
7173
assertFalse(cursor.isNull(1));
7274
assertFalse(cursor.isNull(2));
7375
}
7476
List<PrometheusStandardizedRow> expected = ImmutableList.<PrometheusStandardizedRow>builder()
75-
.add(new PrometheusStandardizedRow(getBlockFromMap(varcharMapType,
76-
ImmutableMap.of("instance", "localhost:9090", "__name__", "up", "job", "prometheus")), ofEpochMilli(1565962969044L), 1.0))
77-
.add(new PrometheusStandardizedRow(getBlockFromMap(varcharMapType,
78-
ImmutableMap.of("instance", "localhost:9090", "__name__", "up", "job", "prometheus")), ofEpochMilli(1565962984045L), 1.0))
79-
.add(new PrometheusStandardizedRow(getBlockFromMap(varcharMapType,
80-
ImmutableMap.of("instance", "localhost:9090", "__name__", "up", "job", "prometheus")), ofEpochMilli(1565962999044L), 1.0))
81-
.add(new PrometheusStandardizedRow(getBlockFromMap(varcharMapType,
82-
ImmutableMap.of("instance", "localhost:9090", "__name__", "up", "job", "prometheus")), ofEpochMilli(1565963014044L), 1.0))
77+
.add(new PrometheusStandardizedRow(
78+
ImmutableMap.of("instance", "localhost:9090", "__name__", "up", "job", "prometheus"), ofEpochMilli(1565962969044L), 1.0))
79+
.add(new PrometheusStandardizedRow(
80+
ImmutableMap.of("instance", "localhost:9090", "__name__", "up", "job", "prometheus"), ofEpochMilli(1565962984045L), 1.0))
81+
.add(new PrometheusStandardizedRow(
82+
ImmutableMap.of("instance", "localhost:9090", "__name__", "up", "job", "prometheus"), ofEpochMilli(1565962999044L), 1.0))
83+
.add(new PrometheusStandardizedRow(
84+
ImmutableMap.of("instance", "localhost:9090", "__name__", "up", "job", "prometheus"), ofEpochMilli(1565963014044L), 1.0))
8385
.build();
8486
List<PairLike<PrometheusStandardizedRow, PrometheusStandardizedRow>> pairs = Streams.zip(actual.stream(), expected.stream(), PairLike::new)
8587
.collect(Collectors.toList());
8688
pairs.forEach(pair -> {
87-
assertEquals(getMapFromBlock(varcharMapType, pair.getFirst().getLabels()), getMapFromBlock(varcharMapType, pair.getSecond().getLabels()));
89+
assertEquals(getMapFromBlock(varcharMapType, getBlockFromMap(varcharMapType, pair.getFirst().getLabels())), getMapFromBlock(varcharMapType, getBlockFromMap(varcharMapType, pair.getSecond().getLabels())));
8890
assertEquals(pair.getFirst().getTimestamp(), pair.getSecond().getTimestamp());
8991
assertEquals(pair.getFirst().getValue(), pair.getSecond().getValue());
9092
});

0 commit comments

Comments
 (0)