nzbToMedia/libs/win/jaraco/windows/privilege.py
2022-11-29 00:44:47 -05:00

145 lines
3.9 KiB
Python

import ctypes
from ctypes import wintypes
from .api import security
from .api import privilege
from .api import process
def get_process_token():
"""
Get the current process token
"""
token = wintypes.HANDLE()
res = process.OpenProcessToken(
process.GetCurrentProcess(), process.TOKEN_ALL_ACCESS, token
)
if not res > 0:
raise RuntimeError("Couldn't get process token")
return token
def get_symlink_luid():
"""
Get the LUID for the SeCreateSymbolicLinkPrivilege
"""
symlink_luid = privilege.LUID()
res = privilege.LookupPrivilegeValue(
None, "SeCreateSymbolicLinkPrivilege", symlink_luid
)
if not res > 0:
raise RuntimeError("Couldn't lookup privilege value")
return symlink_luid
def get_privilege_information():
"""
Get all privileges associated with the current process.
"""
# first call with zero length to determine what size buffer we need
return_length = wintypes.DWORD()
params = [
get_process_token(),
privilege.TOKEN_INFORMATION_CLASS.TokenPrivileges,
None,
0,
return_length,
]
res = privilege.GetTokenInformation(*params)
# assume we now have the necessary length in return_length
buffer = ctypes.create_string_buffer(return_length.value)
params[2] = buffer
params[3] = return_length.value
res = privilege.GetTokenInformation(*params)
assert res > 0, "Error in second GetTokenInformation (%d)" % res
privileges = ctypes.cast(
buffer, ctypes.POINTER(privilege.TOKEN_PRIVILEGES)
).contents
return privileges
def report_privilege_information():
"""
Report all privilege information assigned to the current process.
"""
privileges = get_privilege_information()
print("found {0} privileges".format(privileges.count))
tuple(map(print, privileges))
def enable_symlink_privilege():
"""
Try to assign the symlink privilege to the current process token.
Return True if the assignment is successful.
"""
# create a space in memory for a TOKEN_PRIVILEGES structure
# with one element
size = ctypes.sizeof(privilege.TOKEN_PRIVILEGES)
size += ctypes.sizeof(privilege.LUID_AND_ATTRIBUTES)
buffer = ctypes.create_string_buffer(size)
tp = ctypes.cast(buffer, ctypes.POINTER(privilege.TOKEN_PRIVILEGES)).contents
tp.count = 1
tp.get_array()[0].enable()
tp.get_array()[0].LUID = get_symlink_luid()
token = get_process_token()
res = privilege.AdjustTokenPrivileges(token, False, tp, 0, None, None)
if res == 0:
raise RuntimeError("Error in AdjustTokenPrivileges")
ERROR_NOT_ALL_ASSIGNED = 1300
return ctypes.windll.kernel32.GetLastError() != ERROR_NOT_ALL_ASSIGNED
class PolicyHandle(wintypes.HANDLE):
pass
class LSA_UNICODE_STRING(ctypes.Structure):
_fields_ = [
('length', ctypes.c_ushort),
('max_length', ctypes.c_ushort),
('buffer', ctypes.wintypes.LPWSTR),
]
def OpenPolicy(system_name, object_attributes, access_mask):
policy = PolicyHandle()
raise NotImplementedError(
"Need to construct structures for parameters "
"(see http://msdn.microsoft.com/en-us/library/windows"
"/desktop/aa378299%28v=vs.85%29.aspx)"
)
res = ctypes.windll.advapi32.LsaOpenPolicy(
system_name, object_attributes, access_mask, ctypes.byref(policy)
)
assert res == 0, "Error status {res}".format(**vars())
return policy
def grant_symlink_privilege(who, machine=''):
"""
Grant the 'create symlink' privilege to who.
Based on http://support.microsoft.com/kb/132958
"""
flags = security.POLICY_CREATE_ACCOUNT | security.POLICY_LOOKUP_NAMES
policy = OpenPolicy(machine, flags)
return policy
def main():
assigned = enable_symlink_privilege()
msg = ['failure', 'success'][assigned]
print("Symlink privilege assignment completed with {0}".format(msg))
if __name__ == '__main__':
main()