DeepFaceLab/mainscripts/dev_misc.py

594 lines
20 KiB
Python

import traceback
import json
import multiprocessing
import shutil
from pathlib import Path
import cv2
import numpy as np
from core import imagelib, pathex
from core.cv2ex import *
from core.interact import interact as io
from core.joblib import Subprocessor
from core.leras import nn
from DFLIMG import *
from facelib import FaceType, LandmarksProcessor
from . import Extractor, Sorter
from .Extractor import ExtractSubprocessor
def extract_vggface2_dataset(input_dir, device_args={} ):
    """Extract faces from a dataset directory using the boxes in loose_bb_train.csv.

    Every sub-directory of ``input_dir`` is scanned for images. Images that have
    a bounding box in ``loose_bb_train.csv`` whose smaller side is at least 128
    are sent through the 'landmarks' and 'final' ExtractSubprocessor passes.
    Results go to a sibling directory named ``<input_dir name>_out``, with one
    sub-directory per input sub-directory.

    Args:
        input_dir: path of the dataset directory; must contain loose_bb_train.csv.
        device_args: mapping with optional 'multi_gpu' and 'cpu_only' booleans.
            It is only read here, never mutated.

    Raises:
        ValueError: if ``input_dir`` or ``loose_bb_train.csv`` does not exist.
    """
    multi_gpu = device_args.get('multi_gpu', False)
    cpu_only = device_args.get('cpu_only', False)

    input_path = Path(input_dir)
    if not input_path.exists():
        raise ValueError('Input directory not found. Please ensure it exists.')

    bb_csv = input_path / 'loose_bb_train.csv'
    if not bb_csv.exists():
        raise ValueError('loose_bb_train.csv not found. Please ensure it exists.')

    bb_lines = bb_csv.read_text().split('\n')
    bb_lines.pop(0)  # drop the header row

    # Maps "<dir name>/<file stem>" -> (left, top, width, height).
    bb_dict = {}
    for line in bb_lines:
        # A trailing newline (or any blank row) would otherwise break the
        # 5-way unpacking below.
        if not line.strip():
            continue
        name, l, t, w, h = line.split(',')
        # Drops the first and last character of the name field
        # (presumably the surrounding quotes - confirm against the CSV).
        name = name[1:-1]
        l, t, w, h = [ int(x) for x in (l, t, w, h) ]
        bb_dict[name] = (l, t, w, h)

    output_path = input_path.parent / (input_path.name + '_out')

    dir_names = pathex.get_all_dir_names(input_path)

    if not output_path.exists():
        output_path.mkdir(parents=True, exist_ok=True)

    data = []
    for dir_name in io.progress_bar_generator(dir_names, "Collecting"):
        cur_input_path = input_path / dir_name
        cur_output_path = output_path / dir_name

        if not cur_output_path.exists():
            cur_output_path.mkdir(parents=True, exist_ok=True)

        for filename in pathex.get_image_paths(cur_input_path):
            filename_path = Path(filename)

            name = filename_path.parent.name + '/' + filename_path.stem
            if name not in bb_dict:
                continue

            l, t, w, h = bb_dict[name]
            # Skip boxes whose smaller side is under 128 pixels.
            if min(w, h) < 128:
                continue

            # The rect is passed as (left, top, right, bottom).
            data.append( ExtractSubprocessor.Data(filename=filename, rects=[ (l, t, l+w, t+h) ], landmarks_accurate=False, force_output_path=cur_output_path ) )

    face_type = FaceType.fromString('full_face')

    io.log_info ('Performing 2nd pass...')
    data = ExtractSubprocessor (data, 'landmarks', 256, face_type, debug_dir=None, multi_gpu=multi_gpu, cpu_only=cpu_only, manual=False).run()

    io.log_info ('Performing 3rd pass...')
    # final_output_path is None: each item carries its own force_output_path.
    ExtractSubprocessor (data, 'final', 256, face_type, debug_dir=None, multi_gpu=multi_gpu, cpu_only=cpu_only, manual=False, final_output_path=None).run()
