Skip to content

Commit edc39a3

Browse files
author
Сатаров Юрий Сергеевич
committed
[DEX-2853] feat: failed item caching
1 parent 0449cd2 commit edc39a3

File tree

11 files changed

+374
-22
lines changed

11 files changed

+374
-22
lines changed

.rubocop_todo.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
# Offense count: 1
1010
# Configuration parameters: AllowedMethods, AllowedPatterns, CountRepeatedAttributes.
1111
Metrics/AbcSize:
12-
Max: 18
12+
Max: 19
1313

1414
# Offense count: 5
1515
# Configuration parameters: CountComments, CountAsOne, AllowedMethods, AllowedPatterns.

CHANGELOG.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,17 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
1313

1414
### Fixed
1515

16+
## [6.18.0] - 2025-02-04
17+
18+
### Added
19+
20+
- Add failed item caching in case of unrecoverable database conn issues
21+
- Add `retry_latency` metric to measure retries
22+
23+
### Fixed
24+
25+
- Fix item processing cutoff timeout to be less than generic redis lock timeout
26+
1627
## [6.17.0] - 2025-01-30
1728

1829
### Added

app/interactors/sbmt/outbox/process_item.rb

Lines changed: 68 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,24 @@
11
# frozen_string_literal: true
22

33
require "sbmt/outbox/metrics/utils"
4+
require "sbmt/outbox/v2/redis_item_meta"
45

56
module Sbmt
67
module Outbox
78
class ProcessItem < Sbmt::Outbox::DryInteractor
89
param :item_class, reader: :private
910
param :item_id, reader: :private
1011
option :worker_version, reader: :private, optional: true, default: -> { 1 }
12+
option :cache_ttl_sec, reader: :private, optional: true, default: -> { 5 * 60 }
13+
option :redis, reader: :private, optional: true, default: -> {}
1114

1215
METRICS_COUNTERS = %i[error_counter retry_counter sent_counter fetch_error_counter discarded_counter].freeze
1316

14-
delegate :log_success, :log_info, :log_failure, to: "Sbmt::Outbox.logger"
17+
delegate :log_success, :log_info, :log_failure, :log_debug, to: "Sbmt::Outbox.logger"
1518
delegate :item_process_middlewares, to: "Sbmt::Outbox"
1619
delegate :box_type, :box_name, :owner, to: :item_class
1720

18-
attr_accessor :process_latency
21+
attr_accessor :process_latency, :retry_latency
1922

