# Raw Data Processing

## Raw Data

The raw EMG or EEG data used by the Backend is stored in the `globals.raw_data` variable, protected by the `globals.raw_data_lock` mutex. This is an instance of the `RawData` class defined in `python/src/backend/data.py`. Let's begin by discussing how the raw data is stored in this object.

`RawData` has three instance variables used to store data: `full_buffer`, `timestep_list`, and `current_timestep`.

`full_buffer` always stores the most recent N raw data samples, where N is set by `RawDataSettings.buffer_size`. This buffer is used for plotting the raw data, filtering it, and computing features as input to the decoder. Note that when we say it stores the 'most recent' samples, this is in reference to the current mode of operation: if we are collecting raw data, the newest raw samples are appended to this buffer, but if we are refiltering raw data, then the `full_buffer` holds the latest samples up to the current filtering timestep.

### Collecting Raw Data

During raw data collection, when a new raw data sample is received from OpenBCI, the Backend's EMG Thread appends the new sample to both the `current_timestep` and the `full_buffer`. Every `backend.globals.raw_data_settings.timestep` seconds, the `current_timestep` buffer is appended to the `timestep_list` and then cleared. In this way, by the end of data collection, the `timestep_list` is a list of numpy arrays, where each subarray contains the raw data samples from a single timestep of the decoder. For example, if the raw sample rate is 1000Hz and the decoder's timestep is 0.033s (roughly 30Hz), then each subarray in the `timestep_list` would hold on average 33 or 34 raw data samples.

### Refiltering Raw Data

Refiltering raw data, which also recomputes the features, is done in `Backend.refilter_data()`. While refiltering, we iterate through the `timestep_list`, appending each timestep's worth of raw data to the `full_buffer` so that we can do the filtering and feature calculations on the `full_buffer`. Note that refiltering leaves the raw samples stored in the `backend.globals.raw_data` object unchanged; only the filtered data and features are recomputed.

## Raw Processing

During raw data collection, refiltering, or replay, the Backend runs its `Backend.output_loop()`, which calls `Backend.process_raw()` every `globals.raw_data_settings.timestep` seconds. This `process_raw()` function is what processes the newly added raw data in the global `RawData` object. Let's walk through this function.

As shown below, we first grab the raw data's mutex so that no new data can arrive while we work. Then we copy the `full_buffer` into `snapshot` so we can do computations on it later. Then, if the `save_raw` argument is True, we call `RawData.end_current_timestep()`, which simply appends the `current_timestep` to the end of the `timestep_list`. Note that if the `timestep_list` is empty, we instead save the `full_buffer` as the first timestep.

```python
def process_raw(self, idx, recompute_x_hat=False, save_raw=True):
    with g.raw_data_lock:
        # don't let new data come in while we're making and saving the snapshot
        snapshot = g.raw_data.full_buffer.copy()
        # get the new raw data from the current timestep
        if save_raw:
            if len(g.raw_data.timestep_list) == 0:
                # save the full buffer as the first timestep
                g.raw_data.timestep_list.append(snapshot)
            else:
                g.raw_data.end_current_timestep()
```
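To make the buffer relationships concrete, here is a minimal sketch of the `RawData` pieces that the EMG Thread and `process_raw()` touch. The method bodies (and the `append_sample` helper in particular) are illustrative assumptions, not the actual implementation in `python/src/backend/data.py`:

```python
import numpy as np

class RawData:
    """Sketch of the three RawData buffers described above (shapes assumed)."""

    def __init__(self, buffer_size, n_channels):
        self.full_buffer = np.zeros((0, n_channels))       # most recent <= buffer_size samples
        self.current_timestep = np.zeros((0, n_channels))  # samples since the last decoder timestep
        self.timestep_list = []                            # one array of raw samples per timestep
        self.buffer_size = buffer_size

    def append_sample(self, sample):
        # hypothetical helper: what the EMG Thread does for each new raw sample
        self.current_timestep = np.vstack([self.current_timestep, sample])
        self.full_buffer = np.vstack([self.full_buffer, sample])[-self.buffer_size:]

    def end_current_timestep(self):
        # close out the current decoder timestep, as called from process_raw()
        self.timestep_list.append(self.current_timestep)
        self.current_timestep = np.zeros((0, self.current_timestep.shape[1]))
```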
Next we filter our copy of the `full_buffer` using the global filter object.

```python
filtered_data = g.filter(snapshot,
                         timing_history=(self.filter_timing_history if self.timing else None))
```

Then we loop through our list of features, `self.features`, and apply each feature computation to our filtered buffer `filtered_data` by calling `Feature.process()`. Note that each feature receives the entire copy of the filtered buffer to perform its computation, but typically only uses a subset of it. For example, a 100-sample MAV feature would only use the most recent 100 samples in `filtered_data` to compute its value.

```python
z_f_app = np.zeros(0)
for feat in self.features:
    z_f_app = np.append(z_f_app, feat.process(filtered_data))
```

Now in `z_f_app` we have the computed features for the current timestep. Next we compute the output of the autoencoder if necessary; let's skip this, since the autoencoder is rarely used anymore. After appending the autoencoder's output to `z_f_app`, we filter the features to remove any DC bias.

```python
# filter if needed (BEFORE histories)
if self.feat_settings.baseline_filter_length > 0:
    # push sample to top
    self.feat_settings.pre_filter_z_f = np.roll(self.feat_settings.pre_filter_z_f, 1, axis=0)
    self.feat_settings.pre_filter_z_f[0, :] = z_f_app
    # compute minimum
    m = np.nanmin(
        self.feat_settings.pre_filter_z_f[:min(idx + 1, self.feat_settings.baseline_filter_length), :],
        axis=0)
    m = np.minimum(m, self.feat_settings.thresholds)
    z_f_app -= m
```

Then we add any feature history to `z_f_app`. The feature history is retrieved from `self.data.z_f`, which is the buffer of computed features. The `idx` argument to `process_raw()` defines which index of `self.data.z_f` we are currently computing, so we can use it to get the features from the previous timesteps. The length of the feature history is given in `self.feat_settings.feature_history`.

```python
# IMPORTANT: feature history is appended and then state history is appended!!!
# ALSO: previous values are appended first, then values from two timesteps ago, ...
if self.feat_settings.feature_history:
    num_regular_features = z_f_app.shape[0]
    feat_history = np.zeros((self.feat_settings.feature_history, num_regular_features))
    valid_feat_history_length = min(self.feat_settings.feature_history, idx)
    if valid_feat_history_length > 0:
        feat_history[-valid_feat_history_length:, :] \
            = self.data.z_f[idx - valid_feat_history_length:idx, :num_regular_features]
    z_f_app = np.append(z_f_app, feat_history[::-1].flatten())
```
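The `[::-1]` flip is what produces the "previous values first" ordering called out in the comment: rows come out of `self.data.z_f` oldest first, so reversing them before flattening puts the most recent timestep at the front. A small standalone example with made-up numbers:

```python
import numpy as np

# pretend we have 2 regular features and a feature history of 3 timesteps;
# rows are ordered oldest -> newest, as they are stored in self.data.z_f
feat_history = np.array([[1.0, 2.0],   # t-3 (oldest)
                         [3.0, 4.0],   # t-2
                         [5.0, 6.0]])  # t-1 (most recent)

# reversing the rows before flattening puts the most recent timestep first
print(feat_history[::-1].flatten())
# -> [5. 6. 3. 4. 1. 2.]
```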
After adding the feature history, we add in the state history, which holds the recent values of the decoder output pulled from `self.data.x_hat`. If the `recompute_x_hat` argument to `process_raw()` is True, this tells us that we must use the decoder to recompute the previous output.

```python
if self.feat_settings.state_history:
    if recompute_x_hat:
        # from file: compute previous x_hat using current decoder
        start_decoder_time = time.perf_counter() if self.timing else 0
        self.data.x_hat[idx - 1, :] = self.decoder.predict(self.data.z_f[idx - 1, :]).flatten()
        if self.timing:
            # finish timing the decoder
            self.decoder_timing_history.append(time.perf_counter() - start_decoder_time)

    # empty state history
    state_history = np.zeros((self.feat_settings.state_history, g.raw_settings.n_dof))
    valid_state_history_length = min(self.feat_settings.state_history, idx)
    if valid_state_history_length > 0:
        state_history[-valid_state_history_length:, :] \
            = self.data.x_hat[idx - valid_state_history_length:idx, :]
    # flatten state history
    z_f_app = np.append(z_f_app, state_history[::-1].flatten())
```

Now we finally have `z_f_app` as a flat buffer containing all the features and histories for the current timestep. At the end of `process_raw()`, we simply insert `z_f_app` into `self.data.z_f`, the full feature buffer. Notice that the try/except block doubles the length of the buffers in the `self.data` object if necessary.

```python
# insert into feature array
try:
    self.data.z_f[idx, :] = z_f_app
except IndexError:
    self.data.double_length()
    self.data.z_f[idx, :] = z_f_app
```
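For completeness, here is a plausible sketch of what `double_length()` does. The `DecoderData` class name and the body below are assumptions for illustration; the real method lives on the data class in `python/src/backend/data.py` and presumably grows `x_hat` and the other per-timestep buffers in the same way:

```python
import numpy as np

class DecoderData:
    """Hypothetical stand-in for the self.data object; only z_f is shown."""

    def __init__(self, initial_length, n_features):
        # preallocated feature buffer, one row per decoder timestep
        self.z_f = np.zeros((initial_length, n_features))

    def double_length(self):
        # grow the buffer 2x so writes at higher timestep indices
        # stop raising IndexError; existing rows are preserved
        grown = np.zeros((self.z_f.shape[0] * 2, self.z_f.shape[1]))
        grown[:self.z_f.shape[0], :] = self.z_f
        self.z_f = grown
```

Growing by doubling keeps the amortized cost of adding a timestep constant, which is why the try/except in `process_raw()` only rarely has to pay for a copy.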