Skip to content

Commit 49ef8ca

Browse files
committed
WIP: Simple get() works using thread.
1 parent f9a68b9 commit 49ef8ca

File tree

1 file changed

+118
-10
lines changed

1 file changed

+118
-10
lines changed

src/runtime-libraries/mpi/mpi_caf.c

Lines changed: 118 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -232,12 +232,18 @@ MPI_Datatype *dts;
232232
char *msgbody;
233233
pthread_mutex_t lock_am;
234234
int done_am = 0;
235+
pthread_t commthread;
236+
bool commthread_running = true;
237+
static const int CAF_CT_TAG = 13;
235238

236239
char err_buffer[MPI_MAX_ERROR_STRING];
237240

238241
/* All CAF runtime calls should use this comm instead of MPI_COMM_WORLD for
239242
* interoperability purposes. */
240243
MPI_Comm CAF_COMM_WORLD;
244+
MPI_Comm ct_COMM;
245+
246+
static const int CT_STATUS_TERM_REQ = -1;
241247

242248
static caf_teams_list *teams_list = NULL;
243249
static caf_used_teams_list *used_teams = NULL;
@@ -405,6 +411,69 @@ helperFunction()
405411
}
406412
#endif
407413

414+
void *
415+
communication_thread(void *)
416+
{
417+
int ierr = 0;
418+
int cnt;
419+
MPI_Status status;
420+
421+
dprint("ct: Started.\n");
422+
423+
do
424+
{
425+
dprint("ct: Waiting for request.\n");
426+
ierr = MPI_Probe(MPI_ANY_SOURCE, CAF_CT_TAG, ct_COMM, &status);
427+
dprint("ct: Woke up.\n");
428+
if (status.MPI_ERROR == MPI_SUCCESS)
429+
{
430+
MPI_Get_count(&status, MPI_BYTE, &cnt);
431+
432+
struct
433+
{
434+
MPI_Win win;
435+
size_t sz;
436+
} msg;
437+
if (cnt >= sizeof(msg))
438+
{
439+
ierr = MPI_Recv(&msg, cnt, MPI_BYTE, status.MPI_SOURCE, status.MPI_TAG,
440+
ct_COMM, &status);
441+
chk_err(ierr);
442+
dprint("ct: Received request of size %ld.\n", cnt);
443+
444+
void *bptr;
445+
int flag;
446+
ierr = MPI_Win_get_attr(msg.win, MPI_WIN_BASE, &bptr, &flag);
447+
chk_err(ierr);
448+
dprint("ct: Local base for win %ld is %p (set: %b).\n", msg.win, bptr,
449+
flag);
450+
if (!flag)
451+
{
452+
dprint("ct: Error: Window %p memory is not allocated.\n", msg.win);
453+
}
454+
ierr = MPI_Send(bptr, msg.sz, MPI_BYTE, status.MPI_SOURCE,
455+
status.MPI_TAG + 1, ct_COMM);
456+
chk_err(ierr);
457+
}
458+
else if (!commthread_running)
459+
{
460+
/* Pickup empty message. */
461+
MPI_Recv(&msg, cnt, MPI_BYTE, status.MPI_SOURCE, status.MPI_TAG,
462+
ct_COMM, &status);
463+
}
464+
else
465+
{
466+
dprint("ct: Error: message to small, ignoring (got: %ld, exp: %ld).\n",
467+
cnt, sizeof(msg));
468+
}
469+
}
470+
else
471+
chk_err(ierr);
472+
} while (commthread_running);
473+
dprint("ct: Ended.\n");
474+
return NULL;
475+
}
476+
408477
/* Keep in sync with single.c. */
409478

410479
static void
@@ -841,7 +910,7 @@ PREFIX(init)(int *argc, char ***argv)
841910
if (caf_num_images == 0)
842911
{
843912
int ierr = 0, i = 0, j = 0, rc, prov_lev = 0;
844-
int is_init = 0, prior_thread_level = MPI_THREAD_FUNNELED;
913+
int is_init = 0, prior_thread_level = MPI_THREAD_MULTIPLE;
845914
ierr = MPI_Initialized(&is_init);
846915
chk_err(ierr);
847916

@@ -850,6 +919,7 @@ PREFIX(init)(int *argc, char ***argv)
850919
ierr = MPI_Query_thread(&prior_thread_level);
851920
chk_err(ierr);
852921
}
922+
dprint("Main thread: thread level: %d\n", prior_thread_level);
853923
#ifdef HELPER
854924
if (is_init)
855925
{
@@ -990,6 +1060,11 @@ PREFIX(init)(int *argc, char ***argv)
9901060
*win_model, flag);
9911061
}
9921062
#endif
1063+
1064+
ierr = MPI_Comm_dup(MPI_COMM_WORLD, &ct_COMM);
1065+
chk_err(ierr);
1066+
ierr = pthread_create(&commthread, NULL, &communication_thread, NULL);
1067+
chk_err(ierr);
9931068
}
9941069
}
9951070

