nzbToMedia/libs/win/jaraco/windows/filesystem/__init__.py
2022-11-29 00:44:47 -05:00

498 lines
14 KiB
Python

import os
import sys
import operator
import collections
import functools
import stat
from ctypes import (
POINTER,
byref,
cast,
create_unicode_buffer,
create_string_buffer,
windll,
)
from ctypes.wintypes import LPWSTR
import nt
import posixpath
import builtins
from jaraco.structures import binary
from jaraco.windows.error import WindowsError, handle_nonzero_success
import jaraco.windows.api.filesystem as api
from jaraco.windows import reparse
def mklink():
    """
    Command-line entry point that creates a symbolic link.

    Parses ``link`` and ``target`` positional arguments (plus an optional
    ``-d/--directory`` switch) from the command line, creates the symlink
    via ``symlink`` and reports the result on stdout. Works like cmd.exe's
    mklink, except that ``symlink`` infers whether the target is a
    directory when the switch is not given.
    """
    from optparse import OptionParser

    parser = OptionParser(usage="usage: %prog [options] link target")
    parser.add_option(
        '-d',
        '--directory',
        help="Target is a directory (only necessary if not present)",
        action="store_true",
    )
    options, args = parser.parse_args()
    # exactly two positionals are required; parser.error() exits the process
    if len(args) != 2:
        parser.error("incorrect number of arguments")
    link, target = args
    symlink(target, link, options.directory)
    report = "Symbolic link created: %(link)s --> %(target)s\n" % {
        'link': link,
        'target': target,
    }
    sys.stdout.write(report)
def _is_target_a_directory(link, rel_target):
    """
    Report whether ``rel_target`` names an existing directory.

    ``rel_target`` is interpreted relative to the directory containing
    ``link``, which is how a relative symlink target would be resolved.
    """
    link_dir = os.path.dirname(link)
    return os.path.isdir(os.path.join(link_dir, rel_target))
def symlink(target, link, target_is_directory=False):
    """
    Create a symbolic link at ``link`` pointing to ``target``.

    An implementation of os.symlink for Windows (Vista and greater).
    When ``target_is_directory`` is falsy, the directory status is
    detected by checking whether the target currently exists as a
    directory relative to the link's location.
    """
    if not target_is_directory:
        target_is_directory = _is_target_a_directory(link, target)
    # MS symlinks don't respect forward slashes, so normalize the target
    normalized = os.path.normpath(target)
    flags = target_is_directory | api.SYMBOLIC_LINK_FLAG_ALLOW_UNPRIVILEGED_CREATE
    created = api.CreateSymbolicLink(link, normalized, flags)
    handle_nonzero_success(created)
def link(target, link):
    """
    Create a hard link named ``link`` that refers to the existing file
    ``target``.

    The result of the underlying call is checked with
    ``handle_nonzero_success``.
    """
    created = api.CreateHardLink(link, target, None)
    handle_nonzero_success(created)
def is_reparse_point(path):
    """
    Tell whether ``path`` carries the reparse-point file attribute.

    Returns False when the attributes cannot be read (for example
    because the file does not exist).
    """
    attributes = api.GetFileAttributes(path)
    if attributes == api.INVALID_FILE_ATTRIBUTES:
        return False
    return bool(attributes & api.FILE_ATTRIBUTE_REPARSE_POINT)
def islink(path):
    """
    Determine if the given path is a symlink.

    The path must first be a reparse point; only then is it inspected
    with ``is_symlink``.
    """
    if not is_reparse_point(path):
        return False
    return is_symlink(path)
def _patch_path(path):
    r"""
    Return ``path`` in the extended-length ``\\?\`` form.

    Paths have a max length of api.MAX_PATH characters (260). A longer
    path must be absolute and prefixed with \\?\ to work with API calls.
    A path that already carries the prefix is returned unchanged.

    See http://msdn.microsoft.com/en-us/library/aa365247%28v=vs.85%29.aspx
    for details.
    """
    prefix = '\\\\?\\'
    if path.startswith(prefix):
        return path
    absolute = os.path.abspath(path)
    if absolute[1] != ':':
        # python doesn't include the drive letter, but \\?\ requires it
        absolute = os.getcwd()[:2] + absolute
    return prefix + absolute
