mirror of
https://github.com/Tautulli/Tautulli.git
synced 2025-01-21 18:32:58 -08:00
398 lines
14 KiB
Python
398 lines
14 KiB
Python
"""
|
|
Low-level helpers for the SecureTransport bindings.
|
|
|
|
These are Python functions that are not directly related to the high-level APIs
|
|
but are necessary to get them to work. They include a whole bunch of low-level
|
|
CoreFoundation messing about and memory management. The concerns in this module
|
|
are almost entirely about trying to avoid memory leaks and providing
|
|
appropriate and useful assistance to the higher-level code.
|
|
"""
|
|
import base64
|
|
import ctypes
|
|
import itertools
|
|
import os
|
|
import re
|
|
import ssl
|
|
import struct
|
|
import tempfile
|
|
|
|
from .bindings import CFConst, CoreFoundation, Security
|
|
|
|
# This regular expression is used to grab PEM data out of a PEM bundle.
|
|
_PEM_CERTS_RE = re.compile(
|
|
b"-----BEGIN CERTIFICATE-----\n(.*?)\n-----END CERTIFICATE-----", re.DOTALL
|
|
)
|
|
|
|
|
|
def _cf_data_from_bytes(bytestring):
|
|
"""
|
|
Given a bytestring, create a CFData object from it. This CFData object must
|
|
be CFReleased by the caller.
|
|
"""
|
|
return CoreFoundation.CFDataCreate(
|
|
CoreFoundation.kCFAllocatorDefault, bytestring, len(bytestring)
|
|
)
|
|
|
|
|
|
def _cf_dictionary_from_tuples(tuples):
|
|
"""
|
|
Given a list of Python tuples, create an associated CFDictionary.
|
|
"""
|
|
dictionary_size = len(tuples)
|
|
|
|
# We need to get the dictionary keys and values out in the same order.
|
|
keys = (t[0] for t in tuples)
|
|
values = (t[1] for t in tuples)
|
|
cf_keys = (CoreFoundation.CFTypeRef * dictionary_size)(*keys)
|
|
cf_values = (CoreFoundation.CFTypeRef * dictionary_size)(*values)
|
|
|
|
return CoreFoundation.CFDictionaryCreate(
|
|
CoreFoundation.kCFAllocatorDefault,
|
|
cf_keys,
|
|
cf_values,
|
|
dictionary_size,
|
|
CoreFoundation.kCFTypeDictionaryKeyCallBacks,
|
|
CoreFoundation.kCFTypeDictionaryValueCallBacks,
|
|
)
|
|
|
|
|
|
def _cfstr(py_bstr):
|
|
"""
|
|
Given a Python binary data, create a CFString.
|
|
The string must be CFReleased by the caller.
|
|
"""
|
|
c_str = ctypes.c_char_p(py_bstr)
|
|
cf_str = CoreFoundation.CFStringCreateWithCString(
|
|
CoreFoundation.kCFAllocatorDefault,
|
|
c_str,
|
|
CFConst.kCFStringEncodingUTF8,
|
|
)
|
|
return cf_str
|
|
|
|
|
|
def _create_cfstring_array(lst):
|
|
"""
|
|
Given a list of Python binary data, create an associated CFMutableArray.
|
|
The array must be CFReleased by the caller.
|
|
|
|
Raises an ssl.SSLError on failure.
|
|
"""
|
|
cf_arr = None
|
|
try:
|
|
cf_arr = CoreFoundation.CFArrayCreateMutable(
|
|
CoreFoundation.kCFAllocatorDefault,
|
|
0,
|
|
ctypes.byref(CoreFoundation.kCFTypeArrayCallBacks),
|
|
)
|
|
if not cf_arr:
|
|
raise MemoryError("Unable to allocate memory!")
|
|
for item in lst:
|
|
cf_str = _cfstr(item)
|
|
if not cf_str:
|
|
raise MemoryError("Unable to allocate memory!")
|
|
try:
|
|
CoreFoundation.CFArrayAppendValue(cf_arr, cf_str)
|
|
finally:
|
|
CoreFoundation.CFRelease(cf_str)
|
|
except BaseException as e:
|
|
if cf_arr:
|
|
CoreFoundation.CFRelease(cf_arr)
|
|
raise ssl.SSLError("Unable to allocate array: %s" % (e,))
|
|
return cf_arr
|
|
|
|
|
|
def _cf_string_to_unicode(value):
|
|
"""
|
|
Creates a Unicode string from a CFString object. Used entirely for error
|
|
reporting.
|
|
|
|
Yes, it annoys me quite a lot that this function is this complex.
|
|
"""
|
|
value_as_void_p = ctypes.cast(value, ctypes.POINTER(ctypes.c_void_p))
|
|
|
|
string = CoreFoundation.CFStringGetCStringPtr(
|
|
value_as_void_p, CFConst.kCFStringEncodingUTF8
|
|
)
|
|
if string is None:
|
|
buffer = ctypes.create_string_buffer(1024)
|
|
result = CoreFoundation.CFStringGetCString(
|
|
value_as_void_p, buffer, 1024, CFConst.kCFStringEncodingUTF8
|
|
)
|
|
if not result:
|
|
raise OSError("Error copying C string from CFStringRef")
|
|
string = buffer.value
|
|
if string is not None:
|
|
string = string.decode("utf-8")
|
|
return string
|
|
|
|
|
|
def _assert_no_error(error, exception_class=None):
|
|
"""
|
|
Checks the return code and throws an exception if there is an error to
|
|
report
|
|
"""
|
|
if error == 0:
|
|
return
|
|
|
|
cf_error_string = Security.SecCopyErrorMessageString(error, None)
|
|
output = _cf_string_to_unicode(cf_error_string)
|
|
CoreFoundation.CFRelease(cf_error_string)
|
|
|
|
if output is None or output == u"":
|
|
output = u"OSStatus %s" % error
|
|
|
|
if exception_class is None:
|
|
exception_class = ssl.SSLError
|
|
|
|
raise exception_class(output)
|
|
|
|
|
|
def _cert_array_from_pem(pem_bundle):
|
|
"""
|
|
Given a bundle of certs in PEM format, turns them into a CFArray of certs
|
|
that can be used to validate a cert chain.
|
|
"""
|
|
# Normalize the PEM bundle's line endings.
|
|
pem_bundle = pem_bundle.replace(b"\r\n", b"\n")
|
|
|
|
der_certs = [
|
|
base64.b64decode(match.group(1)) for match in _PEM_CERTS_RE.finditer(pem_bundle)
|
|
]
|
|
if not der_certs:
|
|
raise ssl.SSLError("No root certificates specified")
|
|
|
|
cert_array = CoreFoundation.CFArrayCreateMutable(
|
|
CoreFoundation.kCFAllocatorDefault,
|
|
0,
|
|
ctypes.byref(CoreFoundation.kCFTypeArrayCallBacks),
|
|
)
|
|
if not cert_array:
|
|
raise ssl.SSLError("Unable to allocate memory!")
|
|
|
|
try:
|
|
for der_bytes in der_certs:
|
|
certdata = _cf_data_from_bytes(der_bytes)
|
|
if not certdata:
|
|
raise ssl.SSLError("Unable to allocate memory!")
|
|
cert = Security.SecCertificateCreateWithData(
|
|
CoreFoundation.kCFAllocatorDefault, certdata
|
|
)
|
|
CoreFoundation.CFRelease(certdata)
|
|
if not cert:
|
|
raise ssl.SSLError("Unable to build cert object!")
|
|
|
|
CoreFoundation.CFArrayAppendValue(cert_array, cert)
|
|
CoreFoundation.CFRelease(cert)
|
|
except Exception:
|
|
# We need to free the array before the exception bubbles further.
|
|
# We only want to do that if an error occurs: otherwise, the caller
|
|
# should free.
|
|
CoreFoundation.CFRelease(cert_array)
|
|
raise
|
|
|
|
return cert_array
|
|
|
|
|
|
def _is_cert(item):
|
|
"""
|
|
Returns True if a given CFTypeRef is a certificate.
|
|
"""
|
|
expected = Security.SecCertificateGetTypeID()
|
|
return CoreFoundation.CFGetTypeID(item) == expected
|
|
|
|
|
|
def _is_identity(item):
|
|
"""
|
|
Returns True if a given CFTypeRef is an identity.
|
|
"""
|
|
expected = Security.SecIdentityGetTypeID()
|
|
return CoreFoundation.CFGetTypeID(item) == expected
|
|
|
|
|
|
def _temporary_keychain():
|
|
"""
|
|
This function creates a temporary Mac keychain that we can use to work with
|
|
credentials. This keychain uses a one-time password and a temporary file to
|
|
store the data. We expect to have one keychain per socket. The returned
|
|
SecKeychainRef must be freed by the caller, including calling
|
|
SecKeychainDelete.
|
|
|
|
Returns a tuple of the SecKeychainRef and the path to the temporary
|
|
directory that contains it.
|
|
"""
|
|
# Unfortunately, SecKeychainCreate requires a path to a keychain. This
|
|
# means we cannot use mkstemp to use a generic temporary file. Instead,
|
|
# we're going to create a temporary directory and a filename to use there.
|
|
# This filename will be 8 random bytes expanded into base64. We also need
|
|
# some random bytes to password-protect the keychain we're creating, so we
|
|
# ask for 40 random bytes.
|
|
random_bytes = os.urandom(40)
|
|
filename = base64.b16encode(random_bytes[:8]).decode("utf-8")
|
|
password = base64.b16encode(random_bytes[8:]) # Must be valid UTF-8
|
|
tempdirectory = tempfile.mkdtemp()
|
|
|
|
keychain_path = os.path.join(tempdirectory, filename).encode("utf-8")
|
|
|
|
# We now want to create the keychain itself.
|
|
keychain = Security.SecKeychainRef()
|
|
status = Security.SecKeychainCreate(
|
|
keychain_path, len(password), password, False, None, ctypes.byref(keychain)
|
|
)
|
|
_assert_no_error(status)
|
|
|
|
# Having created the keychain, we want to pass it off to the caller.
|
|
return keychain, tempdirectory
|
|
|
|
|
|
def _load_items_from_file(keychain, path):
|
|
"""
|
|
Given a single file, loads all the trust objects from it into arrays and
|
|
the keychain.
|
|
Returns a tuple of lists: the first list is a list of identities, the
|
|
second a list of certs.
|
|
"""
|
|
certificates = []
|
|
identities = []
|
|
result_array = None
|
|
|
|
with open(path, "rb") as f:
|
|
raw_filedata = f.read()
|
|
|
|
try:
|
|
filedata = CoreFoundation.CFDataCreate(
|
|
CoreFoundation.kCFAllocatorDefault, raw_filedata, len(raw_filedata)
|
|
)
|
|
result_array = CoreFoundation.CFArrayRef()
|
|
result = Security.SecItemImport(
|
|
filedata, # cert data
|
|
None, # Filename, leaving it out for now
|
|
None, # What the type of the file is, we don't care
|
|
None, # what's in the file, we don't care
|
|
0, # import flags
|
|
None, # key params, can include passphrase in the future
|
|
keychain, # The keychain to insert into
|
|
ctypes.byref(result_array), # Results
|
|
)
|
|
_assert_no_error(result)
|
|
|
|
# A CFArray is not very useful to us as an intermediary
|
|
# representation, so we are going to extract the objects we want
|
|
# and then free the array. We don't need to keep hold of keys: the
|
|
# keychain already has them!
|
|
result_count = CoreFoundation.CFArrayGetCount(result_array)
|
|
for index in range(result_count):
|
|
item = CoreFoundation.CFArrayGetValueAtIndex(result_array, index)
|
|
item = ctypes.cast(item, CoreFoundation.CFTypeRef)
|
|
|
|
if _is_cert(item):
|
|
CoreFoundation.CFRetain(item)
|
|
certificates.append(item)
|
|
elif _is_identity(item):
|
|
CoreFoundation.CFRetain(item)
|
|
identities.append(item)
|
|
finally:
|
|
if result_array:
|
|
CoreFoundation.CFRelease(result_array)
|
|
|
|
CoreFoundation.CFRelease(filedata)
|
|
|
|
return (identities, certificates)
|
|
|
|
|
|
def _load_client_cert_chain(keychain, *paths):
|
|
"""
|
|
Load certificates and maybe keys from a number of files. Has the end goal
|
|
of returning a CFArray containing one SecIdentityRef, and then zero or more
|
|
SecCertificateRef objects, suitable for use as a client certificate trust
|
|
chain.
|
|
"""
|
|
# Ok, the strategy.
|
|
#
|
|
# This relies on knowing that macOS will not give you a SecIdentityRef
|
|
# unless you have imported a key into a keychain. This is a somewhat
|
|
# artificial limitation of macOS (for example, it doesn't necessarily
|
|
# affect iOS), but there is nothing inside Security.framework that lets you
|
|
# get a SecIdentityRef without having a key in a keychain.
|
|
#
|
|
# So the policy here is we take all the files and iterate them in order.
|
|
# Each one will use SecItemImport to have one or more objects loaded from
|
|
# it. We will also point at a keychain that macOS can use to work with the
|
|
# private key.
|
|
#
|
|
# Once we have all the objects, we'll check what we actually have. If we
|
|
# already have a SecIdentityRef in hand, fab: we'll use that. Otherwise,
|
|
# we'll take the first certificate (which we assume to be our leaf) and
|
|
# ask the keychain to give us a SecIdentityRef with that cert's associated
|
|
# key.
|
|
#
|
|
# We'll then return a CFArray containing the trust chain: one
|
|
# SecIdentityRef and then zero-or-more SecCertificateRef objects. The
|
|
# responsibility for freeing this CFArray will be with the caller. This
|
|
# CFArray must remain alive for the entire connection, so in practice it
|
|
# will be stored with a single SSLSocket, along with the reference to the
|
|
# keychain.
|
|
certificates = []
|
|
identities = []
|
|
|
|
# Filter out bad paths.
|
|
paths = (path for path in paths if path)
|
|
|
|
try:
|
|
for file_path in paths:
|
|
new_identities, new_certs = _load_items_from_file(keychain, file_path)
|
|
identities.extend(new_identities)
|
|
certificates.extend(new_certs)
|
|
|
|
# Ok, we have everything. The question is: do we have an identity? If
|
|
# not, we want to grab one from the first cert we have.
|
|
if not identities:
|
|
new_identity = Security.SecIdentityRef()
|
|
status = Security.SecIdentityCreateWithCertificate(
|
|
keychain, certificates[0], ctypes.byref(new_identity)
|
|
)
|
|
_assert_no_error(status)
|
|
identities.append(new_identity)
|
|
|
|
# We now want to release the original certificate, as we no longer
|
|
# need it.
|
|
CoreFoundation.CFRelease(certificates.pop(0))
|
|
|
|
# We now need to build a new CFArray that holds the trust chain.
|
|
trust_chain = CoreFoundation.CFArrayCreateMutable(
|
|
CoreFoundation.kCFAllocatorDefault,
|
|
0,
|
|
ctypes.byref(CoreFoundation.kCFTypeArrayCallBacks),
|
|
)
|
|
for item in itertools.chain(identities, certificates):
|
|
# ArrayAppendValue does a CFRetain on the item. That's fine,
|
|
# because the finally block will release our other refs to them.
|
|
CoreFoundation.CFArrayAppendValue(trust_chain, item)
|
|
|
|
return trust_chain
|
|
finally:
|
|
for obj in itertools.chain(identities, certificates):
|
|
CoreFoundation.CFRelease(obj)
|
|
|
|
|
|
TLS_PROTOCOL_VERSIONS = {
|
|
"SSLv2": (0, 2),
|
|
"SSLv3": (3, 0),
|
|
"TLSv1": (3, 1),
|
|
"TLSv1.1": (3, 2),
|
|
"TLSv1.2": (3, 3),
|
|
}
|
|
|
|
|
|
def _build_tls_unknown_ca_alert(version):
|
|
"""
|
|
Builds a TLS alert record for an unknown CA.
|
|
"""
|
|
ver_maj, ver_min = TLS_PROTOCOL_VERSIONS[version]
|
|
severity_fatal = 0x02
|
|
description_unknown_ca = 0x30
|
|
msg = struct.pack(">BB", severity_fatal, description_unknown_ca)
|
|
msg_len = len(msg)
|
|
record_type_alert = 0x15
|
|
record = struct.pack(">BBBH", record_type_alert, ver_maj, ver_min, msg_len) + msg
|
|
return record
|