@@ -26,6 +26,7 @@ import (
 	"github.com/pkg/errors"
 	"github.com/prometheus-community/parquet-common/queryable"
 	"github.com/prometheus-community/parquet-common/schema"
+	"github.com/prometheus-community/parquet-common/search"
 	parquetStorage "github.com/prometheus-community/parquet-common/storage"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/storage"
@@ -595,7 +596,15 @@ func (s *ParquetBucketStore) createLabelsAndChunksIterators(
 	}
 
 	decoder := schema.NewPrometheusParquetChunksDecoder(chunkenc.NewPool())
-	q, err := parquet.NewParquetChunkQuerier(decoder, shardsFinder, s.querierOpts...)
+	opts := s.querierOpts
+	if shardSelector != nil {
+		shardFilter := func(_ context.Context, _ *storage.SelectHints) (search.MaterializedLabelsFilter, bool) {
+			return materializedLabelsShardFilter{shardSelector: shardSelector}, true
+		}
+		opts = append(opts, parquet.WithMaterializedLabelsFilterCallback(shardFilter))
+	}
+
+	q, err := parquet.NewParquetChunkQuerier(decoder, shardsFinder, opts...)
 	if err != nil {
 		return nil, nil, errors.Wrap(err, "error creating parquet queryable")
 	}
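
For readers unfamiliar with parquet-common's filter pushdown hook, here is a minimal sketch of the contract this hunk codes against. It is inferred purely from the callback signature above and the two methods materializedLabelsShardFilter implements below; the authoritative definitions live in parquet-common's search and queryable packages, so treat names and doc comments here as assumptions.

// Sketch only: inferred from this diff, not copied from parquet-common.
type MaterializedLabelsFilter interface {
	// Filter reports whether a materialized series' labels should be kept.
	Filter(lbls labels.Labels) bool
	// Close is called once the querier is done with the filter.
	Close()
}

// The callback runs per Select; returning false in the second result
// presumably disables label filtering for that query.
type MaterializedLabelsFilterCallback func(ctx context.Context, hints *storage.SelectHints) (MaterializedLabelsFilter, bool)
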
@@ -618,7 +627,7 @@ func (s *ParquetBucketStore) createLabelsAndChunksIterators(
 	// not support streaming results, the iterator we get from q.Select is
 	// already backed by a slice. So we are not losing as much as it may seem.
 	// We are planning to implement proper streaming.
-	lbls, aggrChunks, err := toLabelsAndAggChunksSlice(chunkSeriesSet, shardSelector, req.SkipChunks)
+	lbls, aggrChunks, err := toLabelsAndAggChunksSlice(chunkSeriesSet, req.SkipChunks)
 	if err != nil {
 		return nil, nil, errors.Wrap(err, "error converting parquet series set to labels and chunks slice")
 	}
@@ -642,6 +651,31 @@ func (s *ParquetBucketStore) createLabelsAndChunksIterators(
 	return labelsIt, newConcreteIterator(aggrChunks), nil
 }
 
+type materializedLabelsShardFilter struct {
+	shardSelector *sharding.ShardSelector
+}
+
+func (f materializedLabelsShardFilter) Filter(lbls labels.Labels) bool {
+	return shardOwnedUncached(f.shardSelector, lbls)
+}
+
+func (f materializedLabelsShardFilter) Close() {
+}
+
+// shardOwnedUncached checks if the given labels belong to the shard specified
+// by the shard selector. As opposed to shardOwned & friends from the
+// non-Parquet path, this function does not cache hashes. This is because, at
+// least for now, we don't have easy access to an identifier for the series in
+// the block to use as a cache key.
+func shardOwnedUncached(shard *sharding.ShardSelector, lset labels.Labels) bool {
+	if shard == nil {
+		return true
+	}
+
+	hash := labels.StableHash(lset)
+	return hash%shard.ShardCount == shard.ShardIndex
+}
+
 func (s *ParquetBucketStore) sendStreamingSeriesLabelsAndStats(req *storepb.SeriesRequest, srv storegatewaypb.StoreGateway_SeriesServer, stats *safeQueryStats, labelsIt iterator[labels.Labels]) (numSeries int, err error) {
 	var (
 		encodeDuration = time.Duration(0)
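
The hash arithmetic in shardOwnedUncached is easy to check in isolation: a series is owned by shard StableHash(labels) mod ShardCount. Below is a self-contained, hypothetical snippet (not part of this change) demonstrating the same ownership rule with prometheus/prometheus's labels package.

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

func main() {
	lset := labels.FromStrings(labels.MetricName, "up", "job", "demo")

	// Same rule as shardOwnedUncached above: with 4 shards, the series
	// belongs to shard StableHash(lset) mod 4.
	const shardCount = 4
	shardIndex := labels.StableHash(lset) % shardCount
	fmt.Printf("%v is owned by shard %d of %d\n", lset, shardIndex, shardCount)
}
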
@@ -1131,17 +1165,13 @@ func (s *ParquetBucketStore) cleanUpUnownedBlocks() error {
 // storage.ChunkSeriesSet and returns them as slices, converting the chunks to
 // storepb.AggrChunk format. If skipChunks is true, the chunks slice will be
 // empty.
-func toLabelsAndAggChunksSlice(chunkSeriesSet storage.ChunkSeriesSet, shardSelector *sharding.ShardSelector, skipChunks bool) ([]labels.Labels, [][]storepb.AggrChunk, error) {
+func toLabelsAndAggChunksSlice(chunkSeriesSet storage.ChunkSeriesSet, skipChunks bool) ([]labels.Labels, [][]storepb.AggrChunk, error) {
 	var seriesLabels []labels.Labels
 	var aggrChunks [][]storepb.AggrChunk
 
 	for chunkSeriesSet.Next() {
 		chunkSeries := chunkSeriesSet.At()
 		lbls := chunkSeries.Labels()
-		if !shardOwnedUncached(shardSelector, lbls) {
-			continue
-		}
-
 		seriesLabels = append(seriesLabels, lbls)
 
 		if skipChunks {
@@ -1168,20 +1198,6 @@ func toLabelsAndAggChunksSlice(chunkSeriesSet storage.ChunkSeriesSet, shardSelec
 	return seriesLabels, aggrChunks, chunkSeriesSet.Err()
 }
 
-// shardOwnedUncached checks if the given labels belong to the shard specified
-// by the shard selector. As opposed to shardOwned & friends from the
-// non-Parquet path, this function does not cache hashes. This is because, at
-// least yet, we don't have easy access to an identifier for the series in the
-// block to use as a cache key.
-func shardOwnedUncached(shard *sharding.ShardSelector, lset labels.Labels) bool {
-	if shard == nil {
-		return true
-	}
-
-	hash := labels.StableHash(lset)
-	return hash%shard.ShardCount == shard.ShardIndex
-}
-
 func prometheusChunkEncodingToStorePBChunkType(enc chunkenc.Encoding) storepb.Chunk_Encoding {
 	switch enc {
 	case chunkenc.EncXOR: