DeepFaceLive/xlib/mp/PMPI.py
2021-07-23 17:34:49 +04:00

72 lines
2.0 KiB
Python

import multiprocessing
from multiprocessing.connection import Connection
class PMPI:
"""
Paired Message Processing Interface
send and recv messages between processes via pipe
"""
def __init__(self, pipe : Connection = None):
self.pipe = pipe
self.funcs = {}
def set_pipe(self, pipe):
self.pipe = pipe
def call_on_msg(self, name, func):
"""
Call func on received 'name' message
"""
if func is None:
return
d = self.funcs
ar = d.get(name, None)
if ar is None:
d[name] = ar = []
ar.append(func)
def send_msg(self, name, *args, **kwargs):
"""
send message with name and args/kwargs
"""
if self.pipe is not None:
self.pipe.send( (name, args, kwargs) )
def process_messages(self, timeout=0):
"""
arguments
timeout float sec
"""
pipe = self.pipe
try:
if pipe is not None and pipe.poll(timeout): # poll with timeout only once
while True:
name, args, kwargs = pipe.recv()
funcs = self.funcs.get(name, None)
if funcs is not None:
for func in funcs:
func(*args, **kwargs)
pipe = self.pipe
if pipe is not None and pipe.poll():
continue
break
except BrokenPipeError as e:
self.pipe = None
# def wait_message(self, name):
# """
# Wait only specific message and ignore all others.
# returns args, kwargs
# """
# pipe = self.pipe
# if pipe is not None:
# while True:
# if pipe.poll(None):
# msg_name, args, kwargs = pipe.recv()
# if name == msg_name:
# return args, kwargs
# return [], {}