pycsla-binary/csla_binary/binary_reader.py
2025-04-21 16:55:35 +10:00

193 lines
6.1 KiB
Python

# pycsla-binary: Python implementation of CSLA .NET binary serialisation
# Copyright (C) 2025 Lee Yingtong Li (RunasSudo)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import io
import struct
from .serialization_info import ChildData, FieldData, SerializationInfo
class CslaBinaryReader:
"""Reads binary-serialised CSLA data into SerializationInfo objects"""
def __init__(self, stream: io.BytesIO):
self.stream = stream
self.keywords_dictionary = {}
def read(self):
# CslaBinaryReader.Read
total_count = self.read_int32()
serialisation_infos = []
for _ in range(total_count):
info = SerializationInfo()
# Read ReferenceID
info.reference_id = self.read_int32()
# Read TypeName
info.type_name = self.read_object() # string
if not isinstance(info.type_name, str):
raise ValueError('Expected string, got {}'.format(type(info.type_name).__name__))
# Read children
child_count = self.read_int32()
for _ in range(child_count):
system_name = self.read_object() # string
if not isinstance(system_name, str):
raise ValueError('Expected string, got {}'.format(type(system_name).__name__))
is_dirty = self.read_object() # bool
if not isinstance(is_dirty, bool):
raise ValueError('Expected bool, got {}'.format(type(is_dirty).__name__))
reference_id = self.read_object() # int
if not isinstance(reference_id, int):
raise ValueError('Expected int, got {}'.format(type(reference_id).__name__))
info.children.append(ChildData(system_name, is_dirty, reference_id))
# Read field values
value_count = self.read_int32()
for _ in range(value_count):
system_name = self.read_object() # string
if not isinstance(system_name, str):
raise ValueError('Expected string, got {}'.format(type(system_name).__name__))
enum_type_name = self.read_object() # string
if not isinstance(enum_type_name, str):
raise ValueError('Expected string, got {}'.format(type(enum_type_name).__name__))
is_dirty = self.read_object() # bool
if not isinstance(is_dirty, bool):
raise ValueError('Expected bool, got {}'.format(type(is_dirty).__name__))
value = self.read_object()
info.values.append(FieldData(system_name, enum_type_name or None, is_dirty, value))
serialisation_infos.append(info)
return serialisation_infos
def read_7bit_encoded_int(self):
# BinaryReader.Read7BitEncodedInt
# "The integer of the value parameter is written out seven bits at a time, starting with the seven least-significant bits. The high bit of a byte indicates whether there are more bytes to be written after this one."
result = 0
shift = 0
while True:
byte = self.stream.read(1)[0]
byte_7lsb = byte & 0b01111111
byte_1msb = byte >> 7
result |= byte_7lsb << shift
if not byte_1msb:
# MSB is not set - return now
return result
shift += 7
def read_int32(self):
# BinaryReader.ReadInt32
return struct.unpack('<i', self.stream.read(4))[0]
def read_object(self):
# CslaBinaryReader.ReadObject
known_type = self.stream.read(1)[0]
if known_type == 1: # CslaKnownTypes.Boolean
raise NotImplementedError()
if known_type == 2: # CslaKnownTypes.Char
raise NotImplementedError()
if known_type == 3: # CslaKnownTypes.SByte
raise NotImplementedError()
if known_type == 4: # CslaKnownTypes.Byte
raise NotImplementedError()
if known_type == 5: # CslaKnownTypes.Int16
raise NotImplementedError()
if known_type == 6: # CslaKnownTypes.UInt16
raise NotImplementedError()
if known_type == 7: # CslaKnownTypes.Int32
raise NotImplementedError()
if known_type == 8: # CslaKnownTypes.UInt32
raise NotImplementedError()
if known_type == 9: # CslaKnownTypes.Int64
raise NotImplementedError()
if known_type == 10: # CslaKnownTypes.UInt64
raise NotImplementedError()
if known_type == 11: # CslaKnownTypes.Single
raise NotImplementedError()
if known_type == 12: # CslaKnownTypes.Double
raise NotImplementedError()
if known_type == 13: # CslaKnownTypes.Decimal
raise NotImplementedError()
if known_type == 14: # CslaKnownTypes.DateTime
raise NotImplementedError()
if known_type == 15: # CslaKnownTypes.String
return self.read_string()
if known_type == 16: # CslaKnownTypes.TimeSpan
raise NotImplementedError()
if known_type == 17: # CslaKnownTypes.DateTimeOffset
raise NotImplementedError()
if known_type == 18: # CslaKnownTypes.Guid
raise NotImplementedError()
if known_type == 19: # CslaKnownTypes.ByteArray
raise NotImplementedError()
if known_type == 20: # CslaKnownTypes.CharArray
raise NotImplementedError()
if known_type == 21: # CslaKnownTypes.ListOfInt
raise NotImplementedError()
if known_type == 22: # CslaKnownTypes.Null
raise NotImplementedError()
if known_type == 23: # CslaKnownTypes.StringWithDictionaryKey
system_string = self.read_string()
dictionary_key = self.read_int32()
self.keywords_dictionary[dictionary_key] = system_string
return system_string
if known_type == 24: # CslaKnownTypes.StringDictionaryKey
raise NotImplementedError()
raise ValueError('Unexpected object tag {}'.format(known_type))
def read_string(self):
# BinaryReader.ReadString - "The string is prefixed with the length, encoded as an integer seven bits at a time."
length = self.read_7bit_encoded_int()
return self.stream.read(length).decode('utf-8')