"""
import code
code.interact(local=dict(globals(), **locals()))
data_len = len(data)
i = 0
while i < data_len-1:
i_name = Path(data[i].filename).parent.name
sub_data = []
for j in range (i, data_len):
j_name = Path(data[j].filename).parent.name
if i_name == j_name:
sub_data += [ data[j] ]
else:
break
i = j
cur_output_path = output_path / i_name
io.log_info (f"Processing: {str(cur_output_path)}, {i}/{data_len} ")
if not cur_output_path.exists():
cur_output_path.mkdir(parents=True, exist_ok=True)
for dir_name in dir_names:
cur_input_path = input_path / dir_name
cur_output_path = output_path / dir_name
input_path_image_paths = pathex.get_image_paths(cur_input_path)
l = len(input_path_image_paths)
#if l < 250 or l > 350:
# continue
io.log_info (f"Processing: {str(cur_input_path)} ")
if not cur_output_path.exists():
cur_output_path.mkdir(parents=True, exist_ok=True)
data = []
for filename in input_path_image_paths:
filename_path = Path(filename)
name = filename_path.parent.name + '/' + filename_path.stem
if name not in bb_dict:
continue
bb = bb_dict[name]
l,t,w,h = bb
if min(w,h) < 128:
continue
data += [ ExtractSubprocessor.Data(filename=filename,rects=[ (l,t,l+w,t+h) ], landmarks_accurate=False ) ]
io.log_info ('Performing 2nd pass...')
data = ExtractSubprocessor (data, 'landmarks', 256, face_type, debug_dir=None, multi_gpu=False, cpu_only=False, manual=False).run()
io.log_info ('Performing 3rd pass...')
data = ExtractSubprocessor (data, 'final', 256, face_type, debug_dir=None, multi_gpu=False, cpu_only=False, manual=False, final_output_path=cur_output_path).run()
io.log_info (f"Sorting: {str(cur_output_path)} ")
Sorter.main (input_path=str(cur_output_path), sort_by_method='hist')
import code
code.interact(local=dict(globals(), **locals()))
#try:
# io.log_info (f"Removing: {str(cur_input_path)} ")
# shutil.rmtree(cur_input_path)
#except:
# io.log_info (f"unable to remove: {str(cur_input_path)} ")
def extract_vggface2_dataset(input_dir, device_args={} ):
multi_gpu = device_args.get('multi_gpu', False)
cpu_only = device_args.get('cpu_only', False)
input_path = Path(input_dir)
if not input_path.exists():
raise ValueError('Input directory not found. Please ensure it exists.')
output_path = input_path.parent / (input_path.name + '_out')
dir_names = pathex.get_all_dir_names(input_path)
if not output_path.exists():
output_path.mkdir(parents=True, exist_ok=True)
for dir_name in dir_names:
cur_input_path = input_path / dir_name
cur_output_path = output_path / dir_name
l = len(pathex.get_image_paths(cur_input_path))
if l < 250 or l > 350:
continue
io.log_info (f"Processing: {str(cur_input_path)} ")
if not cur_output_path.exists():
cur_output_path.mkdir(parents=True, exist_ok=True)
Extractor.main( str(cur_input_path),
str(cur_output_path),
detector='s3fd',
image_size=256,
face_type='full_face',
max_faces_from_image=1,
device_args=device_args )
io.log_info (f"Sorting: {str(cur_input_path)} ")
Sorter.main (input_path=str(cur_output_path), sort_by_method='hist')
try:
io.log_info (f"Removing: {str(cur_input_path)} ")
shutil.rmtree(cur_input_path)
except:
io.log_info (f"unable to remove: {str(cur_input_path)} ")
"""
#unused in end user workflow
def dev_test_68(input_dir ):
    """Process a 68-landmark dataset whose images have companion .pts files.

    For every image in ``input_dir`` with a sibling ``<stem>.pts`` file, the
    lines between ``{`` and ``}`` are parsed as landmark coordinates. When
    exactly 68 lines are found, the image is normalized to 3 channels and
    written as JPEG into the sibling directory ``<input_dir name>_aligned``.

    NOTE: embedding the landmarks into the written JPEG is not implemented.
    The function raises ``Exception("unimplemented")`` right after writing the
    first image that has a valid 68-landmark .pts file.

    Args:
        input_dir: directory containing the images and .pts files.

    Raises:
        ValueError: if ``input_dir`` does not exist.
        Exception: "unimplemented", see the note above.
    """
    input_path = Path(input_dir)
    if not input_path.exists():
        raise ValueError('input_dir not found. Please ensure it exists.')

    output_path = input_path.parent / (input_path.name+'_aligned')

    # The original f-string had a stray '%' before the path.
    io.log_info(f'Output dir is {output_path}')

    if output_path.exists():
        output_images_paths = pathex.get_image_paths(output_path)
        if len(output_images_paths) > 0:
            # The answer is ignored; the prompt only pauses before deleting.
            io.input_bool("WARNING !!! \n %s contains files! \n They will be deleted. \n Press enter to continue." % (str(output_path)), False )
            for filename in output_images_paths:
                Path(filename).unlink()
    else:
        output_path.mkdir(parents=True, exist_ok=True)

    images_paths = pathex.get_image_paths(input_path)

    for filepath in io.progress_bar_generator(images_paths, "Processing"):
        filepath = Path(filepath)

        pts_filepath = filepath.parent / (filepath.stem+'.pts')
        if not pts_filepath.exists():
            continue

        # Collect the lines between the '{' and '}' marker lines.
        lmrk_lines = None
        for pts_line in pts_filepath.read_text().split('\n'):
            if pts_line == '{':
                lmrk_lines = []
            elif pts_line == '}':
                break
            elif lmrk_lines is not None:
                lmrk_lines.append (pts_line)

        if lmrk_lines is None or len(lmrk_lines) != 68:
            continue

        try:
            lmrks = [ np.array ( lmrk_line.strip().split(' ') ).astype(np.float32).tolist() for lmrk_line in lmrk_lines]
        except Exception as e:
            # Report the unparsable file and move on to the next image.
            print(e)
            print(filepath)
            continue

        # Only needed by the unimplemented metadata step below.
        rect = LandmarksProcessor.get_rect_from_landmarks(lmrks)

        output_filepath = output_path / (filepath.stem+'.jpg')

        img = cv2_imread(filepath)
        img = imagelib.normalize_channels(img, 3)
        cv2_imwrite(output_filepath, img, [int(cv2.IMWRITE_JPEG_QUALITY), 95] )

        raise Exception("unimplemented")
        #DFLJPG.x(output_filepath, face_type=FaceType.toString(FaceType.MARK_ONLY),
        #                          landmarks=lmrks,
        #                          source_filename=filepath.name,
        #                          source_rect=rect,
        #                          source_landmarks=lmrks
        #        )

    io.log_info("Done.")
