Skip to content

Commit

Permalink
Use chunk data types instead of loop over chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
devreal committed Aug 16, 2017
1 parent 92d7e6c commit 601cb27
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#include <dash/dart/if/dart_communication.h>

DART_INTERNAL
int dart__mpi__datatype_sizes[DART_TYPE_COUNT];
extern int dart__mpi__datatype_sizes[DART_TYPE_COUNT];

/** DART handle type for non-blocking one-sided operations. */
struct dart_handle_struct
Expand All @@ -28,6 +28,9 @@ struct dart_handle_struct
dart_ret_t
dart__mpi__datatype_init() DART_INTERNAL;

dart_ret_t
dart__mpi__datatype_fini() DART_INTERNAL;

static inline MPI_Op dart__mpi__op(dart_operation_t dart_op) {
switch (dart_op) {
case DART_OP_MIN : return MPI_MIN;
Expand Down
217 changes: 142 additions & 75 deletions dart-impl/mpi/src/dart_communication.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <alloca.h>

int dart__mpi__datatype_sizes[DART_TYPE_COUNT];
static MPI_Datatype dart__mpi__max_chunk_datatype[DART_TYPE_COUNT];

static inline size_t minsize(size_t a, size_t b)
{
Expand All @@ -41,13 +42,43 @@ dart_ret_t
dart__mpi__datatype_init()
{
for (int i = DART_TYPE_UNDEFINED+1; i < DART_TYPE_COUNT; i++) {

// query the size of the data type
int ret = MPI_Type_size(
dart__mpi__datatype(i),
&dart__mpi__datatype_sizes[i]);
if (ret != MPI_SUCCESS) {
DART_LOG_ERROR("Failed to query size of DART data type %i", i);
return DART_ERR_INVAL;
}

// create the chunk data type
ret = MPI_Type_contiguous(INT_MAX,
dart__mpi__datatype(i),
&dart__mpi__max_chunk_datatype[i]);
if (ret != MPI_SUCCESS) {
DART_LOG_ERROR("Failed to create chunk type of DART data type %i", i);
return DART_ERR_INVAL;
}
ret = MPI_Type_commit(&dart__mpi__max_chunk_datatype[i]);
if (ret != MPI_SUCCESS) {
DART_LOG_ERROR("Failed to commit chunk type of DART data type %i", i);
return DART_ERR_INVAL;
}

}
return DART_OK;
}

dart_ret_t
dart__mpi__datatype_fini()
{
for (int i = DART_TYPE_UNDEFINED+1; i < DART_TYPE_COUNT; i++) {
int ret = MPI_Type_free(&dart__mpi__max_chunk_datatype[i]);
if (ret != MPI_SUCCESS) {
DART_LOG_ERROR("Failed to commit chunk type of DART data type %i", i);
return DART_ERR_INVAL;
}
}
return DART_OK;
}
Expand Down Expand Up @@ -177,30 +208,43 @@ dart_ret_t dart_get(
* MPI uses offset type int, do not copy more than INT_MAX elements:
*/
// chunk up the get
const size_t chunk_size = INT_MAX;
char * dest_ptr = (char*) dest;
size_t copied = 0;
const size_t nchunks = nelem / INT_MAX;
const size_t remainder = nelem % INT_MAX;
char * dest_ptr = (char*) dest;

while (copied < nelem) {
int to_copy = minsize(chunk_size, nelem - copied);
DART_LOG_TRACE("dart_get: MPI_Get (dest %p, size %d)", dest_ptr, to_copy);
if (nchunks > 0) {
DART_LOG_TRACE(
"dart_get: MPI_Get (dest %p, size %zu)", dest_ptr, nchunks * INT_MAX);
if (MPI_Get(dest_ptr,
to_copy,
nchunks,
dart__mpi__max_chunk_datatype[dtype],
team_unit_id.id,
offset,
nchunks,
dart__mpi__max_chunk_datatype[dtype],
win) != MPI_SUCCESS) {
DART_LOG_ERROR("dart_get ! MPI_Get failed");
return DART_ERR_INVAL;
}
offset += nchunks * INT_MAX;
dest_ptr += nchunks * INT_MAX;
}

if (remainder > 0) {
DART_LOG_TRACE("dart_get: MPI_Get (dest %p, size %zu)", dest_ptr, remainder);
if (MPI_Get(dest_ptr,
remainder,
mpi_dtype,
team_unit_id.id,
offset,
to_copy,
remainder,
mpi_dtype,
win) != MPI_SUCCESS) {
DART_LOG_ERROR("dart_get ! MPI_Get failed");
return DART_ERR_INVAL;
}
dest_ptr += chunk_size;
offset += chunk_size;
copied += chunk_size;
}


DART_LOG_DEBUG("dart_get > finished");
return DART_OK;
}
Expand Down Expand Up @@ -267,27 +311,41 @@ dart_ret_t dart_put(


// chunk up the put
const size_t chunk_size = INT_MAX;
const char * src_ptr = (const char*) src;
size_t copied = 0;
const size_t nchunks = nelem / INT_MAX;
const size_t remainder = nelem % INT_MAX;
const char * src_ptr = (const char*) src;

if (nchunks > 0) {
DART_LOG_TRACE(
"dart_put: MPI_Put (src %p, size %zu)", src_ptr, nchunks * INT_MAX);
if (MPI_Put(src_ptr,
nchunks,
dart__mpi__max_chunk_datatype[dtype],
team_unit_id.id,
offset,
nchunks,
dart__mpi__max_chunk_datatype[dtype],
win) != MPI_SUCCESS) {
DART_LOG_ERROR("dart_put ! MPI_Put failed");
return DART_ERR_INVAL;
}
offset += nchunks * INT_MAX;
src_ptr += nchunks * INT_MAX;
}

while (copied < nelem) {
int to_copy = minsize(chunk_size, nelem - copied);
DART_LOG_TRACE("dart_get: MPI_Put (src %p, size %d)", src_ptr, to_copy);
if (remainder > 0) {
DART_LOG_TRACE("dart_put: MPI_Put (src %p, size %zu)", src_ptr, remainder);
if (MPI_Put(src_ptr,
to_copy,
remainder,
mpi_dtype,
team_unit_id.id,
offset,
to_copy,
remainder,
mpi_dtype,
win) != MPI_SUCCESS) {
DART_LOG_ERROR("dart_get ! MPI_Put failed");
DART_LOG_ERROR("dart_put ! MPI_Put failed");
return DART_ERR_INVAL;
}
src_ptr += chunk_size;
offset += chunk_size;
copied += chunk_size;
}

return DART_OK;
Expand Down Expand Up @@ -835,27 +893,42 @@ dart_ret_t dart_put_blocking(
* Using MPI_Put as MPI_Win_flush is required to ensure remote completion.
*/
// chunk up the put
const size_t chunk_size = INT_MAX;
const char * src_ptr = (const char*) src;
size_t copied = 0;
const size_t nchunks = nelem / INT_MAX;
const size_t remainder = nelem % INT_MAX;
const char * src_ptr = (const char*) src;

if (nchunks > 0) {
DART_LOG_TRACE(
"dart_get: MPI_Put (src %p, size %zu)", src_ptr, nchunks * INT_MAX);
if (MPI_Put(src_ptr,
nchunks,
dart__mpi__max_chunk_datatype[dtype],
team_unit_id.id,
offset,
nchunks,
dart__mpi__max_chunk_datatype[dtype],
win) != MPI_SUCCESS) {
DART_LOG_ERROR("dart_get ! MPI_Put failed");
return DART_ERR_INVAL;
}
src_ptr += nchunks * INT_MAX;
offset += nchunks * INT_MAX;
}

while (copied < nelem) {
int to_copy = minsize(chunk_size, nelem - copied);
DART_LOG_TRACE("dart_get: MPI_Put (src %p, size %d)", src_ptr, to_copy);
if (remainder > 0) {
DART_LOG_TRACE(
"dart_get: MPI_Put (src %p, size %zu)", src_ptr, remainder);
if (MPI_Put(src_ptr,
to_copy,
remainder,
mpi_dtype,
team_unit_id.id,
offset,
to_copy,
remainder,
mpi_dtype,
win) != MPI_SUCCESS) {
DART_LOG_ERROR("dart_get ! MPI_Put failed");
return DART_ERR_INVAL;
}
src_ptr += chunk_size;
offset += chunk_size;
copied += chunk_size;
}


Expand Down Expand Up @@ -965,56 +1038,50 @@ dart_ret_t dart_get_blocking(
/*
* Using MPI_Get as MPI_Win_flush is required to ensure remote completion.
*/
// chunk up the get
const size_t nchunks = nelem / INT_MAX;
const size_t remainder = nelem % INT_MAX;
char * dest_ptr = (char*) dest;
MPI_Request reqs[2];
int nreqs = 0;

if (nchunks > 0) {
DART_LOG_TRACE("dart_get_blocking: MPI_Rget (dest %p, size %zu)",
dest_ptr, nchunks * INT_MAX);
if (MPI_Rget(dest_ptr,
nchunks,
dart__mpi__max_chunk_datatype[dtype],
team_unit_id.id,
offset,
nchunks,
dart__mpi__max_chunk_datatype[dtype],
win,
&reqs[nreqs++]) != MPI_SUCCESS) {
DART_LOG_ERROR("dart_get ! MPI_Get failed");
return DART_ERR_INVAL;
}
offset += nchunks * INT_MAX;
dest_ptr += nchunks * INT_MAX;
}

if (nelem < INT_MAX) {
DART_LOG_DEBUG("dart_get_blocking: MPI_Rget");
MPI_Request req;
if (MPI_Rget(dest,
nelem,
if (remainder > 0) {
DART_LOG_TRACE(
"dart_get_blocking: MPI_Rget (dest %p, size %zu)", dest_ptr, remainder);
if (MPI_Rget(dest_ptr,
remainder,
mpi_dtype,
team_unit_id.id,
offset,
nelem,
remainder,
mpi_dtype,
win,
&req)
!= MPI_SUCCESS) {
DART_LOG_ERROR("dart_get_blocking ! MPI_Rget failed");
return DART_ERR_INVAL;
}
DART_LOG_DEBUG("dart_get_blocking: MPI_Wait");
if (MPI_Wait(&req, MPI_STATUS_IGNORE) != MPI_SUCCESS) {
DART_LOG_ERROR("dart_get_blocking ! MPI_Wait failed");
&reqs[nreqs++]) != MPI_SUCCESS) {
DART_LOG_ERROR("dart_get ! MPI_Get failed");
return DART_ERR_INVAL;
}
} else {
// chunk up the get
const size_t chunk_size = INT_MAX;
char *dest_ptr = (char*) dest;
const int num_chunks = (int)ceil(((double)nelem) / ((double)INT_MAX));
MPI_Request *reqs = alloca(num_chunks * sizeof(MPI_Request));
for (int chunk = 0; chunk < num_chunks; ++chunk) {
DART_LOG_TRACE("dart_get: MPI_Rget (chunk %d)", chunk);
int to_copy = minsize(chunk_size, nelem - (chunk*chunk_size));
if (MPI_Rget(dest_ptr,
to_copy,
mpi_dtype,
team_unit_id.id,
offset,
to_copy,
mpi_dtype,
win,
&reqs[chunk]) != MPI_SUCCESS) {
DART_LOG_ERROR("dart_get ! MPI_Rget failed");
return DART_ERR_INVAL;
}
dest_ptr += chunk_size;
offset += chunk_size;
}
// wait for all requests to finish
MPI_Waitall(num_chunks, reqs, MPI_STATUSES_IGNORE);
}

MPI_Waitall(nreqs, reqs, MPI_STATUSES_IGNORE);

DART_LOG_DEBUG("dart_get_blocking > finished");
return DART_OK;
Expand Down
2 changes: 2 additions & 0 deletions dart-impl/mpi/src/dart_initialization.c
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,8 @@ dart_ret_t dart_exit()

MPI_Comm_free(&dart_comm_world);

dart__mpi__datatype_fini();

if (_init_by_dart) {
DART_LOG_DEBUG("%2d: dart_exit: MPI_Finalize", unitid.id);
MPI_Finalize();
Expand Down

0 comments on commit 601cb27

Please sign in to comment.