DeepFaceLive/xlib/avecl/_internal/backend/Device.py
2021-10-22 19:31:35 +04:00

557 lines
20 KiB
Python

import random
from collections import deque
from typing import List, Union
import numpy as np
from . import OpenCL as CL
from .Buffer import Buffer
from .DeviceInfo import DeviceInfo
from .Kernel import Kernel
_np_dtype_to_cl = { np.uint8: CL.cl_uchar,
np.int8: CL.cl_char,
np.uint16: CL.cl_ushort,
np.int16: CL.cl_short,
np.uint32: CL.cl_uint,
np.int32: CL.cl_int,
np.uint64: CL.cl_ulong,
np.int64: CL.cl_long,
np.float16: CL.cl_half,
np.float32: CL.cl_float}
_opencl_device_ids = None
_default_device = None
_devices = {}
class Device:
"""
Represents physical TensorCL device
"""
def __init__(self, device_info : DeviceInfo, **kwargs):
if kwargs.get('_check', None) is None:
raise Exception('You should not to create Device from constructor. Use get_device()')
self._cached_data = {} # cached data (per device) by key
self._pooled_buffers = {} # Pool of cached device buffers.
self._cached_kernels = {} # compiled kernels by key
self._ctx_q = None # CL command queue
self._ctx = None # CL context
self._target_memory_usage = 0
self._total_memory_allocated = 0
self._total_buffers_allocated = 0
self._total_memory_pooled = 0
self._total_buffers_pooled = 0
self._device_info = device_info
self._device_id = _get_opencl_device_ids()[device_info.get_index()]
def __del__(self):
self.cleanup()
def __eq__(self, other):
if self is not None and other is not None and isinstance(self, Device) and isinstance(other, Device):
return self._device_id.value == other._device_id.value
return False
def __hash__(self):
return self._device_id.value
def _get_ctx(self) -> CL.cl_context:
# Create OpenCL context on demand
if self._ctx is None:
clr = CL.CLRESULT()
ctx = CL.clCreateContext( None, 1, (CL.cl_device_id * 1)( self._device_id ), None, None, clr)
if clr != CL.CLERROR.SUCCESS:
raise Exception('Unable to create OpenCL context.')
self._ctx = ctx
return self._ctx
def _get_ctx_q(self) -> CL.cl_command_queue:
# Create CommandQueue on demand
if self._ctx_q is None:
clr = CL.CLRESULT()
ctx_q = CL.clCreateCommandQueue(self._get_ctx(), self._device_id, CL.cl_command_queue_properties(0), clr)
if clr != CL.CLERROR.SUCCESS:
raise Exception('Unable to create OpenCL CommandQueue.')
self._ctx_q = ctx_q
return self._ctx_q
def _compile_kernel(self, key, kernel_text) -> CL.cl_kernel:
"""
compile or get cached kernel
"""
compiled_krn, prog = self._cached_kernels.get(key, (None, None) )
if compiled_krn is None:
clr = CL.CLRESULT()
prog = CL.clCreateProgramWithSource(self._get_ctx(), 1, CL.c_char_p(kernel_text.encode()), None, clr )
if clr != CL.CLERROR.SUCCESS:
raise Exception(f'clCreateProgramWithSource error {clr}, with kernel_text:\n{kernel_text}')
clr = CL.clBuildProgram(prog, 1, (CL.cl_device_id*1)(self._device_id), CL.c_char_p('-cl-std=CL1.2 -cl-single-precision-constant'.encode()), None, None )
if clr != CL.CLERROR.SUCCESS:
build_log_size = CL.c_size_t()
clr = CL.clGetProgramBuildInfo(prog, self._device_id, CL.CL_PROGRAM_BUILD_LOG, 0, None, CL.byref(build_log_size) )
if clr != CL.CLERROR.SUCCESS:
raise Exception(f'clGetProgramBuildInfo,error: {clr}')
build_log = CL.create_string_buffer(build_log_size.value)
clr = CL.clGetProgramBuildInfo(prog, self._device_id, CL.CL_PROGRAM_BUILD_LOG, build_log_size.value, build_log, None )
if clr != CL.CLERROR.SUCCESS:
raise Exception(f'clGetProgramBuildInfo error: {clr}')
build_log = str(build_log.value, 'utf-8')
raise Exception(f'clBuildProgram error:\n\n{build_log}')
num_kernels = CL.cl_uint()
clr = CL.clCreateKernelsInProgram(prog, 0, None, CL.byref(num_kernels))
if clr != CL.CLERROR.SUCCESS:
raise Exception(f'clCreateKernelsInProgram error: {clr}')
if num_kernels.value != 1:
raise Exception(f'Kernel must contain only one __kernel:\n\n{kernel_text}')
kernels = (CL.cl_kernel * num_kernels.value)()
clr = CL.clCreateKernelsInProgram(prog, num_kernels.value, kernels, None)
if clr != CL.CLERROR.SUCCESS:
raise Exception(f'clCreateKernelsInProgram error: {clr}')
compiled_krn = kernels[0]
self._cached_kernels[key] = (compiled_krn, prog)
return compiled_krn
def _cl_mem_alloc(self, size) -> CL.cl_mem:
self._keep_target_memory_usage()
clr = CL.CLRESULT()
mem = CL.clCreateBuffer(self._get_ctx(), CL.CL_MEM_READ_WRITE, size, None, clr)
if clr == CL.CLERROR.SUCCESS:
# Fill one byte to check memory availability
ev = CL.cl_event()
clr = CL.clEnqueueFillBuffer (self._get_ctx_q(), mem, (CL.c_char * 1)(), 1, 0, 1, 0, None, ev )
if clr == CL.CLERROR.SUCCESS:
CL.clReleaseEvent(ev)
self._total_memory_allocated += size
self._total_buffers_allocated += 1
return mem
return None
def _cl_mem_free(self, mem : CL.cl_mem):
size = CL.c_size_t()
clr = CL.clGetMemObjectInfo(mem, CL.CL_MEM_SIZE, CL.sizeof(size), CL.byref(size), None )
if clr != CL.CLERROR.SUCCESS:
raise Exception(f'clGetMemObjectInfo error: {clr}')
size = size.value
self._total_memory_allocated -= size
self._total_buffers_allocated -= 1
clr = CL.clReleaseMemObject(mem)
if clr != CL.CLERROR.SUCCESS:
raise Exception(f'clReleaseMemObject error: {clr}')
def _cl_mem_pool_alloc(self, size):
"""
allocate or get cl_mem from pool
"""
self._keep_target_memory_usage()
pool = self._pooled_buffers
# First try to get pooled buffer
ar = pool.get(size, None)
if ar is not None and len(ar) != 0:
mem = ar.pop()
self._total_memory_pooled -= size
self._total_buffers_pooled -= 1
else:
# No pooled buffer, try to allocate new
while True:
mem = self._cl_mem_alloc(size)
if mem is None:
# MemoryError.
if not self._release_random_pooled_buffers():
raise Exception(f'Unable to allocate {size // 1024**2}Mb on {self.get_description()}')
continue
break
return mem
def _cl_mem_pool_free(self, mem : CL.cl_mem):
"""
Put cl_mem to pool for reuse in future.
"""
size = CL.c_size_t()
clr = CL.clGetMemObjectInfo(mem, CL.CL_MEM_SIZE, CL.sizeof(size), CL.byref(size), None )
if clr != CL.CLERROR.SUCCESS:
raise Exception(f'clGetMemObjectInfo error: {clr}')
size = size.value
pool = self._pooled_buffers
ar = pool.get(size, None)
if ar is None:
ar = pool[size] = deque()
ar.append(mem)
self._total_memory_pooled += size
self._total_buffers_pooled += 1
def _release_random_pooled_buffers(self) -> bool:
"""
remove random 25% of pooled boofers
returns True if something was released
"""
pool = self._pooled_buffers
mems = [ (k,x) for k in pool.keys() for x in pool[k] ]
if len(mems) != 0:
mems = random.sample(mems, max(1,int(len(mems)*0.25)) )
for k, mem in mems:
self._cl_mem_free(mem)
pool[k].remove(mem)
return len(mems) != 0
def _keep_target_memory_usage(self):
targ = self._target_memory_usage
if targ != 0 and self.get_total_allocated_memory() >= targ:
self.cleanup_cached_kernels()
self._release_random_pooled_buffers()
def __str__(self):
return self.get_description()
def __repr__(self):
return f'{self.__class__.__name__} object: ' + self.__str__()
def clear_pooled_memory(self):
"""
frees pooled memory
"""
pool = self._pooled_buffers
for size_key in pool.keys():
for mem in pool[size_key]:
self._cl_mem_free(mem)
self._pooled_buffers = {}
self._total_memory_pooled = 0
self._total_buffers_pooled = 0
def cleanup_cached_kernels(self):
for kernel, prog in self._cached_kernels.values():
clr = CL.clReleaseKernel(kernel)
if clr != CL.CLERROR.SUCCESS:
raise Exception(f'clReleaseKernel error: {clr}')
clr = CL.clReleaseProgram(prog)
if clr != CL.CLERROR.SUCCESS:
raise Exception(f'clReleaseProgram error: {clr}')
self._cached_kernels = {}
def cleanup(self):
"""
Frees all resources from this Device.
"""
self._cached_data = {}
self.clear_pooled_memory()
if self._total_memory_allocated != 0:
raise Exception('Unable to cleanup CLDevice, while not all Buffers are deallocated.')
self.cleanup_cached_kernels()
if self._ctx_q is not None:
clr = CL.clReleaseCommandQueue(self._ctx_q)
if clr != CL.CLERROR.SUCCESS:
raise Exception(f'clReleaseCommandQueue error: {clr}')
self._ctx_q = None
if self._ctx is not None:
clr = CL.clReleaseContext(self._ctx)
if clr != CL.CLERROR.SUCCESS:
raise Exception(f'clReleaseContext error: {clr}')
self._ctx = None
def get_cached_data(self, key):
return self._cached_data.get(key, None)
def get_description(self) -> str:
return f"{self._device_info.get_name()} [{(self._device_info.get_total_memory() / 1024**3) :.3}Gb]"
def get_total_allocated_memory(self):
"""
get total bytes of used and pooled memory
"""
return self._total_memory_allocated
def set_cached_data(self, key, value):
"""
All cached data will be freed with cleanup()
"""
self._cached_data[key] = value
def set_target_memory_usage(self, mb : int):
"""
keep memory usage at specified position
when total allocated memory reached the target and new allocation is performing,
random pooled memory will be freed
"""
self._target_memory_usage = mb*1024*1024
def print_stat(self):
s = f'''
Total memory allocated: {self._total_memory_allocated}
Total buffers allocated: {self._total_buffers_allocated}
Total memory pooled: {self._total_memory_pooled}
Total buffers pooled: {self._total_buffers_pooled}
N of compiled kernels: {len(self._cached_kernels)}
N of cacheddata: {len(self._cached_data)}
'''
print(s)
def run_kernel(self, kernel : Kernel, *args, global_shape=None, local_shape=None, global_shape_offsets=None, wait=False):
"""
Run kernel on Device
Arguments
*args arguments will be passed to OpenCL kernel
allowed types:
Buffer
np single value
global_shape(None) tuple of ints, up to 3 dims
amount of parallel kernel executions.
in OpenCL kernel,
id can be obtained via get_global_id(dim)
local_shape(None) tuple of ints, up to 3 dims
specifies local groups of every dim of global_shape.
in OpenCL kernel,
id can be obtained via get_local_id(dim)
global_shape_offsets(None) tuple of ints
offsets for global_shape
wait(False) wait execution to complete
"""
if global_shape is None:
global_shape = kernel.get_global_shape()
if global_shape is None:
raise ValueError('global_shape must be defined.')
work_dim = len(global_shape)
global_shape_ar = (CL.c_size_t*work_dim)()
for i,v in enumerate(global_shape):
global_shape_ar[i] = v
local_shape_ar = None
if local_shape is None:
local_shape = kernel.get_local_shape()
if local_shape is not None:
if len(local_shape) != work_dim:
raise ValueError('len of local_shape must match global_shape')
local_shape_ar = (CL.c_size_t*work_dim)()
for i,v in enumerate(local_shape):
local_shape_ar[i] = v
global_shape_offsets_ar = None
if global_shape_offsets is not None:
if len(global_shape_offsets) != work_dim:
raise ValueError('len of global_shape_offsets must match global_shape')
global_shape_offsets_ar = (CL.c_size_t*work_dim)()
for i,v in enumerate(local_shape):
global_shape_offsets_ar[i] = v
krn_args = []
for i, arg in enumerate(args):
if isinstance(arg, Buffer):
arg = arg.get_cl_mem()
else:
cl_type = _np_dtype_to_cl.get(arg.__class__, None)
if cl_type is None:
raise ValueError(f'Cannot convert type {arg.__class__} to OpenCL type.')
arg = cl_type(arg)
krn_args.append(arg)
ckernel = self._compile_kernel(kernel, kernel.get_kernel_text())
for i, arg in enumerate(krn_args):
clr = CL.clSetKernelArg(ckernel, i, CL.sizeof(arg), CL.byref(arg))
if clr != CL.CLERROR.SUCCESS:
raise Exception(f'clSetKernelArg error: {clr}')
ev = CL.cl_event() if wait else None
clr = CL.clEnqueueNDRangeKernel(self._get_ctx_q(), ckernel, work_dim, global_shape_offsets_ar, global_shape_ar, local_shape_ar, 0, None, ev)
if clr != CL.CLERROR.SUCCESS:
raise Exception(f'clEnqueueNDRangeKernel error: {clr}')
if wait:
CL.clWaitForEvents(1, (CL.cl_event*1)(ev) )
CL.clReleaseEvent(ev)
def wait(self):
"""
Wait to finish all queued operations on this Device
"""
clr = CL.clFinish(self._get_ctx_q())
if clr != CL.CLERROR.SUCCESS:
raise Exception(f'clFinish error: {clr}')
def _get_opencl_device_ids() -> List[CL.cl_device_id]:
global _opencl_device_ids
if _opencl_device_ids is None:
_opencl_device_ids = []
device_types = CL.CL_DEVICE_TYPE_CPU | CL.CL_DEVICE_TYPE_ACCELERATOR | CL.CL_DEVICE_TYPE_GPU
while True:
num_platforms = CL.cl_uint()
if CL.clGetPlatformIDs(0, None, num_platforms) != CL.CLERROR.SUCCESS or \
num_platforms.value == 0:
break
platforms = (CL.cl_platform_id * num_platforms.value) ()
if CL.clGetPlatformIDs(num_platforms.value, platforms, None) != CL.CLERROR.SUCCESS:
break
for i_platform in range(num_platforms.value):
platform = platforms[i_platform]
num_devices = CL.cl_uint(0)
if CL.clGetDeviceIDs(platform, device_types, 0, None, num_devices) != CL.CLERROR.SUCCESS or \
num_devices.value == 0:
continue
device_ids = (CL.cl_device_id * num_devices.value)()
if CL.clGetDeviceIDs(platform, device_types, num_devices.value, device_ids, None) != CL.CLERROR.SUCCESS:
continue
for i in range(num_devices.value):
device_id = device_ids[i]
# Check OpenCL version.
if device_id is not None:
device_version_size = CL.c_size_t()
if CL.clGetDeviceInfo(device_id, CL.CL_DEVICE_VERSION, 0, None, device_version_size) == CL.CLERROR.SUCCESS:
device_version = CL.create_string_buffer(device_version_size.value)
if CL.clGetDeviceInfo(device_id, CL.CL_DEVICE_VERSION, device_version_size.value, device_version, None) == CL.CLERROR.SUCCESS:
device_version = str(device_version.value, 'ascii')
major, minor = device_version.split(' ')[1].split('.')
opencl_version = int(major)*10+int(minor)
if opencl_version >= 12:
_opencl_device_ids.append(device_id)
break
return _opencl_device_ids
def get_available_devices_info() -> List[DeviceInfo]:
"""
returns a list of available picklable DeviceInfo's
"""
devices = []
for device_index, device_id in enumerate(_get_opencl_device_ids()):
device_name = 'undefined'
device_total_memory = 0
name_size = CL.c_size_t()
if CL.clGetDeviceInfo(device_id, CL.CL_DEVICE_NAME, 0, None, name_size) == CL.CLERROR.SUCCESS:
name_value = CL.create_string_buffer(name_size.value)
if CL.clGetDeviceInfo(device_id, CL.CL_DEVICE_NAME, name_size.value, name_value, None) == CL.CLERROR.SUCCESS:
device_name = str(name_value.value, 'ascii')
global_mem_size = CL.cl_ulong()
if CL.clGetDeviceInfo(device_id, CL.CL_DEVICE_GLOBAL_MEM_SIZE, CL.sizeof(global_mem_size), CL.byref(global_mem_size), None) == CL.CLERROR.SUCCESS:
device_total_memory = global_mem_size.value
vendor_id = CL.cl_uint()
CL.clGetDeviceInfo(device_id, CL.CL_DEVICE_VENDOR_ID, CL.sizeof(vendor_id), CL.byref(vendor_id), None)
vendor_id = vendor_id.value
max_compute_units = CL.cl_uint()
CL.clGetDeviceInfo(device_id, CL.CL_DEVICE_MAX_COMPUTE_UNITS, CL.sizeof(max_compute_units), CL.byref(max_compute_units), None)
max_compute_units = max_compute_units.value
performance_level = max_compute_units
if vendor_id == 0x8086: # Intel device
performance_level -= 1000
devices.append( DeviceInfo(index=device_index, name=device_name, total_memory=device_total_memory, performance_level=performance_level ) )
return devices
def get_default_device() -> Union[Device, None]:
global _default_device
if _default_device is None:
_default_device = get_best_device()
return _default_device
def set_default_device(device : Device):
if not isinstance(device, Device):
raise ValueError('device must be an instance of Device')
global _default_device
_default_device = device
def get_device(arg : Union[None, int, Device, DeviceInfo]) -> Union[Device, None]:
"""
get physical TensorCL device.
arg None - get default device
int - by index
DeviceInfo - by device info
Device - returns the same
"""
global _devices
if arg is None:
return get_default_device()
elif isinstance(arg, int):
devices_info = get_available_devices_info()
if arg < len(devices_info):
arg = devices_info[arg]
else:
return None
elif isinstance(arg, Device):
return arg
elif not isinstance(arg, DeviceInfo):
raise ValueError(f'Unknown type of arg {arg.__class__}')
device = _devices.get(arg, None)
if device is None:
device = _devices[arg] = Device(arg, _check=1)
return device
def get_best_device() -> Union[Device, None]:
"""
returns best device from available.
"""
perf_level = -999999
result = None
for device_info in get_available_devices_info():
dev_perf_level = device_info.get_performance_level()
if perf_level < dev_perf_level:
perf_level = dev_perf_level
result = device_info
if result is not None:
result = get_device(result)
return result
def cleanup_devices():
global _devices
for device in list(_devices.values()):
device.cleanup()
_devices = {}