|
35 | 35 | import java.time.Duration;
|
36 | 36 | import java.time.Instant;
|
37 | 37 | import java.util.Optional;
|
| 38 | +import java.util.concurrent.TimeUnit; |
38 | 39 | import java.util.function.Function;
|
39 | 40 |
|
40 | 41 | /**
|
|
51 | 52 | public class TombstoneEmittingProcessor<K, V> extends ContextualFixedKeyProcessor<K, V, V> {
|
52 | 53 |
|
53 | 54 | private static final Logger LOGGER = LoggerFactory.getLogger(TombstoneEmittingProcessor.class);
|
| 55 | + private static final int PUNCTUATE_LATENCY_MEASUREMENT_RECORD_THRESHOLD = 1000; |
| 56 | + private static final long MAX_PUNCTUATE_LATENCY_MS = TimeUnit.SECONDS.toMillis(5); |
54 | 57 |
|
55 | 58 | private final String storeName;
|
56 | 59 | private final Duration checkInterval;
|
@@ -94,8 +97,35 @@ public void close() {
|
94 | 97 | private void punctuate(final long timestamp) {
|
95 | 98 | final Instant cutoffTimestamp = Instant.ofEpochMilli(timestamp).minus(maxLifetime);
|
96 | 99 |
|
| 100 | + long recordsProcessed = 0; |
| 101 | + final long punctuateStartTimeMillis = System.currentTimeMillis(); |
| 102 | + long punctuateLatencyMillis; |
| 103 | + |
97 | 104 | try (final KeyValueIterator<K, Long> all = store.all()) {
|
98 | 105 | while (all.hasNext()) {
|
| 106 | + // Only measure punctuation latency every N records to reduce the |
| 107 | + // CPU overhead of time comparisons. |
| 108 | + recordsProcessed++; |
| 109 | + if (recordsProcessed % PUNCTUATE_LATENCY_MEASUREMENT_RECORD_THRESHOLD == 0) { |
| 110 | + // For large state stores, it could happen that iterating over, and deserializing, |
| 111 | + // all entries takes too long. Punctuation blocks record processing and thus also |
| 112 | + // consumer polling. Enforce a ceiling as to how long punctuation can reasonably |
| 113 | + // take to prevent the consumer from being considered dead by the broker. |
| 114 | + // |
| 115 | + // Punctuation may be invoked for every single record so eventually the |
| 116 | + // work will get done, although spread over a longer time frame. |
| 117 | + punctuateLatencyMillis = System.currentTimeMillis() - punctuateStartTimeMillis; |
| 118 | + if (punctuateLatencyMillis >= MAX_PUNCTUATE_LATENCY_MS) { |
| 119 | + LOGGER.warn(""" |
| 120 | + Punctuator took {}ms to iterate over state store records. \ |
| 121 | + The maximum punctuation latency is {}ms. Aborting iteration \ |
| 122 | + to not block stream task thread for too long.""", |
| 123 | + punctuateLatencyMillis, |
| 124 | + MAX_PUNCTUATE_LATENCY_MS); |
| 125 | + break; |
| 126 | + } |
| 127 | + } |
| 128 | + |
99 | 129 | final KeyValue<K, Long> record = all.next();
|
100 | 130 | if (record.value != null && Instant.ofEpochMilli(record.value).isBefore(cutoffTimestamp)) {
|
101 | 131 | LOGGER.debug("Sending tombstone for key {}", record.key);
|
|
0 commit comments