Skip to content

Commit 6af215a

Browse files
[Feature/events] Improve event system (#36)
* Servicio de conexión UDP Implementación de servicio cliente/servidor para que dispositivos ajenos a la federación entablen comunicación * fix_filename Changed file name: nebulamulticasting from nebulaconnection * fix_nebulamulticasting Additions: -communication - init_ecs - stop_ecs Remove: -nebulamulticasting -stop condition on nebula_server * feat_mobility_module Additions: -neighbormanagement folder -mobility messages -proto files updated -functionalities on comunnications and engine to new messages - get_loss on nebula model - model_info on propagator * fix_function_arguments * fix_compiler_errors type notation on factory methods * feat_timer_generator Additions: -TimerGenerator class to generate timer used for waiting updates from nodes -Weight_modifier for updates received if NodeManager is up * fix_timer_integrated Additions: - TimerGenerator functions and references to be used - Updated learning cycle to use timer if mobility is up * fix_lock_release_excep check lock is acquire * fix_waiting_logic Changed the way the node wait and update its state of waiting. * fixed_some_mobility_configurations * update_nebula_p2b.py * feat_mobility_upgrade .External_connection_service working -Messages and callbacks integrated * feat_late_starting_trainning -late nodes start trainning -topology structures set apropiately * feat_LateNodes_train Later creation nodes now can be integrated into the trainning process * fix_fast_push fast push to sinchronize network integrated * feat_weights_modifiers Weight modifiers applied to late connected nodes * fix_slow_push_strategy slow push strategy working properly fixing metrics from late connection nodes * fix_metric_delay Metric delay done for late creation nodes * feat_fast_push fast push integrated weight strategy integrated fix modified weights * fix_fast_reboot fast reboot when device arrives fully integrated * feat_upgrading_network_robustness mechanisms itnegrated: .-reconnect_to_federation .-upgrade_connection_robustness * fix_ecs_run_shutdown * fix_info_points * feat_nebulamulticasting_on_off -fixed learnign rate error -fixed push sync error -fix sync errors * fix_errors_reestructuring * fix_restructure_errors fixed all restructure errors found service working * feat_connection_optimizator Connection optimizator to clear inactive connections * feat_network_optimization networkoptimizer as a controller connectionoptimizer to clean inactive connections timergenerator to generate dynamic timeouts for aggregation * fix_minor_errors * fix_remove_weight_error * fix_solving_distributions * feat_additional_data_dist_png additional nodes now show their data distributions * fix_additional_nodes_ip * ft_test_setup * fix_no_coinciding_samples * fix_update * feat_defaultMH default model handler integrated refactor fastreboot * fix_error_defaultHM * fix_wrong_payload * fix_keyerror_np * fix_general_errors * fix_reestructure_loop * fix_slow_push_issue * fix:concurrency_issue * fix_get_neighbors_np * daily_update * change_scenario_config * fix_momentum * fix_com_error * updt_momentum * feat_momentum momemtum logic implemented * feat_momemtum_penalty * updt_momemtum_penalty_ext * update momemtum * update_messages_refactor * fix_msg_errors * fix_messages_factory * fix_error * fix_factory_message_action * fix_message_factory feat message_template * feat_refactor_messages * fix_clean_code * fix_refactor_communciations * fix_handle_model_error await was required * feat_message_events * fix_error_msg * fix_event_error * fix_errors * update_momemtum * updt * fix_momemtum_config * optimization_sinc * opt_sinc++ * fix_momentum * fix_disconnect_error * fix_disconnection_node * fix_TCP_temporary_port * fix_notself_agg * fix_tcp_ports * feat_node_disconnection * feat_blacklist * fix_resinc_error * fix_resinc_after_disc * fix_resinc_Node * feat_target_attacks * feature select changing targets * feat standar mobility strategies * feataure update storage * fix update storage errors * fix_no_round_mechs * fix_error * Feature update handlers interface * feature dfl no rounds * fix updates handling and ecs service * fix_propagator_error * opt_test_mobility * refactor situational awareness * feature situational awareness module functionalities * feature nebula discover service asynchronous * optimization code * feature beacon service * fix fully integrated beacon * feature geolocalization in beacon * fix daily update * feature nebula gps service * fix mobility errors * feature updating mobility module * fix mobility errors * fix missing await * fix generate network conditions * fix mobility low threshold error * feature network simulator * feature integrated nebula network simulator * feature SA submodules * fix additional node network conditions * fix network conditions fist attemp * feature training policy interface * feature event system for addon functionalities * feat aggregation event * feature event system integrated * feature QDS tp - update received event * feature round start event * feature speed oriented selection * feature CFL implementation for udpate storage * feature event system * change location of topology image * remove upper table in dashboard * improve readability, explanations, and fix some issues * Rename GPS directory --------- Co-authored-by: enriquetomasmb <enriquetomasmb@gmail.com> Co-authored-by: Enrique Tomás <64653672+enriquetomasmb@users.noreply.github.com>
1 parent 754ca88 commit 6af215a

39 files changed

+2113
-1067
lines changed

nebula/addons/attacks/communications/communicationattack.py

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,24 @@
11
import logging
2+
import random
23
import types
34
from abc import abstractmethod
4-
import random
55

66
from nebula.addons.attacks.attacks import Attack
77

88

99
class CommunicationAttack(Attack):
10-
def __init__(self, engine,
11-
target_class,
12-
target_method,
13-
round_start_attack,
14-
round_stop_attack,
15-
attack_interval,
16-
decorator_args=None,
17-
selectivity_percentage: int = 100,
18-
selection_interval: int = None
19-
):
10+
def __init__(
11+
self,
12+
engine,
13+
target_class,
14+
target_method,
15+
round_start_attack,
16+
round_stop_attack,
17+
attack_interval,
18+
decorator_args=None,
19+
selectivity_percentage: int = 100,
20+
selection_interval: int = None,
21+
):
2022
super().__init__()
2123
self.engine = engine
2224
self.target_class = target_class
@@ -41,7 +43,7 @@ def decorator(self, *args):
4143

4244
async def select_targets(self):
4345
if self.selectivity_percentage != 100:
44-
if self.selection_interval:
46+
if self.selection_interval:
4547
if self.last_selection_round % self.selection_interval == 0:
4648
logging.info("Recalculating targets...")
4749
all_nodes = await self.engine.cm.get_addrs_current_connections(only_direct=True)
@@ -55,10 +57,10 @@ async def select_targets(self):
5557
else:
5658
logging.info("All neighbors selected as targets")
5759
self.targets = await self.engine.cm.get_addrs_current_connections(only_direct=True)
58-
60+
5961
logging.info(f"Selected {self.selectivity_percentage}% targets from neighbors: {self.targets}")
60-
self.last_selection_round+=1
61-
62+
self.last_selection_round += 1
63+
6264
async def _inject_malicious_behaviour(self):
6365
"""Inject malicious behavior into the target method."""
6466
decorated_method = self.decorator(self.decorator_args)(self.original_method)
@@ -80,7 +82,9 @@ async def attack(self):
8082
elif self.engine.round == self.round_stop_attack:
8183
logging.info(f"[{self.__class__.__name__}] Stoping attack")
8284
await self._restore_original_behaviour()
83-
elif (self.engine.round == self.round_start_attack) or ((self.engine.round - self.round_start_attack) % self.attack_interval == 0):
85+
elif (self.engine.round == self.round_start_attack) or (
86+
(self.engine.round - self.round_start_attack) % self.attack_interval == 0
87+
):
8488
await self.select_targets()
8589
logging.info(f"[{self.__class__.__name__}] Performing attack")
8690
await self._inject_malicious_behaviour()

nebula/addons/attacks/communications/delayerattack.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,9 @@ def decorator(self, delay: int):
5656
def decorator(func):
5757
@wraps(func)
5858
async def wrapper(*args, **kwargs):
59-
if len(args) > 1:
59+
if len(args) > 1:
6060
dest_addr = args[1]
61-
if dest_addr in self.targets:
61+
if dest_addr in self.targets:
6262
logging.info(f"[DelayerAttack] Delaying model propagation to {dest_addr} by {delay} seconds")
6363
await asyncio.sleep(delay)
6464
_, *new_args = args # Exclude self argument
Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
11
import logging
22
from functools import wraps
3-
3+
44
from nebula.addons.attacks.communications.communicationattack import CommunicationAttack
5-
6-
5+
6+
77
class FloodingAttack(CommunicationAttack):
88
"""
99
Implements an attack that delays the execution of a target method by a specified amount of time.
1010
"""
11-
11+
1212
def __init__(self, engine, attack_params: dict):
1313
"""
1414
Initializes the DelayerAttack with the engine and attack parameters.
15-
15+
1616
Args:
1717
engine: The engine managing the attack context.
1818
attack_params (dict): Parameters for the attack, including the delay duration.
@@ -28,9 +28,9 @@ def __init__(self, engine, attack_params: dict):
2828
raise ValueError(f"Missing required attack parameter: {e}")
2929
except ValueError:
3030
raise ValueError("Invalid value in attack_params. Ensure all values are integers.")
31-
31+
3232
self.verbose = False
33-
33+
3434
super().__init__(
3535
engine,
3636
engine._cm,
@@ -42,33 +42,35 @@ def __init__(self, engine, attack_params: dict):
4242
self.target_percentage,
4343
self.selection_interval,
4444
)
45-
45+
4646
def decorator(self, flooding_factor: int):
4747
"""
4848
Decorator that adds a delay to the execution of the original method.
49-
49+
5050
Args:
5151
flooding_factor (int): The number of times to repeat the function execution.
52-
52+
5353
Returns:
5454
function: A decorator function that wraps the target method with the delay logic.
5555
"""
56-
56+
5757
def decorator(func):
5858
@wraps(func)
5959
async def wrapper(*args, **kwargs):
60-
if len(args) > 1:
60+
if len(args) > 1:
6161
dest_addr = args[1]
62-
if dest_addr in self.targets:
62+
if dest_addr in self.targets:
6363
logging.info(f"[FloodingAttack] Flooding message to {dest_addr} by {flooding_factor} times")
6464
for i in range(flooding_factor):
6565
if self.verbose:
66-
logging.info(f"[FloodingAttack] Sending duplicate {i+1}/{flooding_factor} to {dest_addr}")
66+
logging.info(
67+
f"[FloodingAttack] Sending duplicate {i + 1}/{flooding_factor} to {dest_addr}"
68+
)
6769
_, *new_args = args # Exclude self argument
6870
await func(*new_args, **kwargs)
6971
_, *new_args = args # Exclude self argument
7072
return await func(*new_args)
71-
73+
7274
return wrapper
73-
74-
return decorator
75+
76+
return decorator

nebula/addons/gps/gpsmodule.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
from abc import ABC, abstractmethod
2+
3+
4+
class GPSModule(ABC):
5+
@abstractmethod
6+
async def start(self):
7+
pass
8+
9+
@abstractmethod
10+
async def stop(self):
11+
pass
12+
13+
@abstractmethod
14+
async def is_running(self):
15+
pass
16+
17+
@abstractmethod
18+
async def calculate_distance(self, self_lat, self_long, other_lat, other_long):
19+
pass
20+
21+
22+
class GPSModuleException(Exception):
23+
pass
24+
25+
26+
def factory_gpsmodule(gps_module, config, addr, update_interval: float = 5.0, verbose=False) -> GPSModule:
27+
from nebula.addons.gps.nebulagps import NebulaGPS
28+
29+
GPS_SERVICES = {
30+
"nebula": NebulaGPS,
31+
}
32+
33+
gps_module = GPS_SERVICES.get(gps_module, NebulaGPS)
34+
35+
if gps_module:
36+
return gps_module(config, addr, update_interval, verbose)
37+
else:
38+
raise GPSModuleException(f"GPS Module {gps_module} not found")

nebula/addons/gps/nebulagps.py

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
import asyncio
2+
import logging
3+
import socket
4+
5+
from geopy import distance
6+
7+
from nebula.addons.gps.gpsmodule import GPSModule
8+
from nebula.core.eventmanager import EventManager
9+
from nebula.core.nebulaevents import GPSEvent
10+
from nebula.core.utils.locker import Locker
11+
12+
13+
class NebulaGPS(GPSModule):
14+
BROADCAST_IP = "255.255.255.255" # Broadcast IP
15+
BROADCAST_PORT = 50001 # Port used for GPS
16+
INTERFACE = "eth2" # Interface to avoid network conditions
17+
18+
def __init__(self, config, addr, update_interval: float = 5.0, verbose=False):
19+
self._config = config
20+
self._addr = addr
21+
self.update_interval = update_interval # Frequency
22+
self.running = False
23+
self._node_locations = {} # Dictionary for storing node locations
24+
self._broadcast_socket = None
25+
self._nodes_location_lock = Locker("nodes_location_lock", async_lock=True)
26+
self._verbose = verbose
27+
28+
async def start(self):
29+
"""Inicia el servicio de GPS, enviando y recibiendo ubicaciones."""
30+
logging.info("Starting NebulaGPS service...")
31+
self.running = True
32+
33+
# Create broadcast socket
34+
self._broadcast_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
35+
self._broadcast_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
36+
37+
# Bind socket on eth2 to also receive data
38+
self._broadcast_socket.bind(("", self.BROADCAST_PORT))
39+
40+
# Start sending and receiving tasks
41+
asyncio.create_task(self._send_location_loop())
42+
asyncio.create_task(self._receive_location_loop())
43+
asyncio.create_task(self._notify_geolocs())
44+
45+
async def stop(self):
46+
"""Stops the GPS service."""
47+
logging.info("Stopping NebulaGPS service...")
48+
self.running = False
49+
if self._broadcast_socket:
50+
self._broadcast_socket.close()
51+
self._broadcast_socket = None
52+
53+
async def is_running(self):
54+
return self.running
55+
56+
async def get_geoloc(self):
57+
latitude = self._config.participant["mobility_args"]["latitude"]
58+
longitude = self._config.participant["mobility_args"]["longitude"]
59+
return (latitude, longitude)
60+
61+
async def calculate_distance(self, self_lat, self_long, other_lat, other_long):
62+
distance_m = distance.distance((self_lat, self_long), (other_lat, other_long)).m
63+
return distance_m
64+
65+
async def _send_location_loop(self):
66+
"""Send the geolocation periodically by broadcast."""
67+
while self.running:
68+
latitude, longitude = await self.get_geoloc() # Obtener ubicación actual
69+
message = f"GPS-UPDATE {self._addr} {latitude} {longitude}"
70+
self._broadcast_socket.sendto(message.encode(), (self.BROADCAST_IP, self.BROADCAST_PORT))
71+
if self._verbose:
72+
logging.info(f"Sent GPS location: ({latitude}, {longitude})")
73+
await asyncio.sleep(self.update_interval)
74+
75+
async def _receive_location_loop(self):
76+
"""Escucha y almacena geolocalizaciones de otros nodos."""
77+
while self.running:
78+
try:
79+
data, addr = await asyncio.get_running_loop().run_in_executor(
80+
None, self._broadcast_socket.recvfrom, 1024
81+
)
82+
message = data.decode().strip()
83+
if message.startswith("GPS-UPDATE"):
84+
_, sender_addr, lat, lon = message.split()
85+
if sender_addr != self._addr:
86+
async with self._nodes_location_lock:
87+
self._node_locations[sender_addr] = (float(lat), float(lon))
88+
if self._verbose:
89+
logging.info(f"Received GPS from {addr[0]}: {lat}, {lon}")
90+
except Exception as e:
91+
logging.error(f"Error receiving GPS update: {e}")
92+
93+
async def _notify_geolocs(self):
94+
while True:
95+
await asyncio.sleep(self.update_interval)
96+
await self._nodes_location_lock.acquire_async()
97+
geolocs: dict = self._node_locations.copy()
98+
await self._nodes_location_lock.release_async()
99+
if geolocs:
100+
distances = {}
101+
self_lat, self_long = await self.get_geoloc()
102+
for addr, (lat, long) in geolocs.items():
103+
dist = await self.calculate_distance(self_lat, self_long, lat, long)
104+
distances[addr] = (dist, (lat, long))
105+
gpsevent = GPSEvent(distances)
106+
asyncio.create_task(EventManager.get_instance().publish_addonevent(gpsevent))

0 commit comments

Comments
 (0)