Multi-GPU
- Parallelize batches among GPUs and tree-reduce the gradients
- The effective batch size is multiplied by the number of devices
- Detect machine topology (twin-GPU boards, P2P connectivity)
- Track device in syncedmem (thanks @thatguymike)
- Insert a callback in the solver for minimal code change
- Accept a list for the gpu flag of the caffe tool, e.g. '-gpu 0,1' or '-gpu all';
  run on the default GPU if no ID is given (see the sketch after this list)
- Add multi-GPU solver test
- Deterministic architecture for reproducible runs
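
A minimal sketch of the launcher side this enables, assuming the tools/caffe.cpp wiring simply hands the parsed device list to P2PSync (declared in include/caffe/parallel.hpp below); solver_param and gpus are hypothetical locals, and GetSolver is taken to be the solver factory of this era:

// Sketch only: assumed multi-GPU entry point, not verbatim from this commit.
shared_ptr<caffe::Solver<float> > solver(
    caffe::GetSolver<float>(solver_param));
if (gpus.size() > 1) {
  // One root solver plus a worker per additional GPU, kept in sync by P2PSync.
  caffe::P2PSync<float> sync(solver, NULL, solver->param());
  sync.run(gpus);  // blocks until training finishes on every device
} else {
  solver->Solve();
}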
cypof authored and shelhamer committed Aug 9, 2015

1 parent d2f0457 commit e5575cf
Showing 18 changed files with 949 additions and 141 deletions.
1 change: 1 addition & 0 deletions include/caffe/caffe.hpp
@@ -10,6 +10,7 @@
#include "caffe/layer.hpp"
#include "caffe/layer_factory.hpp"
#include "caffe/net.hpp"
#include "caffe/parallel.hpp"
#include "caffe/proto/caffe.pb.h"
#include "caffe/solver.hpp"
#include "caffe/util/benchmark.hpp"
7 changes: 7 additions & 0 deletions include/caffe/common.hpp
@@ -149,6 +149,11 @@ class Caffe {
static void SetDevice(const int device_id);
// Prints the current GPU status.
static void DeviceQuery();
// Parallel training info
inline static int solver_count() { return Get().solver_count_; }
inline static void set_solver_count(int val) { Get().solver_count_ = val; }
inline static bool root_solver() { return Get().root_solver_; }
inline static void set_root_solver(bool val) { Get().root_solver_ = val; }

protected:
#ifndef CPU_ONLY
@@ -158,6 +163,8 @@ class Caffe {
shared_ptr<RNG> random_generator_;

Brew mode_;
int solver_count_;
bool root_solver_;

private:
// The private constructor to avoid duplicate instantiation.
3 changes: 2 additions & 1 deletion include/caffe/internal_thread.hpp
@@ -42,7 +42,8 @@ class InternalThread {
bool must_stop();

private:
void entry(int device, Caffe::Brew mode, int rand_seed);
void entry(int device, Caffe::Brew mode, int rand_seed, int solver_count,
bool root_solver);

shared_ptr<boost::thread> thread_;
};
4 changes: 3 additions & 1 deletion include/caffe/layer_factory.hpp
@@ -71,7 +71,9 @@ class LayerRegistry {

// Get a layer using a LayerParameter.
static shared_ptr<Layer<Dtype> > CreateLayer(const LayerParameter& param) {
LOG(INFO) << "Creating layer " << param.name();
if (Caffe::root_solver()) {
LOG(INFO) << "Creating layer " << param.name();
}
const string& type = param.type();
CreatorRegistry& registry = Registry();
CHECK_EQ(registry.count(type), 1) << "Unknown layer type: " << type
118 changes: 118 additions & 0 deletions include/caffe/parallel.hpp
@@ -0,0 +1,118 @@
#ifndef CAFFE_PARALLEL_HPP_
#define CAFFE_PARALLEL_HPP_

#include <boost/date_time/posix_time/posix_time.hpp>

#include <vector>

#include "caffe/blob.hpp"
#include "caffe/common.hpp"
#include "caffe/internal_thread.hpp"
#include "caffe/layer.hpp"
#include "caffe/proto/caffe.pb.h"
#include "caffe/solver.hpp"
#include "caffe/syncedmem.hpp"
#include "caffe/util/blocking_queue.hpp"

namespace caffe {

// Represents a net's parameters. Once a net is created, its parameter buffers can
// be replaced by ones from Params, to allow parallelization. Params ensures
// parameters are allocated in one consecutive array.
template<typename Dtype>
class Params {
public:
explicit Params(shared_ptr<Solver<Dtype> > root_solver);
virtual ~Params() {
}

inline size_t size() const {
return size_;
}
inline Dtype* data() const {
return data_;
}
inline Dtype* diff() const {
return diff_;
}

protected:
const size_t size_; // Size of buffers
Dtype* data_; // Network parameters
Dtype* diff_; // Gradient

DISABLE_COPY_AND_ASSIGN(Params);
};

// Params stored in GPU memory.
template<typename Dtype>
class GPUParams : public Params<Dtype> {
public:
GPUParams(shared_ptr<Solver<Dtype> > root_solver, int device);
virtual ~GPUParams();

void configure(Solver<Dtype>* solver) const;

protected:
using Params<Dtype>::size_;
using Params<Dtype>::data_;
using Params<Dtype>::diff_;
};

class DevicePair {
public:
DevicePair(int parent, int device)
: parent_(parent),
device_(device) {
}
inline int parent() {
return parent_;
}
inline int device() {
return device_;
}

// Group GPUs into pairs by proximity, depending on the machine's topology
static void compute(const vector<int> devices, vector<DevicePair>* pairs);

protected:
int parent_;
int device_;
};

// Synchronous data parallelism using map-reduce between local GPUs.
template<typename Dtype>
class P2PSync : public GPUParams<Dtype>, public Solver<Dtype>::Callback,
public InternalThread {
public:
explicit P2PSync(shared_ptr<Solver<Dtype> > root_solver,
P2PSync<Dtype>* parent, const SolverParameter& param);
virtual ~P2PSync();

inline const shared_ptr<Solver<Dtype> >& solver() const {
return solver_;
}

void run(const vector<int>& gpus);

protected:
void on_start();
void on_gradients_ready();

void InternalThreadEntry();

P2PSync<Dtype>* parent_;
vector<P2PSync<Dtype>*> children_;
BlockingQueue<P2PSync<Dtype>*> queue_;
const int initial_iter_;
Dtype* parent_grads_;
shared_ptr<Solver<Dtype> > solver_;

using Params<Dtype>::size_;
using Params<Dtype>::data_;
using Params<Dtype>::diff_;
};

} // namespace caffe

#endif  // CAFFE_PARALLEL_HPP_
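
The members above suggest the shape of the reduction. A sketch of what on_gradients_ready() plausibly does, assuming parent_grads_ is a buffer on the parent's device that each child fills, and with plain device-to-device copies standing in for the real stream handling:

// Sketch only: assumed reduction step, not verbatim from this commit.
template<typename Dtype>
void P2PSync<Dtype>::on_gradients_ready() {
  // Sum gradients from children as each one signals completion via queue_.
  for (int i = 0; i < children_.size(); ++i) {
    P2PSync<Dtype>* child = queue_.pop();
    caffe_gpu_add(size_, child->parent_grads_, diff_, diff_);
  }
  if (parent_) {
    // Copy the accumulated gradient into the parent's buffer, then signal it.
    CUDA_CHECK(cudaMemcpy(parent_grads_, diff_, size_ * sizeof(Dtype),
                          cudaMemcpyDeviceToDevice));
    parent_->queue_.push(this);
  } else {
    // Root: rescale to compensate for the batch being split across solvers.
    caffe_gpu_scal(size_, Dtype(1.0 / Caffe::solver_count()), diff_);
  }
}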
38 changes: 38 additions & 0 deletions include/caffe/solver.hpp
@@ -32,12 +32,27 @@ class Solver {
// methods to restore the state from the appropriate snapshot type.
void Restore(const char* resume_file);
virtual ~Solver() {}
inline const SolverParameter& param() const { return param_; }
inline shared_ptr<Net<Dtype> > net() { return net_; }
inline const vector<shared_ptr<Net<Dtype> > >& test_nets() {
return test_nets_;
}
int iter() { return iter_; }

// Invoked at specific points during an iteration
class Callback {
protected:
virtual void on_start() = 0;
virtual void on_gradients_ready() = 0;

template <typename T>
friend class Solver;
};
const vector<Callback*>& callbacks() const { return callbacks_; }
void add_callback(Callback* value) {
callbacks_.push_back(value);
}

protected:
// Make and apply the update value for the current iteration.
virtual void ApplyUpdate() = 0;
@@ -62,10 +77,33 @@ class Solver {
int current_step_;
shared_ptr<Net<Dtype> > net_;
vector<shared_ptr<Net<Dtype> > > test_nets_;
vector<Callback*> callbacks_;

DISABLE_COPY_AND_ASSIGN(Solver);
};

/**
* @brief Solver that only computes gradients, used as worker
* for multi-GPU training.
*/
template <typename Dtype>
class WorkerSolver : public Solver<Dtype> {
public:
explicit WorkerSolver(const SolverParameter& param)
: Solver<Dtype>(param) {}

protected:
void ApplyUpdate() {}
void SnapshotSolverState(const string& model_filename) {
LOG(FATAL) << "Should not be called on worker solver.";
}
void RestoreSolverStateFromBinaryProto(const string& state_file) {
LOG(FATAL) << "Should not be called on worker solver.";
}
void RestoreSolverStateFromHDF5(const string& state_file) {
LOG(FATAL) << "Should not be called on worker solver.";
}
};

/**
* @brief Optimizes the parameters of a Net using
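
The callback pair brackets one iteration: on_start() lets workers wait for the root's updated weights before the forward pass, and on_gradients_ready() runs the tree reduction before the update. A sketch of where they would fire inside Solver<Dtype>::Step, assuming the rest of the loop matches the existing solver (bottom_vec is this era's ForwardBackward argument):

// Sketch only: assumed placement of the callbacks in Solver<Dtype>::Step.
while (iter_ < stop_iter) {
  for (int i = 0; i < callbacks_.size(); ++i) {
    callbacks_[i]->on_start();  // workers block until fresh weights arrive
  }
  Dtype loss = net_->ForwardBackward(bottom_vec);  // loss accumulation elided
  for (int i = 0; i < callbacks_.size(); ++i) {
    callbacks_[i]->on_gradients_ready();  // tree-reduce across GPUs
  }
  ApplyUpdate();  // no-op in WorkerSolver; the root applies the real update
  ++iter_;
}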
7 changes: 5 additions & 2 deletions include/caffe/syncedmem.hpp
@@ -45,14 +45,15 @@ class SyncedMemory {
public:
SyncedMemory()
: cpu_ptr_(NULL), gpu_ptr_(NULL), size_(0), head_(UNINITIALIZED),
own_cpu_data_(false) {}
own_cpu_data_(false), own_gpu_data_(false), gpu_device_(-1) {}
explicit SyncedMemory(size_t size)
: cpu_ptr_(NULL), gpu_ptr_(NULL), size_(size), head_(UNINITIALIZED),
own_cpu_data_(false) {}
own_cpu_data_(false), own_gpu_data_(false), gpu_device_(-1) {}
~SyncedMemory();
const void* cpu_data();
void set_cpu_data(void* data);
const void* gpu_data();
void set_gpu_data(void* data);
void* mutable_cpu_data();
void* mutable_gpu_data();
enum SyncedHead { UNINITIALIZED, HEAD_AT_CPU, HEAD_AT_GPU, SYNCED };
@@ -71,6 +72,8 @@ class SyncedMemory {
size_t size_;
SyncedHead head_;
bool own_cpu_data_;
bool own_gpu_data_;
int gpu_device_;

DISABLE_COPY_AND_ASSIGN(SyncedMemory);
}; // class SyncedMemory
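
With own_gpu_data_ and gpu_device_ recorded, the destructor can switch to the device that allocated the buffer before freeing it; a sketch of the matching ~SyncedMemory(), the exact body being an assumption:

// Sketch only: assumed device-aware teardown.
SyncedMemory::~SyncedMemory() {
  if (cpu_ptr_ && own_cpu_data_) {
    CaffeFreeHost(cpu_ptr_);
  }
#ifndef CPU_ONLY
  if (gpu_ptr_ && own_gpu_data_) {
    int initial_device;
    cudaGetDevice(&initial_device);
    if (gpu_device_ != -1) {
      CUDA_CHECK(cudaSetDevice(gpu_device_));  // free on the owning device
    }
    CUDA_CHECK(cudaFree(gpu_ptr_));
    cudaSetDevice(initial_device);  // restore the caller's device
  }
#endif  // CPU_ONLY
}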
5 changes: 3 additions & 2 deletions src/caffe/common.cpp
@@ -51,7 +51,8 @@ void GlobalInit(int* pargc, char*** pargv) {
#ifdef CPU_ONLY // CPU-only Caffe.

Caffe::Caffe()
: random_generator_(), mode_(Caffe::CPU) { }
: random_generator_(), mode_(Caffe::CPU),
solver_count_(1), root_solver_(true) { }

Caffe::~Caffe() { }

@@ -95,7 +96,7 @@ void* Caffe::RNG::generator() {

Caffe::Caffe()
: cublas_handle_(NULL), curand_generator_(NULL), random_generator_(),
mode_(Caffe::CPU) {
mode_(Caffe::CPU), solver_count_(1), root_solver_(true) {
// Try to create a cublas handler, and report an error if failed (but we will
// keep the program running as one might just want to run CPU code).
if (cublasCreate(&cublas_handle_) != CUBLAS_STATUS_SUCCESS) {
4 changes: 1 addition & 3 deletions src/caffe/data_reader.cpp
@@ -76,9 +76,7 @@ void DataReader::Body::InternalThreadEntry() {
shared_ptr<db::Cursor> cursor(db->NewCursor());
vector<shared_ptr<QueuePair> > qps;
try {
// int solver_count = param_.phase() == TRAIN ? Caffe::solver_count() : 1;
// TODO single solver until multi-gpu merge
int solver_count = 1;
int solver_count = param_.phase() == TRAIN ? Caffe::solver_count() : 1;

// To ensure deterministic runs, only start running once all solvers
// are ready. But solvers need to peek on one item during initialization,
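
The loop that follows (cut off by the diff context above) would hand one QueuePair to each solver and then fill them round-robin from a single cursor, which is what makes the read order, and hence runs, deterministic. A sketch under that assumption; new_queue_pairs_ and read_one are taken to be the reader's existing queue and fetch helper:

// Sketch only: assumed body of the reader loop.
for (int i = 0; i < solver_count; ++i) {
  shared_ptr<QueuePair> qp(new_queue_pairs_.pop());
  read_one(cursor.get(), qp.get());  // the one item each solver peeks at
  qps.push_back(qp);
}
while (!must_stop()) {
  for (int i = 0; i < solver_count; ++i) {
    read_one(cursor.get(), qps[i].get());  // same order on every run
  }
}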
4 changes: 3 additions & 1 deletion src/caffe/data_transformer.cpp
@@ -19,7 +19,9 @@ DataTransformer<Dtype>::DataTransformer(const TransformationParameter& param,
CHECK_EQ(param_.mean_value_size(), 0) <<
"Cannot specify mean_file and mean_value at the same time";
const string& mean_file = param.mean_file();
LOG(INFO) << "Loading mean file from: " << mean_file;
if (Caffe::root_solver()) {
LOG(INFO) << "Loading mean file from: " << mean_file;
}
BlobProto blob_proto;
ReadProtoFromBinaryFileOrDie(mean_file.c_str(), &blob_proto);
data_mean_.FromProto(blob_proto);
9 changes: 7 additions & 2 deletions src/caffe/internal_thread.cpp
@@ -27,21 +27,26 @@ void InternalThread::StartInternalThread() {
#endif
Caffe::Brew mode = Caffe::mode();
int rand_seed = caffe_rng_rand();
int solver_count = Caffe::solver_count();
bool root_solver = Caffe::root_solver();

try {
thread_.reset(new boost::thread(&InternalThread::entry, this, device, mode,
rand_seed));
rand_seed, solver_count, root_solver));
} catch (std::exception& e) {
LOG(FATAL) << "Thread exception: " << e.what();
}
}

void InternalThread::entry(int device, Caffe::Brew mode, int rand_seed) {
void InternalThread::entry(int device, Caffe::Brew mode, int rand_seed,
int solver_count, bool root_solver) {
#ifndef CPU_ONLY
CUDA_CHECK(cudaSetDevice(device));
#endif
Caffe::set_mode(mode);
Caffe::set_random_seed(rand_seed);
Caffe::set_solver_count(solver_count);
Caffe::set_root_solver(root_solver);

InternalThreadEntry();
}