# Source listing: pyFroniusReg/froniusreg/froniusreg.py (185 lines, 6.5 KiB, Python)

# Python Fronius Registers
#
# Copyright 2024, Paul Warren <pwarren@pwarren.id.au>
# Licensed under AGPLv3, See LICENSE.md for terms
from enum import Enum
from pymodbus.constants import Endian
from pymodbus.payload import BinaryPayloadDecoder, BinaryPayloadBuilder
#from pymodbus.client.tcp import ModbusTcpClient as ModbusClient
#from pymodbus.diag_message import *
#from pymodbus.file_message import *
#from pymodbus.other_message import *
#from pymodbus.mei_message import *
class DataType(Enum):
    """Data types of the values stored in Fronius Modbus registers.

    Each member knows how many 16-bit registers its value occupies and how to
    convert between register payloads and Python values with pymodbus'
    BinaryPayloadDecoder / BinaryPayloadBuilder (big-endian byte and word
    order).
    """

    String8 = 1
    String16 = 2
    String32 = 3
    Int16 = 4
    UInt16 = 5
    Int32 = 6
    UInt32 = 7
    Float32 = 8
    # Needs a value of its own: Enum members that share a value are aliases,
    # so the former ``UInt64 = 7`` made UInt64 the very same member as UInt32
    # (and made UInt32 report a length of 4 registers).
    UInt64 = 9

    def getRegisterLength(self):
        """Return the number of registers occupied by a value of this type.

        This corresponds to the value from the Fronius Excel list (column
        "Size"), i.e. how many registers the Modbus function
        read_holding_registers() must read to get the complete value.
        """
        if self in (DataType.String8, DataType.UInt64):
            return 4
        if self == DataType.String16:
            return 8
        if self == DataType.String32:
            return 16
        if self in (DataType.Int16, DataType.UInt16):
            return 1
        # Int32, UInt32 and Float32
        return 2

    def decode(self, value):
        """Decode a register read response into a Python value.

        Args:
            value: response object exposing the raw registers as
                ``value.registers``.

        Returns:
            ``str`` for the string types, ``int`` for the integer types and
            ``float`` for Float32.
        """
        decoder = BinaryPayloadDecoder.fromRegisters(value.registers, byteorder=Endian.Big, wordorder=Endian.Big)
        if self in (DataType.String8, DataType.String16, DataType.String32):
            # A register holds two bytes, so read the type's full width
            # instead of a fixed 16 bytes (which truncated String32).
            return decoder.decode_string(self.getRegisterLength() * 2).decode('utf-8')
        if self == DataType.Int16:
            return decoder.decode_16bit_int()
        if self == DataType.UInt16:
            return decoder.decode_16bit_uint()
        if self == DataType.Int32:
            return decoder.decode_32bit_int()
        if self == DataType.UInt32:
            return decoder.decode_32bit_uint()
        if self == DataType.Float32:
            return decoder.decode_32bit_float()
        # Only UInt64 is left.
        return decoder.decode_64bit_uint()

    def encode(self, value):
        """Encode a Python value into a register payload.

        Args:
            value: the value to write; integer types are passed through
                ``int()`` first, so floats are truncated.

        Returns:
            The result of ``BinaryPayloadBuilder.build()``.
        """
        encoder = BinaryPayloadBuilder(byteorder=Endian.Big, wordorder=Endian.Big)
        # The builder's add_* methods return None, so they cannot be chained
        # with .build(); add first, build once at the end.
        if self in (DataType.String8, DataType.String16, DataType.String32):
            encoder.add_string(value)
        elif self == DataType.Int16:
            encoder.add_16bit_int(int(value))
        elif self == DataType.UInt16:
            encoder.add_16bit_uint(int(value))
        elif self == DataType.Int32:
            encoder.add_32bit_int(int(value))
        elif self == DataType.UInt32:
            encoder.add_32bit_uint(int(value))
        elif self == DataType.Float32:
            encoder.add_32bit_float(value)
        else:
            # Only UInt64 is left.
            encoder.add_64bit_uint(int(value))
        return encoder.build()
# Lower-case shorthands for the DataType members, grouped by kind.
string8, string16, string32 = DataType.String8, DataType.String16, DataType.String32
int16, uint16 = DataType.Int16, DataType.UInt16
int32, uint32 = DataType.Int32, DataType.UInt32
float32 = DataType.Float32
uint64 = DataType.UInt64
class registerReadError(Exception):
    """Raised when a register value cannot be read from the Modbus client."""
