mirror of
synced 2025-02-23 02:27:20 -08:00
305 lines
8.2 KiB
305 lines
8.2 KiB
import io
import pickle
import struct
from pathlib import Path
from typing import Union
class FormattedIOBase:
base class for formatting IO-like objects
methods should be implemented:
def get_file_size(self):
c = self.tell()
result = self.seek(0,2)
return result
def fill(self, byte, size : int):
"""fills a byte with length of size"""
c_size = 16384
n_count = size // c_size
m_count = size % c_size
f = bytes([byte]) * c_size
if n_count >= 1:
for _ in range(n_count):
if m_count > 0:
return size
def write_bytes(self, b_bytes):
"""writes compact bytes() object"""
self.write_fmt('Q', len(b_bytes))
def read_bytes(self):
"""reads bytes() object"""
return self.read( self.read_fmt('Q')[0] )
def write_utf8(self, s):
"""write compact string as utf8. Length must be not larger than 4Gb"""
b = s.encode('utf-8')
self.write_fmt('I', len(b))
def read_utf8(self):
"""read string from utf8"""
return self.read(self.read_fmt('I')[0]).decode('utf-8')
def calc_fmt(self, fmt):
"""calc size for _fmt functions"""
return struct.calcsize(fmt)
def write_fmt_at(self, offset, fmt, *args):
write_fmt at offset and return cursor where it was
c = self.tell()
n = self.write_fmt(fmt, *args)
return n
def write_fmt(self, fmt, *args):
write compact data using struct.pack
b = struct.pack(fmt, *args)
n_written = self.write(b)
if n_written != len(b):
raise IOError('Not enough room for write_fmt')
return n_written
def get_fmt(self, fmt):
"""read using struct.unpack without incrementing a cursor"""
c = self.tell()
result = self.read_fmt(fmt)
return result
def read_fmt(self, fmt):
read using struct.unpack
size = struct.calcsize(fmt)
b = self.read(size)
if size != len(b):
raise IOError('Not enough room for read_fmt')
return struct.unpack (fmt, b)
def read_backward_fmt(self, fmt):
read using struct.unpack in backward direction
size = struct.calcsize(fmt)
cursor = self.tell()
new_cursor = self.seek(-size,1)
if size != cursor-new_cursor:
raise IOError('Not enough room for read_backward_fmt')
b = self.read(size)
return struct.unpack (fmt, b)
def write_pickled(self, obj):
write pickled obj
c = self.tell()
self.write_fmt('Q', 0)
pickle.dump(obj, self, 4)
size = self.tell() - c
self.write_fmt_at(c, 'Q', size)
return size
def read_pickled(self, suppress_error=True):
read pickled object
suppress_error(True) ignore unpickling errors
the stream will not be corrupted
c = self.tell()
size, = self.read_fmt('Q')
if suppress_error:
obj = pickle.load(self)
obj = None
obj = pickle.load(self)
return obj
class FormattedFileIO(io.FileIO, FormattedIOBase):
FileIO to use with formatting methods
def __init__(self, file, mode: str, closefd: bool = True, opener = None):
filepath = Path(file)
plus = '+' if '+' in mode else ''
if 'a' in mode:
if filepath.exists():
mode = 'r' + plus
mode = 'w' + plus
io.FileIO.__init__(self, file, mode, closefd=closefd, opener=opener)
def seek(self, offset, whence=0):
seek to offset
whence 0(default) : from begin , offset >= 0
1 : from current , offset any
2 : from end , offset any
returns cursor after seek
# reimplement method to allow to expand with zeros
offset_cur = self.tell()
offset_max = io.FileIO.seek(self, 0,2)
if whence == 1:
offset += offset_cur
elif whence == 2:
offset += offset_max
expand = offset - offset_max
if expand > 0:
# new offset will be more than file size
# expand with zero bytes
self.fill(0x00, expand)
return io.FileIO.seek(self, offset)
def readinto (self, bytes_like : Union[bytearray, memoryview], size : int = 0):
"""read size amount of bytes into mutable bytes_like"""
if size == 0:
return io.FileIO.readinto(self, bytes_like)
return io.FileIO.readinto(self, memoryview(bytes_like).cast('B')[:size] )
def write(self, b: Union[bytes, bytearray, memoryview]):
b_len = len(b)
acc = 0
n_count = b_len // 16384
m_count = b_len % 16384
if n_count > 0:
for i in range(n_count):
acc += io.FileIO.write(self, b[i*16384:(i+1)*16384])
if m_count > 0:
acc += io.FileIO.write(self, b[n_count*16384:])
return acc
class FormattedMemoryViewIO(io.RawIOBase, FormattedIOBase):
"""file-IO-like to memoryview"""
def __init__(self, mv : memoryview):
self._mv = mv
self._mv_size = self._c_max = mv.nbytes
self._c = 0
def seek(self, cursor, whence=0):
seek to offset
whence 0(default) : from begin , offset >= 0
1 : from current , offset any
2 : from end , offset any
returns cursor after seek
# memoryview is not expandable, thus just clip the cursor
if whence == 0:
self._c = min( max(cursor,0), self._mv_size)
self._c_max = max(self._c, self._c_max)
elif whence == 1:
self._c = min( max(self._c + cursor, 0), self._mv_size)
self._c_max = max(self._c, self._c_max)
elif whence == 2:
self._c = min( max(self._c_max + cursor, 0), self._mv_size)
self._c_max = max(self._c, self._c_max)
raise ValueError('whence != 0,1,2')
return self._c
def tell(self):
"""returns current cursor offset"""
return self._c
def truncate(self, size=None):
"""truncate to the current cursor"""
if size is not None:
self._c_max = min(size, self._c_max)
self._c_max = self._c
return self._c_max
def write(self, b : Union[bytes, bytearray, memoryview], size=0):
if isinstance(b, memoryview):
b = b.cast('B')
size = b.nbytes
size = len(b)
size = min(size, self._mv_size - self._c)
self._mv[self._c:self._c+size] = b[:size]
self._c += size
return size
def read_memoryview(self, size) -> memoryview:
size = min(size, self._c_max - self._c)
result = self._mv[self._c:self._c+size]
self._c += size
return result
def read(self, size) -> bytearray:
read bytearray of size
size = min(size, self._c_max - self._c)
result = bytearray(size)
memoryview(result)[:] = self._mv[self._c:self._c+size]
self._c += size
return result
def readinto (self, bytes_like_or_io : Union[bytearray, memoryview, io.IOBase], size):
"""read size amount of bytes into mutable bytes_like or io"""
if isinstance(bytes_like_or_io, io.IOBase):
bytes_like_or_io.write( self._mv[self._c:self._c+size] )
memoryview(bytes_like_or_io).cast('B')[:size] = self._mv[self._c:self._c+size]
self._c += size