import math
import multiprocessing
import operator
import os
import pickle
import shutil
import sys
import time
import traceback
from pathlib import Path

import cv2
import numpy as np
import numpy.linalg as npla

import imagelib
import samplelib
from converters import (ConverterConfig, ConvertFaceAvatar, ConvertMasked,
                        FrameInfo)
from facelib import FaceType, LandmarksProcessor
from interact import interact as io
from joblib import SubprocessFunctionCaller, Subprocessor
from nnlib import TernausNet
from utils import Path_utils
from utils.cv2_utils import *
from DFLIMG import DFLIMG

from .ConverterScreen import Screen, ScreenManager

CONVERTER_DEBUG = False

class ConvertSubprocessor(Subprocessor):

    class Frame(object):
        def __init__(self, prev_temporal_frame_infos=None,
                           frame_info=None,
                           next_temporal_frame_infos=None):
            self.prev_temporal_frame_infos = prev_temporal_frame_infos
            self.frame_info = frame_info
            self.next_temporal_frame_infos = next_temporal_frame_infos
            self.output_filename = None

            self.idx = None
            self.cfg = None
            self.is_done = False
            self.is_processing = False
            self.is_shown = False
            self.image = None

    class ProcessingFrame(object):
        def __init__(self, idx=None,
                           cfg=None,
                           prev_temporal_frame_infos=None,
                           frame_info=None,
                           next_temporal_frame_infos=None,
                           output_filename=None,
                           need_return_image=False):
            self.idx = idx
            self.cfg = cfg
            self.prev_temporal_frame_infos = prev_temporal_frame_infos
            self.frame_info = frame_info
            self.next_temporal_frame_infos = next_temporal_frame_infos
            self.output_filename = output_filename

            self.need_return_image = need_return_image
            if self.need_return_image:
                self.image = None
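
    # Frame is the host-side bookkeeping record for one target image: it tracks the per-frame
    # ConverterConfig and the is_done/is_processing/is_shown flags used by the interactive loop.
    # ProcessingFrame is the lightweight, picklable work unit actually sent to a Cli worker;
    # the worker fills in .image when need_return_image is True and sends the object back.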

    class Cli(Subprocessor.Cli):

        #override
        def on_initialize(self, client_dict):
            self.log_info ('Running on %s.' % (client_dict['device_name']) )
            self.device_idx = client_dict['device_idx']
            self.device_name = client_dict['device_name']
            self.predictor_func = client_dict['predictor_func']
            self.predictor_input_shape = client_dict['predictor_input_shape']
            self.superres_func = client_dict['superres_func']

            #transfer and set stdin so that code.interact works inside a debug subprocess
            stdin_fd = client_dict['stdin_fd']
            if stdin_fd is not None:
                sys.stdin = os.fdopen(stdin_fd)

            from nnlib import nnlib
            #the model process has already taken all GPU memory,
            #so we cannot use the GPU for any TF operations in converter processes;
            #therefore force active_DeviceConfig to CPU only
            nnlib.active_DeviceConfig = nnlib.DeviceConfig (cpu_only=True)

            def blursharpen_func (img, sharpen_mode=0, kernel_size=3, amount=100):
                if kernel_size % 2 == 0:
                    kernel_size += 1 #cv2 filters require an odd kernel size
                if amount > 0:
                    if sharpen_mode == 1: #box sharpen
                        kernel = np.zeros( (kernel_size, kernel_size), dtype=np.float32)
                        kernel[ kernel_size//2, kernel_size//2] = 1.0
                        box_filter = np.ones( (kernel_size, kernel_size), dtype=np.float32) / (kernel_size**2)
                        kernel = kernel + (kernel - box_filter) * amount
                        return cv2.filter2D(img, -1, kernel)
                    elif sharpen_mode == 2: #gaussian sharpen (unsharp mask)
                        blur = cv2.GaussianBlur(img, (kernel_size, kernel_size) , 0)
                        img = cv2.addWeighted(img, 1.0 + (0.5 * amount), blur, -(0.5 * amount), 0)
                        return img
                elif amount < 0: #negative amount = blur instead of sharpen
                    n = -amount
                    while n > 0:
                        img_blur = cv2.medianBlur(img, 5)
                        if int(n / 10) != 0:
                            img = img_blur
                        else:
                            pass_power = (n % 10) / 10.0
                            img = img*(1.0-pass_power)+img_blur*pass_power
                        n = max(n-10,0)

                    return img
                return img
            self.blursharpen_func = blursharpen_func
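
            # Both positive sharpen modes are unsharp masking: out = img + (img - blurred) * amount.
            # Box mode folds that into a single cv2.filter2D kernel (identity + (identity - box) * amount);
            # gaussian mode computes it directly with addWeighted. A minimal equivalent sketch:
            #   blur = cv2.GaussianBlur(img, (5, 5), 0)
            #   out  = cv2.addWeighted(img, 1.0 + 0.5, blur, -0.5, 0)  # amount = 1
            # Negative amounts instead apply median-blur passes: one full pass per 10 units of
            # |amount|, with the remainder blended in as a fractional pass.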

            self.fanseg_by_face_type = {}
            self.fanseg_input_size = 256

            def fanseg_extract(face_type, *args, **kwargs):
                fanseg = self.fanseg_by_face_type.get(face_type, None)
                if fanseg is None:
                    fanseg = TernausNet("FANSeg", self.fanseg_input_size , FaceType.toString( face_type ) )
                    self.fanseg_by_face_type[face_type] = fanseg

                return fanseg.extract(*args, **kwargs)

            self.fanseg_extract_func = fanseg_extract

            self.fanchq_by_face_type = {}
            self.fanchq_input_size = 256

            def fanchq_extract(face_type, *args, **kwargs):
                fanchq = self.fanchq_by_face_type.get(face_type, None)
                if fanchq is None:
                    fanchq = TernausNet("FANCHQ", self.fanchq_input_size , FaceType.toString( face_type ) )
                    self.fanchq_by_face_type[face_type] = fanchq

                return fanchq.extract(*args, **kwargs)

            self.fanchq_extract_func = fanchq_extract
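
            # The FANSeg/FANCHQ TernausNet instances are constructed lazily, once per FaceType,
            # and cached in the dicts above, so a worker only pays the model-load cost for the
            # face types it actually encounters.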

            import ebsynth
            def ebs_ct(*args, **kwargs):
                return ebsynth.color_transfer(*args, **kwargs)

            self.ebs_ct_func = ebs_ct

            return None

        #override
        def process_data(self, pf): #pf=ProcessingFrame
            cfg = pf.cfg.copy()
            cfg.blursharpen_func = self.blursharpen_func
            cfg.superres_func = self.superres_func
            cfg.ebs_ct_func = self.ebs_ct_func

            frame_info = pf.frame_info

            filename = frame_info.filename
            landmarks_list = frame_info.landmarks_list

            filename_path = Path(filename)
            output_filename = pf.output_filename
            need_return_image = pf.need_return_image

            if len(landmarks_list) == 0:
                self.log_info ( 'no faces found for %s, copying without faces' % (filename_path.name) )

                if cfg.export_mask_alpha:
                    img_bgr = cv2_imread(filename)
                    h,w,c = img_bgr.shape
                    if c == 1: #grayscale: expand to BGR first, then append the empty alpha plane
                        img_bgr = np.repeat(img_bgr, 3, -1)
                        c = 3
                    if c == 3:
                        img_bgr = np.concatenate ([img_bgr, np.zeros((h,w,1), dtype=img_bgr.dtype) ], axis=-1)

                    cv2_imwrite (output_filename, img_bgr)
                else:
                    if filename_path.suffix == '.png':
                        shutil.copy (filename, output_filename )
                    else:
                        img_bgr = cv2_imread(filename)
                        cv2_imwrite (output_filename, img_bgr)

                if need_return_image:
                    img_bgr = cv2_imread(filename)
                    pf.image = img_bgr
            else:
                if cfg.type == ConverterConfig.TYPE_MASKED:
                    cfg.fanseg_input_size = self.fanseg_input_size
                    cfg.fanseg_extract_func = self.fanseg_extract_func
                    cfg.fanchq_input_size = self.fanchq_input_size
                    cfg.fanchq_extract_func = self.fanchq_extract_func

                    try:
                        final_img = ConvertMasked (self.predictor_func, self.predictor_input_shape, cfg, frame_info)
                    except Exception as e:
                        e_str = traceback.format_exc()
                        if 'MemoryError' in e_str:
                            raise Subprocessor.SilenceException
                        else:
                            raise Exception( 'Error while converting file [%s]: %s' % (filename, e_str) )

                elif cfg.type == ConverterConfig.TYPE_FACE_AVATAR:
                    final_img = ConvertFaceAvatar (self.predictor_func, self.predictor_input_shape,
                                                   cfg, pf.prev_temporal_frame_infos,
                                                   pf.frame_info,
                                                   pf.next_temporal_frame_infos )

                if output_filename is not None and final_img is not None:
                    cv2_imwrite (output_filename, final_img )

                if need_return_image:
                    pf.image = final_img

            return pf

        #overridable
        def get_data_name (self, pf):
            #return a string identifier of the data
            return pf.frame_info.filename

    #override
    def __init__(self, is_interactive, converter_session_filepath, predictor_func, predictor_input_shape, converter_config, frames, output_path, model_iter):
        if len (frames) == 0:
            raise ValueError ("len (frames) == 0")

        super().__init__('Converter', ConvertSubprocessor.Cli, 86400 if CONVERTER_DEBUG else 60, io_loop_sleep_time=0.001, initialize_subprocesses_in_serial=False)

        self.is_interactive = is_interactive
        self.converter_session_filepath = Path(converter_session_filepath)
        self.converter_config = converter_config

        #dummy predict and sleep: lets TensorFlow cache its kernels.
        #Without this warm-up, conversion can sometimes be 2x slower.
        predictor_func (dummy_predict=True)
        time.sleep(2)

        self.predictor_func_host, self.predictor_func = SubprocessFunctionCaller.make_pair(predictor_func)
        self.predictor_input_shape = predictor_input_shape

        self.dcscn = None
        self.ranksrgan = None
        def superres_func(mode, *args, **kwargs):
            if mode == 1:
                if self.ranksrgan is None:
                    self.ranksrgan = imagelib.RankSRGAN()
                return self.ranksrgan.upscale(*args, **kwargs)

        self.dcscn_host, self.superres_func = SubprocessFunctionCaller.make_pair(superres_func)
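
        # SubprocessFunctionCaller.make_pair(func) splits func into a (host, proxy) pair: the
        # picklable proxy is what gets shipped to the worker processes, while the host side runs
        # the real function in this process whenever host.process_messages() is pumped (see
        # on_tick). This keeps the GPU-bound predictor and the RankSRGAN upscaler in the parent
        # only. Note the host handle keeps the legacy name dcscn_host but serves superres_func.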

        self.output_path = output_path
        self.model_iter = model_iter

        self.prefetch_frame_count = self.process_count = min(6,multiprocessing.cpu_count())

        session_data = None
        if self.is_interactive and self.converter_session_filepath.exists():

            if io.input_bool ("Use saved session? (y/n skip:y) : ", True):
                try:
                    with open( str(self.converter_session_filepath), "rb") as f:
                        session_data = pickle.loads(f.read())
                except Exception as e:
                    pass

        self.frames = frames
        self.frames_idxs = [ *range(len(self.frames)) ]
        self.frames_done_idxs = []

        if self.is_interactive and session_data is not None:
            s_frames = session_data.get('frames', None)
            s_frames_idxs = session_data.get('frames_idxs', None)
            s_frames_done_idxs = session_data.get('frames_done_idxs', None)
            s_model_iter = session_data.get('model_iter', None)

            frames_equal = (s_frames is not None) and \
                           (s_frames_idxs is not None) and \
                           (s_frames_done_idxs is not None) and \
                           (s_model_iter is not None) and \
                           (len(frames) == len(s_frames))

            if frames_equal:
                for i in range(len(frames)):
                    frame = frames[i]
                    s_frame = s_frames[i]
                    if frame.frame_info.filename != s_frame.frame_info.filename:
                        frames_equal = False
                    if not frames_equal:
                        break

            if frames_equal:
                io.log_info ('Using saved session from ' + '/'.join (self.converter_session_filepath.parts[-2:]) )

                for frame in s_frames:
                    if frame.cfg is not None:
                        #recreate the ConverterConfig via its constructor with get_config() as dict params,
                        #so that old converter sessions keep working when new params are added
                        frame.cfg = frame.cfg.__class__( **frame.cfg.get_config() )

                self.frames = s_frames
                self.frames_idxs = s_frames_idxs
                self.frames_done_idxs = s_frames_done_idxs

                if self.model_iter != s_model_iter:
                    #the model has been trained further, recompute all frames
                    for frame in self.frames:
                        frame.is_done = False

                if self.model_iter != s_model_iter or \
                   len(self.frames_idxs) == 0:
                    #rewind to the beginning if the model has been trained further or all frames are done

                    while len(self.frames_done_idxs) > 0:
                        prev_frame = self.frames[self.frames_done_idxs.pop()]
                        self.frames_idxs.insert(0, prev_frame.idx)

                    if len(self.frames_idxs) != 0:
                        cur_frame = self.frames[self.frames_idxs[0]]
                        cur_frame.is_shown = False

            if not frames_equal:
                session_data = None

        if session_data is None:
            for filename in Path_utils.get_image_paths(self.output_path): #remove all images in output_path
                Path(filename).unlink()

            frames[0].cfg = self.converter_config.copy()

        for i in range( len(self.frames) ):
            frame = self.frames[i]
            frame.idx = i
            frame.output_filename = self.output_path / ( Path(frame.frame_info.filename).stem + '.png' )
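
    # Resume semantics: a saved session is reused only if every frame filename still matches.
    # If the model has been trained further since the session was saved (model_iter differs),
    # all frames are marked not-done and progress is rewound so everything is reconverted.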

    #override
    def process_info_generator(self):
        r = [0] if CONVERTER_DEBUG else range(self.process_count)

        for i in r:
            yield 'CPU%d' % (i), {}, {'device_idx': i,
                                      'device_name': 'CPU%d' % (i),
                                      'predictor_func': self.predictor_func,
                                      'predictor_input_shape' : self.predictor_input_shape,
                                      'superres_func': self.superres_func,
                                      'stdin_fd': sys.stdin.fileno() if CONVERTER_DEBUG else None
                                      }
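
    # One CPU-only worker per core (capped at 6 by process_count). In CONVERTER_DEBUG mode a
    # single worker is spawned and the parent's stdin fd is forwarded so code.interact() works
    # inside it.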

    #overridable optional
    def on_clients_initialized(self):
        io.progress_bar ("Converting", len (self.frames_idxs), initial=len(self.frames_done_idxs) )

        self.process_remain_frames = not self.is_interactive
        self.is_interactive_quitting = not self.is_interactive

        if self.is_interactive:
            help_images = {
                ConverterConfig.TYPE_MASKED :      cv2_imread ( str(Path(__file__).parent / 'gfx' / 'help_converter_masked.jpg') ),
                ConverterConfig.TYPE_FACE_AVATAR : cv2_imread ( str(Path(__file__).parent / 'gfx' / 'help_converter_face_avatar.jpg') ),
                }

            self.main_screen = Screen(initial_scale_to_width=1368, image=None, waiting_icon=True)
            self.help_screen = Screen(initial_scale_to_height=768, image=help_images[self.converter_config.type], waiting_icon=False)
            self.screen_manager = ScreenManager( "Converter", [self.main_screen, self.help_screen], capture_keys=True )
            self.screen_manager.set_current (self.help_screen)
            self.screen_manager.show_current()

    #overridable optional
    def on_clients_finalized(self):
        io.progress_bar_close()

        if self.is_interactive:
            self.screen_manager.finalize()

            for frame in self.frames:
                frame.output_filename = None
                frame.image = None

            session_data = {
                'frames': self.frames,
                'frames_idxs': self.frames_idxs,
                'frames_done_idxs': self.frames_done_idxs,
                'model_iter' : self.model_iter,
            }
            self.converter_session_filepath.write_bytes( pickle.dumps(session_data) )

            io.log_info ("Session is saved to " + '/'.join (self.converter_session_filepath.parts[-2:]) )
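
    # Cached images and output paths are stripped before pickling to keep converter_session.dat
    # small; output_filename is rebuilt from output_path in __init__ on the next run.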

    cfg_change_keys = ['`','1', '2', '3', '4', '5', '6', '7', '8',
                       'q', 'a', 'w', 's', 'e', 'd', 'r', 'f', 'y','h','u','j','i','k','o','l','p', ';',':', #'t', 'g',
                       'z', 'x', 'c', 'v', 'b','n' ]
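
    # Keys in cfg_change_keys edit the current frame's ConverterConfig in on_tick below:
    # ` and 1-8 select the mode; q/a, w/s, e/d, r/f, y/h, u/j, i/k, o/l and p/;/: adjust
    # hist-match threshold, mask erode, mask blur, motion blur, blur/sharpen amount, face scale,
    # denoise, bicubic degrade and color degrade (Shift = steps of 5); z/x/c/v/b/n toggle masked
    # hist match, mask mode, color transfer, super resolution, mask alpha export and sharpen mode.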

    #override
    def on_tick(self):
        self.predictor_func_host.process_messages()
        self.dcscn_host.process_messages()

        go_prev_frame = False
        go_prev_frame_overriding_cfg = False
        go_next_frame = self.process_remain_frames
        go_next_frame_overriding_cfg = False

        cur_frame = None
        if len(self.frames_idxs) != 0:
            cur_frame = self.frames[self.frames_idxs[0]]

        if self.is_interactive:
            self.main_screen.set_waiting_icon(False)

            if not self.is_interactive_quitting and not self.process_remain_frames:
                if cur_frame is not None:
                    if not cur_frame.is_shown:
                        if cur_frame.is_done:
                            cur_frame.is_shown = True
                            io.log_info (cur_frame.cfg.to_string( cur_frame.frame_info.filename_short) )

                            if cur_frame.image is None:
                                cur_frame.image = cv2_imread ( cur_frame.output_filename)
                                if cur_frame.image is None:
                                    cur_frame.is_done = False #unable to read? then recompute
                                    cur_frame.is_shown = False
                            self.main_screen.set_image(cur_frame.image)
                        else:
                            self.main_screen.set_waiting_icon(True)

                else:
                    self.main_screen.set_image(None)
            else:
                self.main_screen.set_image(None)
                self.main_screen.set_waiting_icon(True)

            self.screen_manager.show_current()

            key_events = self.screen_manager.get_key_events()
            key, chr_key, ctrl_pressed, alt_pressed, shift_pressed = key_events[-1] if len(key_events) > 0 else (0,0,False,False,False)

            if key == 9: #tab
                self.screen_manager.switch_screens()
            else:
                if key == 27: #esc
                    self.is_interactive_quitting = True
                elif self.screen_manager.get_current() is self.main_screen:
                    if chr_key in self.cfg_change_keys:
                        self.process_remain_frames = False

                        if cur_frame is not None:
                            cfg = cur_frame.cfg
                            prev_cfg = cfg.copy()

                            if cfg.type == ConverterConfig.TYPE_MASKED:
                                if chr_key == '`':
                                    cfg.set_mode(0)
                                elif key >= ord('1') and key <= ord('8'):
                                    cfg.set_mode( key - ord('0') )
                                elif chr_key == 'q':
                                    cfg.add_hist_match_threshold(1 if not shift_pressed else 5)
                                elif chr_key == 'a':
                                    cfg.add_hist_match_threshold(-1 if not shift_pressed else -5)
                                elif chr_key == 'w':
                                    cfg.add_erode_mask_modifier(1 if not shift_pressed else 5)
                                elif chr_key == 's':
                                    cfg.add_erode_mask_modifier(-1 if not shift_pressed else -5)
                                elif chr_key == 'e':
                                    cfg.add_blur_mask_modifier(1 if not shift_pressed else 5)
                                elif chr_key == 'd':
                                    cfg.add_blur_mask_modifier(-1 if not shift_pressed else -5)
                                elif chr_key == 'r':
                                    cfg.add_motion_blur_power(1 if not shift_pressed else 5)
                                elif chr_key == 'f':
                                    cfg.add_motion_blur_power(-1 if not shift_pressed else -5)
                                elif chr_key == 'y':
                                    cfg.add_blursharpen_amount(1 if not shift_pressed else 5)
                                elif chr_key == 'h':
                                    cfg.add_blursharpen_amount(-1 if not shift_pressed else -5)
                                elif chr_key == 'u':
                                    cfg.add_output_face_scale(1 if not shift_pressed else 5)
                                elif chr_key == 'j':
                                    cfg.add_output_face_scale(-1 if not shift_pressed else -5)
                                elif chr_key == 'i':
                                    cfg.add_image_denoise_power(1 if not shift_pressed else 5)
                                elif chr_key == 'k':
                                    cfg.add_image_denoise_power(-1 if not shift_pressed else -5)
                                elif chr_key == 'o':
                                    cfg.add_bicubic_degrade_power(1 if not shift_pressed else 5)
                                elif chr_key == 'l':
                                    cfg.add_bicubic_degrade_power(-1 if not shift_pressed else -5)

                                elif chr_key == 'p':
                                    cfg.add_color_degrade_power(1 if not shift_pressed else 5)
                                elif chr_key == ';':
                                    cfg.add_color_degrade_power(-1)
                                elif chr_key == ':':
                                    cfg.add_color_degrade_power(-5)

                                elif chr_key == 'z':
                                    cfg.toggle_masked_hist_match()
                                elif chr_key == 'x':
                                    cfg.toggle_mask_mode()
                                elif chr_key == 'c':
                                    cfg.toggle_color_transfer_mode()
                                elif chr_key == 'v':
                                    cfg.toggle_super_resolution_mode()
                                elif chr_key == 'b':
                                    cfg.toggle_export_mask_alpha()
                                elif chr_key == 'n':
                                    cfg.toggle_sharpen_mode()

                            else:
                                if chr_key == 'y':
                                    cfg.add_blursharpen_amount(1 if not shift_pressed else 5)
                                elif chr_key == 'h':
                                    cfg.add_blursharpen_amount(-1 if not shift_pressed else -5)
                                elif chr_key == 's':
                                    cfg.toggle_add_source_image()
                                elif chr_key == 'v':
                                    cfg.toggle_super_resolution_mode()
                                elif chr_key == 'n':
                                    cfg.toggle_sharpen_mode()

                            if prev_cfg != cfg:
                                io.log_info ( cfg.to_string(cur_frame.frame_info.filename_short) )
                                cur_frame.is_done = False
                                cur_frame.is_shown = False
                    else:
                        if chr_key == ',' or chr_key == 'm':
                            self.process_remain_frames = False
                            go_prev_frame = True
                            go_prev_frame_overriding_cfg = chr_key == 'm'
                        elif chr_key == '.' or chr_key == '/':
                            self.process_remain_frames = False
                            go_next_frame = True
                            go_next_frame_overriding_cfg = chr_key == '/'
                        elif chr_key == '\r' or chr_key == '\n':
                            self.process_remain_frames = not self.process_remain_frames
                        elif chr_key == '-':
                            self.screen_manager.get_current().diff_scale(-0.1)
                        elif chr_key == '=':
                            self.screen_manager.get_current().diff_scale(0.1)

        if go_prev_frame:
            if cur_frame is None or cur_frame.is_done:
                if cur_frame is not None:
                    cur_frame.image = None

                if len(self.frames_done_idxs) > 0:
                    prev_frame = self.frames[self.frames_done_idxs.pop()]
                    self.frames_idxs.insert(0, prev_frame.idx)
                    prev_frame.is_shown = False
                    io.progress_bar_inc(-1)

                    if cur_frame is not None and go_prev_frame_overriding_cfg:
                        if prev_frame.cfg != cur_frame.cfg:
                            prev_frame.cfg = cur_frame.cfg.copy()
                            prev_frame.is_done = False

        elif go_next_frame:
            if cur_frame is not None and cur_frame.is_done:
                cur_frame.image = None
                cur_frame.is_shown = True
                self.frames_done_idxs.append(cur_frame.idx)
                self.frames_idxs.pop(0)
                io.progress_bar_inc(1)

                if len(self.frames_idxs) != 0:
                    next_frame = self.frames[ self.frames_idxs[0] ]

                    if go_next_frame_overriding_cfg:
                        f = self.frames
                        for i in range( next_frame.idx, len(self.frames) ):
                            f[i].cfg = None
                            f[i].is_shown = False

                    if next_frame.cfg is None or next_frame.is_shown == False: #next frame was never shown, or the current cfg is being propagated to the upcoming prefetched frames
                        for i in range( min(len(self.frames_idxs), self.prefetch_frame_count) ):
                            frame = self.frames[ self.frames_idxs[i] ]

                            if frame.cfg is None or frame.cfg != cur_frame.cfg:
                                frame.cfg = cur_frame.cfg.copy()
                                frame.is_done = False #recompute with the new cfg

                    next_frame.is_shown = False

        if len(self.frames_idxs) == 0:
            self.process_remain_frames = False

        #True once an interactive quit was requested, or a non-interactive run has processed all frames
        return (self.is_interactive and self.is_interactive_quitting) or \
               (not self.is_interactive and self.process_remain_frames == False)

    #override
    def on_data_return (self, host_dict, pf):
        frame = self.frames[pf.idx]
        frame.is_done = False
        frame.is_processing = False

    #override
    def on_result (self, host_dict, pf_sent, pf_result):
        frame = self.frames[pf_result.idx]
        frame.is_processing = False
        if frame.cfg == pf_result.cfg: #accept the result only if the cfg was not changed while the frame was in flight
            frame.is_done = True
            frame.image = pf_result.image

    #override
    def get_data(self, host_dict):
        if self.is_interactive and self.is_interactive_quitting:
            return None

        for i in range ( min(len(self.frames_idxs), self.prefetch_frame_count) ):
            frame = self.frames[ self.frames_idxs[i] ]

            if not frame.is_done and not frame.is_processing and frame.cfg is not None:
                frame.is_processing = True
                return ConvertSubprocessor.ProcessingFrame(idx=frame.idx,
                                                           cfg=frame.cfg.copy(),
                                                           prev_temporal_frame_infos=frame.prev_temporal_frame_infos,
                                                           frame_info=frame.frame_info,
                                                           next_temporal_frame_infos=frame.next_temporal_frame_infos,
                                                           output_filename=frame.output_filename,
                                                           need_return_image=True )

        return None
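
    # Scheduling: work is only handed out for the first prefetch_frame_count entries of
    # frames_idxs, i.e. the frames the user is about to see. A frame whose cfg is still None
    # has never been configured and is skipped until a cfg is propagated to it in on_tick.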

    #override
    def get_result(self):
        return 0

def main (args, device_args):
    io.log_info ("Running converter.\r\n")

    training_data_src_dir = args.get('training_data_src_dir', None)
    training_data_src_path = Path(training_data_src_dir) if training_data_src_dir is not None else None
    aligned_dir = args.get('aligned_dir', None)
    avaperator_aligned_dir = args.get('avaperator_aligned_dir', None)

    try:
        input_path = Path(args['input_dir'])
        output_path = Path(args['output_dir'])
        model_path = Path(args['model_dir'])

        if not input_path.exists():
            io.log_err('Input directory not found. Please ensure it exists.')
            return

        if not output_path.exists():
            output_path.mkdir(parents=True, exist_ok=True)

        if not model_path.exists():
            io.log_err('Model directory not found. Please ensure it exists.')
            return

        is_interactive = io.input_bool ("Use interactive converter? (y/n skip:y) : ", True) if not io.is_colab() else False

        import models
        model = models.import_model( args['model_name'])(model_path, device_args=device_args, training_data_src_path=training_data_src_path)
        converter_session_filepath = model.get_strpath_storage_for_file('converter_session.dat')
        predictor_func, predictor_input_shape, cfg = model.get_ConverterConfig()

        if not is_interactive:
            cfg.ask_settings()

        input_path_image_paths = Path_utils.get_image_paths(input_path)

        if cfg.type == ConverterConfig.TYPE_MASKED:
            if aligned_dir is None:
                io.log_err('Aligned directory not specified. Please ensure it exists.')
                return

            aligned_path = Path(aligned_dir)
            if not aligned_path.exists():
                io.log_err('Aligned directory not found. Please ensure it exists.')
                return

            packed_samples = None
            try:
                packed_samples = samplelib.PackedFaceset.load(aligned_path)
            except:
                io.log_err(f"Error occurred while loading samplelib.PackedFaceset of {str(aligned_path)}, {traceback.format_exc()}")

            if packed_samples is not None:
                io.log_info ("Using packed faceset.")
                def generator():
                    #yield the filepath alongside the DFLIMG so load failures can be reported by name
                    for sample in io.progress_bar_generator( packed_samples, "Collecting alignments"):
                        filepath = Path(sample.filename)
                        yield filepath, DFLIMG.load(filepath, loader_func=lambda x: sample.read_raw_file() )
            else:
                def generator():
                    for filepath in io.progress_bar_generator( Path_utils.get_image_paths(aligned_path), "Collecting alignments"):
                        filepath = Path(filepath)
                        yield filepath, DFLIMG.load(filepath)

            alignments = {}
            multiple_faces_detected = False

            for filepath, dflimg in generator():
                if dflimg is None:
                    io.log_err ("%s is not a dfl image file" % (filepath.name) )
                    continue

                source_filename = dflimg.get_source_filename()
                if source_filename is None or source_filename == "_":
                    continue

                source_filename = Path(source_filename)
                source_filename_stem = source_filename.stem

                if source_filename_stem not in alignments.keys():
                    alignments[ source_filename_stem ] = []

                alignments_ar = alignments[ source_filename_stem ]
                alignments_ar.append (dflimg.get_source_landmarks())
                if len(alignments_ar) > 1:
                    multiple_faces_detected = True

            if multiple_faces_detected:
                io.log_info ("Warning: multiple faces detected. It is strongly recommended to process them separately.")

            #default to an empty landmarks list so process_data's len() check cannot fail on frames without faces
            frames = [ ConvertSubprocessor.Frame( frame_info=FrameInfo(filename=p, landmarks_list=alignments.get(Path(p).stem, [])) ) for p in input_path_image_paths ]

            if multiple_faces_detected:
                io.log_info ("Warning: multiple faces detected. Motion blur will not be used.")
            else:
                s = 256
                local_pts = [ (s//2-1, s//2-1), (s//2-1,0) ] #center + up
                frames_len = len(frames)
                for i in io.progress_bar_generator( range(len(frames)) , "Computing motion vectors"):
                    fi_prev = frames[max(0, i-1)].frame_info
                    fi      = frames[i].frame_info
                    fi_next = frames[min(i+1, frames_len-1)].frame_info
                    if len(fi_prev.landmarks_list) == 0 or \
                       len(fi.landmarks_list) == 0 or \
                       len(fi_next.landmarks_list) == 0:
                        continue

                    mat_prev = LandmarksProcessor.get_transform_mat ( fi_prev.landmarks_list[0], s, face_type=FaceType.FULL)
                    mat      = LandmarksProcessor.get_transform_mat ( fi.landmarks_list[0]     , s, face_type=FaceType.FULL)
                    mat_next = LandmarksProcessor.get_transform_mat ( fi_next.landmarks_list[0], s, face_type=FaceType.FULL)

                    pts_prev = LandmarksProcessor.transform_points (local_pts, mat_prev, True)
                    pts      = LandmarksProcessor.transform_points (local_pts, mat, True)
                    pts_next = LandmarksProcessor.transform_points (local_pts, mat_next, True)

                    prev_vector = pts[0]-pts_prev[0]
                    next_vector = pts_next[0]-pts[0]

                    motion_vector = pts_next[0] - pts_prev[0]
                    fi.motion_power = npla.norm(motion_vector)

                    motion_vector = motion_vector / fi.motion_power if fi.motion_power != 0 else np.array([0,0],dtype=np.float32)

                    fi.motion_deg = -math.atan2(motion_vector[1],motion_vector[0])*180 / math.pi
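
                # The two local_pts map the face-crop center and "up" direction back into frame
                # space; motion is the displacement of the center point from the previous to the
                # next frame. motion_power is its length in pixels and motion_deg its direction,
                # negated because image y grows downward (e.g. motion_vector (1,-1) -> +45 deg).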

        elif cfg.type == ConverterConfig.TYPE_FACE_AVATAR:
            filesdata = []
            for filepath in io.progress_bar_generator(input_path_image_paths, "Collecting info"):
                filepath = Path(filepath)

                dflimg = DFLIMG.load(filepath)
                if dflimg is None:
                    io.log_err ("%s is not a dfl image file" % (filepath.name) )
                    continue
                filesdata += [ ( FrameInfo(filename=str(filepath), landmarks_list=[dflimg.get_landmarks()] ), dflimg.get_source_filename() ) ]

            filesdata = sorted(filesdata, key=operator.itemgetter(1)) #sort by source filename
            frames = []
            filesdata_len = len(filesdata)
            for i in range(len(filesdata)):
                frame_info = filesdata[i][0]

                prev_temporal_frame_infos = []
                next_temporal_frame_infos = []

                for t in range (cfg.temporal_face_count):
                    prev_frame_info = filesdata[ max(i -t, 0) ][0]
                    next_frame_info = filesdata[ min(i +t, filesdata_len-1 )][0]

                    prev_temporal_frame_infos.insert (0, prev_frame_info )
                    next_temporal_frame_infos.append (   next_frame_info )

                frames.append ( ConvertSubprocessor.Frame(prev_temporal_frame_infos=prev_temporal_frame_infos,
                                                          frame_info=frame_info,
                                                          next_temporal_frame_infos=next_temporal_frame_infos) )
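
            # Each frame carries a temporal window of neighbors for ConvertFaceAvatar: with
            # temporal_face_count=2 and i=5, prev = [info[4], info[5]] and next = [info[5], info[6]]
            # (both windows include the current frame at t=0, clamped at the sequence ends).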

        if len(frames) == 0:
            io.log_info ("No frames to convert in input_dir.")
        else:
            ConvertSubprocessor (
                        is_interactive         = is_interactive,
                        converter_session_filepath = converter_session_filepath,
                        predictor_func         = predictor_func,
                        predictor_input_shape  = predictor_input_shape,
                        converter_config       = cfg,
                        frames                 = frames,
                        output_path            = output_path,
                        model_iter             = model.get_iter()
                    ).run()

        model.finalize()

    except Exception as e:
        print ( 'Error: %s' % (str(e)))
        traceback.print_exc()

#interpolate landmarks
#from facelib import LandmarksProcessor
#from facelib import FaceType
#a = sorted(alignments.keys())
#a_len = len(a)
#
#box_pts = 3
#box = np.ones(box_pts)/box_pts
#for i in range( a_len ):
#    if i >= box_pts and i <= a_len-box_pts-1:
#        af0 = alignments[ a[i] ][0] ##first face
#        m0 = LandmarksProcessor.get_transform_mat (af0, 256, face_type=FaceType.FULL)
#
#        points = []
#
#        for j in range(-box_pts, box_pts+1):
#            af = alignments[ a[i+j] ][0] ##first face
#            m = LandmarksProcessor.get_transform_mat (af, 256, face_type=FaceType.FULL)
#            p = LandmarksProcessor.transform_points (af, m)
#            points.append (p)
#
#        points = np.array(points)
#        points_len = len(points)
#        t_points = np.transpose(points, [1,0,2])
#
#        p1 = np.array ( [ int(np.convolve(x[:,0], box, mode='same')[points_len//2]) for x in t_points ] )
#        p2 = np.array ( [ int(np.convolve(x[:,1], box, mode='same')[points_len//2]) for x in t_points ] )
#
#        new_points = np.concatenate( [np.expand_dims(p1,-1),np.expand_dims(p2,-1)], -1 )
#
#        alignments[ a[i] ][0] = LandmarksProcessor.transform_points (new_points, m0, True).astype(np.int32)