Source code for task_reflectance
"""
task_reflectance.py
Cooperative scheduler task that wraps the Reflectance_Sensor driver.
The task progresses through four states driven by the reflectanceMode share:
- S0_IDLE: Wait for a mode command.
- S1_CALIB_DARK: Run the dark-surface calibration sequence.
- S2_CALIB_LIGHT: Run the light-surface calibration sequence.
- S3_RUN: Continuously read the sensor, publish the line centroid
and line-found flag to shares, and log centroid + elapsed
time to queues.
"""
from task_share import Share, Queue
from drivers.reflectance import Reflectance_Sensor
import micropython
from utime import ticks_us, ticks_diff
S0_IDLE = micropython.const(0)
S1_CALIB_DARK = micropython.const(1)
S2_CALIB_LIGHT = micropython.const(2)
S3_RUN = micropython.const(3)
[docs]
class task_reflectance:
"""
Scheduler task that controls the reflectance sensor through calibration
and line-following states.
"""
def __init__(
self,
reflectanceSensor: Reflectance_Sensor,
reflectanceMode: Share,
lineCentroid: Share,
lineFound: Share,
centroidValues: Queue,
centroidTimeValues: Queue
):
"""
Initialize the reflectance task.
Parameters
----------
reflectanceSensor : Reflectance_Sensor
Instantiated reflectance sensor driver.
reflectanceMode : Share
Command share that selects the operating mode:
0 = idle, 1 = calibrate dark, 2 = calibrate light, 3 = run.
lineCentroid : Share
Output share for the computed line centroid position.
lineFound : Share
Output share that is True when a line is detected.
centroidValues : Queue
Queue for logging centroid values over time.
centroidTimeValues : Queue
Queue for logging timestamps (ms from run start) paired with
centroid values.
"""
self._state = 0
self._sensor = reflectanceSensor
self._mode = reflectanceMode
self._lineCentroid = lineCentroid
self._lineFound = lineFound
self._centroidValues = centroidValues
self._centroidTimeValues = centroidTimeValues
self._runStartTime = 0
print("Reflectance sensor instantiated")
[docs]
def run(self):
"""
Cooperative function for scheduler
"""
while True:
if self._state == S0_IDLE:
mode = self._mode.get()
if mode == 0:
pass
elif mode == 1:
self._state = S1_CALIB_DARK
elif mode == 2:
self._state = S2_CALIB_LIGHT
elif mode == 3:
self._runStartTime = ticks_us()
self._state = S3_RUN
else:
raise ValueError(f"Invalid mode: {mode}")
elif self._state == S1_CALIB_DARK:
# Run calibration sequence (cooperatively) for dark readings
cal_gen = self._sensor.calibrate("dark")
while True:
try:
# print(next(cal_gen))
next(cal_gen)
yield 0
# When calibration is done, reset states
except StopIteration:
break
self._mode.put(0)
self._state = S0_IDLE
elif self._state == S2_CALIB_LIGHT:
# Run calibration sequence (cooperatively) for light readings
cal_gen = self._sensor.calibrate("light")
while True:
try:
# print(next(cal_gen))
next(cal_gen)
yield 0
# When calibration is done, reset states
except StopIteration:
break
self._mode.put(0)
self._state = S0_IDLE
elif self._state == S3_RUN:
# First, test flag for exit
if self._mode.get() == 0:
self._state = S0_IDLE
# If running, get latest centroid and put into share
# centroid = self._sensor.get_centroid()
_raw, _calibrated, centroid, line_found = self._sensor.get_values()
self._lineCentroid.put(centroid)
self._lineFound.put(line_found)
# Log centroid vs time without blocking if queues fill up
if (not self._centroidValues.full()) and (not self._centroidTimeValues.full()):
self._centroidValues.put(centroid)
self._centroidTimeValues.put(int(ticks_diff(ticks_us(), self._runStartTime) / 1000))
yield self._state