Skip to content

Commit c74aabb

Browse files
committed
WIP: Add get_by_ct call.
1 parent 7db0001 commit c74aabb

File tree

3 files changed

+131
-18
lines changed

3 files changed

+131
-18
lines changed

CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,9 @@ endif()
233233
if ( gfortran_compiler AND ( NOT CMAKE_Fortran_COMPILER_VERSION VERSION_LESS 8.0.0 ) )
234234
add_definitions(-DGCC_GE_8) # Tell library to build against GFortran 8.x bindings w/ descriptor change
235235
endif()
236+
if ( gfortran_compiler AND ( NOT CMAKE_Fortran_COMPILER_VERSION VERSION_LESS 14.0.0 ) )
237+
add_definitions(-DGCC_GE_15) # Tell library to build against GFortran 15.x bindings
238+
endif()
236239

237240
if(gfortran_compiler)
238241
set(OLD_REQUIRED_FLAGS ${CMAKE_REQUIRED_FLAGS})

src/application-binary-interface/libcaf.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,15 @@ void PREFIX(caf_sendget)(caf_token_t, size_t, int, gfc_descriptor_t *,
261261
gfc_descriptor_t *, caf_vector_t *, int, int, bool,
262262
int *);
263263

264+
#ifdef GCC_GE_15
265+
void PREFIX(get_by_ct)(
266+
caf_token_t token, int image_index, size_t bufsize,
267+
void *set_buf /*void (*set)(void *buffer, void *set_data)*/, void *set_data,
268+
void (*get)(void **buffer, bool *free_buffer, void *base, void *get_data),
269+
void *get_data, size_t get_data_size, int *stat,
270+
caf_team_t *team __attribute__((unused)),
271+
int *team_number __attribute__((unused)));
272+
#endif
264273
#ifdef GCC_GE_8
265274
void PREFIX(get_by_ref)(caf_token_t, int, gfc_descriptor_t *dst,
266275
caf_reference_t *refs, int dst_kind, int src_kind,

src/runtime-libraries/mpi/mpi_caf.c

Lines changed: 119 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ typedef struct
253253
int dest_image;
254254
int dest_tag;
255255
int flags;
256-
void (*access)(void *dst, void *base, void *data);
256+
void (*access)(void **dst, bool *free_dst, void *base, void *data);
257257
char data[];
258258
} ct_msg_t;
259259

@@ -436,6 +436,9 @@ communication_thread(void *)
436436
MPI_Status status;
437437
MPI_Message msg_han;
438438
MPI_Comm comm;
439+
void *baseptr, *buffer;
440+
int flag;
441+
bool free_buffer;
439442

440443
dprint("ct: Started.\n");
441444