#unused in end user workflow
def extract_umd_csv(input_file_csv,
                    face_type='full_face',
                    device_args={} ):
    """Extract faces listed in a umdfaces.io dataset csv file.

    The csv must provide FILE, FACE_X, FACE_Y, FACE_WIDTH and FACE_HEIGHT
    columns. FILE is resolved relative to the csv's own directory. Output goes
    to ``aligned_<csv file name>`` next to the csv; any images already there
    are deleted after a prompt.

    Args:
        input_file_csv: path to the csv file.
        face_type: face type name, converted with FaceType.fromString.
        device_args: mapping with optional 'multi_gpu' and 'cpu_only' booleans.

    Raises:
        ValueError: if the csv file does not exist.
    """
    use_multi_gpu = device_args.get('multi_gpu', False)
    use_cpu_only = device_args.get('cpu_only', False)
    face_type = FaceType.fromString(face_type)

    csv_path = Path(input_file_csv)
    if not csv_path.exists():
        raise ValueError('input_file_csv not found. Please ensure it exists.')

    csv_root = csv_path.parent
    output_path = csv_path.parent / ('aligned_' + csv_path.name)

    io.log_info("Output dir is %s." % (str(output_path)) )

    if not output_path.exists():
        output_path.mkdir(parents=True, exist_ok=True)
    else:
        stale_images = pathex.get_image_paths(output_path)
        if len(stale_images) > 0:
            io.input_bool("WARNING !!! \n %s contains files! \n They will be deleted. \n Press enter to continue." % (str(output_path)), False )
            for stale in stale_images:
                Path(stale).unlink()

    try:
        with open( str(csv_path), 'r') as f:
            csv_text = f.read()
    except Exception as e:
        io.log_err("Unable to open or read file " + str(csv_path) + ": " + str(e) )
        return

    rows = csv_text.split('\n')
    header = rows[0].split(',')
    column_count = len(header)

    # One dict per well-formed row, keyed by the header names.
    records = []
    for row in rows[1:]:
        fields = row.split(',')
        if len(fields) != column_count:
            io.log_err("Wrong string in csv file, skipping.")
            continue
        records.append( dict(zip(header, fields)) )

    data = []
    for record in records:
        filename = csv_root / record['FILE']

        x = float(record['FACE_X'])
        y = float(record['FACE_Y'])
        w = float(record['FACE_WIDTH'])
        h = float(record['FACE_HEIGHT'])

        # The rect is passed as [left, top, right, bottom].
        data.append( ExtractSubprocessor.Data(filename=filename, rects=[ [x, y, x+w, y+h] ]) )

    images_found = len(data)
    faces_detected = 0
    if images_found > 0:
        io.log_info ("Performing 2nd pass from csv file...")
        data = ExtractSubprocessor (data, 'landmarks', multi_gpu=use_multi_gpu, cpu_only=use_cpu_only).run()

        io.log_info ('Performing 3rd pass...')
        data = ExtractSubprocessor (data, 'final', face_type, None, multi_gpu=use_multi_gpu, cpu_only=use_cpu_only, manual=False, final_output_path=output_path).run()
        faces_detected += sum([item.faces_detected for item in data])

    separator = '-------------------------'
    io.log_info (separator)
    io.log_info ('Images found: %d' % (images_found) )
    io.log_info ('Faces detected: %d' % (faces_detected) )
    io.log_info (separator)
