# Source code for cotask

"""
Cooperative multitasking scheduler for MicroPython.

Contains classes to run cooperatively scheduled tasks in a multitasking
system. Tasks are written as generator functions with infinite loops that
call ``yield`` at least once per iteration. A :class:`TaskList` maintains
references to all active tasks and the scheduler runs each task's
``schedule()`` method using round-robin or highest-priority-first scheduling.

:author: JR Ridgely
:date: 2017-Jan-01 — original creation; 2021-Dec-18 — docstrings updated
:copyright: Copyright (c) 2017-2023 by JR Ridgely, released under the
    GNU Public License v3.0. Provided for educational use without warranty.
"""

import gc                              # Memory allocation garbage collector
import utime                           # Micropython version of time library
import micropython                     # This shuts up incorrect warnings


## Implements multitasking with scheduling and some performance logging.
#
#  This class implements behavior common to tasks in a cooperative 
#  multitasking system which runs in MicroPython. The ability to be scheduled
#  on the basis of time or an external software trigger or interrupt is 
#  implemented, state transitions can be recorded, and run times can be
#  profiled. The user's task code must be implemented in a generator which
#  yields the state (and the CPU) after it has run for a short and bounded 
#  period of time. 
#
#  @b Example:
#    @code
#        def task1_fun ():
#            '''! This function switches states repeatedly for no reason '''
#            state = 0
#            while True:
#                if state == 0:
#                    state = 1
#                elif state == 1:
#                    state = 0
#                yield (state)
# 
#        # In main routine, create this task and set it to run twice per second
#        task1 = cotask.Task (task1_fun, name = 'Task 1', priority = 1, 
#                             period = 500, profile = True, trace = True)
#
#        # Add the task to the list (so it will be run) and run scheduler
#        cotask.task_list.append (task1)
#        while True: 
#            cotask.task_list.pri_sched ()
#    @endcode
class Task:
    ## Number of initial runs which are left out of the run-time statistics.
    #  @c schedule() only accumulates run times once more than this many runs
    #  have occurred, and @c __repr__() averages over the measured runs only.
    #  NOTE(review): presumably the first runs are skipped because they
    #  include one-time start-up work in the generator -- confirm.
    _WARMUP_RUNS = 2

    ## Initialize a task object so it may be run by the scheduler.
    #
    #  This method initializes a task object, saving copies of constructor
    #  parameters, creating the task's generator and preparing an empty list
    #  for state transition traces.
    #
    #  @param run_fun The function which implements the task's code. It must
    #         be a generator which yields the current state.
    #  @param name The name of the task, by default @c NoName. This should
    #         be overridden with a more descriptive name by the programmer.
    #  @param priority The priority of the task, a positive integer with
    #         higher numbers meaning higher priority (default 0)
    #  @param period The time in milliseconds between runs of the task if it's
    #         run by a timer or @c None if the task is not run by a timer.
    #         The time can be given in a @c float or @c int; it will be
    #         converted to microseconds for internal use by the scheduler.
    #  @param profile Set to @c True to enable run-time profiling
    #  @param trace Set to @c True to generate a list of transitions between
    #         states. @b Note: This slows things down and allocates memory.
    #  @param shares A list or tuple of shares and queues used by this task.
    #         If no list is given, no shares are passed to the task
    def __init__(self, run_fun, name="NoName", priority=0, period=None,
                 profile=False, trace=False, shares=()):
        # Calling a generator function does not run its body; it only creates
        # the generator object which schedule() later advances with next().
        # The shares are passed along only if some were given.
        if shares:
            self._run_gen = run_fun(shares)
        else:
            self._run_gen = run_fun()

        ## The name of the task, hopefully a short and descriptive string.
        self.name = name

        ## The task's priority, an integer with higher numbers meaning higher
        #  priority.
        self.priority = int(priority)

        ## The period between runs of the task, stored in microseconds, or
        #  @c None if the task is triggered by calls to @c go() instead of
        #  by time.
        if period is not None:
            self.period = int(period * 1000)
            # ticks_add() keeps the deadline inside the wrapping ticks range
            self._next_run = utime.ticks_add(utime.ticks_us(), self.period)
        else:
            self.period = None
            self._next_run = None

        # Flag which causes the task to be profiled, in which the execution
        # time of the task's code is measured and basic statistics kept.
        self._prof = profile
        self.reset_profile()

        # The state which was most recently recorded; it is used to detect
        # state transitions when tracing.
        self._prev_state = 0

        # Transition tracing: a flag, the list of (time, to-state) stamps,
        # and the time from which the next transition is measured.
        self._trace = trace
        self._tr_data = []
        self._prev_time = utime.ticks_us()

        ## Flag which is set true when the task is ready to be run by the
        #  scheduler
        self.go_flag = False

    ## This method is called by the scheduler; it attempts to run this task.
    #  If the task is not yet ready to run, this method returns @c False
    #  immediately; if this task is ready to run, it runs the task's generator
    #  up to the next @c yield and then returns @c True. An exception raised
    #  by the generator, including @c StopIteration if the generator returns,
    #  is not caught here.
    #
    #  @return @c True if the task ran or @c False if it did not
    def schedule(self) -> bool:
        if not self.ready():
            return False

        # Reset the go flag for the next run
        self.go_flag = False

        # If profiling, save the start time
        if self._prof:
            stime = utime.ticks_us()

        # Run the task's code up to its next yield; the yielded value is
        # taken to be the task's current state
        curr_state = next(self._run_gen)

        # The end time is needed for both profiling and tracing
        if self._prof or self._trace:
            etime = utime.ticks_us()

        # If profiling, update the run-time statistics, leaving out the
        # first _WARMUP_RUNS runs
        if self._prof:
            self._runs += 1
            if self._runs > self._WARMUP_RUNS:
                runt = utime.ticks_diff(etime, stime)
                self._run_sum += runt
                if runt > self._slowest:
                    self._slowest = runt

        # If tracing and the state has changed, record the transition as
        # (time since the previous recorded transition, new state). If out
        # of memory, switch tracing off and run the garbage collector.
        if self._trace and curr_state != self._prev_state:
            try:
                self._tr_data.append(
                    (utime.ticks_diff(etime, self._prev_time), curr_state))
            except MemoryError:
                self._trace = False
                gc.collect()
            # The time reference only moves when a transition occurs, so
            # the deltas add up to the elapsed time in get_trace()
            self._prev_state = curr_state
            self._prev_time = etime

        return True

    ## This method checks if the task is ready to run.
    #  If the task runs on a timer, this method checks what time it is; if
    #  the run time has passed, the go flag is set and the next run time is
    #  moved ahead by one period. If the task doesn't use a timer, only the
    #  go flag is consulted. This method may be overridden in descendant
    #  classes to implement some other behavior.
    #
    #  @return @c True if the task should be run now
    @micropython.native
    def ready(self) -> bool:
        if self.period is not None:
            late = utime.ticks_diff(utime.ticks_us(), self._next_run)
            if late > 0:
                self.go_flag = True
                # Advance from the scheduled time, not the current time, so
                # that lateness in one run does not shift later deadlines
                self._next_run = utime.ticks_add(self._next_run, self.period)

                # If keeping a latency profile, record the data
                if self._prof:
                    self._late_sum += late
                    if late > self._latest:
                        self._latest = late

        # If the task doesn't use a timer, we rely on go_flag to signal ready
        return self.go_flag

    ## This method sets the period between runs of the task to the given
    #  number of milliseconds, or @c None if the task is triggered by calls
    #  to @c go() rather than time. The period is converted to microseconds
    #  the same way as in the constructor, so fractional milliseconds given
    #  as a @c float are kept.
    #
    #  @param new_period The new period in milliseconds between task runs
    def set_period(self, new_period):
        if new_period is None:
            self.period = None
            # Forget the old deadline so that a later switch back to timed
            # operation starts from the time of that switch
            self._next_run = None
            return

        self.period = int(new_period * 1000)

        # A task which had no period has no deadline yet; without one,
        # ready() would hand None to ticks_diff()
        if self._next_run is None:
            self._next_run = utime.ticks_add(utime.ticks_us(), self.period)

    ## This method resets the variables used for execution time profiling.
    #  This method is also used by @c __init__() to create the variables.
    def reset_profile(self):
        self._runs = 0
        self._run_sum = 0
        self._slowest = 0
        self._late_sum = 0
        self._latest = 0

    ## This method returns a string containing the task's transition trace.
    #  Each stored trace item holds a time in microseconds and the state to
    #  which the task transitioned; each is shown on one line as the
    #  accumulated time in seconds followed by the states from and to which
    #  the task transitioned. If tracing is off, which includes tracing
    #  having been switched off after running out of memory, the string only
    #  says that the task is not traced.
    #
    #  @return A possibly quite large string showing state transitions
    def get_trace(self):
        header = 'Task ' + self.name + ':'
        if not self._trace:
            return header + ' not traced'

        lines = [header]
        last_state = 0
        total_time = 0.0
        for delta, new_state in self._tr_data:
            total_time += delta / 1000000.0
            lines.append('{: 12.6f}: {: 2d} -> {:d}'.format(
                total_time, last_state, new_state))
            last_state = new_state

        # Every line, including the header, is terminated by a newline
        return '\n'.join(lines) + '\n'

    ## Method to set a flag so that this task indicates that it's ready to run.
    #  This method may be called from an interrupt service routine or from
    #  another task which has data that this task needs to process soon.
    def go(self):
        self.go_flag = True

    ## This method converts the task to a string for diagnostic use.
    #  It shows the task's name, priority, period in milliseconds (or a dash
    #  if the task is not timed) and number of profiled runs. If profiling
    #  has been done, the average and maximum run times in milliseconds are
    #  added, and for timed tasks the average and maximum lateness as well.
    #
    #  @returns The string which represents the task
    def __repr__(self):
        rst = f"{self.name:<16s}{self.priority: 4d}"

        if self.period is None:
            # Placeholder padded to the 10-character width of the period
            rst += '         -'
        else:
            rst += f"{(self.period / 1000.0): 10.1f}"

        rst += f"{self._runs: 8d}"

        if self._prof and self._runs > 0:
            # The run time sum leaves out the first _WARMUP_RUNS runs, so
            # the average is taken over the remaining runs only
            measured = self._runs - self._WARMUP_RUNS
            if measured > 0:
                avg_dur = (self._run_sum / measured) / 1000.0
            else:
                avg_dur = 0.0
            rst += f"{avg_dur: 10.3f}{(self._slowest / 1000.0): 10.3f}"

            if self.period is not None:
                avg_late = (self._late_sum / self._runs) / 1000.0
                rst += f"{avg_late: 10.3f}{(self._latest / 1000.0): 10.3f}"

        return rst
