Skip to content

Commit 8caab4a

Browse files
authored
Merge pull request #25 from rarimo/fix/ipfs-retry
Add retries for ipfs request
2 parents 0974765 + dea4b9d commit 8caab4a

File tree

6 files changed

+87
-15
lines changed

6 files changed

+87
-15
lines changed

config.yaml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,12 @@ voting_v2:
4040
gas_limit: 800000
4141
enable: true
4242
check_with_subscribe: false
43-
ipfs_url: "https://ipfs.rarimo.com"
4443

44+
ipfs:
45+
url: "https://ipfs.rarimo.com/ipfs"
46+
max_retries: 3
47+
min_abnormal_period: 2s
48+
max_abnormal_period: 8s
4549

4650
replicator:
4751
address: 0x10f370A6d8782E0e0E85ba948be6DA2465Aab4E2

internal/config/ipfs.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package config
2+
3+
import (
4+
"time"
5+
6+
"gitlab.com/distributed_lab/figure/v3"
7+
"gitlab.com/distributed_lab/kit/kv"
8+
"gitlab.com/distributed_lab/logan/v3"
9+
"gitlab.com/distributed_lab/logan/v3/errors"
10+
)
11+
12+
const (
13+
ipfsYamlKey = "ipfs"
14+
)
15+
16+
type Ipfs struct {
17+
Url string `fig:"url"`
18+
MaxRetries uint64 `fig:"max_retries"`
19+
MinAbnormalPeriod time.Duration `fig:"min_abnormal_period"`
20+
MaxAbnormalPeriod time.Duration `fig:"max_abnormal_period"`
21+
}
22+
23+
func (c *config) Ipfs() Ipfs {
24+
return c.ipfs.Do(func() interface{} {
25+
result := Ipfs{
26+
Url: "https://ipfs.rarimo.com/ipfs",
27+
MaxRetries: 5,
28+
MinAbnormalPeriod: 30 * time.Second,
29+
MaxAbnormalPeriod: 30 * time.Second,
30+
}
31+
32+
err := figure.
33+
Out(&result).
34+
With(figure.BaseHooks).
35+
From(kv.MustGetStringMap(c.getter, ipfsYamlKey)).
36+
Please()
37+
if err != nil {
38+
panic(errors.Wrap(err, "failed to figure out config", logan.F{"key": ipfsYamlKey}))
39+
}
40+
41+
return result
42+
}).(Ipfs)
43+
}

internal/config/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ type Config interface {
1818
VotingV2Configer
1919
Pinger() Pinger
2020
Replicator() Replicator
21+
Ipfs() Ipfs
2122
}
2223

