Source code for remerkleable.complex

from typing import NamedTuple, cast, List as PyList, Dict, Any, BinaryIO, Optional, TypeVar, Type, Protocol, \
    runtime_checkable
from types import GeneratorType
from collections.abc import Sequence as ColSequence
from itertools import chain
import io
from remerkleable.core import View, BasicTypeDef, BasicView, OFFSET_BYTE_LENGTH, ViewHook
from remerkleable.basic import uint256, uint8, uint32
from remerkleable.tree import Node, RootNode, subtree_fill_to_length, subtree_fill_to_contents,\
    zero_node, Gindex, PairNode, to_gindex, NavigationError, get_depth
from remerkleable.subtree import SubtreeView

# Type variable bound to View; used below to annotate classmethods as `cls: Type[V]` returning `V`.
V = TypeVar('V', bound=View)


def decode_offset(stream: BinaryIO) -> uint32:
    """Read one offset from ``stream``.

    Deserializes ``OFFSET_BYTE_LENGTH`` bytes as a ``uint32`` and returns it.
    """
    offset = uint32.deserialize(stream, OFFSET_BYTE_LENGTH)
    return cast(uint32, offset)
def encode_offset(stream: BinaryIO, offset: int):
    """Write ``offset`` to ``stream`` as a ``uint32``.

    Returns whatever ``uint32.serialize`` returns for the written value.
    """
    as_uint32 = uint32(offset)
    return as_uint32.serialize(stream)
class ComplexView(SubtreeView):
    """Subtree-backed view with helpers to convert to and from plain ``bytes``.

    Both helpers delegate to the stream-based ``serialize`` / ``deserialize`` of the view.
    """

    def encode_bytes(self) -> bytes:
        """Serialize this view into an in-memory buffer and return the buffer contents."""
        buf = io.BytesIO()
        self.serialize(buf)
        return buf.getvalue()

    @classmethod
    def decode_bytes(cls: Type[V], bytez: bytes) -> V:
        """Deserialize an instance of ``cls`` from ``bytez``, using ``len(bytez)`` as the scope."""
        return cls.deserialize(io.BytesIO(bytez), len(bytez))
