pycsla-binary/csla_binary/binary_writer.py

112 lines
3.9 KiB
Python

# pycsla-binary: Python implementation of CSLA .NET binary serialisation
# Copyright (C) 2025 Lee Yingtong Li (RunasSudo)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import io
import struct
from typing import List
from .known_types import CslaKnownTypes
from .serialization_info import SerializationInfo
class CslaBinaryWriter:
"""Writes SerializationInfo objects into binary-serialised CSLA data"""
def __init__(self, stream: io.BufferedIOBase):
self.stream = stream
self.keywords_dictionary = []
def write(self, serialisation_infos: List[SerializationInfo]):
# Reverse of CslaBinaryReader.Read
self.write_int32(len(serialisation_infos))
for info in serialisation_infos:
# Write ReferenceID
self.write_int32(info.reference_id)
# Write TypeName
self.write_object_system_string(info.type_name)
# Write children
self.write_int32(len(info.children))
for child in info.children:
self.write_object_system_string(child.name)
self.write_object_bool(child.is_dirty)
self.write_object_int32(child.reference_id)
# Write field values
self.write_int32(len(info.values))
for value in info.values:
self.write_object_system_string(value.name)
self.write_object_system_string(value.enum_type_name or '')
self.write_object_bool(value.is_dirty)
self.write_object(value.value)
def write_7bit_encoded_int(self, value):
# BinaryWriter.Write7BitEncodedInt
# "The integer of the value parameter is written out seven bits at a time, starting with the seven least-significant bits. The high bit of a byte indicates whether there are more bytes to be written after this one."
while True:
value_7lsb = value & 0b01111111
value >>= 7
if value == 0:
# Final byte
self.stream.write(bytes([value_7lsb]))
return
else:
# Further bytes remaining
self.stream.write(bytes([value_7lsb | 0b10000000]))
def write_int32(self, value):
# BinaryWriter.WriteInt32
self.stream.write(struct.pack('<i', value))
def write_string(self, value):
# BinaryWriter.WriteString - "The string is prefixed with the length, encoded as an integer seven bits at a time."
encoded_string = value.encode('utf-8')
self.write_7bit_encoded_int(len(encoded_string))
self.stream.write(encoded_string)
def write_object(self, value):
# CslaBinaryWriter.Write(...)
raise NotImplementedError('Writing objects of dynamic type is not yet implemented')
def write_object_bool(self, value):
# CslaBinaryWriter.Write(bool)
self.stream.write(bytes([CslaKnownTypes.Boolean.value, 1 if value else 0]))
def write_object_int32(self, value):
# CslaBinaryWriter.Write(int)
self.stream.write(bytes([CslaKnownTypes.Int32.value]))
self.write_int32(value)
def write_object_system_string(self, value):
# CslaBinaryWriter.WriteSystemString
if value in self.keywords_dictionary:
# Dictionary key already exists
dictionary_key = self.keywords_dictionary.index(value)
self.stream.write(bytes([CslaKnownTypes.StringDictionaryKey.value]))
self.write_int32(dictionary_key)
else:
# New dictionary key
dictionary_key = len(self.keywords_dictionary)
self.keywords_dictionary.append(value)
self.stream.write(bytes([CslaKnownTypes.StringWithDictionaryKey.value]))
self.write_string(value)
self.write_int32(dictionary_key)