@@ -7,7 +7,7 @@ import { ITxLogItem } from '@dequanto/txs/receipt/ITxLogItem';
7
7
import { $date } from '@dequanto/utils/$date' ;
8
8
import { $require } from '@dequanto/utils/$require' ;
9
9
import { TAddress } from '@dequanto/models/TAddress' ;
10
- import { IEventsIndexerMetaStore , IEventsIndexerStore } from './storage/interfaces' ;
10
+ import { IEventsIndexerMetaStore , IEventsIndexerStore , TEventsIndexerItem } from './storage/interfaces' ;
11
11
import { FsEventsIndexerStore } from './storage/FsEventsIndexerStore' ;
12
12
import { FsEventsMetaStore } from './storage/FsEventsMetaStore' ;
13
13
import { class_Dfr } from 'atma-utils' ;
@@ -17,12 +17,12 @@ import { TEth } from '@dequanto/models/TEth';
17
17
import { l } from '@dequanto/utils/$logger' ;
18
18
19
19
20
- export class EventsIndexer < T extends ContractBase > {
20
+ export class EventsIndexer < TContract extends ContractBase > {
21
21
22
22
public store : IEventsIndexerStore
23
23
public storeMeta : IEventsIndexerMetaStore
24
24
25
- constructor ( public contract : T , public options : {
25
+ constructor ( public contract : TContract , public options : {
26
26
// Load events from the contract that was deployed to multiple addresses
27
27
addresses ?: TAddress [ ]
28
28
name ?: string
@@ -71,37 +71,40 @@ export class EventsIndexer <T extends ContractBase> {
71
71
$require . True ( this . store instanceof FsEventsIndexerStore ) ;
72
72
$require . True ( this . storeMeta instanceof FsEventsMetaStore ) ;
73
73
74
- await ( this . store as FsEventsIndexerStore < T > ) . ensureMigrated ( ) ;
75
- await ( this . storeMeta as FsEventsMetaStore < T > ) . ensureMigrated ( ) ;
74
+ await ( this . store as FsEventsIndexerStore < TContract > ) . ensureMigrated ( ) ;
75
+ await ( this . storeMeta as FsEventsMetaStore < TContract > ) . ensureMigrated ( ) ;
76
76
}
77
77
78
78
async getPastLogs <
79
- TLogName extends GetEventLogNames < T > ,
79
+ TLogName extends GetEventLogNames < TContract > ,
80
80
> (
81
81
event : TLogName | TLogName [ ] | '*' ,
82
- // Fetch all logs and filter later if needed
83
- //- params?: T[TMethodName] extends (options: { params?: infer TParams }) => any ? TParams : never,
84
82
filter ?: {
85
83
fromBlock ?: number
86
84
toBlock ?: number
85
+ params ?: TLogName extends keyof TContract
86
+ ? ( TContract [ TLogName ] extends ( options : { params ?: infer TParams } ) => any ? TParams : never )
87
+ : never ,
87
88
}
88
89
) : Promise < {
89
- logs : ITxLogItem < GetTypes < T > [ 'Events' ] [ TLogName ] [ 'outputParams' ] > [ ] ,
90
+ logs : ITxLogItem < GetTypes < TContract > [ 'Events' ] [ TLogName ] [ 'outputParams' ] > [ ] ,
90
91
} > {
91
92
let contract = this . contract ;
92
93
let client = contract . client ;
93
94
let toBlock = filter ?. toBlock ?? await client . getBlockNumber ( ) ;
94
95
let events = Array . isArray ( event ) ? event as string [ ] : [ event as string ] ;
95
- let ranges = await this . getRanges ( events , this . options . initialBlockNumber , toBlock ) ;
96
- let logs = await this . getPastLogsRanges ( ranges , events , toBlock , filter ?. fromBlock ) ;
97
-
96
+ let filterKey = this . getFilterKey ( filter ?. params ) ;
97
+ let ranges = await this . getRanges ( events , this . options . initialBlockNumber , toBlock , { filterKey } ) ;
98
+ let logs = await this . getPastLogsRanges ( ranges , events , toBlock , filter ?. fromBlock , {
99
+ params : filter ?. params as any
100
+ } ) ;
98
101
return {
99
102
logs : logs as any
100
103
} ;
101
104
}
102
105
103
106
async * getPastLogsStream <
104
- TLogName extends GetEventLogNames < T > ,
107
+ TLogName extends GetEventLogNames < TContract > ,
105
108
> (
106
109
event : TLogName | TLogName [ ] | '*' ,
107
110
// Fetch all logs and filter later if needed
@@ -110,10 +113,13 @@ export class EventsIndexer <T extends ContractBase> {
110
113
fromBlock ?: number
111
114
toBlock ?: number
112
115
blockRangeLimits ?: WClient [ 'blockRangeLimits' ]
116
+ params ?: TLogName extends keyof TContract
117
+ ? ( TContract [ TLogName ] extends ( options : { params ?: infer TParams } ) => any ? TParams : never )
118
+ : never ,
113
119
} ,
114
120
) : AsyncGenerator <
115
121
TLogsRangeProgress <
116
- ITxLogItem < GetTypes < T > [ 'Events' ] [ TLogName ] [ 'outputParams' ] , string >
122
+ ITxLogItem < GetTypes < TContract > [ 'Events' ] [ TLogName ] [ 'outputParams' ] , string >
117
123
> // next result
118
124
, void // void returns
119
125
, void // next doesn't get any parameter
@@ -122,12 +128,16 @@ export class EventsIndexer <T extends ContractBase> {
122
128
let client = contract . client ;
123
129
let toBlock = options ?. toBlock ?? await client . getBlockNumber ( ) ;
124
130
let events = Array . isArray ( event ) ? event as string [ ] : [ event as string ] ;
125
- let ranges = await this . getRanges ( events , options ?. fromBlock ?? this . options . initialBlockNumber , toBlock ) ;
131
+ let filterKey = this . getFilterKey ( options ?. params ) ;
132
+ let ranges = await this . getRanges ( events , options ?. fromBlock ?? this . options . initialBlockNumber , toBlock , {
133
+ filterKey
134
+ } ) ;
126
135
127
136
let dfrInner = new class_Dfr < any > ( ) ;
128
137
let dfrOuter = new class_Dfr < any > ( ) ;
129
138
130
139
this . getPastLogsRanges ( ranges , events , toBlock , options ?. fromBlock , {
140
+ params : options ?. params ,
131
141
blockRangeLimits : options ?. blockRangeLimits ,
132
142
streamed : true ,
133
143
async onProgress ( info ) {
@@ -175,9 +185,14 @@ export class EventsIndexer <T extends ContractBase> {
175
185
}
176
186
}
177
187
178
- private async getRanges ( events : string [ ] , initialBlockNumber : number , toBlock : number , fromBlock ?: number ) : Promise < TRange [ ] > {
188
+ private async getRanges ( events : string [ ] , initialBlockNumber : number , toBlock : number , opts : {
189
+ fromBlock ?: number
190
+ filterKey ?: string
191
+ } ) : Promise < TRange [ ] > {
179
192
let logsMetaArr = await this . storeMeta . fetch ( ) ;
180
- let eventsBlock = alot ( logsMetaArr ) . toDictionary ( x => x . event , x => x . lastBlock ) ;
193
+ let eventsBlock = alot ( logsMetaArr )
194
+ . filter ( x => x . filterKey == opts ?. filterKey )
195
+ . toDictionary ( x => x . event , x => x . lastBlock ) ;
181
196
let logsMeta = events . map ( event => {
182
197
let blockNr = eventsBlock [ event ] ?? initialBlockNumber ;
183
198
return {
@@ -187,7 +202,7 @@ export class EventsIndexer <T extends ContractBase> {
187
202
} ) ;
188
203
let hasInitialBlock = logsMeta . every ( x => x . lastBlock != null ) ;
189
204
if ( hasInitialBlock === false ) {
190
- let blockNr = fromBlock ?? await this . getInitialBlockNumber ( ) ;
205
+ let blockNr = opts ?. fromBlock ?? await this . getInitialBlockNumber ( ) ;
191
206
logsMeta . filter ( x => x . lastBlock == null ) . forEach ( x => x . lastBlock = blockNr ) ;
192
207
} ;
193
208
@@ -210,7 +225,7 @@ export class EventsIndexer <T extends ContractBase> {
210
225
}
211
226
return ranges ;
212
227
}
213
- private async getPastLogsRanges ( ranges : TRange [ ] , events : string [ ] , toBlock : number , fromBlock ?: number , options ?: TEventLogOptions < TEth . Log > ) {
228
+ private async getPastLogsRanges ( ranges : TRange [ ] , events : string [ ] , toBlock : number , fromBlock ?: number , options ?: TEventLogOptions < any > ) {
214
229
// Save indexed logs every 2 minutes
215
230
const PERSIST_INTERVAL = $date . parseTimespan ( '2min' ) ;
216
231
// Save indexed logs every 10k logs
@@ -225,7 +240,8 @@ export class EventsIndexer <T extends ContractBase> {
225
240
let arr = await this . getItemsFromStore ( {
226
241
fromBlock : fromBlock ,
227
242
toBlock : toBlock ,
228
- events
243
+ events,
244
+ params : options ?. params ,
229
245
} ) ;
230
246
if ( arr ?. length > 0 ) {
231
247
let latestBlock = arr [ arr . length - 1 ] . blockNumber ;
@@ -266,6 +282,7 @@ export class EventsIndexer <T extends ContractBase> {
266
282
fromBlock : fromBlock ,
267
283
toBlock : toBlock ,
268
284
blockRangeLimits : options ?. blockRangeLimits ,
285
+ params : options ?. params ,
269
286
onProgress : async info => {
270
287
onProgressCount ++ ;
271
288
@@ -302,7 +319,7 @@ export class EventsIndexer <T extends ContractBase> {
302
319
buffer = [ ] ;
303
320
time = now ;
304
321
savedCount += arr . length ;
305
- await this . upsert ( arr , events , info . latestBlock ) ;
322
+ await this . upsert ( arr , events , info . latestBlock , options ) ;
306
323
}
307
324
308
325
if ( options ?. onProgress ) {
@@ -314,13 +331,13 @@ export class EventsIndexer <T extends ContractBase> {
314
331
} ) ;
315
332
316
333
if ( buffer . length > 0 ) {
317
- await this . upsert ( buffer , events , toBlock ) ;
334
+ await this . upsert ( buffer , events , toBlock , options ) ;
318
335
}
319
336
}
320
337
321
338
322
339
// Upsert final, if buffer is empty, we still persist the toBlock
323
- await this . upsert ( buffer , events , toBlock ) ;
340
+ await this . upsert ( buffer , events , toBlock , options ) ;
324
341
325
342
if ( isStreamed === true ) {
326
343
return ;
@@ -329,7 +346,8 @@ export class EventsIndexer <T extends ContractBase> {
329
346
let logs = await this . getItemsFromStore ( {
330
347
fromBlock : fromBlock ,
331
348
toBlock : toBlock + 1 ,
332
- events : events
349
+ events : events ,
350
+ params : options ?. params
333
351
} ) ;
334
352
335
353
return logs ;
@@ -340,16 +358,27 @@ export class EventsIndexer <T extends ContractBase> {
340
358
fromBlock ?: number
341
359
toBlock ?: number
342
360
events ?: string [ ]
361
+ params ?: Record < string , any >
343
362
} ) {
344
363
let arr = await this . store . fetch ( filter ) ;
345
364
let events = filter . events ;
346
365
if ( events ?. [ 0 ] !== '*' ) {
347
366
let requestedEvents = alot ( events ) . toDictionary ( x => x ) ;
348
367
arr = arr . filter ( x => x . event in requestedEvents ) ;
349
368
}
350
-
369
+ let filterKey = this . getFilterKey ( filter . params ) ;
370
+ arr = arr . filter ( x => x . filterKey == filterKey ) ;
351
371
return arr ;
352
372
}
373
+ private getFilterKey ( params ) {
374
+ if ( params == null ) {
375
+ return ;
376
+ }
377
+ let arr = Array . isArray ( params )
378
+ ? params
379
+ : Object . keys ( params ) . map ( key => `${ key } =${ params [ key ] } ` ) ;
380
+ return arr . join ( '_' ) ;
381
+ }
353
382
354
383
private async getInitialBlockNumber ( ) {
355
384
let client = this . contract . client ;
@@ -363,11 +392,19 @@ export class EventsIndexer <T extends ContractBase> {
363
392
}
364
393
365
394
@memd . deco . queued ( )
366
- private async upsert ( logs , events : string [ ] , latestBlock : number ) {
395
+ private async upsert ( logs : TEventsIndexerItem [ ] , events : string [ ] , latestBlock : number , options : { params ? } ) {
396
+ let filterKey = this . getFilterKey ( options ?. params ) ;
367
397
if ( logs ?. length > 0 ) {
398
+ for ( let log of logs ) {
399
+ log . filterKey = filterKey ;
400
+ }
368
401
await this . store . upsertMany ( logs ) ;
369
402
}
370
- const logsMeta = events . map ( event => ( { event, lastBlock : latestBlock } ) ) ;
403
+ const logsMeta = events . map ( event => ( {
404
+ event,
405
+ lastBlock : latestBlock ,
406
+ filterKey : filterKey ,
407
+ } ) ) ;
371
408
await this . storeMeta . upsertMany ( logsMeta ) ;
372
409
}
373
410
}
0 commit comments