The Output Loop

During any data collection, testing, or replay, the Backend runs Backend.output_loop(), a large loop that processes raw data, computes decoder output, controls the prosthetic or simulator, and applies any gradient or movement detection updates. Understanding this function is essential to understanding the operation of the Backend, so this article walks through it step by step.

Before walking through the output loop, note that output_loop() takes an OutputParams object as its first argument. This object holds the parameters used by output_loop(). Backend.init_output() generates this params object, so it must be called before output_loop(). Backend.prompted_move() is a good example of a function that uses init_output() and output_loop(). Also note that the same OutputParams object should usually be reused across repeated calls to output_loop(): in prompted_move(), we generate the params object with init_output() and then call output_loop() with that object in a loop over all the movements in our runner (either a Collector or a Tester). output_loop() edits the params object, which is why it must be preserved between those looped calls.
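
To make that pattern concrete, here is a rough sketch of how prompted_move() might use the two functions. The argument lists and field names (runner, num_movements, mov_idx) are illustrative assumptions, not the actual signatures:

# Hypothetical sketch of the init_output()/output_loop() pattern.
# Argument lists and field names here are assumptions, not the real API.
def prompted_move(self, runner):
    params = self.init_output(runner)            # build the OutputParams object once
    for mov_idx in range(runner.num_movements):  # loop over movements in the Collector/Tester
        params.mov_idx = mov_idx                 # output_loop() also edits params as it runs
        self.output_loop(params)                 # reuse the SAME params object on every call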

Alright, now let's walk through the output loop. Backend.output_loop() starts by initializing some variables and getting the current movement trajectory from the params object, so we'll skip over that and start right at the actual output loop.

The output loop runs as long as the Backend's stop_task_flag is not set; this flag is used to cancel long-running Backend operations. At the beginning of the loop we record the time so that we can compute the processing time for various parts of the loop. Then we process raw data if necessary. This includes calling process_raw(), which is described in detail in [[Raw Data]], and calling _read_finger_data(), which reads kinematic data from the glove.

# OUTPUT LOOP
while not self.stop_task_flag:
    loop_start = time.perf_counter()

    # Process inputs if needed
    if params.process_inputs:
        self.process_raw(params.data_idx)  # update z_f
        if params.subject == SubjectType.INTACT:
            self._read_finger_data(idx=params.data_idx)  # update finger_data
        self.data.time_data[params.data_idx] = time.perf_counter()  # update time_data

Next, if necessary, we play a sound to indicate the start or end of a prompted movement.

    # play noise if needed
    if params.play_sound and period != SampleType.REST:
        if params.traj_idx in params.high_idxs[mov_idx]:
            playsound('bin/high.wav', False)
        if params.traj_idx in params.low_idxs[mov_idx]:
            playsound('bin/low.wav', False)

Next, if we are performing a test or replay, we call self._update_buffer(), which appends the current timestep's raw data to RawData.full_buffer. This is necessary for the Plotter process to receive the correct data to graph.

    # update buffer if needed (makes plotter work during replays and file testing)
    if params.control != ControlType.NONE \
            and params.movement_type in [MovementType.FILE_TEST, MovementType.REPLAY]:
        self._update_buffer(params.data_idx)

Next, we save the previous decoder output and use the current runner (either a Collector or a Tester) to compute the new decoder output. We convert this output from velocity control to positional control if necessary. Note that x_hat is the decoder prediction and x_d is the desired decoder prediction (which may be the same as x_hat when there is no separate desired prediction).

    # Set out_x and out_x_hat
    prv_x_hat = np.squeeze(x_hat)
    prv_out_x_hat = np.squeeze(out_x_hat)
    x_d, x_hat = params.runner.compute_output(self, traj, params, _predict)

    # integrate differential (velocity) output into position if necessary
    if params.differential:
        x_hat += prv_x_hat
    x_hat = np.squeeze(x_hat)

    # start output timing
    output_start = time.perf_counter() if self.timing else 0

Then we update the online movement detection algorithm if necessary. Movement Detection allows the decoder to adapt to the user over time, and warrants a much longer article of its own.

    # mov detection update - do before the clipping, clamp and latching
    if params.online_mov_detection and period != SampleType.PROMPT:
        # make sure to grab the right features depending on whether this is a source test
        z_ = self.data.z_f[params.data_idx, :]
        self._mov_detect_update(z_, x_hat, x_d=x_d,
                                wait=(params.movement_type == MovementType.FILE_TEST))

Now we apply output filters to x_hat: latching, clamping, and a speed limit. The filtered x_hat becomes out_x_hat, which is what gets sent to the (simulated or real) prosthetic limb.

    # clamp and latching and speed limit and clipping for output to display or controllers
    out_x_hat = utils.clamp(utils.latching_filter(prv_out_x_hat, x_hat, self.output_settings.latch_c),
                            self.output_settings.clamp_th)
    if self.output_settings.speed_limit:
        out_x_hat = np.clip(out_x_hat, prv_out_x_hat - self.output_settings.speed_limit,
                            prv_out_x_hat + self.output_settings.speed_limit)
    out_x_hat = np.clip(out_x_hat, -1, 1)
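
For intuition about these helpers, here is a minimal sketch of what utils.latching_filter() and utils.clamp() might do. This is an assumption about their behavior (a nonlinear filter that resists small changes, and a clamp that zeroes small outputs), not the actual implementations in utils:

import numpy as np

# Hypothetical sketches -- the real utils.latching_filter() and utils.clamp() may differ.
def latching_filter(prev_out, new_val, latch_c):
    # Weight the new prediction by the size of the change, so the output "latches"
    # onto its current value and resists small, noisy updates.
    w = np.clip(np.abs(new_val - prev_out), 0.0, 1.0) ** latch_c
    return prev_out + w * (new_val - prev_out)

def clamp(x, threshold):
    # Zero out components whose magnitude falls below the clamp threshold.
    return np.where(np.abs(x) < threshold, 0.0, x)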

Here we handle a small special case for Free Control mode: if the subject is intact, we use the kinematic glove data as x_d; otherwise x_d is simply x_hat.

    # in control mode, use glove data for x_d if we have intact subject, otherwise use x_hat
    if params.movement_type == MovementType.CONTROL:
        if params.subject == SubjectType.INTACT:
            x_d = self.data.finger_data[params.data_idx, :]
            out_x_d = np.copy(x_d)
        else:
            x_d = x_hat
            out_x_d = out_x_hat

Now we can finally send our decoder output to either the simulated limb or an actual prosthetic hand. We time this as well because it can be useful for debugging.

    # send output to display
    if params.control == ControlType.MUJOCO_HAND and self.mujoco is not None:
        command = mjc_command(out_x_d, out_x_hat)
        self.mujoco.hx_update(command, PyMJC.HxSensor())
    elif params.control == ControlType.ROBOTIC_HAND:
        Robot.send_servo_position(self.robot, out_x_hat)

    # end output timing
    if self.timing:
        self.output_timing_history.append(time.perf_counter() - output_start)

Next we handle any gradient-descent updates for the decoder. This is not used much anymore, since it is being replaced by the Movement Detection algorithms.

    # gradient decoder update (runs during HOLD but not PROMPT)
    if params.gradient_descent and period != SampleType.PROMPT:
        params.x_d_batch = np.concatenate((params.x_d_batch, [x_d]), axis=0)
        params.z_f_batch = np.concatenate((params.z_f_batch, [self.data.z_f[params.data_idx, :]]), axis=0)
        if np.shape(params.x_d_batch)[0] == self.decoder.batch_size:
            self.train_decoder(x=params.x_d_batch, z=params.z_f_batch, gradient=True)
            params.x_d_batch = np.zeros((0, g.raw_settings.n_dof))
            params.z_f_batch = np.zeros((0, self._num_features()))

Now we save the movement data to self.data, the Backend’s Data object which holds the decoder’s input and output, among other things.

    # save movement data if necessary
    if params.movement_type != MovementType.REPLAY:
        try:
            self.data.x_hat[params.data_idx, :] = x_hat
        except IndexError:
            self.data.double_length()
            self.data.x_hat[params.data_idx, :] = x_hat

        if params.save:
            self.data.x_d[params.data_idx, :] = x_d
            self.data.mov_idxs[params.data_idx] = mov_idx
            self.data.rep_idxs[params.data_idx] = rep_idx
            self.data.period[params.data_idx] = period.value
            self.async_gui_updates.update({'data_num_samples': params.data_idx + 1,
                                            'data_num_features': self.data.z_f.shape[1]})

Here we first advance the data and trajectory indices in the output parameters. Then we print the current data index if necessary, which lets the user see progress through a dataset.

    # target next step
    params.data_idx += self.idx_step
    params.traj_idx += self.idx_step

    # print index
    if params.print_idx:
        # when there's no control output, only print every 5% of the way through
        _printout = f"{params.data_idx}/{params.output_length}"
        if params.data_idx == params.output_length:
            self._print(len(params.prev_idx_printout) * '\b' + _printout)  # show end for the last one
        elif params.control != ControlType.NONE or params.data_idx % (params.output_length // 20) == 0:
            self._print(len(params.prev_idx_printout) * '\b' + _printout, end='', show_end=False)
        params.prev_idx_printout = str(_printout)

Lastly, we check whether we are done with the loop. The loop ends when either the current trajectory is over or we've reached the last data index. If we aren't done, we update the FPS calculation and then sleep so that the loop runs at the period defined by globals.raw_settings.timestep.

    # break if we're done
    if params.traj is not None:  # prompted, check TRAJ index against traj
        if params.traj_idx == np.shape(traj)[1]:
            params.traj_idx = 0
            break
    else:  # unprompted, check DATA index against output_length
        if params.data_idx == params.output_length:
            break

    # otherwise sleep until end of current timestep
    elapsed = time.perf_counter() - loop_start
    self._update_fps_buffer(elapsed)
    time.sleep(max(g.raw_settings.timestep - elapsed, 0))
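
Finally, for completeness, here is a minimal sketch of what the FPS bookkeeping in _update_fps_buffer() might look like, assuming it keeps a rolling window of loop periods and reports the inverse of their mean (the buffer and fps attribute names are assumptions):

from collections import deque
import numpy as np

# Hypothetical sketch of _update_fps_buffer() -- attribute names are assumptions.
def _update_fps_buffer(self, elapsed):
    if not hasattr(self, '_fps_buffer'):
        self._fps_buffer = deque(maxlen=100)              # rolling window of recent loop periods
    # each iteration takes at least one timestep because of the sleep above
    self._fps_buffer.append(max(elapsed, g.raw_settings.timestep))
    self.fps = 1.0 / np.mean(self._fps_buffer)            # effective loop rate in Hz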