DeepFaceLab/samplelib/SampleLoader.py
Colombo 45582d129d added XSeg model.
With the XSeg model you can train your own mask segmenter for dst (and src) faces
that will be used in merger for whole_face.

Instead of using a pretrained model (which does not exist),
you control which part of faces should be masked.

Workflow is not easy, but at the moment it is the best solution
for obtaining the best quality of whole_face's deepfakes using minimum effort
without rotoscoping in AfterEffects.

new scripts:
	XSeg) data_dst edit.bat
	XSeg) data_dst merge.bat
	XSeg) data_dst split.bat
	XSeg) data_src edit.bat
	XSeg) data_src merge.bat
	XSeg) data_src split.bat
	XSeg) train.bat

Usage:
	unpack dst faceset if packed

	run XSeg) data_dst split.bat
		this script extracts (previously saved) .json data from jpg faces for use in the label tool.

	run XSeg) data_dst edit.bat
		new tool 'labelme' is used

		use polygon (CTRL-N) to mask the face
			name polygon "1" (one symbol) as include polygon
			name polygon "0" (one symbol) as exclude polygon

			'exclude polygons' will be applied after all 'include polygons'

		Hot keys:
		ctrl-N			create polygon
		ctrl-J			edit polygon
		A/D 			navigate between frames
		ctrl + mousewheel 	image zoom
		mousewheel		vertical scroll
		alt+mousewheel		horizontal scroll

		repeat for 10/50/100 faces,
			you don't need to mask every frame of dst,
			only frames where the face differs significantly,
			for example:
				closed eyes
				changed head direction
				changed light
			the more varied the faces you mask, the better the quality you will get

			Start masking from the upper left area and follow the clockwise direction.
			Keep the same logic of masking for all frames, for example:
				the same approximated jaw line of the side faces, where the jaw is not visible
				the same hair line
			Mask the obstructions using polygon with name "0".

	run XSeg) data_dst merge.bat
		this script merges .json data of polygons into jpg faces,
		therefore faceset can be sorted or packed as usual.

	run XSeg) train.bat
		train the model

		Check the faces of 'XSeg dst faces' preview.

		if some faces have wrong or glitchy mask, then repeat steps:
			split
			run edit
			find these glitchy faces and mask them
			merge
			train further or restart training from scratch

Restarting training of the XSeg model is only possible by deleting all 'model\XSeg_*' files.

If you want to get the mask of the predicted face in merger,
you should repeat the same steps for src faceset.

New mask modes available in merger for whole_face:

XSeg-prd	  - XSeg mask of predicted face	 -> faces from src faceset should be labeled
XSeg-dst	  - XSeg mask of dst face        -> faces from dst faceset should be labeled
XSeg-prd*XSeg-dst - the smallest area of both

if workspace\model folder contains trained XSeg model, then merger will use it,
otherwise you will get transparent mask by using XSeg-* modes.

Some screenshots:
label tool: https://i.imgur.com/aY6QGw1.jpg
trainer   : https://i.imgur.com/NM1Kn3s.jpg
merger    : https://i.imgur.com/glUzFQ8.jpg

example of the fake using 13 segmented dst faces
          : https://i.imgur.com/wmvyizU.gifv
2020-03-15 15:12:44 +04:00

192 lines
6.9 KiB
Python

