@@ -80,6 +80,7 @@ def __init__(self, engine: "Engine"):
80
80
self .incoming_connections = {}
81
81
self .outgoing_connections = {}
82
82
self .ready_connections = set ()
83
+ self ._ready_connections_lock = Locker ("ready_connections_lock" , async_lock = True )
83
84
84
85
self ._mm = MessagesManager (addr = self .addr , config = self .config )
85
86
self .received_messages_hashes = collections .deque (
@@ -179,11 +180,13 @@ async def check_federation_ready(self):
179
180
f"🔗 check_federation_ready | Ready connections: { self .ready_connections } | Connections: { self .connections .keys ()} "
180
181
)
181
182
async with self .connections_lock :
182
- if set (self .connections .keys ()) == self .ready_connections :
183
- return True
183
+ async with self ._ready_connections_lock :
184
+ if set (self .connections .keys ()) == self .ready_connections :
185
+ return True
184
186
185
187
async def add_ready_connection (self , addr ):
186
- self .ready_connections .add (addr )
188
+ async with self ._ready_connections_lock :
189
+ self .ready_connections .add (addr )
187
190
188
191
async def start_communications (self , initial_neighbors ):
189
192
"""
@@ -198,11 +201,16 @@ async def start_communications(self, initial_neighbors):
198
201
)
199
202
await asyncio .sleep (self .config .participant ["misc_args" ]["grace_time_connection" ])
200
203
await self .start ()
204
+ neighbors = set (initial_neighbors )
205
+
206
+ if self .addr in neighbors :
207
+ neighbors .discard (self .addr )
208
+
201
209
for i in initial_neighbors :
202
210
addr = f"{ i .split (':' )[0 ]} :{ i .split (':' )[1 ]} "
203
211
await self .connect (addr , direct = True )
204
212
await asyncio .sleep (1 )
205
- while not await self .verify_connections (initial_neighbors ):
213
+ while not await self .verify_connections (neighbors ):
206
214
await asyncio .sleep (1 )
207
215
current_connections = await self .get_addrs_current_connections ()
208
216
logging .info (f"Connections verified: { current_connections } " )
0 commit comments