@@ -1059,9 +1134,12 @@ finalize_internal(int status_code)
10591134
/* Add a conventional barrier to prevent images from quitting too early. */
10601135
if (status_code == 0)
10611136
{
1062-
dprint("In barrier for finalize...");
1063-
ierr = MPI_Barrier(CAF_COMM_WORLD);
1064-
chk_err(ierr);
1137+
if (caf_num_images > 1)
1138+
{
1139+
dprint("In barrier for finalize...");
1140+
ierr = MPI_Barrier(CAF_COMM_WORLD);
1141+
chk_err(ierr);
1142+
}
10651143
}
10661144
else
10671145
/* Without failed images support, but a given status_code, we need to
@@ -1126,6 +1204,17 @@ finalize_internal(int status_code)
11261204
chk_err(ierr);
11271205
#endif // MPI_VERSION
11281206

1207+
dprint("Sending termination signal to communication thread.\n");
1208+
commthread_running = false;
1209+
ierr = MPI_Send(NULL, 0, MPI_BYTE, caf_this_image - 1, CAF_CT_TAG, ct_COMM);
1210+
chk_err(ierr);
1211+
dprint("Termination signal send, waiting for thread join.\n");
1212+
ierr = pthread_join(commthread, NULL);
1213+
dprint("Communication thread terminated with rc = %d.\n", ierr);
1214+
dprint("Freeing ct_COMM.\n");
1215+
MPI_Comm_free(&ct_COMM);
1216+
dprint("Freeed ct_COMM.\n");
1217+
11291218
/* Free the global dynamic window. */
11301219
ierr = MPI_Win_free(&global_dynamic_win);
11311220
chk_err(ierr);
@@ -1200,6 +1289,7 @@ finalize_internal(int status_code)
12001289
caf_is_finalized = 1;
12011290
#endif
12021291
free(sync_handles);
1292+
12031293
dprint("Finalisation done!!!\n");
12041294
}
12051295

@@ -1348,9 +1438,15 @@ void PREFIX(register)(size_t size, caf_register_t type, caf_token_t *token,
13481438
p = TOKEN(mpi_token);
13491439

13501440
#if MPI_VERSION >= 3
1351-
ierr = MPI_Win_allocate(actual_size, 1, MPI_INFO_NULL, CAF_COMM_WORLD,
1352-
&mem, p);
1441+
void *flavor;
1442+
int flag = -1;
1443+
ierr = MPI_Win_allocate /*_shared*/ (actual_size, 1, MPI_INFO_NULL,
1444+
CAF_COMM_WORLD, &mem, p);
1445+
chk_err(ierr);
1446+
ierr = MPI_Win_get_attr(*p, MPI_WIN_CREATE_FLAVOR, &flavor, &flag);
13531447
chk_err(ierr);
1448+
dprint("win %d has create flavor: %x, flag: %d.\n", *p, *(int *)flavor,
1449+
flag);
13541450
CAF_Win_lock_all(*p);
13551451
#else
13561452
ierr = MPI_Alloc_mem(actual_size, MPI_INFO_NULL, &mem);
@@ -3683,11 +3779,23 @@ PREFIX(get)(caf_token_t token, size_t offset, int image_index,
36833779
{
36843780
const size_t trans_size
36853781
= ((dst_size > src_size) ? src_size : dst_size) * size;
3686-
CAF_Win_lock(MPI_LOCK_SHARED, remote_image, *p);
3687-
ierr = MPI_Get(dest->base_addr, trans_size, MPI_BYTE, remote_image,
3688-
offset, trans_size, MPI_BYTE, *p);
3782+
struct
3783+
{
3784+
MPI_Win win;
3785+
size_t sz;
3786+
} buf = {*p, trans_size};
3787+
int tag = CAF_CT_TAG; // + caf_this_image) % 0xffff;
3788+
ierr
3789+
= MPI_Sendrecv(&buf, sizeof(buf), MPI_BYTE, remote_image, tag,
3790+
dest->base_addr, trans_size, MPI_BYTE,
3791+
remote_image, tag + 1, ct_COMM, MPI_STATUS_IGNORE);
36893792
chk_err(ierr);
3690-
CAF_Win_unlock(remote_image, *p);
3793+
3794+
// CAF_Win_lock(MPI_LOCK_SHARED, remote_image, *p);
3795+
// ierr = MPI_Get(dest->base_addr, trans_size, MPI_BYTE, remote_image,
3796+
// offset, trans_size, MPI_BYTE, *p);
3797+
// chk_err(ierr);
3798+
// CAF_Win_unlock(remote_image, *p);
36913799
}
36923800
else
36933801
{

0 commit comments

Comments
 (0)