Skip to content

Commit 0657c93

Browse files
committed
Merge remote-tracking branch 'origin/main' into feat_reputation
2 parents 9cfceeb + 9f7f5bb commit 0657c93

31 files changed

+1282
-551
lines changed

app/deployer.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -668,12 +668,6 @@ def run_frontend(self):
668668
"/var/run/docker.sock not found, please check if Docker is running and Docker Compose is installed."
669669
)
670670

671-
try:
672-
subprocess.check_call(["nvidia-smi"])
673-
self.gpu_available = True
674-
except Exception:
675-
logging.info("No GPU available for the frontend, nodes will be deploy in CPU mode")
676-
677671
network_name = f"{os.environ['USER']}_nebula-net-base"
678672

679673
# Create the Docker network
@@ -684,7 +678,6 @@ def run_frontend(self):
684678
environment = {
685679
"NEBULA_CONTROLLER_NAME": os.environ["USER"],
686680
"NEBULA_PRODUCTION": self.production,
687-
"NEBULA_GPU_AVAILABLE": self.gpu_available,
688681
"NEBULA_ADVANCED_ANALYTICS": self.advanced_analytics,
689682
"NEBULA_FRONTEND_LOG": "/nebula/app/logs/frontend.log",
690683
"NEBULA_LOGS_DIR": "/nebula/app/logs/",
@@ -756,6 +749,12 @@ def run_controller(self):
756749
)
757750

758751
network_name = f"{os.environ['USER']}_nebula-net-base"
752+
753+
try:
754+
subprocess.check_call(["nvidia-smi"])
755+
self.gpu_available = True
756+
except Exception:
757+
logging.info("No GPU available for the frontend, nodes will be deploy in CPU mode")
759758

760759
# Create the Docker network
761760
base = DockerUtils.create_docker_network(network_name)
@@ -790,6 +789,11 @@ def run_controller(self):
790789
],
791790
extra_hosts={"host.docker.internal": "host-gateway"},
792791
port_bindings={self.controller_port: self.controller_port},
792+
device_requests=[{
793+
"Driver": "nvidia",
794+
"Count": -1,
795+
"Capabilities": [["gpu"]],
796+
}] if self.gpu_available else None,
793797
)
794798

