110 lines
3.7 KiB
Python
110 lines
3.7 KiB
Python
# Python Fronius Registers
|
|
#
|
|
# Copyright 2024, Paul Warren <pwarren@pwarren.id.au>
|
|
# Licensed under AGPLv3, See LICENSE.md for terms
|
|
|
|
from enum import Enum
|
|
|
|
from pymodbus.constants import Endian
|
|
from pymodbus.payload import BinaryPayloadDecoder
|
|
#from pymodbus.client.tcp import ModbusTcpClient as ModbusClient
|
|
#from pymodbus.diag_message import *
|
|
#from pymodbus.file_message import *
|
|
#from pymodbus.other_message import *
|
|
#from pymodbus.mei_message import *
|
|
|
|
class DataType(Enum):
|
|
String8 = 1
|
|
String16 = 2
|
|
String32 = 3
|
|
Int16 = 4
|
|
UInt16 = 5
|
|
Int32 = 6
|
|
UInt32 = 7
|
|
Float32 = 8
|
|
UInt64 = 7
|
|
|
|
# Returns the length (amount) of the registers.
|
|
# This corresponds to the value from the Fronius Excel list (column "Size").
|
|
# This refers to how many registers the Mobus function read_holding_registers()
|
|
# must read to get the complete value
|
|
def getRegisterLength(self):
|
|
|
|
if (self == DataType.String8) or (self == DataType.UInt64):
|
|
return int(4)
|
|
elif (self == DataType.String16):
|
|
return int(8)
|
|
elif (self == DataType.String32):
|
|
return int(16)
|
|
elif (self == DataType.Int16) or (self == DataType.UInt16):
|
|
return int(1)
|
|
elif (self == DataType.Int32) or (self == DataType.UInt32) or (self == DataType.Float32):
|
|
return int(2)
|
|
|
|
def decode(self, value):
|
|
decoder = BinaryPayloadDecoder.fromRegisters(value.registers, byteorder=Endian.Big, wordorder=Endian.Big)
|
|
|
|
if (self == DataType.String8) or (self == DataType.String16) or (self == DataType.String32):
|
|
return str(decoder.decode_string(16).decode('utf-8'))
|
|
|
|
elif (self == DataType.Int16):
|
|
return decoder.decode_16bit_int()
|
|
|
|
elif (self == DataType.UInt16):
|
|
return decoder.decode_16bit_uint()
|
|
|
|
elif (self == DataType.Int32):
|
|
return decoder.decode_32bit_int()
|
|
|
|
elif (self == DataType.UInt32):
|
|
return decoder.decode_32bit_uint()
|
|
|
|
elif (self == DataType.Float32):
|
|
return decoder.decode_32bit_float()
|
|
|
|
else:
|
|
return str(decoder.decode_bits())
|
|
|
|
class registerReadError(Exception):
|
|
pass
|
|
|
|
class FroniusReg:
|
|
def __init__(self, address, datatype, unit, description):
|
|
self.address = address
|
|
self.datatype = datatype
|
|
self.unit = unit
|
|
self.description=description
|
|
|
|
def getValue(self, modbusClient):
|
|
return self.__getRegisterValue(modbusClient,
|
|
self.address,
|
|
self.datatype,
|
|
self.unit)
|
|
|
|
def __getRegisterValue(self, modbusClient, address, dataType, unit):
|
|
modbusValue = modbusClient.read_holding_registers(address-1,
|
|
dataType.getRegisterLength(),
|
|
slave=unit)
|
|
if(modbusValue.isError()):
|
|
raise registerReadError("Unable to read from Fronius Register: %d, %s" % (self.id, self.description))
|
|
if(modbusValue is None):
|
|
raise registerReadError("It's NONE!")
|
|
return dataType.decode(modbusValue)
|
|
|
|
class ScaledFroniusReg:
|
|
def __init__(self, valueReg, scaleReg):
|
|
self.valueReg=valueReg
|
|
self.scaleReg=scaleReg
|
|
|
|
def getValue(self, modbusClient):
|
|
return self.valueReg.getValue(modbusClient) * 10 ** self.scaleReg.getValue(modbusClient)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
storageStateOfCharge = FroniusReg(40362, DataType.UInt16, 1, "Storage State of Charge")
|
|
storageStateOfChargeSF = FroniusReg(40376, DataType.Int16, 1, "Storage State of Charge Scaling Factor")
|
|
scaledStateOfCharge = ScaledFroniusReg(storageStateOfCharge, storageStateOfChargeSF)
|