"""
task_imu.py
Cooperative scheduler task that manages the BNO055 IMU sensor.
The task moves through the following states:
- S0_BEGIN: Initialize the IMU and attempt to restore a saved
calibration from flash.
- S1_CALIBRATE: Wait until all subsystems reach full calibration,
then save offsets to flash.
- S2_IDLE: Dispatch mode commands (load/save calibration, tare,
read values, get calibration status, or run NDOF).
- S3_RUN_NDOF: Read heading and heading-rate from the IMU and publish
them to shares, then return to idle.
- S4_SAVE_CALIB: Persist current calibration offsets and tare values.
- S5_LOAD_CALIB: Restore calibration offsets and tare values from flash.
- S6_TARE: Collect accelerometer/gyro tare samples and save them.
- S7_READ_VALS: Perform a one-shot read of heading and heading-rate.
- S8_GET_CALIB_STATE: Pack calibration sub-system statuses into a single
byte and publish it to a share.
"""
from task_share import Share
from micropython import const
from drivers.imu import BNO055, NDOF_OP_MODE
from constants import IMU_FILE
from pyb import I2C
try:
import ujson as json
except ImportError:
import json
S0_BEGIN = const(0)
S1_CALIBRATE = const(1)
S2_IDLE = const(2)
S3_RUN_NDOF = const(3)
S4_SAVE_CALIB = const(4)
S5_LOAD_CALIB = const(5)
S6_TARE = const(6)
S7_READ_VALS = const(7)
S8_GET_CALIB_STATE = const(8)
[docs]
class task_imu:
"""
Scheduler task that initializes, calibrates, and reads the BNO055 IMU.
Heading and heading-rate are published to shared variables for use by
other tasks (e.g., closed-loop motor control).
"""
def __init__(self, imuSensor: BNO055, mode: Share, calibration: Share,
heading: Share, headingRate: Share
):
"""
Initialize the IMU task.
Parameters
----------
imuSensor : BNO055
Instantiated BNO055 driver.
mode : Share
Command share that selects the operating mode:
0 = idle, 1 = load calibration, 2 = save calibration,
3 = tare, 4 = read values, 5 = get calibration status,
0xFF = run NDOF (continuous heading updates).
calibration : Share
Output share for the packed calibration status byte
(sys[7:6] | gyro[5:4] | accel[3:2] | mag[1:0]).
heading : Share
Output share for the current Euler heading angle (degrees).
headingRate : Share
Output share for the current yaw rate from the gyroscope
(degrees per second).
"""
self._mode = mode
self._calibration = calibration
self._heading = heading
self._headingRate = headingRate
self._imu = imuSensor
self._calibration_saved = False
self._run_mode_active = False
print('IMU Task initialized')
self._state = S0_BEGIN
def _read_imu_file(self):
"""
Read and parse the IMU JSON file from flash.
Returns
-------
dict
Parsed file contents, or an empty dict if the file is missing
or contains invalid JSON.
"""
try:
with open(IMU_FILE, "r") as gains_file:
data = json.load(gains_file)
except (OSError, ValueError):
data = {}
if not isinstance(data, dict):
data = {}
return data
def _write_imu_file(self, data):
"""
Serialize and write data to the IMU JSON file on flash.
Parameters
----------
data : dict
Data to serialize and persist.
Returns
-------
bool
True on success, False if an OSError occurs during the write.
"""
try:
with open(IMU_FILE, "w") as gains_file:
json.dump(data, gains_file)
except OSError:
return False
return True
def _save_calibration(self):
"""
Retrieve current calibration offsets and tare values from the IMU,
then persist them to the IMU JSON file.
This is a generator method; use ``yield from _save_calibration()``.
Returns
-------
bool
True if the file was written successfully, False otherwise.
"""
offsets = yield from self._imu.get_calibration_offsets()
accel_tare, gyro_tare = self._imu.get_tare()
data = self._read_imu_file()
data["imu"] = {
"offsets": list(offsets),
"tare": {
"accel": list(accel_tare),
"gyro": list(gyro_tare),
},
}
return self._write_imu_file(data)
def _save_tare_only(self):
"""
Persist only the current tare values (accelerometer and gyroscope)
to the IMU JSON file, leaving any stored calibration offsets intact.
Returns
-------
bool
True if the file was written successfully, False otherwise.
"""
accel_tare, gyro_tare = self._imu.get_tare()
data = self._read_imu_file()
imu_data = data.get("imu", {}) if isinstance(data.get("imu", {}), dict) else {}
imu_data["tare"] = {
"accel": list(accel_tare),
"gyro": list(gyro_tare),
}
data["imu"] = imu_data
return self._write_imu_file(data)
def _load_calibration(self):
"""
Load calibration offsets and tare values from the IMU JSON file and
apply them to the sensor.
This is a generator method; use ``yield from _load_calibration()``.
Returns
-------
bool
True if valid 22-byte calibration offsets were found and applied,
False if the file is missing, malformed, or the offset payload is
the wrong length.
"""
data = self._read_imu_file()
imu_data = data.get("imu", {}) if isinstance(data, dict) else {}
offsets = imu_data.get("offsets") if isinstance(imu_data, dict) else None
tare_data = imu_data.get("tare") if isinstance(imu_data, dict) else None
if isinstance(tare_data, dict):
accel_tare = tare_data.get("accel")
gyro_tare = tare_data.get("gyro")
if accel_tare is not None and gyro_tare is not None:
try:
self._imu.set_tare(accel_tare, gyro_tare)
except (TypeError, ValueError, IndexError):
self._imu.clear_tare()
if offsets is None:
return False
try:
payload = bytes(offsets)
except (TypeError, ValueError):
return False
if len(payload) != 22:
return False
yield from self._imu.set_calibration_offsets(payload)
return True
[docs]
def tare_accel_gyro(self, sample_count=100, sample_delay_ms=5):
"""
Collect tare samples from the accelerometer and gyroscope, then
save the resulting tare offsets to flash.
This is a generator method; use ``yield from tare_accel_gyro()``.
Parameters
----------
sample_count : int, optional
Number of samples to average for the tare (default 100).
sample_delay_ms : int, optional
Delay in milliseconds between samples (default 5).
Returns
-------
bool
True if the tare values were saved successfully, False otherwise.
"""
yield from self._imu.tare_accel_gyro(sample_count=sample_count, sample_delay_ms=sample_delay_ms)
return self._save_tare_only()
[docs]
def clear_tare(self):
"""
Reset the IMU tare offsets to zero and persist the cleared values
to flash.
Returns
-------
bool
True if the updated tare values were saved successfully,
False otherwise.
"""
self._imu.clear_tare()
return self._save_tare_only()
[docs]
def run(self):
"""
Cooperative task for scheduler
"""
while True:
if self._state == S0_BEGIN:
# Begin sensor (contains cooperative delays)
yield from self._imu.initialize(NDOF_OP_MODE)
# Attempt to restore saved calibration; otherwise calibrate live.
loaded = yield from self._load_calibration()
if loaded:
# print('DEBUG: Calibration restored')
self._calibration_saved = True
self._state = S2_IDLE
else:
# self._state = S1_CALIBRATE
self._state = S2_IDLE
elif self._state == S1_CALIBRATE:
# Get calibration states
calib_status = self._imu.calibration_status()
# Continue when all systems calibrated (status = 3)
if all([x == 0x03 for x in calib_status]):
if not self._calibration_saved:
self._calibration_saved = bool((yield from self._save_calibration()))
# print('DEBUG: Calibration saved')
self._state = S2_IDLE
elif self._state == S2_IDLE:
mode = self._mode.get()
if mode == 1:
self._run_mode_active = False
self._state = S5_LOAD_CALIB
elif mode == 2:
self._run_mode_active = False
self._state = S4_SAVE_CALIB
elif mode == 3:
self._run_mode_active = False
self._state = S6_TARE
elif mode == 4:
self._run_mode_active = False
self._state = S7_READ_VALS
elif mode == 5:
self._run_mode_active = False
self._state = S8_GET_CALIB_STATE
elif mode == 0xFF:
if not self._run_mode_active:
# Tare heading once when switching into run mode.
yield from self._imu.tare_euler()
self._run_mode_active = True
self._state = S3_RUN_NDOF
else:
self._run_mode_active = False
elif self._state == S3_RUN_NDOF:
gx, gy, gz = self._imu.gyro()
h, r, p = self._imu.euler()
#ax, ay, az = self._imu.acceleration()
#mx, my, mz = self._imu.magnetic()
self._heading.put(h)
self._headingRate.put(gz)
#self._accelX.put(ax)
#self._accelY.put(ay)
#self._accelZ.put(az)
#self._heading.put(mz)
self._state = S2_IDLE
elif self._state == S4_SAVE_CALIB:
result = yield from self._save_calibration()
self._mode.put(0) # Ack that save calibration is done
self._state = S2_IDLE
elif self._state == S5_LOAD_CALIB:
result = yield from self._load_calibration()
self._mode.put(0) # Ack that load calibration is done
self._state = S2_IDLE
elif self._state == S6_TARE:
result = yield from self.tare_accel_gyro()
self._mode.put(0)
self._state = S2_IDLE
elif self._state == S7_READ_VALS:
gx, gy, gz = self._imu.gyro()
h, r, p = self._imu.euler()
#ax, ay, az = self._imu.acceleration()
#mx, my, mz = self._imu.magnetic()
self._heading.put(h)
self._headingRate.put(gz)
self._mode.put(0)
self._state = S2_IDLE
elif self._state == S8_GET_CALIB_STATE:
s_sys, s_gyro, s_accel, s_mag = self._imu.calibration_status()
result = 0
result |= (s_sys << 6)
result |= (s_gyro << 4)
result |= (s_accel << 2)
result |= s_mag
self._calibration.put(result)
self._mode.put(0)
self._state = S2_IDLE
yield self._state