# Python Fronius Registers # # Copyright 2024, Paul Warren # Licensed under AGPLv3, See LICENSE.md for terms from enum import Enum from pymodbus.constants import Endian from pymodbus.payload import BinaryPayloadDecoder, BinaryPayloadBuilder #from pymodbus.client.tcp import ModbusTcpClient as ModbusClient #from pymodbus.diag_message import * #from pymodbus.file_message import * #from pymodbus.other_message import * #from pymodbus.mei_message import * class DataType(Enum): String8 = 1 String16 = 2 String32 = 3 Int16 = 4 UInt16 = 5 Int32 = 6 UInt32 = 7 Float32 = 8 UInt64 = 7 # Returns the length (amount) of the registers. # This corresponds to the value from the Fronius Excel list (column "Size"). # This refers to how many registers the Mobus function read_holding_registers() # must read to get the complete value def getRegisterLength(self): if (self == DataType.String8) or (self == DataType.UInt64): return int(4) elif (self == DataType.String16): return int(8) elif (self == DataType.String32): return int(16) elif (self == DataType.Int16) or (self == DataType.UInt16): return int(1) elif (self == DataType.Int32) or (self == DataType.UInt32) or (self == DataType.Float32): return int(2) def decode(self, value): decoder = BinaryPayloadDecoder.fromRegisters(value.registers, byteorder=Endian.Big, wordorder=Endian.Big) if (self == DataType.String8) or (self == DataType.String16) or (self == DataType.String32): return str(decoder.decode_string(16).decode('utf-8')) elif (self == DataType.Int16): return decoder.decode_16bit_int() elif (self == DataType.UInt16): return decoder.decode_16bit_uint() elif (self == DataType.Int32): return decoder.decode_32bit_int() elif (self == DataType.UInt32): return decoder.decode_32bit_uint() elif (self == DataType.Float32): return decoder.decode_32bit_float() else: return str(decoder.decode_bits()) def encode(self, value): encoder = BinaryPayloadBuilder(byteorder=Endian.Big, wordorder=Endian.Big) if (self == DataType.String8) or (self == DataType.String16) or (self == DataType.String32): return encoder.add_string(value).build() elif (self == DataType.Int16): encoder.add_16bit_int(int(value)) return encoder.build() # return int(value) elif (self == DataType.UInt16): encoder.add_16bit_uint(int(value)) return encoder.build() elif (self == DataType.Int32): return encoder.add_32bit_int(value).build() elif (self == DataType.UInt32): return encoder.add_32bit_uint(value).build() elif (self == DataType.Float32): return encoder.add_32bit_float(value).build() else: return encoder.add_bits(value).build() class registerReadError(Exception): pass class FroniusReg: def __init__(self, address, datatype, unit, description): self.address = address self.datatype = datatype self.unit = unit self.description=description def getValue(self, modbusClient): return self.__getRegisterValue(modbusClient) def setValue(self, modbusClient, value): return self.__setRegisterValue(modbusClient, value) def __getRegisterValue(self, modbusClient): modbusValue = modbusClient.read_holding_registers(self.address-1, self.datatype.getRegisterLength(), slave=self.unit) if(modbusValue.isError()): raise registerReadError("Unable to read from Fronius Register: %d, %s" % (self.id, self.description)) if(modbusValue is None): raise registerReadError("It's NONE!") return self.datatype.decode(modbusValue) def __setRegisterValue(self, modbusClient, value): modbusValue = modbusClient.write_registers(self.address-1, self.datatype.encode(value), count=self.datatype.getRegisterLength(), slave=self.unit, skip_encode=True) return modbusValue class ScaledFroniusReg: def __init__(self, valueReg, scaleReg): self.valueReg=valueReg self.scaleReg=scaleReg def getValue(self, modbusClient): return self.valueReg.getValue(modbusClient) * 10 ** self.scaleReg.getValue(modbusClient) def setValue(self, modbusClient, value): return self.valueReg.setValue(modbusClient, value / 10 ** self.scaleReg.getValue(modbusClient)) MaxChaRte = FroniusReg(40155, DataType.UInt16, 1, "Max Charge Rate") MaxChaRte_SF = FroniusReg(40156, DataType.Int16, 1, "Max Charge Rate SF") wChaGra = FroniusReg(40357, DataType.UInt16, 1, "Max Charge Power") storageStateOfCharge = FroniusReg(40362, DataType.UInt16, 1, "Storage State of Charge") storageStateOfChargeSF = FroniusReg(40376, DataType.Int16, 1, "Storage State of Charge Scaling Factor") scaledStateOfCharge = ScaledFroniusReg(storageStateOfCharge, storageStateOfChargeSF) OutWRte = FroniusReg(40366, DataType.Int16, 1, "DischargeRate") InWRte = FroniusReg(40367, DataType.Int16, 1, "ChargeRate") WRteSF = FroniusReg(40379, DataType.Int16, 1, "ScalingFactor for storage Watts") StorCtl_Mode = FroniusReg(40359, DataType.UInt16, 1, "Hold/Charge/Discharge enable") MinRsvPct = FroniusReg(40361, DataType.UInt16, 1, "Reserve Percentage") InOutWRte_RvrtTms = FroniusReg(40369, DataType.UInt16, 1, "Revert timer for charge settings") ChaGriSet = FroniusReg(40371, DataType.UInt16, 1, "enum16, 0 = PV only, 1 = Grid enabled") WChaDisChaGra_SF = FroniusReg(40373, DataType.Int16, 1, "Charge/Discharge Power SF") MinRsvPct_SF = FroniusReg(40375, DataType.Int16, 1, "Reserve Percentage Scaling") scaledOutWRte = ScaledFroniusReg(OutWRte, WRteSF) scaledInWRte = ScaledFroniusReg(InWRte, WRteSF) scaledReserve = ScaledFroniusReg(MinRsvPct, MinRsvPct_SF) scaledMaxChaRte = ScaledFroniusReg(MaxChaRte, MaxChaRte_SF) scaledMaxWChaGra = ScaledFroniusReg(wChaGra, WChaDisChaGra_SF)