The Output Loop
During any data collection, testing, or replay, the Backend runs Backend.output_loop(), a large loop that processes raw data, computes decoder output, controls the prosthetic or simulator, and computes any gradient updates or movement detection updates. Understanding this function is essential to understanding the operation of the Backend, so we will walk through it in this article.
Before walking through the output loop, it is important to note that output_loop() takes an OutputParams object as its first argument. This object simply holds the parameters used by output_loop(). Backend.init_output() generates this params object, so it must be called before output_loop(). Backend.prompted_move() is a good example of a function that uses init_output() and output_loop(). Another important note is that the same OutputParams object should usually be reused across repeated calls to output_loop(). In prompted_move(), we generate the params object with init_output() and then call output_loop() with that object in a loop over all the movements in our runner (either a Collector or a Tester). output_loop() edits the params object, which is why we must keep the same object between those looped calls.
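As a rough illustration, the calling pattern looks something like the sketch below. This is schematic only: the argument lists are simplified and the loop variable is hypothetical, but it shows the params object being created once and then reused across calls.
params = self.init_output(...)      # build the OutputParams object once
for movement in movements:          # e.g. the movements held by the Collector or Tester
    # ... per-movement setup ...
    self.output_loop(params, ...)   # output_loop() mutates params, so the same object is reused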
Alright, now let's walk through the output loop. Backend.output_loop() starts by initializing some variables and getting the current movement trajectory from the params object, so we'll skip over that and start right at the actual output loop.
The output loop runs as long as the Backend's stop_task_flag is not set. This flag is used to cancel long-running Backend operations.
At the beginning of the loop, we record the time so that we can compute the processing time for various parts of this loop. Then we begin by processing raw data if necessary. This includes calling the process_raw() function, which is described in detail in [[Raw Data]], and calling _read_finger_data(), which reads kinematic data from the glove.
# OUTPUT LOOP
while not self.stop_task_flag:
loop_start = time.perf_counter()
# Process inputs if needed
if params.process_inputs:
self.process_raw(params.data_idx) # update z_f
if params.subject == SubjectType.INTACT:
self._read_finger_data(idx=params.data_idx) # update finger_data
self.data.time_data[params.data_idx] = time.perf_counter() # update time_data
Next, if necessary, we play a sound to indicate the start or end of a prompted movement.
# play noise if needed
if params.play_sound and period != SampleType.REST:
if params.traj_idx in params.high_idxs[mov_idx]:
playsound('bin/high.wav', False)
if params.traj_idx in params.low_idxs[mov_idx]:
playsound('bin/low.wav', False)
Next, if we are performing a test or replay, we call self._update_buffer(), which appends the current timestep's raw data to RawData.full_buffer. This is necessary for the Plotter process to receive the correct data to graph.
# update buffer if needed (makes plotter work during replays and file testing)
if params.control != ControlType.NONE \
and params.movement_type in [MovementType.FILE_TEST, MovementType.REPLAY]:
self._update_buffer(params.data_idx)
Next, we save the previous decoder output and use the current runner (either a Collector or a Tester) to compute the new decoder output. We convert this output from velocity control to positional control if necessary. Note that x_hat is the decoder prediction and x_d is the desired decoder prediction (which may sometimes be the same as x_hat if we have no desired prediction).
# Set out_x and out_x_hat
prv_x_hat = np.squeeze(x_hat)
prv_out_x_hat = np.squeeze(out_x_hat)
x_d, x_hat = params.runner.compute_output(self, traj, params, _predict)
# convert to differential if necessary
if params.differential:
x_hat += prv_x_hat
x_hat = np.squeeze(x_hat)
# start output timing
output_start = time.perf_counter() if self.timing else 0
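For reference, here is the contract implied by that call site. This is an illustrative sketch inferred from the call above, not the actual class definitions:
# Illustrative sketch only: both Collector and Tester expose compute_output() with this shape.
class Runner:
    def compute_output(self, backend, traj, params, predict_fn):
        """Return (x_d, x_hat) for the current timestep: the desired output and the
        (possibly differential) decoder prediction."""
        raise NotImplementedError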
Then we update the online movement detection algorithm if necessary. Movement Detection allows the decoder to adapt over time to the user, and requires a much longer article to discuss.
# mov detection update - do before the clipping, clamp and latching
if params.online_mov_detection and period != SampleType.PROMPT:
# make sure to grab the right features depending on whether this is a source test
z_ = self.data.z_f[params.data_idx, :]
self._mov_detect_update(z_, x_hat, x_d=x_d,
wait=(params.movement_type == MovementType.FILE_TEST))
Now we apply output filters to x_hat. These filters include latching, clamping, and a speed limit. The filtered x_hat becomes out_x_hat, because this will be the output sent to the (simulated or real) prosthetic limb.
# clamp and latching and speed limit and clipping for output to display or controllers
out_x_hat = utils.clamp(utils.latching_filter(prv_out_x_hat, x_hat, self.output_settings.latch_c),
self.output_settings.clamp_th)
if self.output_settings.speed_limit:
out_x_hat = np.clip(out_x_hat, prv_out_x_hat - self.output_settings.speed_limit,
prv_out_x_hat + self.output_settings.speed_limit)
out_x_hat = np.clip(out_x_hat, -1, 1)
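As a quick numeric illustration of the speed limit and the final clip (the values here are made up):
import numpy as np

prv_out_x_hat = np.array([0.20, -0.90])
filtered = np.array([0.60, 0.10])   # x_hat after the latching and clamp filters
speed_limit = 0.10

# each DOF may move at most speed_limit per timestep relative to the previous output
out = np.clip(filtered, prv_out_x_hat - speed_limit, prv_out_x_hat + speed_limit)  # -> [0.30, -0.80]
out = np.clip(out, -1, 1)           # keep the final command within [-1, 1]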
Here we have a small special case for Free Control mode: if the subject is intact, we use the kinematic glove data as x_d; otherwise x_d simply mirrors x_hat.
# in control mode, use glove data for x_d if we have intact subject, otherwise use x_hat
if params.movement_type == MovementType.CONTROL:
if params.subject == SubjectType.INTACT:
x_d = self.data.finger_data[params.data_idx, :]
out_x_d = np.copy(x_d)
else:
x_d = x_hat
out_x_d = out_x_hat
Now we can finally send our decoder output to either the simulated limb or an actual prosthetic hand. We also time this step, since the timing can be useful for debugging.
# send output to display
if params.control == ControlType.MUJOCO_HAND and self.mujoco is not None:
command = mjc_command(out_x_d, out_x_hat)
self.mujoco.hx_update(command, PyMJC.HxSensor())
elif params.control == ControlType.ROBOTIC_HAND:
Robot.send_servo_position(self.robot, out_x_hat)
# end output timing
if self.timing:
self.output_timing_history.append(time.perf_counter() - output_start)
Next, we handle any gradient descent updates for the decoder. This is not used much anymore, since it is being replaced by the Movement Detection algorithms.
# gradient decoder update (runs during HOLD but not PROMPT)
if params.gradient_descent and period != SampleType.PROMPT:
params.x_d_batch = np.concatenate((params.x_d_batch, [x_d]), axis=0)
params.z_f_batch = np.concatenate((params.z_f_batch, [self.data.z_f[params.data_idx, :]]), axis=0)
if np.shape(params.x_d_batch)[0] == self.decoder.batch_size:
self.train_decoder(x=params.x_d_batch, z=params.z_f_batch, gradient=True)
params.x_d_batch = np.zeros((0, g.raw_settings.n_dof))
params.z_f_batch = np.zeros((0, self._num_features()))
Now we save the movement data to self.data, the Backend's Data object, which holds the decoder's input and output, among other things. Note that if the preallocated arrays are full, the IndexError is caught and Data.double_length() grows them before the write is retried.
# save movement data if necessary
if params.movement_type != MovementType.REPLAY:
try:
self.data.x_hat[params.data_idx, :] = x_hat
except IndexError:
self.data.double_length()
self.data.x_hat[params.data_idx, :] = x_hat
if params.save:
self.data.x_d[params.data_idx, :] = x_d
self.data.mov_idxs[params.data_idx] = mov_idx
self.data.rep_idxs[params.data_idx] = rep_idx
self.data.period[params.data_idx] = period.value
self.async_gui_updates.update({'data_num_samples': params.data_idx + 1,
'data_num_features': self.data.z_f.shape[1]})
Here we first advance the data and trajectory indices for the next step. Then we print out the current data index if necessary, which lets the user see their progress through a dataset.
# target next step
params.data_idx += self.idx_step
params.traj_idx += self.idx_step
# print index
if params.print_idx:
# when there's no control output, only print every 5% of the way through
_printout = f"{params.data_idx}/{params.output_length}"
if params.data_idx == params.output_length:
self._print(len(params.prev_idx_printout) * '\b' + _printout) # show end for the last one
elif params.control != ControlType.NONE or params.data_idx % (params.output_length // 20) == 0:
self._print(len(params.prev_idx_printout) * '\b' + _printout, end='', show_end=False)
params.prev_idx_printout = str(_printout)
Lastly, we check whether we are done with the loop. The loop ends either when the current trajectory is over or when we have reached the last data index. If we aren't done, we update the FPS calculation and sleep so that the loop runs at the period defined by globals.raw_settings.timestep. For example, if the timestep were 33 ms and the loop body took 10 ms, we would sleep for the remaining 23 ms.
# break if we're done
if params.traj is not None: # prompted, check TRAJ index against traj
if params.traj_idx == np.shape(traj)[1]:
params.traj_idx = 0
break
else: # unprompted, check DATA index against output_length
if params.data_idx == params.output_length:
break
# otherwise sleep until end of current timestep
elapsed = time.perf_counter() - loop_start
self._update_fps_buffer(elapsed)
time.sleep(max(g.raw_settings.timestep - elapsed, 0))