Add cluster docs #146
base: develop
@@ -0,0 +1,113 @@
Parl Cluster
============

Get Started
###########

Cluster Structure Overview
--------------------------

There are three core concepts in a Parl cluster: master, worker, and client.

- **Master:** The master node is the control center of a Parl cluster. It
  maintains connections to workers and clients, receives tasks from clients,
  and allocates vacant workers to run them.

- **Worker:** A worker provides the CPU computation resources for the cluster.
  It initiates separate job subprocesses that wait for tasks from the master.

- **Client:** For each training program, there is a unique global client that
  submits tasks to the master node.

.. image:: ./cluster_structure.png
   :width: 600px
   :align: center

Master
------

There is only one master node in each Parl cluster. We can start a master by
calling ``xparl start --port 1234`` with an assigned port number. This command
also simultaneously starts a local worker that connects to the new master.

The **master socket** receives all kinds of messages from workers, clients,
and the cluster monitor, such as:

- A new worker connects to the cluster: the master starts a heartbeat to check
  the worker's status, and the worker's jobs are added to the master's job
  center.
- A new client connects to the cluster: the master starts a heartbeat to check
  the client's status and waits for the client to submit a task.
- A worker updates its job buffer: the master replaces the killed old jobs in
  the job center with the new jobs.
- The cluster monitor queries the cluster status: the master returns the
  detailed status of the cluster (e.g. total CPU number, used CPU number, load
  average) to the monitor.
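As a rough illustration of this bookkeeping, the master's job center can be modeled as a pool of vacant job addresses. The names below (``JobCenter``, ``add_worker``, ``allocate_job``, ``status``) are hypothetical and only sketch the behavior described above; they are not PARL's actual API:

.. code-block:: python

    class JobCenter(object):
        """Toy model of the master's job center (illustrative only)."""

        def __init__(self):
            self.vacant_jobs = []  # job addresses waiting for tasks
            self.used_cpus = 0

        def add_worker(self, job_addresses):
            # a new worker connected: register its initialized jobs
            self.vacant_jobs.extend(job_addresses)

        def allocate_job(self):
            # a client submitted a task: hand out one vacant job, if any
            if not self.vacant_jobs:
                return None
            self.used_cpus += 1
            return self.vacant_jobs.pop()

        def status(self):
            # cluster monitor query: total vs. used CPU numbers
            return {"total_cpus": self.used_cpus + len(self.vacant_jobs),
                    "used_cpus": self.used_cpus}


    center = JobCenter()
    center.add_worker(["192.168.0.2:8001", "192.168.0.2:8002"])
    job = center.allocate_job()       # one vacant job is handed out
    print(center.status())            # {'total_cpus': 2, 'used_cpus': 1}

When no vacant job is available, a real master would keep the task pending or reject it; the sketch simply returns ``None``.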

.. image:: ./master.png
   :width: 600px
   :align: center

Worker
------

We can add more computation resources to an existing cluster by calling the
``xparl --connect master_address`` command. This command creates a local
**Worker** object and then connects it to the cluster.

When we start a new worker, it first initiates separate job subprocesses in a
job buffer, and then sends the initialized worker information to the master
node.

The worker sends a heartbeat signal to each job to check whether it is still
alive. When the worker finds that a job subprocess is dead, it drops the dead
job from the job buffer, starts a new job, and updates the worker information
on the master node.
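The keep-the-buffer-full behavior can be sketched as a small loop. Everything here is a toy model under stated assumptions: ``Job`` stands in for a real job subprocess, and ``heartbeat_once`` returns whether an update must be sent to the master (none of these names are PARL's actual API):

.. code-block:: python

    import itertools


    class Job(object):
        """Stand-in for a job subprocess (illustrative only)."""

        def __init__(self, job_id, alive=True):
            self.job_id = job_id
            self.alive = alive

        def is_alive(self):
            return self.alive


    class Worker(object):
        """Toy worker that keeps its job buffer full of live jobs."""

        def __init__(self, n_jobs):
            self._ids = itertools.count()
            self.job_buffer = [Job(next(self._ids)) for _ in range(n_jobs)]

        def heartbeat_once(self):
            # drop dead jobs, start replacements, and report whether
            # the worker information on the master needs an update
            dead = [job for job in self.job_buffer if not job.is_alive()]
            for job in dead:
                self.job_buffer.remove(job)
                self.job_buffer.append(Job(next(self._ids)))  # start a new job
            return len(dead) > 0


    worker = Worker(n_jobs=4)
    worker.job_buffer[1].alive = False  # simulate a crashed job subprocess
    print(worker.heartbeat_once())      # True: the buffer was refreshed
    print(len(worker.job_buffer))       # 4: the buffer size is restored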

.. image:: ./worker.png
   :width: 600px
   :align: center

Client
------

We have a global client for each training program; it submits training tasks
to the master node. Users do not need to interact with the client object
directly. We can create a new global client, or get the existing global
client, by calling ``parl.connect(master_address)``.

The global client reads local python scripts and configuration files, which
will later be sent to the remote jobs.
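A simplified sketch of how such a payload might be packed: local ``*.py``
files are collected into a dictionary and serialized into bytes (the client
code stores this as ``pyfiles``). The helper below is illustrative, not the
client's actual implementation:

.. code-block:: python

    import glob
    import os
    import pickle
    import tempfile


    def pack_local_pyfiles(path):
        """Collect *.py files under `path` into a serialized dict
        (a toy version of the payload sent to remote jobs)."""
        pyfiles = {}
        for fname in glob.glob(os.path.join(path, "*.py")):
            with open(fname, "rb") as f:
                pyfiles[os.path.basename(fname)] = f.read()
        return pickle.dumps(pyfiles)


    # demo in a temporary working directory
    tmpdir = tempfile.mkdtemp()
    with open(os.path.join(tmpdir, "model.py"), "w") as f:
        f.write("LEARNING_RATE = 0.01\n")

    payload = pack_local_pyfiles(tmpdir)
    restored = pickle.loads(payload)
    print(sorted(restored))  # ['model.py']

On the job side, the bytes can be deserialized and the files written back out before the actor is instantiated.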

.. image:: ./client.png
   :width: 600px
   :align: center

Actor
-----

An **Actor** is an object defined by users that aims to solve a specific
task. We use the ``@parl.remote_class`` decorator to convert an actor into a
remote class object, and each actor is connected to the global client.

.. code-block:: python

    # connect the global client to the master node
    parl.connect(master_address)

    @parl.remote_class
    class Actor(object):
        def __init__(self):
            ...

When a decorated actor class object is instantiated, the global client
submits a task to the master node. The master node then picks a vacant job
from the job center and sends it back to the client. The actor makes a
connection with the job and sends the local files, class definition, and
initialization arguments to the job. The job then instantiates a local actor
in the job process.

When the actor calls a function, the real computation is executed in the job
process by the job's local actor.
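The proxying mechanism above can be sketched with a toy decorator. ``FakeJob``
stands in for a remote job process and ``remote_class`` is a deliberately
simplified, hypothetical version of ``@parl.remote_class``: instantiation
"submits a task", and method calls are forwarded to the job's local actor:

.. code-block:: python

    class FakeJob(object):
        """Stand-in for a remote job process: it instantiates the real
        actor locally and runs computations on its behalf."""

        def __init__(self, cls, args, kwargs):
            self.local_actor = cls(*args, **kwargs)

        def run(self, name, args, kwargs):
            return getattr(self.local_actor, name)(*args, **kwargs)


    def remote_class(cls):
        """Toy decorator: returns a proxy that forwards everything
        to the job's local actor."""

        class Proxy(object):
            def __init__(self, *args, **kwargs):
                # in PARL, this is where the client submits a task and
                # the master hands back a vacant job
                self._job = FakeJob(cls, args, kwargs)

            def __getattr__(self, name):
                def call(*args, **kwargs):
                    return self._job.run(name, args, kwargs)
                return call

        return Proxy


    @remote_class
    class Adder(object):
        def __init__(self, base):
            self.base = base

        def add(self, x):
            return self.base + x


    actor = Adder(10)          # "submits a task"; the job builds the actor
    print(actor.add(5))        # 15: computed by the job's local actor

The real decorator additionally serializes arguments and return values over
sockets; this sketch keeps everything in one process to show the control flow.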

.. image:: ./actor.png
   :width: 600px
   :align: center
@@ -32,7 +32,7 @@ class Client(object):
     connect to the same global client in a training task.

     Attributes:
-        submit_job_socket (zmq.Context.socket): A socket which submits job to
+        submit_task_socket (zmq.Context.socket): A socket which submits tasks to
             the master node.
         pyfiles (bytes): A serialized dictionary containing the code of python
             files in local working directory.

@@ -104,15 +104,15 @@ def read_local_files(self, distributed_files=[]):
     def _create_sockets(self, master_address):
         """ Each client has 1 sockets as start:

-        (1) submit_job_socket: submits jobs to master node.
+        (1) submit_task_socket: submits tasks to master node.
         """

-        # submit_job_socket: submits job to master
-        self.submit_job_socket = self.ctx.socket(zmq.REQ)
-        self.submit_job_socket.linger = 0
-        self.submit_job_socket.setsockopt(
+        # submit_task_socket: submits tasks to master
+        self.submit_task_socket = self.ctx.socket(zmq.REQ)
+        self.submit_task_socket.linger = 0
+        self.submit_task_socket.setsockopt(
             zmq.RCVTIMEO, remote_constants.HEARTBEAT_TIMEOUT_S * 1000)
-        self.submit_job_socket.connect("tcp://{}".format(master_address))
+        self.submit_task_socket.connect("tcp://{}".format(master_address))
         self.start_time = time.time()
         thread = threading.Thread(target=self._reply_heartbeat)
         thread.setDaemon(True)
@@ -121,12 +121,12 @@ def _create_sockets(self, master_address):

         # check if the master is connected properly
         try:
-            self.submit_job_socket.send_multipart([
+            self.submit_task_socket.send_multipart([
                 remote_constants.CLIENT_CONNECT_TAG,
                 to_byte(self.heartbeat_master_address),
                 to_byte(socket.gethostname())
             ])
-            _ = self.submit_job_socket.recv_multipart()
+            _ = self.submit_task_socket.recv_multipart()
         except zmq.error.Again as e:
             logger.warning("[Client] Can not connect to the master, please "
                            "check if master is started and ensure the input "
@@ -232,11 +232,11 @@ def _create_job_monitor(self, job_heartbeat_socket):

         job_heartbeat_socket.close(0)

-    def submit_job(self):
-        """Send a job to the Master node.
+    def submit_task(self):
+        """Send a task to the Master node.

         When a `@parl.remote_class` object is created, the global client
-        sends a job to the master node. Then the master node will allocate
+        sends a task to the master node. Then the master node will allocate
         a vacant job from its job pool to the remote object.

         Returns:
@@ -247,11 +247,11 @@ def submit_job(self):
         while True:
             # A lock to prevent multiple actors from submitting job at the same time.
             self.lock.acquire()
-            self.submit_job_socket.send_multipart([
+            self.submit_task_socket.send_multipart([
                 remote_constants.CLIENT_SUBMIT_TAG,
                 to_byte(self.heartbeat_master_address)
             ])
-            message = self.submit_job_socket.recv_multipart()
+            message = self.submit_task_socket.recv_multipart()
             self.lock.release()

             tag = message[0]