mirror of
https://github.com/Tautulli/Tautulli.git
synced 2025-03-12 04:35:40 -07:00
863 lines
26 KiB
Python
863 lines
26 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
# Copyright (C) 2005-2006 Joe Wreschnig
|
|
# Copyright (C) 2006-2007 Lukas Lalinsky
|
|
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License version 2 as
|
|
# published by the Free Software Foundation.
|
|
|
|
"""Read and write ASF (Window Media Audio) files."""
|
|
|
|
__all__ = ["ASF", "Open"]
|
|
|
|
import sys
|
|
import struct
|
|
from mutagen import FileType, Metadata, StreamInfo
|
|
from mutagen._util import (insert_bytes, delete_bytes, DictMixin,
|
|
total_ordering, MutagenError)
|
|
from ._compat import swap_to_string, text_type, PY2, string_types, reraise, \
|
|
xrange, long_, PY3
|
|
|
|
|
|
class error(IOError, MutagenError):
|
|
pass
|
|
|
|
|
|
class ASFError(error):
|
|
pass
|
|
|
|
|
|
class ASFHeaderError(error):
|
|
pass
|
|
|
|
|
|
class ASFInfo(StreamInfo):
|
|
"""ASF stream information."""
|
|
|
|
def __init__(self):
|
|
self.length = 0.0
|
|
self.sample_rate = 0
|
|
self.bitrate = 0
|
|
self.channels = 0
|
|
|
|
def pprint(self):
|
|
s = "Windows Media Audio %d bps, %s Hz, %d channels, %.2f seconds" % (
|
|
self.bitrate, self.sample_rate, self.channels, self.length)
|
|
return s
|
|
|
|
|
|
class ASFTags(list, DictMixin, Metadata):
|
|
"""Dictionary containing ASF attributes."""
|
|
|
|
def pprint(self):
|
|
return "\n".join("%s=%s" % (k, v) for k, v in self)
|
|
|
|
def __getitem__(self, key):
|
|
"""A list of values for the key.
|
|
|
|
This is a copy, so comment['title'].append('a title') will not
|
|
work.
|
|
|
|
"""
|
|
|
|
# PY3 only
|
|
if isinstance(key, slice):
|
|
return list.__getitem__(self, key)
|
|
|
|
values = [value for (k, value) in self if k == key]
|
|
if not values:
|
|
raise KeyError(key)
|
|
else:
|
|
return values
|
|
|
|
def __delitem__(self, key):
|
|
"""Delete all values associated with the key."""
|
|
|
|
# PY3 only
|
|
if isinstance(key, slice):
|
|
return list.__delitem__(self, key)
|
|
|
|
to_delete = [x for x in self if x[0] == key]
|
|
if not to_delete:
|
|
raise KeyError(key)
|
|
else:
|
|
for k in to_delete:
|
|
self.remove(k)
|
|
|
|
def __contains__(self, key):
|
|
"""Return true if the key has any values."""
|
|
for k, value in self:
|
|
if k == key:
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
def __setitem__(self, key, values):
|
|
"""Set a key's value or values.
|
|
|
|
Setting a value overwrites all old ones. The value may be a
|
|
list of Unicode or UTF-8 strings, or a single Unicode or UTF-8
|
|
string.
|
|
|
|
"""
|
|
|
|
# PY3 only
|
|
if isinstance(key, slice):
|
|
return list.__setitem__(self, key, values)
|
|
|
|
if not isinstance(values, list):
|
|
values = [values]
|
|
|
|
to_append = []
|
|
for value in values:
|
|
if not isinstance(value, ASFBaseAttribute):
|
|
if isinstance(value, string_types):
|
|
value = ASFUnicodeAttribute(value)
|
|
elif PY3 and isinstance(value, bytes):
|
|
value = ASFByteArrayAttribute(value)
|
|
elif isinstance(value, bool):
|
|
value = ASFBoolAttribute(value)
|
|
elif isinstance(value, int):
|
|
value = ASFDWordAttribute(value)
|
|
elif isinstance(value, long_):
|
|
value = ASFQWordAttribute(value)
|
|
else:
|
|
raise TypeError("Invalid type %r" % type(value))
|
|
to_append.append((key, value))
|
|
|
|
try:
|
|
del(self[key])
|
|
except KeyError:
|
|
pass
|
|
|
|
self.extend(to_append)
|
|
|
|
def keys(self):
|
|
"""Return all keys in the comment."""
|
|
return self and set(next(iter(zip(*self))))
|
|
|
|
def as_dict(self):
|
|
"""Return a copy of the comment data in a real dict."""
|
|
d = {}
|
|
for key, value in self:
|
|
d.setdefault(key, []).append(value)
|
|
return d
|
|
|
|
|
|
class ASFBaseAttribute(object):
|
|
"""Generic attribute."""
|
|
TYPE = None
|
|
|
|
def __init__(self, value=None, data=None, language=None,
|
|
stream=None, **kwargs):
|
|
self.language = language
|
|
self.stream = stream
|
|
if data:
|
|
self.value = self.parse(data, **kwargs)
|
|
else:
|
|
if value is None:
|
|
# we used to support not passing any args and instead assign
|
|
# them later, keep that working..
|
|
self.value = None
|
|
else:
|
|
self.value = self._validate(value)
|
|
|
|
def _validate(self, value):
|
|
"""Raises TypeError or ValueError in case the user supplied value
|
|
isn't valid.
|
|
"""
|
|
|
|
return value
|
|
|
|
def data_size(self):
|
|
raise NotImplementedError
|
|
|
|
def __repr__(self):
|
|
name = "%s(%r" % (type(self).__name__, self.value)
|
|
if self.language:
|
|
name += ", language=%d" % self.language
|
|
if self.stream:
|
|
name += ", stream=%d" % self.stream
|
|
name += ")"
|
|
return name
|
|
|
|
def render(self, name):
|
|
name = name.encode("utf-16-le") + b"\x00\x00"
|
|
data = self._render()
|
|
return (struct.pack("<H", len(name)) + name +
|
|
struct.pack("<HH", self.TYPE, len(data)) + data)
|
|
|
|
def render_m(self, name):
|
|
name = name.encode("utf-16-le") + b"\x00\x00"
|
|
if self.TYPE == 2:
|
|
data = self._render(dword=False)
|
|
else:
|
|
data = self._render()
|
|
return (struct.pack("<HHHHI", 0, self.stream or 0, len(name),
|
|
self.TYPE, len(data)) + name + data)
|
|
|
|
def render_ml(self, name):
|
|
name = name.encode("utf-16-le") + b"\x00\x00"
|
|
if self.TYPE == 2:
|
|
data = self._render(dword=False)
|
|
else:
|
|
data = self._render()
|
|
|
|
return (struct.pack("<HHHHI", self.language or 0, self.stream or 0,
|
|
len(name), self.TYPE, len(data)) + name + data)
|
|
|
|
|
|
@swap_to_string
|
|
@total_ordering
|
|
class ASFUnicodeAttribute(ASFBaseAttribute):
|
|
"""Unicode string attribute."""
|
|
TYPE = 0x0000
|
|
|
|
def parse(self, data):
|
|
try:
|
|
return data.decode("utf-16-le").strip("\x00")
|
|
except UnicodeDecodeError as e:
|
|
reraise(ASFError, e, sys.exc_info()[2])
|
|
|
|
def _validate(self, value):
|
|
if not isinstance(value, text_type):
|
|
if PY2:
|
|
return value.decode("utf-8")
|
|
else:
|
|
raise TypeError("%r not str" % value)
|
|
return value
|
|
|
|
def _render(self):
|
|
return self.value.encode("utf-16-le") + b"\x00\x00"
|
|
|
|
def data_size(self):
|
|
return len(self._render())
|
|
|
|
def __bytes__(self):
|
|
return self.value.encode("utf-16-le")
|
|
|
|
def __str__(self):
|
|
return self.value
|
|
|
|
def __eq__(self, other):
|
|
return text_type(self) == other
|
|
|
|
def __lt__(self, other):
|
|
return text_type(self) < other
|
|
|
|
__hash__ = ASFBaseAttribute.__hash__
|
|
|
|
|
|
@swap_to_string
|
|
@total_ordering
|
|
class ASFByteArrayAttribute(ASFBaseAttribute):
|
|
"""Byte array attribute."""
|
|
TYPE = 0x0001
|
|
|
|
def parse(self, data):
|
|
assert isinstance(data, bytes)
|
|
return data
|
|
|
|
def _render(self):
|
|
assert isinstance(self.value, bytes)
|
|
return self.value
|
|
|
|
def _validate(self, value):
|
|
if not isinstance(value, bytes):
|
|
raise TypeError("must be bytes/str: %r" % value)
|
|
return value
|
|
|
|
def data_size(self):
|
|
return len(self.value)
|
|
|
|
def __bytes__(self):
|
|
return self.value
|
|
|
|
def __str__(self):
|
|
return "[binary data (%d bytes)]" % len(self.value)
|
|
|
|
def __eq__(self, other):
|
|
return self.value == other
|
|
|
|
def __lt__(self, other):
|
|
return self.value < other
|
|
|
|
__hash__ = ASFBaseAttribute.__hash__
|
|
|
|
|
|
@swap_to_string
|
|
@total_ordering
|
|
class ASFBoolAttribute(ASFBaseAttribute):
|
|
"""Bool attribute."""
|
|
TYPE = 0x0002
|
|
|
|
def parse(self, data, dword=True):
|
|
if dword:
|
|
return struct.unpack("<I", data)[0] == 1
|
|
else:
|
|
return struct.unpack("<H", data)[0] == 1
|
|
|
|
def _render(self, dword=True):
|
|
if dword:
|
|
return struct.pack("<I", bool(self.value))
|
|
else:
|
|
return struct.pack("<H", bool(self.value))
|
|
|
|
def _validate(self, value):
|
|
return bool(value)
|
|
|
|
def data_size(self):
|
|
return 4
|
|
|
|
def __bool__(self):
|
|
return bool(self.value)
|
|
|
|
def __bytes__(self):
|
|
return text_type(self.value).encode('utf-8')
|
|
|
|
def __str__(self):
|
|
return text_type(self.value)
|
|
|
|
def __eq__(self, other):
|
|
return bool(self.value) == other
|
|
|
|
def __lt__(self, other):
|
|
return bool(self.value) < other
|
|
|
|
__hash__ = ASFBaseAttribute.__hash__
|
|
|
|
|
|
@swap_to_string
|
|
@total_ordering
|
|
class ASFDWordAttribute(ASFBaseAttribute):
|
|
"""DWORD attribute."""
|
|
TYPE = 0x0003
|
|
|
|
def parse(self, data):
|
|
return struct.unpack("<L", data)[0]
|
|
|
|
def _render(self):
|
|
return struct.pack("<L", self.value)
|
|
|
|
def _validate(self, value):
|
|
value = int(value)
|
|
if not 0 <= value <= 2 ** 32 - 1:
|
|
raise ValueError("Out of range")
|
|
return value
|
|
|
|
def data_size(self):
|
|
return 4
|
|
|
|
def __int__(self):
|
|
return self.value
|
|
|
|
def __bytes__(self):
|
|
return text_type(self.value).encode('utf-8')
|
|
|
|
def __str__(self):
|
|
return text_type(self.value)
|
|
|
|
def __eq__(self, other):
|
|
return int(self.value) == other
|
|
|
|
def __lt__(self, other):
|
|
return int(self.value) < other
|
|
|
|
__hash__ = ASFBaseAttribute.__hash__
|
|
|
|
|
|
@swap_to_string
|
|
@total_ordering
|
|
class ASFQWordAttribute(ASFBaseAttribute):
|
|
"""QWORD attribute."""
|
|
TYPE = 0x0004
|
|
|
|
def parse(self, data):
|
|
return struct.unpack("<Q", data)[0]
|
|
|
|
def _render(self):
|
|
return struct.pack("<Q", self.value)
|
|
|
|
def _validate(self, value):
|
|
value = int(value)
|
|
if not 0 <= value <= 2 ** 64 - 1:
|
|
raise ValueError("Out of range")
|
|
return value
|
|
|
|
def data_size(self):
|
|
return 8
|
|
|
|
def __int__(self):
|
|
return self.value
|
|
|
|
def __bytes__(self):
|
|
return text_type(self.value).encode('utf-8')
|
|
|
|
def __str__(self):
|
|
return text_type(self.value)
|
|
|
|
def __eq__(self, other):
|
|
return int(self.value) == other
|
|
|
|
def __lt__(self, other):
|
|
return int(self.value) < other
|
|
|
|
__hash__ = ASFBaseAttribute.__hash__
|
|
|
|
|
|
@swap_to_string
|
|
@total_ordering
|
|
class ASFWordAttribute(ASFBaseAttribute):
|
|
"""WORD attribute."""
|
|
TYPE = 0x0005
|
|
|
|
def parse(self, data):
|
|
return struct.unpack("<H", data)[0]
|
|
|
|
def _render(self):
|
|
return struct.pack("<H", self.value)
|
|
|
|
def _validate(self, value):
|
|
value = int(value)
|
|
if not 0 <= value <= 2 ** 16 - 1:
|
|
raise ValueError("Out of range")
|
|
return value
|
|
|
|
def data_size(self):
|
|
return 2
|
|
|
|
def __int__(self):
|
|
return self.value
|
|
|
|
def __bytes__(self):
|
|
return text_type(self.value).encode('utf-8')
|
|
|
|
def __str__(self):
|
|
return text_type(self.value)
|
|
|
|
def __eq__(self, other):
|
|
return int(self.value) == other
|
|
|
|
def __lt__(self, other):
|
|
return int(self.value) < other
|
|
|
|
__hash__ = ASFBaseAttribute.__hash__
|
|
|
|
|
|
@swap_to_string
|
|
@total_ordering
|
|
class ASFGUIDAttribute(ASFBaseAttribute):
|
|
"""GUID attribute."""
|
|
TYPE = 0x0006
|
|
|
|
def parse(self, data):
|
|
assert isinstance(data, bytes)
|
|
return data
|
|
|
|
def _render(self):
|
|
assert isinstance(self.value, bytes)
|
|
return self.value
|
|
|
|
def _validate(self, value):
|
|
if not isinstance(value, bytes):
|
|
raise TypeError("must be bytes/str: %r" % value)
|
|
return value
|
|
|
|
def data_size(self):
|
|
return len(self.value)
|
|
|
|
def __bytes__(self):
|
|
return self.value
|
|
|
|
def __str__(self):
|
|
return repr(self.value)
|
|
|
|
def __eq__(self, other):
|
|
return self.value == other
|
|
|
|
def __lt__(self, other):
|
|
return self.value < other
|
|
|
|
__hash__ = ASFBaseAttribute.__hash__
|
|
|
|
|
|
UNICODE = ASFUnicodeAttribute.TYPE
|
|
BYTEARRAY = ASFByteArrayAttribute.TYPE
|
|
BOOL = ASFBoolAttribute.TYPE
|
|
DWORD = ASFDWordAttribute.TYPE
|
|
QWORD = ASFQWordAttribute.TYPE
|
|
WORD = ASFWordAttribute.TYPE
|
|
GUID = ASFGUIDAttribute.TYPE
|
|
|
|
|
|
def ASFValue(value, kind, **kwargs):
|
|
try:
|
|
attr_type = _attribute_types[kind]
|
|
except KeyError:
|
|
raise ValueError("Unknown value type %r" % kind)
|
|
else:
|
|
return attr_type(value=value, **kwargs)
|
|
|
|
|
|
_attribute_types = {
|
|
ASFUnicodeAttribute.TYPE: ASFUnicodeAttribute,
|
|
ASFByteArrayAttribute.TYPE: ASFByteArrayAttribute,
|
|
ASFBoolAttribute.TYPE: ASFBoolAttribute,
|
|
ASFDWordAttribute.TYPE: ASFDWordAttribute,
|
|
ASFQWordAttribute.TYPE: ASFQWordAttribute,
|
|
ASFWordAttribute.TYPE: ASFWordAttribute,
|
|
ASFGUIDAttribute.TYPE: ASFGUIDAttribute,
|
|
}
|
|
|
|
|
|
class BaseObject(object):
|
|
"""Base ASF object."""
|
|
GUID = None
|
|
|
|
def parse(self, asf, data, fileobj, size):
|
|
self.data = data
|
|
|
|
def render(self, asf):
|
|
data = self.GUID + struct.pack("<Q", len(self.data) + 24) + self.data
|
|
return data
|
|
|
|
|
|
class UnknownObject(BaseObject):
|
|
"""Unknown ASF object."""
|
|
def __init__(self, guid):
|
|
assert isinstance(guid, bytes)
|
|
self.GUID = guid
|
|
|
|
|
|
class HeaderObject(object):
|
|
"""ASF header."""
|
|
GUID = b"\x30\x26\xB2\x75\x8E\x66\xCF\x11\xA6\xD9\x00\xAA\x00\x62\xCE\x6C"
|
|
|
|
|
|
class ContentDescriptionObject(BaseObject):
|
|
"""Content description."""
|
|
GUID = b"\x33\x26\xB2\x75\x8E\x66\xCF\x11\xA6\xD9\x00\xAA\x00\x62\xCE\x6C"
|
|
|
|
NAMES = [
|
|
u"Title",
|
|
u"Author",
|
|
u"Copyright",
|
|
u"Description",
|
|
u"Rating",
|
|
]
|
|
|
|
def parse(self, asf, data, fileobj, size):
|
|
super(ContentDescriptionObject, self).parse(asf, data, fileobj, size)
|
|
asf.content_description_obj = self
|
|
lengths = struct.unpack("<HHHHH", data[:10])
|
|
texts = []
|
|
pos = 10
|
|
for length in lengths:
|
|
end = pos + length
|
|
if length > 0:
|
|
texts.append(data[pos:end].decode("utf-16-le").strip(u"\x00"))
|
|
else:
|
|
texts.append(None)
|
|
pos = end
|
|
|
|
for key, value in zip(self.NAMES, texts):
|
|
if value is not None:
|
|
value = ASFUnicodeAttribute(value=value)
|
|
asf._tags.setdefault(self.GUID, []).append((key, value))
|
|
|
|
def render(self, asf):
|
|
def render_text(name):
|
|
value = asf.to_content_description.get(name)
|
|
if value is not None:
|
|
return text_type(value).encode("utf-16-le") + b"\x00\x00"
|
|
else:
|
|
return b""
|
|
|
|
texts = [render_text(x) for x in self.NAMES]
|
|
data = struct.pack("<HHHHH", *map(len, texts)) + b"".join(texts)
|
|
return self.GUID + struct.pack("<Q", 24 + len(data)) + data
|
|
|
|
|
|
class ExtendedContentDescriptionObject(BaseObject):
|
|
"""Extended content description."""
|
|
GUID = b"\x40\xA4\xD0\xD2\x07\xE3\xD2\x11\x97\xF0\x00\xA0\xC9\x5E\xA8\x50"
|
|
|
|
def parse(self, asf, data, fileobj, size):
|
|
super(ExtendedContentDescriptionObject, self).parse(
|
|
asf, data, fileobj, size)
|
|
asf.extended_content_description_obj = self
|
|
num_attributes, = struct.unpack("<H", data[0:2])
|
|
pos = 2
|
|
for i in xrange(num_attributes):
|
|
name_length, = struct.unpack("<H", data[pos:pos + 2])
|
|
pos += 2
|
|
name = data[pos:pos + name_length]
|
|
name = name.decode("utf-16-le").strip("\x00")
|
|
pos += name_length
|
|
value_type, value_length = struct.unpack("<HH", data[pos:pos + 4])
|
|
pos += 4
|
|
value = data[pos:pos + value_length]
|
|
pos += value_length
|
|
attr = _attribute_types[value_type](data=value)
|
|
asf._tags.setdefault(self.GUID, []).append((name, attr))
|
|
|
|
def render(self, asf):
|
|
attrs = asf.to_extended_content_description.items()
|
|
data = b"".join(attr.render(name) for (name, attr) in attrs)
|
|
data = struct.pack("<QH", 26 + len(data), len(attrs)) + data
|
|
return self.GUID + data
|
|
|
|
|
|
class FilePropertiesObject(BaseObject):
|
|
"""File properties."""
|
|
GUID = b"\xA1\xDC\xAB\x8C\x47\xA9\xCF\x11\x8E\xE4\x00\xC0\x0C\x20\x53\x65"
|
|
|
|
def parse(self, asf, data, fileobj, size):
|
|
super(FilePropertiesObject, self).parse(asf, data, fileobj, size)
|
|
length, _, preroll = struct.unpack("<QQQ", data[40:64])
|
|
asf.info.length = (length / 10000000.0) - (preroll / 1000.0)
|
|
|
|
|
|
class StreamPropertiesObject(BaseObject):
|
|
"""Stream properties."""
|
|
GUID = b"\x91\x07\xDC\xB7\xB7\xA9\xCF\x11\x8E\xE6\x00\xC0\x0C\x20\x53\x65"
|
|
|
|
def parse(self, asf, data, fileobj, size):
|
|
super(StreamPropertiesObject, self).parse(asf, data, fileobj, size)
|
|
channels, sample_rate, bitrate = struct.unpack("<HII", data[56:66])
|
|
asf.info.channels = channels
|
|
asf.info.sample_rate = sample_rate
|
|
asf.info.bitrate = bitrate * 8
|
|
|
|
|
|
class HeaderExtensionObject(BaseObject):
|
|
"""Header extension."""
|
|
GUID = b"\xb5\x03\xbf_.\xa9\xcf\x11\x8e\xe3\x00\xc0\x0c Se"
|
|
|
|
def parse(self, asf, data, fileobj, size):
|
|
super(HeaderExtensionObject, self).parse(asf, data, fileobj, size)
|
|
asf.header_extension_obj = self
|
|
datasize, = struct.unpack("<I", data[18:22])
|
|
datapos = 0
|
|
self.objects = []
|
|
while datapos < datasize:
|
|
guid, size = struct.unpack(
|
|
"<16sQ", data[22 + datapos:22 + datapos + 24])
|
|
if guid in _object_types:
|
|
obj = _object_types[guid]()
|
|
else:
|
|
obj = UnknownObject(guid)
|
|
obj.parse(asf, data[22 + datapos + 24:22 + datapos + size],
|
|
fileobj, size)
|
|
self.objects.append(obj)
|
|
datapos += size
|
|
|
|
def render(self, asf):
|
|
data = b"".join(obj.render(asf) for obj in self.objects)
|
|
return (self.GUID + struct.pack("<Q", 24 + 16 + 6 + len(data)) +
|
|
b"\x11\xD2\xD3\xAB\xBA\xA9\xcf\x11" +
|
|
b"\x8E\xE6\x00\xC0\x0C\x20\x53\x65" +
|
|
b"\x06\x00" + struct.pack("<I", len(data)) + data)
|
|
|
|
|
|
class MetadataObject(BaseObject):
|
|
"""Metadata description."""
|
|
GUID = b"\xea\xcb\xf8\xc5\xaf[wH\x84g\xaa\x8cD\xfaL\xca"
|
|
|
|
def parse(self, asf, data, fileobj, size):
|
|
super(MetadataObject, self).parse(asf, data, fileobj, size)
|
|
asf.metadata_obj = self
|
|
num_attributes, = struct.unpack("<H", data[0:2])
|
|
pos = 2
|
|
for i in xrange(num_attributes):
|
|
(reserved, stream, name_length, value_type,
|
|
value_length) = struct.unpack("<HHHHI", data[pos:pos + 12])
|
|
pos += 12
|
|
name = data[pos:pos + name_length]
|
|
name = name.decode("utf-16-le").strip("\x00")
|
|
pos += name_length
|
|
value = data[pos:pos + value_length]
|
|
pos += value_length
|
|
args = {'data': value, 'stream': stream}
|
|
if value_type == 2:
|
|
args['dword'] = False
|
|
attr = _attribute_types[value_type](**args)
|
|
asf._tags.setdefault(self.GUID, []).append((name, attr))
|
|
|
|
def render(self, asf):
|
|
attrs = asf.to_metadata.items()
|
|
data = b"".join([attr.render_m(name) for (name, attr) in attrs])
|
|
return (self.GUID + struct.pack("<QH", 26 + len(data), len(attrs)) +
|
|
data)
|
|
|
|
|
|
class MetadataLibraryObject(BaseObject):
|
|
"""Metadata library description."""
|
|
GUID = b"\x94\x1c#D\x98\x94\xd1I\xa1A\x1d\x13NEpT"
|
|
|
|
def parse(self, asf, data, fileobj, size):
|
|
super(MetadataLibraryObject, self).parse(asf, data, fileobj, size)
|
|
asf.metadata_library_obj = self
|
|
num_attributes, = struct.unpack("<H", data[0:2])
|
|
pos = 2
|
|
for i in xrange(num_attributes):
|
|
(language, stream, name_length, value_type,
|
|
value_length) = struct.unpack("<HHHHI", data[pos:pos + 12])
|
|
pos += 12
|
|
name = data[pos:pos + name_length]
|
|
name = name.decode("utf-16-le").strip("\x00")
|
|
pos += name_length
|
|
value = data[pos:pos + value_length]
|
|
pos += value_length
|
|
args = {'data': value, 'language': language, 'stream': stream}
|
|
if value_type == 2:
|
|
args['dword'] = False
|
|
attr = _attribute_types[value_type](**args)
|
|
asf._tags.setdefault(self.GUID, []).append((name, attr))
|
|
|
|
def render(self, asf):
|
|
attrs = asf.to_metadata_library
|
|
data = b"".join([attr.render_ml(name) for (name, attr) in attrs])
|
|
return (self.GUID + struct.pack("<QH", 26 + len(data), len(attrs)) +
|
|
data)
|
|
|
|
|
|
_object_types = {
|
|
ExtendedContentDescriptionObject.GUID: ExtendedContentDescriptionObject,
|
|
ContentDescriptionObject.GUID: ContentDescriptionObject,
|
|
FilePropertiesObject.GUID: FilePropertiesObject,
|
|
StreamPropertiesObject.GUID: StreamPropertiesObject,
|
|
HeaderExtensionObject.GUID: HeaderExtensionObject,
|
|
MetadataLibraryObject.GUID: MetadataLibraryObject,
|
|
MetadataObject.GUID: MetadataObject,
|
|
}
|
|
|
|
|
|
class ASF(FileType):
|
|
"""An ASF file, probably containing WMA or WMV."""
|
|
|
|
_mimes = ["audio/x-ms-wma", "audio/x-ms-wmv", "video/x-ms-asf",
|
|
"audio/x-wma", "video/x-wmv"]
|
|
|
|
def load(self, filename):
|
|
self.filename = filename
|
|
with open(filename, "rb") as fileobj:
|
|
self.size = 0
|
|
self.size1 = 0
|
|
self.size2 = 0
|
|
self.offset1 = 0
|
|
self.offset2 = 0
|
|
self.num_objects = 0
|
|
self.info = ASFInfo()
|
|
self.tags = ASFTags()
|
|
self.__read_file(fileobj)
|
|
|
|
def save(self):
|
|
# Move attributes to the right objects
|
|
self.to_content_description = {}
|
|
self.to_extended_content_description = {}
|
|
self.to_metadata = {}
|
|
self.to_metadata_library = []
|
|
for name, value in self.tags:
|
|
library_only = (value.data_size() > 0xFFFF or value.TYPE == GUID)
|
|
can_cont_desc = value.TYPE == UNICODE
|
|
|
|
if library_only or value.language is not None:
|
|
self.to_metadata_library.append((name, value))
|
|
elif value.stream is not None:
|
|
if name not in self.to_metadata:
|
|
self.to_metadata[name] = value
|
|
else:
|
|
self.to_metadata_library.append((name, value))
|
|
elif name in ContentDescriptionObject.NAMES:
|
|
if name not in self.to_content_description and can_cont_desc:
|
|
self.to_content_description[name] = value
|
|
else:
|
|
self.to_metadata_library.append((name, value))
|
|
else:
|
|
if name not in self.to_extended_content_description:
|
|
self.to_extended_content_description[name] = value
|
|
else:
|
|
self.to_metadata_library.append((name, value))
|
|
|
|
# Add missing objects
|
|
if not self.content_description_obj:
|
|
self.content_description_obj = \
|
|
ContentDescriptionObject()
|
|
self.objects.append(self.content_description_obj)
|
|
if not self.extended_content_description_obj:
|
|
self.extended_content_description_obj = \
|
|
ExtendedContentDescriptionObject()
|
|
self.objects.append(self.extended_content_description_obj)
|
|
if not self.header_extension_obj:
|
|
self.header_extension_obj = \
|
|
HeaderExtensionObject()
|
|
self.objects.append(self.header_extension_obj)
|
|
if not self.metadata_obj:
|
|
self.metadata_obj = \
|
|
MetadataObject()
|
|
self.header_extension_obj.objects.append(self.metadata_obj)
|
|
if not self.metadata_library_obj:
|
|
self.metadata_library_obj = \
|
|
MetadataLibraryObject()
|
|
self.header_extension_obj.objects.append(self.metadata_library_obj)
|
|
|
|
# Render the header
|
|
data = b"".join([obj.render(self) for obj in self.objects])
|
|
data = (HeaderObject.GUID +
|
|
struct.pack("<QL", len(data) + 30, len(self.objects)) +
|
|
b"\x01\x02" + data)
|
|
|
|
with open(self.filename, "rb+") as fileobj:
|
|
size = len(data)
|
|
if size > self.size:
|
|
insert_bytes(fileobj, size - self.size, self.size)
|
|
if size < self.size:
|
|
delete_bytes(fileobj, self.size - size, 0)
|
|
fileobj.seek(0)
|
|
fileobj.write(data)
|
|
|
|
self.size = size
|
|
self.num_objects = len(self.objects)
|
|
|
|
def __read_file(self, fileobj):
|
|
header = fileobj.read(30)
|
|
if len(header) != 30 or header[:16] != HeaderObject.GUID:
|
|
raise ASFHeaderError("Not an ASF file.")
|
|
|
|
self.extended_content_description_obj = None
|
|
self.content_description_obj = None
|
|
self.header_extension_obj = None
|
|
self.metadata_obj = None
|
|
self.metadata_library_obj = None
|
|
|
|
self.size, self.num_objects = struct.unpack("<QL", header[16:28])
|
|
self.objects = []
|
|
self._tags = {}
|
|
for i in xrange(self.num_objects):
|
|
self.__read_object(fileobj)
|
|
|
|
for guid in [ContentDescriptionObject.GUID,
|
|
ExtendedContentDescriptionObject.GUID, MetadataObject.GUID,
|
|
MetadataLibraryObject.GUID]:
|
|
self.tags.extend(self._tags.pop(guid, []))
|
|
assert not self._tags
|
|
|
|
def __read_object(self, fileobj):
|
|
guid, size = struct.unpack("<16sQ", fileobj.read(24))
|
|
if guid in _object_types:
|
|
obj = _object_types[guid]()
|
|
else:
|
|
obj = UnknownObject(guid)
|
|
data = fileobj.read(size - 24)
|
|
obj.parse(self, data, fileobj, size)
|
|
self.objects.append(obj)
|
|
|
|
@staticmethod
|
|
def score(filename, fileobj, header):
|
|
return header.startswith(HeaderObject.GUID) * 2
|
|
|
|
Open = ASF
|