DeepFaceLive/xlib/mp/MPSPSCMRRingData.py
2021-11-07 10:03:15 +04:00

186 lines
5.7 KiB
Python

import multiprocessing
from operator import mul
import uuid
from typing import Union
from ..io import FormattedMemoryViewIO
from .MPSharedMemory import MPSharedMemory
class MPSPSCMRRingData:
"""
Multiprocess lockless Single Producer, Single Consumer, Multi Reader Ring Data.
Producer knows how many data is read by Consumer (by accessing read_id)
Side readers can read last data without locks.
The data returned is either valid or None.
"""
def __init__(self, table_size, heap_size_mb, multi_producer : bool = False):
self._table_size = table_size
self._heap_size = heap_size = heap_size_mb*1024*1024
self._write_lock = multiprocessing.Lock() if multi_producer else None
self._event = multiprocessing.Event()
self._sizeof_uuid = 16
table_item_size = self._table_item_size = 8+8+self._sizeof_uuid
self._table_offset = 8+8
self._heap_offset = self._table_offset + table_size*table_item_size
self._shared_mem = MPSharedMemory(8+8+table_size*table_item_size + heap_size)
self._initialize_mvs()
self._mv_ids[0] = 0 # write_id
self._mv_ids[1] = 0 # read_id
# Initialize first block at 0 index
wid = 0
wid_uuid = uuid.uuid4().bytes
wid_heap_offset = 0
wid_data_size = 0
fmv = FormattedMemoryViewIO(self._shared_mem.get_mv())
fmv.seek(self._table_offset + (wid % self._table_size)*self._table_item_size)
fmv.write_fmt('QQ', wid_heap_offset, wid_data_size), fmv.write(wid_uuid)
def _initialize_mvs(self):
mv = self._shared_mem.get_mv()
self._mv_ids = mv.cast('Q')[0:2]
def __getstate__(self):
d = self.__dict__.copy()
d.pop('_mv_ids')
return d
def __setstate__(self, d):
self.__dict__.update(d)
self._initialize_mvs()
def get_write_id(self) -> int: return self._mv_ids[0]
def get_read_id(self) -> int: return self._mv_ids[1]
def write(self, data : Union[bytes, bytearray]):
"""
write data incrementing write_id
"""
heap_size = self._heap_size
if not isinstance(data, (bytes, bytearray)):
raise ValueError('data must be an instance of bytes or bytearray')
data_size = len(data)
if data_size == 0:
raise ValueError('data_size must be > 0')
data_size_in_heap = data_size+self._sizeof_uuid
data_size_in_heap = ( data_size_in_heap + (-data_size_in_heap & 7) )
if data_size_in_heap > heap_size:
raise Exception('data_size more than heap_size')
fmv = FormattedMemoryViewIO(self._shared_mem.get_mv())
wid_uuid = uuid.uuid4().bytes
if self._write_lock is not None:
self._write_lock.acquire()
wid = self._mv_ids[0]
# Read table record of wid
fmv.seek(self._table_offset + (wid % self._table_size)*self._table_item_size)
(wid_heap_offset, wid_data_size), _ = fmv.read_fmt('QQ'), fmv.read(self._sizeof_uuid)
# Calc aligned next offset
wid_heap_offset += self._sizeof_uuid + wid_data_size
wid_heap_offset = ( wid_heap_offset + (-wid_heap_offset & 7) )
# Check if next offset with data size fit remain heap space
if wid_heap_offset+data_size_in_heap >= heap_size:
wid_heap_offset = 0
# Write the data into heap
fmv.seek(self._heap_offset + wid_heap_offset)
fmv.write(wid_uuid)
fmv.write(data)
# Write new table record
wid += 1
fmv.seek(self._table_offset + (wid % self._table_size)*self._table_item_size)
fmv.write_fmt('QQ', wid_heap_offset, data_size), fmv.write(wid_uuid)
# Set new write_id
self._mv_ids[0] = wid
if self._write_lock is not None:
self._write_lock.release()
self._event.set()
def get_by_id(self, id) -> Union[bytearray, None]:
"""
get data by id
"""
fmv = FormattedMemoryViewIO(self._shared_mem.get_mv())
sizeof_uuid = self._sizeof_uuid
# Read table record
fmv.seek(self._table_offset + (id % self._table_size)*self._table_item_size)
(rid_heap_offset, rid_data_size), rid_uuid = fmv.read_fmt('QQ'), fmv.read(sizeof_uuid)
if rid_data_size == 0:
return None
# Seek to the data in the heap
fmv.seek(self._heap_offset + rid_heap_offset)
# Check data validness
if fmv.read(sizeof_uuid) != rid_uuid:
return None
# read the data
result = fmv.read(rid_data_size)
# Check data validness again
fmv.seek(self._heap_offset + rid_heap_offset)
if fmv.read(sizeof_uuid) != rid_uuid:
return None
return result
def read(self, timeout=0, update_rid=True) -> Union[bytearray, None]:
"""
read data incrementing read_id
"""
if self._mv_ids[0] == self._mv_ids[1]:
if timeout == 0:
return None
if not self._event.wait(timeout):
return None
self._event.clear()
wid, rid = self._mv_ids[0], self._mv_ids[1]
result = None
while rid < wid:
rid = rid+1
result = self.get_by_id(rid)
if result is not None:
break
if update_rid:
self._mv_ids[1] = rid
return result
# def wait_for_read(self, timeout : float) -> bool:
# """
# returns True if ready to .read()
# """
# result = self._event.wait(timeout)
# if result:
# self._event.clear()
# return result