— From Simulator to Embedded Firmware
(Module 5 · Development & Implementation – Bringing Modbus to Life)
Learning objectives
After finishing this chapter you will be able to …
- Explain the life-cycle of a Modbus server: initialise → listen → parse → execute → respond → idle.
- Create fully-compliant servers in Python (pymodbus), C (libmodbus RTU & TCP), and C++/Arduino (ESP32).
- Design an in-memory data store that mirrors the four Modbus tables and supports multi-thread-safe access.
- Implement custom logic hooks (on-write callbacks, range checks, simulation formulas).
- Validate your server with conformance-test scripts and integrate it in CI.
16.1 What a Modbus server actually does
Phase | Detail |
---|---|
1. Init | Allocate data store (coils, DIs, IRs, HRs) → fill with defaults or persistent values. |
2. Listen/Loop | Wait for request: UART ISR (RTU) or socket epoll (TCP/UDP). |
3. Parse PDU | Verify the frame check (CRC for RTU, LRC for ASCII), then validate function code, address range, and quantity. |
4. Execute | Read/Write/Diag against data store + optional callback. |
5. Respond | Build normal frame or exception; on serial links append CRC (RTU) or LRC (ASCII); send. |
6. Idle | Release bus (RTU → drop TX-Enable) or block in poll(); loop to 2. |
(Fig-16-1 placeholder: state-machine diagram.)
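Phases 3 to 5 can be sketched in a few lines of Python. This is a minimal illustration under stated assumptions, not a full server: it handles only FC03 (Read Holding Registers) on a bare PDU with no transport framing, and the names `HOLDING`, `exception_pdu`, and `handle_pdu` are hypothetical.

```python
import struct

HOLDING = [0] * 256            # hypothetical holding-register table (phase 1: init)
HOLDING[0:3] = [10, 20, 30]

ILLEGAL_FUNCTION = 0x01
ILLEGAL_DATA_ADDRESS = 0x02
ILLEGAL_DATA_VALUE = 0x03


def exception_pdu(fc: int, code: int) -> bytes:
    """Exception response: function code with the high bit set, then the code."""
    return bytes([fc | 0x80, code])


def handle_pdu(pdu: bytes) -> bytes:
    """Parse, execute, and respond for one request PDU (FC03 only)."""
    fc = pdu[0]
    if fc != 0x03:
        return exception_pdu(fc, ILLEGAL_FUNCTION)
    if len(pdu) != 5:
        return exception_pdu(fc, ILLEGAL_DATA_VALUE)
    addr, qty = struct.unpack(">HH", pdu[1:5])
    if not 1 <= qty <= 125:                 # FC03 quantity limit
        return exception_pdu(fc, ILLEGAL_DATA_VALUE)
    if addr + qty > len(HOLDING):
        return exception_pdu(fc, ILLEGAL_DATA_ADDRESS)
    values = HOLDING[addr:addr + qty]       # execute: read from the data store
    return struct.pack(f">BB{qty}H", fc, 2 * qty, *values)


if __name__ == "__main__":
    # Read 3 registers starting at offset 0.
    print(handle_pdu(bytes.fromhex("0300000003")).hex())
```

The checks run in the order function code, quantity, address, so a request that is wrong in several ways always yields the same exception.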
16.2 Choosing your implementation stack
Environment | Library / RTOS | Pros | Cons |
---|---|---|---|
Python 3.11+ | pymodbus.server | Fast prototyping, rich callbacks, asyncio | Not real-time; GIL for multi-CPU |
C / POSIX Linux | libmodbus | Low latency, deterministic, runs on RasPi & x86 | Manual memory mgmt |
C++ / Arduino (ESP) | TinyModbus or SimpleModbus | Compiles into 32 kB; hardware timers | Limited PDU length; DIY CRC |
FreeRTOS + LwIP | FreeMODBUS (mbed) | Production-grade embedded RTOS, ISR-driven | More build system work |
Recommendation: Start with pymodbus for behaviour + tests, then port to libmodbus
or FreeMODBUS when latency or footprint demands.
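Because the lightweight Arduino libraries in the table leave the CRC to you, it helps to have a reference implementation to test against. The sketch below is the bitwise CRC-16/MODBUS (initial value 0xFFFF, reflected polynomial 0xA001); the helper names `append_crc` and `check_crc` are illustrative.

```python
def crc16_modbus(data: bytes) -> int:
    """Bitwise CRC-16/MODBUS: init 0xFFFF, reflected polynomial 0xA001."""
    crc = 0xFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            if crc & 1:
                crc = (crc >> 1) ^ 0xA001
            else:
                crc >>= 1
    return crc


def append_crc(frame: bytes) -> bytes:
    """RTU transmits the CRC low byte first."""
    crc = crc16_modbus(frame)
    return frame + bytes([crc & 0xFF, crc >> 8])


def check_crc(adu: bytes) -> bool:
    """True if the last two bytes are the correct CRC of the rest."""
    return len(adu) >= 4 and append_crc(adu[:-2]) == adu


if __name__ == "__main__":
    # Slave 1, FC03, start 0, quantity 10.
    print(append_crc(bytes.fromhex("01030000000a")).hex(" "))
```

A table-driven version is faster on a microcontroller, but the bitwise form is easier to verify and makes a good oracle in unit tests.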
16.3 Python reference server (TCP & RTU)
16.3.1 Create register map
from pymodbus.datastore import (
    ModbusSequentialDataBlock,
    ModbusSlaveContext,
    ModbusServerContext,
)

store = ModbusSlaveContext(
    di=ModbusSequentialDataBlock(0, [0] * 128),  # Discrete inputs (1X)
    co=ModbusSequentialDataBlock(0, [0] * 128),  # Coils (0X)
    hr=ModbusSequentialDataBlock(0, [0] * 256),  # Holding (4X)
    ir=ModbusSequentialDataBlock(0, [0] * 256),  # Input (3X)
)
context = ModbusServerContext(slaves={17: store}, single=False)
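The listings in this chapter use zero-based offsets in code and 5-digit references such as HR40001 in comments. A small helper makes that translation explicit. This is a sketch assuming the classic 5-digit convention where the leading digit selects the table; `TABLES` and `split_reference` are hypothetical names.

```python
TABLES = {
    0: "coils",
    1: "discrete_inputs",
    3: "input_registers",
    4: "holding_registers",
}


def split_reference(ref: int) -> tuple[str, int]:
    """Map a 5-digit reference such as 40001 to (table, zero-based offset)."""
    prefix, number = divmod(ref, 10000)
    if prefix not in TABLES or number == 0:
        raise ValueError(f"not a valid 5-digit Modbus reference: {ref}")
    return TABLES[prefix], number - 1


if __name__ == "__main__":
    print(split_reference(40001))
    print(split_reference(30001))
```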
16.3.2 Start TCP server (asyncio) — Listing 16-1
from pymodbus.server import StartTcpServer  # public import path in pymodbus 3.x
from pymodbus.device import ModbusDeviceIdentification

identity = ModbusDeviceIdentification()
identity.VendorName = "Technical ABCD Laboratory"
identity.ProductCode = "TAL-VMBS-01"
identity.ProductName = "Virtual Modbus Slave"
identity.MajorMinorRevision = "1.0"

# Blocks until interrupted and runs the asyncio loop internally.
# Binding port 502 usually needs elevated privileges.
StartTcpServer(context=context, identity=identity, address=("0.0.0.0", 502))
16.3.3 Add on-write callback
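# pymodbus has no built-in add_on_write_callback(); one common pattern is to subclass
# ModbusSlaveContext and override setValues(). Build `store` (16.3.1) from this class.
def on_write_callback(register, address, values):
    print(f"HR {address} <- {values}")
    if register == 'hr' and address == 0:  # HR40001
        # Auto-update Input Register 30001 with doubled value (FC 4 selects the IR table)
        context[17].setValues(4, 0, [(values[0] * 2) & 0xFFFF])

class HookedSlaveContext(ModbusSlaveContext):
    def setValues(self, fx, address, values):
        super().setValues(fx, address, values)
        if fx in (6, 16):  # write single / multiple holding registers
            on_write_callback('hr', address, values)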
16.4 libmodbus RTU slave (C, POSIX) — Listing 16-2
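#include <errno.h>
#include <modbus.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>

static volatile sig_atomic_t run = 1;
static void on_sigint(int s) { (void)s; run = 0; }

int main(void) {
    modbus_t *ctx = modbus_new_rtu("/dev/ttyUSB0", 38400, 'E', 8, 1);
    if (ctx == NULL) {
        fprintf(stderr, "modbus_new_rtu: %s\n", modbus_strerror(errno));
        return 1;
    }
    modbus_set_slave(ctx, 17);
    modbus_set_response_timeout(ctx, 0, 500000);  /* 0.5 s */
    modbus_set_indication_timeout(ctx, 1, 0);     /* wake every 1 s so SIGINT is noticed */

    /* modbus_mapping_new() takes four sizes: coils, DI, holding, input. */
    modbus_mapping_t *map = modbus_mapping_new(128, 128, 256, 256);
    if (map == NULL) {
        fprintf(stderr, "modbus_mapping_new: %s\n", modbus_strerror(errno));
        modbus_free(ctx);
        return 1;
    }

    signal(SIGINT, on_sigint);
    if (modbus_connect(ctx) == -1) {
        fprintf(stderr, "modbus_connect: %s\n", modbus_strerror(errno));
        modbus_mapping_free(map);
        modbus_free(ctx);
        return 1;
    }
    /* RS-485 mode is an ioctl on the open port, so set it after connecting. */
    if (modbus_rtu_set_serial_mode(ctx, MODBUS_RTU_RS485) == -1)
        fprintf(stderr, "RS-485 mode not set: %s\n", modbus_strerror(errno));

    while (run) {
        uint8_t query[MODBUS_RTU_MAX_ADU_LENGTH];
        int rc = modbus_receive(ctx, query);
        if (rc > 0) {
            modbus_reply(ctx, query, rc, map);
        }
        /* rc == 0: frame for another slave; rc == -1: timeout or bad frame, keep looping. */
    }
    modbus_mapping_free(map);
    modbus_close(ctx);
    modbus_free(ctx);
    return 0;
}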
Key flags
- MODBUS_RTU_RS485 toggles TX-Enable via ioctl TIOCSRS485.
- Compile: gcc slave.c $(pkg-config --cflags --libs libmodbus) -o slave
16.5 Microcontroller slave (ESP32 + Arduino) — highlights
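#include "ModbusRTUServer.h"

ModbusRTUServer mb;

void setup() {
    Serial.begin(38400, SERIAL_8E1);
    mb.begin(17, 38400);                    // slave ID 17
    mb.configureHoldingRegisters(0, 256);
    mb.configureInputRegisters(0, 256);     // needed before inputRegisterWrite(); assumes an ArduinoModbus-style API
    mb.configureCoils(0, 64);
}

void loop() {
    if (mb.poll()) {                        // a request was received and served
        uint16_t val = mb.holdingRegisterRead(0);
        mb.inputRegisterWrite(0, val * 2);  // result wraps at 16 bits
    }
}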
ISR latency is roughly 35 µs per byte at 38.4 kbaud on an ESP32-S3, and the server fits in 32 kB of flash.
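To put the 35 µs figure in context, compare it with the time one character occupies on the wire. The sketch below assumes 11 bits per character (start, 8 data, parity, stop, as in 8E1) and the fixed 1750 µs inter-frame gap that the Modbus serial line specification recommends above 19200 baud; the function names are illustrative.

```python
def char_time_us(baud: int, bits_per_char: int = 11) -> float:
    """Time on the wire for one character, in microseconds."""
    return bits_per_char * 1_000_000 / baud


def silent_interval_us(baud: int) -> float:
    """Inter-frame gap t3.5: 3.5 characters, or a fixed 1750 µs above 19200 baud."""
    if baud > 19200:
        return 1750.0
    return 3.5 * char_time_us(baud)


if __name__ == "__main__":
    ct = char_time_us(38400)
    print(f"char time: {ct:.1f} us, ISR share: {35 / ct:.0%}")
    print(f"t3.5: {silent_interval_us(38400):.0f} us")
```

At 38400 baud one character lasts about 286 µs, so a 35 µs ISR uses roughly an eighth of each character slot.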
16.6 Designing the data store
Table | Backing | Update source | Example cycle |
---|---|---|---|
Coils (0X) | std::atomic<bool> array | Master writes / PLC DI | Every write callback |
Discrete Inputs (1X) | | GPIO scan ISR pushes bool | 10 ms task |
Input Registers (3X) | ADC DMA buffer → queue | | 100 Hz |
Holding Registers (4X) | Config struct mirrored to EEPROM | Master writes | Persist on change |
Thread safety – if ISR pushes data and Modbus task reads, protect with spinlock or double-buffer swap.
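The double-buffer swap can be illustrated in Python, with a thread standing in for the ISR. This is a sketch assuming a single producer; `DoubleBuffer`, `publish`, and `snapshot` are hypothetical names, and on a microcontroller the lock would be a critical section or an atomic pointer swap.

```python
import threading


class DoubleBuffer:
    """Producer fills the back buffer, then swaps it in under a short lock."""

    def __init__(self, size: int):
        self._front = [0] * size   # read by the Modbus task
        self._back = [0] * size    # written by the producer only
        self._lock = threading.Lock()

    def publish(self, values):
        """Called by the single producer (stands in for the ISR or sampling task)."""
        if len(values) != len(self._back):
            raise ValueError("wrong number of values")
        self._back[:] = values                 # slow copy happens outside the lock
        with self._lock:                       # the swap itself is two reference moves
            self._front, self._back = self._back, self._front

    def snapshot(self, start: int, count: int):
        """Called by the Modbus task; returns a consistent copy of the range."""
        with self._lock:
            return self._front[start:start + count]


if __name__ == "__main__":
    buf = DoubleBuffer(4)
    buf.publish([1, 2, 3, 4])
    print(buf.snapshot(1, 2))
```

Readers never see a half-updated frame because the producer only ever writes to the buffer that is not being read.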
16.7 Implementing exception logic on server
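/* The function code sits right after the transport header. */
uint8_t fc = query[modbus_get_header_length(ctx)];

if (!is_address_valid(addr, qty)) {
    modbus_reply_exception(ctx, query, MODBUS_EXCEPTION_ILLEGAL_DATA_ADDRESS);
    continue;
}
/* Reject only writes; reads of the read-only region stay legal.
   is_write_function() is a project helper, like the other two. */
if (is_write_function(fc) && is_in_read_only_region(addr)) {
    modbus_reply_exception(ctx, query, MODBUS_EXCEPTION_ILLEGAL_FUNCTION);
    continue;
}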
(Mapping defined in Chapter 13.)
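The same decision logic is easy to unit-test when written as a pure function. The sketch below mirrors the snippet above, including its choice of Illegal Function for writes into a read-only block; `MAP_SIZE`, `READ_ONLY`, and `check_request` are hypothetical and the sizes are placeholders, not the Chapter 13 mapping.

```python
from typing import Optional

ILLEGAL_FUNCTION = 0x01
ILLEGAL_DATA_ADDRESS = 0x02

WRITE_FCS = {0x05, 0x06, 0x0F, 0x10}   # write coil(s) / register(s)
MAP_SIZE = 256                          # hypothetical table size
READ_ONLY = range(200, 256)             # hypothetical read-only block


def check_request(fc: int, addr: int, qty: int) -> Optional[int]:
    """Return an exception code, or None if the request may proceed."""
    if qty < 1 or addr + qty > MAP_SIZE:
        return ILLEGAL_DATA_ADDRESS
    touches_read_only = addr < READ_ONLY.stop and addr + qty > READ_ONLY.start
    if fc in WRITE_FCS and touches_read_only:
        return ILLEGAL_FUNCTION
    return None


if __name__ == "__main__":
    print(check_request(0x03, 250, 6))   # read up to the last register
    print(check_request(0x06, 200, 1))   # write into the read-only block
```

Checking the whole range rather than only the start address catches a multi-register write that begins in writable space and runs into the protected block.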
16.8 Performance tuning
Bottleneck | Symptom | Tweak |
---|---|---|
UART ISR overrun | CRC errors > 1 % | DMA circular buffer, lower baud rate, shorten poll interval |
Socket backlog | accept() = -1 | listen(backlog=128) |
Task contention | 10 ms jitter | Raise server task priority in FreeRTOS |
Copy overhead | 40 % CPU at 100 rps | Use zero-copy pointers inside mapping |
(Fig-16-2 placeholder: oscilloscope screenshot showing stable TX-Enable waveform.)
16.9 Conformance & CI testing
- Official test tool: modbus.org conformance test v3.4 (Windows).
- Open-source alt: modbus-tk test_parser.py + YAML scenarios.
- GitHub Actions: docker run -d --name slave simmodbus/slave:latest -a 17, then run a pytest client verifying read/write loops.
Fail CI if any exception or time-out occurs at 50 rps for 60 s.
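The pass/fail rule can be expressed as a small pacing loop that a pytest test calls. This is a sketch, not the harness from the Docker image: `run_load_test` is a hypothetical helper, and `request` stands for any callable that performs one client read or write and raises on a Modbus exception or time-out.

```python
import time
from typing import Callable


def run_load_test(request: Callable[[], None], rate_hz: float, duration_s: float) -> int:
    """Call `request` at a fixed rate; any exception it raises fails the run.

    Returns the number of requests issued.
    """
    total = round(rate_hz * duration_s)
    interval = 1.0 / rate_hz
    start = time.monotonic()
    for i in range(total):
        # Schedule against the start time so pacing errors do not accumulate.
        delay = start + i * interval - time.monotonic()
        if delay > 0:
            time.sleep(delay)
        request()
    return total


if __name__ == "__main__":
    # Stand-in request that always succeeds; a real test would call the client here.
    sent = run_load_test(lambda: None, rate_hz=50, duration_s=1)
    print(sent)
```

In CI the call would be `run_load_test(read_write_once, 50, 60)`, where `read_write_once` is whatever function wraps your client.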
16.10 Security controls inside a slave
Threat | Mitigation |
---|---|
Unauthorized writes | Configurable write mask (allow only FC03/04) |
Broadcast flood (Unit-ID 0) | Option to ignore broadcast requests |
Function-code fuzz | Table of allowed FC; respond Illegal Function else |
Buffer overflow via qty | Range-check addr + qty <= map_size before memcpy |
(Listing 16-security.c snippet.)
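The first three mitigations in the table amount to a gate in front of the request handler. The sketch below shows one way to structure it; `Policy` and its flags are hypothetical, and the function-code sets cover only the common bit and register accesses.

```python
from dataclasses import dataclass

READ_FCS = frozenset({0x01, 0x02, 0x03, 0x04})
WRITE_FCS = frozenset({0x05, 0x06, 0x0F, 0x10})


@dataclass(frozen=True)
class Policy:
    allow_writes: bool = False       # read-only deployment by default
    allow_broadcast: bool = False    # ignore Unit-ID 0 by default

    def verdict(self, unit_id: int, fc: int) -> str:
        """Return 'accept', 'drop' (no reply at all), or 'illegal_function'."""
        if unit_id == 0:
            # Broadcasts are never answered; only permitted writes are executed.
            ok = self.allow_broadcast and self.allow_writes and fc in WRITE_FCS
            return "accept" if ok else "drop"
        if fc in READ_FCS:
            return "accept"
        if fc in WRITE_FCS and self.allow_writes:
            return "accept"
        return "illegal_function"    # unknown or disallowed function code


if __name__ == "__main__":
    print(Policy().verdict(17, 0x03))
    print(Policy().verdict(17, 0x06))
```

An accepted broadcast still produces no response frame; the verdict only decides whether the write is executed.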
16.11 Best-practice checklist (device vendor)
✔︎ | Rule |
---|---|
☐ | Always return specific exceptions; never silent drop. |
☐ | Keep ISR work under one character time; for requests that need long processing, reply Acknowledge (05). |
☐ | Publish register map & word order in manual. |
☐ | Provide firmware flag to disable writes for read-only deployments. |
☐ | Ship with unique default slave ID (avoid 1). |
☐ | Test at the protocol limits: 125-register reads, 123-register writes, 2 000-coil reads, and 1 968-coil writes. |
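The word-order rule matters as soon as a value spans two registers. The sketch below packs an IEEE 754 float32 into two 16-bit registers in either word order; the function names and the "big"/"little" labels are illustrative, and each register is assumed to keep big-endian byte order.

```python
import struct


def float_to_registers(value: float, word_order: str = "big") -> list[int]:
    """Split a float32 into two 16-bit registers (high word first if 'big')."""
    hi, lo = struct.unpack(">HH", struct.pack(">f", value))
    return [hi, lo] if word_order == "big" else [lo, hi]


def registers_to_float(regs: list[int], word_order: str = "big") -> float:
    """Inverse of float_to_registers for the same word order."""
    hi, lo = regs if word_order == "big" else regs[::-1]
    return struct.unpack(">f", struct.pack(">HH", hi, lo))[0]


if __name__ == "__main__":
    print([hex(r) for r in float_to_registers(1.0)])
    print([hex(r) for r in float_to_registers(1.0, "little")])
```

A master that assumes the wrong order does not get an error, only a wrong number, which is why the manual has to state the order explicitly.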
Chapter recap
- A Modbus server is essentially a finite-state machine that exposes four memory tables and speaks either RTU or TCP.
- pymodbus gets you running in minutes; libmodbus or FreeMODBUS take you to real-time embedded.
- Success hinges on a sane data store, tight exception validation, and iron-clad thread safety.
- Conformance tests plus CI pipelines catch regressions before you ship firmware that breaks when a master upgrades.
Assets to create
ID | Visual / file |
---|---|
Fig-16-1 | Server life-cycle FSM |
Fig-16-2 | TX-Enable timing scope |
Listing 16-1 | Python TCP server |
Listing 16-2 | libmodbus RTU C |
Listing-security | C write-mask example |
ZIP | Docker image with compiled slave + test harness |
Next: Chapter 17 – Advanced Modbus Programming Techniques: asynchronous RTU over RS-485, bulk-data streaming, user-defined function codes, and integrating a Modbus stack with edge analytics containers.