class MonoSubtreeView(ColSequence, ComplexView):
    """Base class for sequence views whose elements all share one element type.

    Subclasses are expected to provide ``length``, ``element_cls`` and ``is_valid_count``;
    ``is_packed``, ``tree_depth``, ``get`` and ``set`` are used here but defined elsewhere.
    """

    def length(self) -> int:
        """Return the number of elements; must be implemented by subclasses."""
        raise NotImplementedError

    @classmethod
    def coerce_view(cls: Type[V], v: Any) -> V:
        """Build an instance by unpacking the iterable ``v`` as constructor arguments."""
        return cls(*v)

    @classmethod
    def element_cls(cls) -> Type[View]:
        """Return the view type of the elements; must be implemented by subclasses."""
        raise NotImplementedError

    @classmethod
    def item_elem_cls(cls, i: int) -> Type[View]:
        """Return the element type at index ``i`` (the same type for every index)."""
        return cls.element_cls()

    @classmethod
    def to_chunk_length(cls, elems_length: int) -> int:
        """Return how many 32-byte chunks are needed for ``elems_length`` elements.

        Packed types fit ``32 // type_byte_length`` elements per chunk (rounded up);
        non-packed types use one chunk per element.

        Raises:
            Exception: if the type is packed but the element type is not a basic type.
        """
        if not cls.is_packed():
            return elems_length
        elem_type: Type[View] = cls.element_cls()
        if not isinstance(elem_type, BasicTypeDef):
            raise Exception("cannot append a packed element that is not a basic type")
        elems_per_chunk = 32 // elem_type.type_byte_length()
        return (elems_length + elems_per_chunk - 1) // elems_per_chunk

    @classmethod
    def views_into_chunks(cls, views: PyList[View]) -> PyList[Node]:
        """Convert element views into the list of bottom nodes of the backing tree.

        Packed types delegate to ``pack_views`` of the basic element type; other types
        use the backing node of each view.

        Raises:
            Exception: if the type is packed but the element type is not a basic type.
        """
        if not cls.is_packed():
            return [v.get_backing() for v in views]
        elem_type: Type[View] = cls.element_cls()
        if not isinstance(elem_type, BasicTypeDef):
            raise Exception("cannot append a packed element that is not a basic type")
        return elem_type.pack_views(views)

    @classmethod
    def is_valid_count(cls, count: int) -> bool:
        """Return whether ``count`` elements is acceptable for this type; subclass responsibility."""
        raise NotImplementedError

    def __iter__(self):
        return iter(self.get(i) for i in range(self.length()))

    def readonly_iter(self):
        """Iterate the elements by walking the backing tree left to right.

        For non-packed, non-bytes element types a single view object is re-used: its backing
        is swapped before every yield, so yielded values must not be retained across iterations.

        Raises (lazily, while iterating):
            NavigationError: if a packed chunk is not a ``RootNode``.
        """
        tree_depth = self.tree_depth()
        length = self.length()
        backing = self.get_backing()
        counter = 0  # number of elements yielded so far

        elem_type: Type[View] = self.element_cls()
        packed = self.is_packed()
        elems_per_chunk = 0
        if packed:
            elems_per_chunk = 32 // elem_type.type_byte_length()
        is_byte_vec = issubclass(elem_type, bytes)
        # Shared view instance, only used for the re-use path described in the docstring.
        base_view = None if (packed or is_byte_vec) else elem_type.default(hook=None)

        def inner_iter(node: Node, depth: int):
            nonlocal counter
            if counter >= length:
                return
            if depth == 0:
                if packed:
                    if isinstance(node, RootNode):
                        for i in range(elems_per_chunk):
                            counter += 1
                            yield cast(BasicTypeDef, elem_type).basic_view_from_backing(node, i % elems_per_chunk)
                            if counter >= length:
                                break
                    else:
                        raise NavigationError(f"chunk {node} for basic element {counter} is not a root node")
                elif not is_byte_vec:
                    counter += 1
                    base_view.set_backing(node)
                    yield base_view
                else:
                    counter += 1
                    yield elem_type.view_from_backing(node)
                return
            depth -= 1
            yield from inner_iter(node.get_left(), depth)
            if counter >= length:
                return
            yield from inner_iter(node.get_right(), depth)

        return inner_iter(backing, tree_depth)

    @classmethod
    def deserialize(cls: Type[V], stream: BinaryIO, scope: int) -> V:
        """Read an instance from ``stream``, consuming ``scope`` bytes.

        Fixed-size elements are read back to back. Otherwise the data starts with one offset
        per element, followed by the elements; each element size is implied by the distance
        to the next offset, with ``scope`` closing the last element.

        Raises:
            Exception: if the scope, the element count, or any offset is invalid.
        """
        elem_cls = cls.element_cls()
        if elem_cls.is_fixed_byte_length():
            elem_byte_length = elem_cls.type_byte_length()
            if scope % elem_byte_length != 0:
                raise Exception(f"scope {scope} does not match element byte length {elem_byte_length} multiple")
            count = scope // elem_byte_length
            if not cls.is_valid_count(count):
                raise Exception(f"count {count} is invalid")
            # Unpack as separate arguments (like the dynamic branch below) so that an empty
            # input reaches the constructor as "no elements" rather than as an empty generator.
            fixed_elems = [elem_cls.deserialize(stream, elem_byte_length) for _ in range(count)]
            return cls(*fixed_elems)

        if scope == 0:
            if not cls.is_valid_count(0):
                raise Exception("scope cannot be 0, count must not be 0")
            return cls()
        if scope < OFFSET_BYTE_LENGTH:
            # Reading an offset would consume bytes beyond the given scope.
            raise Exception(f"scope {scope} is too small to hold an offset of {OFFSET_BYTE_LENGTH} bytes")
        first_offset = decode_offset(stream)
        if first_offset > scope:
            raise Exception(f"first offset is too big: {first_offset}, scope: {scope}")
        if first_offset < OFFSET_BYTE_LENGTH:
            # An offset was just read, so the element data cannot start before the end of it.
            raise Exception(f"first offset is too small: {first_offset}, minimum: {OFFSET_BYTE_LENGTH}")
        if first_offset % OFFSET_BYTE_LENGTH != 0:
            raise Exception(f"first offset {first_offset} is not a multiple of offset length {OFFSET_BYTE_LENGTH}")
        count = first_offset // OFFSET_BYTE_LENGTH
        if not cls.is_valid_count(count):
            raise Exception(f"count {count} is invalid")
        # count - 1: we already have the first offset
        offsets = [first_offset] + [decode_offset(stream) for _ in range(count - 1)] + [uint32(scope)]
        elem_min, elem_max = elem_cls.min_byte_length(), elem_cls.max_byte_length()
        elems = []
        for i in range(count):
            start, end = offsets[i], offsets[i + 1]
            if end < start:
                raise Exception(f"offsets[{i}] value {start} is invalid, next offset is {end}")
            elem_size = end - start
            if not (elem_min <= elem_size <= elem_max):
                raise Exception(f"offset[{i}] value {start} is invalid, next offset is {end},"
                                f" implied size is {elem_size}, size bounds: [{elem_min}, {elem_max}]")
            elems.append(elem_cls.deserialize(stream, elem_size))
        return cls(*elems)

    def serialize(self, stream: BinaryIO) -> int:
        """Write this view to ``stream`` and return the number of bytes written.

        ``uint8`` elements are written as one ``bytes`` object, other fixed-size elements one
        after another, and dynamic elements as an offset prologue followed by the element data.
        """
        elem_cls = self.__class__.element_cls()
        if issubclass(elem_cls, uint8):
            out = bytes(iter(self))
            stream.write(out)
            return len(out)
        if elem_cls.is_fixed_byte_length():
            for v in self.readonly_iter():
                v.serialize(stream)
            return elem_cls.type_byte_length() * self.length()

        temp_dyn_stream = io.BytesIO()
        offset = OFFSET_BYTE_LENGTH * self.length()  # the offsets are part of the fixed-size-bytes prologue
        for v in self:
            encode_offset(stream, offset)
            offset += cast(View, v).serialize(temp_dyn_stream)
        # Element data is buffered so that all offsets can be written to the stream first.
        stream.write(temp_dyn_stream.getvalue())
        return offset

    @classmethod
    def navigate_type(cls, key: Any) -> Type[View]:
        """Return the element type for index ``key``; raises ``KeyError`` for negative keys."""
        if key < 0:
            raise KeyError
        return cls.element_cls()

    @classmethod
    def key_to_static_gindex(cls, key: Any) -> Gindex:
        """Return the generalized index of the chunk that holds element ``key``.

        Raises:
            KeyError: if ``key`` is negative.
        """
        if key < 0:
            raise KeyError
        if cls.is_packed():
            elems_per_chunk = 32 // cls.element_cls().type_byte_length()
            chunk_i = key // elems_per_chunk
        else:
            chunk_i = key
        return to_gindex(chunk_i, cls.tree_depth())

    def navigate_view(self, key: Any) -> View:
        """Return the element view at ``key``."""
        return self.__getitem__(key)

    def __len__(self):
        return self.length()

    def __add__(self, other):
        # uint8 sequences concatenate as raw bytes, everything else as a plain list of elements.
        if issubclass(self.element_cls(), uint8):
            return bytes(self) + bytes(other)
        else:
            return list(chain(self, other))

    def __getitem__(self, k):
        """Return the element at index ``k``, or a plain list of elements for a slice.

        Slices follow the built-in sequence rules (negative bounds, steps, clamping),
        resolved against the current length via ``slice.indices``.
        """
        if isinstance(k, slice):
            return [self.get(i) for i in range(*k.indices(self.length()))]
        return self.get(k)

    def __setitem__(self, k, v):
        """Set the element at index ``k``, or overwrite the elements selected by a slice.

        A slice assignment never resizes: ``v`` must yield exactly as many values as the slice
        selects. The check happens before any element is written, so a mismatch changes nothing.

        Raises:
            Exception: if the number of values does not match the number of selected indices.
        """
        if not isinstance(k, slice):
            self.set(k, v)
            return
        targets = range(*k.indices(self.length()))
        items = list(v)
        if len(items) < len(targets):
            raise Exception("failed to do full slice-set, not enough values")
        if len(items) > len(targets):
            raise Exception("failed to do full slice-set, too many values")
        for i, item in zip(targets, items):
            self.set(i, item)
