DeepFaceLive/xlib/io/IOThreadLinesReader.py
2021-08-09 22:53:01 +04:00

57 lines
1.3 KiB
Python

import threading
import time
from io import IOBase
from typing import List
from collections import deque
class IOThreadLinesReader:
"""
continuously reads lines from IO in background thread.
"""
def __init__(self, io : IOBase, max_lines=None):
self._io = io
self._lock = threading.Lock()
self._lines = deque(maxlen=max_lines)
threading.Thread(target=self._proc, daemon=True).start()
def _proc(self):
io = self._io
lock = self._lock
lines = self._lines
while not io.closed and io.readable():
line = io.readline()
lock.acquire()
lines.append(line.decode('utf-8').rstrip())
lock.release()
if len(line) == 0:
break
time.sleep(0.01)
def get_lines(self, wait_new=True, till_eof=False) -> List[str]:
"""
"""
lock = self._lock
lines = self._lines
result = []
while True:
if len(lines) != 0:
lock.acquire()
result += lines
lines.clear()
lock.release()
if till_eof and len(result[-1]) != 0:
continue
return result
if not till_eof and not wait_new:
return None
time.sleep(0.001)