mirror of
https://github.com/iperov/DeepFaceLive.git
synced 2024-12-25 07:21:13 -08:00
223 lines
6.9 KiB
Python
223 lines
6.9 KiB
Python
import multiprocessing
|
|
import uuid
|
|
from typing import Union
|
|
|
|
from ..io import FormattedMemoryViewIO
|
|
from .MPSharedMemory import MPSharedMemory
|
|
|
|
|
|
class MPWeakHeap:
|
|
"""
|
|
Multiprocess weak heap.
|
|
|
|
|
|
heap structure
|
|
|
|
|ring_head_block_offset block block block ...|
|
|
|
|
block structure:
|
|
---
|
|
(8) block_size
|
|
|
|
(8) data_size
|
|
|
|
(16) UUID sig
|
|
|
|
...data...
|
|
|
|
"""
|
|
class DataRef:
|
|
def __init__(self, block_offset, uuid : bytes):
|
|
self._block_offset = block_offset
|
|
self._uuid = uuid
|
|
|
|
def __init__(self, size_mb : int):
|
|
|
|
|
|
self._heap_size = size_mb * 1024 * 1024 # should be 16 byte aligned
|
|
self._shared_mem = MPSharedMemory(self._heap_size)
|
|
self._lock = multiprocessing.Lock()
|
|
|
|
# Initialize heap structure
|
|
self._ring_head_block_offset = 0
|
|
self._first_block_offset = 8
|
|
self._block_header_size = 8+8+16
|
|
self._block_data_start_offset = 8+8+16
|
|
|
|
fmv = FormattedMemoryViewIO(self._shared_mem.get_mv())
|
|
fmv.seek(self._ring_head_block_offset), fmv.write_fmt('q', self._first_block_offset)
|
|
|
|
# Entire block
|
|
fmv.seek(self._first_block_offset)
|
|
fmv.write_fmt('qq', self._heap_size-self._first_block_offset, 0), fmv.write(uuid.uuid4().bytes)
|
|
|
|
|
|
def add_data(self, data : Union[bytes, bytearray, memoryview] ) -> 'MPWeakHeap.DataRef':
|
|
"""
|
|
add the data to the head of ring
|
|
|
|
data
|
|
|
|
"""
|
|
heap_size = self._heap_size
|
|
block_header_size = self._block_header_size
|
|
|
|
if isinstance(data, memoryview):
|
|
data = data.cast('B')
|
|
if not data.contiguous:
|
|
raise ValueError('data as memoryview should be contiguous')
|
|
data_size = data.nbytes
|
|
else:
|
|
data_size = len(data)
|
|
|
|
lock = self._lock
|
|
fmv = FormattedMemoryViewIO(self._shared_mem.get_mv())
|
|
lock.acquire()
|
|
|
|
# start from ring_head_block_offset
|
|
fmv.seek(self._ring_head_block_offset)
|
|
cur_block_offset, = fmv.read_fmt('q')
|
|
|
|
while True:
|
|
fmv.seek(cur_block_offset)
|
|
block_size, = fmv.get_fmt('q')
|
|
block_free_size = block_size - block_header_size
|
|
|
|
if data_size <= block_free_size:
|
|
# the space of the block is enough for the data
|
|
|
|
block_new_size = block_header_size + ( data_size + (-data_size & 7) )
|
|
block_remain_size = block_size-block_new_size
|
|
|
|
if block_remain_size >= block_header_size:
|
|
# the remain space of the block is enough for next block, split the block
|
|
next_block_offset = cur_block_offset + block_new_size
|
|
fmv.seek(next_block_offset), fmv.write_fmt('qq', block_remain_size, 0), fmv.write(uuid.uuid4().bytes)
|
|
else:
|
|
# otherwise do not split
|
|
next_block_offset = cur_block_offset + block_size
|
|
if next_block_offset >= heap_size:
|
|
next_block_offset = self._first_block_offset
|
|
block_new_size = block_size
|
|
|
|
# update current block structure
|
|
uid = uuid.uuid4().bytes
|
|
fmv.seek(cur_block_offset), fmv.write_fmt('qq', block_new_size, data_size ), fmv.write(uid)
|
|
|
|
# update ring_head_block_offset
|
|
fmv.seek(self._ring_head_block_offset), fmv.write_fmt('q', next_block_offset)
|
|
|
|
lock.release()
|
|
|
|
# write the data into the block
|
|
fmv.seek(cur_block_offset+self._block_data_start_offset)
|
|
fmv.write(data)
|
|
|
|
return MPWeakHeap.DataRef(cur_block_offset, uid)
|
|
else:
|
|
# the space of the block is not enough for the daata
|
|
is_first_block = cur_block_offset == self._first_block_offset
|
|
is_last_block = (cur_block_offset+block_size) >= heap_size
|
|
|
|
if is_last_block:
|
|
if is_first_block:
|
|
lock.release()
|
|
raise Exception(f'Not enough space in MPWeakHeap to allocate {data_size}')
|
|
|
|
# if it is last block, leave it unchanged, and continue with first block
|
|
cur_block_offset = self._first_block_offset
|
|
continue
|
|
else:
|
|
# not last block, merge with next block
|
|
|
|
# get next block size
|
|
fmv.seek(cur_block_offset+block_size)
|
|
next_block_size, = fmv.get_fmt('q')
|
|
|
|
# erase data of next block
|
|
fmv.write_fmt('qq', 0, 0), fmv.write(uuid.uuid4().bytes)
|
|
|
|
# overwrite current block size with expanded block size
|
|
fmv.seek(cur_block_offset)
|
|
fmv.write_fmt('q', block_size+next_block_size)
|
|
|
|
# continue with the same expanded block
|
|
continue
|
|
|
|
|
|
def get_data(self, data_ref : 'MPWeakHeap.DataRef') -> Union[bytearray, None]:
|
|
"""
|
|
Get data
|
|
|
|
if data is overwritten already, None will be returned
|
|
"""
|
|
lock = self._lock
|
|
fmv = FormattedMemoryViewIO(self._shared_mem.get_mv())
|
|
|
|
# short lock to get data info
|
|
fmv.seek(data_ref._block_offset)
|
|
lock.acquire()
|
|
(_, data_size), uuid = fmv.read_fmt('qq'), fmv.read(16)
|
|
lock.release()
|
|
|
|
# Check valid UUID
|
|
if data_ref._uuid != uuid:
|
|
return None
|
|
|
|
# read the data
|
|
result = fmv.read(data_size)
|
|
|
|
# short lock again to validate that the reference is still valid,
|
|
# thus we read valid data
|
|
fmv.seek(data_ref._block_offset)
|
|
lock.acquire()
|
|
(_, data_size), uuid = fmv.read_fmt('qq'), fmv.read(16)
|
|
lock.release()
|
|
|
|
# Check valid UUID
|
|
if data_ref._uuid != uuid:
|
|
return None
|
|
|
|
return result
|
|
|
|
def summary(self) -> str:
|
|
"""
|
|
returns a string with summary of heap
|
|
"""
|
|
result = []
|
|
|
|
|
|
heap_size = self._heap_size
|
|
fmv = FormattedMemoryViewIO(self._shared_mem.get_mv())
|
|
|
|
lock = self._lock
|
|
lock.acquire()
|
|
|
|
head_block_offset, = fmv.read_fmt('q')
|
|
|
|
cur_block_offset = self._first_block_offset
|
|
|
|
block_id = 0
|
|
while cur_block_offset != self._heap_size:
|
|
fmv.seek(cur_block_offset)
|
|
(block_size, data_size), sig = fmv.read_fmt('qq'), fmv.read(16)
|
|
|
|
|
|
s = ''
|
|
if cur_block_offset == head_block_offset:
|
|
s += f'[{block_id} HEAD]:'
|
|
else:
|
|
s += f'[{block_id}]:'
|
|
|
|
if data_size != 0:
|
|
s += f'block_size: {block_size} data_size:{data_size}'
|
|
else:
|
|
s += f'block_size: {block_size} empty'
|
|
result.append(s)
|
|
block_id += 1
|
|
|
|
cur_block_offset += block_size
|
|
lock.release()
|
|
|
|
return '\n'.join(result)
|