Fix ffmpeg test

This commit is contained in:
Labrys of Knossos 2022-12-19 00:39:36 -05:00
parent a25b15d72f
commit 379a86e555
8 changed files with 148 additions and 78 deletions

@ -13,6 +13,7 @@ import time
import typing
from subprocess import PIPE, DEVNULL
from nzb2media import tool
from nzb2media import databases
from nzb2media import main_db
from nzb2media import version_check
@ -212,8 +213,8 @@ OUTPUTQUALITYPERCENT = None
FFMPEG: pathlib.Path | None = None
SEVENZIP: pathlib.Path | None = None
SHOWEXTRACT = 0
PAR2CMD = None
FFPROBE = None
PAR2CMD: pathlib.Path | None = None
FFPROBE: pathlib.Path | None = None
CHECK_MEDIA = None
REQUIRE_LAN = None
NICENESS = []
@ -671,62 +672,16 @@ def configure_utility_locations():
global FFMPEG
global FFPROBE
global PAR2CMD
# Setup FFMPEG, FFPROBE and SEVENZIP locations
FFMPEG = tool.find_transcoder(FFMPEG_PATH)
FFPROBE = tool.find_video_corruption_detector(FFMPEG_PATH)
PAR2CMD = tool.find_archive_repairer()
if platform.system() == 'Windows':
if FFMPEG_PATH:
FFMPEG = FFMPEG_PATH / 'ffmpeg.exe'
FFPROBE = FFMPEG_PATH / 'ffprobe.exe'
SEVENZIP = APP_ROOT / f'nzb2media/extractor/bin{platform.machine()}/7z.exe'
SHOWEXTRACT = int(str(CFG['Windows']['show_extraction']), 0)
if FFMPEG and FFMPEG.exists(): # problem
FFMPEG = None
log.warning('Failed to locate ffmpeg.exe. Transcoding disabled!')
log.warning('Install ffmpeg with x264 support to enable this feature ...')
if not os.path.isfile(FFPROBE):
FFPROBE = None
if CHECK_MEDIA:
log.warning('Failed to locate ffprobe.exe. Video corruption detection disabled!')
log.warning('Install ffmpeg with x264 support to enable this feature ...')
path = APP_ROOT / f'nzb2media/extractor/bin/{platform.machine()}'
else:
if SYS_PATH:
os.environ['PATH'] += ':' + SYS_PATH
SEVENZIP = which('7z') or which('7zr') or which('7za')
if not SEVENZIP:
log.warning('Failed to locate 7zip. Transcoding of disk images and extraction of .7z files will not be possible!')
PAR2CMD = which('par2')
if not PAR2CMD:
PAR2CMD = None
log.warning('Failed to locate par2. Repair and rename using par files will not be possible!')
if FFMPEG_PATH:
ffmpeg_bin = FFMPEG_PATH / 'ffmpeg'
avconv_bin = FFMPEG_PATH / 'avconv'
if ffmpeg_bin.is_file() or os.access(ffmpeg_bin, os.X_OK):
FFMPEG = ffmpeg_bin
elif avconv_bin.is_file() or os.access(avconv_bin, os.X_OK):
FFMPEG = avconv_bin
if not FFMPEG:
FFMPEG = which('ffmpeg') or which('avconv')
if not FFMPEG:
FFMPEG = None
log.warning('Failed to locate ffmpeg. Transcoding disabled!')
log.warning('Install ffmpeg with x264 support to enable this feature ...')
if not FFMPEG_PATH:
ffprobe_bin = FFMPEG_PATH / 'ffprobe'
avprobe_bin = FFMPEG_PATH / 'avprobe'
if ffprobe_bin.is_file() or os.access(ffprobe_bin, os.X_OK):
FFPROBE = ffprobe_bin
elif avprobe_bin.is_file() or os.access(avprobe_bin, os.X_OK):
FFPROBE = avprobe_bin
if not FFPROBE:
FFPROBE = which('ffprobe') or which('avprobe')
if not FFPROBE:
FFPROBE = None
if CHECK_MEDIA:
log.warning('Failed to locate ffprobe. Video corruption detection disabled!')
log.warning('Install ffmpeg with x264 support to enable this feature ...')
path = None
SEVENZIP = tool.find_unzip(path)
def initialize(section=None):