def dev_test1(input_dir):
    """Extract faces from a LaPa dataset folder and attach its label masks.

    ``input_dir`` must contain ``images``, ``labels`` and ``landmarks``
    sub-folders. For each image, the 106 landmarks in
    ``landmarks/<stem>.txt`` give the bounding rect passed to the extractor.
    Extracted faces are written to ``<input_dir>/out``. Each extracted face
    then gets the matching ``labels/<stem>.png`` binarized, warped into face
    space, blurred and stored as its xseg mask.

    The function ends by opening an interactive console (``code.interact``)
    for inspection; this is a developer tool.

    Args:
        input_dir: root directory of the LaPa dataset split.

    Raises:
        ValueError: if a required sub-folder, landmark file or label file is
            missing, or a landmark file does not declare 106 points.
    """
    input_path = Path(input_dir)

    # `.exists` without parentheses is a bound method and always truthy, so
    # the original checks never fired. Call it.
    images_path = input_path / 'images'
    if not images_path.exists():
        raise ValueError('LaPa dataset: images folder not found.')
    labels_path = input_path / 'labels'
    if not labels_path.exists():
        raise ValueError('LaPa dataset: labels folder not found.')
    landmarks_path = input_path / 'landmarks'
    if not landmarks_path.exists():
        raise ValueError('LaPa dataset: landmarks folder not found.')

    image_size = 1024
    face_type = FaceType.HEAD

    output_path = input_path / 'out'
    if output_path.exists():
        output_images_paths = pathex.get_image_paths(output_path)
        if len(output_images_paths) != 0:
            io.input(f"\n WARNING !!! \n {output_path} contains files! \n They will be deleted. \n Press enter to continue.\n")
            for filename in output_images_paths:
                Path(filename).unlink()
    output_path.mkdir(parents=True, exist_ok=True)

    data = []

    img_paths = pathex.get_image_paths (images_path)
    for filename in img_paths:
        filepath = Path(filename)

        landmark_filepath = landmarks_path / (filepath.stem + '.txt')
        if not landmark_filepath.exists():
            raise ValueError(f'no landmarks for {filepath}')

        # First line is the point count, followed by one "x y" pair per line.
        lm = landmark_filepath.read_text().split('\n')
        if int(lm[0]) != 106:
            raise ValueError(f'wrong landmarks format in {landmark_filepath}')

        lmrks = []
        for i in range(106):
            x, y = lm[i+1].split(' ')
            lmrks.append ( (float(x), float(y)) )
        lmrks = np.array(lmrks)

        # Bounding rect of the landmarks, truncated to ints.
        l, t = np.min(lmrks, 0)
        r, b = np.max(lmrks, 0)
        l, t, r, b = ( int(x) for x in (l, t, r, b) )

        data.append( ExtractSubprocessor.Data(filepath=filepath, rects=[ (l, t, r, b) ]) )

    if len(data) > 0:
        device_config = nn.DeviceConfig.BestGPU()

        io.log_info ("Performing 2nd pass...")
        data = ExtractSubprocessor (data, 'landmarks', image_size, 95, face_type, device_config=device_config).run()

        io.log_info ("Performing 3rd pass...")
        data = ExtractSubprocessor (data, 'final', image_size, 95, face_type, final_output_path=output_path, device_config=device_config).run()

        for filename in pathex.get_image_paths (output_path):
            filepath = Path(filename)

            dflimg = DFLJPG.load(filepath)

            src_filename = dflimg.get_source_filename()
            image_to_face_mat = dflimg.get_image_to_face_mat()

            label_filepath = labels_path / ( Path(src_filename).stem + '.png')
            if not label_filepath.exists():
                raise ValueError(f'{label_filepath} does not exist')

            mask = cv2_imread(label_filepath)
            #mask[mask == 10] = 0 # remove hair
            mask[mask > 0] = 1  # any non-zero label counts as foreground
            # The 4th positional parameter of cv2.warpAffine is `dst`, so the
            # interpolation mode has to be passed by keyword. INTER_LINEAR is
            # also the documented default, so results should be unchanged.
            mask = cv2.warpAffine(mask, image_to_face_mat, (image_size, image_size), flags=cv2.INTER_LINEAR)
            mask = cv2.blur(mask, (3,3) )

            dflimg.set_xseg_mask(mask)
            dflimg.save()

    # Developer aid: open an interactive console with the current state.
    import code
    code.interact(local=dict(globals(), **locals()))
