mirror of
https://github.com/iperov/DeepFaceLive.git
synced 2024-12-26 07:51:13 -08:00
557 lines
20 KiB
Python
557 lines
20 KiB
Python
import random
|
|
from collections import deque
|
|
from typing import List, Union
|
|
|
|
import numpy as np
|
|
|
|
from . import OpenCL as CL
|
|
from .Buffer import Buffer
|
|
from .DeviceInfo import DeviceInfo
|
|
from .Kernel import Kernel
|
|
|
|
# Maps numpy scalar types to their OpenCL ctypes equivalents.
# Used by Device.run_kernel() to convert plain numpy scalar arguments
# into values suitable for clSetKernelArg.
# NOTE(review): np.float64 is absent — presumably because fp64 support
# is optional in OpenCL 1.2; confirm before adding it.
_np_dtype_to_cl = { np.uint8: CL.cl_uchar,
                    np.int8: CL.cl_char,
                    np.uint16: CL.cl_ushort,
                    np.int16: CL.cl_short,
                    np.uint32: CL.cl_uint,
                    np.int32: CL.cl_int,
                    np.uint64: CL.cl_ulong,
                    np.int64: CL.cl_long,
                    np.float16: CL.cl_half,
                    np.float32: CL.cl_float}

# Module-level caches:
_opencl_device_ids = None  # lazy list of cl_device_id, filled by _get_opencl_device_ids()
_default_device = None     # process-wide default Device, see get_default_device()/set_default_device()
_devices = {}              # DeviceInfo -> Device cache used by get_device()
|
|
|
|
class Device:
    """
    Represents physical TensorCL device.

    Wraps a single OpenCL device: lazily creates its context and command
    queue, compiles and caches kernels, and pools cl_mem buffers for reuse.

    Do not construct directly; use get_device().
    """

    def __init__(self, device_info : DeviceInfo, **kwargs):
        # Guard against direct construction: get_device() passes _check=1.
        if kwargs.get('_check', None) is None:
            raise Exception('You should not to create Device from constructor. Use get_device()')

        self._cached_data = {}     # cached data (per device) by key
        self._pooled_buffers = {}  # pool of cached device buffers: size -> deque of cl_mem
        self._cached_kernels = {}  # compiled kernels by key: key -> (cl_kernel, cl_program)
        self._ctx_q = None         # CL command queue, created on demand
        self._ctx = None           # CL context, created on demand

        self._target_memory_usage = 0  # soft limit in bytes, 0 means unlimited

        # Running statistics, kept in sync by the alloc/free/pool helpers.
        self._total_memory_allocated = 0
        self._total_buffers_allocated = 0
        self._total_memory_pooled = 0
        self._total_buffers_pooled = 0

        self._device_info = device_info
        self._device_id = _get_opencl_device_ids()[device_info.get_index()]

    def __del__(self):
        self.cleanup()

    def __eq__(self, other):
        # Two Device objects are equal when they wrap the same cl_device_id.
        if isinstance(other, Device):
            return self._device_id.value == other._device_id.value
        return False

    def __hash__(self):
        return self._device_id.value

    def _get_ctx(self) -> CL.cl_context:
        """Create (on first use) and return the OpenCL context."""
        if self._ctx is None:
            clr = CL.CLRESULT()
            ctx = CL.clCreateContext( None, 1, (CL.cl_device_id * 1)( self._device_id ), None, None, clr)
            if clr != CL.CLERROR.SUCCESS:
                raise Exception('Unable to create OpenCL context.')
            self._ctx = ctx
        return self._ctx

    def _get_ctx_q(self) -> CL.cl_command_queue:
        """Create (on first use) and return the OpenCL command queue."""
        if self._ctx_q is None:
            clr = CL.CLRESULT()
            ctx_q = CL.clCreateCommandQueue(self._get_ctx(), self._device_id, CL.cl_command_queue_properties(0), clr)
            if clr != CL.CLERROR.SUCCESS:
                raise Exception('Unable to create OpenCL CommandQueue.')
            self._ctx_q = ctx_q
        return self._ctx_q

    def _compile_kernel(self, key, kernel_text) -> CL.cl_kernel:
        """
        Compile kernel_text, or return the kernel cached under key.

        The program must contain exactly one __kernel. On build failure
        the OpenCL build log is included in the raised exception.
        """
        compiled_krn, prog = self._cached_kernels.get(key, (None, None) )
        if compiled_krn is None:
            clr = CL.CLRESULT()
            prog = CL.clCreateProgramWithSource(self._get_ctx(), 1, CL.c_char_p(kernel_text.encode()), None, clr )
            if clr != CL.CLERROR.SUCCESS:
                raise Exception(f'clCreateProgramWithSource error {clr}, with kernel_text:\n{kernel_text}')

            clr = CL.clBuildProgram(prog, 1, (CL.cl_device_id*1)(self._device_id), CL.c_char_p('-cl-std=CL1.2 -cl-single-precision-constant'.encode()), None, None )
            if clr != CL.CLERROR.SUCCESS:
                # Fetch the build log to produce a useful error message.
                build_log_size = CL.c_size_t()
                clr = CL.clGetProgramBuildInfo(prog, self._device_id, CL.CL_PROGRAM_BUILD_LOG, 0, None, CL.byref(build_log_size) )
                if clr != CL.CLERROR.SUCCESS:
                    raise Exception(f'clGetProgramBuildInfo,error: {clr}')

                build_log = CL.create_string_buffer(build_log_size.value)
                clr = CL.clGetProgramBuildInfo(prog, self._device_id, CL.CL_PROGRAM_BUILD_LOG, build_log_size.value, build_log, None )
                if clr != CL.CLERROR.SUCCESS:
                    raise Exception(f'clGetProgramBuildInfo error: {clr}')

                build_log = str(build_log.value, 'utf-8')
                raise Exception(f'clBuildProgram error:\n\n{build_log}')

            num_kernels = CL.cl_uint()
            clr = CL.clCreateKernelsInProgram(prog, 0, None, CL.byref(num_kernels))
            if clr != CL.CLERROR.SUCCESS:
                raise Exception(f'clCreateKernelsInProgram error: {clr}')

            if num_kernels.value != 1:
                raise Exception(f'Kernel must contain only one __kernel:\n\n{kernel_text}')

            kernels = (CL.cl_kernel * num_kernels.value)()
            clr = CL.clCreateKernelsInProgram(prog, num_kernels.value, kernels, None)
            if clr != CL.CLERROR.SUCCESS:
                raise Exception(f'clCreateKernelsInProgram error: {clr}')

            compiled_krn = kernels[0]
            self._cached_kernels[key] = (compiled_krn, prog)

        return compiled_krn

    def _cl_mem_alloc(self, size) -> CL.cl_mem:
        """
        Allocate a raw cl_mem buffer of `size` bytes.

        Returns None on failure (callers treat it as out-of-memory).
        """
        self._keep_target_memory_usage()

        clr = CL.CLRESULT()
        mem = CL.clCreateBuffer(self._get_ctx(), CL.CL_MEM_READ_WRITE, size, None, clr)
        if clr == CL.CLERROR.SUCCESS:
            # clCreateBuffer may succeed lazily: fill one byte to verify
            # the memory is actually available.
            ev = CL.cl_event()
            clr = CL.clEnqueueFillBuffer (self._get_ctx_q(), mem, (CL.c_char * 1)(), 1, 0, 1, 0, None, ev )
            if clr == CL.CLERROR.SUCCESS:
                CL.clReleaseEvent(ev)
                self._total_memory_allocated += size
                self._total_buffers_allocated += 1
                return mem
            # Fix: the probe fill failed — release the buffer, otherwise the
            # cl_mem object leaks every time allocation fails at this point.
            CL.clReleaseMemObject(mem)
        return None

    def _cl_mem_free(self, mem : CL.cl_mem):
        """Release a cl_mem and update allocation statistics."""
        size = CL.c_size_t()
        clr = CL.clGetMemObjectInfo(mem, CL.CL_MEM_SIZE, CL.sizeof(size), CL.byref(size), None )
        if clr != CL.CLERROR.SUCCESS:
            raise Exception(f'clGetMemObjectInfo error: {clr}')
        size = size.value

        self._total_memory_allocated -= size
        self._total_buffers_allocated -= 1
        clr = CL.clReleaseMemObject(mem)
        if clr != CL.CLERROR.SUCCESS:
            raise Exception(f'clReleaseMemObject error: {clr}')

    def _cl_mem_pool_alloc(self, size):
        """
        Allocate a cl_mem of `size` bytes, reusing a pooled buffer when possible.

        Raises when allocation keeps failing and no pooled buffers remain
        to be released.
        """
        self._keep_target_memory_usage()

        pool = self._pooled_buffers

        # First try to get a pooled buffer of the exact size.
        ar = pool.get(size, None)
        if ar is not None and len(ar) != 0:
            mem = ar.pop()
            self._total_memory_pooled -= size
            self._total_buffers_pooled -= 1
        else:
            # No pooled buffer; allocate new, evicting random pooled
            # buffers until the allocation succeeds.
            while True:
                mem = self._cl_mem_alloc(size)
                if mem is None:
                    # MemoryError.
                    if not self._release_random_pooled_buffers():
                        raise Exception(f'Unable to allocate {size // 1024**2}Mb on {self.get_description()}')
                    continue
                break

        return mem

    def _cl_mem_pool_free(self, mem : CL.cl_mem):
        """
        Put cl_mem to pool for reuse in future.
        """
        size = CL.c_size_t()
        clr = CL.clGetMemObjectInfo(mem, CL.CL_MEM_SIZE, CL.sizeof(size), CL.byref(size), None )
        if clr != CL.CLERROR.SUCCESS:
            raise Exception(f'clGetMemObjectInfo error: {clr}')
        size = size.value

        pool = self._pooled_buffers
        ar = pool.get(size, None)
        if ar is None:
            ar = pool[size] = deque()
        ar.append(mem)

        self._total_memory_pooled += size
        self._total_buffers_pooled += 1

    def _release_random_pooled_buffers(self) -> bool:
        """
        Free a random ~25% of pooled buffers.

        Returns True if something was released.
        """
        pool = self._pooled_buffers
        mems = [ (k,x) for k in pool.keys() for x in pool[k] ]

        if len(mems) != 0:
            mems = random.sample(mems, max(1,int(len(mems)*0.25)) )
            for k, mem in mems:
                self._cl_mem_free(mem)
                pool[k].remove(mem)
                # Fix: keep pool statistics consistent with the pool contents
                # (these counters were previously never decremented here,
                # so _total_memory_pooled drifted upward).
                self._total_memory_pooled -= k
                self._total_buffers_pooled -= 1
        return len(mems) != 0

    def _keep_target_memory_usage(self):
        """If a target memory usage is set and exceeded, shrink caches."""
        targ = self._target_memory_usage
        if targ != 0 and self.get_total_allocated_memory() >= targ:
            self.cleanup_cached_kernels()
            self._release_random_pooled_buffers()

    def __str__(self):
        return self.get_description()

    def __repr__(self):
        return f'{self.__class__.__name__} object: ' + self.__str__()

    def clear_pooled_memory(self):
        """
        frees pooled memory
        """
        pool = self._pooled_buffers
        for size_key in pool.keys():
            for mem in pool[size_key]:
                self._cl_mem_free(mem)
        self._pooled_buffers = {}
        self._total_memory_pooled = 0
        self._total_buffers_pooled = 0

    def cleanup_cached_kernels(self):
        """Release all compiled kernels and their programs."""
        for kernel, prog in self._cached_kernels.values():
            clr = CL.clReleaseKernel(kernel)
            if clr != CL.CLERROR.SUCCESS:
                raise Exception(f'clReleaseKernel error: {clr}')

            clr = CL.clReleaseProgram(prog)
            if clr != CL.CLERROR.SUCCESS:
                raise Exception(f'clReleaseProgram error: {clr}')
        self._cached_kernels = {}

    def cleanup(self):
        """
        Frees all resources from this Device.

        Raises if Buffers are still allocated outside the pool.
        """
        self._cached_data = {}

        self.clear_pooled_memory()

        if self._total_memory_allocated != 0:
            raise Exception('Unable to cleanup CLDevice, while not all Buffers are deallocated.')

        self.cleanup_cached_kernels()

        if self._ctx_q is not None:
            clr = CL.clReleaseCommandQueue(self._ctx_q)
            if clr != CL.CLERROR.SUCCESS:
                raise Exception(f'clReleaseCommandQueue error: {clr}')
            self._ctx_q = None

        if self._ctx is not None:
            clr = CL.clReleaseContext(self._ctx)
            if clr != CL.CLERROR.SUCCESS:
                raise Exception(f'clReleaseContext error: {clr}')
            self._ctx = None

    def get_cached_data(self, key):
        """Return data stored via set_cached_data(), or None."""
        return self._cached_data.get(key, None)

    def get_description(self) -> str:
        """Human-readable device name with total memory in Gb."""
        return f"{self._device_info.get_name()} [{(self._device_info.get_total_memory() / 1024**3) :.3}Gb]"

    def get_total_allocated_memory(self):
        """
        get total bytes of used and pooled memory
        """
        return self._total_memory_allocated

    def set_cached_data(self, key, value):
        """
        All cached data will be freed with cleanup()
        """
        self._cached_data[key] = value

    def set_target_memory_usage(self, mb : int):
        """
        keep memory usage at specified position

        when total allocated memory reached the target and new allocation is performing,
        random pooled memory will be freed
        """
        self._target_memory_usage = mb*1024*1024

    def print_stat(self):
        """Print allocation/pool/cache statistics to stdout."""
        s = f'''
Total memory allocated: {self._total_memory_allocated}
Total buffers allocated: {self._total_buffers_allocated}
Total memory pooled: {self._total_memory_pooled}
Total buffers pooled: {self._total_buffers_pooled}
N of compiled kernels: {len(self._cached_kernels)}
N of cacheddata: {len(self._cached_data)}
'''
        print(s)

    def run_kernel(self, kernel : Kernel, *args, global_shape=None, local_shape=None, global_shape_offsets=None, wait=False):
        """
        Run kernel on Device

        Arguments

         *args       arguments will be passed to OpenCL kernel
                     allowed types:

                     Buffer
                     np single value

         global_shape(None)     tuple of ints, up to 3 dims
                                amount of parallel kernel executions.
                                in OpenCL kernel,
                                id can be obtained via get_global_id(dim)

         local_shape(None)      tuple of ints, up to 3 dims
                                specifies local groups of every dim of global_shape.
                                in OpenCL kernel,
                                id can be obtained via get_local_id(dim)

         global_shape_offsets(None)  tuple of ints
                                     offsets for global_shape

         wait(False)    wait execution to complete
        """
        if global_shape is None:
            global_shape = kernel.get_global_shape()
            if global_shape is None:
                raise ValueError('global_shape must be defined.')

        work_dim = len(global_shape)
        global_shape_ar = (CL.c_size_t*work_dim)()
        for i,v in enumerate(global_shape):
            global_shape_ar[i] = v

        local_shape_ar = None
        if local_shape is None:
            local_shape = kernel.get_local_shape()
        if local_shape is not None:
            if len(local_shape) != work_dim:
                raise ValueError('len of local_shape must match global_shape')

            local_shape_ar = (CL.c_size_t*work_dim)()
            for i,v in enumerate(local_shape):
                local_shape_ar[i] = v

        global_shape_offsets_ar = None
        if global_shape_offsets is not None:
            if len(global_shape_offsets) != work_dim:
                raise ValueError('len of global_shape_offsets must match global_shape')

            global_shape_offsets_ar = (CL.c_size_t*work_dim)()
            # Fix: iterate global_shape_offsets here (previous code iterated
            # local_shape — wrong values, and a crash when local_shape is None).
            for i,v in enumerate(global_shape_offsets):
                global_shape_offsets_ar[i] = v

        # Convert python args to OpenCL kernel args.
        krn_args = []
        for i, arg in enumerate(args):
            if isinstance(arg, Buffer):
                arg = arg.get_cl_mem()
            else:
                cl_type = _np_dtype_to_cl.get(arg.__class__, None)
                if cl_type is None:
                    raise ValueError(f'Cannot convert type {arg.__class__} to OpenCL type.')
                arg = cl_type(arg)
            krn_args.append(arg)

        ckernel = self._compile_kernel(kernel, kernel.get_kernel_text())

        for i, arg in enumerate(krn_args):
            clr = CL.clSetKernelArg(ckernel, i, CL.sizeof(arg), CL.byref(arg))
            if clr != CL.CLERROR.SUCCESS:
                raise Exception(f'clSetKernelArg error: {clr}')

        ev = CL.cl_event() if wait else None

        clr = CL.clEnqueueNDRangeKernel(self._get_ctx_q(), ckernel, work_dim, global_shape_offsets_ar, global_shape_ar, local_shape_ar, 0, None, ev)
        if clr != CL.CLERROR.SUCCESS:
            raise Exception(f'clEnqueueNDRangeKernel error: {clr}')

        if wait:
            CL.clWaitForEvents(1, (CL.cl_event*1)(ev) )
            CL.clReleaseEvent(ev)

    def wait(self):
        """
        Wait to finish all queued operations on this Device
        """
        clr = CL.clFinish(self._get_ctx_q())
        if clr != CL.CLERROR.SUCCESS:
            raise Exception(f'clFinish error: {clr}')
