mirror of
https://github.com/clinton-hall/nzbToMedia.git
synced 2025-01-09 20:43:19 -08:00
498 lines
14 KiB
Python
498 lines
14 KiB
Python
import os
|
|
import sys
|
|
import operator
|
|
import collections
|
|
import functools
|
|
import stat
|
|
from ctypes import (
|
|
POINTER,
|
|
byref,
|
|
cast,
|
|
create_unicode_buffer,
|
|
create_string_buffer,
|
|
windll,
|
|
)
|
|
from ctypes.wintypes import LPWSTR
|
|
import nt
|
|
import posixpath
|
|
import builtins
|
|
|
|
from jaraco.structures import binary
|
|
|
|
from jaraco.windows.error import WindowsError, handle_nonzero_success
|
|
import jaraco.windows.api.filesystem as api
|
|
from jaraco.windows import reparse
|
|
|
|
|
|
def mklink():
|
|
"""
|
|
Like cmd.exe's mklink except it will infer directory status of the
|
|
target.
|
|
"""
|
|
from optparse import OptionParser
|
|
|
|
parser = OptionParser(usage="usage: %prog [options] link target")
|
|
parser.add_option(
|
|
'-d',
|
|
'--directory',
|
|
help="Target is a directory (only necessary if not present)",
|
|
action="store_true",
|
|
)
|
|
options, args = parser.parse_args()
|
|
try:
|
|
link, target = args
|
|
except ValueError:
|
|
parser.error("incorrect number of arguments")
|
|
symlink(target, link, options.directory)
|
|
sys.stdout.write("Symbolic link created: %(link)s --> %(target)s\n" % vars())
|
|
|
|
|
|
def _is_target_a_directory(link, rel_target):
|
|
"""
|
|
If creating a symlink from link to a target, determine if target
|
|
is a directory (relative to dirname(link)).
|
|
"""
|
|
target = os.path.join(os.path.dirname(link), rel_target)
|
|
return os.path.isdir(target)
|
|
|
|
|
|
def symlink(target, link, target_is_directory=False):
|
|
"""
|
|
An implementation of os.symlink for Windows (Vista and greater)
|
|
"""
|
|
target_is_directory = target_is_directory or _is_target_a_directory(link, target)
|
|
# normalize the target (MS symlinks don't respect forward slashes)
|
|
target = os.path.normpath(target)
|
|
flags = target_is_directory | api.SYMBOLIC_LINK_FLAG_ALLOW_UNPRIVILEGED_CREATE
|
|
handle_nonzero_success(api.CreateSymbolicLink(link, target, flags))
|
|
|
|
|
|
def link(target, link):
|
|
"""
|
|
Establishes a hard link between an existing file and a new file.
|
|
"""
|
|
handle_nonzero_success(api.CreateHardLink(link, target, None))
|
|
|
|
|
|
def is_reparse_point(path):
|
|
"""
|
|
Determine if the given path is a reparse point.
|
|
Return False if the file does not exist or the file attributes cannot
|
|
be determined.
|
|
"""
|
|
res = api.GetFileAttributes(path)
|
|
return res != api.INVALID_FILE_ATTRIBUTES and bool(
|
|
res & api.FILE_ATTRIBUTE_REPARSE_POINT
|
|
)
|
|
|
|
|
|
def islink(path):
|
|
"Determine if the given path is a symlink"
|
|
return is_reparse_point(path) and is_symlink(path)
|
|
|
|
|
|
def _patch_path(path):
|
|
r"""
|
|
Paths have a max length of api.MAX_PATH characters (260). If a target path
|
|
is longer than that, it needs to be made absolute and prepended with
|
|
\\?\ in order to work with API calls.
|
|
See http://msdn.microsoft.com/en-us/library/aa365247%28v=vs.85%29.aspx for
|
|
details.
|
|
"""
|
|
if path.startswith('\\\\?\\'):
|
|
return path
|
|
abs_path = os.path.abspath(path)
|
|
if not abs_path[1] == ':':
|
|
# python doesn't include the drive letter, but \\?\ requires it
|
|
abs_path = os.getcwd()[:2] + abs_path
|
|
return '\\\\?\\' + abs_path
|
|
|
|
|
|
def is_symlink(path):
|
|
"""
|
|
Assuming path is a reparse point, determine if it's a symlink.
|
|
"""
|
|
path = _patch_path(path)
|
|
try:
|
|
return _is_symlink(next(find_files(path)))
|
|
# comment below workaround for PyCQA/pyflakes#376
|
|
except WindowsError as orig_error: # noqa: F841
|
|
tmpl = "Error accessing {path}: {orig_error.message}"
|
|
raise builtins.WindowsError(tmpl.format(**locals()))
|
|
|
|
|
|
def _is_symlink(find_data):
|
|
return find_data.reserved[0] == api.IO_REPARSE_TAG_SYMLINK
|
|
|
|
|
|
def find_files(spec):
|
|
r"""
|
|
A pythonic wrapper around the FindFirstFile/FindNextFile win32 api.
|
|
|
|
>>> root_files = tuple(find_files(r'c:\*'))
|
|
>>> len(root_files) > 1
|
|
True
|
|
>>> root_files[0].filename == root_files[1].filename
|
|
False
|
|
|
|
This test might fail on a non-standard installation
|
|
>>> 'Windows' in (fd.filename for fd in root_files)
|
|
True
|
|
"""
|
|
fd = api.WIN32_FIND_DATA()
|
|
handle = api.FindFirstFile(spec, byref(fd))
|
|
while True:
|
|
if handle == api.INVALID_HANDLE_VALUE:
|
|
raise WindowsError()
|
|
yield fd
|
|
fd = api.WIN32_FIND_DATA()
|
|
res = api.FindNextFile(handle, byref(fd))
|
|
if res == 0: # error
|
|
error = WindowsError()
|
|
if error.code == api.ERROR_NO_MORE_FILES:
|
|
break
|
|
else:
|
|
raise error
|
|
# todo: how to close handle when generator is destroyed?
|
|
# hint: catch GeneratorExit
|
|
windll.kernel32.FindClose(handle)
|
|
|
|
|
|
def get_final_path(path):
|
|
r"""
|
|
For a given path, determine the ultimate location of that path.
|
|
Useful for resolving symlink targets.
|
|
This functions wraps the GetFinalPathNameByHandle from the Windows
|
|
SDK.
|
|
|
|
Note, this function fails if a handle cannot be obtained (such as
|
|
for C:\Pagefile.sys on a stock windows system). Consider using
|
|
trace_symlink_target instead.
|
|
"""
|
|
desired_access = api.NULL
|
|
share_mode = api.FILE_SHARE_READ | api.FILE_SHARE_WRITE | api.FILE_SHARE_DELETE
|
|
security_attributes = api.LPSECURITY_ATTRIBUTES() # NULL pointer
|
|
hFile = api.CreateFile(
|
|
path,
|
|
desired_access,
|
|
share_mode,
|
|
security_attributes,
|
|
api.OPEN_EXISTING,
|
|
api.FILE_FLAG_BACKUP_SEMANTICS,
|
|
api.NULL,
|
|
)
|
|
|
|
if hFile == api.INVALID_HANDLE_VALUE:
|
|
raise WindowsError()
|
|
|
|
buf_size = api.GetFinalPathNameByHandle(hFile, LPWSTR(), 0, api.VOLUME_NAME_DOS)
|
|
handle_nonzero_success(buf_size)
|
|
buf = create_unicode_buffer(buf_size)
|
|
result_length = api.GetFinalPathNameByHandle(
|
|
hFile, buf, len(buf), api.VOLUME_NAME_DOS
|
|
)
|
|
|
|
assert result_length < len(buf)
|
|
handle_nonzero_success(result_length)
|
|
handle_nonzero_success(api.CloseHandle(hFile))
|
|
|
|
return buf[:result_length]
|
|
|
|
|
|
def compat_stat(path):
|
|
"""
|
|
Generate stat as found on Python 3.2 and later.
|
|
"""
|
|
stat = os.stat(path)
|
|
info = get_file_info(path)
|
|
# rewrite st_ino, st_dev, and st_nlink based on file info
|
|
return nt.stat_result(
|
|
(stat.st_mode,)
|
|
+ (info.file_index, info.volume_serial_number, info.number_of_links)
|
|
+ stat[4:]
|
|
)
|
|
|
|
|
|
def samefile(f1, f2):
|
|
"""
|
|
Backport of samefile from Python 3.2 with support for Windows.
|
|
"""
|
|
return posixpath.samestat(compat_stat(f1), compat_stat(f2))
|
|
|
|
|
|
def get_file_info(path):
|
|
# open the file the same way CPython does in posixmodule.c
|
|
desired_access = api.FILE_READ_ATTRIBUTES
|
|
share_mode = 0
|
|
security_attributes = None
|
|
creation_disposition = api.OPEN_EXISTING
|
|
flags_and_attributes = (
|
|
api.FILE_ATTRIBUTE_NORMAL
|
|
| api.FILE_FLAG_BACKUP_SEMANTICS
|
|
| api.FILE_FLAG_OPEN_REPARSE_POINT
|
|
)
|
|
template_file = None
|
|
|
|
handle = api.CreateFile(
|
|
path,
|
|
desired_access,
|
|
share_mode,
|
|
security_attributes,
|
|
creation_disposition,
|
|
flags_and_attributes,
|
|
template_file,
|
|
)
|
|
|
|
if handle == api.INVALID_HANDLE_VALUE:
|
|
raise WindowsError()
|
|
|
|
info = api.BY_HANDLE_FILE_INFORMATION()
|
|
res = api.GetFileInformationByHandle(handle, info)
|
|
handle_nonzero_success(res)
|
|
handle_nonzero_success(api.CloseHandle(handle))
|
|
|
|
return info
|
|
|
|
|
|
def GetBinaryType(filepath):
|
|
res = api.DWORD()
|
|
handle_nonzero_success(api._GetBinaryType(filepath, res))
|
|
return res
|
|
|
|
|
|
def _make_null_terminated_list(obs):
|
|
obs = _makelist(obs)
|
|
if obs is None:
|
|
return
|
|
return u'\x00'.join(obs) + u'\x00\x00'
|
|
|
|
|
|
def _makelist(ob):
|
|
if ob is None:
|
|
return
|
|
if not isinstance(ob, (list, tuple, set)):
|
|
return [ob]
|
|
return ob
|
|
|
|
|
|
def SHFileOperation(operation, from_, to=None, flags=[]):
|
|
flags = functools.reduce(operator.or_, flags, 0)
|
|
from_ = _make_null_terminated_list(from_)
|
|
to = _make_null_terminated_list(to)
|
|
params = api.SHFILEOPSTRUCT(0, operation, from_, to, flags)
|
|
res = api._SHFileOperation(params)
|
|
if res != 0:
|
|
raise RuntimeError("SHFileOperation returned %d" % res)
|
|
|
|
|
|
def join(*paths):
|
|
r"""
|
|
Wrapper around os.path.join that works with Windows drive letters.
|
|
|
|
>>> join('d:\\foo', '\\bar')
|
|
'd:\\bar'
|
|
"""
|
|
paths_with_drives = map(os.path.splitdrive, paths)
|
|
drives, paths = zip(*paths_with_drives)
|
|
# the drive we care about is the last one in the list
|
|
drive = next(filter(None, reversed(drives)), '')
|
|
return os.path.join(drive, os.path.join(*paths))
|
|
|
|
|
|
def resolve_path(target, start=os.path.curdir):
|
|
r"""
|
|
Find a path from start to target where target is relative to start.
|
|
|
|
>>> tmp = str(getfixture('tmpdir_as_cwd'))
|
|
|
|
>>> findpath('d:\\')
|
|
'd:\\'
|
|
|
|
>>> findpath('d:\\', tmp)
|
|
'd:\\'
|
|
|
|
>>> findpath('\\bar', 'd:\\')
|
|
'd:\\bar'
|
|
|
|
>>> findpath('\\bar', 'd:\\foo') # fails with '\\bar'
|
|
'd:\\bar'
|
|
|
|
>>> findpath('bar', 'd:\\foo')
|
|
'd:\\foo\\bar'
|
|
|
|
>>> findpath('\\baz', 'd:\\foo\\bar') # fails with '\\baz'
|
|
'd:\\baz'
|
|
|
|
>>> os.path.abspath(findpath('\\bar')).lower()
|
|
'c:\\bar'
|
|
|
|
>>> os.path.abspath(findpath('bar'))
|
|
'...\\bar'
|
|
|
|
>>> findpath('..', 'd:\\foo\\bar')
|
|
'd:\\foo'
|
|
|
|
The parent of the root directory is the root directory.
|
|
>>> findpath('..', 'd:\\')
|
|
'd:\\'
|
|
"""
|
|
return os.path.normpath(join(start, target))
|
|
|
|
|
|
findpath = resolve_path
|
|
|
|
|
|
def trace_symlink_target(link):
|
|
"""
|
|
Given a file that is known to be a symlink, trace it to its ultimate
|
|
target.
|
|
|
|
Raises TargetNotPresent when the target cannot be determined.
|
|
Raises ValueError when the specified link is not a symlink.
|
|
"""
|
|
|
|
if not is_symlink(link):
|
|
raise ValueError("link must point to a symlink on the system")
|
|
while is_symlink(link):
|
|
orig = os.path.dirname(link)
|
|
link = readlink(link)
|
|
link = resolve_path(link, orig)
|
|
return link
|
|
|
|
|
|
def readlink(link):
|
|
"""
|
|
readlink(link) -> target
|
|
Return a string representing the path to which the symbolic link points.
|
|
"""
|
|
handle = api.CreateFile(
|
|
link,
|
|
0,
|
|
0,
|
|
None,
|
|
api.OPEN_EXISTING,
|
|
api.FILE_FLAG_OPEN_REPARSE_POINT | api.FILE_FLAG_BACKUP_SEMANTICS,
|
|
None,
|
|
)
|
|
|
|
if handle == api.INVALID_HANDLE_VALUE:
|
|
raise WindowsError()
|
|
|
|
res = reparse.DeviceIoControl(handle, api.FSCTL_GET_REPARSE_POINT, None, 10240)
|
|
|
|
bytes = create_string_buffer(res)
|
|
p_rdb = cast(bytes, POINTER(api.REPARSE_DATA_BUFFER))
|
|
rdb = p_rdb.contents
|
|
if not rdb.tag == api.IO_REPARSE_TAG_SYMLINK:
|
|
raise RuntimeError("Expected IO_REPARSE_TAG_SYMLINK, but got %d" % rdb.tag)
|
|
|
|
handle_nonzero_success(api.CloseHandle(handle))
|
|
return rdb.get_substitute_name()
|
|
|
|
|
|
def patch_os_module():
|
|
"""
|
|
jaraco.windows provides the os.symlink and os.readlink functions.
|
|
Monkey-patch the os module to include them if not present.
|
|
"""
|
|
if not hasattr(os, 'symlink'):
|
|
os.symlink = symlink
|
|
os.path.islink = islink
|
|
if not hasattr(os, 'readlink'):
|
|
os.readlink = readlink
|
|
|
|
|
|
def find_symlinks(root):
|
|
for dirpath, dirnames, filenames in os.walk(root):
|
|
for name in dirnames + filenames:
|
|
pathname = os.path.join(dirpath, name)
|
|
if is_symlink(pathname):
|
|
yield pathname
|
|
# don't traverse symlinks
|
|
if name in dirnames:
|
|
dirnames.remove(name)
|
|
|
|
|
|
def find_symlinks_cmd():
|
|
"""
|
|
%prog [start-path]
|
|
Search the specified path (defaults to the current directory) for symlinks,
|
|
printing the source and target on each line.
|
|
"""
|
|
from optparse import OptionParser
|
|
from textwrap import dedent
|
|
|
|
parser = OptionParser(usage=dedent(find_symlinks_cmd.__doc__).strip())
|
|
options, args = parser.parse_args()
|
|
if not args:
|
|
args = ['.']
|
|
root = args.pop()
|
|
if args:
|
|
parser.error("unexpected argument(s)")
|
|
try:
|
|
for symlink in find_symlinks(root):
|
|
target = readlink(symlink)
|
|
dir = ['', 'D'][os.path.isdir(symlink)]
|
|
msg = '{dir:2}{symlink} --> {target}'.format(**locals())
|
|
print(msg)
|
|
except KeyboardInterrupt:
|
|
pass
|
|
|
|
|
|
class FileAttributes(int, metaclass=binary.BitMask):
|
|
|
|
# extract the values from the stat module on Python 3.5
|
|
# and later.
|
|
locals().update(
|
|
(name.split('FILE_ATTRIBUTES_')[1].lower(), value)
|
|
for name, value in vars(stat).items()
|
|
if name.startswith('FILE_ATTRIBUTES_')
|
|
)
|
|
|
|
# For Python 3.4 and earlier, define the constants here
|
|
archive = 0x20
|
|
compressed = 0x800
|
|
hidden = 0x2
|
|
device = 0x40
|
|
directory = 0x10
|
|
encrypted = 0x4000
|
|
normal = 0x80
|
|
not_content_indexed = 0x2000
|
|
offline = 0x1000
|
|
read_only = 0x1
|
|
reparse_point = 0x400
|
|
sparse_file = 0x200
|
|
system = 0x4
|
|
temporary = 0x100
|
|
virtual = 0x10000
|
|
|
|
@classmethod
|
|
def get(cls, filepath):
|
|
attrs = api.GetFileAttributes(filepath)
|
|
if attrs == api.INVALID_FILE_ATTRIBUTES:
|
|
raise WindowsError()
|
|
return cls(attrs)
|
|
|
|
|
|
GetFileAttributes = FileAttributes.get
|
|
|
|
|
|
def SetFileAttributes(filepath, *attrs):
|
|
"""
|
|
Set file attributes. e.g.:
|
|
|
|
SetFileAttributes('C:\\foo', 'hidden')
|
|
|
|
Each attr must be either a numeric value, a constant defined in
|
|
jaraco.windows.filesystem.api, or one of the nice names
|
|
defined in this function.
|
|
"""
|
|
nice_names = collections.defaultdict(
|
|
lambda key: key,
|
|
hidden='FILE_ATTRIBUTE_HIDDEN',
|
|
read_only='FILE_ATTRIBUTE_READONLY',
|
|
)
|
|
flags = (getattr(api, nice_names[attr], attr) for attr in attrs)
|
|
flags = functools.reduce(operator.or_, flags)
|
|
handle_nonzero_success(api.SetFileAttributes(filepath, flags))
|