def is_symlink(path):
    """
    Assuming path is a reparse point, determine if it's a symlink.

    The path is converted to its extended-length form and looked up with
    ``find_files``; the first match is tested by ``_is_symlink``.

    Raises ``builtins.WindowsError`` (chained to the original error) when
    the lookup fails.
    """
    path = _patch_path(path)
    matches = find_files(path)
    try:
        return _is_symlink(next(matches))
    except WindowsError as orig_error:
        tmpl = "Error accessing {path}: {orig_error.message}"
        message = tmpl.format(path=path, orig_error=orig_error)
        # chain explicitly so the underlying failure stays in the traceback
        raise builtins.WindowsError(message) from orig_error
    finally:
        # only the first match is needed; close the generator now so any
        # cleanup it performs runs promptly instead of at garbage collection
        matches.close()
def _is_symlink(find_data):
    """
    Check whether the first reserved field of ``find_data`` equals the
    symlink reparse tag.
    """
    reparse_tag = find_data.reserved[0]
    return reparse_tag == api.IO_REPARSE_TAG_SYMLINK
def find_files(spec):
    r"""
    A pythonic wrapper around the FindFirstFile/FindNextFile win32 api.

    Yields one ``api.WIN32_FIND_DATA`` structure per entry matching
    ``spec``. A fresh structure is allocated for every entry, so yielded
    objects are not overwritten by later iterations.

    Raises ``WindowsError`` if the initial lookup fails or if advancing
    fails for any reason other than running out of files.

    The find handle is closed when iteration finishes, when an error
    occurs, or when the generator is closed or garbage collected early.

    >>> root_files = tuple(find_files(r'c:\*'))
    >>> len(root_files) > 1
    True
    >>> root_files[0].filename == root_files[1].filename
    False

    This test might fail on a non-standard installation

    >>> 'Windows' in (fd.filename for fd in root_files)
    True
    """
    fd = api.WIN32_FIND_DATA()
    handle = api.FindFirstFile(spec, byref(fd))
    if handle == api.INVALID_HANDLE_VALUE:
        raise WindowsError()
    try:
        while True:
            yield fd
            fd = api.WIN32_FIND_DATA()
            res = api.FindNextFile(handle, byref(fd))
            if res == 0:  # error
                error = WindowsError()
                if error.code == api.ERROR_NO_MORE_FILES:
                    break
                raise error
    finally:
        # runs on exhaustion, on error, and on GeneratorExit alike
        windll.kernel32.FindClose(handle)
def get_final_path(path):
    r"""
    For a given path, determine the ultimate location of that path.
    Useful for resolving symlink targets.

    This function wraps the GetFinalPathNameByHandle from the Windows
    SDK: the path is opened, the required buffer size is queried, and
    the final path name is then read into a buffer of that size.

    Raises ``WindowsError`` if the file cannot be opened or if either
    query fails. The file handle is closed in every case.

    Note, this function fails if a handle cannot be obtained (such as
    for C:\Pagefile.sys on a stock windows system). Consider using
    trace_symlink_target instead.
    """
    desired_access = api.NULL
    share_mode = api.FILE_SHARE_READ | api.FILE_SHARE_WRITE | api.FILE_SHARE_DELETE
    security_attributes = api.LPSECURITY_ATTRIBUTES()  # NULL pointer
    hFile = api.CreateFile(
        path,
        desired_access,
        share_mode,
        security_attributes,
        api.OPEN_EXISTING,
        api.FILE_FLAG_BACKUP_SEMANTICS,
        api.NULL,
    )
    if hFile == api.INVALID_HANDLE_VALUE:
        raise WindowsError()
    try:
        # first call with an empty buffer only reports the size needed
        buf_size = api.GetFinalPathNameByHandle(
            hFile, LPWSTR(), 0, api.VOLUME_NAME_DOS
        )
        handle_nonzero_success(buf_size)
        buf = create_unicode_buffer(buf_size)
        result_length = api.GetFinalPathNameByHandle(
            hFile, buf, len(buf), api.VOLUME_NAME_DOS
        )
        assert result_length < len(buf)
        handle_nonzero_success(result_length)
    finally:
        # release the handle even when one of the queries above fails
        handle_nonzero_success(api.CloseHandle(hFile))
    return buf[:result_length]
