Compare commits

...

4 Commits

Author SHA1 Message Date
da65539d16
Set tz when reading DateTime 2025-04-21 23:19:04 +10:00
ef2580feba
Implement write for DateTime 2025-04-21 23:18:40 +10:00
6ded4cb568
Implement write for Int32 2025-04-21 23:18:25 +10:00
6970250be9
Implement write for Null 2025-04-21 23:18:11 +10:00
2 changed files with 34 additions and 5 deletions

View File

@ -14,7 +14,7 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
from datetime import datetime from datetime import datetime, timezone
from decimal import Decimal from decimal import Decimal
import io import io
import struct import struct
@ -115,6 +115,12 @@ class CslaBinaryReader:
shift += 7 shift += 7
def read_datetime(self):
timestamp = self.read_int64() # Number of 100-nanosecond intervals that have elapsed since January 1, 0001 at 00:00:00.000
timestamp_unix = timestamp - 621355968000000000 # Subtract unix epoch in .NET ticks - https://gist.github.com/kristopherjohnson/397d0f74213a0087f1a1
timestamp_unix /= 10000000 # Convert ticks to seconds (10^9 / 100)
return datetime.fromtimestamp(timestamp_unix, timezone.utc)
def read_decimal(self): def read_decimal(self):
length = self.read_int32() length = self.read_int32()
if length != 4: if length != 4:
@ -176,10 +182,7 @@ class CslaBinaryReader:
return self.read_decimal() return self.read_decimal()
if known_type == CslaKnownTypes.DateTime.value: if known_type == CslaKnownTypes.DateTime.value:
timestamp = self.read_int64() # Number of 100-nanosecond intervals that have elapsed since January 1, 0001 at 00:00:00.000 return self.read_datetime()
timestamp_unix = timestamp - 621355968000000000 # Subtract unix epoch in .NET ticks - https://gist.github.com/kristopherjohnson/397d0f74213a0087f1a1
timestamp_unix /= 10000000 # Convert ticks to seconds (10^9 / 100)
return datetime.fromtimestamp(timestamp_unix)
if known_type == CslaKnownTypes.String.value: if known_type == CslaKnownTypes.String.value:
return self.read_string() return self.read_string()

View File

@ -14,6 +14,7 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
from datetime import datetime
from decimal import Decimal from decimal import Decimal
import io import io
import struct import struct
@ -85,6 +86,10 @@ class CslaBinaryWriter:
# BinaryWriter.WriteInt32 # BinaryWriter.WriteInt32
self.stream.write(struct.pack('<i', value)) self.stream.write(struct.pack('<i', value))
def write_int64(self, value):
# BinaryWriter.WriteInt64
self.stream.write(struct.pack('<q', value))
def write_string(self, value): def write_string(self, value):
# BinaryWriter.WriteString - "The string is prefixed with the length, encoded as an integer seven bits at a time." # BinaryWriter.WriteString - "The string is prefixed with the length, encoded as an integer seven bits at a time."
encoded_string = value.encode('utf-8') encoded_string = value.encode('utf-8')
@ -94,12 +99,21 @@ class CslaBinaryWriter:
def write_object(self, value): def write_object(self, value):
# CslaBinaryWriter.Write(...) # CslaBinaryWriter.Write(...)
if value is None:
return self.write_object_null()
if isinstance(value, bool): if isinstance(value, bool):
return self.write_object_bool(value) return self.write_object_bool(value)
if isinstance(value, int):
return self.write_object_int32(value)
if isinstance(value, Decimal): if isinstance(value, Decimal):
return self.write_object_decimal(value) return self.write_object_decimal(value)
if isinstance(value, datetime):
return self.write_object_datetime(value)
if isinstance(value, str): if isinstance(value, str):
return self.write_object_string(value) return self.write_object_string(value)
@ -118,6 +132,15 @@ class CslaBinaryWriter:
self.write_int32(len(value)) self.write_int32(len(value))
self.stream.write(value) self.stream.write(value)
def write_object_datetime(self, value):
# CslaBinaryWriter.Write(DateTime)
timestamp_unix = value.timestamp()
timestamp = timestamp_unix * 10000000 # Convert seconds to ticks (10^9 / 100)
timestamp += 621355968000000000 # Add unix epoch in .NET ticks - https://gist.github.com/kristopherjohnson/397d0f74213a0087f1a1
timestamp = int(timestamp) # Number of 100-nanosecond intervals that have elapsed since January 1, 0001 at 00:00:00.000
self.stream.write(bytes([CslaKnownTypes.DateTime.value]))
self.write_int64(timestamp)
def write_object_decimal(self, value): def write_object_decimal(self, value):
# CslaBinaryWriter.Write(Decimal) # CslaBinaryWriter.Write(Decimal)
self.stream.write(bytes([CslaKnownTypes.Decimal.value])) self.stream.write(bytes([CslaKnownTypes.Decimal.value]))
@ -131,6 +154,9 @@ class CslaBinaryWriter:
self.stream.write(bytes([CslaKnownTypes.Int32.value])) self.stream.write(bytes([CslaKnownTypes.Int32.value]))
self.write_int32(value) self.write_int32(value)
def write_object_null(self):
self.stream.write(bytes([CslaKnownTypes.Null.value]))
def write_object_string(self, value): def write_object_string(self, value):
# CslaBinaryWriter.Write(string) # CslaBinaryWriter.Write(string)
self.stream.write(bytes([CslaKnownTypes.String.value])) self.stream.write(bytes([CslaKnownTypes.String.value]))