# The Output Loop During any data collection, testing or replay, the Backend runs `Backend.output_loop()`, which is a large loop that processes raw data, computes decoder output, controls the prosthetic or simulator, and computes any gradient updates or movement detection updates. Understanding this function is essential to understanding the operation of the Backend, so we will walk through this function in this article. Before walking through the output loop, it is important to note that `output_loop()` takes an `OutputParams` object as its first argument. This object simply holds a bunch of parameters used by the `output_loop()`. `Backend.init_output()` generates this params object, so this must be called before calling `output_loop()`. `Backend.prompted_move()` is a good example of a function that uses `init_output()` and `output_loop()`. Another important note is that usually the same `OutputParams` object should be used for multiple looped calls of `output_loop()`. In `prompted_move()`, we generate the params object using `init_output()`, and then we call `output_loop()` with that object in a loop over all the movements in our runner (either a `Collector` or `Tester`). `output_loop()` does edit the params object, which is why we must save that params object between those looped calls to `output_loop()`. Alright now lets walk through the output loop. `Backend.output_loop()` starts by just initializing some variables and getting the current movement trajectory from the params object, so we'll skip over that and start right at the actual output **loop**. The output loop runs as long as the Backend's `stop_task_flag` is not set. This flag is used to cancel long running Backend operations. At the beginning of the loop, we get the time so that we compute the processing time for various parts of this loop. Then we begin by processing raw data if necessary. This includes calling the `process_raw()` function, which is described in detail in [[Raw Data]], and calling `read_finger_data()` which reads kinematic data from the glove. ```python # OUTPUT LOOP while not self.stop_task_flag: loop_start = time.perf_counter() # Process inputs if needed if params.process_inputs: self.process_raw(params.data_idx) # update z_f if params.subject == SubjectType.INTACT: self._read_finger_data(idx=params.data_idx) # update finger_data self.data.time_data[params.data_idx] = time.perf_counter() # update time_data ``` Next, if necessary, we play a sound to indicate the start or end of a prompted movement. ```python # play noise if needed if params.play_sound and period != SampleType.REST: if params.traj_idx in params.high_idxs[mov_idx]: playsound('bin/high.wav', False) if params.traj_idx in params.low_idxs[mov_idx]: playsound('bin/low.wav', False) ``` Next, if we are performing a test or replay, we call `self._update_buffer()` which basically appends the current timestep's raw data to the `RawData.full_buffer`. This is necessary for the Plotter process to receive the correct data to graph. ```python # update buffer if needed (makes plotter work during replays and file testing) if params.control != ControlType.NONE \ and params.movement_type in [MovementType.FILE_TEST, MovementType.REPLAY]: self._update_buffer(params.data_idx) ``` Next, we save the previous decoder output, and use the current runner (either `Collector` or `Tester`) to compute the new decoder output. We convert this output from velocity control to positional control if necessary. Note that `x_hat` is the decoder prediction and `x_d` is the desired decoder prediction (which may sometimes be the same as `x_hat` if we have no desired prediction). ```python # Set out_x and out_x_hat prv_x_hat = np.squeeze(x_hat) prv_out_x_hat = np.squeeze(out_x_hat) x_d, x_hat = params.runner.compute_output(self, traj, params, _predict) # convert to differential if necessary if params.differential: x_hat += prv_x_hat x_hat = np.squeeze(x_hat) # start output timing output_start = time.perf_counter() if self.timing else 0 ``` Then we update the online movement detection algorithm if necessary. Movement Detection allows the decoder to adapt over time to the user, and requires a much longer article to discuss. ```python # mov detection update - do before the clipping, clamp and latching if params.online_mov_detection and period != SampleType.PROMPT: # make sure to grab the right features depending on whether this is a source test z_ = self.data.z_f[params.data_idx, :] self._mov_detect_update(z_, x_hat, x_d=x_d, wait=(params.movement_type == MovementType.FILE_TEST)) ``` Now we apply output filters to `x_hat`. These filters include latching, clamping and a speed limit. The filtered `x_hat` becomes `out_x_hat`, because this will be the output to the (simulated or real) prosthetic limb. ```python # clamp and latching and speed limit and clipping for output to display or controllers out_x_hat = utils.clamp(utils.latching_filter(prv_out_x_hat, x_hat, self.output_settings.latch_c), self.output_settings.clamp_th) if self.output_settings.speed_limit: out_x_hat = np.clip(out_x_hat, prv_out_x_hat - self.output_settings.speed_limit, prv_out_x_hat + self.output_settings.speed_limit) out_x_hat = np.clip(out_x_hat, -1, 1) ``` Here we have small case for when we are in Free Control mode that uses the kinematic glove data as `x_d` if necessary. ```python # in control mode, use glove data for x_d if we have intact subject, otherwise use x_hat if params.movement_type == MovementType.CONTROL: if params.subject == SubjectType.INTACT: x_d = self.data.finger_data[params.data_idx, :] out_x_d = np.copy(x_d) else: x_d = x_hat out_x_d = out_x_hat ``` Now we can finally send our decoder output to either the simulated limb or an actual prosthetic hand. We time this as well because it can be useful for debugging. ```python # send output to display if params.control == ControlType.MUJOCO_HAND and self.mujoco is not None: command = mjc_command(out_x_d, out_x_hat) self.mujoco.hx_update(command, PyMJC.HxSensor()) elif params.control == ControlType.ROBOTIC_HAND: Robot.send_servo_position(self.robot, out_x_hat) # end output timing if self.timing: self.output_timing_history.append(time.perf_counter() - output_start) ``` Next we handle any gradient descent updates for the decoder. This is not used much anymore since we are replacing it with the Movement Detection algorithms. ```python # gradient decoder update (runs during HOLD but not PROMPT) if params.gradient_descent and period != SampleType.PROMPT: params.x_d_batch = np.concatenate((params.x_d_batch, [x_d]), axis=0) params.z_f_batch = np.concatenate((params.z_f_batch, [self.data.z_f[params.data_idx, :]]), axis=0) if np.shape(params.x_d_batch)[0] == self.decoder.batch_size: self.train_decoder(x=params.x_d_batch, z=params.z_f_batch, gradient=True) params.x_d_batch = np.zeros((0, g.raw_settings.n_dof)) params.z_f_batch = np.zeros((0, self._num_features())) ``` Now we save the movement data to `self.data`, the Backend's `Data` object which holds the decoder's input and output, among other things. ```python # save movement data if necessary if params.movement_type != MovementType.REPLAY: try: self.data.x_hat[params.data_idx, :] = x_hat except IndexError: self.data.double_length() self.data.x_hat[params.data_idx, :] = x_hat if params.save: self.data.x_d[params.data_idx, :] = x_d self.data.mov_idxs[params.data_idx] = mov_idx self.data.rep_idxs[params.data_idx] = rep_idx self.data.period[params.data_idx] = period.value self.async_gui_updates.update({'data_num_samples': params.data_idx + 1, 'data_num_features': self.data.z_f.shape[1]}) ``` Here we first update some of the output parameters corresponding to movement and data indices. Then we print out the current data index if necessary. This is useful for the user to see the progress through a dataset. ```python # target next step params.data_idx += self.idx_step params.traj_idx += self.idx_step # print index if params.print_idx: # when there's no control output, only print every 5% of the way through _printout = f"{params.data_idx}/{params.output_length}" if params.data_idx == params.output_length: self._print(len(params.prev_idx_printout) * '\b' + _printout) # show end for the last one elif params.control != ControlType.NONE or params.data_idx % (params.output_length // 20) == 0: self._print(len(params.prev_idx_printout) * '\b' + _printout, end='', show_end=False) params.prev_idx_printout = str(_printout) ``` Lastly, we check if we are done with the loop. The loop ends if either the current trajectory is over or if we've reached the last data index. If we aren't done, we update the FPS calculation, and we sleep so as to make this loop run at the period defined by `globals.raw_settings.timestep`. ```python # break if we're done if params.traj is not None: # prompted, check TRAJ index against traj if params.traj_idx == np.shape(traj)[1]: params.traj_idx = 0 break else: # unprompted, check DATA index against output_length if params.data_idx == params.output_length: break # otherwise sleep until end of current timestep elapsed = time.perf_counter() - loop_start self._update_fps_buffer(elapsed) time.sleep(max(g.raw_settings.timestep - elapsed, 0)) ```