pycsla-binary/csla_binary/binary_writer.py

164 lines
5.4 KiB
Python

# pycsla-binary: Python implementation of CSLA .NET binary serialisation
# Copyright (C) 2025 Lee Yingtong Li (RunasSudo)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from decimal import Decimal
import io
import struct
from typing import List
from .decimal import decimal_to_int32
from .known_types import CslaKnownTypes
from .serialization_info import SerializationInfo
class CslaBinaryWriter:
"""Writes SerializationInfo objects into binary-serialised CSLA data"""
@classmethod
def write_to_bytes(cls, serialisation_infos: List[SerializationInfo]) -> bytes:
stream = io.BytesIO()
writer = cls(stream)
writer.write(serialisation_infos)
return stream.getvalue()
# --------------
# Implementation
def __init__(self, stream: io.BufferedIOBase):
self.stream = stream
self.keywords_dictionary = []
def write(self, serialisation_infos: List[SerializationInfo]):
# Reverse of CslaBinaryReader.Read
self.write_int32(len(serialisation_infos))
for info in serialisation_infos:
# Write ReferenceID
self.write_int32(info.reference_id)
# Write TypeName
self.write_object_system_string(info.type_name)
# Write children
self.write_int32(len(info.children))
for child in info.children:
self.write_object_system_string(child.name)
self.write_object_bool(child.is_dirty)
self.write_object_int32(child.reference_id)
# Write field values
self.write_int32(len(info.values))
for value in info.values:
self.write_object_system_string(value.name)
self.write_object_system_string(value.enum_type_name or '')
self.write_object_bool(value.is_dirty)
self.write_object(value.value)
def write_7bit_encoded_int(self, value):
# BinaryWriter.Write7BitEncodedInt
# "The integer of the value parameter is written out seven bits at a time, starting with the seven least-significant bits. The high bit of a byte indicates whether there are more bytes to be written after this one."
while True:
value_7lsb = value & 0b01111111
value >>= 7
if value == 0:
# Final byte
self.stream.write(bytes([value_7lsb]))
return
else:
# Further bytes remaining
self.stream.write(bytes([value_7lsb | 0b10000000]))
def write_int32(self, value):
# BinaryWriter.WriteInt32
self.stream.write(struct.pack('<i', value))
def write_string(self, value):
# BinaryWriter.WriteString - "The string is prefixed with the length, encoded as an integer seven bits at a time."
encoded_string = value.encode('utf-8')
self.write_7bit_encoded_int(len(encoded_string))
self.stream.write(encoded_string)
def write_object(self, value):
# CslaBinaryWriter.Write(...)
if value is None:
return self.write_object_null()
if isinstance(value, bool):
return self.write_object_bool(value)
if isinstance(value, int):
return self.write_object_int32(value)
if isinstance(value, Decimal):
return self.write_object_decimal(value)
if isinstance(value, str):
return self.write_object_string(value)
if isinstance(value, bytes):
return self.write_object_bytearray(value)
raise NotImplementedError('CslaBinaryWriter.Write not implemented for type {}'.format(type(value).__name__))
def write_object_bool(self, value):
# CslaBinaryWriter.Write(bool)
self.stream.write(bytes([CslaKnownTypes.Boolean.value, 1 if value else 0]))
def write_object_bytearray(self, value):
# CslaBinaryWriter.Write(byte[])
self.stream.write(bytes([CslaKnownTypes.ByteArray.value]))
self.write_int32(len(value))
self.stream.write(value)
def write_object_decimal(self, value):
# CslaBinaryWriter.Write(Decimal)
self.stream.write(bytes([CslaKnownTypes.Decimal.value]))
int32_repr = decimal_to_int32(value)
self.write_int32(len(int32_repr)) # Should be 4 always
for part in int32_repr:
self.write_int32(part)
def write_object_int32(self, value):
# CslaBinaryWriter.Write(int)
self.stream.write(bytes([CslaKnownTypes.Int32.value]))
self.write_int32(value)
def write_object_null(self):
self.stream.write(bytes([CslaKnownTypes.Null.value]))
def write_object_string(self, value):
# CslaBinaryWriter.Write(string)
self.stream.write(bytes([CslaKnownTypes.String.value]))
self.write_string(value)
def write_object_system_string(self, value):
# CslaBinaryWriter.WriteSystemString
if value in self.keywords_dictionary:
# Dictionary key already exists
dictionary_key = self.keywords_dictionary.index(value)
self.stream.write(bytes([CslaKnownTypes.StringDictionaryKey.value]))
self.write_int32(dictionary_key)
else:
# New dictionary key
dictionary_key = len(self.keywords_dictionary)
self.keywords_dictionary.append(value)
self.stream.write(bytes([CslaKnownTypes.StringWithDictionaryKey.value]))
self.write_string(value)
self.write_int32(dictionary_key)