Skip to content

Add ModbusConnection support #25

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dependencies = [
"pydantic",
"pvi~=0.7.1",
"softioc",
"pymodbus",
] # Add project dependencies here, e.g. ["click", "numpy"]
dynamic = ["version"]
license.file = "LICENSE"
Expand Down
3 changes: 2 additions & 1 deletion src/fastcs/connections/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .ip_connection import IPConnection
from .modbus_connection import ModbusSerialConnection, ModbusTcpConnection, ModbusUdpConnection

__all__ = ["IPConnection"]
__all__ = ["IPConnection", "ModbusSerialConnection", "ModbusTcpConnection", "ModbusUdpConnection"]
134 changes: 134 additions & 0 deletions src/fastcs/connections/modbus_connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
from dataclasses import dataclass
from typing import Optional

from pymodbus.client import (
AsyncModbusSerialClient,
AsyncModbusTcpClient,
AsyncModbusUdpClient,
ModbusBaseClient,
)
from pymodbus.exceptions import ModbusException
from pymodbus.framer import Framer
from pymodbus.pdu import ExceptionResponse, ModbusResponse

# Constants
CR = "\r"
TIMEOUT = 1.0 # Seconds
RECV_BUFFER = 4096 # Bytes


@dataclass
class ModbusConnectionSettings:
host: str = "127.0.0.1"
port: int = 7001
slave: int = 0


class ModbusConnection:
def __init__(self, settings: ModbusConnectionSettings) -> None:
self.host, self.port, self.slave = settings.host, settings.port, settings.slave
self.running: bool = False

self._client: ModbusBaseClient

async def connect(self) -> None:
raise NotImplementedError

def disconnect(self):
self._client.close()

async def _read(self, address: int, count: int = 2) -> Optional[ModbusResponse]:
# address -= 1 # modbus spec starts from 0 not 1
try:
# address_hex = hex(address)
rr = await self._client.read_holding_registers(
address, count=count, slave=self.slave
) # type: ignore

if rr.isError() or isinstance(rr, ExceptionResponse): # pragma no cover
# Received Modbus library error or exception
# THIS EXCEPTION IS NOT A PYTHON EXCEPTION, but a valid modbus message
self.disconnect()
return None
return rr

except ModbusException: # pragma no cover
# Received ModbusException from library
self.disconnect()
return None

async def send(self, address: int, value: int) -> ModbusResponse | None:
"""Send a request.

Args:
address: The register address to write to.
value: The value to write.
"""
await self._client.write_registers(address, value, slave=self.slave)
resp = await self._read(address, 2)
return resp


class ModbusSerialConnection(ModbusConnection):
def __init__(self, settings: ModbusConnectionSettings) -> None:
super().__init__(settings)

async def connect(self, framer: Framer = Framer.SOCKET):
self._client = AsyncModbusSerialClient(
str(self.port),
framer=framer,
timeout=10,
retries=3,
retry_on_empty=False,
close_comm_on_error=False,
strict=True,
baudrate=9600,
bytesize=8,
parity="N",
stopbits=1,
)

await self._client.connect()
assert self._client.connected


class ModbusTcpConnection(ModbusConnection):
def __init__(self, settings: ModbusConnectionSettings) -> None:
super().__init__(settings)

async def connect(self, framer: Framer = Framer.SOCKET):
self._client = AsyncModbusTcpClient(
self.host,
self.port,
framer=framer,
timeout=10,
retries=3,
retry_on_empty=False,
close_comm_on_error=False,
strict=True,
source_address=("localhost", 0),
)

await self._client.connect()
assert self._client.connected


class ModbusUdpConnection(ModbusConnection):
def __init__(self, settings: ModbusConnectionSettings) -> None:
super().__init__(settings)

async def connect(self, framer: Framer = Framer.SOCKET):
self._client = AsyncModbusUdpClient(
self.host,
self.port,
framer=framer,
timeout=10,
retries=3,
retry_on_empty=False,
close_comm_on_error=False,
strict=True,
source_address=("localhost", 0),
)

await self._client.connect()
assert self._client.connected