def compat_stat(path):
    """
    Generate stat as found on Python 3.2 and later.

    Starts from ``os.stat(path)`` and substitutes st_ino, st_dev and
    st_nlink with the file index, volume serial number and link count
    reported by ``get_file_info``.
    """
    base = os.stat(path)
    info = get_file_info(path)
    # positions 1-3 of the stat tuple are st_ino, st_dev and st_nlink
    identity = (info.file_index, info.volume_serial_number, info.number_of_links)
    fields = (base.st_mode,) + identity + base[4:]
    return nt.stat_result(fields)
def samefile(f1, f2):
    """
    Backport of samefile from Python 3.2 with support for Windows.

    Compares the ``compat_stat`` results of both paths using
    ``posixpath.samestat``.
    """
    first = compat_stat(f1)
    second = compat_stat(f2)
    return posixpath.samestat(first, second)
def get_file_info(path):
    """
    Return the ``api.BY_HANDLE_FILE_INFORMATION`` structure for ``path``.

    The file is opened for attribute reading without following reparse
    points, queried with GetFileInformationByHandle, and closed again.

    Raises ``WindowsError`` if the file cannot be opened or the query
    fails. The handle is closed in every case.
    """
    # open the file the same way CPython does in posixmodule.c
    desired_access = api.FILE_READ_ATTRIBUTES
    share_mode = 0
    security_attributes = None
    creation_disposition = api.OPEN_EXISTING
    flags_and_attributes = (
        api.FILE_ATTRIBUTE_NORMAL
        | api.FILE_FLAG_BACKUP_SEMANTICS
        | api.FILE_FLAG_OPEN_REPARSE_POINT
    )
    template_file = None
    handle = api.CreateFile(
        path,
        desired_access,
        share_mode,
        security_attributes,
        creation_disposition,
        flags_and_attributes,
        template_file,
    )
    if handle == api.INVALID_HANDLE_VALUE:
        raise WindowsError()
    try:
        info = api.BY_HANDLE_FILE_INFORMATION()
        res = api.GetFileInformationByHandle(handle, info)
        handle_nonzero_success(res)
    finally:
        # release the handle even when the query above fails
        handle_nonzero_success(api.CloseHandle(handle))
    return info
def GetBinaryType(filepath):
    """
    Query the binary type of ``filepath``.

    Returns the ``api.DWORD`` object filled in by the underlying call;
    the call's result is checked with ``handle_nonzero_success``.
    """
    binary_type = api.DWORD()
    succeeded = api._GetBinaryType(filepath, binary_type)
    handle_nonzero_success(succeeded)
    return binary_type
def _make_null_terminated_list(obs):
    """
    Encode one string or a collection of strings as a single string in
    which items are separated by a null character and the whole list is
    terminated by two null characters.

    Returns None when ``obs`` is None.
    """
    items = _makelist(obs)
    if items is None:
        return None
    null = u'\x00'
    return null.join(items) + null * 2
def _makelist(ob):
    """
    Normalize ``ob`` to a collection.

    None stays None, a list/tuple/set is returned as-is, and any other
    object is wrapped in a single-element list.
    """
    if ob is None:
        return None
    if isinstance(ob, (list, tuple, set)):
        return ob
    return [ob]
def SHFileOperation(operation, from_, to=None, flags=()):
    """
    Run a shell file operation.

    ``from_`` and ``to`` may each be a single string or a list/tuple/set
    of strings; they are packed into null-separated, double-null
    terminated strings. ``flags`` is an iterable of integer flags that
    are OR-ed together (no flags by default).

    Raises ``RuntimeError`` when the underlying call returns nonzero.
    """
    # the default is an immutable tuple to avoid a shared mutable default
    flags = functools.reduce(operator.or_, flags, 0)
    from_ = _make_null_terminated_list(from_)
    to = _make_null_terminated_list(to)
    params = api.SHFILEOPSTRUCT(0, operation, from_, to, flags)
    res = api._SHFileOperation(params)
    if res != 0:
        raise RuntimeError("SHFileOperation returned %d" % res)