@@ -451,37 +454,41 @@ communication_thread(void *)
451454
{
452455
MPI_Get_count(&status, MPI_BYTE, &cnt);
453456

454-
ct_msg_t *msg;
455457
if (cnt >= sizeof(ct_msg_t))
456458
{
457-
msg = alloca(cnt);
459+
ct_msg_t *msg = alloca(cnt);
460+
458461
ierr = MPI_Mrecv(msg, cnt, MPI_BYTE, &msg_han, &status);
459462
chk_err(ierr);
460463
dprint("ct: Received request of size %ld.\n", cnt);
461464

462-
void *bptr;
463-
int flag;
464-
ierr = MPI_Win_get_attr(msg->win, MPI_WIN_BASE, &bptr, &flag);
465+
ierr = MPI_Win_get_attr(msg->win, MPI_WIN_BASE, &baseptr, &flag);
465466
chk_err(ierr);
466-
dprint("ct: Local base for win %ld is %p (set: %b).\n", msg->win, bptr,
467-
flag);
467+
dprint("ct: Local base for win %ld is %p (set: %b) Executing getter at "
468+
"%p.\n",
469+
msg->win, baseptr, flag, msg->access);
468470
if (!flag)
469471
{
470472
dprint("ct: Error: Window %p memory is not allocated.\n", msg->win);
471473
}
474+
msg->access(&buffer, &free_buffer, baseptr, msg->data);
475+
dprint("ct: getter executed.\n");
472476
comm = (msg->flags & CT_INTER_CT) ? ct_COMM : CAF_COMM_WORLD;
473477
dprint("ct: Sending %ld bytes to image %d, tag %d on comm %x (%s).\n",
474478
msg->transfer_size, msg->dest_image, msg->dest_tag, comm,
475479
comm == CAF_COMM_WORLD ? "CAF_COMM_WORLD" : "ct_COMM");
476-
ierr = MPI_Send(bptr, msg->transfer_size, MPI_BYTE, msg->dest_image,
480+
ierr = MPI_Send(buffer, msg->transfer_size, MPI_BYTE, msg->dest_image,
477481
msg->dest_tag, comm);
478482
chk_err(ierr);
483+
if (free_buffer)
484+
free(buffer);
479485
}
480486
else if (!commthread_running)
481487
{
482488
/* Pickup empty message. */
483489
dprint("ct: Got termination message. Terminating.\n");
484-
ierr = MPI_Mrecv(&msg, cnt, MPI_BYTE, &msg_han, &status);
490+
baseptr = NULL;
491+
ierr = MPI_Mrecv(baseptr, cnt, MPI_BYTE, &msg_han, &status);
485492
chk_err(ierr);
486493
}
487494
else
@@ -3669,6 +3676,13 @@ PREFIX(send)(caf_token_t token, size_t offset, int image_index,
36693676
}
36703677
}
36713678

3679+
void
3680+
get_access(void **dst, bool *dst_is_tmp, void *base, void *)
3681+
{
3682+
*dst = base;
3683+
*dst_is_tmp = false;
3684+
}
3685+
36723686
/* Get array data from a remote src to a local dest. */
36733687

36743688
void
@@ -3810,18 +3824,19 @@ PREFIX(get)(caf_token_t token, size_t offset, int image_index,
38103824
{
38113825
const size_t trans_size
38123826
= ((dst_size > src_size) ? src_size : dst_size) * size;
3813-
ct_msg_t *buf = alloca(sizeof(ct_msg_t));
3814-
buf->win = *p;
3815-
buf->transfer_size = trans_size;
3816-
buf->dest_image = mpi_this_image;
3817-
buf->dest_tag = CAF_CT_TAG + 1;
3818-
buf->flags = 0;
3819-
ierr = MPI_Send(buf, sizeof(ct_msg_t), MPI_BYTE, remote_image,
3827+
ct_msg_t *msg = alloca(sizeof(ct_msg_t));
3828+
msg->win = *p;
3829+
msg->transfer_size = trans_size;
3830+
msg->dest_image = mpi_this_image;
3831+
msg->dest_tag = CAF_CT_TAG + 1;
3832+
msg->flags = 0;
3833+
msg->access = &get_access;
3834+
ierr = MPI_Send(msg, sizeof(ct_msg_t), MPI_BYTE, remote_image,
38203835
CAF_CT_TAG, ct_COMM);
38213836
chk_err(ierr);
38223837
ierr
38233838
= MPI_Recv(dest->base_addr, trans_size, MPI_BYTE, image_index - 1,
3824-
buf->dest_tag, CAF_COMM_WORLD, MPI_STATUS_IGNORE);
3839+
msg->dest_tag, CAF_COMM_WORLD, MPI_STATUS_IGNORE);
38253840
chk_err(ierr);
38263841

38273842
// CAF_Win_lock(MPI_LOCK_SHARED, remote_image, *p);
@@ -4890,6 +4905,92 @@ get_for_ref(caf_reference_t *ref, size_t *i, size_t dst_index,
48904905
}
48914906
}
48924907

4908+
#ifdef GCC_GE_15
4909+
void
4910+
PREFIX(get_by_ct)(
4911+
caf_token_t token, int image_index, size_t bufsize, void *set_buf,
4912+
/*void (*set)(void *buffer, void *set_data), */ void *set_data,
4913+
void (*get)(void **buffer, bool *free_buffer, void *base, void *get_data),
4914+
void *get_data, size_t get_data_size, int *stat,
4915+
caf_team_t *team __attribute__((unused)),
4916+
int *team_number __attribute__((unused)))
4917+
{
4918+
MPI_Group current_team_group, win_group;
4919+
int ierr, this_image, remote_image;
4920+
int trans_ranks[2];
4921+
bool free_t_buff, free_msg;
4922+
void *t_buff;
4923+
ct_msg_t *msg;
4924+
const size_t msg_size = sizeof(ct_msg_t) + get_data_size;
4925+
4926+
if (stat)
4927+
*stat = 0;
4928+
4929+
// Get mapped remote image
4930+
ierr = MPI_Comm_group(CAF_COMM_WORLD, &current_team_group);
4931+
chk_err(ierr);
4932+
ierr = MPI_Win_get_group(*TOKEN(token), &win_group);
4933+
chk_err(ierr);
4934+
ierr = MPI_Group_translate_ranks(current_team_group, 2,
4935+
(int[]){image_index - 1, mpi_this_image},
4936+
win_group, trans_ranks);
4937+
chk_err(ierr);
4938+
remote_image = trans_ranks[0];
4939+
this_image = trans_ranks[1];
4940+
ierr = MPI_Group_free(&current_team_group);
4941+
chk_err(ierr);
4942+
ierr = MPI_Group_free(&win_group);
4943+
chk_err(ierr);
4944+
4945+
check_image_health(remote_image, stat);
4946+
4947+
dprint("Entering get_by_ct(), win_rank = %d, this_rank = %d, getter: %p.\n",
4948+
remote_image, this_image, get);
4949+
4950+
// create get msg
4951+
if ((free_msg = (((msg = alloca(msg_size))) == NULL)))
4952+
{
4953+
msg = malloc(msg_size);
4954+
if (msg == NULL)
4955+
caf_runtime_error("Unable to allocate memory "
4956+
"for internal message in get_by_ct().");
4957+
}
4958+
msg->win = *TOKEN(token);
4959+
msg->transfer_size = bufsize;
4960+
msg->dest_image = mpi_this_image;
4961+
msg->dest_tag = CAF_CT_TAG + 1;
4962+
msg->flags = 0;
4963+
msg->access = get;
4964+
memcpy(msg->data, get_data, get_data_size);
4965+
4966+
// call get on remote
4967+
ierr = MPI_Send(msg, msg_size, MPI_BYTE, remote_image, CAF_CT_TAG, ct_COMM);
4968+
chk_err(ierr);
4969+
4970+
// allocate local buffer
4971+
if ((free_t_buff = (((t_buff = alloca(bufsize))) == NULL)))
4972+
{
4973+
t_buff = malloc(bufsize);
4974+
if (t_buff == NULL)
4975+
caf_runtime_error("Unable to allocate memory "
4976+
"for internal buffer in get_by_ct().");
4977+
}
4978+
ierr = MPI_Recv(t_buff, bufsize, MPI_BYTE, image_index - 1, msg->dest_tag,
4979+
CAF_COMM_WORLD, MPI_STATUS_IGNORE);
4980+
chk_err(ierr);
4981+
4982+
// set (buffer, set_data)
4983+
memcpy(set_buf, t_buff, bufsize);
4984+
// set(t_buff, set_data);
4985+
4986+
// free (buffer)
4987+
if (free_msg)
4988+
free(msg);
4989+
if (free_t_buff)
4990+
free(t_buff);
4991+
}
4992+
#endif
4993+
48934994
void
48944995
PREFIX(get_by_ref)(caf_token_t token, int image_index, gfc_descriptor_t *dst,
48954996
caf_reference_t *refs, int dst_kind, int src_kind,

0 commit comments

Comments
 (0)