2324
type config struct {
@@ -32,6 +33,7 @@ type config struct {
3233

3334
pinger comfig.Once
3435
replicator comfig.Once
36+
ipfs comfig.Once
3537
}
3638

3739
func New(getter kv.Getter) Config {

internal/config/voting_v2_config.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ type VotingV2Config struct {
4444
Block uint64
4545
Enable bool
4646
WithSub bool
47-
IpfsUrl string
4847
}
4948

5049
func (e *ethereumVoting) VotingV2Config() *VotingV2Config {
@@ -78,7 +77,6 @@ func (e *ethereumVoting) VotingV2Config() *VotingV2Config {
7877
GasLimit uint64 `fig:"gas_limit"`
7978
Enable bool `fig:"enable"`
8079
WithSub bool `fig:"check_with_subscribe"`
81-
IpfsUrl string `fig:"ipfs_url"`
8280
}{}
8381
err := figure.
8482
Out(&networkConfig).
@@ -110,7 +108,6 @@ func (e *ethereumVoting) VotingV2Config() *VotingV2Config {
110108
result.mut = &sync.Mutex{}
111109
result.Block = networkConfig.Block
112110
result.GasLimit = networkConfig.GasLimit
113-
result.IpfsUrl = networkConfig.IpfsUrl
114111
return &result
115112
}).(*VotingV2Config)
116113
}

internal/service/checker/process_logs.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ import (
1717
"gitlab.com/distributed_lab/logan/v3/errors"
1818
)
1919

20+
var DuplicateEventErr = errors.New("Duplicate event in db")
21+
2022
// processLog based on the event name and filtered log, it parses and update the relevant information
2123
func (ch *checker) processLog(vLog types.Log, eventName string) error {
2224
var processedEvent data.ProcessedEvent
@@ -27,6 +29,13 @@ func (ch *checker) processLog(vLog types.Log, eventName string) error {
2729
processedEvent.TransactionHash = vLog.TxHash[:]
2830
err := ch.insertProcessedEventLog(processedEvent)
2931
if err != nil {
32+
if err == DuplicateEventErr {
33+
ch.log.WithFields(logan.F{
34+
"hash_tx": vLog.TxHash.Hex(),
35+
"log_index": processedEvent.LogIndex,
36+
}).Debug("event already processed and stored")
37+
return nil
38+
}
3039
return errors.Wrap(err, "failed insert processed event")
3140
}
3241
sender, err := ch.getSender(vLog.TxHash)
@@ -149,7 +158,7 @@ func (ch *checker) getSender(txHash common.Hash) (string, error) {
149158
func (ch *checker) insertProcessedEventLog(processedEvent data.ProcessedEvent) error {
150159
isExist, err := ch.processedEventQ.CheckProcessedEventExist(processedEvent)
151160
if isExist {
152-
return errors.New("Duplicate event in db")
161+
return DuplicateEventErr
153162
}
154163
if err != nil {
155164
return errors.Wrap(err, "failed to check processed event exist in db")

internal/service/checker/proposal_info.go

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package checker
22

33
import (
4+
"context"
45
"encoding/hex"
56
"encoding/json"
67
"fmt"
@@ -16,6 +17,7 @@ import (
1617
"github.com/rarimo/proof-verification-relayer/internal/data"
1718
"github.com/rarimo/proof-verification-relayer/resources"
1819
"gitlab.com/distributed_lab/logan/v3/errors"
20+
"gitlab.com/distributed_lab/running"
1921
)
2022

2123
type ProposalInfoFromContract struct {
@@ -34,7 +36,6 @@ type ProposalInfoFromContract struct {
3436
func GetProposalInfo(proposalId int64, cfg config.Config, creatorAddress string) (*ProposalInfoFromContract, error) {
3537
contractAddress := cfg.VotingV2Config().Address
3638
client := cfg.VotingV2Config().RPC
37-
ipfsUrl := cfg.VotingV2Config().IpfsUrl
3839
caller, err := contracts.NewProposalsStateCaller(contractAddress, client)
3940
if err != nil {
4041
return nil, errors.Wrap(err, "failed get new ProposalsStateCaller")
@@ -49,9 +50,21 @@ func GetProposalInfo(proposalId int64, cfg config.Config, creatorAddress string)
4950
endTimestamp := proposalInfo.Config.StartTimestamp + proposalInfo.Config.Duration
5051

5152
// Description in proposalInfo.Config contains the CID for the resource that corresponds to this proposal in the IPFS repository
52-
desc, err := getProposalDescFromIpfs(proposalInfo.Config.Description, ipfsUrl)
53-
if err != nil {
54-
return nil, errors.Wrap(err, "failed get proposal info from ipfs")
53+
proposalMetadata := &resources.ProposalInfoAttributesMetadata{}
54+
var lastErr error
55+
running.WithThreshold(context.Background(), cfg.Log(), "getProposalDescFromIpfs", func(ctx context.Context) (bool, error) {
56+
desc, err := getProposalDescFromIpfs(proposalInfo.Config.Description, cfg.Ipfs().Url)
57+
if err != nil {
58+
lastErr = err
59+
return false, errors.Wrap(err, "failed get proposal info from ipfs")
60+
}
61+
62+
proposalMetadata = desc
63+
return true, nil
64+
}, cfg.Ipfs().MinAbnormalPeriod, cfg.Ipfs().MaxAbnormalPeriod, cfg.Ipfs().MaxRetries)
65+
66+
if lastErr != nil {
67+
return nil, errors.Wrap(lastErr, "failed get proposal info from ipfs")
5568
}
5669
votingConfig := proposalInfo.Config
5770

@@ -118,7 +131,7 @@ func GetProposalInfo(proposalId int64, cfg config.Config, creatorAddress string)
118131
return &ProposalInfoFromContract{
119132
StartTimestamp: int64(startTimeStamp),
120133
EndTimestamp: int64(endTimestamp),
121-
Metadata: *desc,
134+
Metadata: *proposalMetadata,
122135
ParsedVotingWhitelistData: parsedVotingWhitelistData,
123136
}, nil
124137
}
@@ -228,22 +241,26 @@ func getBoundaryInYears(boundaryDateHex string, startTimeStamp uint64) (int64, e
228241

229242
// getProposalDescFromIpfs requests and returns proposal information from ipfs by CID
230243
func getProposalDescFromIpfs(desId string, ipfsUrl string) (*resources.ProposalInfoAttributesMetadata, error) {
244+
var data resources.ProposalInfoAttributesMetadata
231245
requestURL := fmt.Sprintf("%s/%s", ipfsUrl, desId)
246+
232247
resp, err := http.Get(requestURL)
233248
if err != nil {
234-
return nil, fmt.Errorf("error making http request: %v", err)
249+
return nil, fmt.Errorf("failed to make request: %v", err)
235250
}
236251

237-
defer resp.Body.Close()
252+
if resp.StatusCode != http.StatusOK {
253+
return nil, fmt.Errorf("non-200 response from ipfs: %d", resp.StatusCode)
254+
}
238255

239256
body, err := io.ReadAll(resp.Body)
257+
resp.Body.Close()
240258
if err != nil {
241-
return nil, fmt.Errorf("failed read body response: %v", err)
259+
return nil, fmt.Errorf("failed to read body: %v", err)
242260
}
243261

244-
var data resources.ProposalInfoAttributesMetadata
245262
if err := json.Unmarshal(body, &data); err != nil {
246-
return nil, fmt.Errorf("failed parse JSON: %v", err)
263+
return nil, fmt.Errorf("failed to parse JSON: %v", err)
247264
}
248265

249266
return &data, nil

0 commit comments

Comments
 (0)