def dev_resave_pngs(input_dir):
    """Read and rewrite in place every .png found under ``input_dir``.

    Files are listed with ``subdirs=True`` and each one is passed through
    cv2_imread and then cv2_imwrite to the same path.

    Raises:
        ValueError: if ``input_dir`` does not exist.
    """
    root = Path(input_dir)
    if not root.exists():
        raise ValueError('input_dir not found. Please ensure it exists.')

    png_paths = pathex.get_image_paths(root, image_extensions=['.png'], subdirs=True, return_Path_class=True)

    for png_path in io.progress_bar_generator(png_paths, "Processing"):
        image = cv2_imread(png_path)
        cv2_imwrite(png_path, image)
def dev_segmented_trash(input_dir):
    """Move images that have no companion .json file into a trash directory.

    Every image in ``input_dir`` without a sibling ``<stem>.json`` is renamed
    into the sibling directory ``<input_dir name>_trash``, which is created if
    needed. A failed move is logged and the remaining files are still
    processed.

    Args:
        input_dir: directory containing the images and .json files.

    Raises:
        ValueError: if ``input_dir`` does not exist.
    """
    input_path = Path(input_dir)
    if not input_path.exists():
        raise ValueError('input_dir not found. Please ensure it exists.')

    output_path = input_path.parent / (input_path.name+'_trash')
    output_path.mkdir(parents=True, exist_ok=True)

    images_paths = pathex.get_image_paths(input_path, return_Path_class=True)

    # Collect first, then move, so the listing is not affected by the renames.
    trash_paths = [ filepath for filepath in images_paths
                    if not (filepath.parent / (filepath.stem + '.json')).exists() ]

    for filepath in trash_paths:
        try:
            filepath.rename ( output_path / filepath.name )
        except OSError:
            # The original handler used the undefined name `src`, so it raised
            # NameError instead of logging the failure.
            io.log_info ('fail to trashing %s' % (filepath.name) )
