Introduction
If you grep caffe2's core code for anything related to context, it comes down to the following few files. In this section we will go through their main contents one by one.
$ ls context*
context_base.cc context.cc context_gpu.h context.h
context_base.h context_gpu.cu context_gpu_test.cc context_test.cc
Context itself is an abstraction of a physical device; you can think of it as the counterpart of a Device in TensorFlow or an Engine in Caffe. Whether we use a Tensor to represent a unit of data storage or an Operator to represent a basic computation between data units, the work ultimately has to run on a concrete physical computing device. The two most commonly used devices for AI workloads today are still the CPU and the GPU, so by default caffe2, like most other frameworks, provides abstractions for these two devices: CPUContext/CPUStaticContext and CUDAContext/CUDAStaticContext. The interface shared by both is declared in the two abstract base classes BaseStaticContext and BaseContext, a classic object-oriented design.
BaseContext and BaseStaticContext
BaseStaticContext
BaseStaticContext mainly provides basic memory allocation (such as New) and operations for querying or setting the device type; in addition, it creates and returns the BaseContext subclass objects that implement most of the higher-level functionality.
Its basic interface is as follows.
class CAFFE2_API BaseStaticContext {
 public:
  virtual ~BaseStaticContext() noexcept {}
  virtual std::pair<void*, MemoryDeleter> New(size_t nbytes) const = 0;
  virtual std::unique_ptr<BaseContext> CreateContext() = 0;
  virtual std::unique_ptr<BaseContext> CreateContext(const DeviceOption&) = 0;
  virtual DeviceType GetDeviceType() = 0;
  /*
   * @brief: Sets the DeviceOption for argument `device` based on the
   * current context and a data pointer
   */
  virtual void ExtractDeviceOption(DeviceOption* device, const void* /*data*/) {
    device->set_device_type(GetDeviceType());
  }
};
BaseContext
BaseContext contains all the basic functionality a device has to provide in order to run the operators that caffe2 supports. It is one big collection of virtual context interfaces.
Most of its functions do what their names suggest; briefly:
SwitchToDevice: designed mainly with GPUs in mind. Anyone used to CUDA programming knows that before starting any CUDA operation we first have to switch the current environment to a particular GPU and obtain the corresponding cudaStream before we can proceed;
WaitEvent/Record: related to asynchronous op execution, where the Event class plays a major role. These two functions are used to wait on, and to record, the events that are pending or have occurred under this context;
FinishDeviceComputation: mainly used by device contexts such as the GPU that can run asynchronously. It synchronizes the work issued across the CUDA streams, much like a barrier in multi-threaded programming;
Copy*: utility functions that copy data device-to-device, device-to-CPU, CPU-to-device, and so on;
In addition, there is a static array of BaseStaticContext pointers, one per device type, so that we can conveniently allocate memory on, or query the type of, any of these devices;
its friend StaticContextFunctionRegisterer registers the known StaticContext objects into that array statically (during static initialization) so they are ready to use later. A short sketch of how a caller strings these functions together follows the class declaration below.
class CAFFE2_API BaseContext {
 public:
  virtual ~BaseContext() noexcept {}
  virtual BaseStaticContext* GetStaticContext() const = 0;
  /* Sorry for the naming, will get rid of this in future diff */
  virtual DeviceType GetDevicetype() const = 0;
  virtual void SwitchToDevice(int /*stream_id*/) = 0;
  inline void SwitchToDevice() {
    SwitchToDevice(0);
  }
  virtual void WaitEvent(const Event& ev) = 0;
  virtual void Record(Event* ev, const char* err_msg = nullptr) const = 0;
  virtual void FinishDeviceComputation() = 0;
  // This used to be arbitrary cross-device copy, but it turns out everyone
  // did direct CPU-X copy, so we just make three functions for it (to avoid
  // double dispatch). This will get obsoleted by C10, where copies
  // will be proper operators (and get to rely on multiple dispatch there).
  virtual void CopyBytesSameDevice(size_t nbytes, const void* src, void* dst) = 0;
  virtual void CopyBytesFromCPU(size_t nbytes, const void* src, void* dst) = 0;
  virtual void CopyBytesToCPU(size_t nbytes, const void* src, void* dst) = 0;
  virtual void CopyBytesToDevice(size_t nbytes, const void* src, void* dst, DeviceType type);
  inline void CopyItemsFromCPU(const TypeMeta& meta, size_t n, const void* src, void* dst);
  inline void CopyItemsToCPU(const TypeMeta& meta, size_t n, const void* src, void* dst);

  static BaseStaticContext* static_context_[COMPILE_TIME_MAX_DEVICE_TYPES];

  template <int d>
  friend struct StaticContextFunctionRegisterer;
};
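Putting these pieces together, here is a minimal illustrative sketch of how a caller (an operator or the executor) might drive any BaseContext implementation. RunOnContext is a hypothetical helper written for this article, not a caffe2 API; it only uses the virtual functions shown above.
// Illustrative sketch only: RunOnContext is not part of caffe2.
void RunOnContext(BaseContext* ctx, const Event* wait_for, Event* done) {
  ctx->SwitchToDevice(0);            // bind to the device / stream first
  if (wait_for) {
    ctx->WaitEvent(*wait_for);       // wait for the producers of our inputs
  }
  // ... enqueue the actual computation on ctx here ...
  if (done) {
    ctx->Record(done);               // let consumers wait on this work later
  } else {
    ctx->FinishDeviceComputation();  // or block until the work is done
  }
}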
CPUContext and CPUStaticContext
CPUStaticContext
Below is the concrete implementation of CPUStaticContext. Its New simply delegates to the CPUAllocator, and turning on the FLAGS_caffe2_report_cpu_memory_usage switch makes it record and report memory allocations on the CPU device.
class CAFFE2_API CPUStaticContext : public BaseStaticContext {
 public:
  std::pair<void*, MemoryDeleter> New(size_t nbytes) const override {
    auto data_and_deleter = GetCPUAllocator()->New(nbytes);
    if (FLAGS_caffe2_report_cpu_memory_usage) {
      reporter_.New(data_and_deleter.first, nbytes);
      data_and_deleter.second = ReportAndDelete;
    }
    return data_and_deleter;
  }

  std::unique_ptr<BaseContext> CreateContext() override {
    return caffe2::make_unique<CPUContext>();
  }

  std::unique_ptr<BaseContext> CreateContext(
      const DeviceOption& option) override {
    return caffe2::make_unique<CPUContext>(option);
  }

  DeviceType GetDeviceType() override {
    return CPU;
  }

 protected:
  static MemoryAllocationReporter reporter_;

 private:
  static void ReportAndDelete(void* ptr) {
    reporter_.Delete(ptr);
    GetCPUAllocator()->GetDeleter()(ptr);
  }
};
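A minimal usage sketch, assuming the caffe2 headers above are available: allocate a small CPU buffer through the StaticContext and release it through the deleter it hands back.
// Sketch: allocating CPU memory via the StaticContext.
void CpuStaticContextDemo() {
  CPUStaticContext static_ctx;
  std::pair<void*, MemoryDeleter> mem = static_ctx.New(1024);     // 1 KB host buffer
  std::unique_ptr<BaseContext> ctx = static_ctx.CreateContext();  // yields a CPUContext
  // ... hand mem.first to a Tensor or run operators against *ctx ...
  mem.second(mem.first);                                          // free with the matching deleter
}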
CPUContext
Below are the main functions implemented by CPUContext.
Notice that quite a few of the functions declared in BaseContext are not really needed by CPUContext; SwitchToDevice, for instance, is a no-op. CPUContext also does not support async execution by default, so operators that use it cannot run asynchronously. Its copies are simple: a plain memcpy does the job. Finally, every context type carries a random number generator to support the random operations that operators may need.
class CAFFE2_API CPUContext final : public BaseContext {
 public:
  typedef std::mt19937 rand_gen_type;
  CPUContext() : random_seed_(RandomNumberSeed()) {}
  explicit CPUContext(const DeviceOption& option)
      : random_seed_(
            option.has_random_seed() ? option.random_seed()
                                     : RandomNumberSeed()) {
    CAFFE_ENFORCE_EQ(option.device_type(), CPU);
  }
  ~CPUContext() noexcept override {}

  BaseStaticContext* GetStaticContext() const override {
    return GetCPUStaticContext();
  }

  inline void SwitchToDevice(int /*stream_id*/) override {}
  using BaseContext::SwitchToDevice;

  inline void WaitEvent(const Event& ev) override {
    ev.Wait(CPU, this);
  }

  inline void Record(Event* ev, const char* err_msg = nullptr) const override {
    CAFFE_ENFORCE(ev, "Event must not be null.");
    ev->Record(CPU, this, err_msg);
  }

  inline static std::pair<void*, MemoryDeleter> New(size_t nbytes) {
    return StaticContext()->New(nbytes);
  }

  void CopyBytesSameDevice(size_t nbytes, const void* src, void* dst) override {
    if (nbytes == 0) {
      return;
    }
    CAFFE_ENFORCE(src);
    CAFFE_ENFORCE(dst);
    memcpy(dst, src, nbytes);
  }
  .............
  .............
  // By default CPU operators don't have async device parts
  static bool HasAsyncPartDefault() {
    return false;
  }
  static bool SupportsAsyncScheduling() {
    return false;
  }
  // CPU streams are not implemented and are silently ignored by CPU ops,
  // return true to signal executor to schedule a CPU op
  static bool IsStreamFree(
      const DeviceOption& /* option */,
      int /* stream_id */) {
    return true;
  }
  DeviceType GetDevicetype() const override {
    return CPU;
  }

 protected:
  // TODO(jiayq): instead of hard-coding a generator, make it more flexible.
  int random_seed_{1701};
  std::unique_ptr<rand_gen_type> random_generator_;
};
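A quick usage sketch (again assuming the caffe2 headers plus <vector>): copying between two host buffers through the context interface is just a memcpy underneath.
// Sketch: a CPU-to-CPU copy driven through CPUContext.
void CpuCopyDemo() {
  CPUContext ctx;
  std::vector<float> src(16, 1.0f), dst(16, 0.0f);
  ctx.SwitchToDevice();  // a no-op for the CPU context
  ctx.CopyBytesSameDevice(src.size() * sizeof(float), src.data(), dst.data());
  // No explicit synchronization is needed; the copy above is synchronous.
}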
CUDAContext and CUDAStaticContext
The design of CUDAContext in caffe2 is considerably more involved. After all, it is currently the workhorse for operator execution; like it or not, people these days tend to equate AI computation with GPUs (even though there really are plenty of other options, such as optimized low-level CPU libraries like MKL-DNN, FPGA libraries behind suitable abstractions, or even AI chips such as Google's TPU).
First, there are several kinds of CUDA memory pools; roughly speaking we have the following three choices.
enum class CudaMemoryPoolType {
  NONE = 0,
  CUB = 1,
  THC = 2,
};
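Which pool gets used is decided once at initialization time (see the call to Caffe2SetCUDAMemoryPool() further below). As a rough, hypothetical sketch of what that selection amounts to, keyed off a string option (caffe2 exposes such a flag, caffe2_cuda_memory_pool; treat the parsing below as an assumption rather than the verbatim implementation):
// Hypothetical sketch of memory pool selection; g_cuda_memory_pool_type here
// is a stand-in for the real file-scope variable in context_gpu.cc.
static CudaMemoryPoolType g_cuda_memory_pool_type = CudaMemoryPoolType::NONE;

static void SetCUDAMemoryPoolSketch(const std::string& pool_name) {
  if (pool_name.empty() || pool_name == "none") {
    g_cuda_memory_pool_type = CudaMemoryPoolType::NONE;  // plain cudaMalloc/cudaFree
  } else if (pool_name == "cub") {
    g_cuda_memory_pool_type = CudaMemoryPoolType::CUB;   // cub caching allocator
  } else if (pool_name == "thc") {
    g_cuda_memory_pool_type = CudaMemoryPoolType::THC;   // THC caching allocator
  } else {
    CAFFE_THROW("Unknown cuda memory pool type: ", pool_name);
  }
}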
然后因為GPU CUDA編程有著強大的并行編程能力嘹锁。一般我們需要指定某一GPU下的某一個cudaStream用于執(zhí)行具體的計算葫录,為此需要協(xié)調好Framework這邊多線程在執(zhí)行GPU上互相排斥執(zhí)行的多個cudaStream時的線性操作。為此caffe2里面在CUDAContext里面提供了一些thread local的變量用于表示這么一些全局的資源领猾。
class CAFFE2_API ThreadLocalCUDAObjects {
  friend class CUDAContext;

 private:
  ThreadLocalCUDAObjects() {
    for (int i = 0; i < CAFFE2_COMPILE_TIME_MAX_GPUS; ++i) {
      cuda_streams_[i] = vector<cudaStream_t>();
      cublas_handles_[i] = vector<cublasHandle_t>();
#ifdef CAFFE2_USE_CUDNN
      cudnn_handles_[i] = vector<cudnnHandle_t>();
#endif // CAFFE2_USE_CUDNN
    }
  }

  cudaStream_t GetStream(int gpu, int stream_id);
  cublasHandle_t GetHandle(int gpu, int stream_id);
#ifdef CAFFE2_USE_CUDNN
  cudnnHandle_t GetCudnnHandle(int gpu, int stream_id);
#endif
  .............
  ...............
  vector<cudaStream_t> cuda_streams_[CAFFE2_COMPILE_TIME_MAX_GPUS];
  vector<cublasHandle_t> cublas_handles_[CAFFE2_COMPILE_TIME_MAX_GPUS];
#ifdef CAFFE2_USE_CUDNN
  vector<cudnnHandle_t> cudnn_handles_[CAFFE2_COMPILE_TIME_MAX_GPUS];
#endif // CAFFE2_USE_CUDNN
};
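The member function bodies are elided above. As a rough approximation (a sketch, not the verbatim caffe2 code), GetStream lazily creates a non-blocking stream for each (gpu, stream_id) pair the first time it is asked for one:
// Approximate behavior of ThreadLocalCUDAObjects::GetStream (sketch only).
cudaStream_t GetStream(int gpu, int stream_id) {
  vector<cudaStream_t>& streams = cuda_streams_[gpu];
  if (streams.size() <= static_cast<size_t>(stream_id)) {
    streams.resize(stream_id + 1, nullptr);   // grow the per-GPU stream table
  }
  if (!streams[stream_id]) {
    DeviceGuard guard(gpu);                   // create the stream on the right GPU
    CUDA_ENFORCE(cudaStreamCreateWithFlags(
        &streams[stream_id], cudaStreamNonBlocking));
  }
  return streams[stream_id];
}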
最后caffe2通過下面這個static函數(shù)來lazily決定是否初始化眾多GPU環(huán)境需要的東東米同。。主要是確定GPUs之間的peer to peer連接是沒有問題的摔竿。
///////////////////////////////////////////////////////////////////////////////
// A wrapper to allow us to lazily initialize all cuda environments that Caffe
// uses. This gets done the first time a caffe2::CUDAContext::New() gets called
// which is probably the decisive indication that this caffe2 run is going to
// use GPUs. We avoid cuda initialization with core/init.h functionalities so
// that we have minimal resource impact in case we will need to run multiple
// caffe2 instances on a GPU machine.
///////////////////////////////////////////////////////////////////////////////
static void Caffe2InitializeCuda() {
  // If the current run does not have any cuda devices, do nothing.
  if (!HasCudaGPU()) {
    VLOG(1) << "No cuda gpu present. Skipping.";
    return;
  }
  // Check if the number of GPUs matches the expected compile-time max number
  // of GPUs.
  CAFFE_ENFORCE_LE(
      NumCudaDevices(),
      CAFFE2_COMPILE_TIME_MAX_GPUS,
      "Number of CUDA devices on the machine is larger than the compiled "
      "max number of gpus expected (",
      CAFFE2_COMPILE_TIME_MAX_GPUS,
      "). Increase that and recompile the caffe binary.");
  for (int i = 0; i < NumCudaDevices(); ++i) {
    DeviceGuard g(i);
    // Enable peer access.
    const int peer_group = i / CAFFE2_CUDA_MAX_PEER_SIZE;
    const int peer_start = peer_group * CAFFE2_CUDA_MAX_PEER_SIZE;
    const int peer_end = std::min(
        NumCudaDevices(), (peer_group + 1) * CAFFE2_CUDA_MAX_PEER_SIZE);
    VLOG(1) << "Enabling peer access within group #" << peer_group
            << ", from gpuid " << peer_start << " to " << peer_end - 1
            << ", for gpuid " << i << ".";
    for (int j = peer_start; j < peer_end; ++j) {
      if (i == j) continue;
      int can_access;
      CUDA_ENFORCE(cudaDeviceCanAccessPeer(&can_access, i, j));
      if (can_access) {
        VLOG(1) << "Enabling peer access from " << i << " to " << j;
        // Note: just for future reference, the 0 here is not a gpu id, it is
        // a reserved flag for cudaDeviceEnablePeerAccess that should always be
        // zero currently.
        CUDA_ENFORCE(cudaDeviceEnablePeerAccess(j, 0));
      }
    }
  }

#ifdef CAFFE2_USE_CUDNN
  // Check the versions of cuDNN that were compiled and linked with are compatible
  CheckCuDNNVersions();
#endif // CAFFE2_USE_CUDNN
}
The struct Caffe2CudaInitializerHelper below is what actually kicks off all these initialization functions, and it makes sure they run only once.
// Caffe2CudaInitializerHelper is a minimal struct whose sole purpose is to
// detect the first hint that this Caffe2 run is going to use GPU: either
// CUDAContext is initialized or CUDAContext::New is called. It then runs
// all the related cuda initialization functions.
namespace {
struct Caffe2CudaInitializerHelper {
  Caffe2CudaInitializerHelper() {
    // We cannot use bool because nvcc changes bool to __nv_bool which does
    // not have a std::atomic instantiation.
    static std::atomic<char> first_call(1);
    if (first_call.fetch_and((char)0)) {
      Caffe2InitializeCuda();
      Caffe2SetCUDAMemoryPool();
      Caffe2UsePinnedCPUAllocator();
    }
  }
};
} // namespace
CUDAStaticContext
Let us now look at how memory is allocated under CUDA. Depending on which memory pool is currently in use, New performs a different kind of GPU allocation, and bookkeeping structures such as g_size_map and g_cuda_device_affiliation record the size of each allocation and the GPU it belongs to.
Its Delete function, of course, performs the matching kind of deallocation for each memory pool type; we will not go into it here.
class CAFFE2_API CUDAStaticContext final : public BaseStaticContext {
 public:
  std::pair<void*, MemoryDeleter> New(size_t nbytes) const override {
    // Lock the mutex
    std::lock_guard<std::mutex> lock(CUDAContext::mutex());
    // A one-time caffe2 cuda initializer.
    static Caffe2CudaInitializerHelper g_cuda_initializer_;
    void* ptr = nullptr;
    if (FLAGS_caffe2_gpu_memory_tracking) {
      TrackMemoryAlloc(nbytes);
    }
    switch (g_cuda_memory_pool_type) {
      case CudaMemoryPoolType::NONE:
        CUDA_ENFORCE(cudaMalloc(&ptr, nbytes));
        if (FLAGS_caffe2_gpu_memory_tracking) {
          g_size_map[ptr] = nbytes;
          g_cuda_device_affiliation[ptr] = CaffeCudaGetDevice();
        }
        return {ptr, Delete};
      case CudaMemoryPoolType::CUB:
        CUDA_ENFORCE(g_cub_allocator->DeviceAllocate(&ptr, nbytes));
        g_cuda_device_affiliation[ptr] = CaffeCudaGetDevice();
        VLOG(2) << "CUB allocating pointer " << ptr << " on device "
                << CaffeCudaGetDevice();
        if (FLAGS_caffe2_gpu_memory_tracking) {
          g_size_map[ptr] = nbytes;
        }
        return {ptr, Delete};
      case CudaMemoryPoolType::THC:
        CUDA_ENFORCE(g_thc_allocator->Alloc(&ptr, nbytes, 0 /* stream */));
        if (FLAGS_caffe2_gpu_memory_tracking) {
          g_size_map[ptr] = nbytes;
          g_cuda_device_affiliation[ptr] = CaffeCudaGetDevice();
        }
        return {ptr, Delete};
    }
    return {nullptr, Delete};
  }
};
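As with the CPU case, a minimal usage sketch (assuming a machine with at least one CUDA GPU and the caffe2 GPU headers):
// Sketch: allocating device memory through CUDAStaticContext.
void CudaStaticContextDemo() {
  CUDAStaticContext static_ctx;
  auto mem = static_ctx.New(1 << 20);  // 1 MB from whichever memory pool is active
  // ... use mem.first as a device pointer in kernels or cuBLAS calls ...
  mem.second(mem.first);               // routed to the pool-specific Delete
}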
One more question: how should CPU memory be allocated when GPUs are in play? For asynchronous cuda memcpy to work, the host buffers must live in pinned memory, and that is exactly what the PinnedCPUAllocator below provides.
/**
 * An allocator that does the CPU memory allocation with pinned memory.
 *
 * This is needed because if we want to do any asynchronous cuda memcpy,
 * the underlying CPU memory also needs to be allocated into pinned memory
 * space. As a result, whenever Caffe2 is built with GPU and there is
 * GPU present during runtime, at global initialization time we will set
 * the CPU memory allocator to allocate pinned memory.
 */
struct CAFFE2_API PinnedCPUAllocator final : CPUAllocator {
  PinnedCPUAllocator() {}
  ~PinnedCPUAllocator() override {}
  std::pair<void*, MemoryDeleter> New(size_t nbytes) override {
    void* data;
    std::lock_guard<std::mutex> lock(CUDAContext::mutex());
    if (IsNUMAEnabled()) {
      auto ptr_and_deleter = baseAllocator_.New(nbytes);
      data = ptr_and_deleter.first;
      CAFFE_ENFORCE(data);
      CUDA_ENFORCE(cudaHostRegister(data, nbytes, cudaHostRegisterDefault));
    } else {
      CUDA_ENFORCE(cudaMallocHost(&data, nbytes));
    }
    memset(data, 0, nbytes);
    return {data, Delete};
  }
  MemoryDeleter GetDeleter() override {
    return Delete;
  }
  .....
  .....
  DefaultCPUAllocator baseAllocator_;
};
CUDAContext
最后我們看到了CUDAContext梦裂,它是實現(xiàn)GPU諸多底層操作支持的主力似枕。可以看得出來Caffe2里面的很多interfaces都是為了滿足CUDA編程的需要而加的像SwitchToDevice/FinishDeviceComputation等等年柠。當然它也提了許多自己的utility 函數(shù)凿歼,而且多是static函數(shù)褪迟,用于滿足公共的GPU資源的查詢需要,因此大多需要使用Mutext來保證線性訪問答憔。
class CAFFE2_API CUDAContext final : public BaseContext {
 public:
  // The default cuda context constructor.
  explicit CUDAContext(const int gpu_id = -1);
  explicit CUDAContext(const DeviceOption& option);

  ~CUDAContext() override {
    if (curand_generator_) {
      CURAND_CHECK(curandDestroyGenerator(curand_generator_));
    }
    FinishDeviceComputation();
  }

  inline void SwitchToDevice(int stream_id) override {
    set_stream_id(stream_id);
    CaffeCudaSetDevice(gpu_id_);
  }

  void FinishDeviceComputation() override {
    cudaStreamSynchronize(cuda_objects_.GetStream(gpu_id_, stream_id_));
    cudaError_t error = cudaGetLastError();
    if (error != cudaSuccess) {
      CAFFE_THROW("Encountered CUDA error: ", cudaGetErrorString(error));
    }
  }
  ..........
  .........
  static std::vector<long> TotalMemoryByGpu();
  static std::vector<long> MaxMemoryByGpu();
  static bool IsStreamFree(const DeviceOption& option, int stream_id) {
    auto stream = CUDAContext::cuda_stream(option.cuda_gpu_id(), stream_id);
    return cudaStreamQuery(stream) == cudaSuccess;
  }
  ............
  ............

 protected:
  void set_stream_id(int stream_id) {
    stream_id_ = stream_id;
  }

  int gpu_id_;
  int stream_id_ = 0;
  int random_seed_;
  curandGenerator_t curand_generator_{nullptr};
  static thread_local ThreadLocalCUDAObjects cuda_objects_;
};
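To wrap up, here is a sketch of the typical call sequence around a CUDAContext (assuming a single GPU, and assuming CUDAContext exposes the same static New() helper as CPUContext above; error handling omitted).
// Sketch: a host-to-device copy driven through CUDAContext, then a stream sync.
void CudaContextDemo() {
  CUDAContext ctx(0);                 // bind to gpu 0
  ctx.SwitchToDevice(0);              // gpu 0, stream 0
  auto dev = CUDAContext::New(1024);  // device buffer via the static context
  std::vector<char> host(1024, 7);
  ctx.CopyBytesFromCPU(host.size(), host.data(), dev.first);  // async on ctx's stream
  ctx.FinishDeviceComputation();      // wait for the copy before using dev.first
  dev.second(dev.first);
}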