Commit 28ce2b0

add pinned memory to host, add omp

tqchen committed Jan 18, 2015
1 parent 2fdf835 commit 28ce2b0
Showing 3 changed files with 98 additions and 9 deletions.
4 changes: 2 additions & 2 deletions mshadow-ps/Makefile
@@ -1,8 +1,8 @@
 # set LD_LIBRARY_PATH
 export CC = gcc
-export CXX = clang++
+export CXX = g++
 export NVCC =nvcc
-export CFLAGS = -Wall -O3 -msse3 -Wno-unknown-pragmas -funroll-loops -I../ -DMSHADOW_STAND_ALONE=1 -std=c++11
+export CFLAGS = -Wall -O3 -msse3 -Wno-unknown-pragmas -funroll-loops -I../ -DMSHADOW_STAND_ALONE=1 -std=c++11 -fopenmp
 export LDFLAGS= -lm -lpthread
 export NVCCFLAGS = -O3 --use_fast_math -ccbin $(CXX)
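The compiler switch from clang++ to g++ goes together with the new -fopenmp flag: clang at the time likely lacked a usable OpenMP implementation, while g++ enables the #pragma omp directives added to ps_local-inl.h below. Note that -fopenmp is needed at link time as well as compile time. A minimal smoke test for the toolchain, assuming a standalone file that is not part of this commit:

    // check_omp.cc -- verify that the toolchain actually runs OpenMP
    // (illustrative only, not part of the commit)
    #include <cstdio>
    #include <omp.h>

    int main() {
      // each thread prints its id; team size defaults to the core count
      #pragma omp parallel
      {
        std::printf("thread %d of %d\n",
                    omp_get_thread_num(), omp_get_num_threads());
      }
      return 0;
    }

Built with: g++ -fopenmp check_omp.cc -o check_omp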
59 changes: 52 additions & 7 deletions mshadow-ps/ps_local-inl.h
@@ -9,6 +9,15 @@
 #define MSHADOW_PS_LOCAL_INL_H_
 #include <map>
 #include <utility>
+#if defined(_OPENMP)
+#include <omp.h>
+#ifdef _MSC_VER
+typedef int ms_omp_uint;
+#else
+typedef unsigned ms_omp_uint;
+#endif
+#endif
+
 #include "./thread.h"
 #include "./thread_util.h"
 #include "./ps.h"
@@ -27,6 +36,8 @@ class LocalServer : public IParamServer<xpu, DType> {
     init_end = 0;
     perdev_pull_thread = 1;
     perdev_push_thread = 1;
+    bigarray_bound = 1000 * 1000;
+    nthread_reduction = 8;
   }
   // destructor
   virtual ~LocalServer(void) {
@@ -72,6 +83,12 @@
       }
       utils::Error("unknown push operation %s", val);
     }
+    if (!strcmp(name, "reduce_thread")) {
+      nthread_reduction = atoi(val);
+    }
+    if (!strcmp(name, "bigarray_bound")) {
+      bigarray_bound = static_cast<size_t>(atol(val));
+    }
     if (!strcmp(name, "pull_thread")) {
       if (!strcmp(val, "ndev")) {
         perdev_pull_thread = 1;
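Both new keys take plain integer strings through the same SetParam interface as the existing options. A hypothetical configuration sequence, assuming server points to an already-constructed LocalServer instance (the values are illustrative, not the defaults):

    // cap the OpenMP reduction at 4 threads and only parallelize
    // arrays of at least 2M elements per device; setting
    // "reduce_thread" to "0" disables the parallel path entirely
    server->SetParam("reduce_thread", "4");
    server->SetParam("bigarray_bound", "2000000");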
@@ -282,9 +299,7 @@
       }
       switch (op) {
         case kSum: {
-          for (index_t i = 1; i < data.size(0); ++i) {
-            data[0] += data[i];
-          }
+          this->ReduceSum(data);
           this->PullReady(data[0], key);
           return;
         }
@@ -315,7 +330,7 @@
   /*! \brief data structure to hold temporal push result */
   struct PushEntry {
     // temporal space to hold input data
-    TensorContainer<cpu, 4, DType> data;
+    Tensor<cpu, 4, DType> data;
     // indicator whether the certain devices is already copied in
     std::vector<bool> copied;
     // number of data copied in
@@ -324,13 +339,19 @@
     int copyin_version;
     // constructor
     PushEntry(void)
-        : data(false), copyin_version(0) {}
+        : copyin_version(0) {}
+    ~PushEntry(void) {
+      if (data.dptr_ != NULL) {
+        mshadow::FreeHost<xpu>(&data);
+      }
+    }
     // constructor
     inline void Init(int ndevice, Shape<2> shape) {
-      data.Resize(Shape4(2, ndevice, shape[0], shape[1]));
+      data.shape_ = Shape4(2, ndevice, shape[0], shape[1]);
+      mshadow::AllocHost<xpu>(&data);
       num_copied = 0;
       copied.resize(ndevice, false);
-    }
+    }
   };
   // a record to remember things related to pull request
   struct PullReqRecord {
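This is the pinned-memory half of the commit: PushEntry's buffer changes from a TensorContainer (ordinary pageable memory) to a bare Tensor allocated through the new AllocHost<xpu>, which maps to cudaMallocHost on GPU builds, with the new destructor taking over the cleanup that TensorContainer used to do. Page-locked host memory lets the CUDA driver DMA directly between this staging buffer and device memory, skipping the internal bounce copy that pageable transfers require, and every pushed gradient passes through this buffer. A standalone sketch of the same idea, assuming CUDA (error checks elided):

    // sketch: pinned staging buffer for device->host copies
    #include <cuda_runtime.h>

    void fetch(const float *d_src, size_t n) {
      float *h_pinned;
      // portable pinned allocation, as AllocHost_<gpu> does in this commit
      cudaHostAlloc(reinterpret_cast<void **>(&h_pinned),
                    n * sizeof(float), cudaHostAllocPortable);
      // DMA-capable copy: no intermediate staging inside the driver
      cudaMemcpy(h_pinned, d_src, n * sizeof(float), cudaMemcpyDeviceToHost);
      // ... consume h_pinned on the host ...
      cudaFreeHost(h_pinned);
    }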
@@ -407,10 +428,34 @@
   utils::ConditionVariable wait_cond;
   //---------configurations of server-------
   int init_end;
+  // number of reduction thread
+  int nthread_reduction;
+  // the threshold for big array
+  size_t bigarray_bound;
   // whether use pull thread per device
   int perdev_pull_thread;
   // whether use push thread per device
   int perdev_push_thread;
+  // perform sum reduction
+  inline void ReduceSum(Tensor<cpu, 3, DType> data) {
+#if defined(_OPENMP)
+    if (data[0].MSize() >= bigarray_bound &&
+        nthread_reduction != 0) {
+      ms_omp_uint ntask = static_cast<ms_omp_uint>(data.size(1));
+      #pragma omp parallel for schedule(static) num_threads(nthread_reduction)
+      for (ms_omp_uint j = 0; j < ntask; ++j) {
+        for (index_t i = 1; i < data.size(0); ++i) {
+          data[0][j] += data[i][j];
+        }
+      }
+    } else
+#endif
+    {
+      for (index_t i = 1; i < data.size(0); ++i) {
+        data[0] += data[i];
+      }
+    }
+  }
   // push handler
   inline void PushProc(utils::ThreadPQueue<PullTask> *queue) {
     while (!destroy_signal) {
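ReduceSum parallelizes over the middle dimension j rather than over devices, so each OpenMP thread accumulates into a disjoint slice of data[0] and needs no locking; data[i][j] is a whole row, so the inner += still goes through mshadow's vectorized kernels. Arrays below bigarray_bound keep the old serial path, where thread startup would cost more than it saves. The same loop structure on raw arrays, as a sketch (names and the scalar element type are illustrative):

    // bufs[0] is the destination; bufs[1..ndev-1] are per-device copies of
    // len elements each. Threads split the elements j, and each thread sums
    // all devices for its own j range, so no two threads write the same slot.
    void reduce_sum(float *const *bufs, int ndev, long len, int nthread) {
      #pragma omp parallel for schedule(static) num_threads(nthread)
      for (long j = 0; j < len; ++j) {
        for (int i = 1; i < ndev; ++i) {
          bufs[0][j] += bufs[i][j];
        }
      }
    }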
44 changes: 44 additions & 0 deletions mshadow/tensor_cpu-inl.h
@@ -24,6 +24,50 @@ inline void DeleteStream<cpu>(Stream<cpu> *stream) {
   delete stream;
 }
 
+template<typename xpu>
+inline void *AllocHost_(size_t size);
+template<typename xpu>
+inline void FreeHost_(void * dptr);
+
+#ifdef __CUDACC__
+template<>
+inline void *AllocHost_<gpu>(size_t size) {
+  void *dptr;
+  utils::Check(cudaMallocHost(&dptr, size,
+                              cudaHostAllocPortable) == cudaSuccess,
+               "AllocHost");
+  return dptr;
+}
+template<>
+inline void FreeHost_<gpu>(void *dptr) {
+  cudaFreeHost(dptr);
+}
+#endif
+
+template<>
+inline void *AllocHost_<cpu>(size_t size) {
+  size_t pitch;
+  return sse2::AlignedMallocPitch(&pitch, size, 1);
+}
+template<>
+inline void FreeHost_<cpu>(void *dptr) {
+  sse2::AlignedFree(dptr);
+}
+
+template<typename xpu, int dim, typename DType>
+inline void AllocHost(Tensor<cpu, dim, DType> *obj) {
+  obj->stride_ = obj->size(dim - 1);
+  utils::Assert(obj->CheckContiguous(), "AllocHost");
+  void *dptr = AllocHost_<xpu>(obj->MSize() * sizeof(DType));
+  obj->dptr_ = reinterpret_cast<DType*>(dptr);
+}
+template<typename xpu, int dim, typename DType>
+inline void FreeHost(Tensor<cpu, dim, DType> *obj) {
+  utils::Assert(obj->dptr_ != NULL, "FreeHost:: double free");
+  FreeHost_<xpu>(obj->dptr_);
+  obj->dptr_ = NULL;
+}
+
 template<int dim, typename DType>
 inline void AllocSpace(Tensor<cpu, dim, DType> *obj, bool pad) {
   size_t pitch;
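The xpu template parameter selects the allocator, so host-side code opts into pinned memory only when a GPU is involved: AllocHost_<gpu> uses cudaMallocHost (compiled only under __CUDACC__), while AllocHost_<cpu> falls back to mshadow's SSE-aligned malloc. A usage sketch (shape values illustrative; this must be compiled with nvcc for the gpu specialization to exist):

    // staging buffer in pinned memory for host<->device transfers
    mshadow::Tensor<mshadow::cpu, 2, float> buf;
    buf.shape_ = mshadow::Shape2(64, 1024);  // AllocHost reads the shape
    mshadow::AllocHost<mshadow::gpu>(&buf);  // pinned: cudaMallocHost
    // ... fill buf, copy it to or from a gpu tensor, etc. ...
    mshadow::FreeHost<mshadow::gpu>(&buf);   // cudaFreeHost; dptr_ set to NULL

Note that AllocHost also resets stride_ to the innermost dimension and asserts contiguity, so padded layouts are not supported through this path.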