def join(*paths):
    r"""
    Join path components like os.path.join, but drive-letter aware.

    Every component is split into drive and remainder; the remainders
    are joined and the last non-empty drive (if any) is put in front.

    >>> join('d:\\foo', '\\bar')
    'd:\\bar'
    """
    split_paths = [os.path.splitdrive(part) for part in paths]
    drives, tails = zip(*split_paths)
    # the drive we care about is the last one in the list
    drive = ''
    for candidate in reversed(drives):
        if candidate:
            drive = candidate
            break
    return os.path.join(drive, os.path.join(*tails))
def resolve_path(target, start=os.path.curdir):
    r"""
    Resolve ``target`` against ``start`` and return the normalized path.

    ``target`` is joined onto ``start`` with the drive-aware ``join``
    and the result is normalized. Also available as ``findpath``.

    >>> tmp = str(getfixture('tmpdir_as_cwd'))

    >>> findpath('d:\\')
    'd:\\'

    >>> findpath('d:\\', tmp)
    'd:\\'

    >>> findpath('\\bar', 'd:\\')
    'd:\\bar'

    >>> findpath('\\bar', 'd:\\foo') # fails with '\\bar'
    'd:\\bar'

    >>> findpath('bar', 'd:\\foo')
    'd:\\foo\\bar'

    >>> findpath('\\baz', 'd:\\foo\\bar') # fails with '\\baz'
    'd:\\baz'

    >>> os.path.abspath(findpath('\\bar')).lower()
    'c:\\bar'

    >>> os.path.abspath(findpath('bar'))
    '...\\bar'

    >>> findpath('..', 'd:\\foo\\bar')
    'd:\\foo'

    The parent of the root directory is the root directory.

    >>> findpath('..', 'd:\\')
    'd:\\'
    """
    combined = join(start, target)
    return os.path.normpath(combined)


findpath = resolve_path
def trace_symlink_target(link):
    """
    Follow a symlink, and any symlinks it points to, until a path that
    is not a symlink is reached, and return that path.

    Each intermediate target is resolved relative to the directory of
    the link it was read from.

    Raises ValueError when the specified link is not a symlink. Errors
    from ``is_symlink`` and ``readlink`` propagate to the caller.
    """
    if not is_symlink(link):
        raise ValueError("link must point to a symlink on the system")
    current = link
    while is_symlink(current):
        parent = os.path.dirname(current)
        current = resolve_path(readlink(current), parent)
    return current
def readlink(link):
    """
    readlink(link) -> target

    Return a string representing the path to which the symbolic link points.

    The link itself is opened (reparse points are not followed), its
    reparse data is fetched, and the substitute name is extracted.

    Raises ``WindowsError`` if the link cannot be opened and
    ``RuntimeError`` if the reparse data is not tagged as a symlink.
    The file handle is closed in every case.
    """
    handle = api.CreateFile(
        link,
        0,
        0,
        None,
        api.OPEN_EXISTING,
        api.FILE_FLAG_OPEN_REPARSE_POINT | api.FILE_FLAG_BACKUP_SEMANTICS,
        None,
    )
    if handle == api.INVALID_HANDLE_VALUE:
        raise WindowsError()
    try:
        res = reparse.DeviceIoControl(
            handle, api.FSCTL_GET_REPARSE_POINT, None, 10240
        )
    finally:
        # the handle is only needed for the control call; always release it
        handle_nonzero_success(api.CloseHandle(handle))
    raw = create_string_buffer(res)
    p_rdb = cast(raw, POINTER(api.REPARSE_DATA_BUFFER))
    rdb = p_rdb.contents
    if not rdb.tag == api.IO_REPARSE_TAG_SYMLINK:
        raise RuntimeError("Expected IO_REPARSE_TAG_SYMLINK, but got %d" % rdb.tag)
    return rdb.get_substitute_name()
def patch_os_module():
    """
    Install this module's link helpers into ``os`` where they are missing.

    If ``os.symlink`` is absent, both ``os.symlink`` and
    ``os.path.islink`` are replaced; if ``os.readlink`` is absent, it is
    added. Attributes that already exist are left alone.
    """
    if not hasattr(os, 'symlink'):
        setattr(os, 'symlink', symlink)
        setattr(os.path, 'islink', islink)
    if not hasattr(os, 'readlink'):
        setattr(os, 'readlink', readlink)
