# pycsla-binary: Python implementation of CSLA .NET binary serialisation
# Copyright (C) 2025 Lee Yingtong Li (RunasSudo)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
import io
|
|
import struct
|
|
from typing import List
|
|
|
|
from .known_types import CslaKnownTypes
|
|
from .serialization_info import SerializationInfo
|
|
|
|
class CslaBinaryWriter:
    """Writes SerializationInfo objects into binary-serialised CSLA data.

    The writer keeps a per-instance dictionary of "system strings" (type
    names and field names). The first time such a string is written it is
    emitted in full together with a newly assigned integer key; every later
    occurrence is emitted as the key only.
    """

    @classmethod
    def write_to_bytes(cls, serialisation_infos: List[SerializationInfo]) -> bytes:
        """Serialise *serialisation_infos* into an in-memory buffer.

        Returns the complete contents of that buffer as ``bytes``.
        """
        stream = io.BytesIO()
        cls(stream).write(serialisation_infos)
        return stream.getvalue()

    # --------------
    # Implementation

    def __init__(self, stream: io.BufferedIOBase):
        """Create a writer that emits its output to *stream*.

        *stream* only needs a ``write(bytes)`` method.
        """
        self.stream = stream
        # System strings in order of first use; a string's key is its position.
        self.keywords_dictionary = []
        # Reverse index (string -> key) so that lookups do not have to scan
        # keywords_dictionary on every write.
        self._keyword_index = {}

    def write(self, serialisation_infos: List[SerializationInfo]):
        """Write every SerializationInfo in *serialisation_infos* to the stream.

        Layout: an Int32 count, then for each info its reference ID, type
        name, children (name, dirty flag, reference ID) and field values
        (name, enum type name, dirty flag, value).
        """
        # Reverse of CslaBinaryReader.Read
        self.write_int32(len(serialisation_infos))

        for info in serialisation_infos:
            # Write ReferenceID
            self.write_int32(info.reference_id)

            # Write TypeName
            self.write_object_system_string(info.type_name)

            # Write children
            self.write_int32(len(info.children))
            for child in info.children:
                self.write_object_system_string(child.name)
                self.write_object_bool(child.is_dirty)
                self.write_object_int32(child.reference_id)

            # Write field values
            self.write_int32(len(info.values))
            for value in info.values:
                self.write_object_system_string(value.name)
                # A missing enum type name is written as the empty string
                self.write_object_system_string(value.enum_type_name or '')
                self.write_object_bool(value.is_dirty)
                self.write_object(value.value)

    def write_7bit_encoded_int(self, value):
        """Write *value* as a variable-length integer, seven bits per byte.

        The seven least-significant bits are written first; the high bit of
        each byte indicates whether more bytes follow.

        A negative value is reinterpreted as an unsigned 32-bit integer, as
        .NET's BinaryWriter.Write7BitEncodedInt does, so that e.g. ``-1`` is
        written as ``FF FF FF FF 0F``.

        Raises:
            ValueError: if *value* is below the minimum 32-bit signed integer.
        """
        # BinaryWriter.Write7BitEncodedInt
        if value < 0:
            if value < -0x80000000:
                raise ValueError('Value {} is out of range for a 32-bit signed integer'.format(value))
            # Python's >> on a negative int never reaches 0, so without this
            # conversion the loop below would never terminate.
            value &= 0xFFFFFFFF

        while value > 0b01111111:
            # Further bytes remaining
            self.stream.write(bytes([(value & 0b01111111) | 0b10000000]))
            value >>= 7

        # Final byte
        self.stream.write(bytes([value]))

    def write_int32(self, value):
        """Write *value* as a little-endian 32-bit signed integer."""
        # BinaryWriter.WriteInt32
        self.stream.write(struct.pack('<i', value))

    def write_string(self, value):
        """Write *value* as UTF-8, prefixed with its byte length.

        The length prefix is written with write_7bit_encoded_int.
        """
        # BinaryWriter.WriteString - "The string is prefixed with the length, encoded as an integer seven bits at a time."
        encoded_string = value.encode('utf-8')

        self.write_7bit_encoded_int(len(encoded_string))
        self.stream.write(encoded_string)

    def write_object(self, value):
        """Write *value* preceded by a type tag chosen from its Python type.

        Supports ``bool``, ``bytes`` and ``str``.

        Raises:
            NotImplementedError: if *value* is of any other type.
        """
        # CslaBinaryWriter.Write(...)
        if isinstance(value, bool):
            return self.write_object_bool(value)

        if isinstance(value, bytes):
            return self.write_object_bytearray(value)

        if isinstance(value, str):
            return self.write_object_string(value)

        raise NotImplementedError('CslaBinaryWriter.Write not implemented for type {}'.format(type(value).__name__))

    def write_object_bool(self, value):
        """Write the Boolean type tag followed by one byte, 1 if *value* is truthy else 0."""
        # CslaBinaryWriter.Write(bool)
        self.stream.write(bytes([CslaKnownTypes.Boolean.value, 1 if value else 0]))

    def write_object_bytearray(self, value):
        """Write the ByteArray type tag, the Int32 length of *value*, then *value* itself."""
        # CslaBinaryWriter.Write(byte[])
        self.stream.write(bytes([CslaKnownTypes.ByteArray.value]))
        self.write_int32(len(value))
        self.stream.write(value)

    def write_object_int32(self, value):
        """Write the Int32 type tag followed by *value* as an Int32."""
        # CslaBinaryWriter.Write(int)
        self.stream.write(bytes([CslaKnownTypes.Int32.value]))
        self.write_int32(value)

    def write_object_string(self, value):
        """Write the String type tag followed by *value* as a length-prefixed string."""
        # CslaBinaryWriter.Write(string)
        self.stream.write(bytes([CslaKnownTypes.String.value]))
        self.write_string(value)

    def write_object_system_string(self, value):
        """Write *value* using the writer's string dictionary.

        On first use the string is written in full (StringWithDictionaryKey
        tag, the string, then its newly assigned Int32 key). On later uses
        only the StringDictionaryKey tag and the existing key are written.
        """
        # CslaBinaryWriter.WriteSystemString
        dictionary_key = self._keyword_index.get(value)

        if dictionary_key is not None:
            # Dictionary key already exists
            self.stream.write(bytes([CslaKnownTypes.StringDictionaryKey.value]))
            self.write_int32(dictionary_key)
        else:
            # New dictionary key
            dictionary_key = len(self.keywords_dictionary)
            self.keywords_dictionary.append(value)
            self._keyword_index[value] = dictionary_key

            self.stream.write(bytes([CslaKnownTypes.StringWithDictionaryKey.value]))
            self.write_string(value)
            self.write_int32(dictionary_key)