|
|
|
|
|
|
def _get_opencl_device_ids() -> List[CL.cl_device_id]:
    """
    Enumerate cl_device_id of all CPU / GPU / accelerator devices
    reporting OpenCL version >= 1.2, caching the result in the module
    global _opencl_device_ids.

    Enumeration errors are not raised: a platform-level failure aborts
    enumeration, a device-level failure skips that platform/device.
    """
    global _opencl_device_ids
    if _opencl_device_ids is None:
        _opencl_device_ids = []
        device_types = CL.CL_DEVICE_TYPE_CPU | CL.CL_DEVICE_TYPE_ACCELERATOR | CL.CL_DEVICE_TYPE_GPU

        # Single-pass `while True ... break` used as a breakable scope
        # so early failures can bail out of enumeration.
        while True:
            # First call queries the platform count only.
            num_platforms = CL.cl_uint()
            if CL.clGetPlatformIDs(0, None, num_platforms) != CL.CLERROR.SUCCESS or \
               num_platforms.value == 0:
                break

            platforms = (CL.cl_platform_id * num_platforms.value) ()
            if CL.clGetPlatformIDs(num_platforms.value, platforms, None) != CL.CLERROR.SUCCESS:
                break

            for i_platform in range(num_platforms.value):
                platform = platforms[i_platform]
                # Query device count on this platform; skip platform on failure.
                num_devices = CL.cl_uint(0)
                if CL.clGetDeviceIDs(platform, device_types, 0, None, num_devices) != CL.CLERROR.SUCCESS or \
                   num_devices.value == 0:
                    continue

                device_ids = (CL.cl_device_id * num_devices.value)()
                if CL.clGetDeviceIDs(platform, device_types, num_devices.value, device_ids, None) != CL.CLERROR.SUCCESS:
                    continue

                for i in range(num_devices.value):
                    device_id = device_ids[i]
                    # Check OpenCL version.
                    if device_id is not None:
                        # Two-step query: size first, then the version string.
                        device_version_size = CL.c_size_t()
                        if CL.clGetDeviceInfo(device_id, CL.CL_DEVICE_VERSION, 0, None, device_version_size) == CL.CLERROR.SUCCESS:
                            device_version = CL.create_string_buffer(device_version_size.value)
                            if CL.clGetDeviceInfo(device_id, CL.CL_DEVICE_VERSION, device_version_size.value, device_version, None) == CL.CLERROR.SUCCESS:
                                device_version = str(device_version.value, 'ascii')

                                # Version string format is "OpenCL <major>.<minor> ..."
                                # NOTE(review): a malformed string would raise here
                                # rather than skip the device — verify acceptable.
                                major, minor = device_version.split(' ')[1].split('.')
                                opencl_version = int(major)*10+int(minor)
                                if opencl_version >= 12:  # require OpenCL 1.2+
                                    _opencl_device_ids.append(device_id)
            break
    return _opencl_device_ids