class List(MonoSubtreeView):
    """Variable-length sequence view with a maximum element count (``limit``).

    The backing is a pair: the left node holds the contents subtree, the right node holds
    the current length as a ``uint256``. Concrete types are created with ``List[element_type, limit]``.
    """

    def __new__(cls, *args, backing: Optional[Node] = None, hook: Optional[ViewHook] = None, **kwargs):
        """Create a list from a backing node, or from element values.

        Elements may be given as separate arguments, or as one generator, list or tuple.
        For ``uint8`` element types a single ``bytes`` value or a hex string (optionally
        prefixed with ``0x``) is also accepted. Values that are not views are coerced with
        the element type. Without any elements the default backing is used.

        Raises:
            Exception: if both a backing and elements are given, or the limit is exceeded.
        """
        if backing is not None:
            if len(args) != 0:
                raise Exception("cannot have both a backing and elements to init List")
            return super().__new__(cls, backing=backing, hook=hook, **kwargs)

        elem_cls = cls.element_cls()
        vals = list(args)
        if len(vals) == 1 and isinstance(vals[0], (GeneratorType, list, tuple)):
            vals = list(vals[0])
        # Only a lone value is considered as bytes/hex input. Checking the length again guards
        # against an unwrapped empty input (no vals[0]) and against replacing a multi-element
        # input with just its first item.
        if len(vals) == 1 and issubclass(elem_cls, uint8):
            val = vals[0]
            if isinstance(val, bytes):
                vals = list(val)
            elif isinstance(val, str):
                if val[:2] == '0x':
                    val = val[2:]
                vals = list(bytes.fromhex(val))
        if len(vals) > 0:
            limit = cls.limit()
            if len(vals) > limit:
                raise Exception(f"too many list inputs: {len(vals)}, limit is: {limit}")
            input_views = [el if isinstance(el, View) else elem_cls.coerce_view(el) for el in vals]
            input_nodes = cls.views_into_chunks(input_views)
            contents = subtree_fill_to_contents(input_nodes, cls.contents_depth())
            backing = PairNode(contents, uint256(len(input_views)).get_backing())
        return super().__new__(cls, backing=backing, hook=hook, **kwargs)

    def __class_getitem__(cls, params) -> Type["List"]:
        """Create the list type for ``params = (element_type, limit)``.

        Basic element types are packed, ``32 // type_byte_length`` elements per chunk; the
        contents depth is derived from the number of chunks needed to hold ``limit`` elements.
        """
        (element_type, limit) = params
        packed = isinstance(element_type, BasicTypeDef)
        if packed:
            elems_per_chunk = 32 // element_type.type_byte_length()
            contents_depth = get_depth((limit + elems_per_chunk - 1) // elems_per_chunk)
        else:
            contents_depth = get_depth(limit)

        class SpecialListView(List):
            @classmethod
            def is_packed(cls) -> bool:
                return packed

            @classmethod
            def contents_depth(cls) -> int:
                return contents_depth

            @classmethod
            def element_cls(cls) -> Type[View]:
                return element_type

            @classmethod
            def limit(cls) -> int:
                return limit

        return SpecialListView

    def length(self) -> int:
        """Return the current length, read from the right child of the backing."""
        ll_node = super().get_backing().get_right()
        ll = cast(uint256, uint256.view_from_backing(node=ll_node, hook=None))
        return int(ll)

    def value_byte_length(self) -> int:
        """Return the serialized size in bytes of the current value."""
        elem_cls = self.__class__.element_cls()
        if elem_cls.is_fixed_byte_length():
            return elem_cls.type_byte_length() * self.length()
        else:
            # every dynamic element costs one offset in addition to its own bytes
            return sum(OFFSET_BYTE_LENGTH + cast(View, el).value_byte_length() for el in iter(self))

    def append(self, v: View):
        """Append ``v`` (coerced to the element type if needed) and update the stored length.

        Raises:
            Exception: if the list is at its limit, or the packed element is not a basic view.
        """
        ll = self.length()
        if ll >= self.__class__.limit():
            raise Exception("list is maximum capacity, cannot append")
        i = ll

        elem_type: Type[View] = self.__class__.element_cls()
        if not isinstance(v, elem_type):
            v = elem_type.coerce_view(v)

        if self.__class__.is_packed():
            next_backing = self.get_backing()
            if isinstance(elem_type, BasicTypeDef):
                if not isinstance(v, BasicView):
                    raise Exception("input element is not a basic view")
                basic_v: BasicView = v
                elems_per_chunk = 32 // elem_type.type_byte_length()
                chunk_i = i // elems_per_chunk
                target: Gindex = to_gindex(chunk_i, self.__class__.tree_depth())
                if i % elems_per_chunk == 0:
                    # first element of a new chunk: start from a zero chunk, expanding the tree
                    set_last = next_backing.setter(target, expand=True)
                    chunk = zero_node(0)
                else:
                    # chunk already exists: update it in place
                    set_last = next_backing.setter(target)
                    chunk = next_backing.getter(target)
                chunk = basic_v.backing_from_base(chunk, i % elems_per_chunk)
                next_backing = set_last(chunk)
            else:
                raise Exception("cannot append a packed element that is not a basic type")
        else:
            target: Gindex = to_gindex(i, self.__class__.tree_depth())
            set_last = self.get_backing().setter(target, expand=True)
            next_backing = set_last(v.get_backing())

        set_length = next_backing.rebind_right
        new_length = uint256(ll + 1).get_backing()
        next_backing = set_length(new_length)
        self.set_backing(next_backing)

    def pop(self):
        """Remove the last element by zeroing it and decrementing the stored length.

        Does not return the removed element.

        Raises:
            Exception: if the list is empty, or the type is packed with a non-basic element type.
        """
        ll = self.length()
        if ll == 0:
            raise Exception("list is empty, cannot pop")
        i = ll - 1
        target: Gindex
        can_summarize: bool
        if self.__class__.is_packed():
            next_backing = self.get_backing()
            elem_type: Type[View] = self.__class__.element_cls()
            if isinstance(elem_type, BasicTypeDef):
                elems_per_chunk = 32 // elem_type.type_byte_length()
                chunk_i = i // elems_per_chunk
                target = to_gindex(chunk_i, self.__class__.tree_depth())
                if i % elems_per_chunk == 0:
                    chunk = zero_node(0)
                else:
                    chunk = next_backing.getter(target)
                set_last = next_backing.setter(target)
                # overwrite the popped slot with the default value of the element type
                chunk = cast(BasicView, elem_type.default(None)).backing_from_base(chunk, i % elems_per_chunk)
                next_backing = set_last(chunk)

                can_summarize = (target & 1) == 0 and i % elems_per_chunk == 0
            else:
                raise Exception("cannot pop a packed element that is not a basic type")
        else:
            target = to_gindex(i, self.__class__.tree_depth())
            set_last = self.get_backing().setter(target)
            next_backing = set_last(zero_node(0))
            can_summarize = (target & 1) == 0

        # if possible, summarize
        if can_summarize:
            # summarize to the highest node possible.
            # I.e. the resulting target must be a right-hand, unless it's the only content node.
            while (target & 1) == 0 and target != 0b10:
                target >>= 1
            summary_fn = next_backing.summarize_into(target)
            next_backing = summary_fn()

        set_length = next_backing.rebind_right
        new_length = uint256(ll - 1).get_backing()
        next_backing = set_length(new_length)
        self.set_backing(next_backing)

    def get(self, i: int) -> View:
        """Return the element at ``i``; raises ``IndexError`` outside ``[0, length)``."""
        if i < 0 or i >= self.length():
            raise IndexError
        return super().get(i)

    def set(self, i: int, v: View) -> None:
        """Replace the element at ``i``; raises ``IndexError`` outside ``[0, length)``."""
        if i < 0 or i >= self.length():
            raise IndexError
        super().set(i, v)

    def __repr__(self):
        # Navigation errors are caught so that a list with unavailable parts can still be printed.
        length: int
        try:
            length = self.length()
        except NavigationError:
            return f"{self.type_repr()}( *summary root, no length known* )"
        vals: Dict[int, View] = {}
        partial = False
        for i in range(length):
            try:
                vals[i] = self.get(i)
            except NavigationError:
                partial = True
                continue
        if partial:
            return f"{self.type_repr()}~partial(length={length})" + \
                   '(' + ', '.join(f"{i}: {repr(v)}" for i, v in vals.items()) + ')'
        else:
            return self.type_repr() + \
                   '(' + ', '.join(repr(v) for v in vals.values()) + ')'

    @classmethod
    def type_repr(cls) -> str:
        """Return the type as ``List[<element type>, <limit>]``."""
        return f"List[{cls.element_cls().type_repr()}, {cls.limit()}]"

    @classmethod
    def is_packed(cls) -> bool:
        """Return whether multiple elements share one chunk; provided by the concrete type."""
        raise NotImplementedError

    @classmethod
    def contents_depth(cls) -> int:
        """Return the depth of the contents subtree; provided by the concrete type."""
        raise NotImplementedError

    @classmethod
    def tree_depth(cls) -> int:
        """Return the depth of the full backing tree."""
        return cls.contents_depth() + 1  # 1 extra for length mix-in

    @classmethod
    def item_elem_cls(cls, i: int) -> Type[V]:
        """Return the element type at index ``i`` (the same type for every index)."""
        return cls.element_cls()

    @classmethod
    def limit(cls) -> int:
        """Return the maximum number of elements; provided by the concrete type."""
        raise NotImplementedError

    @classmethod
    def is_valid_count(cls, count: int) -> bool:
        """Return whether ``count`` lies within ``[0, limit]``."""
        return 0 <= count <= cls.limit()

    @classmethod
    def navigate_type(cls, key: Any) -> Type[View]:
        """Return the element type for ``key``; raises ``KeyError`` if ``key`` is not below the limit."""
        if key >= cls.limit():
            raise KeyError
        return super().navigate_type(key)

    @classmethod
    def key_to_static_gindex(cls, key: Any) -> Gindex:
        """Return the generalized index for ``key``; raises ``KeyError`` if ``key`` is not below the limit."""
        if key >= cls.limit():
            raise KeyError
        return super().key_to_static_gindex(key)

    @classmethod
    def default_node(cls) -> Node:
        """Return the backing of an empty list."""
        return PairNode(zero_node(cls.contents_depth()), zero_node(0))  # mix-in 0 as list length

    @classmethod
    def is_fixed_byte_length(cls) -> bool:
        """Lists never have a fixed serialized size."""
        return False

    @classmethod
    def min_byte_length(cls) -> int:
        """Return the smallest serialized size (0)."""
        return 0

    @classmethod
    def max_byte_length(cls) -> int:
        """Return the largest serialized size: ``limit`` elements of maximum size, plus offsets if dynamic."""
        elem_cls = cls.element_cls()
        bytes_per_elem = elem_cls.max_byte_length()
        if not elem_cls.is_fixed_byte_length():
            bytes_per_elem += OFFSET_BYTE_LENGTH
        return bytes_per_elem * cls.limit()
class Vector(MonoSubtreeView):
    """Fixed-length sequence view. Concrete types are created with ``Vector[element_type, length]``."""

    def __new__(cls, *args, backing: Optional[Node] = None, hook: Optional[ViewHook] = None, **kwargs):
        """Create a vector from a backing node, or from element values.

        Elements may be given as separate arguments, or as one generator, list or tuple.
        For ``uint8`` element types a single ``bytes`` value or a hex string (optionally
        prefixed with ``0x``) is also accepted. Values that are not views are coerced with
        the element type. Without any elements the default backing is used.

        Raises:
            Exception: if both a backing and elements are given, or the number of elements
                is not exactly the vector length.
        """
        if backing is not None:
            if len(args) != 0:
                raise Exception("cannot have both a backing and elements to init Vector")
            return super().__new__(cls, backing=backing, hook=hook, **kwargs)

        elem_cls = cls.element_cls()
        vals = list(args)
        if len(vals) == 1 and isinstance(vals[0], (GeneratorType, list, tuple)):
            vals = list(vals[0])
        # Only a lone value is considered as bytes/hex input. Checking the length again guards
        # against an unwrapped empty input (no vals[0]) and against replacing a multi-element
        # input with just its first item.
        if len(vals) == 1 and issubclass(elem_cls, uint8):
            val = vals[0]
            if isinstance(val, bytes):
                vals = list(val)
            elif isinstance(val, str):
                if val[:2] == '0x':
                    val = val[2:]
                vals = list(bytes.fromhex(val))
        if len(vals) > 0:
            vector_length = cls.vector_length()
            if len(vals) != vector_length:
                raise Exception(f"invalid inputs length: {len(vals)}, vector length is: {vector_length}")
            input_views = [el if isinstance(el, View) else elem_cls.coerce_view(el) for el in vals]
            input_nodes = cls.views_into_chunks(input_views)
            backing = subtree_fill_to_contents(input_nodes, cls.tree_depth())
        return super().__new__(cls, backing=backing, hook=hook, **kwargs)

    def __class_getitem__(cls, params) -> Type["Vector"]:
        """Create the vector type for ``params = (element_view_cls, length)``.

        Basic element types are packed, ``32 // type_byte_length`` elements per chunk. When the
        element type has a fixed byte length, the returned type also has precomputed byte lengths.
        """
        (element_view_cls, length) = params
        packed = isinstance(element_view_cls, BasicTypeDef)
        if packed:
            elems_per_chunk = 32 // element_view_cls.type_byte_length()
            tree_depth = get_depth((length + elems_per_chunk - 1) // elems_per_chunk)
        else:
            tree_depth = get_depth(length)

        class SpecialVectorView(Vector):
            @classmethod
            def is_packed(cls) -> bool:
                return packed

            @classmethod
            def tree_depth(cls) -> int:
                return tree_depth

            @classmethod
            def element_cls(cls) -> Type[View]:
                return element_view_cls

            @classmethod
            def vector_length(cls) -> int:
                return length

        # for fixed-size vectors, pre-compute the size.
        if element_view_cls.is_fixed_byte_length():
            byte_length = element_view_cls.type_byte_length() * length

            class FixedSpecialVectorView(SpecialVectorView):
                @classmethod
                def type_byte_length(cls) -> int:
                    return byte_length

                @classmethod
                def min_byte_length(cls) -> int:
                    return byte_length

                @classmethod
                def max_byte_length(cls) -> int:
                    return byte_length

            SpecialVectorView = FixedSpecialVectorView

        return SpecialVectorView

    def get(self, i: int) -> View:
        """Return the element at ``i``; raises ``IndexError`` outside ``[0, vector_length)``."""
        if i < 0 or i >= self.__class__.vector_length():
            raise IndexError
        return super().get(i)

    def set(self, i: int, v: View) -> None:
        """Replace the element at ``i``; raises ``IndexError`` outside ``[0, vector_length)``."""
        if i < 0 or i >= self.__class__.vector_length():
            raise IndexError
        super().set(i, v)

    def length(self) -> int:
        """Return the number of elements, which is the vector length of the type."""
        return self.__class__.vector_length()

    def value_byte_length(self) -> int:
        """Return the serialized size in bytes of the current value."""
        if self.__class__.is_fixed_byte_length():
            return self.__class__.type_byte_length()
        else:
            # every dynamic element costs one offset in addition to its own bytes
            return sum(OFFSET_BYTE_LENGTH + cast(View, el).value_byte_length() for el in iter(self))

    def __repr__(self):
        # Navigation errors are caught so that a vector with unavailable parts can still be printed.
        vals: Dict[int, View] = {}
        length = self.length()
        partial = False
        for i in range(length):
            try:
                vals[i] = self.get(i)
            except NavigationError:
                partial = True
                continue
        if partial:
            return self.type_repr() + "~partial(" + ', '.join(f"{i}: {repr(v)}" for i, v in vals.items()) + ')'
        else:
            return self.type_repr() + '(' + ', '.join(repr(v) for v in vals.values()) + ')'

    @classmethod
    def type_repr(cls) -> str:
        """Return the type as ``Vector[<element type>, <length>]``."""
        return f"Vector[{cls.element_cls().type_repr()}, {cls.vector_length()}]"

    @classmethod
    def vector_length(cls) -> int:
        """Return the fixed number of elements; provided by the concrete type."""
        raise NotImplementedError

    @classmethod
    def is_valid_count(cls, count: int) -> bool:
        """Return whether ``count`` equals the vector length."""
        return count == cls.vector_length()

    @classmethod
    def navigate_type(cls, key: Any) -> Type[View]:
        """Return the element type for ``key``; raises ``KeyError`` if ``key`` is not below the vector length."""
        if key >= cls.vector_length():
            raise KeyError
        return super().navigate_type(key)

    @classmethod
    def key_to_static_gindex(cls, key: Any) -> Gindex:
        """Return the generalized index for ``key``; raises ``KeyError`` if ``key`` is not below the vector length."""
        if key >= cls.vector_length():
            raise KeyError
        return super().key_to_static_gindex(key)

    @classmethod
    def default_node(cls) -> Node:
        """Return the default backing: zero chunks when packed, default element nodes otherwise."""
        elem_type: Type[View] = cls.element_cls()
        length = cls.to_chunk_length(cls.vector_length())
        if cls.is_packed():
            elem = zero_node(0)
        else:
            elem = elem_type.default_node()
        return subtree_fill_to_length(elem, cls.tree_depth(), length)

    @classmethod
    def is_fixed_byte_length(cls) -> bool:
        """Return whether the serialized size is fixed."""
        return cls.element_cls().is_fixed_byte_length()  # only if the element type is fixed byte length.

    @classmethod
    def min_byte_length(cls) -> int:
        """Return the smallest serialized size: minimum element size (plus an offset if dynamic) per element."""
        elem_cls = cls.element_cls()
        bytes_per_elem = elem_cls.min_byte_length()
        if not elem_cls.is_fixed_byte_length():
            bytes_per_elem += OFFSET_BYTE_LENGTH
        return bytes_per_elem * cls.vector_length()

    @classmethod
    def max_byte_length(cls) -> int:
        """Return the largest serialized size: maximum element size (plus an offset if dynamic) per element."""
        elem_cls = cls.element_cls()
        bytes_per_elem = elem_cls.max_byte_length()
        if not elem_cls.is_fixed_byte_length():
            bytes_per_elem += OFFSET_BYTE_LENGTH
        return bytes_per_elem * cls.vector_length()
# Mapping of container field name to the view type of that field.
# Field positions are taken from the iteration order of this mapping.
Fields = Dict[str, Type[View]]
class FieldOffset(NamedTuple):
    """Name, type and decoded offset of one non-fixed-size container field."""
    # name of the field
    key: str
    # view type used to deserialize the field
    typ: Type[View]
    # offset value that was decoded for this field
    offset: int
@runtime_checkable
class _ContainerLike(Protocol):
    """Structural type for anything that exposes a ``fields()`` classmethod."""

    @classmethod
    def fields(cls) -> Fields:
        """Return the mapping of field names to field view types."""
class Container(ComplexView):
    """View with named fields, declared as class annotations (``name: ViewType``).

    Fields are stored in declaration order, one bottom node of the backing tree per field.
    """
    # Container types should declare fields through class annotations.
    # If none are specified, it will fall back on this (to avoid annotations of super classes),
    # and error on construction, since empty container types are invalid.
    _empty_annotations: bool

    def __new__(cls, *args, backing: Optional[Node] = None, hook: Optional[ViewHook] = None, **kwargs):
        """Create a container from a backing node, or from field values given as keyword arguments.

        Field values that are not views are coerced with the field type; omitted fields get the
        default node of their type. Keyword arguments that are not field names are passed on to
        the parent constructor.

        Raises:
            Exception: if both a backing and positional arguments are given.
        """
        if backing is not None:
            if len(args) != 0:
                raise Exception("cannot have both a backing and elements to init Container")
            return super().__new__(cls, backing=backing, hook=hook, **kwargs)

        fields = cls.fields()

        input_nodes = []
        for fkey, ftyp in fields.items():
            fnode: Node
            if fkey in kwargs:
                finput = kwargs.pop(fkey)
                if isinstance(finput, View):
                    fnode = finput.get_backing()
                else:
                    fnode = ftyp.coerce_view(finput).get_backing()
            else:
                fnode = ftyp.default_node()
            input_nodes.append(fnode)
        backing = subtree_fill_to_contents(input_nodes, cls.tree_depth())
        return super().__new__(cls, backing=backing, hook=hook, **kwargs)

    @classmethod
    def coerce_view(cls: Type[V], v: Any) -> V:
        """Build an instance by reading every field of this type as an attribute of ``v``."""
        # Field values must be passed as keyword arguments: the constructor only reads fields
        # from kwargs, so a positional dict would leave every field at its default.
        return cls(**{fkey: getattr(v, fkey) for fkey in cls.fields().keys()})

    @classmethod
    def fields(cls) -> Fields:
        """Return the class annotations as the field mapping.

        Raises:
            Exception: if the fallback ``_empty_annotations`` annotation is found,
                i.e. the type declares no fields of its own.
        """
        annot = cls.__annotations__
        if '_empty_annotations' in annot.keys():
            raise Exception("detected fallback empty annotation, cannot have container type without fields")
        return annot

    @classmethod
    def is_fixed_byte_length(cls) -> bool:
        """Return whether every field type has a fixed byte length."""
        return all(f.is_fixed_byte_length() for f in cls.fields().values())

    @classmethod
    def type_byte_length(cls) -> int:
        """Return the serialized size of a fixed-size container; raises for dynamic containers."""
        if cls.is_fixed_byte_length():
            return cls.min_byte_length()
        else:
            raise Exception("dynamic length container does not have a fixed byte length")

    @classmethod
    def min_byte_length(cls) -> int:
        """Return the smallest serialized size: field minimums plus one offset per dynamic field."""
        total = 0
        for ftyp in cls.fields().values():
            if not ftyp.is_fixed_byte_length():
                total += OFFSET_BYTE_LENGTH
            total += ftyp.min_byte_length()
        return total

    @classmethod
    def max_byte_length(cls) -> int:
        """Return the largest serialized size: field maximums plus one offset per dynamic field."""
        total = 0
        for ftyp in cls.fields().values():
            if not ftyp.is_fixed_byte_length():
                total += OFFSET_BYTE_LENGTH
            total += ftyp.max_byte_length()
        return total

    @classmethod
    def is_packed(cls) -> bool:
        """Containers are never packed."""
        return False

    @classmethod
    def tree_depth(cls) -> int:
        """Return the backing tree depth needed for the number of fields."""
        return get_depth(len(cls.fields()))

    @classmethod
    def item_elem_cls(cls, i: int) -> Type[View]:
        """Return the type of the field at position ``i``."""
        return list(cls.fields().values())[i]

    @classmethod
    def default_node(cls) -> Node:
        """Return the backing built from the default node of every field."""
        return subtree_fill_to_contents([field.default_node() for field in cls.fields().values()], cls.tree_depth())

    def value_byte_length(self) -> int:
        """Return the serialized size in bytes of the current value."""
        if self.__class__.is_fixed_byte_length():
            return self.__class__.type_byte_length()
        else:
            total = 0
            fields = self.fields()
            for fkey, ftyp in fields.items():
                if ftyp.is_fixed_byte_length():
                    total += ftyp.type_byte_length()
                else:
                    total += OFFSET_BYTE_LENGTH
                    total += cast(View, getattr(self, fkey)).value_byte_length()
            return total

    def __getattr__(self, item):
        # Names starting with an underscore use normal attribute lookup;
        # every other name is resolved as a field and read by its position.
        if item[0] == '_':
            return super().__getattribute__(item)
        else:
            keys = self.__class__.fields().keys()
            try:
                i = list(keys).index(item)
            except ValueError:
                raise AttributeError(f"unknown attribute {item}")
            return super().get(i)

    def __setattr__(self, key, value):
        # Mirror of __getattr__: underscore names are set normally, other names must be fields.
        if key[0] == '_':
            super().__setattr__(key, value)
        else:
            keys = self.__class__.fields().keys()
            try:
                i = list(keys).index(key)
            except ValueError:
                raise AttributeError(f"unknown attribute {key}")
            super().set(i, value)

    def _get_field_val_repr(self, fkey: str) -> str:
        """Return the repr of field ``fkey``, or a placeholder if it cannot be navigated to."""
        try:
            return repr(getattr(self, fkey))
        except NavigationError:
            return "*omitted from partial*"

    def __repr__(self):
        return f"{self.__class__.__name__}(Container)\n" + '\n'.join(
            (' ' + fkey + ': ' + ftype.type_repr() + ' = ' + self._get_field_val_repr(fkey))
            for fkey, ftype in self.__class__.fields().items()) + '\n'

    @classmethod
    def type_repr(cls) -> str:
        """Return the type name followed by one ``name: type`` line per field."""
        return f"{cls.__name__}(Container)\n" + '\n'.join(
            (' ' + fkey + ': ' + ftype.type_repr()) for fkey, ftype in cls.fields().items()) \
            + '\n'

    @classmethod
    def decode_bytes(cls: Type[V], bytez: bytes) -> V:
        """Deserialize an instance from ``bytez``, using ``len(bytez)`` as the scope."""
        stream = io.BytesIO()
        stream.write(bytez)
        stream.seek(0)
        return cls.deserialize(stream, len(bytez))

    @classmethod
    def deserialize(cls: Type[V], stream: BinaryIO, scope: int) -> V:
        """Read an instance from ``stream``; ``scope`` bounds the last dynamic field.

        Fixed-size fields are read in place. Dynamic fields are represented by an offset in the
        fixed part; their data follows, and each size is implied by the distance to the next offset.

        Raises:
            Exception: if an offset is out of order or implies a size outside the bounds of the field type.
        """
        fields = cls.fields()
        field_values: Dict[str, View]
        if cls.is_fixed_byte_length():
            field_values = {fkey: ftyp.deserialize(stream, ftyp.type_byte_length()) for fkey, ftyp in fields.items()}
        else:
            field_values = {}
            dyn_fields: PyList[FieldOffset] = []
            fixed_size = 0
            # First pass: read fixed-size fields and collect the offsets of the dynamic ones.
            for fkey, ftyp in fields.items():
                if ftyp.is_fixed_byte_length():
                    fsize = ftyp.type_byte_length()
                    field_values[fkey] = ftyp.deserialize(stream, fsize)
                    fixed_size += fsize
                else:
                    dyn_fields.append(FieldOffset(key=fkey, typ=ftyp, offset=int(decode_offset(stream))))
                    fixed_size += OFFSET_BYTE_LENGTH
            if len(dyn_fields) > 0:
                if dyn_fields[0].offset < fixed_size:
                    raise Exception("first offset is smaller than expected fixed size")
                # Second pass: read the dynamic fields, in offset order.
                for i, (fkey, ftyp, foffset) in enumerate(dyn_fields):
                    next_offset = dyn_fields[i + 1].offset if i + 1 < len(dyn_fields) else scope
                    if foffset > next_offset:
                        raise Exception(f"offset {i} is invalid: {foffset} larger than next offset {next_offset}")
                    fsize = next_offset - foffset
                    f_min_size, f_max_size = ftyp.min_byte_length(), ftyp.max_byte_length()
                    if not (f_min_size <= fsize <= f_max_size):
                        raise Exception(f"offset {i} is invalid, size out of bounds: {foffset}, next {next_offset},"
                                        f" implied size: {fsize}, size bounds: [{f_min_size}, {f_max_size}]")
                    field_values[fkey] = ftyp.deserialize(stream, fsize)
        return cls(**field_values)

    def serialize(self, stream: BinaryIO) -> int:
        """Write this container to ``stream`` and return the number of bytes written."""
        fields = self.__class__.fields()
        is_fixed_size = self.is_fixed_byte_length()
        temp_dyn_stream = None if is_fixed_size else io.BytesIO()
        # Start with the size of the fixed part; it is also the offset of the first dynamic field.
        written = sum(map((lambda x: x.type_byte_length() if x.is_fixed_byte_length() else OFFSET_BYTE_LENGTH),
                          fields.values()))
        for fkey, ftyp in fields.items():
            v: View = getattr(self, fkey)
            if ftyp.is_fixed_byte_length():
                v.serialize(stream)
            else:
                encode_offset(stream, written)
                written += v.serialize(temp_dyn_stream)
        if not is_fixed_size:
            # Dynamic data was buffered so that the whole fixed part is written first.
            temp_dyn_stream.seek(0)
            stream.write(temp_dyn_stream.read(written))
        return written

    @classmethod
    def key_to_static_gindex(cls, key: Any) -> Gindex:
        """Return the generalized index of the field named ``key``; raises ``KeyError`` if unknown."""
        fields = cls.fields()
        try:
            field_index = list(fields.keys()).index(key)
        except ValueError:  # list.index raises ValueError if the element (a key here) is missing
            raise KeyError
        return to_gindex(field_index, cls.tree_depth())

    @classmethod
    def navigate_type(cls, key: Any) -> Type[View]:
        """Return the type of the field named ``key``."""
        return cls.fields()[key]

    def navigate_view(self, key: Any) -> View:
        """Return the view of the field named ``key``."""
        return self.__getattr__(key)