DeepFaceLive/xlib/mt/MTOrderedData.py
2021-09-29 16:54:47 +04:00

90 lines
2.1 KiB
Python

from collections import deque
from queue import Queue
from typing import Any, Union, Tuple
def MTOrderedData(queue_size=0):
"""
Multithreaded ordered work.
Ensures the order of work done by threads.
returns (host,client) classes.
"""
h2c, c2h = Queue(maxsize=queue_size), Queue(maxsize=queue_size)
host = _MTOrderedDataHost(h2c, c2h)
cli = _MTOrderedDataClient(h2c, c2h)
return host, cli
class _MTOrderedDataHost:
"""
"""
def __init__(self, h2c : Queue, c2h : Queue):
self._h2c = h2c
self._c2h = c2h
self._counter = 0
self._sent_ids = deque()
self._done_datas = {}
def send(self, data):
"""
send the data to the clients
"""
if data is None:
raise ValueError('data cannot be None')
c = self._counter
self._counter += 1
self._sent_ids.append(c)
self._h2c.put( (c, data) )
def recv(self) -> Union[Any, None]:
sent_ids = self._sent_ids
if len(sent_ids) != 0:
done_datas = self._done_datas
while not self._c2h.empty():
id, data = self._c2h.get()
done_datas[id] = data
id = sent_ids[0]
if id in done_datas:
done_data = done_datas.pop(id)
sent_ids.popleft()
print('len(sent_ids) ', len(sent_ids))
return done_data
return None
class _MTOrderedDataClient:
def __init__(self, h2c : Queue, c2h : Queue):
self._h2c = h2c
self._c2h = c2h
def send(self, data_id, data):
"""
"""
self._c2h.put( (data_id, data) )
def recv(self, wait=True) -> Tuple[int, Any]:
"""
returns ( data_id(int), data(Any) ) or None
"""
h2c = self._h2c
if not wait and h2c.empty():
return None
id, data = h2c.get()
return id, data