def dev_test(input_dir):
    """
    extract FaceSynthetics dataset https://github.com/microsoft/FaceSynthetics

    For every ``.txt`` landmark file in ``input_dir``, the image named
    ``<part of the file name before the first '_'>.png`` is extracted using
    the first 68 landmarks of that file. Faces are written to the sibling
    directory ``<input_dir name>_out``. Each extracted face then gets an xseg
    mask built from ``<source stem>_seg.png``, where the labels
    1,2,3,4,5,6,9,10,11 are kept as foreground.

    Segmentation labels:

    BACKGROUND = 0
    SKIN = 1
    NOSE = 2
    RIGHT_EYE = 3
    LEFT_EYE = 4
    RIGHT_BROW = 5
    LEFT_BROW = 6
    RIGHT_EAR = 7
    LEFT_EAR = 8
    MOUTH_INTERIOR = 9
    TOP_LIP = 10
    BOTTOM_LIP = 11
    NECK = 12
    HAIR = 13
    BEARD = 14
    CLOTHING = 15
    GLASSES = 16
    HEADWEAR = 17
    FACEWEAR = 18
    IGNORE = 255

    Raises:
        ValueError: if the segmentation image of an extracted face is missing.
    """
    image_size = 1024
    face_type = FaceType.WHOLE_FACE

    input_path = Path(input_dir)
    output_path = input_path.parent / f'{input_path.name}_out'
    if output_path.exists():
        output_images_paths = pathex.get_image_paths(output_path)
        if len(output_images_paths) != 0:
            io.input(f"\n WARNING !!! \n {output_path} contains files! \n They will be deleted. \n Press enter to continue.\n")
            for filename in output_images_paths:
                Path(filename).unlink()
    output_path.mkdir(parents=True, exist_ok=True)

    data = []

    for filepath in io.progress_bar_generator(pathex.get_paths(input_path), "Processing"):
        if filepath.suffix != '.txt':
            continue

        image_filepath = filepath.parent / f'{filepath.name.split("_")[0]}.png'
        if not image_filepath.exists():
            print(f'{image_filepath} does not exist, skipping')
            # The original printed "skipping" but still queued the missing image.
            continue

        lmrks = []
        for lmrk_line in filepath.read_text().split('\n'):
            if len(lmrk_line) == 0:
                continue

            x, y = lmrk_line.split(' ')
            lmrks.append( (float(x), float(y)) )

        # Only the first 68 points are used.
        lmrks = np.array(lmrks[:68], np.float32)
        rect = LandmarksProcessor.get_rect_from_landmarks(lmrks)
        data.append( ExtractSubprocessor.Data(filepath=image_filepath, rects=[rect], landmarks=[ lmrks ] ) )

    if len(data) > 0:
        io.log_info ("Performing 3rd pass...")
        data = ExtractSubprocessor (data, 'final', image_size, 95, face_type, final_output_path=output_path, device_config=nn.DeviceConfig.CPU()).run()

        for filename in io.progress_bar_generator(pathex.get_image_paths (output_path), "Processing"):
            filepath = Path(filename)

            dflimg = DFLJPG.load(filepath)

            src_filename = dflimg.get_source_filename()
            image_to_face_mat = dflimg.get_image_to_face_mat()

            seg_filepath = input_path / ( Path(src_filename).stem + '_seg.png')
            if not seg_filepath.exists():
                raise ValueError(f'{seg_filepath} does not exist')

            seg = cv2_imread(seg_filepath)
            # Keep only the listed labels as foreground (1), the rest become 0.
            seg_inds = np.isin(seg, [1,2,3,4,5,6,9,10,11])
            seg[~seg_inds] = 0
            seg[seg_inds] = 1
            seg = seg.astype(np.float32)
            # NOTE(review): the 4th positional parameter of cv2.warpAffine is
            # `dst`, not `flags`, so INTER_LANCZOS4 is presumably not applied
            # here. Left unchanged to avoid altering the generated masks;
            # confirm, then pass flags=cv2.INTER_LANCZOS4 if that is intended.
            seg = cv2.warpAffine(seg, image_to_face_mat, (image_size, image_size), cv2.INTER_LANCZOS4)
            dflimg.set_xseg_mask(seg)
            dflimg.save()