Skip to content

Commit 63f0bb2

Browse files
enriquetomasmbAlejandroAvilesSerranoFerTV
authored
[Feature] Include realistic mobility and frontend-controller adaptation (#41)
* Servicio de conexión UDP Implementación de servicio cliente/servidor para que dispositivos ajenos a la federación entablen comunicación * fix_filename Changed file name: nebulamulticasting from nebulaconnection * fix_nebulamulticasting Additions: -communication - init_ecs - stop_ecs Remove: -nebulamulticasting -stop condition on nebula_server * feat_mobility_module Additions: -neighbormanagement folder -mobility messages -proto files updated -functionalities on comunnications and engine to new messages - get_loss on nebula model - model_info on propagator * fix_function_arguments * fix_compiler_errors type notation on factory methods * feat_timer_generator Additions: -TimerGenerator class to generate timer used for waiting updates from nodes -Weight_modifier for updates received if NodeManager is up * fix_timer_integrated Additions: - TimerGenerator functions and references to be used - Updated learning cycle to use timer if mobility is up * fix_lock_release_excep check lock is acquire * fix_waiting_logic Changed the way the node wait and update its state of waiting. * fixed_some_mobility_configurations * update_nebula_p2b.py * feat_mobility_upgrade .External_connection_service working -Messages and callbacks integrated * feat_late_starting_trainning -late nodes start trainning -topology structures set apropiately * feat_LateNodes_train Later creation nodes now can be integrated into the trainning process * fix_fast_push fast push to sinchronize network integrated * feat_weights_modifiers Weight modifiers applied to late connected nodes * fix_slow_push_strategy slow push strategy working properly fixing metrics from late connection nodes * fix_metric_delay Metric delay done for late creation nodes * feat_fast_push fast push integrated weight strategy integrated fix modified weights * fix_fast_reboot fast reboot when device arrives fully integrated * feat_upgrading_network_robustness mechanisms itnegrated: .-reconnect_to_federation .-upgrade_connection_robustness * fix_ecs_run_shutdown * fix_info_points * feat_nebulamulticasting_on_off -fixed learnign rate error -fixed push sync error -fix sync errors * fix_errors_reestructuring * fix_restructure_errors fixed all restructure errors found service working * feat_connection_optimizator Connection optimizator to clear inactive connections * feat_network_optimization networkoptimizer as a controller connectionoptimizer to clean inactive connections timergenerator to generate dynamic timeouts for aggregation * fix_minor_errors * fix_remove_weight_error * fix_solving_distributions * feat_additional_data_dist_png additional nodes now show their data distributions * fix_additional_nodes_ip * ft_test_setup * fix_no_coinciding_samples * fix_update * feat_defaultMH default model handler integrated refactor fastreboot * fix_error_defaultHM * fix_wrong_payload * fix_keyerror_np * fix_general_errors * fix_reestructure_loop * fix_slow_push_issue * fix:concurrency_issue * fix_get_neighbors_np * daily_update * change_scenario_config * fix_momentum * fix_com_error * updt_momentum * feat_momentum momemtum logic implemented * feat_momemtum_penalty * updt_momemtum_penalty_ext * update momemtum * update_messages_refactor * fix_msg_errors * fix_messages_factory * fix_error * fix_factory_message_action * fix_message_factory feat message_template * feat_refactor_messages * fix_clean_code * fix_refactor_communciations * fix_handle_model_error await was required * feat_message_events * fix_error_msg * fix_event_error * fix_errors * update_momemtum * updt * fix_momemtum_config * optimization_sinc * opt_sinc++ * fix_momentum * fix_disconnect_error * fix_disconnection_node * fix_TCP_temporary_port * fix_notself_agg * fix_tcp_ports * feat_node_disconnection * feat_blacklist * fix_resinc_error * fix_resinc_after_disc * fix_resinc_Node * feat_target_attacks * feature select changing targets * feat standar mobility strategies * feataure update storage * fix update storage errors * fix_no_round_mechs * fix_error * Feature update handlers interface * feature dfl no rounds * fix updates handling and ecs service * fix_propagator_error * opt_test_mobility * refactor situational awareness * feature situational awareness module functionalities * feature nebula discover service asynchronous * optimization code * feature beacon service * fix fully integrated beacon * feature geolocalization in beacon * fix daily update * feature nebula gps service * fix mobility errors * feature updating mobility module * fix mobility errors * fix missing await * fix generate network conditions * fix mobility low threshold error * feature network simulator * feature integrated nebula network simulator * feature SA submodules * fix additional node network conditions * fix network conditions fist attemp * feature training policy interface * feature event system for addon functionalities * feat aggregation event * feature event system integrated * feature QDS tp - update received event * feature round start event * feature speed oriented selection * feature CFL implementation for udpate storage * feature beacon received event * feature sos sa strategy * opt sat sos * feature sat hts * fix error offers accepted after stopped lt process * opt engine * fix evaluation before aggregation * fix additional participants datasets * opt nebuladataset factory * saving wating scenarios in the database * feature hybrid datasets * feat datasets 'n' splitted * feature split dataset IID subsets * wip hybrid datasets * fix dirichlet subset generated * feature unbalanced IID hybrid datasets * feature cifar10 hybrid data partitioning * fix mnist error * fix dflupdatehandler & qds * updt * opt space * feature nebula plugin loader * refactor(controller): migrate user modifications from front-end to controller Moved the logic for user modifications from the front-end to the controller to enhance separation of concerns and simplify maintenance. * feature SA command * fix communciation manager importatition * feature connectivity commands * refactor previous created endpoints and scenarios endpoints added * feature suggestionbuffer * fix suggestionbuffer * daile update * fix endpoints for scenarios * feature integrated suggestions system * fix error interface * opt suggestion buffer * feat integrated more events * refactor communication manager property * fix early updates received before starting learning * feature training policies as SA Agents * fix owner missing SACommands * morning update * removed unused parameters * databases removed of frontend docker container * fix monitor page and node related endpoints * fix samodule mediate function * fix sa module arbitatrion * feature close to integrate node forgiveness * feature system monitoring + forget nodes * feature static arbitatrion policy * feature behavior reputation * feature reputation messages * feature dinamically loading sa components * refactor situational awareness module * refactor SA module * feature consistency reputation * feature advanced consistency metrics * fix merge errors * refactor soem stuff * refactor private methods * refactor pluging loader * refactor ring topology np * feature sa frontend * fix ring error * fix random topology cs * refactor intro message * feature sar components dinamically loaded * fix ring error * fix not simultaneos ring extra participants * pre integration * sa commentaries * optimization ring error * fix strict ring topology * feature undirect connections inactivity * feature connetion priority * daily update * controller and frontend splitted * upgrade collaborative rep * refactor controller folder created * upgrade collaborative * feature distance neighbor policy * node endpoints created in the controller * fix distance errors * fix await erros * feature distance candidate selector * fix mobility longitude * default mobility * fix mob * additional merge changes * remove bad dir * added docstrings for the frontend * fix same owner commands conflict * improve node dynamism * fix filtered discovers * feature ttl for corfimations * fix rep messages * fix rep setup * rep proto * remove unused files * add/remove comments * added docstrings for the controller * update message definition * update format, style and comments * remove situational awareness from participant.json.example * fix dynamism on node distance * fix strict_topology * fix delay error * fix flooding * fix processes deployment * fix delayer attack --------- Co-authored-by: Alejandro.A.S <jandrosambasil@gmail.com> Co-authored-by: FerTV <fernando.torres.vega@gmail.com>
1 parent 06faefb commit 63f0bb2

File tree

113 files changed

+9610
-11768
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

113 files changed

+9610
-11768
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<h1 align="center">NEBULA: A Platform for Decentralized Federated Learning</h1>
77

88
<p align="center">
9-
<a href="https://nebula-dfl.com">nebula-dfl.com</a> |
9+
<a href="https://nebula-dfl.com">nebula-dfl.com</a> |
1010
<a href="https://nebula-dfl.eu">nebula-dfl.eu</a> |
1111
<a href="https://federeratedlearning.inf.um.es">federatedlearning.inf.um.es</a>
1212
</p>

analysis/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<h1 align="center">NEBULA: A Platform for Decentralized Federated Learning</h1>
88

99
<p align="center">
10-
<a href="https://nebula-dfl.com">nebula-dfl.com</a> |
10+
<a href="https://nebula-dfl.com">nebula-dfl.com</a> |
1111
<a href="https://nebula-dfl.eu">nebula-dfl.eu</a> |
1212
<a href="https://federeratedlearning.inf.um.es">federatedlearning.inf.um.es</a>
1313
</p>

app/main.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44

55
sys.path.append(os.path.join(os.path.dirname(__file__), "..")) # Parent directory where is the NEBULA module
66
import nebula
7-
from nebula.controller import Controller
8-
from nebula.scenarios import ScenarioManagement
7+
from nebula.controller.controller import Controller
8+
from nebula.controller.scenarios import ScenarioManagement
99

1010
argparser = argparse.ArgumentParser(description="Controller of NEBULA platform", add_help=False)
1111

@@ -54,8 +54,6 @@
5454
help="Statistics port (default: 8080)",
5555
)
5656

