mirror of
https://github.com/clinton-hall/nzbToMedia.git
synced 2024-11-14 17:40:24 -08:00
236 lines
7.2 KiB
Python
236 lines
7.2 KiB
Python
# -*- coding: utf-8 -*-
|
|
from ...compat import bytes
|
|
from ...exceptions import ReadError, SizeError
|
|
from datetime import datetime, timedelta
|
|
from io import BytesIO
|
|
from struct import unpack
|
|
|
|
|
|
__all__ = ['read_element_id', 'read_element_size', 'read_element_integer', 'read_element_uinteger',
|
|
'read_element_float', 'read_element_string', 'read_element_unicode', 'read_element_date',
|
|
'read_element_binary']
|
|
|
|
|
|
def _read(stream, size):
|
|
"""Read the `stream` for *exactly* `size` bytes and raise an exception if
|
|
less than `size` bytes are actually read
|
|
|
|
:param stream: file-like object from which to read
|
|
:param int size: number of bytes to read
|
|
:raise ReadError: when less than `size` bytes are actually read
|
|
:return: read data from the `stream`
|
|
:rtype: bytes
|
|
|
|
"""
|
|
data = stream.read(size)
|
|
if len(data) < size:
|
|
raise ReadError('Less than %d bytes read (%d)' % (size, len(data)))
|
|
return data
|
|
|
|
|
|
def read_element_id(stream):
|
|
"""Read the Element ID
|
|
|
|
:param stream: file-like object from which to read
|
|
:raise ReadError: when not all the required bytes could be read
|
|
:return: the id of the element
|
|
:rtype: int
|
|
|
|
"""
|
|
char = _read(stream, 1)
|
|
byte = ord(char)
|
|
if byte & 0x80:
|
|
return byte
|
|
elif byte & 0x40:
|
|
return unpack('>H', char + _read(stream, 1))[0]
|
|
elif byte & 0x20:
|
|
b, h = unpack('>BH', char + _read(stream, 2))
|
|
return b * 2 ** 16 + h
|
|
elif byte & 0x10:
|
|
return unpack('>L', char + _read(stream, 3))[0]
|
|
else:
|
|
ValueError('Not an Element ID')
|
|
|
|
|
|
def read_element_size(stream):
|
|
"""Read the Element Size
|
|
|
|
:param stream: file-like object from which to read
|
|
:raise ReadError: when not all the required bytes could be read
|
|
:return: the size of element's data
|
|
:rtype: int
|
|
|
|
"""
|
|
char = _read(stream, 1)
|
|
byte = ord(char)
|
|
if byte & 0x80:
|
|
return unpack('>B', bytes((byte ^ 0x80,)))[0]
|
|
elif byte & 0x40:
|
|
return unpack('>H', bytes((byte ^ 0x40,)) + _read(stream, 1))[0]
|
|
elif byte & 0x20:
|
|
b, h = unpack('>BH', bytes((byte ^ 0x20,)) + _read(stream, 2))
|
|
return b * 2 ** 16 + h
|
|
elif byte & 0x10:
|
|
return unpack('>L', bytes((byte ^ 0x10,)) + _read(stream, 3))[0]
|
|
elif byte & 0x08:
|
|
b, l = unpack('>BL', bytes((byte ^ 0x08,)) + _read(stream, 4))
|
|
return b * 2 ** 32 + l
|
|
elif byte & 0x04:
|
|
h, l = unpack('>HL', bytes((byte ^ 0x04,)) + _read(stream, 5))
|
|
return h * 2 ** 32 + l
|
|
elif byte & 0x02:
|
|
b, h, l = unpack('>BHL', bytes((byte ^ 0x02,)) + _read(stream, 6))
|
|
return b * 2 ** 48 + h * 2 ** 32 + l
|
|
elif byte & 0x01:
|
|
return unpack('>Q', bytes((byte ^ 0x01,)) + _read(stream, 7))[0]
|
|
else:
|
|
ValueError('Not an Element Size')
|
|
|
|
|
|
def read_element_integer(stream, size):
|
|
"""Read the Element Data of type :data:`INTEGER`
|
|
|
|
:param stream: file-like object from which to read
|
|
:param int size: size of element's data
|
|
:raise ReadError: when not all the required bytes could be read
|
|
:raise SizeError: if size is incorrect
|
|
:return: the read integer
|
|
:rtype: int
|
|
|
|
"""
|
|
if size == 1:
|
|
return unpack('>b', _read(stream, 1))[0]
|
|
elif size == 2:
|
|
return unpack('>h', _read(stream, 2))[0]
|
|
elif size == 3:
|
|
b, h = unpack('>bH', _read(stream, 3))
|
|
return b * 2 ** 16 + h
|
|
elif size == 4:
|
|
return unpack('>l', _read(stream, 4))[0]
|
|
elif size == 5:
|
|
b, l = unpack('>bL', _read(stream, 5))
|
|
return b * 2 ** 32 + l
|
|
elif size == 6:
|
|
h, l = unpack('>hL', _read(stream, 6))
|
|
return h * 2 ** 32 + l
|
|
elif size == 7:
|
|
b, h, l = unpack('>bHL', _read(stream, 7))
|
|
return b * 2 ** 48 + h * 2 ** 32 + l
|
|
elif size == 8:
|
|
return unpack('>q', _read(stream, 8))[0]
|
|
else:
|
|
raise SizeError(size)
|
|
|
|
|
|
def read_element_uinteger(stream, size):
|
|
"""Read the Element Data of type :data:`UINTEGER`
|
|
|
|
:param stream: file-like object from which to read
|
|
:param int size: size of element's data
|
|
:raise ReadError: when not all the required bytes could be read
|
|
:raise SizeError: if size is incorrect
|
|
:return: the read unsigned integer
|
|
:rtype: int
|
|
|
|
"""
|
|
if size == 1:
|
|
return unpack('>B', _read(stream, 1))[0]
|
|
elif size == 2:
|
|
return unpack('>H', _read(stream, 2))[0]
|
|
elif size == 3:
|
|
b, h = unpack('>BH', _read(stream, 3))
|
|
return b * 2 ** 16 + h
|
|
elif size == 4:
|
|
return unpack('>L', _read(stream, 4))[0]
|
|
elif size == 5:
|
|
b, l = unpack('>BL', _read(stream, 5))
|
|
return b * 2 ** 32 + l
|
|
elif size == 6:
|
|
h, l = unpack('>HL', _read(stream, 6))
|
|
return h * 2 ** 32 + l
|
|
elif size == 7:
|
|
b, h, l = unpack('>BHL', _read(stream, 7))
|
|
return b * 2 ** 48 + h * 2 ** 32 + l
|
|
elif size == 8:
|
|
return unpack('>Q', _read(stream, 8))[0]
|
|
else:
|
|
raise SizeError(size)
|
|
|
|
|
|
def read_element_float(stream, size):
|
|
"""Read the Element Data of type :data:`FLOAT`
|
|
|
|
:param stream: file-like object from which to read
|
|
:param int size: size of element's data
|
|
:raise ReadError: when not all the required bytes could be read
|
|
:raise SizeError: if size is incorrect
|
|
:return: the read float
|
|
:rtype: float
|
|
|
|
"""
|
|
if size == 4:
|
|
return unpack('>f', _read(stream, 4))[0]
|
|
elif size == 8:
|
|
return unpack('>d', _read(stream, 8))[0]
|
|
else:
|
|
raise SizeError(size)
|
|
|
|
|
|
def read_element_string(stream, size):
|
|
"""Read the Element Data of type :data:`STRING`
|
|
|
|
:param stream: file-like object from which to read
|
|
:param int size: size of element's data
|
|
:raise ReadError: when not all the required bytes could be read
|
|
:raise SizeError: if size is incorrect
|
|
:return: the read ascii-decoded string
|
|
:rtype: unicode
|
|
|
|
"""
|
|
return _read(stream, size).decode('ascii')
|
|
|
|
|
|
def read_element_unicode(stream, size):
|
|
"""Read the Element Data of type :data:`UNICODE`
|
|
|
|
:param stream: file-like object from which to read
|
|
:param int size: size of element's data
|
|
:raise ReadError: when not all the required bytes could be read
|
|
:raise SizeError: if size is incorrect
|
|
:return: the read utf-8-decoded string
|
|
:rtype: unicode
|
|
|
|
"""
|
|
return _read(stream, size).decode('utf-8')
|
|
|
|
|
|
def read_element_date(stream, size):
|
|
"""Read the Element Data of type :data:`DATE`
|
|
|
|
:param stream: file-like object from which to read
|
|
:param int size: size of element's data
|
|
:raise ReadError: when not all the required bytes could be read
|
|
:raise SizeError: if size is incorrect
|
|
:return: the read date
|
|
:rtype: datetime
|
|
|
|
"""
|
|
if size != 8:
|
|
raise SizeError(size)
|
|
nanoseconds = unpack('>q', _read(stream, 8))[0]
|
|
return datetime(2001, 1, 1, 0, 0, 0, 0, None) + timedelta(microseconds=nanoseconds // 1000)
|
|
|
|
|
|
def read_element_binary(stream, size):
|
|
"""Read the Element Data of type :data:`BINARY`
|
|
|
|
:param stream: file-like object from which to read
|
|
:param int size: size of element's data
|
|
:raise ReadError: when not all the required bytes could be read
|
|
:raise SizeError: if size is incorrect
|
|
:return: raw binary data
|
|
:rtype: bytes
|
|
|
|
"""
|
|
return BytesIO(stream.read(size))
|