2023
def call
2124
log_success(
@@ -26,9 +29,23 @@ def call
2629
item = nil
2730

2831
item_class.transaction do
29-
item = yield fetch_item
32+
item = yield fetch_item_and_lock_for_update
33+
34+
cached_item = fetch_redis_item_meta(redis_item_key(item_id))
35+
if cached_retries_exceeded?(cached_item)
36+
msg = "max retries exceeded: marking item as failed based on cached data: #{cached_item}"
37+
item.set_errors_count(cached_item.errors_count)
38+
track_failed(msg, item)
39+
next Failure(msg)
40+
end
41+
42+
if cached_greater_errors_count?(item, cached_item)
43+
log_failure("inconsistent item: cached_errors_count:#{cached_item.errors_count} > db_errors_count:#{item.errors_count}: setting errors_count based on cached data:#{cached_item}")
44+
item.set_errors_count(cached_item.errors_count)
45+
end
3046

3147
if item.processed_at?
48+
self.retry_latency = Time.current - item.created_at
3249
item.config.retry_strategies.each do |retry_strategy|
3350
yield check_retry_strategy(item, retry_strategy)
3451
end
@@ -62,7 +79,48 @@ def call
6279

6380
private
6481

65-
def fetch_item
82+
def cached_retries_exceeded?(cached_item)
83+
return false unless cached_item
84+
85+
item_class.max_retries_exceeded?(cached_item.errors_count)
86+
end
87+
88+
def cached_greater_errors_count?(db_item, cached_item)
89+
return false unless cached_item
90+
91+
cached_item.errors_count > db_item.errors_count
92+
end
93+
94+
def fetch_redis_item_meta(redis_key)
95+
return if worker_version < 2
96+
97+
data = redis.call("GET", redis_key)
98+
return if data.blank?
99+
100+
Sbmt::Outbox::V2::RedisItemMeta.deserialize!(data)
101+
rescue => ex
102+
log_debug("error while fetching redis meta: #{ex.message}")
103+
nil
104+
end
105+
106+
def set_redis_item_meta(item, ex)
107+
return if worker_version < 2
108+
return if item.nil?
109+
110+
redis_key = redis_item_key(item.id)
111+
error_msg = format_exception_error(ex, extract_cause: false)
112+
data = Sbmt::Outbox::V2::RedisItemMeta.new(errors_count: item.errors_count, error_msg: error_msg)
113+
redis.call("SET", redis_key, data.to_s, "EX", cache_ttl_sec)
114+
rescue => ex
115+
log_debug("error while fetching redis meta: #{ex.message}")
116+
nil
117+
end
118+
119+
def redis_item_key(item_id)
120+
"#{box_type}:#{item_class.box_name}:#{item_id}"
121+
end
122+
123+
def fetch_item_and_lock_for_update
66124
item = item_class
67125
.lock("FOR UPDATE")
68126
.find_by(id: item_id)
@@ -171,6 +229,7 @@ def track_failed(ex_or_msg, item = nil)
171229
item.pending!
172230
end
173231
rescue => e
232+
set_redis_item_meta(item, e)
174233
log_error_handling_error(e, item)
175234
end
176235

@@ -259,6 +318,7 @@ def report_metrics(item)
259318
end
260319

261320
track_process_latency(labels) if process_latency
321+
track_retry_latency(labels) if retry_latency
262322

263323
return unless counters[:sent_counter].positive?
264324

@@ -279,6 +339,10 @@ def counters
279339
def track_process_latency(labels)
280340
Yabeda.outbox.process_latency.measure(labels, process_latency.round(3))
281341
end
342+
343+
def track_retry_latency(labels)
344+
Yabeda.outbox.retry_latency.measure(labels, retry_latency.round(3))
345+
end
282346
end
283347
end
284348
end

app/models/sbmt/outbox/base_item.rb

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,17 @@ def bucket_partitions
4949
end
5050
end
5151
end
52+
53+
def max_retries_exceeded?(count)
54+
return false if config.strict_order
55+
return true unless retriable?
56+
57+
count > config.max_retries
58+
end
59+
60+
def retriable?
61+
config.max_retries > 0
62+
end
5263
end
5364

5465
enum :status, {
@@ -135,20 +146,21 @@ def touch_processed_at
135146
end
136147

137148
def retriable?
138-
config.max_retries > 0
149+
self.class.retriable?
139150
end
140151

141152
def max_retries_exceeded?
142-
return false if config.strict_order
143-
return true unless retriable?
144-
145-
errors_count > config.max_retries
153+
self.class.max_retries_exceeded?(errors_count)
146154
end
147155

148156
def increment_errors_counter
149157
increment(:errors_count)
150158
end
151159

160+
def set_errors_count(count)
161+
self.errors_count = count
162+
end
163+
152164
def add_error(ex_or_msg)
153165
increment_errors_counter
154166

config/initializers/yabeda.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,12 @@
5050
buckets: [0.005, 0.01, 0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10, 20, 30].freeze,
5151
comment: "A histogram for outbox/inbox deletion latency"
5252

53+
histogram :retry_latency,
54+
tags: %i[type name partition owner],
55+
unit: :seconds,
56+
buckets: [1, 10, 20, 50, 120, 300, 900, 1800, 3600].freeze,
57+
comment: "A histogram outbox retry latency"
58+
5359
counter :deleted_counter,
5460
tags: %i[box_type box_name],
5561
comment: "A counter for the number of deleted outbox/inbox items"

lib/sbmt/outbox/engine.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ class Engine < Rails::Engine
2525
c.cdn_url = "https://cdn.jsdelivr.net/npm/sbmt-outbox-ui@0.0.8/dist/assets/index.js"
2626
end
2727
c.process_items = ActiveSupport::OrderedOptions.new.tap do |c|
28-
c.general_timeout = 120
29-
c.cutoff_timeout = 60
28+
c.general_timeout = 180
29+
c.cutoff_timeout = 90
3030
c.batch_size = 200
3131
end
3232
c.worker = ActiveSupport::OrderedOptions.new.tap do |c|
@@ -54,8 +54,8 @@ class Engine < Rails::Engine
5454
end
5555
c.processor = ActiveSupport::OrderedOptions.new.tap do |pc|
5656
pc.threads_count = 4
57-
pc.general_timeout = 120
58-
pc.cutoff_timeout = 60
57+
pc.general_timeout = 180
58+
pc.cutoff_timeout = 90
5959
pc.brpop_delay = 1
6060
end
6161

lib/sbmt/outbox/v2/processor.rb

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,24 @@ module Outbox
1010
module V2
1111
class Processor < BoxProcessor
1212
delegate :processor_config, :batch_process_middlewares, :logger, to: "Sbmt::Outbox"
13-
attr_reader :lock_timeout, :brpop_delay
13+
attr_reader :lock_timeout, :cache_ttl, :cutoff_timeout, :brpop_delay
1414

1515
REDIS_BRPOP_MIN_DELAY = 0.1
1616

1717
def initialize(
1818
boxes,
1919
threads_count: nil,
2020
lock_timeout: nil,
21+
cache_ttl: nil,
22+
cutoff_timeout: nil,
2123
brpop_delay: nil,
2224
redis: nil
2325
)
2426
@lock_timeout = lock_timeout || processor_config.general_timeout
27+
@cache_ttl = cache_ttl || @lock_timeout * 10
28+
@cutoff_timeout = cutoff_timeout || processor_config.cutoff_timeout
2529
@brpop_delay = brpop_delay || redis_brpop_delay(boxes.count, processor_config.brpop_delay)
30+
@redis = redis
2631

2732
super(boxes: boxes, threads_count: threads_count || processor_config.threads_count, name: "processor", redis: redis)
2833
end
@@ -66,14 +71,19 @@ def lock_task(scheduled_task)
6671
end
6772

6873
def process(task)
69-
lock_timer = Cutoff.new(lock_timeout)
74+
lock_timer = Cutoff.new(cutoff_timeout)
7075
last_id = 0
7176
strict_order = task.item_class.config.strict_order
7277

7378
box_worker.item_execution_runtime.measure(task.yabeda_labels) do
7479
Outbox.database_switcher.use_master do
7580
task.ids.each do |id|
76-
result = ProcessItem.call(task.item_class, id, worker_version: task.yabeda_labels[:worker_version])
81+
result = ProcessItem.call(
82+
task.item_class, id,
83+
worker_version: task.yabeda_labels[:worker_version],
84+
cache_ttl_sec: cache_ttl,
85+
redis: @redis
86+
)
7787

7888
box_worker.job_items_counter.increment(task.yabeda_labels)
7989
last_id = id

lib/sbmt/outbox/v2/redis_item_meta.rb

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# frozen_string_literal: true
2+
3+
module Sbmt
4+
module Outbox
5+
module V2
6+
class RedisItemMeta
7+
attr_reader :version, :timestamp, :errors_count, :error_msg
8+
9+
CURRENT_VERSION = 1
10+
MAX_ERROR_LEN = 200
11+
12+
def initialize(errors_count:, error_msg:, timestamp: Time.current.to_i, version: CURRENT_VERSION)
13+
@errors_count = errors_count
14+
@error_msg = error_msg
15+
@timestamp = timestamp
16+
@version = version
17+
end
18+
19+
def to_s
20+
serialize
21+
end
22+
23+
def serialize
24+
JSON.generate({
25+
version: version,
26+
timestamp: timestamp,
27+
errors_count: errors_count,
28+
error_msg: error_msg.slice(0, MAX_ERROR_LEN)
29+
})
30+
end
31+
32+
def self.deserialize!(value)
33+
raise "invalid data type: string is required" unless value.is_a?(String)
34+
35+
data = JSON.parse!(value, max_nesting: 1)
36+
new(
37+
version: data["version"],
38+
timestamp: data["timestamp"].to_i,
39+
errors_count: data["errors_count"].to_i,
40+
error_msg: data["error_msg"]
41+
)
42+
end
43+
end
44+
end
45+
end
46+
end

lib/sbmt/outbox/version.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@
22

33
module Sbmt
44
module Outbox
5-
VERSION = "6.17.0"
5+
VERSION = "6.18.0"
66
end
77
end

0 commit comments

Comments
 (0)