Raw Data Processing

Raw Data

The raw EMG or EEG data used by the Backend is stored in the globals.raw_data variable, which is protected by the globals.raw_data_lock mutex. This variable is an instance of the RawData class defined in python/src/backend/data.py. Let’s begin by discussing how the raw data is stored in this object.

RawData has three instance variables used to store data: full_buffer, timestep_list, and current_timestep.

full_buffer always stores the most recent N raw data samples, where N is set by RawDataSettings.buffer_size. This buffer is used for plotting the raw data, filtering it, and computing features as input to the decoder. Note that ‘most recent’ is relative to the current mode of operation: when collecting raw data, the newest raw samples are appended to this buffer, but when refiltering raw data, the full_buffer holds the latest samples up to the current filtering timestep.
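
To make the layout concrete, here is a minimal sketch of how these three variables might be declared; the constructor signature and array shapes are assumptions, not the actual class.

import numpy as np

class RawData:
    """Sketch of the container described above; the real class in
    python/src/backend/data.py may differ in detail."""

    def __init__(self, buffer_size, n_channels):
        # rolling window of the most recent buffer_size raw samples
        self.full_buffer = np.zeros((buffer_size, n_channels))
        # one numpy array of raw samples per decoder timestep
        self.timestep_list = []
        # samples received since the last timestep boundary
        self.current_timestep = np.zeros((0, n_channels))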

Collecting Raw Data

During raw data collection, when a new raw data sample is received from OpenBCI, the Backend’s EMG Thread appends the new sample to both the current_timestep and the full_buffer. Every backend.globals.raw_data_settings.timestep seconds, the current_timestep buffer is appended to the timestep_list and then cleared, as sketched below. In this way, by the end of data collection, the timestep_list is a list of numpy arrays, where each subarray contains the raw data samples from a single timestep of the decoder. For example, if the raw sample rate is 1000 Hz and the decoder’s timestep is 0.033 s (30 Hz), then each subarray in the timestep_list would hold roughly 33 or 34 raw data samples.
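
Continuing the sketch above, the per-sample bookkeeping during collection might look like this; append_sample() is a hypothetical name, while end_current_timestep() does exist in the real class (it is called from process_raw() below).

    def append_sample(self, sample):
        """Hypothetical per-sample hook called by the EMG Thread."""
        # keep full_buffer at a fixed length: drop the oldest sample,
        # place the newest at the end
        self.full_buffer = np.roll(self.full_buffer, -1, axis=0)
        self.full_buffer[-1, :] = sample
        # also accumulate the sample into the current decoder timestep
        self.current_timestep = np.vstack([self.current_timestep, sample])

    def end_current_timestep(self):
        """Move current_timestep into timestep_list and clear it."""
        self.timestep_list.append(self.current_timestep)
        self.current_timestep = np.zeros((0, self.full_buffer.shape[1]))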

Refiltering Raw Data

Refiltering raw data, which also recomputes the features, is done in Backend.refilter_data(). While refiltering, we iterate through the timestep_list, appending each timestep’s worth of raw data to the full_buffer so that the filtering and feature calculations can run on the full_buffer, as in the sketch below. Note that refiltering leaves the raw samples stored in the timestep_list untouched; only the filtered data and features are recomputed.
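
A minimal sketch of that loop, under the assumption that Backend.refilter_data() drives process_raw() (shown below) with save_raw=False so the stored timesteps are left alone:

def refilter_data(self):
    """Sketch of the refilter loop; the real method may differ in detail."""
    for idx, samples in enumerate(g.raw_data.timestep_list):
        n = samples.shape[0]
        with g.raw_data_lock:
            # slide this timestep's raw samples into the rolling buffer
            g.raw_data.full_buffer = np.roll(g.raw_data.full_buffer, -n, axis=0)
            g.raw_data.full_buffer[-n:, :] = samples
        # save_raw=False leaves timestep_list untouched;
        # recompute_x_hat=True rebuilds the state history (see below)
        self.process_raw(idx, recompute_x_hat=True, save_raw=False)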

Raw Processing

During raw data collection, refiltering, or replay, the Backend runs Backend.output_loop(), which calls Backend.process_raw() every globals.raw_data_settings.timestep seconds (sketched below). This process_raw() function processes the newly added raw data in the global RawData object. Let’s walk through it.
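
The scheduling might look roughly like the following; the stop flag and deadline arithmetic are assumptions about how the real Backend.output_loop() keeps time.

import time

def output_loop(self):
    """Sketch: invoke process_raw() once per decoder timestep."""
    idx = 0
    next_deadline = time.perf_counter()
    while not self.stop_requested:  # hypothetical stop flag
        self.process_raw(idx)
        idx += 1
        # sleep until the next timestep boundary to avoid drift
        next_deadline += g.raw_data_settings.timestep
        time.sleep(max(0.0, next_deadline - time.perf_counter()))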

As shown below, we first grab the raw data’s mutex so that no new data can arrive while we work. We then copy the full_buffer into snapshot so we can run our computations on it later. Then, if the save_raw argument is True, we call RawData.end_current_timestep(), which simply appends the current_timestep to the end of timestep_list. Note that if the timestep_list is empty, we instead save the full_buffer as the first timestep.

def process_raw(self, idx, recompute_x_hat=False, save_raw=True):
    with g.raw_data_lock:  # don't let new data come in while we're making and saving the snapshot
        snapshot = g.raw_data.full_buffer.copy()
        # get the new raw data from the current timestep
        if save_raw:
            if len(g.raw_data.timestep_list) == 0:  # save the full buffer as the first timestep
                g.raw_data.timestep_list.append(snapshot)
            else:
                g.raw_data.end_current_timestep()

Next we filter our copy of the full_buffer using the global filter object.

    filtered_data = g.filter(snapshot, timing_history=(self.filter_timing_history if self.timing else None))

Then, we loop through our list of features, self.features, and apply each feature computation to our filtered buffer filtered_data by calling Feature.process(). Note that each feature receives the entire copy of the filtered buffer, but typically uses only a subset of it for its computation. For example, a 100-sample MAV feature would only use the most recent 100 samples in filtered_data; a sketch of such a feature follows the snippet below.

    z_f_app = np.zeros(0)
    for feat in self.features:
        z_f_app = np.append(z_f_app, feat.process(filtered_data))
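
As an illustration of the Feature interface, here is a sketch of the 100-sample MAV feature mentioned above; only the process() signature is taken from the call site, the rest is assumed.

class MAVFeature:
    """Illustrative mean-absolute-value feature over the last N samples."""

    def __init__(self, window=100):
        self.window = window

    def process(self, filtered_data):
        # use only the most recent `window` rows of the filtered buffer;
        # returns one value per channel
        return np.mean(np.abs(filtered_data[-self.window:, :]), axis=0)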

Now z_f_app holds the computed features for the current timestep. Next, the autoencoder’s output is computed and appended to z_f_app if necessary; we will skip over this step since the autoencoder is rarely used anymore. After that, we filter the features to remove any DC bias.

    # filter if needed (BEFORE histories)
    if self.feat_settings.baseline_filter_length > 0:
        # push sample to top
        self.feat_settings.pre_filter_z_f = np.roll(self.feat_settings.pre_filter_z_f, 1, axis=0)
        self.feat_settings.pre_filter_z_f[0, :] = z_f_app

        # compute minimum
        m = np.nanmin(self.feat_settings.pre_filter_z_f
                        [:min(idx+1, self.feat_settings.baseline_filter_length), :], axis=0)
        m = np.minimum(m, self.feat_settings.thresholds)
        z_f_app -= m
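
A tiny standalone demo of the rolling-minimum subtraction above, with invented numbers:

import numpy as np

baseline_len = 3
pre_filter_z_f = np.full((baseline_len, 2), np.nan)  # newest sample at row 0
thresholds = np.array([0.5, 0.5])  # cap on how much bias can be removed

for idx, z_f_app in enumerate(np.array([[1.2, 0.9], [1.0, 1.1], [1.4, 0.8]])):
    pre_filter_z_f = np.roll(pre_filter_z_f, 1, axis=0)
    pre_filter_z_f[0, :] = z_f_app
    # minimum over the (up to baseline_len) most recent feature vectors
    m = np.nanmin(pre_filter_z_f[:min(idx + 1, baseline_len), :], axis=0)
    m = np.minimum(m, thresholds)  # never subtract more than the threshold
    print(z_f_app - m)  # first iteration prints [0.7 0.4]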

Then we add any feature history to z_f_app. The feature history is retrieved from self.data.z_f, the buffer of computed features. The idx argument to process_raw() identifies which index of self.data.z_f we are currently computing, so we can use it to fetch the features from previous timesteps. The length of the feature history is given by self.feat_settings.feature_history.

    # IMPORTANT: feature history is appended and then state history is appended!!!
    # ALSO: previous values are appended first, then values from two timesteps ago, ....
    if self.feat_settings.feature_history:
        num_regular_features = z_f_app.shape[0]
        feat_history = np.zeros((self.feat_settings.feature_history, num_regular_features))
        valid_feat_history_length = min(self.feat_settings.feature_history, idx)

        if valid_feat_history_length > 0:
            feat_history[-valid_feat_history_length:, :] \
                = self.data.z_f[idx - valid_feat_history_length:idx, :num_regular_features]

        z_f_app = np.append(z_f_app, feat_history[::-1].flatten())
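
The reverse-and-flatten ordering can be checked with a toy example (invented values), confirming that the previous timestep’s features come first:

import numpy as np

# pretend two features per timestep, with idx = 3 and a history of 2
z_f = np.array([[1., 10.], [2., 20.], [3., 30.]])  # rows = past timesteps
feature_history, idx = 2, 3

feat_history = np.zeros((feature_history, 2))
valid = min(feature_history, idx)
feat_history[-valid:, :] = z_f[idx - valid:idx, :]

# most recent timestep (row idx-1) appears first after the reversal
print(feat_history[::-1].flatten())  # -> [ 3. 30.  2. 20.]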

After adding the feature history, we add in the state history: the recent values of the decoder output, pulled from self.data.x_hat. If the recompute_x_hat argument to process_raw() is True, we must use the current decoder to recompute the previous output.

    if self.feat_settings.state_history:
        if recompute_x_hat:  # from file: compute previous x_hat using current decoder
            if self.timing:  # start timing the decoder
                start_decoder_time = time.perf_counter()

            self.data.x_hat[idx - 1, :] = self.decoder.predict(self.data.z_f[idx - 1, :]).flatten()

            if self.timing:  # finish timing the decoder
                self.decoder_timing_history.append(time.perf_counter() - start_decoder_time)

        # empty state history
        state_history = np.zeros((self.feat_settings.state_history, g.raw_settings.n_dof))
        valid_state_history_length = min(self.feat_settings.state_history, idx)

        if valid_state_history_length > 0:
            state_history[-valid_state_history_length:, :] = self.data.x_hat[idx-valid_state_history_length:idx, :]

        # flatten state history
        z_f_app = np.append(z_f_app, state_history[::-1].flatten())
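
For concreteness, the recompute path relies only on the decoder mapping one feature vector to an output of length g.raw_settings.n_dof; here is a purely illustrative linear stand-in, not the real decoder.

class LinearDecoder:
    """Illustrative stand-in for the decoder used by process_raw()."""

    def __init__(self, n_features, n_dof):
        self.W = np.zeros((n_dof, n_features))  # weights learned elsewhere

    def predict(self, z_f_row):
        # map one timestep's feature vector to an n_dof output;
        # the .flatten() at the call site makes the shape explicit
        return self.W @ z_f_row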

Now we finally have z_f_app as a flat buffer containing all the features and histories for the current timestep. At the end of process_raw(), we simply insert z_f_app into self.data.z_f, the full feature buffer. Notice that the try/except block doubles the length of the self.data buffers when the preallocated space runs out; a sketch of double_length() appears below.

    # insert into feature array
    try:
        self.data.z_f[idx, :] = z_f_app
    except IndexError:
        self.data.double_length()
        self.data.z_f[idx, :] = z_f_app
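
double_length() itself is not shown here; presumably it grows the preallocated buffers, along the lines of this sketch (which fields exist beyond z_f and x_hat is an assumption):

    def double_length(self):
        """Sketch: double the preallocated timestep dimension of the buffers."""
        def grow(buf):
            # pad with zeros to twice the current number of timesteps
            return np.vstack([buf, np.zeros_like(buf)])

        self.z_f = grow(self.z_f)
        self.x_hat = grow(self.x_hat)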