Source code for controller
from utime import ticks_us, ticks_diff
import gc
[docs]
class PIController:
"""
Generic PI (Proportional + Integral) controller for MicroPython.
Each `run()` call:
1) reads sensor
2) computes error and dt
3) updates integral (with anti-windup)
4) computes PI output
5) clamps to saturation
6) sends to actuator
"""
def __init__(
self,
actuator_cb, # Callable[[float], None]
actuator_gain: float, # gain applied to actuator effort
sensor_cb, # Callable[[], float]
sensor_gain: float, # gain applied to sensor measurement
saturation_range: tuple[float, float],
):
# External I/O
self._actuator_cb = actuator_cb
self._sensor_cb = sensor_cb
# Gains
self._actuator_gain: float = actuator_gain
self._sensor_gain: float = sensor_gain
# Saturation limits
self._sat_min, self._sat_max = saturation_range
# Controller gains (defaults preserved as in original)
self._Kp = 0.0
self._Ki = 0.0
self._Kff = 0.0
# Controller state
self._setpoint = 0.0
self._ff_setpoint = 0.0
self._last_ticksus = 0
self._last_measured = 0.0
self._i_error = 0.0
# Initialize timing and integrator state
self.reset()
# --------------------
# Public properties
# --------------------
@property
def Kp(self) -> float:
return self._Kp
@Kp.setter
def Kp(self, value: float):
self._Kp = value
@property
def Ki(self) -> float:
return self._Ki
@Ki.setter
def Ki(self, value: float):
self._Ki = value
@property
def set_point(self) -> float:
return self._setpoint
@set_point.setter
def set_point(self, value: float):
self._setpoint = value
[docs]
def set_feed_forward(self, setpoint: float, gain: float) -> None:
"""
Set feed-forward setpoint and gain.
Feed-forward effort is computed as ``ff_term = gain * setpoint``
and is added to the PI effort before saturation.
"""
self._ff_setpoint = setpoint
self._Kff = gain
# --------------------
# Control methods
# --------------------
[docs]
def reset(self):
"""Reset integral state and initialize the time reference."""
self._i_error = 0.0
self._last_ticksus = ticks_us()
[docs]
def run(self):
"""
Run one PI update with anti-windup protection.
--- Time delta (wrap-safe) ---
"""
now_us = ticks_us()
dt_s = ticks_diff(now_us, self._last_ticksus) * 1e-6 # µs -> s
if dt_s < 0:
dt_s = 0.0
self._last_ticksus = now_us
# --- Measurement ---
measured = self._sensor_cb() * self._sensor_gain
self._last_measured = measured
# --- Error and proportional term ---
error = self._setpoint - measured
p_term = self._Kp * error
# --- Anti-windup: predict integrator update, decide whether to accept it ---
# Candidate integrator after this timestep
candidate_i_error = self._i_error + (error * dt_s)
candidate_i_term = self._Ki * candidate_i_error
# Current PI pre-saturation using current integrator
current_i_term = self._Ki * self._i_error
current_pre_sat = p_term + current_i_term
candidate_pre_sat = p_term + candidate_i_term
# Decide whether to accept candidate integrator:
# - Accept if candidate output would be within saturation limits, OR
# - Accept if current output is saturated but the current error would move
# the output back towards the allowed range (helps recovery)
accept_integration = False
if self._sat_min <= candidate_pre_sat <= self._sat_max:
accept_integration = True
else:
# If currently above max and error is negative, integration will reduce output -> accept
if (current_pre_sat > self._sat_max) and (error < 0):
accept_integration = True
# If currently below min and error is positive, integration will increase output -> accept
if (current_pre_sat < self._sat_min) and (error > 0):
accept_integration = True
if accept_integration:
# Commit the integrator update
self._i_error = candidate_i_error
i_term = candidate_i_term
else:
# Do not change integrator (prevents wind-up)
i_term = current_i_term
# --- Compute final effort and saturate ---
ff_term = self._Kff * self._ff_setpoint
effort_pre_sat = p_term + i_term + ff_term
effort_pre_sat *= self._actuator_gain
if effort_pre_sat < self._sat_min:
effort = self._sat_min
elif effort_pre_sat > self._sat_max:
effort = self._sat_max
else:
effort = effort_pre_sat
# --- Send to actuator ---
self._actuator_cb(effort)
# Optional debug (commented)
# print("err:", error, "dt:", dt_s, "I:", self._i_error, "out:", effort)
gc.collect()