def find_symlinks(root):
    """
    Walk the tree under ``root`` and yield the path of every entry for
    which ``is_symlink`` is true.

    Symlinked directories are removed from the walk so they are not
    descended into.
    """
    for dirpath, dirnames, filenames in os.walk(root):
        entries = dirnames + filenames
        for name in entries:
            pathname = os.path.join(dirpath, name)
            if not is_symlink(pathname):
                continue
            yield pathname
            # don't traverse symlinks
            if name in dirnames:
                dirnames.remove(name)
def find_symlinks_cmd():
    """
    %prog [start-path]
    Search the specified path (defaults to the current directory) for symlinks,
    printing the source and target on each line.
    """
    from optparse import OptionParser
    from textwrap import dedent

    # the docstring above doubles as the usage text shown by the parser
    usage = dedent(find_symlinks_cmd.__doc__).strip()
    parser = OptionParser(usage=usage)
    options, args = parser.parse_args()
    root = args.pop() if args else '.'
    # anything left over after taking the start path is an error
    if args:
        parser.error("unexpected argument(s)")
    try:
        for found in find_symlinks(root):
            target = readlink(found)
            # prefix directory links with a 'D' marker
            marker = 'D' if os.path.isdir(found) else ''
            msg = '{dir:2}{symlink} --> {target}'.format(
                dir=marker, symlink=found, target=target
            )
            print(msg)
    except KeyboardInterrupt:
        pass
class FileAttributes(int, metaclass=binary.BitMask):
    # Integer subclass describing a file's attribute bits; the
    # binary.BitMask metaclass processes the constants declared below.

    # Pull in any matching values from the stat module (Python 3.5 and
    # later).
    # NOTE(review): the stat module names these constants
    # FILE_ATTRIBUTE_* (singular), so this filter looks like it matches
    # nothing and the explicit values below are what take effect -
    # confirm before relying on it.
    locals().update(
        (const_name.split('FILE_ATTRIBUTES_')[1].lower(), const_value)
        for const_name, const_value in vars(stat).items()
        if const_name.startswith('FILE_ATTRIBUTES_')
    )
    # Explicit definitions (needed for Python 3.4 and earlier)
    archive = 0x20
    compressed = 0x800
    hidden = 0x2
    device = 0x40
    directory = 0x10
    encrypted = 0x4000
    normal = 0x80
    not_content_indexed = 0x2000
    offline = 0x1000
    read_only = 0x1
    reparse_point = 0x400
    sparse_file = 0x200
    system = 0x4
    temporary = 0x100
    virtual = 0x10000

    @classmethod
    def get(cls, filepath):
        # Read the attributes of ``filepath`` and wrap them in ``cls``;
        # raises WindowsError when they cannot be read.
        raw = api.GetFileAttributes(filepath)
        if raw != api.INVALID_FILE_ATTRIBUTES:
            return cls(raw)
        raise WindowsError()


GetFileAttributes = FileAttributes.get
def SetFileAttributes(filepath, *attrs):
    """
    Set file attributes. e.g.:

        SetFileAttributes('C:\\foo', 'hidden')

    Each attr must be either a numeric value, the name of a constant
    defined in jaraco.windows.filesystem.api, or one of the nice names
    defined in this function ('hidden', 'read_only'). All attrs are
    OR-ed together and applied in a single call.

    Raises TypeError when no attrs are given and ValueError when a
    string attr matches neither a nice name nor a constant in api.
    """
    if not attrs:
        raise TypeError("SetFileAttributes requires at least one attribute")
    nice_names = {
        'hidden': 'FILE_ATTRIBUTE_HIDDEN',
        'read_only': 'FILE_ATTRIBUTE_READONLY',
    }

    def resolve(attr):
        # numeric values are used as-is; strings name a constant in api
        if not isinstance(attr, str):
            return attr
        const_name = nice_names.get(attr, attr)
        try:
            return getattr(api, const_name)
        except AttributeError:
            raise ValueError(
                "Unknown file attribute: {!r}".format(attr)
            ) from None

    flags = functools.reduce(operator.or_, map(resolve, attrs))
    handle_nonzero_success(api.SetFileAttributes(filepath, flags))