class FroniusReg:
    """A single value in the Fronius Modbus register map.

    Attributes:
        address: register number; 1 is subtracted to form the address that
            is passed to the Modbus client.
        datatype: object providing getRegisterLength(), decode() and
            encode() for the value.
        unit: passed to the Modbus client as ``slave``.
        description: human-readable text, used in error messages.
    """

    def __init__(self, address, datatype, unit, description):
        self.address = address
        self.datatype = datatype
        self.unit = unit
        self.description = description

    def getValue(self, modbusClient):
        """Read and decode the register value.

        Raises:
            registerReadError: if the client returns None or an error
                response.
        """
        return self.__getRegisterValue(modbusClient)

    def setValue(self, modbusClient, value):
        """Encode ``value``, write it, and return the client's response."""
        return self.__setRegisterValue(modbusClient, value)

    def __getRegisterValue(self, modbusClient):
        modbusValue = modbusClient.read_holding_registers(self.address - 1,
                                                          self.datatype.getRegisterLength(),
                                                          slave=self.unit)
        # Test for None before calling a method on the response; the other
        # order could never reach this check.
        if modbusValue is None:
            raise registerReadError("It's NONE!")
        if modbusValue.isError():
            # This class has no ``id`` attribute; report the register address.
            raise registerReadError("Unable to read from Fronius Register: %d, %s" % (self.address, self.description))
        return self.datatype.decode(modbusValue)

    def __setRegisterValue(self, modbusClient, value):
        # The payload is already encoded by the datatype, hence skip_encode.
        modbusValue = modbusClient.write_registers(self.address - 1,
                                                   self.datatype.encode(value),
                                                   count=self.datatype.getRegisterLength(),
                                                   slave=self.unit,
                                                   skip_encode=True)
        return modbusValue
class ScaledFroniusReg:
    """Pairs a value register with a register holding its power-of-ten scale.

    Attributes:
        valueReg: register object holding the raw value.
        scaleReg: register object holding the exponent applied to the raw
            value.
    """

    def __init__(self, valueReg, scaleReg):
        self.valueReg = valueReg
        self.scaleReg = scaleReg

    def getValue(self, modbusClient):
        """Return the raw value multiplied by 10 to the power of the scale."""
        raw = self.valueReg.getValue(modbusClient)
        exponent = self.scaleReg.getValue(modbusClient)
        return raw * 10 ** exponent

    def setValue(self, modbusClient, value):
        """Divide ``value`` by 10 ** scale and write it to the value register.

        Returns whatever the value register's setValue() returns.
        """
        exponent = self.scaleReg.getValue(modbusClient)
        unscaled = value / 10 ** exponent
        return self.valueReg.setValue(modbusClient, unscaled)
# Register definitions.
# FroniusReg arguments are (address, datatype, unit, description);
# ScaledFroniusReg arguments are (value register, scale register).
# Every register below uses unit 1.

# Charge rate limit and the register used to scale it
MaxChaRte = FroniusReg(40155, uint16, 1, "Max Charge Rate")
MaxChaRte_SF = FroniusReg(40156, int16, 1, "Max Charge Rate SF")
wChaGra = FroniusReg(40357, uint16, 1, "Max Charge Power")
# Storage state of charge, raw value plus its scaling factor
storageStateOfCharge = FroniusReg(40362, uint16, 1, "Storage State of Charge")
storageStateOfChargeSF = FroniusReg(40376, int16, 1, "Storage State of Charge Scaling Factor")
scaledStateOfCharge = ScaledFroniusReg(storageStateOfCharge, storageStateOfChargeSF)
# Discharge / charge rates and the scaling factor they share
OutWRte = FroniusReg(40366, int16, 1, "DischargeRate")
InWRte = FroniusReg(40367, int16, 1, "ChargeRate")
WRteSF = FroniusReg(40379, int16, 1, "ScalingFactor for storage Watts")
# Storage control settings
StorCtl_Mode = FroniusReg(40359, uint16, 1, "Hold/Charge/Discharge enable")
MinRsvPct = FroniusReg(40361, uint16, 1, "Reserve Percentage")
InOutWRte_RvrtTms = FroniusReg(40369, uint16, 1, "Revert timer for charge settings")
ChaGriSet = FroniusReg(40371, uint16, 1, "enum16, 0 = PV only, 1 = Grid enabled")
WChaDisChaGra_SF = FroniusReg(40373, int16, 1, "Charge/Discharge Power SF")
MinRsvPct_SF = FroniusReg(40375, int16, 1, "Reserve Percentage Scaling")
# Scaled views: each pairs a value register with its scale register
scaledOutWRte = ScaledFroniusReg(OutWRte, WRteSF)
scaledInWRte = ScaledFroniusReg(InWRte, WRteSF)
scaledReserve = ScaledFroniusReg(MinRsvPct, MinRsvPct_SF)
scaledMaxChaRte = ScaledFroniusReg(MaxChaRte, MaxChaRte_SF)
scaledMaxWChaGra = ScaledFroniusReg(wChaGra, WChaDisChaGra_SF)