57-
argparser.add_argument("-t", "--test", dest="test", action="store_true", default=False, help="Run tests")
58-
5957
argparser.add_argument(
6058
"-st",
6159
"--stop",

docs/_prebuilt/developerguide.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -829,8 +829,8 @@ To add a new message to the application, follow these steps:
829829
FEDERATION_READY = 3;
830830
}
831831
Action action = 1;
832-
repeated string arguments = 2;
833-
int32 round = 3;
832+
repeated string arguments = 2;
833+
int32 round = 3;
834834
}
835835
```
836836

@@ -905,4 +905,4 @@ Note that **EventType** is the class that represents the event (not a specific i
905905

906906
When the event is published, all subscribed listeners for that event type will be triggered. As mentioned, there are three different **publish** functions, each tied to a specific type of event.
907907

908-
Finally, to **create a new event**, go to the file **/core/nebulaevents.py**. Depending on the type of event you wish to implement, create a class that extends one of the three native event types. After doing this, the usage of your new event is transparent to the rest of the system, and you can use the functions described above without any issues.
908+
Finally, to **create a new event**, go to the file **/core/nebulaevents.py**. Depending on the type of event you wish to implement, create a class that extends one of the three native event types. After doing this, the usage of your new event is transparent to the rest of the system, and you can use the functions described above without any issues.

nebula/addons/attacks/communications/communicationattack.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import logging
22
import random
3+
import random
34
import types
45
from abc import abstractmethod
56

67
from nebula.addons.attacks.attacks import Attack
8+
from nebula.core.network.communications import CommunicationsManager
79

810

911
class CommunicationAttack(Attack):
@@ -46,21 +48,23 @@ async def select_targets(self):
4648
if self.selection_interval:
4749
if self.last_selection_round % self.selection_interval == 0:
4850
logging.info("Recalculating targets...")
49-
all_nodes = await self.engine.cm.get_addrs_current_connections(only_direct=True)
51+
all_nodes = await CommunicationsManager.get_instance().get_addrs_current_connections(only_direct=True)
5052
num_targets = max(1, int(len(all_nodes) * (self.selectivity_percentage / 100)))
5153
self.targets = set(random.sample(list(all_nodes), num_targets))
5254
elif not self.targets:
5355
logging.info("Calculating targets...")
54-
all_nodes = await self.engine.cm.get_addrs_current_connections(only_direct=True)
56+
all_nodes = await CommunicationsManager.get_instance().get_addrs_current_connections(only_direct=True)
5557
num_targets = max(1, int(len(all_nodes) * (self.selectivity_percentage / 100)))
5658
self.targets = set(random.sample(list(all_nodes), num_targets))
5759
else:
5860
logging.info("All neighbors selected as targets")
59-
self.targets = await self.engine.cm.get_addrs_current_connections(only_direct=True)
61+
self.targets = await CommunicationsManager.get_instance().get_addrs_current_connections(only_direct=True)
6062

6163
logging.info(f"Selected {self.selectivity_percentage}% targets from neighbors: {self.targets}")
6264
self.last_selection_round += 1
6365

66+
self.last_selection_round += 1
67+
6468
async def _inject_malicious_behaviour(self):
6569
"""Inject malicious behavior into the target method."""
6670
decorated_method = self.decorator(self.decorator_args)(self.original_method)

nebula/addons/attacks/communications/delayerattack.py

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from functools import wraps
44

55
from nebula.addons.attacks.communications.communicationattack import CommunicationAttack
6+
from nebula.core.network.communications import CommunicationsManager
67

78

89
class DelayerAttack(CommunicationAttack):
@@ -32,8 +33,8 @@ def __init__(self, engine, attack_params: dict):
3233

3334
super().__init__(
3435
engine,
35-
engine._cm,
36-
"send_model",
36+
CommunicationsManager.get_instance(),
37+
"send_message",
3738
round_start,
3839
round_stop,
3940
attack_interval,
@@ -43,27 +44,27 @@ def __init__(self, engine, attack_params: dict):
4344
)
4445

4546
def decorator(self, delay: int):
46-
"""
47-
Decorator that adds a delay to the execution of the original method.
47+
"""
48+
Decorator that adds a delay to the execution of the original method.
4849
49-
Args:
50-
delay (int): The time in seconds to delay the method execution.
50+
Args:
51+
delay (int): The time in seconds to delay the method execution.
5152
52-
Returns:
53-
function: A decorator function that wraps the target method with the delay logic.
54-
"""
53+
Returns:
54+
function: A decorator function that wraps the target method with the delay logic.
55+
"""
5556

56-
def decorator(func):
57-
@wraps(func)
58-
async def wrapper(*args, **kwargs):
59-
if len(args) > 1:
60-
dest_addr = args[1]
61-
if dest_addr in self.targets:
62-
logging.info(f"[DelayerAttack] Delaying model propagation to {dest_addr} by {delay} seconds")
63-
await asyncio.sleep(delay)
64-
_, *new_args = args # Exclude self argument
65-
return await func(*new_args)
57+
def decorator(func):
58+
@wraps(func)
59+
async def wrapper(*args, **kwargs):
60+
if len(args) == 4 and args[3] == "model":
61+
dest_addr = args[1]
62+
if dest_addr in self.targets:
63+
logging.info(f"[DelayerAttack] Delaying model propagation to {dest_addr} by {delay} seconds")
64+
await asyncio.sleep(delay)
65+
_, *new_args = args # Exclude self argument
66+
return await func(*new_args)
6667

67-
return wrapper
68+
return wrapper
6869

69-
return decorator
70+
return decorator

nebula/addons/attacks/communications/floodingattack.py

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
1-
import asyncio
21
import logging
32
from functools import wraps
4-
import time
53

64
from nebula.addons.attacks.communications.communicationattack import CommunicationAttack
5+
from nebula.core.network.communications import CommunicationsManager
76

87

98
class FloodingAttack(CommunicationAttack):
@@ -35,8 +34,8 @@ def __init__(self, engine, attack_params: dict):
3534

3635
super().__init__(
3736
engine,
38-
engine._cm,
39-
"send_model",
37+
CommunicationsManager.get_instance(),
38+
"send_message",
4039
round_start,
4140
round_stop,
4241
attack_interval,
@@ -59,7 +58,7 @@ def decorator(self, flooding_factor: int):
5958
def decorator(func):
6059
@wraps(func)
6160
async def wrapper(*args, **kwargs):
62-
if len(args) > 1:
61+
if len(args) == 4 and args[3] == "model":
6362
dest_addr = args[1]
6463
if dest_addr in self.targets:
6564
logging.info(f"[FloodingAttack] Flooding message to {dest_addr} by {flooding_factor} times")
@@ -68,13 +67,11 @@ async def wrapper(*args, **kwargs):
6867
logging.info(
6968
f"[FloodingAttack] Sending duplicate {i + 1}/{flooding_factor} to {dest_addr}"
7069
)
71-
_, dest_addr, _, serialized_model, weight = args # Exclude self argument
72-
new_args = [dest_addr, i, serialized_model, weight]
70+
_, *new_args = args # Exclude self argument
7371
await func(*new_args, **kwargs)
74-
_, dest_addr, _, serialized_model, weight = args # Exclude self argument
75-
new_args = [dest_addr, i, serialized_model, weight]
72+
_, *new_args = args
7673
return await func(*new_args)
77-
74+
7875
return wrapper
7976

8077
return decorator

nebula/addons/attacks/dataset/datasetattack.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,11 @@ async def attack(self):
3535
"""
3636
if self.engine.round not in range(self.round_start_attack, self.round_stop_attack + 1):
3737
pass
38-
elif self.engine.round == self.round_stop_attack:
38+
elif self.engine.round == self.round_stop_attack:
3939
logging.info(f"[{self.__class__.__name__}] Stopping attack")
40-
elif self.engine.round >= self.round_start_attack and ((self.engine.round - self.round_start_attack) % self.attack_interval == 0):
40+
elif self.engine.round >= self.round_start_attack and (
41+
(self.engine.round - self.round_start_attack) % self.attack_interval == 0
42+
):
4143
logging.info(f"[{self.__class__.__name__}] Performing attack")
4244
self.engine.trainer.datamodule.train_set = self.get_malicious_dataset()
4345

nebula/addons/attacks/dataset/labelflipping.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import copy
1111
import logging
1212
import random
13+
1314
import numpy as np
1415

1516
from nebula.addons.attacks.dataset.datasetattack import DatasetAttack

nebula/addons/attacks/model/gllneuroninversion.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ def __init__(self, engine, attack_params):
3434
raise ValueError(f"Missing required attack parameter: {e}")
3535
except ValueError:
3636
raise ValueError("Invalid value in attack_params. Ensure all values are integers.")
37-
37+
3838
super().__init__(engine, round_start, round_stop, attack_interval)
3939

4040
def model_attack(self, received_weights):

0 commit comments

Comments
 (0)