# tutorial_huggingface/dataset.py
import numpy as np
import pandas as pd
from loading.loadpickledataset import LoadPickleDataSet
from preprocessing.augmentation.gaussiannoise import GaussianNoise
from preprocessing.augmentation.imurotation import IMURotation
from preprocessing.filter_imu import FilterIMU
from preprocessing.filter_opensim import FilterOpenSim
from preprocessing.remove_outlier import remove_outlier
from preprocessing.resample import Resample
from preprocessing.segmentation.fixwindowsegmentation import FixWindowSegmentation
from preprocessing.segmentation.gaitcyclesegmentation import GaitCycleSegmentation
from preprocessing.segmentation.zeropaddingsegmentation import ZeroPaddingSegmentation
class DataSet:
def __init__(self, config, load_dataset=True):
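        """Container for an IMU (x) / OpenSim (y) dataset driven by ``config``.

        ``config`` is expected to provide the keys read below: trial/activity
        selection, segmentation method, resampling and filtering flags,
        augmentation flags, and the train/test subject and activity lists.
        """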
self.config = config
self.x = []
self.y = []
self.labels = []
self.selected_trial_type = config['selected_trial_type']
self.selected_activity_label = config['selected_activity_label']
self.segmentation_method = config['segmentation_method']
if self.config['gc_dataset']:
self.segmentation_method = 'zeropadding'
self.resample = config['resample']
        if load_dataset:
            self.load_dataset()
        # number of loaded trials (0 when load_dataset=False)
        self.n_sample = len(self.y)
self.train_subjects = config['train_subjects']
self.test_subjects = config['test_subjects']
self.train_activity = config['train_activity']
self.test_activity = config['test_activity']
# self.winsize = 128
self.train_dataset = {}
self.test_dataset = {}
def load_dataset(self):
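        """Load x, y, and labels via LoadPickleDataSet, keep only the selected
        trial types and activity labels, and run the preprocessing steps."""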
getdata_handler = LoadPickleDataSet(self.config)
x, y, labels = getdata_handler.run_get_dataset()
self.x, self.y, self.labels = self.run_activity_based_filter(x, y, labels)
self._preprocess()
def _preprocess(self):
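        """Remove outliers, optionally resample, and optionally low-pass
        filter the OpenSim targets and the IMU signals."""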
self.x, self.y, self.labels = remove_outlier(self.x, self.y, self.labels)
if self.resample:
self.x, self.y, self.labels = self.run_resample_signal(self.x, self.y, self.labels)
        if self.config['opensim_filter']:
            # low-pass filter the OpenSim targets (cutoff 6 Hz, fs 100 Hz, order 2)
            filteropensim_handler = FilterOpenSim(self.y, lowcut=6, fs=100, order=2)
            self.y = filteropensim_handler.run_lowpass_filter()
        if self.config['imu_filter']:
            # low-pass filter the IMU signals (cutoff 10 Hz, fs 100 Hz, order 2)
            filterimu_handler = FilterIMU(self.x, lowcut=10, fs=100, order=2)
            self.x = filterimu_handler.run_lowpass_filter()
def run_resample_signal(self, x, y, labels):
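        """Resample x, y, and labels with the Resample handler.

        The positional arguments 200 and 100 passed below are assumed to be
        the source and target sampling rates (Hz); confirm against the
        Resample implementation.
        """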
resample_handler = Resample(x, y, labels, 200, 100)
x, y, labels = resample_handler._run_resample()
return x, y, labels
def run_segmentation(self, x, y, labels):
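        """Segment the trials with the configured method (fixed window, zero
        padding, or gait cycle), then optionally low-pass filter the OpenSim
        targets again and apply rotation / Gaussian-noise augmentation."""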
if self.segmentation_method == 'fixedwindow':
segmentation_handler = FixWindowSegmentation(x, y, labels, winsize=self.config['target_padding_length'], overlap=0.5, start_over=True)
self.x, self.y, self.labels = segmentation_handler._run_segmentation()
elif self.segmentation_method == 'zeropadding':
segmentation_handler = ZeroPaddingSegmentation(x, y, labels, target_padding_length=self.config['target_padding_length'], start_over=True)
self.x, self.y, self.labels = segmentation_handler._run_segmentation()
elif self.segmentation_method == 'gaitcycle':
segmentation_handler = GaitCycleSegmentation(x, y, labels, winsize=128, overlap=0.5, start_over=True)
self.x, self.y, self.labels = segmentation_handler._run_segmentation()
if self.config['opensim_filter']:
filteropensim_handler = FilterOpenSim(self.y, lowcut=6, fs=100, order=2)
self.y = filteropensim_handler.run_lowpass_filter()
if self.config['rotation']:
imu_rotation_handler = IMURotation(knom=10)
self.x, self.y, self.labels = imu_rotation_handler.run_rotation(self.x.copy(), self.y.copy(), self.labels.copy())
if self.config['gaussian_noise']:
gaussian_noise_handler = GaussianNoise(0, .05)
self.x, self.y, self.labels = gaussian_noise_handler.run_add_noise(self.x, self.y, self.labels)
del x, y, labels
return self.x, self.y, self.labels
def run_activity_based_filter(self, x, y, label):
'''
        :return: updated x, y, and labels containing only the selected trial types and activity labels
'''
        updated_x = []
        updated_y = []
        updated_label = []
        for ll, xx, yy in zip(label, x, y):
if self.config['dataset_name']=='camargo' and ll['trialType'].isin(self.selected_trial_type).all() and self.selected_activity_label == ['all_idle']:
l_temp = ll[ll['trialType'].isin(self.selected_trial_type)]
l_temp_index = l_temp.index.values
xx_temp = xx[l_temp_index]
yy_temp = yy[l_temp_index]
updated_x.append(xx_temp)
                updated_y.append(yy_temp)
updated_label.append(l_temp)
elif self.config['dataset_name']=='camargo' and ll['trialType'].isin(self.selected_trial_type).all() and self.selected_activity_label == ['all']:
update_selected_activity_label = list(ll['Label'].unique())
update_selected_activity_label = [i for i in update_selected_activity_label if i not in ['idle', 'stand']]
l_temp = ll[(ll['trialType'].isin(self.selected_trial_type)) & (ll['Label'].isin(update_selected_activity_label))]
l_temp_index = l_temp.index.values
xx_temp = xx[l_temp_index]
yy_temp = yy[l_temp_index]
updated_x.append(xx_temp)
                updated_y.append(yy_temp)
updated_label.append(l_temp)
elif self.config['dataset_name'] == 'camargo' and ll['trialType'].isin(self.selected_trial_type).all() and self.selected_activity_label == ['all_split']:
ll_temp = ll.copy()
                ll_temp['trialType2'] = ll_temp['Label']
                if ll['trialType'].iloc[0] == 'levelground':
# get the turn index if it's there
turn1_indx = ll_temp[ll_temp['Label'] == 'turn1'].index.values
turn2_indx = ll_temp[ll_temp['Label'] == 'turn2'].index.values
                    # ensure turn1 precedes turn2; swap them if recorded out of order
                    if turn1_indx[0] > turn2_indx[0]:
                        turn1_indx, turn2_indx = turn2_indx, turn1_indx
                    # divide the trial into two segments around the turns
seg1 = ll_temp.iloc[0:turn1_indx[-1]+1]
seg2 = ll_temp.iloc[turn2_indx[0]:]
seg1_trialType2 = seg1['trialType2'].replace({'idle': 'idle', 'stand': 'idle', 'turn1': 'idle', 'turn2': 'idle',
'stand-walk':'levelground1', 'walk':'levelground1',
'walk-stand': 'levelground1'})
seg2_trialType2 = seg2['trialType2'].replace({'idle': 'idle', 'stand': 'idle', 'turn1': 'idle','turn2': 'idle',
'stand-walk':'levelground2', 'walk':'levelground2',
'walk-stand': 'levelground2'})
ll_temp['trialType2'] = pd.concat([seg1_trialType2, seg2_trialType2])
ll = ll_temp
                elif ll['trialType'].iloc[0] == 'ramp':
ll_temp['trialType2'] = ll_temp['trialType2'].replace({'idle': 'idle',
'walk-rampascent': 'rampascent', 'rampascent':'rampascent','rampascent-walk': 'rampascent',
'walk-rampdescent': 'rampdescent', 'rampdescent':'rampdescent','rampdescent-walk': 'rampdescent'})
ll = ll_temp
                elif ll['trialType'].iloc[0] == 'stair':
ll_temp['trialType2'] = ll_temp['trialType2'].replace({'idle': 'idle',
'walk-stairascent': 'stairascent', 'stairascent':'stairascent','stairascent-walk': 'stairascent',
'walk-stairdescent': 'stairdescent', 'stairdescent':'stairdescent','stairdescent-walk': 'stairdescent'})
ll = ll_temp
update_selected_activity_label = list(ll['trialType2'].unique())
                # drop 'idle' segments (stand and turn labels are remapped to 'idle' for levelground trials above)
                update_selected_activity_label = [i for i in update_selected_activity_label if i not in ['idle']]
for activity_label in update_selected_activity_label:
                    # levelground: save stand-walk and walk as one trial and walk-stand as another, so samples stay continuous
                    # ramp or stair: save ascent and descent as separate trials
if isinstance(activity_label, str):
l_temp = ll[(ll['trialType'].isin(self.selected_trial_type)) & (ll['trialType2']==activity_label)]
l_temp_index = l_temp.index.values
xx_temp = xx[l_temp_index]
yy_temp = yy[l_temp_index]
updated_x.append(xx_temp)
                        updated_y.append(yy_temp)
updated_label.append(l_temp)
                        if len(xx_temp) == 0:
                            print('empty segment for activity:', activity_label)
elif self.config['dataset_name']=='camargo':
l_temp = ll[(ll['trialType'].isin(self.selected_trial_type)) & (ll['Label'].isin(self.selected_activity_label))]
l_temp_index = l_temp.index.values
xx_temp = xx[l_temp_index]
yy_temp = yy[l_temp_index]
updated_x.append(xx_temp)
                updated_y.append(yy_temp)
updated_label.append(l_temp)
elif self.config['dataset_name']=='kiha':
l_temp = ll[(ll['trialType'].isin(self.selected_trial_type))]
l_temp_index = l_temp.index.values
xx_temp = xx[l_temp_index]
yy_temp = yy[l_temp_index]
updated_x.append(xx_temp)
                updated_y.append(yy_temp)
updated_label.append(l_temp)
        return updated_x, updated_y, updated_label
def concatenate_data(self):
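        """Concatenate the per-trial label DataFrames and x/y arrays into a
        single DataFrame and single arrays."""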
        self.labels = pd.concat(self.labels, axis=0, ignore_index=True)
self.x = np.concatenate(self.x, axis=0)
self.y = np.concatenate(self.y, axis=0)
def run_dataset_split_loop(self):
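        """Split the per-trial lists (before concatenate_data) into train/test
        dictionaries by subject and by the 'trialType2' activity column, so it
        expects labels produced with the 'all_split' activity selection."""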
train_labels = []
test_labels = []
train_x = []
train_y = []
test_x = []
test_y = []
for t, trial in enumerate(self.labels):
if all(trial['subject'].isin(self.train_subjects)) and all(trial['trialType2'].isin(self.train_activity)):
train_labels.append(trial)
train_x.append(self.x[t])
train_y.append(self.y[t])
elif all(trial['subject'].isin(self.test_subjects)) and all(trial['trialType2'].isin(self.test_activity)):
test_labels.append(trial)
test_x.append(self.x[t])
test_y.append(self.y[t])
self.train_dataset['x'] = train_x
self.train_dataset['y'] = train_y
self.train_dataset['labels'] = train_labels
self.test_dataset['x'] = test_x
self.test_dataset['y'] = test_y
self.test_dataset['labels'] = test_labels
return self.train_dataset, self.test_dataset
def run_dataset_split(self):
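        """Split the concatenated dataset into train/test sets by subject.

        If the test subjects are a subset of the train subjects, the training
        set becomes every subject not in the test list; otherwise the
        configured train/test subject lists are used directly.
        """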
if set(self.test_subjects).issubset(self.train_subjects):
train_labels = self.labels[~self.labels['subject'].isin(self.test_subjects)]
            test_labels = self.labels[self.labels['subject'].isin(self.test_subjects)]
else:
train_labels = self.labels[self.labels['subject'].isin(self.train_subjects)]
test_labels = self.labels[(self.labels['subject'].isin(self.test_subjects))]
print(train_labels['subject'].unique())
print(test_labels['subject'].unique())
train_index = train_labels.index.values
test_index = test_labels.index.values
print('training length', len(train_index))
print('test length', len(test_index))
train_x = self.x[train_index]
train_y = self.y[train_index]
# self.train_dataset['x'] = train_x.reshape([int(train_x.shape[0]/self.config['target_padding_length']), self.config['target_padding_length'], train_x.shape[1]])
# self.train_dataset['y'] = train_y.reshape([int(train_y.shape[0]/self.config['target_padding_length']), self.config['target_padding_length'], train_y.shape[1]])
self.train_dataset['x'] = train_x
self.train_dataset['y'] = train_y
self.train_dataset['labels'] = train_labels.reset_index(drop=True)
test_x = self.x[test_index]
test_y = self.y[test_index]
# self.test_dataset['x'] = test_x.reshape([int(test_x.shape[0]/self.config['target_padding_length']), self.config['target_padding_length'], test_x.shape[1]])
# self.test_dataset['y'] = test_y.reshape([int(test_y.shape[0]/self.config['target_padding_length']), self.config['target_padding_length'], test_y.shape[1]])
self.test_dataset['x'] = test_x
self.test_dataset['y'] = test_y
self.test_dataset['labels'] = test_labels.reset_index(drop=True)
del train_labels, test_labels, train_x, train_y, test_x, test_y
return self.train_dataset, self.test_dataset
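
# Minimal usage sketch (not part of the original pipeline): the config keys
# below mirror the ones read in this class. The subject IDs, activity lists,
# and padding length are placeholders, and LoadPickleDataSet will likely need
# additional keys (e.g. the path to the pickled data) that are not shown here.
if __name__ == '__main__':
    example_config = {
        'dataset_name': 'camargo',
        'selected_trial_type': ['levelground', 'ramp', 'stair'],
        'selected_activity_label': ['all'],
        'segmentation_method': 'zeropadding',
        'gc_dataset': False,
        'resample': True,
        'opensim_filter': True,
        'imu_filter': True,
        'rotation': False,
        'gaussian_noise': False,
        'target_padding_length': 200,        # placeholder segment length
        'train_subjects': ['AB06', 'AB07'],  # placeholder subject IDs
        'test_subjects': ['AB08'],           # placeholder subject IDs
        'train_activity': ['all'],           # only consulted by run_dataset_split_loop
        'test_activity': ['all'],            # only consulted by run_dataset_split_loop
    }
    dataset = DataSet(example_config)
    dataset.run_segmentation(dataset.x, dataset.y, dataset.labels)
    dataset.concatenate_data()
    train_dataset, test_dataset = dataset.run_dataset_split()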