Skip to content

Commit 7db0001

Browse files
committed
CT: Fixup get().
1 parent 2e4b5ee commit 7db0001

File tree

1 file changed

+70
-45
lines changed

1 file changed

+70
-45
lines changed

src/runtime-libraries/mpi/mpi_caf.c

Lines changed: 70 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -233,18 +233,35 @@ MPI_Datatype *dts;
233233
char *msgbody;
234234
pthread_mutex_t lock_am;
235235
int done_am = 0;
236+
237+
/* Communication thread variables, constants and structures. */
238+
static const int CAF_CT_TAG = 13;
236239
pthread_t commthread;
240+
MPI_Comm ct_COMM;
237241
bool commthread_running = true;
238-
static const int CAF_CT_TAG = 13;
242+
enum CT_MSG_FLAGS
243+
{
244+
/* Use the inter communication thread communicator. */
245+
CT_INTER_CT = 1,
246+
/* Use 1 << 1 for next flag. */
247+
};
248+
249+
typedef struct
250+
{
251+
MPI_Win win;
252+
size_t transfer_size;
253+
int dest_image;
254+
int dest_tag;
255+
int flags;
256+
void (*access)(void *dst, void *base, void *data);
257+
char data[];
258+
} ct_msg_t;
239259

240260
char err_buffer[MPI_MAX_ERROR_STRING];
241261

242262
/* All CAF runtime calls should use this comm instead of MPI_COMM_WORLD for
243263
* interoperability purposes. */
244264
MPI_Comm CAF_COMM_WORLD;
245-
MPI_Comm ct_COMM;
246-
247-
static const int CT_STATUS_TERM_REQ = -1;
248265

249266
static caf_teams_list *teams_list = NULL;
250267
static caf_used_teams_list *used_teams = NULL;
@@ -415,59 +432,71 @@ helperFunction()
415432
void *
416433
communication_thread(void *)
417434
{
418-
int ierr = 0;
419-
int cnt;
435+
int ierr = 0, cnt;
420436
MPI_Status status;
437+
MPI_Message msg_han;
438+
MPI_Comm comm;
421439

422440
dprint("ct: Started.\n");
423441

424442
do
425443
{
426-
dprint("ct: Waiting for request.\n");
427-
ierr = MPI_Probe(MPI_ANY_SOURCE, CAF_CT_TAG, ct_COMM, &status);
428-
dprint("ct: Woke up.\n");
429-
if (status.MPI_ERROR == MPI_SUCCESS)
444+
dprint("ct: Probing for incoming message.\n");
445+
ierr = MPI_Mprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, ct_COMM, &msg_han, &status);
446+
chk_err(ierr);
447+
dprint("ct: Message received from %d, tag %d, mpi-status: %d, processing "
448+
"...\n",
449+
status.MPI_SOURCE, status.MPI_TAG, status.MPI_ERROR);
450+
if (status.MPI_TAG == CAF_CT_TAG && status.MPI_ERROR == MPI_SUCCESS)
430451
{
431452
MPI_Get_count(&status, MPI_BYTE, &cnt);
432453

433-
struct
434-
{
435-
MPI_Win win;
436-
size_t sz;
437-
} msg;
438-
if (cnt >= sizeof(msg))
454+
ct_msg_t *msg;
455+
if (cnt >= sizeof(ct_msg_t))
439456
{
440-
ierr = MPI_Recv(&msg, cnt, MPI_BYTE, status.MPI_SOURCE, status.MPI_TAG,
441-
ct_COMM, &status);
457+
msg = alloca(cnt);
458+
ierr = MPI_Mrecv(msg, cnt, MPI_BYTE, &msg_han, &status);
442459
chk_err(ierr);
443460
dprint("ct: Received request of size %ld.\n", cnt);
444461

445462
void *bptr;
446463
int flag;
447-
ierr = MPI_Win_get_attr(msg.win, MPI_WIN_BASE, &bptr, &flag);
464+
ierr = MPI_Win_get_attr(msg->win, MPI_WIN_BASE, &bptr, &flag);
448465
chk_err(ierr);
449-
dprint("ct: Local base for win %ld is %p (set: %b).\n", msg.win, bptr,
466+
dprint("ct: Local base for win %ld is %p (set: %b).\n", msg->win, bptr,
450467
flag);
451468
if (!flag)
452469
{
453-
dprint("ct: Error: Window %p memory is not allocated.\n", msg.win);
470+
dprint("ct: Error: Window %p memory is not allocated.\n", msg->win);
454471
}
455-
ierr = MPI_Send(bptr, msg.sz, MPI_BYTE, status.MPI_SOURCE,
456-
status.MPI_TAG + 1, ct_COMM);
472+
comm = (msg->flags & CT_INTER_CT) ? ct_COMM : CAF_COMM_WORLD;
473+
dprint("ct: Sending %ld bytes to image %d, tag %d on comm %x (%s).\n",
474+
msg->transfer_size, msg->dest_image, msg->dest_tag, comm,
475+
comm == CAF_COMM_WORLD ? "CAF_COMM_WORLD" : "ct_COMM");
476+
ierr = MPI_Send(bptr, msg->transfer_size, MPI_BYTE, msg->dest_image,
477+
msg->dest_tag, comm);
457478
chk_err(ierr);
458479
}
459480
else if (!commthread_running)
460481
{
461482
/* Pickup empty message. */
462-
MPI_Recv(&msg, cnt, MPI_BYTE, status.MPI_SOURCE, status.MPI_TAG,
463-
ct_COMM, &status);
483+
dprint("ct: Got termination message. Terminating.\n");
484+
ierr = MPI_Mrecv(&msg, cnt, MPI_BYTE, &msg_han, &status);
485+
chk_err(ierr);
464486
}
465487
else
466488
{
467489
dprint("ct: Error: message to small, ignoring (got: %ld, exp: %ld).\n",
468-
cnt, sizeof(msg));
490+
cnt, sizeof(ct_msg_t));
469491
}
470492
}
493+
else if (ierr == MPI_SUCCESS)
494+
{
495+
/* There is a message, but not for us. */
496+
dprint("ct: Message not for us received. Setting it free again.\n");
497+
// ierr = MPI_Request_free(&msg_han);
498+
chk_err(ierr);
499+
}
471500
else
472501
chk_err(ierr);
473502
} while (commthread_running);
@@ -1062,7 +1091,7 @@ PREFIX(init)(int *argc, char ***argv)
10621091
}
10631092
#endif
10641093