@ -144,10 +144,8 @@ def process(*, section: str, dir_name: str, input_name: str = '', status: int =
failure_link += '&corrupt=true'
elif client_agent == 'manual':
log.warning(f'No media files found in directory {dir_name} to manually process.')
return ProcessResult(
message='',
status_code=0, # Success (as far as this script is concerned)
)
# Success (as far as this script is concerned)
return ProcessResult.success()
else:
log.warning(f'No media files found in directory {dir_name}. Processing this as a failed download')
status = 1

@ -34,8 +34,11 @@ def extract(file_path, output_destination):
required_cmds = ['unrar', 'unzip', 'tar', 'unxz', 'unlzma', '7zr', 'bunzip2', 'gunzip']
# ## Possible future suport:
# gunzip: gz (cmd will delete original archive)
# ## the following do not extract to dest dir
# '.xz': ['xz', '-d --keep'], # '.lzma': ['xz', '-d --format=lzma --keep'], # '.bz2': ['bzip2', '-d --keep'], extract_commands = { '.rar': ['unrar', 'x', '-o+', '-y'], '.tar': ['tar', '-xf'], '.zip': ['unzip'], '.tar.gz': ['tar', '-xzf'], '.tgz': ['tar', '-xzf'], '.tar.bz2': ['tar', '-xjf'], '.tbz': ['tar', '-xjf'], '.tar.lzma': ['tar', '--lzma', '-xf'], '.tlz': ['tar', '--lzma', '-xf'], '.tar.xz': ['tar', '--xz', '-xf'], '.txz': ['tar', '--xz', '-xf'], '.7z': ['7zr', 'x'], '.gz': ['gunzip'], }
# ## the following do not extract to destination dir
# '.xz': ['xz', '-d --keep'],
# '.lzma': ['xz', '-d --format=lzma --keep'],
# '.bz2': ['bzip2', '-d --keep']
extract_commands = {'.rar': ['unrar', 'x', '-o+', '-y'], '.tar': ['tar', '-xf'], '.zip': ['unzip'], '.tar.gz': ['tar', '-xzf'], '.tgz': ['tar', '-xzf'], '.tar.bz2': ['tar', '-xjf'], '.tbz': ['tar', '-xjf'], '.tar.lzma': ['tar', '--lzma', '-xf'], '.tlz': ['tar', '--lzma', '-xf'], '.tar.xz': ['tar', '--xz', '-xf'], '.txz': ['tar', '--xz', '-xf'], '.7z': ['7zr', 'x'], '.gz': ['gunzip']}
# Test command exists and if not, remove
if not os.getenv('TR_TORRENT_DIR'):
for cmd in required_cmds:

114
nzb2media/tool.py Normal file

@ -0,0 +1,114 @@
from __future__ import annotations
import itertools
import logging
import os
import pathlib
import shutil
import typing
log = logging.getLogger(__name__)
log.addHandler(logging.NullHandler())
def in_path(name: str) -> pathlib.Path | None:
"""Find tool if its on the system loc."""
log.debug(f'Searching for {name} on system path')
path = shutil.which(name)
if not path:
return None
return pathlib.Path(path)
def at_location(root: pathlib.Path, name: str) -> pathlib.Path | None:
"""Return tool if its at given loc."""
log.debug(f'Searching for {name} at {root}')
if not name:
raise ValueError('name is required')
path = root / name
if path.exists() or os.access(path, os.X_OK):
return path
return None
def find(root: pathlib.Path | None, *names) -> pathlib.Path | None:
"""Try to find a tool.
Look in target location first, then system path,
and finally check the current working directory.
"""
if not names:
raise ValueError('At least one name is required.')
# look in target location first
if root:
found_at_location: typing.Iterable[pathlib.Path | None] = (at_location(root, name) for name in names)
else:
found_at_location = []
# look on system path second
found_on_path = (in_path(name) for name in names)
found = itertools.chain(found_at_location, found_on_path)
for path in found:
if path is not None:
log.info(f'Found at {path}')
return path
# finally check current working directory
cwd = pathlib.Path.cwd()
log.debug(f'Falling back on current working directory: {cwd}')
found_in_working_directory = (at_location(cwd, name) for name in names)
for path in found_in_working_directory:
if path is not None:
log.info(f'Found {path}')
return path
return None
def find_transcoder(root: pathlib.Path | None = None) -> pathlib.Path | None:
"""Find a tool for transcoding."""
log.info('Searching for transcoding tool.')
names = ('ffmpeg', 'avconv')
found = find(root, *names)
if not found:
log.debug(f'Failed to locate any of the following: {names}')
log.warning('Transcoding disabled!')
log.warning('Install ffmpeg with x264 support to enable this feature.')
return found
def find_video_corruption_detector(root: pathlib.Path | None = None) -> pathlib.Path | None:
"""Find a tool for detecting video corruption."""
log.info('Searching for video corruption detection tool.')
names = ('ffprobe', 'avprobe')
found = find(root, *names)
if not found:
log.debug(f'Failed to locate any of the following: {names}')
log.warning('Video corruption detection disabled!')
log.warning('Install ffmpeg with x264 support to enable this feature.')
return found
def find_archive_repairer(root: pathlib.Path | None = None) -> pathlib.Path | None:
"""Find a tool for repairing and renaming archives."""
log.info('Searching for file repair and renaming tool.')
names = ('par2',)
found = find(root, *names)
if not found:
log.debug(f'Failed to locate any of the following: {names}')
log.warning('Archive repair and renaming disabled!')
log.warning('Install a parity archive repair tool to enable this feature.')
return found
def find_unzip(root: pathlib.Path | None = None) -> pathlib.Path | None:
"""Find a tool for unzipping archives."""
log.info('Searching for an unzipping tool.')
names = ('7z', '7zr', '7za')
found = find(root, *names)
if not found:
log.debug(f'Failed to locate any of the following: {names}')
log.warning('Transcoding of disk images and extraction zip files will not be possible!')
return found

@ -72,7 +72,7 @@ def zip_out(file, img):
if os.path.isfile(file):
cmd = ['cat', file]
else:
cmd = [nzb2media.SEVENZIP, '-so', 'e', img, file]
cmd = [os.fspath(nzb2media.SEVENZIP), '-so', 'e', img, file]
try:
with subprocess.Popen(cmd, stdout=PIPE, stderr=DEVNULL) as proc:
return proc
@ -87,11 +87,11 @@ def get_video_details(videofile, img=None):
file = videofile
if not nzb2media.FFPROBE:
return video_details, result
print_format = '-of' if 'avprobe' in nzb2media.FFPROBE else '-print_format'
print_format = '-of' if 'avprobe' in nzb2media.FFPROBE.name else '-print_format'
try:
if img:
videofile = '-'
command = [nzb2media.FFPROBE, '-v', 'quiet', print_format, 'json', '-show_format', '-show_streams', '-show_error', videofile]
command = [os.fspath(nzb2media.FFPROBE), '-v', 'quiet', print_format, 'json', '-show_format', '-show_streams', '-show_error', videofile]
print_cmd(command)
if img:
procin = zip_out(file, img)
@ -106,7 +106,7 @@ def get_video_details(videofile, img=None):
video_details = json.loads(proc_out.decode())
except Exception:
try: # try this again without -show error in case of ffmpeg limitation
command = [nzb2media.FFPROBE, '-v', 'quiet', print_format, 'json', '-show_format', '-show_streams', videofile]
command = [os.fspath(nzb2media.FFPROBE), '-v', 'quiet', print_format, 'json', '-show_format', '-show_streams', videofile]
print_cmd(command)
if img:
procin = zip_out(file, img)
@ -469,7 +469,7 @@ def build_commands(file, new_dir, movie_name):
break
if sub['codec_name'] in {'dvd_subtitle', 'VobSub'} and nzb2media.SCODEC == 'mov_text':
continue # We can't convert these.
_inded = sub['index']
_index = sub['index']
map_cmd.extend(['-map', f'0:{_index}'])
s_mapped.extend([sub['index']])
if nzb2media.SINCLUDE:

@ -261,8 +261,8 @@ class GitUpdateManager(UpdateManager):
return False
def update(self):
"""
Check git for a new version.
"""Check git for a new version.
Calls git pull origin <branch> in order to update Sick Beard.
Returns a bool depending on the call's success.
"""
@ -308,8 +308,8 @@ class SourceUpdateManager(UpdateManager):
return False
def _check_github_for_update(self):
"""
Check Github for a new version.
""" Check Github for a new version.
Uses pygithub to ask github if there is a newer version than
the provided commit hash. If there is a newer version it sets
Sick Beard's version text.
@ -388,7 +388,7 @@ class SourceUpdateManager(UpdateManager):
# walk temp folder and move files to main folder
log.info(f'Moving files from {content_dir} to {nzb2media.APP_ROOT}')
for dirname, _, filenames in os.walk(content_dir):
dirname = dirname[len(content_dir) + 1 :]
dirname = dirname[len(content_dir) + 1:]
for curfile in filenames:
old_path = os.path.join(content_dir, dirname, curfile)
new_path = os.path.join(nzb2media.APP_ROOT, dirname, curfile)

@ -1,7 +0,0 @@
import nzb2media
def test_has_ffmpeg():
nzb2media.configure_utility_locations()
assert nzb2media.FFMPEG is not None
assert nzb2media.FFMPEG.exists()

7
tests/tool_test.py Normal file

@ -0,0 +1,7 @@
import nzb2media.tool
def test_tool_in_path():
ffmpeg = nzb2media.tool.in_path('ffmpeg')
avprobe = nzb2media.tool.in_path('avprobe')
assert ffmpeg or avprobe