import multiprocessing
import operator
import pickle
import traceback
from pathlib import Path
import samplelib.PackedFaceset
from core import pathex
from core.interact import interact as io
from core.joblib import Subprocessor
from DFLIMG import *
from facelib import FaceType, LandmarksProcessor
from .Sample import Sample, SampleType
class SampleLoader:
    """Loads samples from a directory and caches them per path and sample type."""

    # Shared cache: str(samples_path) -> list with one slot per sample type
    # (the list is created with SampleType.QTY entries and indexed by sample type).
    samples_cache = dict()

    @staticmethod
    def get_person_id_max_count(samples_path):
        """Return the number of distinct person names in the packed faceset.

        Args:
            samples_path: location passed to samplelib.PackedFaceset.load.

        Returns:
            Count of distinct `person_name` values among the loaded samples.

        Raises:
            ValueError: if the packed faceset could not be loaded.
        """
        samples = None
        try:
            samples = samplelib.PackedFaceset.load(samples_path)
        except Exception:
            # Log and fall through; the None check below reports the failure.
            io.log_err(f"Error occured while loading samplelib.PackedFaceset.load {samples_path}, {traceback.format_exc()}")

        if samples is None:
            raise ValueError("packed faceset not found.")

        return len({sample.person_name for sample in samples})

    @staticmethod
    def load(sample_type, samples_path, subdirs=False):
        """Return the samples of `sample_type` found at `samples_path`.

        IMAGE and FACE results are cached in `SampleLoader.samples_cache`, so
        they are only loaded once per path. FACE_TEMPORAL_SORTED is rebuilt
        from the FACE samples on every call.

        Args:
            sample_type: one of the SampleType values handled below.
            samples_path: directory with images or a packed faceset.
            subdirs: forwarded to pathex.get_image_paths for IMAGE and FACE.

        Returns:
            The cached list for `sample_type`, or None when `sample_type` is
            not one of the handled types and nothing has been stored for it.
        """
        samples_cache = SampleLoader.samples_cache
        cache_key = str(samples_path)

        if cache_key not in samples_cache:
            samples_cache[cache_key] = [None] * SampleType.QTY

        samples = samples_cache[cache_key]

        if sample_type == SampleType.IMAGE:
            if samples[sample_type] is None:
                image_paths = pathex.get_image_paths(samples_path, subdirs=subdirs)
                samples[sample_type] = [
                    Sample(filename=filename)
                    for filename in io.progress_bar_generator(image_paths, "Loading")
                ]

        elif sample_type == SampleType.FACE:
            if samples[sample_type] is None:
                # Must be pre-initialised: if the packed load raises, we still
                # want to fall back to loading loose face images.
                result = None
                try:
                    result = samplelib.PackedFaceset.load(samples_path)
                except Exception:
                    io.log_err(f"Error occured while loading samplelib.PackedFaceset.load {samples_path}, {traceback.format_exc()}")

                if result is not None:
                    io.log_info(f"Loaded {len(result)} packed faces from {samples_path}")
                else:
                    result = SampleLoader.load_face_samples(
                        pathex.get_image_paths(samples_path, subdirs=subdirs))

                samples[sample_type] = result

        elif sample_type == SampleType.FACE_TEMPORAL_SORTED:
            # NOTE(review): `subdirs` is not forwarded here and the sorted list
            # is recomputed on each call; kept as in the original behaviour.
            result = SampleLoader.load(SampleType.FACE, samples_path)
            result = SampleLoader.upgradeToFaceTemporalSortedSamples(result)
            samples[sample_type] = result

        return samples[sample_type]

    @staticmethod
    def load_face_samples(image_paths):
        """Build FACE samples from image files using FaceSamplesLoaderSubprocessor.

        Files for which the worker produced no metadata (it reports None for
        files that are not DFL images) are skipped instead of aborting the load.

        Args:
            image_paths: sequence of image file paths.

        Returns:
            List of Sample objects, one per successfully read image.
        """
        result = FaceSamplesLoaderSubprocessor(image_paths).run()
        sample_list = []

        for entry in result:
            # A slot stays None if no result was ever recorded for that index.
            if entry is None:
                continue

            filename, data = entry
            if data is None:
                # The worker already logged that this is not a dfl image file.
                continue

            (face_type,
             shape,
             landmarks,
             ie_polys,
             seg_ie_polys,
             eyebrows_expand_mod,
             source_filename) = data

            sample_list.append(Sample(filename=filename,
                                      sample_type=SampleType.FACE,
                                      face_type=FaceType.fromString(face_type),
                                      shape=shape,
                                      landmarks=landmarks,
                                      ie_polys=ie_polys,
                                      seg_ie_polys=seg_ie_polys,
                                      eyebrows_expand_mod=eyebrows_expand_mod,
                                      source_filename=source_filename,
                                      ))
        return sample_list

    @staticmethod
    def upgradeToFaceTemporalSortedSamples(samples):
        """Return a new list of `samples` ordered by their `source_filename`.

        The sort is stable, so samples sharing a source filename keep their
        relative order.
        """
        return sorted(samples, key=operator.attrgetter('source_filename'))
class FaceSamplesLoaderSubprocessor(Subprocessor):
    """Reads DFL face metadata from image files using several worker processes.

    The result is a list aligned with `image_paths`; each filled slot holds
    `(image_path, metadata)` where `metadata` is the tuple produced by
    `Cli.process_data`, or None when the file is not a DFL image.
    """

    #override
    def __init__(self, image_paths):
        self.image_paths = image_paths
        self.image_paths_len = len(image_paths)
        # Pending indexes kept as a stack in reverse order: pop() from the end
        # hands out 0, 1, 2, ... in O(1), instead of the O(n) list.pop(0).
        self.idxs = list(reversed(range(self.image_paths_len)))
        self.result = [None] * self.image_paths_len
        # NOTE(review): 60 is presumably the worker no-response timeout in
        # seconds - confirm against Subprocessor.__init__.
        super().__init__('FaceSamplesLoader', FaceSamplesLoaderSubprocessor.Cli, 60)

    #override
    def on_clients_initialized(self):
        io.progress_bar("Loading samples", self.image_paths_len)

    #override
    def on_clients_finalized(self):
        io.progress_bar_close()

    #override
    def process_info_generator(self):
        # One worker per CPU, capped at 8.
        for i in range(min(multiprocessing.cpu_count(), 8)):
            yield 'CPU%d' % (i), {}, {}

    #override
    def get_data(self, host_dict):
        """Return the next `(idx, image_path)` work item, or None when none are left."""
        if self.idxs:
            idx = self.idxs.pop()
            return idx, self.image_paths[idx]
        return None

    #override
    def on_data_return(self, host_dict, data):
        """Put a returned work item back so it is the next one handed out."""
        self.idxs.append(data[0])

    #override
    def on_result(self, host_dict, data, result):
        """Store the worker's result in the slot matching its index."""
        idx, dflimg = result
        self.result[idx] = (self.image_paths[idx], dflimg)
        io.progress_bar_inc(1)

    #override
    def get_result(self):
        return self.result

    class Cli(Subprocessor.Cli):
        #override
        def process_data(self, data):
            """Load one image and return `(idx, metadata)`.

            `metadata` is None when DFLIMG.load returns None for the file,
            otherwise a tuple of (face_type, shape, landmarks, ie_polys,
            seg_ie_polys, eyebrows_expand_mod, source_filename).
            """
            idx, filename = data
            dflimg = DFLIMG.load(Path(filename))

            if dflimg is None:
                self.log_err(f"FaceSamplesLoader: {filename} is not a dfl image file.")
                return idx, None

            metadata = (dflimg.get_face_type(),
                        dflimg.get_shape(),
                        dflimg.get_landmarks(),
                        dflimg.get_ie_polys(),
                        dflimg.get_seg_ie_polys(),
                        dflimg.get_eyebrows_expand_mod(),
                        dflimg.get_source_filename())
            return idx, metadata

        #override
        def get_data_name(self, data):
            #return string identificator of your data
            return data[1]