Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions fedn/network/combiner/hooks/hook_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def provided_functions(self, server_functions: str):
try:
request = fedn.ProvidedFunctionsRequest(function_code=server_functions)

- response = self.stub.HandleProvidedFunctions(request)
+ response = self.stub.HandleProvidedFunctions(request, timeout=120)
return response.available_functions
except grpc.RpcError as rpc_error:
if rpc_error.code() == grpc.StatusCode.UNAVAILABLE:
Expand All @@ -65,7 +65,7 @@ def client_settings(self, global_model) -> dict:
request_function = fedn.ClientConfigRequest
args = {}
model = model_as_bytesIO(global_model)
- response = self.stub.HandleClientConfig(bytesIO_request_generator(mdl=model, request_function=request_function, args=args))
+ response = self.stub.HandleClientConfig(bytesIO_request_generator(mdl=model, request_function=request_function, args=args), timeout=120)
return json.loads(response.client_settings)

def client_selection(self, clients: list) -> list:
Expand All @@ -87,7 +87,7 @@ def aggregate(self, previous_global, update_handler: UpdateHandler, helper, dele
# send previous global
request_function = fedn.StoreModelRequest
args = {"id": "global_model"}
- response = self.stub.HandleStoreModel(bytesIO_request_generator(mdl=previous_global, request_function=request_function, args=args))
+ response = self.stub.HandleStoreModel(bytesIO_request_generator(mdl=previous_global, request_function=request_function, args=args), timeout=120)
logger.info(f"Store model response: {response.status}")
# send client models and metadata
nr_updates = 0
Expand All @@ -99,19 +99,19 @@ def aggregate(self, previous_global, update_handler: UpdateHandler, helper, dele
# send metadata
client_id = update.sender.client_id
request = fedn.ClientMetaRequest(metadata=json.dumps(metadata), client_id=client_id)
- response = self.stub.HandleMetadata(request)
+ response = self.stub.HandleMetadata(request, timeout=120)
# send client model
args = {"id": client_id}
request_function = fedn.StoreModelRequest
- response = self.stub.HandleStoreModel(bytesIO_request_generator(mdl=model, request_function=request_function, args=args))
+ response = self.stub.HandleStoreModel(bytesIO_request_generator(mdl=model, request_function=request_function, args=args), timeout=120)
logger.info(f"Store model response: {response.status}")
nr_updates += 1
if delete_models:
# delete model from disk
update_handler.delete_model(model_update=update)
# ask for aggregation
request = fedn.AggregationRequest(aggregate="aggregate")
- response_generator = self.stub.HandleAggregation(request)
+ response_generator = self.stub.HandleAggregation(request, timeout=600)
data["nr_aggregated_models"] = nr_updates
model, _ = unpack_model(response_generator, helper)
return model, data
12 changes: 2 additions & 10 deletions fedn/network/combiner/hooks/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,18 +226,10 @@ def _retire_and_log(self, func_name: str, err: Exception):
# fetch the source line from linecache (primed earlier)

src_line = (_lc.getline(f.filename, f.lineno) or "").rstrip("\n")
- logger.error(
- "User function '%s' crashed at %s:%d in %s()\n> %s\nException: %s",
- func_name,
- f.filename,
- f.lineno,
- f.name,
- src_line,
- repr(err),
- )
+ logger.error(f"User function '{func_name}' crashed at {f.filename}:{f.lineno} in {f.name}()\n> {src_line}\nException: {repr(err)}")
else:
# fallback: full traceback (server + user frames) if we didn't match a user frame
- logger.exception("%s failed, retiring until next code update: %s", func_name, err)
+ logger.exception(f"{func_name} failed, retiring until next code update: {err}")


def serve():
Expand Down
Loading