Source code for task_line_follow
"""
task_line_follow.py
Cooperative task that runs a PI controller to keep a line centered under the
Romi robot. The centroid reported by the reflectance sensor is used as the
feedback signal. The controller output is applied as a differential speed
offset to the left and right motors so that the robot steers toward the line.
"""
from task_share import Share
from controller import PIController
try:
import ujson as json
except ImportError:
import json
from constants import GAINS_FILE, DEFAULT_LF_KP, DEFAULT_LF_KI, DEFAULT_LF_KFF
[docs]
class task_line_follow:
"""
Cooperative scheduler task for line following on a differential-drive Romi.
A PIController tracks the line centroid (feedback) and outputs a speed
correction that is added to the nominal forward velocity on one wheel and
subtracted from the other, steering the robot back onto the line.
"""
def __init__(
self,
lineFollowGo: Share,
lineFollowSetPoint: Share,
lineFollowKp: Share,
lineFollowKi: Share,
lineCentroid: Share,
lineFollowKff: Share,
rightMotorSetPoint: Share,
leftMotorSetPoint: Share
):
"""
Initialize the line-follow task.
Args:
lineFollowGo (Share): Boolean flag; task runs PI control while
this share is non-zero.
lineFollowSetPoint (Share): Nominal forward speed setpoint (mm/s)
shared with the UI.
lineFollowKp (Share): Proportional gain for the PI controller,
readable and writable from the UI.
lineFollowKi (Share): Integral gain for the PI controller,
readable and writable from the UI.
lineCentroid (Share): Centroid of the detected line (mm),
produced by the reflectance sensor task.
lineFollowKff (Share): Feed-forward gain applied to compensate
for steady-state curvature.
rightMotorSetPoint (Share): Speed setpoint written to the right motor
task (mm/s).
leftMotorSetPoint (Share): Speed setpoint written to the left motor
task (mm/s).
"""
self._state = 0
self._goFlag = lineFollowGo
self._setPoint = lineFollowSetPoint
self._Kp = lineFollowKp
self._Ki = lineFollowKi
self._lineCentroid = lineCentroid
self._Kff = lineFollowKff
self._nominalSetPoint = 0
self._rightMotorSetPoint = rightMotorSetPoint
self._leftMotorSetPoint = leftMotorSetPoint
self._controller = PIController( # Instantiate a PI controller to affect Romi spin
self._plant_cb, # as the line centroid drifts from center.
141, # Velocity at wheels = W*Omega, where W is wheel width (141mm)
self._lineCentroid.get,
1,
(-100, 100)
)
self._load_gains()
# Set Controller Parameters
self._controller.Kp = lineFollowKp.get() # Proportional gain
self._controller.Ki = lineFollowKi.get() # Integral gain
self._controller.set_point = 0 # Setpoint is 0: corresponding to
# keeping the line centered under romi
self._state = 0
print("Line follower task instantiated")
def _load_gains(self) -> bool:
"""
Read line-follower PI gains and setpoint from the JSON gains file.
Attempts to open GAINS_FILE and parse the ``line_follower`` section.
Each parameter (kp, ki, kff, set_point) is written to its corresponding
Share if present; otherwise the compile-time default constant is used.
Returns:
bool: True if the file was read and parsed successfully, False if the
file could not be opened or contained invalid JSON.
"""
try:
with open(GAINS_FILE, "r") as gains_file:
data = json.load(gains_file)
except (OSError, ValueError):
return False
line_follower = data.get("line_follower", {})
lf_kp = line_follower.get("kp")
lf_ki = line_follower.get("ki")
lf_setpoint = line_follower.get("set_point")
lf_kff = line_follower.get("kff")
if lf_kp is not None:
self._Kp.put(float(lf_kp))
print(f"Read LF Kp: {float(lf_kp)}")
else:
self._Kp.put(DEFAULT_LF_KP)
print(f"LF Kp not found. Using default: {DEFAULT_LF_KP}")
if lf_ki is not None:
self._Ki.put(float(lf_ki))
print(f"Read LF Ki: {float(lf_ki)}")
else:
self._Ki.put(DEFAULT_LF_KI)
print(f"LF Ki not found. Using default: {DEFAULT_LF_KI}")
if lf_setpoint is not None:
self._nominalSetPoint = float(lf_setpoint)
self._setPoint.put(float(lf_setpoint))
print(f"Read LF setpoint: {float(lf_setpoint)}")
if lf_kff is not None:
self._Kff.put(lf_kff)
print(f"Read LF Kff: {float(lf_kff)}")
else:
self._Ki.put(DEFAULT_LF_KFF)
print(f"LF Kff not found. Using default: {DEFAULT_LF_KFF}")
return True
[docs]
def run(self):
"""
Cooperative task for scheduler
"""
while True:
if self._state == 0:
if self._goFlag.get():
# Update line follow gains once each time go flag is set
self._controller.Kp = self._Kp.get()
self._controller.Ki = self._Ki.get()
self._nominalSetPoint = self._setPoint.get() # Setpoint in mm/s
# Configure feed-forward for this velocity
radius = 300 # radius of test circle in mm
omega = self._nominalSetPoint / radius
self._controller.set_feed_forward(omega, self._Kff.get())
self._controller.reset()
self._state = 1
elif self._state == 1:
if not self._goFlag.get():
self._state = 0
else:
# Continue to update nominal velocity
self._nominalSetPoint = self._setPoint.get()
# Run line follower PI control
# input will be read from the centroid share
# output will be sent to `plant_cb`
self._controller.run()
yield self._state
def _plant_cb(self, value):
"""
Callback invoked by PIController with the computed correction output.
Applies a differential speed offset to steer the robot toward the line.
The left motor receives ``nominal + value`` and the right motor receives
``nominal - value``, so a positive correction turns the robot right and
a negative correction turns it left.
Args:
value (float): Controller output (mm/s correction) bounded by the
controller's saturation limits.
"""
self._leftMotorSetPoint.put(self._nominalSetPoint + value)
self._rightMotorSetPoint.put(self._nominalSetPoint - value)