Source code for task_motor

"""
task_motor.py

Cooperative task that wraps a PIController around a single Motor/Encoder pair
on the Romi robot.  Multiple instances are created (one per wheel) and run by
the cooperative scheduler.

States:
    S0_INIT (0): One-shot initialization; transitions immediately to S1_WAIT.
    S1_WAIT (1): Waits for the go flag to be set before enabling the motor.
    S2_RUN  (2): Runs closed-loop PI velocity control; transitions back to S1_WAIT when the go flag is cleared.
"""

from drivers.motor import Motor
from drivers.encoder import Encoder
from task_share   import Share, Queue
from utime        import ticks_us, ticks_diff
import micropython
from controller   import PIController
from constants    import *
try:
    import ujson as json
except ImportError:
    import json

S0_INIT = micropython.const(0) # State 0 - initialiation
S1_WAIT = micropython.const(1) # State 1 - wait for go command
S2_RUN  = micropython.const(2) # State 2 - run closed loop control


[docs] class task_motor: """ Cooperative scheduler task for closed-loop velocity control of one motor. Each instance owns a Motor, an Encoder, and a PIController. The task reads the encoder velocity, computes a PI correction, and drives the motor accordingly. Instantiate once per wheel (left and right). """ def __init__(self, mot: Motor, enc: Encoder, goFlag: Share, kpVal: Share, kiVal: Share, setpoint: Share, dataValues: Queue, timeValues: Queue, wheelDistance: Share, motorVoltage: Share, motorAngVelocity: Share): """ Initialize the motor task. Args: mot (Motor): Motor driver object used to set effort. enc (Encoder): Encoder object used to read wheel position and velocity. goFlag (Share): Integer flag; 0 = stop, 1 = run, 2 = run with profiling data capture. kpVal (Share): Proportional gain for the PI controller, readable and writable from the UI. kiVal (Share): Integral gain for the PI controller, readable and writable from the UI. setpoint (Share): Velocity setpoint (mm/s) written by the line-follow task or the UI. dataValues (Queue): Queue for logging encoder velocity samples during profiling runs. timeValues (Queue): Queue for logging timestamps (ms) corresponding to each velocity sample. wheelDistance (Share): Cumulative wheel travel (encoder counts) reported to the observer task. motorVoltage (Share): Commanded motor voltage (V) reported for telemetry and the UI. motorAngVelocity (Share): Wheel angular velocity (rad/s) derived from the encoder and reported for telemetry. """ self._state: int = S0_INIT # The present state of the task self._mot: Motor = mot # A motor object self._enc: Encoder = enc # An encoder object self._goFlag: Share = goFlag # A share object representing a # flag to start data collection self._kpVal: Share = kpVal # A share for Kp gain from user # interface self._kiVal: Share = kiVal # A share for Ki gain from user # interface self._setpoint: Share = setpoint # A share for setpoint value # from user interface self._wheelDistance: Share = wheelDistance #A share of distance traveled by the wheel from observer self._motorVoltage: Share = motorVoltage # A share of commanded motor voltage [V] self._motorOmega: Share = motorAngVelocity # A share of commanded motor angular velocity [V] self._dataValues: Queue = dataValues # A queue object used to store # collected encoder position self._timeValues: Queue = timeValues # A queue object used to store the # time stamps associated with the # collected encoder data self._startTime: int = 0 # The start time (in microseconds) # for a batch of collected data self._controller = PIController( self._apply_effort, # Actuator callback () MOTOR_GAIN, # Actuator gain self._enc.get_velocity, # Sensor (encoder counts per ms) # ENCODER_GAIN, # Sensor gain 1, (-100, 100) # Saturation min, max range # For motor: -100 to 100 % duty cycle ) self._load_gains() # Cache initial gains in memory self._kp_init = self._kpVal.get() self._ki_init = self._kiVal.get() self._controller.Kp = self._kp_init self._controller.Ki = self._ki_init self._profiling = False print("Motor Task object instantiated") def _apply_effort(self, effort_pct): """ Actuator callback invoked by PIController with the computed effort. Converts the percent effort to an equivalent voltage assuming a 6.5 V supply, writes the result to the motorVoltage share for telemetry, then forwards the raw percent value to the motor driver. Args: effort_pct (float): Desired motor effort in percent (-100 to 100). """ motor_voltage = (effort_pct / 100.0) * 6.5 self._motorVoltage.put(motor_voltage) self._mot.set_effort(effort_pct) def _load_gains(self) -> bool: """ Read motor PI gains from the JSON gains file. Attempts to open GAINS_FILE and parse the ``motor`` section. If a value for ``kp`` or ``ki`` is present it is written to the corresponding Share; otherwise the compile-time default constant is used instead. Returns: bool: True if the file was read and parsed successfully, False if the file could not be opened or contained invalid JSON. """ try: with open(GAINS_FILE, "r") as gains_file: data = json.load(gains_file) except (OSError, ValueError): return False motor = data.get("motor", {}) motor_kp = motor.get("kp") motor_ki = motor.get("ki") if motor_kp is not None: self._kpVal.put(float(motor_kp)) print(f"Read motor Kp: {float(motor_kp)}") else: self._kpVal.put() print(f"Motor Kp not found. Using default: {DEFAULT_MOTOR_KP}") if motor_ki is not None: self._kiVal.put(float(motor_ki)) print(f"Read motor Ki: {float(motor_ki)}") else: self._kiVal.put() print(f"Motor Ki not found. Using default: {DEFAULT_MOTOR_KI}") return True
[docs] def run(self): """ Cooperative function for scheduler """ while True: if self._state == S0_INIT: # Init state (can be removed if unneeded) # print("Initializing motor task") self._state = S1_WAIT elif self._state == S1_WAIT: # Wait for "go command" state go = self._goFlag.get() if go == 2: self._profiling = True else: self._profiling = False if go: # print("Starting motor loop") # Capture a start time in microseconds so that each sample # can be timestamped with respect to this start time. The # start time will be off by however long it takes to # transition and run the next state, so the time values may # need to be zeroed out again during data processing. self._startTime = ticks_us() self._state = S2_RUN self._mot.enable() self._controller.reset() self._enc.zero() self._controller.set_point = self._setpoint.get() self._controller.Kp = self._kpVal.get() self._controller.Ki = self._kiVal.get() elif self._state == S2_RUN: # Closed-loop control state # print(f"Running motor loop, cycle {self._dataValues.num_in()}") # Check if we need to exit motor control if not self._goFlag.get(): self._state = S1_WAIT self._mot.disable() self._enc.zero() self._controller.set_point = self._setpoint.get() # Run the encoder update algorithm and then capture the present # position of the encoder. You will eventually need to capture # the motor speed instead of position here. # self._enc.update() # Collect a timestamp to use for this sample t = ticks_us() # Actuate the motor using a control law. The one used here in # the example is a "bang bang" controller, and will work very # poorly in practice. Note that the set position is zero. You # will replace this with the output of your PID controller that # uses feedback from the velocity measurement. # self._mot.set_effort(30 if pos < 0 else -30) # Update encoder before measuring velocity self._enc.update() self._controller.run() self._wheelDistance.put(self._enc.get_position()) #Report Motor Angular Velocity omega = self._enc.get_velocity() / WHEEL_RADIUS self._motorOmega.put(omega) # Store angular to be reported to output if self._profiling: vel = self._enc.get_velocity() # # Store the sampled values in the queues # # self._dataValues.put(pos) self._dataValues.put(vel) # Store velocity to be reported to output self._timeValues.put(int(ticks_diff(t, self._startTime) / 1000)) # Convert from uS to mS (10^3) yield self._state