Skip to content

Commit e577b58

Browse files
alexweavfrancoposa
authored andcommitted
mimirpb: Associate metadata to series when possible during RW2 conversion (#12342)
#### What this PR does RW2 attaches metric metadata to series, where RW1 carries metadata separately. Previously, we took a fast path, where we always generate new series just to carry each metadata. This PR changes the conversion to metadata into the corresponding series, if both are present in the same request. The main benefit is on requests that are already RW2 (and tend to carry a mixture of series and metadata). We squeeze some additional bytes out of the new format by not repeating the label name references two times per metric family, plus dropping extra `repeated TimeSeriesRW2` separators in protobuf. It does not change data-only or metadata-only pushes. We now do extra work in the distributor to match these elements up. Previously we theorized this wasn't worth it, but now after analyzing typical distributor profile data, I expect the extra work here to not be as impactful on perf or TCO as once thought. The upside is we keep the record size absolutely minimized, as clients shift over to send RW2 push requests. #### Which issue(s) this PR fixes or relates to contrib: grafana/mimir-squad#2253 #### Checklist - [x] Tests updated. - [ ] Documentation added. - [ ] `CHANGELOG.md` updated - the order of entries should be `[CHANGE]`, `[FEATURE]`, `[ENHANCEMENT]`, `[BUGFIX]`. If changelog entry is not needed, please add the `changelog-not-needed` label to the PR. - [ ] [`about-versioning.md`](https://github.com/grafana/mimir/blob/main/docs/sources/mimir/configure/about-versioning.md) updated with experimental features.
1 parent 822e7d6 commit e577b58

File tree

5 files changed

+102
-79
lines changed

5 files changed

+102
-79
lines changed

pkg/mimirpb/prealloc_rw2.go

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@
22

33
package mimirpb
44

5-
import "fmt"
5+
import (
6+
"fmt"
7+
8+
"github.com/prometheus/prometheus/model/labels"
9+
)
610

711
func ReuseRW2(req *WriteRequest) {
812
reuseSymbolsSlice(req.SymbolsRW2)
@@ -30,27 +34,53 @@ func FromWriteRequestToRW2Request(rw1 *WriteRequest, commonSymbols []string, off
3034
defer reuseSymbolsTable(symbols)
3135
symbols.ConfigureCommonSymbols(offset, commonSymbols)
3236

37+
var metadataMap map[string]*MetricMetadata
38+
if len(rw1.Metadata) > 0 && len(rw1.Timeseries) > 0 {
39+
metadataMap = make(map[string]*MetricMetadata, len(rw1.Metadata))
40+
for _, meta := range rw1.Metadata {
41+
metadataMap[meta.MetricFamilyName] = meta
42+
}
43+
}
44+
3345
rw2Timeseries := make([]TimeSeriesRW2, 0, len(rw1.Timeseries)+len(rw1.Metadata)) // TODO: Pool-ify this allocation
3446
for _, ts := range rw1.Timeseries {
3547
refs := make([]uint32, 0, len(ts.Labels)*2) // TODO: Pool-ify this allocation
48+
metricName := ""
3649
for i := range ts.Labels {
50+
if ts.Labels[i].Name == labels.MetricName {
51+
metricName = ts.Labels[i].Value
52+
}
3753
refs = append(refs, symbols.Symbolize(ts.Labels[i].Name), symbols.Symbolize(ts.Labels[i].Value))
3854
}
3955

56+
var metadata MetadataRW2
57+
if len(metadataMap) > 0 {
58+
if rw1Metadata, ok := metadataMap[metricName]; ok {
59+
metadata = FromMetricMetadataToMetadataRW2(rw1Metadata, symbols)
60+
delete(metadataMap, metricName)
61+
}
62+
}
63+
4064
rw2Timeseries = append(rw2Timeseries, TimeSeriesRW2{
4165
LabelsRefs: refs,
4266
Samples: ts.Samples,
4367
Histograms: ts.Histograms,
4468
Exemplars: FromExemplarsToExemplarsRW2(ts.Exemplars, symbols),
45-
Metadata: MetadataRW2{},
69+
Metadata: metadata,
4670
CreatedTimestamp: ts.CreatedTimestamp,
4771
})
4872
}
4973

50-
// Represent metadata as extra, empty timeseries rather than attaching it to existing series.
51-
// We never replicate metadata if there are multiple series with the same name, and it removes the requirement to match up the metadata to the right series.
74+
// If there are extra metadata not associated with any timeseries, we fabricate an empty timeseries to carry it.
5275
for _, meta := range rw1.Metadata {
53-
labelsRefs := []uint32{symbols.Symbolize("__name__"), symbols.Symbolize(meta.MetricFamilyName)}
76+
// If we are matching metadata to series, and we already matched this metadata, we can skip it.
77+
if metadataMap != nil {
78+
if _, ok := metadataMap[meta.MetricFamilyName]; !ok {
79+
continue
80+
}
81+
}
82+
83+
labelsRefs := []uint32{symbols.Symbolize(labels.MetricName), symbols.Symbolize(meta.MetricFamilyName)}
5484
rw2meta := FromMetricMetadataToMetadataRW2(meta, symbols)
5585
rw2Timeseries = append(rw2Timeseries, TimeSeriesRW2{
5686
LabelsRefs: labelsRefs,

pkg/mimirpb/prealloc_rw2_test.go

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ func TestWriteRequestRW2Conversion(t *testing.T) {
237237
require.Equal(t, expTimeseries, rw2.TimeseriesRW2)
238238
})
239239

240-
t.Run("metadata", func(t *testing.T) {
240+
t.Run("metadata matching existing series", func(t *testing.T) {
241241
req := &WriteRequest{
242242
Timeseries: []PreallocTimeseries{
243243
{
@@ -264,15 +264,67 @@ func TestWriteRequestRW2Conversion(t *testing.T) {
264264
{
265265
LabelsRefs: []uint32{1, 2, 3, 4},
266266
Samples: []Sample{{Value: 123, TimestampMs: 1234567890}, {Value: 456, TimestampMs: 1234567900}},
267+
Metadata: MetadataRW2{
268+
Type: METRIC_TYPE_COUNTER,
269+
HelpRef: 5,
270+
UnitRef: 6,
271+
},
272+
},
273+
}
274+
require.NoError(t, err)
275+
require.Nil(t, rw2.Timeseries)
276+
require.Nil(t, rw2.Metadata)
277+
require.Equal(t, expSymbols, rw2.SymbolsRW2)
278+
require.Equal(t, expTimeseries, rw2.TimeseriesRW2)
279+
})
280+
281+
t.Run("metadata without corresponding series", func(t *testing.T) {
282+
req := &WriteRequest{
283+
Timeseries: []PreallocTimeseries{
284+
{
285+
TimeSeries: &TimeSeries{
286+
Labels: FromLabelsToLabelAdapters(labels.FromMap(map[string]string{"__name__": "my_cool_series", "job": "foo/bar"})),
287+
Samples: []Sample{{Value: 123, TimestampMs: 1234567890}, {Value: 456, TimestampMs: 1234567900}},
288+
},
289+
},
290+
},
291+
Metadata: []*MetricMetadata{
292+
{
293+
Type: COUNTER,
294+
MetricFamilyName: "my_cool_series",
295+
Help: "It's a cool series.",
296+
Unit: "megawatts",
297+
},
298+
{
299+
Type: COUNTER,
300+
MetricFamilyName: "my_cool_series_2",
301+
Help: "It's a second cool series.",
302+
Unit: "gigawatts",
303+
},
267304
},
305+
}
306+
307+
rw2, err := FromWriteRequestToRW2Request(req, nil, 0)
308+
309+
expSymbols := []string{"", "__name__", "my_cool_series", "job", "foo/bar", "It's a cool series.", "megawatts", "my_cool_series_2", "It's a second cool series.", "gigawatts"}
310+
expTimeseries := []TimeSeriesRW2{
268311
{
269-
LabelsRefs: []uint32{1, 2},
312+
LabelsRefs: []uint32{1, 2, 3, 4},
313+
Samples: []Sample{{Value: 123, TimestampMs: 1234567890}, {Value: 456, TimestampMs: 1234567900}},
270314
Metadata: MetadataRW2{
271315
Type: METRIC_TYPE_COUNTER,
272316
HelpRef: 5,
273317
UnitRef: 6,
274318
},
275319
},
320+
{
321+
LabelsRefs: []uint32{1, 7},
322+
Metadata: MetadataRW2{
323+
Type: METRIC_TYPE_COUNTER,
324+
HelpRef: 8,
325+
UnitRef: 9,
326+
},
327+
},
276328
}
277329
require.NoError(t, err)
278330
require.Nil(t, rw2.Timeseries)

pkg/mimirpb/split_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,7 @@ func TestSplitWriteRequestByMaxMarshalSize_Fuzzy(t *testing.T) {
388388
merged.Metadata = append(merged.Metadata, partial.Metadata...)
389389
}
390390

391-
assert.Equal(t, req, merged)
391+
require.Equal(t, req, merged)
392392
}
393393
})
394394

@@ -406,14 +406,14 @@ func TestSplitWriteRequestByMaxMarshalSize_Fuzzy(t *testing.T) {
406406
// Remember the original request, to compare against.
407407
req := generateWriteRequestRW2(numSeries, numLabelsPerSeries, numSamplesPerSeries, numMetadata)
408408
reqCpy := generateWriteRequestRW2(numSeries, numLabelsPerSeries, numSamplesPerSeries, numMetadata)
409-
assert.Equal(t, req, reqCpy)
409+
require.Equal(t, req, reqCpy)
410410

411411
maxSize := req.Size() / (1 + rnd.Intn(10))
412412
partials := SplitWriteRequestByMaxMarshalSizeRW2(req, req.Size(), maxSize, 0, nil)
413413

414414
// Ensure the merge of all partial requests is equal to the original one.
415415
merged := mergeRW2s(partials)
416-
assert.Equal(t, reqCpy, merged)
416+
require.Equal(t, reqCpy, merged)
417417
}
418418
})
419419
}
@@ -720,6 +720,11 @@ func mergeRW2s(partials []*WriteRequest) *WriteRequest {
720720
newLbls[i] = st.Symbolize(strVal)
721721
}
722722

723+
helpTxt := partial.SymbolsRW2[ts.Metadata.HelpRef]
724+
helpRef := st.Symbolize(helpTxt)
725+
unitTxt := partial.SymbolsRW2[ts.Metadata.UnitRef]
726+
unitRef := st.Symbolize(unitTxt)
727+
723728
newExemplars := make([]ExemplarRW2, 0, len(ts.Exemplars))
724729
for _, ex := range ts.Exemplars {
725730
newLbls := make([]uint32, len(ex.LabelsRefs))
@@ -737,11 +742,6 @@ func mergeRW2s(partials []*WriteRequest) *WriteRequest {
737742
newExemplars = nil
738743
}
739744

740-
helpTxt := partial.SymbolsRW2[ts.Metadata.HelpRef]
741-
helpRef := st.Symbolize(helpTxt)
742-
unitTxt := partial.SymbolsRW2[ts.Metadata.UnitRef]
743-
unitRef := st.Symbolize(unitTxt)
744-
745745
newTS := TimeSeriesRW2{
746746
LabelsRefs: newLbls,
747747
Samples: ts.Samples,

pkg/storage/ingest/version_test.go

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -243,30 +243,19 @@ func TestRecordSerializer(t *testing.T) {
243243
err = DeserializeRecordContent(record.Value, resultReq, 2)
244244
require.NoError(t, err)
245245

246-
require.Len(t, resultReq.Timeseries, 5)
246+
require.Len(t, resultReq.Timeseries, 3)
247247
require.Equal(t, req.Timeseries, resultReq.Timeseries[0:2])
248248

249249
// The only way to carry a metadata in RW2.0 is attached to a timeseries.
250250
// Metadata not attached to any series in the request must fabricate extra timeseries to house it.
251-
// In format V2 we also avoid the metadata-to-series matching operation to avoid ambiguity in the event multiple series share a family name.
252251
expMetadataSeries := []mimirpb.PreallocTimeseries{
253-
{TimeSeries: &mimirpb.TimeSeries{
254-
Labels: mimirpb.FromLabelsToLabelAdapters(labels.FromStrings(labels.MetricName, "series_1")),
255-
Samples: []mimirpb.Sample{},
256-
Exemplars: []mimirpb.Exemplar{},
257-
}},
258-
{TimeSeries: &mimirpb.TimeSeries{
259-
Labels: mimirpb.FromLabelsToLabelAdapters(labels.FromStrings(labels.MetricName, "series_2")),
260-
Samples: []mimirpb.Sample{},
261-
Exemplars: []mimirpb.Exemplar{},
262-
}},
263252
{TimeSeries: &mimirpb.TimeSeries{
264253
Labels: mimirpb.FromLabelsToLabelAdapters(labels.FromStrings(labels.MetricName, "series_3")),
265254
Samples: []mimirpb.Sample{},
266255
Exemplars: []mimirpb.Exemplar{},
267256
}},
268257
}
269-
require.Equal(t, expMetadataSeries, resultReq.Timeseries[2:5])
258+
require.Equal(t, expMetadataSeries, resultReq.Timeseries[2:])
270259

271260
require.Nil(t, resultReq.SymbolsRW2)
272261
require.Nil(t, resultReq.TimeseriesRW2)

pkg/storage/ingest/writer_test.go

Lines changed: 3 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -989,27 +989,6 @@ func TestMarshalWriteRequestToRecords(t *testing.T) {
989989
Exemplars: []mimirpb.Exemplar{},
990990
},
991991
},
992-
{
993-
TimeSeries: &mimirpb.TimeSeries{
994-
Labels: mimirpb.FromLabelsToLabelAdapters(labels.FromStrings("__name__", "series_1")),
995-
Samples: []mimirpb.Sample{},
996-
Exemplars: []mimirpb.Exemplar{},
997-
},
998-
},
999-
{
1000-
TimeSeries: &mimirpb.TimeSeries{
1001-
Labels: mimirpb.FromLabelsToLabelAdapters(labels.FromStrings("__name__", "series_2")),
1002-
Samples: []mimirpb.Sample{},
1003-
Exemplars: []mimirpb.Exemplar{},
1004-
},
1005-
},
1006-
{
1007-
TimeSeries: &mimirpb.TimeSeries{
1008-
Labels: mimirpb.FromLabelsToLabelAdapters(labels.FromStrings("__name__", "series_3")),
1009-
Samples: []mimirpb.Sample{},
1010-
Exemplars: []mimirpb.Exemplar{},
1011-
},
1012-
},
1013992
}
1014993
assert.Equal(t, expTimeseries, actual.Timeseries)
1015994
})
@@ -1064,7 +1043,7 @@ func TestMarshalWriteRequestToRecords(t *testing.T) {
10641043

10651044
records, err := marshalWriteRequestToRecords(1, "user-1", req, limit, splitRequestVersionTwo)
10661045
require.NoError(t, err)
1067-
require.Len(t, records, 4)
1046+
require.Len(t, records, 3)
10681047

10691048
// Assert each record, and decode all partial WriteRequests.
10701049
partials := make([]*mimirpb.WriteRequest, 0, len(records))
@@ -1097,33 +1076,6 @@ func TestMarshalWriteRequestToRecords(t *testing.T) {
10971076
Exemplars: []mimirpb.Exemplar{},
10981077
},
10991078
},
1100-
{
1101-
TimeSeries: &mimirpb.TimeSeries{
1102-
Labels: mimirpb.FromLabelsToLabelAdapters(labels.FromStrings("__name__", "series_2")),
1103-
Samples: []mimirpb.Sample{{TimestampMs: 1, Value: 2.0}},
1104-
Exemplars: []mimirpb.Exemplar{},
1105-
},
1106-
},
1107-
},
1108-
Metadata: []*mimirpb.MetricMetadata{},
1109-
}, {
1110-
Source: mimirpb.RULE,
1111-
SkipLabelValidation: true,
1112-
Timeseries: []mimirpb.PreallocTimeseries{
1113-
{
1114-
TimeSeries: &mimirpb.TimeSeries{
1115-
Labels: mimirpb.FromLabelsToLabelAdapters(labels.FromStrings("__name__", "series_3")),
1116-
Samples: []mimirpb.Sample{{TimestampMs: 1, Value: 2.0}},
1117-
Exemplars: []mimirpb.Exemplar{},
1118-
},
1119-
},
1120-
{
1121-
TimeSeries: &mimirpb.TimeSeries{
1122-
Labels: mimirpb.FromLabelsToLabelAdapters(labels.FromStrings("__name__", "series_1")),
1123-
Samples: []mimirpb.Sample{},
1124-
Exemplars: []mimirpb.Exemplar{},
1125-
},
1126-
},
11271079
},
11281080
Metadata: []*mimirpb.MetricMetadata{
11291081
{
@@ -1139,7 +1091,7 @@ func TestMarshalWriteRequestToRecords(t *testing.T) {
11391091
{
11401092
TimeSeries: &mimirpb.TimeSeries{
11411093
Labels: mimirpb.FromLabelsToLabelAdapters(labels.FromStrings("__name__", "series_2")),
1142-
Samples: []mimirpb.Sample{},
1094+
Samples: []mimirpb.Sample{{TimestampMs: 1, Value: 2.0}},
11431095
Exemplars: []mimirpb.Exemplar{},
11441096
},
11451097
},
@@ -1158,7 +1110,7 @@ func TestMarshalWriteRequestToRecords(t *testing.T) {
11581110
{
11591111
TimeSeries: &mimirpb.TimeSeries{
11601112
Labels: mimirpb.FromLabelsToLabelAdapters(labels.FromStrings("__name__", "series_3")),
1161-
Samples: []mimirpb.Sample{},
1113+
Samples: []mimirpb.Sample{{TimestampMs: 1, Value: 2.0}},
11621114
Exemplars: []mimirpb.Exemplar{},
11631115
},
11641116
},

0 commit comments

Comments
 (0)