# =============================================================================

## A list of tasks used internally by the task scheduler.
#
#  This class holds the list of tasks which will be run by the task scheduler.
#  The task list is usually not directly used by the programmer except when
#  tasks are added to it and the scheduler is called.
#
#  The task list is sorted by priority so that the scheduler can efficiently
#  look through the list to find the highest priority task which is ready to
#  run at any given time. Tasks can also be scheduled in a simpler
#  "round-robin" fashion.
class TaskList:
    ## Initialize the task list. This creates the list of priorities in
    #  which tasks will be organized by priority.
    def __init__(self):
        ## The list of priority lists. Each priority for which at least one
        #  task has been added has a list of the form
        #  <tt>[priority, index, task, task, ...]</tt> in which @c index is
        #  the position in that list of the next task to be tried by
        #  @c pri_sched().
        self.pri_list = []

    ## Remove all tasks from the scheduler so old task objects don't persist
    #  across REPL reruns after a KeyboardInterrupt.
    def clear(self):
        self.pri_list.clear()

    ## Append a task to the task list. The list will be sorted by task
    #  priorities so that the scheduler can quickly find the highest priority
    #  task which is ready to run at any given time.
    #
    #  @param task The task to be appended to the list
    def append(self, task):
        new_pri = task.priority

        # If a list for this priority already exists, add the task to it
        for pri in self.pri_list:
            if pri[0] == new_pri:
                pri.append(task)
                break

        # The else clause runs only if the loop found no list with this
        # priority. A new priority list is started with the priority as
        # element 0, the round-robin index (initially 2, the position of the
        # first task) as element 1, and this task after those. The main list
        # is then sorted again so that the highest priority comes first.
        else:
            self.pri_list.append([new_pri, 2, task])
            self.pri_list.sort(key=lambda pri: pri[0], reverse=True)

    ## Run tasks in order, ignoring the tasks' priorities.
    #
    #  This scheduling method runs tasks in a round-robin fashion. Each
    #  time it is called, it goes through the list of tasks and gives each of
    #  them a chance to run by calling its @c schedule() method. Although
    #  higher priority tasks are tried first, that has no significant effect
    #  in the long run, as all the tasks are given a chance to run each time
    #  through the list.
    @micropython.native
    def rr_sched(self):
        for pri in self.pri_list:
            # Tasks begin at index 2; indexing avoids making a copy of the
            # list on every pass as slicing would
            for index in range(2, len(pri)):
                pri[index].schedule()

    ## Run tasks according to their priorities.
    #
    #  This scheduler runs tasks in a priority based fashion. Each time it is
    #  called, it finds the highest priority task which is ready to run,
    #  runs it through its @c schedule() method, and returns. Tasks which
    #  share a priority are tried in round-robin order. If no task is ready,
    #  nothing is run.
    @micropython.native
    def pri_sched(self):
        # Go down the list of priorities, beginning with the highest
        for pri in self.pri_list:
            # Each priority list is [priority, index, task, task, ...] where
            # index is the position of the next task to be tried. Every task
            # at this priority is tried at most once per call.
            length = len(pri)
            tries = 2
            while tries < length:
                ran = pri[pri[1]].schedule()
                tries += 1

                # Move the index to the next task, wrapping around to the
                # first task at index 2
                pri[1] += 1
                if pri[1] >= length:
                    pri[1] = 2

                if ran:
                    return

    ## Create some diagnostic text showing the tasks in the task list.
    #  The text has a header line followed by one line per task, made by
    #  converting each task to a string, in order of decreasing priority.
    #
    #  @return The multi-line string which describes the tasks
    def __repr__(self):
        # Column widths match the fields written for each task: 16 for the
        # name, 4 for priority, 10 for period, 8 for runs, then 10 each
        lines = ['TASK             PRI    PERIOD    RUNS   AVG DUR   MAX DUR'
                 '  AVG LATE  MAX LATE']
        for pri in self.pri_list:
            for index in range(2, len(pri)):
                lines.append(str(pri[index]))
        return '\n'.join(lines) + '\n'
## This is @b the main task list which is created for scheduling when
#  @c cotask.py is imported into a program.
task_list = TaskList()