Source code for task_ultrasonic
"""
task_ultrasonic.py
Cooperative scheduler task that wraps the UltrasonicSensor driver.
On startup a 3-sample moving-average filter is enabled. Each scheduler
tick calls sensor.loop() to advance the non-blocking measurement state
machine, then publishes the latest distance (or 0 if no reading is
available) to a shared variable.
"""
from pyb import Pin
from drivers.ultrasonic import UltrasonicSensor
from task_share import Share
[docs]
class task_ultrasonic:
"""
Scheduler task that continuously updates the ultrasonic distance
measurement and publishes the result to a share.
"""
def __init__(self, sensor: UltrasonicSensor, distanceShare: Share):
"""
Initialize the ultrasonic task and enable the sample filter.
Parameters
----------
sensor : UltrasonicSensor
Instantiated ultrasonic sensor driver.
distanceShare : Share
Output share for the filtered distance reading (in the units
returned by the driver). Written as 0 when no reading is
available.
"""
self._sensor = sensor
self._distanceShare = distanceShare
self._sensor.enable_filter(3)
[docs]
def run(self):
"""
Cooperative function for scheduler
"""
while True:
self._sensor.loop()
distance = self._sensor.get_distance()
if distance is None:
self._distanceShare.put(0)
else:
self._distanceShare.put(distance)
yield