Skip to content

Commit

Permalink
set cpu affinity to numa in MULTI situation (openvinotoolkit#13407)
Browse files Browse the repository at this point in the history
* change gpunum to 3

* hold threads for GPU for MULTI:GPU,CPU

* need to first check if there is a CPU in the device list

* use getNumberOfCPUCores to get CPU cores

* load GPU first

* assign the correct value to multiSContext->_devicePriorities

* load GPU first and load CPU last and set numa for CPU

* MULTI set CPU affinity to “NUMA” during load network

* Load the CPU last while maintaining the original device priority

* not using vector for CPU

* There is no user setting affinity in MULTI, and NUMA is set for the CPU

* pass key ENABLE_HYPER_THREAD to CPU plugin and merge xiaoxia PR

* set ENABLE_HYPER_THREAD to NO

* modify log

* Modify the code according to xiaoxia and wanglei comments

* Modify the code according to bell comments
  • Loading branch information
wgzintel authored Nov 19, 2022
1 parent 384a961 commit 42b816a
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ DECLARE_CONFIG_KEY(FORCE_DISABLE_CACHE);
*/
DECLARE_CONFIG_KEY(CONFIG_DEVICE_ID);

/**
* @brief enable hyper thread
*/
DECLARE_CONFIG_KEY(ENABLE_HYPER_THREAD);
} // namespace PluginConfigInternalParams

} // namespace InferenceEngine
Expand Down
4 changes: 3 additions & 1 deletion src/inference/dev_api/threading/ie_istreams_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ class INFERENCE_ENGINE_API_CLASS(IStreamsExecutor) : public ITaskExecutor {
* @return configured values
*/
static Config MakeDefaultMultiThreaded(const Config& initial, const bool fp_intesive = true);
static int GetDefaultNumStreams(); // no network specifics considered (only CPU's caps);
static int GetDefaultNumStreams(
const bool enable_hyper_thread = true); // no network specifics considered (only CPU's caps);
static int GetHybridNumStreams(std::map<std::string, std::string>& config, const int stream_mode);
static void UpdateHybridCustomThreads(Config& config);

Expand All @@ -102,6 +103,7 @@ class INFERENCE_ENGINE_API_CLASS(IStreamsExecutor) : public ITaskExecutor {
int _threads_per_stream_big = 0; //!< Threads per stream in big cores
int _threads_per_stream_small = 0; //!< Threads per stream in small cores
int _small_core_offset = 0; //!< Calculate small core start offset when binding cpu cores
bool _enable_hyper_thread = true; //!< enable hyper thread
enum StreamMode { DEFAULT, AGGRESSIVE, LESSAGGRESSIVE };
enum PreferredCoreType {
ANY,
Expand Down
41 changes: 27 additions & 14 deletions src/inference/src/threading/ie_istreams_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,17 @@ std::vector<std::string> IStreamsExecutor::Config::SupportedKeys() const {
CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_BIG),
CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_SMALL),
CONFIG_KEY_INTERNAL(SMALL_CORE_OFFSET),
CONFIG_KEY_INTERNAL(ENABLE_HYPER_THREAD),
ov::num_streams.name(),
ov::inference_num_threads.name(),
ov::affinity.name(),
};
}
int IStreamsExecutor::Config::GetDefaultNumStreams() {
int IStreamsExecutor::Config::GetDefaultNumStreams(const bool enable_hyper_thread) {
const int sockets = static_cast<int>(getAvailableNUMANodes().size());
// bare minimum of streams (that evenly divides available number of core)
const int num_cores = sockets == 1 ? parallel_get_max_threads() : getNumberOfCPUCores();
const int num_cores = sockets == 1 ? (enable_hyper_thread ? parallel_get_max_threads() : getNumberOfCPUCores())
: getNumberOfCPUCores();
if (0 == num_cores % 4)
return std::max(4, num_cores / 4);
else if (0 == num_cores % 5)
Expand Down Expand Up @@ -280,6 +282,14 @@ void IStreamsExecutor::Config::SetConfig(const std::string& key, const std::stri
<< ". Expected only non negative numbers";
}
_small_core_offset = val_i;
} else if (key == CONFIG_KEY_INTERNAL(ENABLE_HYPER_THREAD)) {
if (value == CONFIG_VALUE(YES)) {
_enable_hyper_thread = true;
} else if (value == CONFIG_VALUE(NO)) {
_enable_hyper_thread = false;
} else {
OPENVINO_UNREACHABLE("Unsupported enable hyper thread type");
}
} else {
IE_THROW() << "Wrong value for property key " << key;
}
Expand Down Expand Up @@ -328,6 +338,8 @@ Parameter IStreamsExecutor::Config::GetConfig(const std::string& key) const {
return {std::to_string(_threads_per_stream_small)};
} else if (key == CONFIG_KEY_INTERNAL(SMALL_CORE_OFFSET)) {
return {std::to_string(_small_core_offset)};
} else if (key == CONFIG_KEY_INTERNAL(ENABLE_HYPER_THREAD)) {
return {_enable_hyper_thread ? CONFIG_VALUE(YES) : CONFIG_VALUE(NO)};
} else {
IE_THROW() << "Wrong value for property key " << key;
}
Expand Down Expand Up @@ -445,18 +457,19 @@ IStreamsExecutor::Config IStreamsExecutor::Config::MakeDefaultMultiThreaded(cons
<< streamExecutorConfig._threads_per_stream_small << ")";
}
#endif
const auto hwCores = !bLatencyCase && numaNodesNum == 1
// throughput case on a single-NUMA node machine uses all available cores
? parallel_get_max_threads()
// in the rest of cases:
// multi-node machine
// or
// latency case, single-node yet hybrid case that uses
// all core types
// or
// big-cores only, but the #cores is "enough" (pls see the logic above)
// it is usually beneficial not to use the hyper-threading (which is default)
: num_cores_default;
const auto hwCores =
!bLatencyCase && numaNodesNum == 1
// throughput case on a single-NUMA node machine uses all available cores
? (streamExecutorConfig._enable_hyper_thread ? parallel_get_max_threads() : num_cores_default)
// in the rest of cases:
// multi-node machine
// or
// latency case, single-node yet hybrid case that uses
// all core types
// or
// big-cores only, but the #cores is "enough" (pls see the logic above)
// it is usually beneficial not to use the hyper-threading (which is default)
: num_cores_default;
const auto threads =
streamExecutorConfig._threads ? streamExecutorConfig._threads : (envThreads ? envThreads : hwCores);
streamExecutorConfig._threadsPerStream =
Expand Down
33 changes: 2 additions & 31 deletions src/plugins/auto/auto_schedule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,37 +122,8 @@ void AutoSchedule::init(const ScheduleContext::Ptr& sContext) {
_loadContext[ACTUALDEVICE].metaDevices = _autoSContext->_devicePriorities;
if (isCumulative) {
std::list<DeviceInformation> validDevices =
_autoSContext->_plugin->GetValidDevice(_autoSContext->_devicePriorities, _loadContext[ACTUALDEVICE].networkPrecision);

// check if device priority is enabled
bool enableDevicePriority =
std::find_if(std::begin(validDevices), std::end(validDevices), [](DeviceInformation& di) {
return di.devicePriority > 0;
}) != std::end(validDevices);

// for the case of -d "AUTO" or "AUTO: -xxx"
if (!enableDevicePriority) {
std::list<DeviceInformation>::iterator itCPUDevice;
int GPUNums = 0, CPUNums = 0;
for (auto it = validDevices.begin(); it != validDevices.end(); it++) {
if (it->deviceName.find("GPU") != std::string::npos) {
GPUNums++;
}

if (it->deviceName.find("CPU") == 0) {
CPUNums++;
itCPUDevice = it;
}
}

// remove CPU from default candidate list for Cumulative Throughput mode
if (GPUNums >= 3 && CPUNums > 0 && !_autoSContext->_bindBuffer) {
validDevices.erase(itCPUDevice);
LOG_INFO_TAG("GPUNums:%d, remove CPU from default candidate list for "
"CUMULATIVE_THROUGHPUT",
GPUNums);
}
}
_autoSContext->_plugin->GetValidDevice(_autoSContext->_devicePriorities,
_loadContext[ACTUALDEVICE].networkPrecision);

std::string deviceName = "MULTI:";
for (auto& device : validDevices) {
Expand Down
118 changes: 89 additions & 29 deletions src/plugins/auto/plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -455,40 +455,100 @@ IExecutableNetworkInternal::Ptr MultiDeviceInferencePlugin::LoadNetworkImpl(cons
std::mutex load_mutex;
std::vector<Task> loads;
std::once_flag readNetworkFlag;
for (auto& p : metaDevices) {
loads.push_back([&]() {
auto tmpiter = fullConfig.find(CONFIG_KEY(ALLOW_AUTO_BATCHING));
if (tmpiter != fullConfig.end()) {
if (tmpiter->second == PluginConfigParams::NO)
multiSContext->_batchingDisabled = true;
p.config.insert({tmpiter->first, tmpiter->second});

auto loadInferEngTask = [&](DeviceInformation& p) {
auto tmpiter = fullConfig.find(CONFIG_KEY(ALLOW_AUTO_BATCHING));
if (tmpiter != fullConfig.end()) {
if (tmpiter->second == PluginConfigParams::NO) {
LOG_INFO_TAG("set %s=%s", tmpiter->first.c_str(), tmpiter->second.c_str());
multiSContext->_batchingDisabled = true;
}
insertPropToConfig(CONFIG_KEY(AUTO_BATCH_TIMEOUT), p.deviceName, p.config);
insertPropToConfig(CONFIG_KEY(CACHE_DIR), p.deviceName, p.config);
const auto& deviceName = p.deviceName;
const auto& deviceConfig = p.config;
SoExecutableNetworkInternal exec_net;
if (modelPath.empty()) {
exec_net = GetCore()->LoadNetwork(network, deviceName, deviceConfig);
} else if (GetCore()->DeviceSupportsImportExport(deviceName)) {
exec_net = GetCore()->LoadNetwork(modelPath, deviceName, deviceConfig);
} else {
std::call_once(readNetworkFlag, [&]() {
network = GetCore()->ReadNetwork(modelPath, std::string());
});
exec_net = GetCore()->LoadNetwork(network, deviceName, deviceConfig);
p.config.insert({tmpiter->first, tmpiter->second});
}
insertPropToConfig(CONFIG_KEY(AUTO_BATCH_TIMEOUT), p.deviceName, p.config);
insertPropToConfig(CONFIG_KEY(CACHE_DIR), p.deviceName, p.config);
const auto& deviceName = p.deviceName;
const auto& deviceConfig = p.config;
SoExecutableNetworkInternal exec_net;
LOG_DEBUG_TAG("load network to device:%s", deviceName.c_str());
if (modelPath.empty()) {
exec_net = GetCore()->LoadNetwork(network, deviceName, deviceConfig);
} else if (GetCore()->DeviceSupportsImportExport(deviceName)) {
exec_net = GetCore()->LoadNetwork(modelPath, deviceName, deviceConfig);
} else {
std::call_once(readNetworkFlag, [&]() {
network = GetCore()->ReadNetwork(modelPath, std::string());
});
exec_net = GetCore()->LoadNetwork(network, deviceName, deviceConfig);
}

try {
std::string sStreamNums = "";
std::string sThreadNums = "";
if (deviceName.find("CPU") != std::string::npos) {
sStreamNums = exec_net->GetMetric(ov::num_streams.name()).as<std::string>();
sThreadNums = exec_net->GetMetric(ov::inference_num_threads.name()).as<std::string>();
} else if (deviceName.find("GPU") != std::string::npos) {
sStreamNums = exec_net->GetConfig(PluginConfigParams::KEY_GPU_THROUGHPUT_STREAMS).as<std::string>();
sThreadNums = exec_net->GetConfig(GPUConfigParams::KEY_GPU_MAX_NUM_THREADS).as<std::string>();
}

// print CPU or GPU streams num and threads num
if (!sStreamNums.empty() && !sThreadNums.empty()) {
LOG_INFO_TAG("after load network, %s streamNums:%s, %s threadNums:%s",
deviceName.c_str(),
sStreamNums.c_str(),
deviceName.c_str(),
sThreadNums.c_str());
}
std::unique_lock<std::mutex> lock{load_mutex};
executableNetworkPerDevice.insert({deviceName, exec_net});
multiNetworkConfig.insert(deviceConfig.begin(), deviceConfig.end());
} catch (...) {
LOG_DEBUG_TAG("deviceName:%s cannot get streamNums and threadNums from exec_net", deviceName.c_str());
}
std::unique_lock<std::mutex> lock{load_mutex};
executableNetworkPerDevice.insert({deviceName, exec_net});
multiNetworkConfig.insert(deviceConfig.begin(), deviceConfig.end());
};

// Check if CPU is in device list
auto iterCPU = std::find_if(metaDevices.begin(), metaDevices.end(), [&](DeviceInformation& d) {
return d.deviceName.find("CPU") != std::string::npos;
});
// Load devices other than CPU first
for (auto& p : metaDevices) {
if (iterCPU != metaDevices.end() && p.deviceName == iterCPU->deviceName) {
continue;
}
loads.push_back([&]() {
loadInferEngTask(p);
});
}

auto executor = executorManager()->getIdleCPUStreamsExecutor(
IStreamsExecutor::Config{"MultiDeviceAsyncLoad",
static_cast<int>(std::thread::hardware_concurrency()) /* max possible #streams*/,
0 /*default threads per stream, workaround for ticket 62376*/,
IStreamsExecutor::ThreadBindingType::NONE});
executor->runAndWait(loads);
IStreamsExecutor::Config{"MultiDeviceAsyncLoad",
static_cast<int>(std::thread::hardware_concurrency()) /* max possible #streams*/,
0 /*default threads per stream, workaround for ticket 62376*/,
IStreamsExecutor::ThreadBindingType::NONE});
if (loads.size() > 0) {
// Wait for the device to load the network
executor->runAndWait(loads);
loads.clear();
}

// Finally load the CPU
if (iterCPU != metaDevices.end()) {
if (!executableNetworkPerDevice.empty() && iterCPU->config.find(ov::affinity.name()) == iterCPU->config.end()) {
LOG_DEBUG_TAG("set affinity to NUMA and disable hyper thread for CPU");
// If the other devices load successfully and no user set affinity then set NUMA to CPU
iterCPU->config.insert({ov::affinity.name(), ov::affinity(ov::Affinity::NUMA).second.as<std::string>()});
iterCPU->config.insert({CONFIG_KEY_INTERNAL(ENABLE_HYPER_THREAD), CONFIG_VALUE(NO)});
}
loads.push_back([&]() {
loadInferEngTask(*iterCPU);
});
// Wait for CPU to load the network
executor->runAndWait(loads);
}

if (executableNetworkPerDevice.empty())
IE_THROW(NotFound) << "Failed to load network to any device "
<< "that the " << GetName() << " device is initialized to work with";
Expand Down
2 changes: 1 addition & 1 deletion src/plugins/intel_cpu/src/plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ void Engine::ApplyPerformanceHints(std::map<std::string, std::string> &config, c
engConfig.streamExecutorConfig._threadBindingType ==
InferenceEngine::IStreamsExecutor::ThreadBindingType::HYBRID_AWARE
? IStreamsExecutor::Config::GetHybridNumStreams(config, IStreamsExecutor::Config::StreamMode::DEFAULT)
: IStreamsExecutor::Config::GetDefaultNumStreams();
: IStreamsExecutor::Config::GetDefaultNumStreams(engConfig.streamExecutorConfig._enable_hyper_thread);
int num_streams = default_num_streams;
if (networkToleranceForLowCache.max_mem_tolerance == ov::MemBandwidthPressure::UNKNOWN) {
if ((networkToleranceForLowCache.ratio_compute_convs == ov::MemBandwidthPressure::ALL)
Expand Down

0 comments on commit 42b816a

Please sign in to comment.