1065-
ierr = MPI_Comm_dup(MPI_COMM_WORLD, &ct_COMM);
1094+
ierr = MPI_Comm_dup(CAF_COMM_WORLD, &ct_COMM);
10661095
chk_err(ierr);
10671096
ierr = pthread_create(&commthread, NULL, &communication_thread, NULL);
10681097
chk_err(ierr);
@@ -1207,7 +1236,7 @@ finalize_internal(int status_code)
12071236

12081237
dprint("Sending termination signal to communication thread.\n");
12091238
commthread_running = false;
1210-
ierr = MPI_Send(NULL, 0, MPI_BYTE, caf_this_image - 1, CAF_CT_TAG, ct_COMM);
1239+
ierr = MPI_Send(NULL, 0, MPI_BYTE, mpi_this_image, CAF_CT_TAG, ct_COMM);
12111240
chk_err(ierr);
12121241
dprint("Termination signal send, waiting for thread join.\n");
12131242
ierr = pthread_join(commthread, NULL);
@@ -1439,15 +1468,9 @@ void PREFIX(register)(size_t size, caf_register_t type, caf_token_t *token,
14391468
p = TOKEN(mpi_token);
14401469

14411470
#if MPI_VERSION >= 3
1442-
void *flavor;
1443-
int flag = -1;
1444-
ierr = MPI_Win_allocate /*_shared*/ (actual_size, 1, MPI_INFO_NULL,
1445-
CAF_COMM_WORLD, &mem, p);
1446-
chk_err(ierr);
1447-
ierr = MPI_Win_get_attr(*p, MPI_WIN_CREATE_FLAVOR, &flavor, &flag);
1471+
ierr = MPI_Win_allocate(actual_size, 1, MPI_INFO_NULL, CAF_COMM_WORLD,
1472+
&mem, p);
14481473
chk_err(ierr);
1449-
dprint("win %d has create flavor: %x, flag: %d.\n", *p, *(int *)flavor,
1450-
flag);
14511474
CAF_Win_lock_all(*p);
14521475
#else
14531476
ierr = MPI_Alloc_mem(actual_size, MPI_INFO_NULL, &mem);
@@ -3787,16 +3810,18 @@ PREFIX(get)(caf_token_t token, size_t offset, int image_index,
37873810
{
37883811
const size_t trans_size
37893812
= ((dst_size > src_size) ? src_size : dst_size) * size;
3790-
struct
3791-
{
3792-
MPI_Win win;
3793-
size_t sz;
3794-
} buf = {*p, trans_size};
3795-
int tag = CAF_CT_TAG; // + caf_this_image) % 0xffff;
3813+
ct_msg_t *buf = alloca(sizeof(ct_msg_t));
3814+
buf->win = *p;
3815+
buf->transfer_size = trans_size;
3816+
buf->dest_image = mpi_this_image;
3817+
buf->dest_tag = CAF_CT_TAG + 1;
3818+
buf->flags = 0;
3819+
ierr = MPI_Send(buf, sizeof(ct_msg_t), MPI_BYTE, remote_image,
3820+
CAF_CT_TAG, ct_COMM);
3821+
chk_err(ierr);
37963822
ierr
3797-
= MPI_Sendrecv(&buf, sizeof(buf), MPI_BYTE, remote_image, tag,
3798-
dest->base_addr, trans_size, MPI_BYTE,
3799-
remote_image, tag + 1, ct_COMM, MPI_STATUS_IGNORE);
3823+
= MPI_Recv(dest->base_addr, trans_size, MPI_BYTE, image_index - 1,
3824+
buf->dest_tag, CAF_COMM_WORLD, MPI_STATUS_IGNORE);
38003825
chk_err(ierr);
38013826

38023827
// CAF_Win_lock(MPI_LOCK_SHARED, remote_image, *p);

0 commit comments

Comments
 (0)