mirror of
https://github.com/iperov/DeepFaceLive.git
synced 2024-12-25 07:21:13 -08:00
305 lines
8.2 KiB
Python
305 lines
8.2 KiB
Python
import io
|
|
import pickle
|
|
import struct
|
|
from pathlib import Path
|
|
from typing import Union
|
|
|
|
|
|
class FormattedIOBase:
|
|
"""
|
|
base class for formatting IO-like objects
|
|
|
|
methods should be implemented:
|
|
|
|
write
|
|
read
|
|
seek
|
|
tell
|
|
"""
|
|
def get_file_size(self):
|
|
c = self.tell()
|
|
result = self.seek(0,2)
|
|
self.seek(c)
|
|
return result
|
|
|
|
def fill(self, byte, size : int):
|
|
"""fills a byte with length of size"""
|
|
c_size = 16384
|
|
n_count = size // c_size
|
|
m_count = size % c_size
|
|
f = bytes([byte]) * c_size
|
|
|
|
if n_count >= 1:
|
|
for _ in range(n_count):
|
|
self.write(f)
|
|
if m_count > 0:
|
|
self.write(f[:m_count])
|
|
return size
|
|
|
|
def write_bytes(self, b_bytes):
|
|
"""writes compact bytes() object"""
|
|
self.write_fmt('Q', len(b_bytes))
|
|
self.write(b_bytes)
|
|
|
|
def read_bytes(self):
|
|
"""reads bytes() object"""
|
|
return self.read( self.read_fmt('Q')[0] )
|
|
|
|
def write_utf8(self, s):
|
|
"""write compact string as utf8. Length must be not larger than 4Gb"""
|
|
b = s.encode('utf-8')
|
|
self.write_fmt('I', len(b))
|
|
self.write(b)
|
|
|
|
def read_utf8(self):
|
|
"""read string from utf8"""
|
|
return self.read(self.read_fmt('I')[0]).decode('utf-8')
|
|
|
|
def calc_fmt(self, fmt):
|
|
"""calc size for _fmt functions"""
|
|
return struct.calcsize(fmt)
|
|
|
|
def write_fmt_at(self, offset, fmt, *args):
|
|
"""
|
|
write_fmt at offset and return cursor where it was
|
|
"""
|
|
c = self.tell()
|
|
self.seek(offset,0)
|
|
n = self.write_fmt(fmt, *args)
|
|
self.seek(c)
|
|
return n
|
|
|
|
def write_fmt(self, fmt, *args):
|
|
"""
|
|
write compact data using struct.pack
|
|
"""
|
|
b = struct.pack(fmt, *args)
|
|
n_written = self.write(b)
|
|
if n_written != len(b):
|
|
raise IOError('Not enough room for write_fmt')
|
|
return n_written
|
|
|
|
def get_fmt(self, fmt):
|
|
"""read using struct.unpack without incrementing a cursor"""
|
|
c = self.tell()
|
|
result = self.read_fmt(fmt)
|
|
self.seek(c)
|
|
return result
|
|
|
|
def read_fmt(self, fmt):
|
|
"""
|
|
read using struct.unpack
|
|
"""
|
|
size = struct.calcsize(fmt)
|
|
|
|
b = self.read(size)
|
|
if size != len(b):
|
|
raise IOError('Not enough room for read_fmt')
|
|
|
|
return struct.unpack (fmt, b)
|
|
|
|
def read_backward_fmt(self, fmt):
|
|
"""
|
|
read using struct.unpack in backward direction
|
|
"""
|
|
size = struct.calcsize(fmt)
|
|
|
|
cursor = self.tell()
|
|
new_cursor = self.seek(-size,1)
|
|
|
|
if size != cursor-new_cursor:
|
|
raise IOError('Not enough room for read_backward_fmt')
|
|
|
|
b = self.read(size)
|
|
self.seek(-size,1)
|
|
|
|
return struct.unpack (fmt, b)
|
|
|
|
def write_pickled(self, obj):
|
|
"""
|
|
write pickled obj
|
|
"""
|
|
c = self.tell()
|
|
self.write_fmt('Q', 0)
|
|
|
|
pickle.dump(obj, self, 4)
|
|
size = self.tell() - c
|
|
|
|
self.write_fmt_at(c, 'Q', size)
|
|
return size
|
|
|
|
def read_pickled(self, suppress_error=True):
|
|
"""
|
|
read pickled object
|
|
|
|
suppress_error(True) ignore unpickling errors
|
|
the stream will not be corrupted
|
|
"""
|
|
c = self.tell()
|
|
size, = self.read_fmt('Q')
|
|
|
|
if suppress_error:
|
|
try:
|
|
obj = pickle.load(self)
|
|
except:
|
|
obj = None
|
|
else:
|
|
obj = pickle.load(self)
|
|
|
|
self.seek(c+size)
|
|
|
|
return obj
|
|
|
|
|
|
class FormattedFileIO(io.FileIO, FormattedIOBase):
|
|
"""
|
|
FileIO to use with formatting methods
|
|
"""
|
|
def __init__(self, file, mode: str, closefd: bool = True, opener = None):
|
|
filepath = Path(file)
|
|
plus = '+' if '+' in mode else ''
|
|
if 'a' in mode:
|
|
if filepath.exists():
|
|
mode = 'r' + plus
|
|
else:
|
|
mode = 'w' + plus
|
|
|
|
io.FileIO.__init__(self, file, mode, closefd=closefd, opener=opener)
|
|
|
|
|
|
|
|
def seek(self, offset, whence=0):
|
|
"""
|
|
seek to offset
|
|
|
|
whence 0(default) : from begin , offset >= 0
|
|
1 : from current , offset any
|
|
2 : from end , offset any
|
|
|
|
returns cursor after seek
|
|
"""
|
|
# reimplement method to allow to expand with zeros
|
|
|
|
offset_cur = self.tell()
|
|
offset_max = io.FileIO.seek(self, 0,2)
|
|
|
|
if whence == 1:
|
|
offset += offset_cur
|
|
elif whence == 2:
|
|
offset += offset_max
|
|
|
|
expand = offset - offset_max
|
|
if expand > 0:
|
|
# new offset will be more than file size
|
|
# expand with zero bytes
|
|
self.fill(0x00, expand)
|
|
|
|
return io.FileIO.seek(self, offset)
|
|
|
|
def readinto (self, bytes_like : Union[bytearray, memoryview], size : int = 0):
|
|
"""read size amount of bytes into mutable bytes_like"""
|
|
if size == 0:
|
|
return io.FileIO.readinto(self, bytes_like)
|
|
else:
|
|
return io.FileIO.readinto(self, memoryview(bytes_like).cast('B')[:size] )
|
|
|
|
def write(self, b: Union[bytes, bytearray, memoryview]):
|
|
b_len = len(b)
|
|
acc = 0
|
|
n_count = b_len // 16384
|
|
m_count = b_len % 16384
|
|
if n_count > 0:
|
|
for i in range(n_count):
|
|
acc += io.FileIO.write(self, b[i*16384:(i+1)*16384])
|
|
if m_count > 0:
|
|
acc += io.FileIO.write(self, b[n_count*16384:])
|
|
return acc
|
|
|
|
|
|
|
|
class FormattedMemoryViewIO(io.RawIOBase, FormattedIOBase):
|
|
"""file-IO-like to memoryview"""
|
|
def __init__(self, mv : memoryview):
|
|
super().__init__()
|
|
self._mv = mv
|
|
self._mv_size = self._c_max = mv.nbytes
|
|
self._c = 0
|
|
|
|
def seek(self, cursor, whence=0):
|
|
"""
|
|
seek to offset
|
|
|
|
whence 0(default) : from begin , offset >= 0
|
|
1 : from current , offset any
|
|
2 : from end , offset any
|
|
|
|
returns cursor after seek
|
|
"""
|
|
# memoryview is not expandable, thus just clip the cursor
|
|
if whence == 0:
|
|
self._c = min( max(cursor,0), self._mv_size)
|
|
self._c_max = max(self._c, self._c_max)
|
|
elif whence == 1:
|
|
self._c = min( max(self._c + cursor, 0), self._mv_size)
|
|
self._c_max = max(self._c, self._c_max)
|
|
elif whence == 2:
|
|
self._c = min( max(self._c_max + cursor, 0), self._mv_size)
|
|
self._c_max = max(self._c, self._c_max)
|
|
else:
|
|
raise ValueError('whence != 0,1,2')
|
|
|
|
return self._c
|
|
|
|
def tell(self):
|
|
"""returns current cursor offset"""
|
|
return self._c
|
|
|
|
def truncate(self, size=None):
|
|
"""truncate to the current cursor"""
|
|
if size is not None:
|
|
self._c_max = min(size, self._c_max)
|
|
else:
|
|
self._c_max = self._c
|
|
return self._c_max
|
|
|
|
|
|
def write(self, b : Union[bytes, bytearray, memoryview], size=0):
|
|
if isinstance(b, memoryview):
|
|
b = b.cast('B')
|
|
size = b.nbytes
|
|
else:
|
|
size = len(b)
|
|
|
|
size = min(size, self._mv_size - self._c)
|
|
|
|
self._mv[self._c:self._c+size] = b[:size]
|
|
self._c += size
|
|
return size
|
|
|
|
def read_memoryview(self, size) -> memoryview:
|
|
size = min(size, self._c_max - self._c)
|
|
result = self._mv[self._c:self._c+size]
|
|
self._c += size
|
|
return result
|
|
|
|
def read(self, size) -> bytearray:
|
|
"""
|
|
read bytearray of size
|
|
"""
|
|
size = min(size, self._c_max - self._c)
|
|
|
|
result = bytearray(size)
|
|
|
|
memoryview(result)[:] = self._mv[self._c:self._c+size]
|
|
|
|
self._c += size
|
|
return result
|
|
|
|
def readinto (self, bytes_like_or_io : Union[bytearray, memoryview, io.IOBase], size):
|
|
"""read size amount of bytes into mutable bytes_like or io"""
|
|
if isinstance(bytes_like_or_io, io.IOBase):
|
|
bytes_like_or_io.write( self._mv[self._c:self._c+size] )
|
|
else:
|
|
memoryview(bytes_like_or_io).cast('B')[:size] = self._mv[self._c:self._c+size]
|
|
self._c += size
|