mirror of
https://github.com/iperov/DeepFaceLive.git
synced 2024-12-25 07:21:13 -08:00
72 lines
2.0 KiB
Python
72 lines
2.0 KiB
Python
import multiprocessing
|
|
from multiprocessing.connection import Connection
|
|
|
|
class PMPI:
    """
    Paired Message Processing Interface

    Send and receive named messages between processes via a pipe.

    Each message travels through the pipe as a ``(name, args, kwargs)``
    tuple. The receiving side registers handlers per message name with
    ``call_on_msg`` and dispatches pending messages with
    ``process_messages``.
    """

    def __init__(self, pipe: Connection = None):
        """
        arguments

            pipe    Connection used for sending and receiving, or None
                    to start detached (can be attached later via set_pipe)
        """
        self.pipe = pipe
        # message name -> list of handlers, kept in registration order
        self.funcs = {}

    def set_pipe(self, pipe):
        """
        Replace the pipe used for sending and receiving.
        Passing None detaches the interface.
        """
        self.pipe = pipe

    def call_on_msg(self, name, func):
        """
        Call func on received 'name' message.

        Several funcs may be registered for the same name; they are
        called in registration order. A func of None is ignored.
        """
        if func is None:
            return
        self.funcs.setdefault(name, []).append(func)

    def send_msg(self, name, *args, **kwargs):
        """
        Send message with name and args/kwargs.

        Does nothing when no pipe is attached. Errors raised by the
        underlying pipe.send() are not handled here and propagate to
        the caller.
        """
        if self.pipe is not None:
            self.pipe.send((name, args, kwargs))

    def process_messages(self, timeout=0):
        """
        Receive all pending messages and call the funcs registered
        for their names. Messages without registered funcs are dropped.

        arguments

            timeout     float sec, passed to pipe.poll() for the first
                        poll only; every following poll is non-blocking

        If the other end of the pipe is gone (BrokenPipeError, or
        EOFError from recv), the pipe is detached (self.pipe = None)
        instead of raising.
        """
        pipe = self.pipe

        try:
            # poll with timeout only once
            if pipe is None or not pipe.poll(timeout):
                return

            while True:
                try:
                    name, args, kwargs = pipe.recv()
                except EOFError:
                    # recv() raises EOFError when the other end was closed
                    # and nothing is left to receive. Caught only around
                    # recv() so an EOFError raised by a handler still
                    # propagates to the caller.
                    self.pipe = None
                    return

                funcs = self.funcs.get(name, None)
                if funcs is not None:
                    for func in funcs:
                        func(*args, **kwargs)

                # A handler may have replaced or detached the pipe,
                # so re-read it before polling for the next message.
                pipe = self.pipe
                if pipe is None or not pipe.poll():
                    break
        except BrokenPipeError:
            self.pipe = None
|
|
|
|
# def wait_message(self, name):
|
|
# """
|
|
# Wait only specific message and ignore all others.
|
|
# returns args, kwargs
|
|
# """
|
|
# pipe = self.pipe
|
|
# if pipe is not None:
|
|
# while True:
|
|
# if pipe.poll(None):
|
|
# msg_name, args, kwargs = pipe.recv()
|
|
# if name == msg_name:
|
|
# return args, kwargs
|
|
# return [], {} |