|
|
|
|
def get_available_devices_info() -> List[DeviceInfo]:
    """
    returns a list of available picklable DeviceInfo's

    For every enumerated OpenCL device, queries name, global memory size,
    vendor id and compute-unit count. Query failures fall back to
    'undefined' / 0 values instead of raising.
    """
    devices = []
    for device_index, device_id in enumerate(_get_opencl_device_ids()):
        device_name = 'undefined'
        device_total_memory = 0

        # Two-step string query: size first, then the name itself.
        name_size = CL.c_size_t()
        if CL.clGetDeviceInfo(device_id, CL.CL_DEVICE_NAME, 0, None, name_size) == CL.CLERROR.SUCCESS:
            name_value = CL.create_string_buffer(name_size.value)
            if CL.clGetDeviceInfo(device_id, CL.CL_DEVICE_NAME, name_size.value, name_value, None) == CL.CLERROR.SUCCESS:
                device_name = str(name_value.value, 'ascii')

        global_mem_size = CL.cl_ulong()
        if CL.clGetDeviceInfo(device_id, CL.CL_DEVICE_GLOBAL_MEM_SIZE, CL.sizeof(global_mem_size), CL.byref(global_mem_size), None) == CL.CLERROR.SUCCESS:
            device_total_memory = global_mem_size.value

        # Return codes deliberately ignored below: defaults stay 0 on failure.
        vendor_id = CL.cl_uint()
        CL.clGetDeviceInfo(device_id, CL.CL_DEVICE_VENDOR_ID, CL.sizeof(vendor_id), CL.byref(vendor_id), None)
        vendor_id = vendor_id.value

        max_compute_units = CL.cl_uint()
        CL.clGetDeviceInfo(device_id, CL.CL_DEVICE_MAX_COMPUTE_UNITS, CL.sizeof(max_compute_units), CL.byref(max_compute_units), None)
        max_compute_units = max_compute_units.value

        # Performance level is the compute-unit count, heavily penalized
        # for Intel devices so discrete GPUs win in get_best_device().
        performance_level = max_compute_units

        if vendor_id == 0x8086: # Intel device
            performance_level -= 1000

        devices.append( DeviceInfo(index=device_index, name=device_name, total_memory=device_total_memory, performance_level=performance_level ) )

    return devices
