Commit 1264a49

[Feature] Add node deployment using processes (Multiplatform) (#11)
* Include deployment using processes, improve communications, new ready flag, and async boost * improve reporter readability, and model compression * update libs, reconnection functionality, and documentation * Update network metrics * Training in a isolated process, improve async, improve network metrics * Include removing child processes * Update requirements * torch compatibility and exceptions management * file permissions during execution * Include windows support * Include windows support (ps1) * Include windows support (ps1) * Include windows support (ps1) * force utf-8 and windows deployment * Fix event handler * Improve exception management * Increase timeout on handler * Add lock and conditions in event manager * Remove main process when leaving * Improve exception visualization * killing processes fixed * fix traceback * Include logging in child processes * Include logging in child processes * Include output file * Remove redundant log * Include traceback logs * Training logs moved to independent log file * Fix logging import and commands file on windows * docer checked fixed * Include custom progress bar * Fix controller endpoint * fix docker report finished * Remove file descriptors, include independent logger for training, update loggins in training-related classes * fix process scenario finished frontend * Include log for training/testing finished per round * Reduce console logs * Improve reproducibility * Update installation guide and deployment page * Persistent avoiding model from initialization round * include await in reset * Improve logging in data processing, include a condition in the selection of classes during dirichlet * Improve logging * Improve logging, dirichlet, frontend popup * release locked locks * Minor fix during logging --------- Co-authored-by: FerTV <fernando.torres.vega@gmail.com>
1 parent 6e2632b commit 1264a49

30 files changed (+1241 −436 lines)

.gitignore

Lines changed: 1 addition & 0 deletions

@@ -137,6 +137,7 @@ app/
 data/
 *.db*
 *.out
+*.pid

 .requirements.txt
 data-analysis/

docs/installation.rst

Lines changed: 24 additions & 8 deletions

@@ -95,25 +95,32 @@ by listing the version of the NEBULA with the following command line::
     python app/main.py --version


-Building the nebula participant
+Building the nebula node
 ====================================
+There are two ways to deploy the node in the federation: using Docker containers or isolated processes.
+You can choose the one that best fits your needs in the frontend.

-Docker image
--------------------------
-You can build the docker image using the following command line in the root directory::
+1. Using Docker containers
+--------------------------------
+You need to build the docker image using the following command line in the root directory::

     docker build -t nebula-core .

 In case of using GPU in the docker, you have to follow the instructions in the following link to install nvidia-container-toolkit::

     https://docs.nvidia.com/datacenter/cloud-native/container-toolkit/install-guide.html

-Checking the docker images
-==========================
 You can check the docker images using the following command line::

     docker images

+2. Using isolated processes
+------------------------------------
+You need to install the requirements of the node (core) using the following command line in the root directory::
+
+    pip3 install -r nebula/requirements.txt
+
+
 Running NEBULA
 ==================
 To run NEBULA, you can use the following command line::

@@ -152,7 +159,7 @@ To stop NEBULA, you can use the following command line::

     python app/main.py --stop

-Be careful, this command will stop all the containers related to NEBULA: frontend, controller, and participants.
+Be careful, this command will stop all the containers related to NEBULA: frontend, controller, and nodes.


 Possible issues during the installation or execution

@@ -205,4 +212,13 @@ If frontend is not working, restart docker daemon

 ===================================

-If the frontend is not working, check the logs in app/logs/server.log
+Error: Too many open files
+
+Solution: Increase the number of open files
+
+    ulimit -n 65536
+
+Also, you can add the following lines to the file /etc/security/limits.conf
+
+    * soft nofile 65536
+    * hard nofile 65536
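The "Too many open files" fix above raises the per-process file-descriptor limit. As a quick diagnostic you can query (and, up to the hard limit, raise) the same limit from Python via the standard `resource` module; this is a Unix-only sketch for checking your environment, not part of NEBULA itself:

```python
import resource

# Query the current soft and hard limits on open file descriptors.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f"soft={soft}, hard={hard}")

# The soft limit can be raised up to the hard limit without root;
# this mirrors what `ulimit -n 65536` does for a shell session.
resource.setrlimit(resource.RLIMIT_NOFILE, (min(65536, hard), hard))
```

If the soft limit still caps out below what the federation needs, the `/etc/security/limits.conf` entries shown above make the change persistent across sessions.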

docs/nebula.core.utils.rst

Lines changed: 8 additions & 0 deletions

@@ -52,6 +52,14 @@ nebula.core.utils.nebulalogger\_tensorboard module
    :undoc-members:
    :show-inheritance:

+nebula.core.utils.tasks module
+------------------------------
+
+.. automodule:: nebula.core.utils.tasks
+   :members:
+   :undoc-members:
+   :show-inheritance:
+
 Module contents
 ---------------

docs/requirements.txt

Lines changed: 1 addition & 1 deletion

@@ -1,3 +1,3 @@
 Sphinx==8.0.2
-sphinx-autoapi==3.2.1
+sphinx-autoapi==3.3.1
 sphinx-book-theme==1.1.3

nebula/addons/functions.py

Lines changed: 7 additions & 2 deletions

@@ -1,8 +1,13 @@
 import logging


-def print_msg_box(msg, indent=1, width=None, title=None):
+def print_msg_box(msg, indent=1, width=None, title=None, logger_name=None):
     """Print message-box with optional title."""
+    if logger_name:
+        logger = logging.getLogger(logger_name)
+    else:
+        logger = logging.getLogger()
+
     if not isinstance(msg, str):
         raise TypeError("msg parameter must be a string")

@@ -18,4 +23,4 @@ def print_msg_box(msg, indent=1, width=None, title=None):
     box += f'║{space}{"-" * len(title):<{width}}{space}\n'  # underscore
     box += "".join([f"║{space}{line:<{width}}{space}\n" for line in lines])
     box += f'╚{"═" * (width + indent * 2)}╝'  # lower_border
-    logging.info(box)
+    logger.info(box)
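The change above routes the message box to a named logger instead of the root logger, which is what lets child-process output (e.g. training) land in its own log file. A minimal sketch of the pattern (the `emit` helper and the capture setup are illustrative, not part of NEBULA):

```python
import logging

def emit(msg, logger_name=None):
    # Pick a named logger when one is given, otherwise fall back to the
    # root logger -- the same selection print_msg_box now performs.
    logger = logging.getLogger(logger_name) if logger_name else logging.getLogger()
    logger.info(msg)

# A named logger can carry its own handlers and skip propagation to root,
# so its records end up only in a dedicated destination.
training_logger = logging.getLogger("nebula.training")
training_logger.setLevel(logging.INFO)
training_logger.propagate = False
```

Because `logging.getLogger(name)` always returns the same logger object for a given name, callers of `print_msg_box(..., logger_name="nebula.training")` transparently pick up whatever handlers were configured for that name elsewhere.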

nebula/addons/reporter.py

Lines changed: 42 additions & 6 deletions

@@ -23,6 +23,17 @@ def __init__(self, config, trainer, cm: "CommunicationsManager"):
         self.data_queue = asyncio.Queue()
         self.url = f'http://{self.config.participant["scenario_args"]["controller"]}/nebula/dashboard/{self.config.participant["scenario_args"]["name"]}/node/update'
         self.counter = 0
+
+        self.first_net_metrics = True
+        self.prev_bytes_sent = 0
+        self.prev_bytes_recv = 0
+        self.prev_packets_sent = 0
+        self.prev_packets_recv = 0
+
+        self.acc_bytes_sent = 0
+        self.acc_bytes_recv = 0
+        self.acc_packets_sent = 0
+        self.acc_packets_recv = 0

     async def enqueue_data(self, name, value):
         await self.data_queue.put((name, value))

@@ -33,7 +44,7 @@ async def start(self):

     async def run_reporter(self):
         while True:
-            if self.config.participant["scenario_args"]["controller"] == "nebula-frontend":
+            if self.config.participant["scenario_args"]["controller"] != "nebula-test":
                 await self.__report_status_to_controller()
                 await self.__report_data_queue()
                 await self.__report_resources()

@@ -45,7 +56,7 @@ async def run_reporter(self):

     async def report_scenario_finished(self):
         url = f'http://{self.config.participant["scenario_args"]["controller"]}/nebula/dashboard/{self.config.participant["scenario_args"]["name"]}/node/done'
-        data = json.dumps({"ip": self.config.participant["network_args"]["ip"], "port": self.config.participant["network_args"]["port"]})
+        data = json.dumps({"idx": self.config.participant["device_args"]["idx"]})
         headers = {
             "Content-Type": "application/json",
             "User-Agent": f'NEBULA Participant {self.config.participant["device_args"]["idx"]}',

@@ -87,6 +98,9 @@ async def __report_status_to_controller(self):
             logging.debug(text)
         except aiohttp.ClientError as e:
             logging.error(f"Error connecting to the controller at {self.url}: {e}")
+        except Exception as e:
+            logging.error(f"Error sending status to controller, will try again in a few seconds: {e}")
+            await asyncio.sleep(5)

     async def __report_resources(self):
         cpu_percent = psutil.cpu_percent()

@@ -115,6 +129,28 @@ async def __report_resources(self):
         bytes_recv = net_io_counters.bytes_recv
         packets_sent = net_io_counters.packets_sent
         packets_recv = net_io_counters.packets_recv
+
+        if self.first_net_metrics:
+            bytes_sent_diff = 0
+            bytes_recv_diff = 0
+            packets_sent_diff = 0
+            packets_recv_diff = 0
+            self.first_net_metrics = False
+        else:
+            bytes_sent_diff = bytes_sent - self.prev_bytes_sent
+            bytes_recv_diff = bytes_recv - self.prev_bytes_recv
+            packets_sent_diff = packets_sent - self.prev_packets_sent
+            packets_recv_diff = packets_recv - self.prev_packets_recv
+
+        self.prev_bytes_sent = bytes_sent
+        self.prev_bytes_recv = bytes_recv
+        self.prev_packets_sent = packets_sent
+        self.prev_packets_recv = packets_recv
+
+        self.acc_bytes_sent += bytes_sent_diff
+        self.acc_bytes_recv += bytes_recv_diff
+        self.acc_packets_sent += packets_sent_diff
+        self.acc_packets_recv += packets_recv_diff

         current_connections = await self.cm.get_addrs_current_connections(only_direct=True)

@@ -127,10 +163,10 @@ async def __report_resources(self):
             "RAM/RAM process (%)": memory_percent_process,
             "RAM/RAM process (MB)": memory_process,
             "Disk/Disk (%)": disk_percent,
-            "Network/Network (bytes sent)": bytes_sent,
-            "Network/Network (bytes received)": bytes_recv,
-            "Network/Network (packets sent)": packets_sent,
-            "Network/Network (packets received)": packets_recv,
+            "Network/Network (bytes sent)": round(self.acc_bytes_sent / (1024 ** 2), 3),
+            "Network/Network (bytes received)": round(self.acc_bytes_recv / (1024 ** 2), 3),
+            "Network/Network (packets sent)": self.acc_packets_sent,
+            "Network/Network (packets received)": self.acc_packets_recv,
             "Network/Connections": len(current_connections),
         }
         self.trainer.logger.log_data(resources)
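The metrics change above switches from reporting the OS's raw cumulative counters (which include traffic from before the scenario started) to accumulating per-interval deltas, with the first sample used only as a baseline. A minimal sketch of that delta-accumulation logic, isolated from the reporter (the `NetStats` class is illustrative and tracks only bytes sent):

```python
class NetStats:
    """Accumulate deltas of a monotonically increasing counter, skipping
    the first sample so pre-existing traffic is not counted."""

    def __init__(self):
        self.first = True
        self.prev_sent = 0
        self.acc_sent = 0

    def update(self, bytes_sent):
        # The first sample only establishes the baseline: its delta is zero.
        diff = 0 if self.first else bytes_sent - self.prev_sent
        self.first = False
        self.prev_sent = bytes_sent
        self.acc_sent += diff
        # Report in MB, rounded to 3 decimals as the reporter does.
        return round(self.acc_sent / (1024 ** 2), 3)
```

In the reporter this runs once per reporting cycle against `psutil.net_io_counters()`, so the dashboard shows traffic relative to scenario start rather than since boot.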

nebula/config/config.py

Lines changed: 41 additions & 6 deletions

@@ -3,6 +3,11 @@
 import os
 from logging import Formatter, FileHandler

+CYAN = "\x1b[0;36m"
+RESET = "\x1b[0m"
+
+TRAINING_LOGGER = "nebula.training"
+

 class Config:
     topology = {}

@@ -23,6 +28,7 @@ def __init__(self, entity, topology_config_file=None, participant_config_file=No
         if self.participant != {}:
             self.__default_config()
             self.__set_default_logging()
+            self.__set_training_logging()

     def __getstate__(self):
         # Return the attributes of the class that should be serialized

@@ -39,6 +45,10 @@ def get_topology_config(self):
     def get_participant_config(self):
         return json.dumps(self.participant, indent=2)

+    def get_train_logging_config(self):
+        # TBD
+        pass
+
     def __default_config(self):
         self.participant["device_args"]["name"] = f"participant_{self.participant['device_args']['idx']}_{self.participant['network_args']['ip']}_{self.participant['network_args']['port']}"
         self.participant["network_args"]["addr"] = f"{self.participant['network_args']['ip']}:{self.participant['network_args']['port']}"

@@ -56,31 +66,56 @@ def __set_default_logging(self):
         logging.basicConfig(level=level, handlers=[console_handler, file_handler, file_handler_only_debug, exp_errors_file_handler])

     def __setup_logging(self, log_filename):
-        CYAN = "\x1b[0;36m"
-        RESET = "\x1b[0m"
         info_file_format = f"%(asctime)s - {self.participant['device_args']['name']} - [%(filename)s:%(lineno)d] %(message)s"
         debug_file_format = f"%(asctime)s - {self.participant['device_args']['name']} - [%(filename)s:%(lineno)d] %(message)s\n[in %(pathname)s:%(lineno)d]"
         log_console_format = f"{CYAN}%(asctime)s - {self.participant['device_args']['name']} - [%(filename)s:%(lineno)d]{RESET}\n%(message)s"

         console_handler = logging.StreamHandler()
-        console_handler.setLevel(logging.INFO if self.participant["device_args"]["logging"] else logging.CRITICAL)
+        console_handler.setLevel(logging.CRITICAL)
         console_handler.setFormatter(Formatter(log_console_format))

-        file_handler = FileHandler("{}.log".format(log_filename), mode="w")
+        file_handler = FileHandler("{}.log".format(log_filename), mode="w", encoding="utf-8")
         file_handler.setLevel(logging.INFO if self.participant["device_args"]["logging"] else logging.CRITICAL)
         file_handler.setFormatter(Formatter(info_file_format))

-        file_handler_only_debug = FileHandler("{}_debug.log".format(log_filename), mode="w")
+        file_handler_only_debug = FileHandler("{}_debug.log".format(log_filename), mode="w", encoding="utf-8")
         file_handler_only_debug.setLevel(logging.DEBUG if self.participant["device_args"]["logging"] else logging.CRITICAL)
         file_handler_only_debug.addFilter(lambda record: record.levelno == logging.DEBUG)
         file_handler_only_debug.setFormatter(Formatter(debug_file_format))

-        exp_errors_file_handler = FileHandler("{}_error.log".format(log_filename), mode="w")
+        exp_errors_file_handler = FileHandler("{}_error.log".format(log_filename), mode="w", encoding="utf-8")
         exp_errors_file_handler.setLevel(logging.WARNING if self.participant["device_args"]["logging"] else logging.CRITICAL)
         exp_errors_file_handler.setFormatter(Formatter(debug_file_format))

         return console_handler, file_handler, file_handler_only_debug, exp_errors_file_handler

+    def __set_training_logging(self):
+        training_log_filename = f"{self.log_filename}_training"
+        info_file_format = f"%(asctime)s - {self.participant['device_args']['name']} - [%(filename)s:%(lineno)d] %(message)s"
+        log_console_format = f"{CYAN}%(asctime)s - {self.participant['device_args']['name']} - [%(filename)s:%(lineno)d]{RESET}\n%(message)s"
+        level = logging.DEBUG if self.participant["device_args"]["logging"] else logging.CRITICAL
+
+        console_handler = logging.StreamHandler()
+        console_handler.setLevel(logging.CRITICAL)
+        console_handler.setFormatter(Formatter(log_console_format))
+
+        file_handler = FileHandler("{}.log".format(training_log_filename), mode="w", encoding="utf-8")
+        file_handler.setLevel(level)
+        file_handler.setFormatter(Formatter(info_file_format))
+
+        logger = logging.getLogger(TRAINING_LOGGER)
+        logger.setLevel(level)
+        logger.addHandler(console_handler)
+        logger.addHandler(file_handler)
+        logger.propagate = False
+
+        pl_logger = logging.getLogger("lightning.pytorch")
+        pl_logger.setLevel(logging.INFO)
+        pl_logger.handlers = []
+        pl_logger.propagate = False
+        pl_logger.addHandler(console_handler)
+        pl_logger.addHandler(file_handler)
+
     def to_json(self):
         # Return participant configuration as a json string
         return json.dumps(self.participant, sort_keys=False, indent=2)
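The `__set_training_logging` addition above gives training its own named logger with a dedicated UTF-8 file handler and `propagate = False`, so training records never reach the root handlers and the main log stays readable. A simplified, self-contained sketch of the same setup (the `setup_training_logger` function and format string are illustrative; `TRAINING_LOGGER` matches the constant introduced in the diff):

```python
import logging

TRAINING_LOGGER = "nebula.training"

def setup_training_logger(log_path):
    # UTF-8 encoding so box-drawing characters in log output survive on Windows,
    # matching the encoding="utf-8" added to the FileHandlers above.
    handler = logging.FileHandler(log_path, mode="w", encoding="utf-8")
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(logging.Formatter("%(asctime)s - [%(filename)s:%(lineno)d] %(message)s"))

    logger = logging.getLogger(TRAINING_LOGGER)
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)
    # propagate=False keeps training records out of the root logger's handlers,
    # isolating the training log from the participant's main log files.
    logger.propagate = False
    return logger
```

Any code that does `logging.getLogger("nebula.training")` afterwards (such as `print_msg_box` with `logger_name`) writes to this file without further configuration.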
