DeepFaceLive/xlib/player/FramePlayer.py
2021-08-26 09:08:15 +04:00

273 lines
8.8 KiB
Python

from datetime import datetime
from typing import Tuple
import numpy as np
from ..image import ImageProcessor
from ..python import Disposable
class FramePlayer(Disposable):
"""
Base class for players based on fixed number of frames
"""
class Frame:
__slots__ = ['image','timestamp','fps','frame_num','frame_count','name','error']
image : np.ndarray
timestamp : float
fps : float
frame_num : int
frame_count : int
name : str
def __init__(self):
self.image = None
self.timestamp = None
self.fps = None
self.frame_num = None
self.frame_count = None
self.name = None
def __init__(self, default_fps, frame_count):
if frame_count == 0:
raise Exception('Frames count are 0.')
self._default_fps = default_fps
self._frame_count = frame_count
self._is_realtime = True
self._is_autorewind = False
self._fps = 0
self._target_width = 0
self._is_playing = False
self._frame_idx = 0
self._frame_timestamp = None
self._req_is_playing = None
self._req_frame_seek_idx = None
self._cached_frames = {}
self._cached_frames_idxs = []
def is_playing(self): return self._is_playing
def get_frame_count(self): return self._frame_count
def get_frame_idx(self): return self._frame_idx
def get_is_autorewind(self): return self._is_autorewind
def set_is_autorewind(self, is_autorewind):
if not isinstance(is_autorewind, bool):
raise ValueError('is_autorewind must be an instance of bool')
self._is_autorewind = is_autorewind
return self._is_autorewind
def get_is_realtime(self): return self._is_realtime
def set_is_realtime(self, is_realtime):
if not isinstance(is_realtime, bool):
raise ValueError('is_realtime must be an instance of bool')
self._is_realtime = is_realtime
return self._is_realtime
def get_fps(self): return self._fps
def set_fps(self, fps):
"""
set new FPS.
Returns adjusted FPS.
"""
if not isinstance(fps, (int, float) ):
raise ValueError('fps must be an instance of int/float')
self._fps = float(np.clip(fps, 0, 240))
self._on_fps_changed()
return self._fps
def get_target_width(self): return self._target_width
def set_target_width(self, target_width):
"""
0 - auto
4..4096
returns adjusted target_width
"""
if not isinstance(target_width, (int,float) ):
raise ValueError('target_width must be an instance of int/float')
target_width = int(target_width)
target_width = (target_width // 4) * 4
target_width = int(np.clip(target_width, 0, 4096))
self._target_width = target_width
self._on_target_width_changed()
return self._target_width
def _on_play_start(self):
"""@overridable"""
def _on_play_stop(self):
"""@overridable"""
def _on_fps_changed(self):
"""@overridable"""
def _on_target_width_changed(self):
"""@overridable"""
def _on_get_frame(self, idx) -> Tuple[np.ndarray, str]:
"""
@overridable
return (image_of_frame, name_or_error)
if image_of_frame is None, then specify an error to 'name_or_error'
otherwise 'name_or_error' is a name of frame
"""
return None
def req_frame_seek(self, idx, mode):
"""
Request to seek to specified frame idx
"""
self._req_frame_seek_idx = (idx, mode)
def req_play_start(self):
"""
Request to start playing.
"""
self._req_is_playing = True
def req_play_stop(self):
"""
Request to stop playing.
"""
self._req_is_playing = False
class ProcessResult:
__slots__ = ['new_is_playing','new_frame_idx','new_frame','new_error']
def __init__(self):
self.new_is_playing = None
self.new_frame_idx = None
self.new_frame = None
self.new_error : str = None
def process(self) -> 'FramePlayer.ProcessResult':
"""
processes inner logic
returns FramePlayer.ProcessResult()
"""
# Process player logic.
new_is_playing = None
new_frame_idx = None
new_frame_timestamp = None
update_frame = False
fps = self._fps
if fps == 0:
fps = self._default_fps
result = FramePlayer.ProcessResult()
if self._is_playing:
if self._is_realtime:
diff_frames = int( (datetime.now().timestamp() - self._frame_timestamp) / (1.0/fps) )
if diff_frames != 0:
new_frame_idx = self._frame_idx + diff_frames
new_frame_timestamp = self._frame_timestamp + diff_frames * (1.0/fps)
else:
new_frame_idx = self._frame_idx + 1
new_frame_timestamp = datetime.now().timestamp()
if self._req_frame_seek_idx is not None:
# User frame seek overrides new frame idx
seek_idx, seek_mode = self._req_frame_seek_idx
if seek_mode == 0:
new_frame_idx = seek_idx
elif seek_mode == 1:
new_frame_idx = self._frame_idx + seek_idx
elif seek_mode == 2:
new_frame_idx = self._frame_count - seek_idx -1
new_frame_timestamp = datetime.now().timestamp()
if new_frame_idx is not None:
# new_frame_idx mean the frame should be updated, even if idx is not changed
update_frame = True
if new_frame_idx < 0 or new_frame_idx >= self._frame_count:
# End of frames reached
if self._is_autorewind:
# AutoRewind
new_frame_idx %= self._frame_count
else:
# No AutoRewind. Stop at last frame, but don't update frame
if new_frame_idx < 0:
new_frame_idx = 0
else:
new_frame_idx = self._frame_count-1
update_frame = False
new_is_playing = False
if self._frame_idx != new_frame_idx:
self._frame_idx = result.new_frame_idx = new_frame_idx
if new_frame_timestamp is not None:
self._frame_timestamp = new_frame_timestamp
if new_is_playing is None:
# new_is_playing is not changed by system, now can handle user request
new_is_playing = self._req_is_playing
if new_is_playing is not None and new_is_playing and not self._is_playing:
# Start playing
result.new_is_playing = self._is_playing = True
self._frame_timestamp = datetime.now().timestamp()
self._on_play_start()
update_frame = True
if update_frame:
# Frame changed, construct Frame() with current values
_frame_idx = self._frame_idx
_cached_frames = self._cached_frames
_cached_frames_idxs = self._cached_frames_idxs
frame_tuple = _cached_frames.get(_frame_idx, None)
if frame_tuple is None:
frame_tuple = self._on_get_frame(_frame_idx)
_cached_frames[_frame_idx] = frame_tuple
_cached_frames_idxs.insert(0, _frame_idx)
if len(_cached_frames_idxs) > 5:
_cached_frames.pop(_cached_frames_idxs.pop(-1))
frame_image, name_or_err = frame_tuple
if frame_image is None:
# frame is not provided, stop playing
new_is_playing = False
result.new_error = name_or_err
else:
# frame is provided.
p_frame = result.new_frame = FramePlayer.Frame()
p_frame.fps = fps
p_frame.timestamp = self._frame_timestamp
p_frame.frame_num = _frame_idx
p_frame.frame_count = self._frame_count
ip = ImageProcessor(frame_image)
if self._target_width != 0:
ip.fit_in(TW=self._target_width)
frame_image = ip.ch(3).get_image('HWC')
p_frame.image = frame_image
p_frame.name = name_or_err
if new_is_playing is not None and self._is_playing and not new_is_playing:
# Stop playing
result.new_is_playing = self._is_playing = False
self._on_play_stop()
if self._req_is_playing is not None or self._req_frame_seek_idx is not None:
self._req_is_playing = None
self._req_frame_seek_idx = None
return result