|
|
|
|
def get_default_device() -> Union[Device, None]:
    """
    Returns the process-wide default Device, lazily initialized
    to the best available device on first call.
    """
    global _default_device
    if _default_device is not None:
        return _default_device
    _default_device = get_best_device()
    return _default_device
|
|
|
|
def set_default_device(device : Device):
    """
    Overrides the process-wide default Device.

    Raises ValueError if `device` is not a Device instance.
    """
    global _default_device
    if isinstance(device, Device):
        _default_device = device
    else:
        raise ValueError('device must be an instance of Device')
|
|
|
|
def get_device(arg : Union[None, int, Device, DeviceInfo]) -> Union[Device, None]:
    """
    get physical TensorCL device.

    arg     None        - get default device
            int         - by index (None if out of range)
            DeviceInfo  - by device info
            Device      - returns the same
    """
    global _devices

    # Guard clauses for the trivial cases.
    if arg is None:
        return get_default_device()
    if isinstance(arg, Device):
        return arg

    if isinstance(arg, int):
        infos = get_available_devices_info()
        if arg >= len(infos):
            return None
        arg = infos[arg]
    elif not isinstance(arg, DeviceInfo):
        raise ValueError(f'Unknown type of arg {arg.__class__}')

    # arg is now a DeviceInfo: look up or create the cached Device.
    device = _devices.get(arg, None)
    if device is None:
        device = _devices[arg] = Device(arg, _check=1)
    return device
|
|
|
|
def get_best_device() -> Union[Device, None]:
    """
    returns best device from available.

    "Best" is the one with the highest performance_level;
    None when no devices are available.
    """
    infos = get_available_devices_info()
    if not infos:
        return None
    # max() returns the first maximal element, matching the original
    # strictly-greater scan.
    best_info = max(infos, key=lambda info: info.get_performance_level())
    return get_device(best_info)
|
|
|
|
def cleanup_devices():
    """
    Frees the resources of every cached Device and empties the cache.
    """
    global _devices
    cached = list(_devices.values())
    for dev in cached:
        dev.cleanup()
    _devices = {}
|