Skip to content

Commit 0e287bf

Browse files
committed
threads
1 parent 2b4b107 commit 0e287bf

File tree

3 files changed

+59
-46
lines changed

3 files changed

+59
-46
lines changed

x/streamer/keeper/distribute.go

Lines changed: 10 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
"cosmossdk.io/math"
77
sdk "github.com/cosmos/cosmos-sdk/types"
8+
"github.com/osmosis-labs/osmosis/v15/osmoutils"
89

910
"github.com/dymensionxyz/dymension/v3/utils/pagination"
1011
"github.com/dymensionxyz/dymension/v3/x/streamer/types"
@@ -53,48 +54,6 @@ func (k Keeper) DistributeToGauge(ctx sdk.Context, coins sdk.Coins, record types
5354
return totalAllocated, nil
5455
}
5556

56-
// UpdateStreamAtEpochStart updates the stream for a new epoch. Streams distribute rewards post factum.
57-
// Meaning, first increase the filled epoch pointer, then distribute rewards for this epoch.
58-
func (k Keeper) UpdateStreamAtEpochStart(ctx sdk.Context, stream types.Stream) (types.Stream, error) {
59-
// Check if stream has completed its distribution. This is a post factum check.
60-
if stream.FilledEpochs >= stream.NumEpochsPaidOver {
61-
err := k.moveActiveStreamToFinishedStream(ctx, stream)
62-
if err != nil {
63-
return types.Stream{}, fmt.Errorf("move active stream to finished stream: %w", err)
64-
}
65-
return stream, nil
66-
}
67-
68-
// If the stream is not finalized, update it for the next distribution
69-
70-
remainCoins := stream.Coins.Sub(stream.DistributedCoins...)
71-
remainEpochs := stream.NumEpochsPaidOver - stream.FilledEpochs
72-
epochCoins := remainCoins.QuoInt(math.NewIntFromUint64(remainEpochs))
73-
74-
// If the stream uses a sponsorship plan, query it and update stream distr info. The distribution
75-
// might be empty and this is a valid scenario. In that case, we'll just skip at without
76-
// filling the epoch.
77-
if stream.Sponsored {
78-
distr, err := k.sk.GetDistribution(ctx)
79-
if err != nil {
80-
return types.Stream{}, fmt.Errorf("get sponsorship distribution: %w", err)
81-
}
82-
// Update stream distr info
83-
stream.DistributeTo = types.DistrInfoFromDistribution(distr)
84-
}
85-
86-
// Add coins to distribute during the next epoch
87-
stream.EpochCoins = epochCoins
88-
89-
// Don't fill streams in which there's nothing to fill. Note that rewards are distributed post factum.
90-
// I.e., first increase the filled epoch number, then distribute rewards during the epoch.
91-
if !stream.DistributeTo.TotalWeight.IsZero() {
92-
stream.FilledEpochs += 1
93-
}
94-
95-
return stream, nil
96-
}
97-
9857
type DistributeRewardsResult struct {
9958
NewPointer types.EpochPointer
10059
FilledStreams []types.Stream
@@ -117,12 +76,18 @@ func (k Keeper) DistributeRewards(
11776

11877
// Distribute to all the remaining gauges that are left after EndBlock
11978
newPointer, iterations := IterateEpochPointer(pointer, streams, limit, func(v StreamGauge) pagination.Stop {
120-
distributed, errX := k.DistributeToGauge(ctx, v.Stream.EpochCoins, v.Gauge, v.Stream.DistributeTo.TotalWeight)
121-
if errX != nil {
79+
var distributed sdk.Coins
80+
err := osmoutils.ApplyFuncIfNoError(ctx, func(ctx sdk.Context) error {
81+
var err error
82+
distributed, err = k.DistributeToGauge(ctx, v.Stream.EpochCoins, v.Gauge, v.Stream.DistributeTo.TotalWeight)
83+
return err
84+
})
85+
if err != nil {
12286
// Ignore this gauge
12387
k.Logger(ctx).
124-
With("streamID", v.Stream.Id, "gaugeID", v.Gauge.GaugeId, "error", errX.Error()).
88+
With("streamID", v.Stream.Id, "gaugeID", v.Gauge.GaugeId, "error", err.Error()).
12589
Error("Failed to distribute to gauge")
90+
return pagination.Continue
12691
}
12792

12893
totalDistributed = totalDistributed.Add(distributed...)

x/streamer/keeper/hooks.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,12 @@ func (k Keeper) AfterEpochEnd(ctx sdk.Context, epochIdentifier string) (sdk.Coin
9797

9898
// Update streams with respect to a new epoch and save them
9999
for _, s := range distrResult.FilledStreams {
100+
updated, err := k.UpdateStreamAtEpochEnd(ctx, s)
101+
if err != nil {
102+
return sdk.Coins{}, fmt.Errorf("update stream '%d' at epoch start: %w", s.Id, err)
103+
}
100104
// Save the stream
101-
err = k.SetStream(ctx, &s)
105+
err = k.SetStream(ctx, &updated)
102106
if err != nil {
103107
return sdk.Coins{}, fmt.Errorf("set stream: %w", err)
104108
}

x/streamer/keeper/stream.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,56 @@ import (
44
"fmt"
55
"sort"
66

7+
"cosmossdk.io/math"
78
sdk "github.com/cosmos/cosmos-sdk/types"
89
"github.com/cosmos/gogoproto/proto"
910

1011
"github.com/dymensionxyz/dymension/v3/x/streamer/types"
1112
)
1213

14+
// UpdateStreamAtEpochStart updates the stream for a new epoch: estimates coins that streamer will
15+
// distribute during this epoch and updates a sponsored distribution if needed.
16+
func (k Keeper) UpdateStreamAtEpochStart(ctx sdk.Context, stream types.Stream) (types.Stream, error) {
17+
remainCoins := stream.Coins.Sub(stream.DistributedCoins...)
18+
remainEpochs := stream.NumEpochsPaidOver - stream.FilledEpochs
19+
epochCoins := remainCoins.QuoInt(math.NewIntFromUint64(remainEpochs))
20+
21+
// If the stream uses a sponsorship plan, query it and update stream distr info. The distribution
22+
// might be empty and this is a valid scenario. In that case, we'll just skip without filling the epoch.
23+
if stream.Sponsored {
24+
distr, err := k.sk.GetDistribution(ctx)
25+
if err != nil {
26+
return types.Stream{}, fmt.Errorf("get sponsorship distribution: %w", err)
27+
}
28+
// Update stream distr info
29+
stream.DistributeTo = types.DistrInfoFromDistribution(distr)
30+
}
31+
32+
// Add coins to distribute during the next epoch
33+
stream.EpochCoins = epochCoins
34+
35+
return stream, nil
36+
}
37+
38+
// UpdateStreamAtEpochEnd updates the stream at the end of the epoch: increases the filled epoch number
39+
// and makes the stream finished if needed.
40+
func (k Keeper) UpdateStreamAtEpochEnd(ctx sdk.Context, stream types.Stream) (types.Stream, error) {
41+
// Don't fill streams in which there's nothing to fill. This might happen when using sponsored streams.
42+
if !stream.DistributeTo.TotalWeight.IsZero() {
43+
stream.FilledEpochs += 1
44+
}
45+
46+
// Check if stream has completed its distribution. This is a post factum check.
47+
if stream.FilledEpochs >= stream.NumEpochsPaidOver {
48+
err := k.moveActiveStreamToFinishedStream(ctx, stream)
49+
if err != nil {
50+
return types.Stream{}, fmt.Errorf("move active stream to finished stream: %w", err)
51+
}
52+
}
53+
54+
return stream, nil
55+
}
56+
1357
// GetStreamByID returns stream from stream ID.
1458
func (k Keeper) GetStreamByID(ctx sdk.Context, streamID uint64) (*types.Stream, error) {
1559
stream := types.Stream{}

0 commit comments

Comments
 (0)