Raw Data Processing¶
Raw Data¶
The raw EMG or EEG data used by the Backend is stored in the globals.raw_data variable, protected by the globals.raw_data_lock mutex. This is an instance of the RawData class defined in python/src/backend/data.py. Let's begin by discussing how the raw data is stored in this object.
RawData has three instance variables used to store data: full_buffer, timestep_list, and current_timestep.
full_buffer always stores the most recent N raw data samples, where N is set by RawDataSettings.buffer_size. This buffer is used for plotting the raw data, filtering it, and computing features as input to the decoder. Note that "most recent" is relative to the current mode of operation: when collecting raw data, the newest raw samples are appended to this buffer, but when refiltering raw data, the full_buffer holds the latest samples up to the current filtering timestep.
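To make this layout concrete, here is a minimal sketch of the RawData container, assuming a fixed-size numpy window. The three instance-variable names come from this page; the constructor signature and the append_sample helper are hypothetical.

```python
import numpy as np

class RawData:
    """Minimal sketch of the RawData container (illustrative only)."""

    def __init__(self, buffer_size, n_channels):
        # rolling window of the most recent buffer_size samples
        self.full_buffer = np.zeros((buffer_size, n_channels))
        # one numpy array of raw samples per decoder timestep
        self.timestep_list = []
        # samples received since the last timestep boundary
        self.current_timestep = []

    def append_sample(self, sample):
        # hypothetical helper: shift the window and store the newest sample
        self.full_buffer = np.roll(self.full_buffer, -1, axis=0)
        self.full_buffer[-1, :] = sample
        self.current_timestep.append(sample)

    def end_current_timestep(self):
        # move the accumulated samples into timestep_list and reset
        self.timestep_list.append(np.array(self.current_timestep))
        self.current_timestep = []
```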
Collecting Raw Data¶
During raw data collection, when a new raw data sample is received from OpenBCI, the Backend's EMG Thread appends the new sample to both the current_timestep and the full_buffer. Every backend.globals.raw_data_settings.timestep seconds, the current_timestep buffer is appended to the timestep_list and then cleared. In this way, by the end of data collection, the timestep_list is a list of numpy arrays, where each subarray contains the raw data samples from a single timestep of the decoder. For example, if the raw sample rate is 1000 Hz and the decoder's timestep is 0.033 seconds (~30 Hz), then each subarray in the timestep_list holds on average 33 or 34 raw data samples.
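The rotation described above can be sketched as follows; collect() is a hypothetical stand-in for the EMG Thread's logic, with OpenBCI streaming and the full_buffer update omitted:

```python
import numpy as np

def collect(samples, fs=1000, timestep=0.033):
    """Sketch of the timestep rotation: close out a timestep's worth of
    samples every `timestep` seconds of data at sample rate `fs`."""
    timestep_list = []       # finished timesteps
    current_timestep = []    # samples since the last boundary
    samples_per_step = fs * timestep
    next_boundary = samples_per_step
    for i, sample in enumerate(samples, start=1):
        current_timestep.append(sample)
        if i >= next_boundary:  # a decoder timestep has elapsed
            timestep_list.append(np.array(current_timestep))
            current_timestep = []
            next_boundary += samples_per_step
    return timestep_list, current_timestep
```

Feeding 100 samples through this sketch at 1000 Hz with a 0.033 s timestep yields three completed timesteps of 33 samples each, matching the 33-or-34 average above.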
Refiltering Raw Data¶
Refiltering raw data, which also recomputes the features, is done in Backend.refilter_data(). While refiltering, we iterate through the timestep_list, appending each timestep's worth of raw data to the full_buffer so that we can do the filtering and feature calculations on the full_buffer. Note that refiltering does not affect the data in the backend.globals.raw_data object.
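That iteration might look like the following sketch. Backend.refilter_data() itself is more involved; process_timestep stands in for the filtering and feature code, and the rolling buffer here is local, which is why the stored raw data stays untouched:

```python
import numpy as np

def refilter_sketch(timestep_list, buffer_size, process_timestep):
    """Hedged sketch of the refiltering loop: replay each stored
    timestep through a fresh rolling buffer."""
    n_channels = timestep_list[0].shape[1]
    full_buffer = np.zeros((buffer_size, n_channels))
    for step in timestep_list:
        n = step.shape[0]
        full_buffer = np.roll(full_buffer, -n, axis=0)  # make room at the end
        full_buffer[-n:, :] = step                      # append this timestep
        process_timestep(full_buffer)                   # filter + features
```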
Raw Processing¶
During raw data collection, refiltering, or replay, the Backend runs its Backend.output_loop(), which calls Backend.process_raw() every globals.raw_data_settings.timestep seconds. This process_raw() function is what processes the newly added raw data in the global RawData object. Let's walk through this function.
In the code below, we first grab the raw data's mutex so we can edit it. Then we copy the full_buffer into snapshot so we can do computations on it later. Then, if the save_raw argument is True, we call RawData.end_current_timestep(), which simply appends the current_timestep to the end of the timestep_list. Note that if the timestep_list is empty, we simply save the full_buffer as the first timestep.
def process_raw(self, idx, recompute_x_hat=False, save_raw=True):
    with g.raw_data_lock:  # don't let new data come in while we're making and saving the snapshot
        snapshot = g.raw_data.full_buffer.copy()
        # get the new raw data from the current timestep
        if save_raw:
            if len(g.raw_data.timestep_list) == 0:  # save the full buffer as the first timestep
                g.raw_data.timestep_list.append(snapshot)
            else:
                g.raw_data.end_current_timestep()
Next we filter our copy of the full_buffer using the global filter object.
filtered_data = g.filter(snapshot, timing_history=(self.filter_timing_history if self.timing else None))
Then, we loop through our list of features, self.features, and apply each feature computation to our filtered buffer filtered_data by calling Feature.process(). Note that each feature receives the entire copy of the filtered buffer, but each Feature typically uses only a subset of that buffer for its computation. For example, a 100-sample MAV feature would use only the most recent 100 samples in filtered_data to compute its value.
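As an illustration, a mean-absolute-value (MAV) feature over the last 100 samples could look like this; the real Feature base class and the exact process() signature may differ:

```python
import numpy as np

class MAVFeature:
    """Illustrative MAV feature: uses only the most recent `window`
    samples of the filtered buffer, one output value per channel."""

    def __init__(self, window=100):
        self.window = window

    def process(self, filtered_data):
        # mean absolute value over the tail of the buffer, per channel
        return np.mean(np.abs(filtered_data[-self.window:, :]), axis=0)
```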
z_f_app = np.zeros(0)
for feat in self.features:
z_f_app = np.append(z_f_app, feat.process(filtered_data))
Now in z_f_app
, we have the computed features for the current timestep.
Next we compute the output of the autoencoder if necessary. Let’s skip this since the autoencoder is rarely used anymore.
After appending the autoencoder's output to z_f_app, we filter the features to remove any DC bias.
# filter if needed (BEFORE histories)
if self.feat_settings.baseline_filter_length > 0:
    # push sample to top
    self.feat_settings.pre_filter_z_f = np.roll(self.feat_settings.pre_filter_z_f, 1, axis=0)
    self.feat_settings.pre_filter_z_f[0, :] = z_f_app
    # compute minimum
    m = np.nanmin(self.feat_settings.pre_filter_z_f
                  [:min(idx+1, self.feat_settings.baseline_filter_length), :], axis=0)
    m = np.minimum(m, self.feat_settings.thresholds)
    z_f_app -= m
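Here is a toy run of the same running-minimum baseline removal, with illustrative sizes and module-level state standing in for self.feat_settings (all names below are hypothetical stand-ins):

```python
import numpy as np

baseline_len, n_feat = 4, 2
pre_filter_z_f = np.full((baseline_len, n_feat), np.nan)  # rolling window
thresholds = np.array([5.0, 5.0])  # cap on how much bias can be removed

def remove_baseline(z_f_app, idx):
    global pre_filter_z_f
    pre_filter_z_f = np.roll(pre_filter_z_f, 1, axis=0)  # push newest to top
    pre_filter_z_f[0, :] = z_f_app
    m = np.nanmin(pre_filter_z_f[:min(idx + 1, baseline_len), :], axis=0)
    m = np.minimum(m, thresholds)  # never subtract more than the threshold
    return z_f_app - m
```

With features [3, 10] at idx 0, the windowed minimum is [3, 10], but the second channel is capped at the threshold 5, so [0, 5] remains after subtraction.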
Then we add any feature history to z_f_app. The feature history is retrieved from self.data.z_f, the buffer of computed features. The idx argument to process_raw() specifies which index of self.data.z_f we are currently computing, so we can use it to get the features from the previous timesteps. The length of the feature history is given by self.feat_settings.feature_history.
# IMPORTANT: feature history is appended and then state history is appended!!!
# ALSO: previous values are appended first, then values from two timesteps ago, ....
if self.feat_settings.feature_history:
    num_regular_features = z_f_app.shape[0]
    feat_history = np.zeros((self.feat_settings.feature_history, num_regular_features))
    valid_feat_history_length = min(self.feat_settings.feature_history, idx)
    if valid_feat_history_length > 0:
        feat_history[-valid_feat_history_length:, :] \
            = self.data.z_f[idx - valid_feat_history_length:idx, :num_regular_features]
    z_f_app = np.append(z_f_app, feat_history[::-1].flatten())
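A small worked example of the ordering noted in the comments: after the [::-1] reversal and flatten, the previous timestep's features come first, then the timestep before that, and so on (the numbers here are made up):

```python
import numpy as np

# rows of z_f are past timesteps: t = 0, 1, 2
z_f = np.array([[10., 11.],
                [20., 21.],
                [30., 31.]])
idx, history_len, n_feat = 3, 3, 2

feat_history = np.zeros((history_len, n_feat))
valid = min(history_len, idx)
feat_history[-valid:, :] = z_f[idx - valid:idx, :n_feat]

# most recent timestep (t = 2) ends up first in the appended history
appended = feat_history[::-1].flatten()
```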
After adding the feature history, we add the state history: the recent values of the decoder output, pulled from self.data.x_hat. If the recompute_x_hat argument to process_raw() is True, we must use the current decoder to recompute the previous output.
if self.feat_settings.state_history:
    if recompute_x_hat:  # from file: compute previous x_hat using current decoder
        if self.timing:
            start_decoder_time = time.perf_counter() if self.timing else 0
        self.data.x_hat[idx - 1, :] = self.decoder.predict(self.data.z_f[idx - 1, :]).flatten()
        if self.timing:  # finish timing the decoder
            self.decoder_timing_history.append(time.perf_counter() - start_decoder_time)
    # empty state history
    state_history = np.zeros((self.feat_settings.state_history, g.raw_settings.n_dof))
    valid_state_history_length = min(self.feat_settings.state_history, idx)
    if valid_state_history_length > 0:
        state_history[-valid_state_history_length:, :] = self.data.x_hat[idx-valid_state_history_length:idx, :]
    # flatten state history
    z_f_app = np.append(z_f_app, state_history[::-1].flatten())
Now we finally have z_f_app as a flat buffer containing all the features and histories for the current timestep. At the end of process_raw(), we simply insert z_f_app into self.data.z_f, the full feature buffer. Notice that the try/except block doubles the length of the self.data object if necessary.
# insert into feature array
try:
    self.data.z_f[idx, :] = z_f_app
except IndexError:
    self.data.double_length()
    self.data.z_f[idx, :] = z_f_app
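double_length() is not shown on this page; below is a plausible sketch of the growth strategy it implies. The real self.data object holds more arrays than z_f, and the method name is the only detail taken from the source:

```python
import numpy as np

class DataBuffers:
    """Hypothetical sketch of the buffer object behind self.data."""

    def __init__(self, n_timesteps, n_features):
        self.z_f = np.zeros((n_timesteps, n_features))

    def double_length(self):
        # double the timestep dimension, preserving existing rows
        self.z_f = np.vstack([self.z_f, np.zeros_like(self.z_f)])
```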