795799
networking_config = client.api.create_networking_config({

nebula/addons/gps/nebulagps.py

Lines changed: 65 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,17 @@ def __init__(self, config, addr, update_interval: float = 5.0, verbose=False):
1919
self._config = config
2020
self._addr = addr
2121
self.update_interval = update_interval # Frequency
22-
self.running = False
2322
self._node_locations = {} # Dictionary for storing node locations
2423
self._broadcast_socket = None
2524
self._nodes_location_lock = Locker("nodes_location_lock", async_lock=True)
2625
self._verbose = verbose
26+
self._running = asyncio.Event()
27+
self._background_tasks = [] # Track background tasks
2728

2829
async def start(self):
2930
"""Starts the GPS service, sending and receiving locations."""
3031
logging.info("Starting NebulaGPS service...")
31-
self.running = True
32+
self._running.set()
3233

3334
# Create broadcast socket
3435
self._broadcast_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
@@ -38,20 +39,39 @@ async def start(self):
3839
self._broadcast_socket.bind(("", self.BROADCAST_PORT))
3940

4041
# Start sending and receiving tasks
41-
asyncio.create_task(self._send_location_loop())
42-
asyncio.create_task(self._receive_location_loop())
43-
asyncio.create_task(self._notify_geolocs())
42+
self._background_tasks = [
43+
asyncio.create_task(self._send_location_loop(), name="NebulaGPS_send_location"),
44+
asyncio.create_task(self._receive_location_loop(), name="NebulaGPS_receive_location"),
45+
asyncio.create_task(self._notify_geolocs(), name="NebulaGPS_notify_geolocs"),
46+
]
4447

4548
async def stop(self):
4649
"""Stops the GPS service."""
47-
logging.info("Stopping NebulaGPS service...")
48-
self.running = False
50+
logging.info("🛑 Stopping NebulaGPS service...")
51+
self._running.clear()
52+
logging.info("🛑 NebulaGPS _running event cleared")
53+
54+
# Cancel all background tasks
55+
if self._background_tasks:
56+
logging.info(f"🛑 Cancelling {len(self._background_tasks)} background tasks...")
57+
for task in self._background_tasks:
58+
if not task.done():
59+
task.cancel()
60+
try:
61+
await task
62+
except asyncio.CancelledError:
63+
pass
64+
self._background_tasks.clear()
65+
logging.info("🛑 All background tasks cancelled")
66+
4967
if self._broadcast_socket:
5068
self._broadcast_socket.close()
5169
self._broadcast_socket = None
70+
logging.info("🛑 NebulaGPS broadcast socket closed")
71+
logging.info("✅ NebulaGPS service stopped successfully")
5272

5373
async def is_running(self):
54-
return self.running
74+
return self._running.is_set()
5575

5676
async def get_geoloc(self):
5777
latitude = self._config.participant["mobility_args"]["latitude"]
@@ -64,7 +84,18 @@ async def calculate_distance(self, self_lat, self_long, other_lat, other_long):
6484

6585
async def _send_location_loop(self):
6686
"""Send the geolocation periodically by broadcast."""
67-
while self.running:
87+
while await self.is_running():
88+
# Check if learning cycle has finished
89+
try:
90+
from nebula.core.network.communications import CommunicationsManager
91+
92+
cm = CommunicationsManager.get_instance()
93+
if cm.learning_finished():
94+
logging.info("GPS: Learning cycle finished, stopping location broadcast")
95+
break
96+
except Exception:
97+
pass # If we can't get the communications manager, continue
98+
6899
latitude, longitude = await self.get_geoloc() # Obtener ubicación actual
69100
message = f"GPS-UPDATE {self._addr} {latitude} {longitude}"
70101
self._broadcast_socket.sendto(message.encode(), (self.BROADCAST_IP, self.BROADCAST_PORT))
@@ -74,7 +105,18 @@ async def _send_location_loop(self):
74105

75106
async def _receive_location_loop(self):
76107
"""Listens to and stores geolocations from other nodes."""
77-
while self.running:
108+
while await self.is_running():
109+
# Check if learning cycle has finished
110+
try:
111+
from nebula.core.network.communications import CommunicationsManager
112+
113+
cm = CommunicationsManager.get_instance()
114+
if cm.learning_finished():
115+
logging.info("GPS: Learning cycle finished, stopping location reception")
116+
break
117+
except Exception:
118+
pass # If we can't get the communications manager, continue
119+
78120
try:
79121
data, addr = await asyncio.get_running_loop().run_in_executor(
80122
None, self._broadcast_socket.recvfrom, 1024
@@ -91,7 +133,18 @@ async def _receive_location_loop(self):
91133
logging.exception(f"Error receiving GPS update: {e}")
92134

93135
async def _notify_geolocs(self):
94-
while True:
136+
while await self.is_running():
137+
# Check if learning cycle has finished
138+
try:
139+
from nebula.core.network.communications import CommunicationsManager
140+
141+
cm = CommunicationsManager.get_instance()
142+
if cm.learning_finished():
143+
logging.info("GPS: Learning cycle finished, stopping geolocation notifications")
144+
break
145+
except Exception:
146+
pass # If we can't get the communications manager, continue
147+
95148
await asyncio.sleep(self.update_interval)
96149
await self._nodes_location_lock.acquire_async()
97150
geolocs: dict = self._node_locations.copy()
@@ -102,7 +155,7 @@ async def _notify_geolocs(self):
102155
for addr, (lat, long) in geolocs.items():
103156
dist = await self.calculate_distance(self_lat, self_long, lat, long)
104157
distances[addr] = (dist, (lat, long))
105-
158+
106159
self._config.update_nodes_distance(distances)
107160
gpsevent = GPSEvent(distances)
108161
asyncio.create_task(EventManager.get_instance().publish_addonevent(gpsevent))

nebula/addons/mobility.py

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -50,24 +50,28 @@ def __init__(self, config, verbose=False):
5050
"""
5151
logging.info("Starting mobility module...")
5252
self.config = config
53-
self.grace_time = self.config.participant["mobility_args"]["grace_time_mobility"]
54-
self.period = self.config.participant["mobility_args"]["change_geo_interval"]
53+
self._verbose = verbose
54+
self._running = asyncio.Event()
55+
self._nodes_distances = {}
56+
self._nodes_distances_lock = Locker("nodes_distances_lock", async_lock=True)
57+
self._mobility_task = None # Track the background task
58+
59+
# Mobility configuration
5560
self.mobility = self.config.participant["mobility_args"]["mobility"]
5661
self.mobility_type = self.config.participant["mobility_args"]["mobility_type"]
57-
self.radius_federation = float(self.config.participant["mobility_args"]["radius_federation"])
58-
self.scheme_mobility = self.config.participant["mobility_args"]["scheme_mobility"]
59-
self.round_frequency = int(self.config.participant["mobility_args"]["round_frequency"])
62+
self.grace_time = self.config.participant["mobility_args"]["grace_time_mobility"]
63+
self.period = self.config.participant["mobility_args"]["change_geo_interval"]
6064
# INFO: These values may change according to the needs of the federation
6165
self.max_distance_with_direct_connections = 150 # meters
6266
self.max_movement_random_strategy = 50 # meters
6367
self.max_movement_nearest_strategy = 50 # meters
6468
self.max_initiate_approximation = self.max_distance_with_direct_connections * 1.2
69+
self.radius_federation = float(config.participant["mobility_args"]["radius_federation"])
70+
self.scheme_mobility = config.participant["mobility_args"]["scheme_mobility"]
71+
self.round_frequency = int(config.participant["mobility_args"]["round_frequency"])
6572
# Logging box with mobility information
6673
mobility_msg = f"Mobility: {self.mobility}\nMobility type: {self.mobility_type}\nRadius federation: {self.radius_federation}\nScheme mobility: {self.scheme_mobility}\nEach {self.round_frequency} rounds"
6774
print_msg_box(msg=mobility_msg, indent=2, title="Mobility information")
68-
self._nodes_distances = {}
69-
self._nodes_distances_lock = Locker("nodes_distances_lock", async_lock=True)
70-
self._verbose = verbose
7175

7276
@cached_property
7377
def cm(self):
@@ -103,8 +107,30 @@ async def start(self):
103107
"""
104108
await EventManager.get_instance().subscribe_addonevent(GPSEvent, self.update_nodes_distances)
105109
await EventManager.get_instance().subscribe_addonevent(GPSEvent, self.update_nodes_distances)
106-
task = asyncio.create_task(self.run_mobility())
107-
return task
110+
self._running.set()
111+
self._mobility_task = asyncio.create_task(self.run_mobility(), name="Mobility_run_mobility")
112+
return self._mobility_task
113+
114+
async def stop(self):
115+
"""
116+
Stops the mobility module.
117+
"""
118+
logging.info("Stopping Mobility module...")
119+
self._running.clear()
120+
121+
# Cancel the background task
122+
if self._mobility_task and not self._mobility_task.done():
123+
logging.info("🛑 Cancelling Mobility background task...")
124+
self._mobility_task.cancel()
125+
try:
126+
await self._mobility_task
127+
except asyncio.CancelledError:
128+
pass
129+
self._mobility_task = None
130+
logging.info("🛑 Mobility background task cancelled")
131+
132+
async def is_running(self):
133+
return self._running.is_set()
108134

109135
async def update_nodes_distances(self, gpsevent: GPSEvent):
110136
distances = await gpsevent.get_event_data()
@@ -138,7 +164,7 @@ async def run_mobility(self):
138164
if not self.mobility:
139165
return
140166
# await asyncio.sleep(self.grace_time)
141-
while True:
167+
while await self.is_running():
142168
await self.change_geo_location()
143169
await asyncio.sleep(self.period)
144170

nebula/addons/networksimulation/nebulanetworksimulator.py

Lines changed: 42 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,15 @@ def __init__(self, changing_interval, interface, verbose=False):
2626
self._network_conditions = self.NETWORK_CONDITIONS.copy()
2727
self._network_conditions_lock = Locker("network_conditions_lock", async_lock=True)
2828
self._current_network_conditions = {}
29-
self._running = False
29+
self._running = asyncio.Event()
3030

3131
@cached_property
3232
def cm(self):
3333
return CommunicationsManager.get_instance()
3434

3535
async def start(self):
3636
logging.info("🌐 Nebula Network Simulator starting...")
37-
self._running = True
37+
self._running.set()
3838
grace_time = self.cm.config.participant["mobility_args"]["grace_time_mobility"]
3939
# if self._verbose: logging.info(f"Waiting {grace_time}s to start applying network conditions based on distances between devices")
4040
# await asyncio.sleep(grace_time)
@@ -43,37 +43,49 @@ async def start(self):
4343
)
4444

4545
async def stop(self):
46-
self._running = False
46+
logging.info("🌐 Nebula Network Simulator stopping...")
47+
self._running.clear()
48+
49+
async def is_running(self):
50+
return self._running.is_set()
4751

4852
async def _change_network_conditions_based_on_distances(self, gpsevent: GPSEvent):
4953
distances = await gpsevent.get_event_data()
50-
await asyncio.sleep(self._refresh_interval)
51-
if self._verbose:
52-
logging.info("Refresh | conditions based on distances...")
53-
try:
54-
for addr, (distance, _) in distances.items():
55-
if distance is None:
56-
# If the distance is not found, we skip the node
57-
continue
58-
conditions = await self._calculate_network_conditions(distance)
59-
# Only update the network conditions if they have changed
60-
if addr not in self._current_network_conditions or self._current_network_conditions[addr] != conditions:
61-
addr_ip = addr.split(":")[0]
62-
self._set_network_condition_for_addr(
63-
self._node_interface, addr_ip, conditions["bandwidth"], conditions["delay"]
64-
)
65-
self._set_network_condition_for_multicast(
66-
self._node_interface, addr_ip, self.IP_MULTICAST, conditions["bandwidth"], conditions["delay"]
67-
)
68-
async with self._network_conditions_lock:
69-
self._current_network_conditions[addr] = conditions
70-
else:
71-
if self._verbose:
72-
logging.info("network conditions havent changed since last time")
73-
except KeyError:
74-
logging.exception(f"📍 Connection {addr} not found")
75-
except Exception:
76-
logging.exception("📍 Error changing connections based on distance")
54+
while await self.is_running():
55+
if self._verbose:
56+
logging.info("Refresh | conditions based on distances...")
57+
try:
58+
for addr, (distance, _) in distances.items():
59+
if distance is None:
60+
# If the distance is not found, we skip the node
61+
continue
62+
conditions = await self._calculate_network_conditions(distance)
63+
# Only update the network conditions if they have changed
64+
if (
65+
addr not in self._current_network_conditions
66+
or self._current_network_conditions[addr] != conditions
67+
):
68+
addr_ip = addr.split(":")[0]
69+
self._set_network_condition_for_addr(
70+
self._node_interface, addr_ip, conditions["bandwidth"], conditions["delay"]
71+
)
72+
self._set_network_condition_for_multicast(
73+
self._node_interface,
74+
addr_ip,
75+
self.IP_MULTICAST,
76+
conditions["bandwidth"],
77+
conditions["delay"],
78+
)
79+
async with self._network_conditions_lock:
80+
self._current_network_conditions[addr] = conditions
81+
else:
82+
if self._verbose:
83+
logging.info("network conditions havent changed since last time")
84+
except KeyError:
85+
logging.exception(f"📍 Connection {addr} not found")
86+
except Exception:
87+
logging.exception("📍 Error changing connections based on distance")
88+
await asyncio.sleep(self._refresh_interval)
7789

7890
async def set_thresholds(self, thresholds: dict):
7991
async with self._network_conditions_lock:

0 commit comments

Comments
 (0)