Skip to content

Commit 9d549de

Browse files
[Feature] Upgrade node roles architecture and SDFL (#47)
* feature RoleBehavior interface * fix cfl and trust role behavior implementation * mod minor changes on disconnection method * updt minor changes * update: - get_round concurrence access - update shutdown protocol * feature model propagation event * upgrade shutdown protocol update controller network remove * upgrade shutdown process on node * clean code * fake behavior added * feature leadership tranfer * add trainer_aggregator role * fix removing nodes * trainer aggregator role added * feature leadership transfer among nodes * fix self selection on leadership transfer * feature resolver no updates conflict * upgrade documentation * upgrade aggregation waiting time release * upgrade monitor SDFL * remove duplicates and comments * latest reputation changes added * get_round fixed --------- Co-authored-by: FerTV <fernando.torres.vega@gmail.com> Co-authored-by: FerTV <45948619+FerTV@users.noreply.github.com>
1 parent c9017f1 commit 9d549de

File tree

21 files changed

+841
-567
lines changed

21 files changed

+841
-567
lines changed

nebula/addons/gps/nebulagps.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ async def _send_location_loop(self):
9090
from nebula.core.network.communications import CommunicationsManager
9191

9292
cm = CommunicationsManager.get_instance()
93-
if cm.learning_finished():
93+
if await cm.learning_finished():
9494
logging.info("GPS: Learning cycle finished, stopping location broadcast")
9595
break
9696
except Exception:
@@ -111,7 +111,7 @@ async def _receive_location_loop(self):
111111
from nebula.core.network.communications import CommunicationsManager
112112

113113
cm = CommunicationsManager.get_instance()
114-
if cm.learning_finished():
114+
if await cm.learning_finished():
115115
logging.info("GPS: Learning cycle finished, stopping location reception")
116116
break
117117
except Exception:
@@ -139,7 +139,7 @@ async def _notify_geolocs(self):
139139
from nebula.core.network.communications import CommunicationsManager
140140

141141
cm = CommunicationsManager.get_instance()
142-
if cm.learning_finished():
142+
if await cm.learning_finished():
143143
logging.info("GPS: Learning cycle finished, stopping geolocation notifications")
144144
break
145145
except Exception:

nebula/addons/mobility.py

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -77,20 +77,20 @@ def __init__(self, config, verbose=False):
7777
def cm(self):
7878
return CommunicationsManager.get_instance()
7979

80-
@property
81-
def round(self):
82-
"""
83-
Gets the current round number from the Communications Manager.
84-
85-
This property retrieves the current round number that is being managed by the
86-
CommunicationsManager instance associated with this module. It provides an
87-
interface to access the ongoing round of the communication process without
88-
directly exposing the underlying method in the CommunicationsManager.
89-
90-
Returns:
91-
int: The current round number managed by the CommunicationsManager.
92-
"""
93-
return self.cm.get_round()
80+
# @property
81+
# def round(self):
82+
# """
83+
# Gets the current round number from the Communications Manager.
84+
85+
# This property retrieves the current round number that is being managed by the
86+
# CommunicationsManager instance associated with this module. It provides an
87+
# interface to access the ongoing round of the communication process without
88+
# directly exposing the underlying method in the CommunicationsManager.
89+
90+
# Returns:
91+
# int: The current round number managed by the CommunicationsManager.
92+
# """
93+
# return self.cm.get_round()
9494

9595
async def start(self):
9696
"""

nebula/addons/networksimulation/nebulanetworksimulator.py

Lines changed: 34 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -51,41 +51,40 @@ async def is_running(self):
5151

5252
async def _change_network_conditions_based_on_distances(self, gpsevent: GPSEvent):
5353
distances = await gpsevent.get_event_data()
54-
while await self.is_running():
55-
if self._verbose:
56-
logging.info("Refresh | conditions based on distances...")
57-
try:
58-
for addr, (distance, _) in distances.items():
59-
if distance is None:
60-
# If the distance is not found, we skip the node
61-
continue
62-
conditions = await self._calculate_network_conditions(distance)
63-
# Only update the network conditions if they have changed
64-
if (
65-
addr not in self._current_network_conditions
66-
or self._current_network_conditions[addr] != conditions
67-
):
68-
addr_ip = addr.split(":")[0]
69-
self._set_network_condition_for_addr(
70-
self._node_interface, addr_ip, conditions["bandwidth"], conditions["delay"]
71-
)
72-
self._set_network_condition_for_multicast(
73-
self._node_interface,
74-
addr_ip,
75-
self.IP_MULTICAST,
76-
conditions["bandwidth"],
77-
conditions["delay"],
78-
)
79-
async with self._network_conditions_lock:
80-
self._current_network_conditions[addr] = conditions
81-
else:
82-
if self._verbose:
83-
logging.info("network conditions havent changed since last time")
84-
except KeyError:
85-
logging.exception(f"📍 Connection {addr} not found")
86-
except Exception:
87-
logging.exception("📍 Error changing connections based on distance")
88-
await asyncio.sleep(self._refresh_interval)
54+
if self._verbose:
55+
logging.info("Refresh | conditions based on distances...")
56+
try:
57+
for addr, (distance, _) in distances.items():
58+
if distance is None:
59+
# If the distance is not found, we skip the node
60+
continue
61+
conditions = await self._calculate_network_conditions(distance)
62+
# Only update the network conditions if they have changed
63+
if (
64+
addr not in self._current_network_conditions
65+
or self._current_network_conditions[addr] != conditions
66+
):
67+
addr_ip = addr.split(":")[0]
68+
self._set_network_condition_for_addr(
69+
self._node_interface, addr_ip, conditions["bandwidth"], conditions["delay"]
70+
)
71+
self._set_network_condition_for_multicast(
72+
self._node_interface,
73+
addr_ip,
74+
self.IP_MULTICAST,
75+
conditions["bandwidth"],
76+
conditions["delay"],
77+
)
78+
async with self._network_conditions_lock:
79+
self._current_network_conditions[addr] = conditions
80+
else:
81+
if self._verbose:
82+
logging.info("network conditions havent changed since last time")
83+
except KeyError:
84+
logging.exception(f"📍 Connection {addr} not found")
85+
except Exception:
86+
logging.exception("📍 Error changing connections based on distance")
87+
await asyncio.sleep(self._refresh_interval)
8988

9089
async def set_thresholds(self, thresholds: dict):
9190
async with self._network_conditions_lock:

0 commit comments

Comments
 (0)