import torch
from torch.utils import data
import numpy as np
from os.path import join as pjoin
import random
import codecs as cs
from tqdm.auto import tqdm

from utils.word_vectorizer import WordVectorizer, POS_enumerator
from utils.motion_process import recover_from_ric


class Text2MotionDataset(data.Dataset):
    """Dataset for the text-to-motion generation task.

    Subclasses (HumanML3D, KIT) override the class attributes below with
    dataset-specific values.
    """

    data_root = ""
    min_motion_len = 40
    joints_num = None
    dim_pose = None
    max_motion_length = 196

    def __init__(self, opt, split, mode="train", accelerator=None):
        self.max_text_len = getattr(opt, "max_text_len", 20)
        self.unit_length = getattr(opt, "unit_length", 4)
        self.mode = mode
        motion_dir = pjoin(self.data_root, "new_joint_vecs")
        text_dir = pjoin(self.data_root, "texts")

        if mode not in ["train", "eval", "gt_eval", "xyz_gt", "hml_gt"]:
            raise ValueError(
                f"Mode '{mode}' is not supported. Please use one of: "
                "'train', 'eval', 'gt_eval', 'xyz_gt', 'hml_gt'."
            )
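        # Mode summary (derived from the branches below):
        #   train:   normalize with this dataset's Mean/Std and train our model.
        #   eval:    normalize with our saved meta stats; also load T2M stats
        #            so evaluators can re-normalize our outputs.
        #   gt_eval: normalize ground truth with the T2M evaluator stats.
        #   xyz_gt:  return ground-truth xyz joint positions (no normalization).
        #   hml_gt:  return raw HumanML3D feature vectors (no normalization).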

        mean, std = None, None
        if mode == "gt_eval":
            # Statistics used by T2M models (including evaluators).
            mean = np.load(pjoin(opt.eval_meta_dir, f"{opt.dataset_name}_mean.npy"))
            std = np.load(pjoin(opt.eval_meta_dir, f"{opt.dataset_name}_std.npy"))
        elif mode == "eval":
            # Statistics used by our models during inference.
            mean = np.load(pjoin(opt.meta_dir, "mean.npy"))
            std = np.load(pjoin(opt.meta_dir, "std.npy"))
        else:
            # Statistics used by our models during training.
            mean = np.load(pjoin(self.data_root, "Mean.npy"))
            std = np.load(pjoin(self.data_root, "Std.npy"))

        if mode == "eval":
            # Statistics used by T2M models (including evaluators); they are
            # needed to translate our normalization into theirs.
            self.mean_for_eval = np.load(
                pjoin(opt.eval_meta_dir, f"{opt.dataset_name}_mean.npy")
            )
            self.std_for_eval = np.load(
                pjoin(opt.eval_meta_dir, f"{opt.dataset_name}_std.npy")
            )

        if mode in ["gt_eval", "eval"]:
            self.w_vectorizer = WordVectorizer(opt.glove_dir, "our_vab")
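            # w_vectorizer maps a "word/POS" token to its GloVe word embedding
            # (loaded from opt.glove_dir) and a part-of-speech one-hot vector;
            # both are consumed in __getitem__ for the eval modes.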

        data_dict = {}
        id_list = []
        split_file = pjoin(self.data_root, f"{split}.txt")
        with cs.open(split_file, "r") as f:
            for line in f.readlines():
                id_list.append(line.strip())
        if opt.debug:
            id_list = id_list[:1000]

        new_name_list = []
        length_list = []
        for name in tqdm(
            id_list,
            disable=(
                not accelerator.is_local_main_process
                if accelerator is not None
                else False
            ),
        ):
            motion = np.load(pjoin(motion_dir, name + ".npy"))
            # Skip clips that are too short or too long (frames at 20 fps).
            if len(motion) < self.min_motion_len or len(motion) >= 200:
                continue
            text_data = []
            flag = False
            # Each annotation line has the form "caption#tokens#f_tag#to_tag",
            # where tokens are space-separated "word/POS" pairs and the tags
            # delimit the described segment in seconds.
            with cs.open(pjoin(text_dir, name + ".txt")) as f:
                for line in f.readlines():
                    text_dict = {}
                    line_split = line.strip().split("#")
                    caption = line_split[0]
                    try:
                        tokens = line_split[1].split(" ")
                        f_tag = float(line_split[2])
                        to_tag = float(line_split[3])
                        f_tag = 0.0 if np.isnan(f_tag) else f_tag
                        to_tag = 0.0 if np.isnan(to_tag) else to_tag
                    except (IndexError, ValueError):
                        # Malformed line: fall back to placeholder tokens and
                        # an 8-second default segment.
                        tokens = ["a/NUM", "a/NUM"]
                        f_tag = 0.0
                        to_tag = 8.0
| text_dict["caption"] = caption | |
| text_dict["tokens"] = tokens | |
| if f_tag == 0.0 and to_tag == 0.0: | |
| flag = True | |
| text_data.append(text_dict) | |
| else: | |
| n_motion = motion[int(f_tag * 20) : int(to_tag * 20)] | |
| if (len(n_motion)) < self.min_motion_len or ( | |
| len(n_motion) >= 200 | |
| ): | |
| continue | |
| new_name = random.choice("ABCDEFGHIJKLMNOPQRSTUVW") + "_" + name | |
| while new_name in data_dict: | |
| new_name = ( | |
| random.choice("ABCDEFGHIJKLMNOPQRSTUVW") + "_" + name | |
| ) | |
| data_dict[new_name] = { | |
| "motion": n_motion, | |
| "length": len(n_motion), | |
| "text": [text_dict], | |
| } | |
| new_name_list.append(new_name) | |
| length_list.append(len(n_motion)) | |
| if flag: | |
| data_dict[name] = { | |
| "motion": motion, | |
| "length": len(motion), | |
| "text": text_data, | |
| } | |
| new_name_list.append(name) | |
| length_list.append(len(motion)) | |

        # Sort entries by motion length (ascending).
        name_list, length_list = zip(
            *sorted(zip(new_name_list, length_list), key=lambda x: x[1])
        )
| if mode == "train": | |
| if opt.dataset_name != "amass": | |
| joints_num = self.joints_num | |
| # root_rot_velocity (B, seq_len, 1) | |
| std[0:1] = std[0:1] / opt.feat_bias | |
| # root_linear_velocity (B, seq_len, 2) | |
| std[1:3] = std[1:3] / opt.feat_bias | |
| # root_y (B, seq_len, 1) | |
| std[3:4] = std[3:4] / opt.feat_bias | |
| # ric_data (B, seq_len, (joint_num - 1)*3) | |
| std[4 : 4 + (joints_num - 1) * 3] = ( | |
| std[4 : 4 + (joints_num - 1) * 3] / 1.0 | |
| ) | |
| # rot_data (B, seq_len, (joint_num - 1)*6) | |
| std[4 + (joints_num - 1) * 3 : 4 + (joints_num - 1) * 9] = ( | |
| std[4 + (joints_num - 1) * 3 : 4 + (joints_num - 1) * 9] / 1.0 | |
| ) | |
| # local_velocity (B, seq_len, joint_num*3) | |
| std[ | |
| 4 + (joints_num - 1) * 9 : 4 + (joints_num - 1) * 9 + joints_num * 3 | |
| ] = ( | |
| std[ | |
| 4 | |
| + (joints_num - 1) * 9 : 4 | |
| + (joints_num - 1) * 9 | |
| + joints_num * 3 | |
| ] | |
| / 1.0 | |
| ) | |
| # foot contact (B, seq_len, 4) | |
| std[4 + (joints_num - 1) * 9 + joints_num * 3 :] = ( | |
| std[4 + (joints_num - 1) * 9 + joints_num * 3 :] / opt.feat_bias | |
| ) | |
| assert 4 + (joints_num - 1) * 9 + joints_num * 3 + 4 == mean.shape[-1] | |
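                # Sanity check of the feature layout against dim_pose:
                #   HumanML3D (joints_num = 22): 4 + 21*3 + 21*6 + 22*3 + 4
                #                              = 4 + 63 + 126 + 66 + 4 = 263
                #   KIT-ML    (joints_num = 21): 4 + 20*3 + 20*6 + 21*3 + 4
                #                              = 4 + 60 + 120 + 63 + 4 = 251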
            if accelerator is not None and accelerator.is_main_process:
                np.save(pjoin(opt.meta_dir, "mean.npy"), mean)
                np.save(pjoin(opt.meta_dir, "std.npy"), std)

        self.mean = mean
        self.std = std
        self.data_dict = data_dict
        self.name_list = name_list

    def inv_transform(self, data):
        # Undo the Z-normalization applied in __getitem__.
        return data * self.std + self.mean
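
    # Example (hypothetical) round trip: for a normalized training sample
    #   caption, motion, m_length = dataset[0]
    # the original-scale features are recovered with
    #   motion_denorm = dataset.inv_transform(motion)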

    def __len__(self):
        return len(self.data_dict)

    def __getitem__(self, idx):
        data = self.data_dict[self.name_list[idx]]
        motion, m_length, text_list = data["motion"], data["length"], data["text"]

        # Randomly select one caption for this motion.
        text_data = random.choice(text_list)
        caption = text_data["caption"]

        # Z-normalization.
        if self.mode not in ["xyz_gt", "hml_gt"]:
            motion = (motion - self.mean) / self.std

        # Crop the motion.
        if self.mode in ["eval", "gt_eval"]:
            # Crop motion length to a multiple of unit_length, occasionally
            # dropping one extra unit to introduce small length variations.
            if self.unit_length < 10:
                coin2 = np.random.choice(["single", "single", "double"])
            else:
                coin2 = "single"
            if coin2 == "double":
                m_length = (m_length // self.unit_length - 1) * self.unit_length
            elif coin2 == "single":
                m_length = (m_length // self.unit_length) * self.unit_length
            idx = random.randint(0, len(motion) - m_length)
            motion = motion[idx : idx + m_length]
        elif m_length >= self.max_motion_length:
            # Otherwise, randomly crop long motions to max_motion_length.
            idx = random.randint(0, len(motion) - self.max_motion_length)
            motion = motion[idx : idx + self.max_motion_length]
            m_length = self.max_motion_length
| "pad motion" | |
| if m_length < self.max_motion_length: | |
| motion = np.concatenate( | |
| [ | |
| motion, | |
| np.zeros((self.max_motion_length - m_length, motion.shape[1])), | |
| ], | |
| axis=0, | |
| ) | |
| assert len(motion) == self.max_motion_length | |

        if self.mode in ["gt_eval", "eval"]:
            # Build word embeddings for text-to-motion evaluation.
            tokens = text_data["tokens"]
            if len(tokens) < self.max_text_len:
                # Add sos/eos markers, then pad with "unk" tokens.
                tokens = ["sos/OTHER"] + tokens + ["eos/OTHER"]
                sent_len = len(tokens)
                tokens = tokens + ["unk/OTHER"] * (self.max_text_len + 2 - sent_len)
            else:
                # Crop to max_text_len, then add sos/eos markers.
                tokens = tokens[: self.max_text_len]
                tokens = ["sos/OTHER"] + tokens + ["eos/OTHER"]
                sent_len = len(tokens)
            pos_one_hots = []
            word_embeddings = []
            for token in tokens:
                word_emb, pos_oh = self.w_vectorizer[token]
                pos_one_hots.append(pos_oh[None, :])
                word_embeddings.append(word_emb[None, :])
            pos_one_hots = np.concatenate(pos_one_hots, axis=0)
            word_embeddings = np.concatenate(word_embeddings, axis=0)
            return (
                word_embeddings,
                pos_one_hots,
                caption,
                sent_len,
                motion,
                m_length,
                "_".join(tokens),
            )
        elif self.mode == "xyz_gt":
            # Convert the HumanML3D feature representation to xyz joints.
            # 1. Recover joint positions from the rotation-invariant
            #    coordinates (zero-padded frames recover to all zeros).
            motion = torch.from_numpy(motion).float()
            pred_joints = recover_from_ric(
                motion, self.joints_num
            )  # (nframe, njoints, 3)
            # 2. Put the skeleton on the floor (Y axis).
            floor_height = pred_joints.min(dim=0)[0].min(dim=0)[0][1]
            pred_joints[:, :, 1] -= floor_height
            return pred_joints

        return caption, motion, m_length


class HumanML3D(Text2MotionDataset):
    def __init__(self, opt, split="train", mode="train", accelerator=None):
        self.data_root = "./data/HumanML3D"
        self.min_motion_len = 40
        self.joints_num = 22
        self.dim_pose = 263
        self.max_motion_length = 196
        if accelerator:
            accelerator.print(
                "\n Loading %s mode HumanML3D %s dataset ..." % (mode, split)
            )
        else:
            print("\n Loading %s mode HumanML3D dataset ..." % mode)
        super(HumanML3D, self).__init__(opt, split, mode, accelerator)


class KIT(Text2MotionDataset):
    def __init__(self, opt, split="train", mode="train", accelerator=None):
        self.data_root = "./data/KIT-ML"
        self.min_motion_len = 24
        self.joints_num = 21
        self.dim_pose = 251
        self.max_motion_length = 196
        if accelerator:
            accelerator.print("\n Loading %s mode KIT %s dataset ..." % (mode, split))
        else:
            print("\n Loading %s mode KIT dataset ..." % mode)
        super(KIT, self).__init__(opt, split, mode, accelerator)
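

# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the module proper). The `opt` namespace
# below is hypothetical: it only fills in the fields that
# Text2MotionDataset.__init__ actually reads in "train" mode; adjust the
# values to match your own config.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from types import SimpleNamespace

    opt = SimpleNamespace(
        dataset_name="t2m",  # anything but "amass" enables the std rescaling
        feat_bias=5.0,       # hypothetical value; use your config's setting
        debug=True,          # load only the first 1000 clips for a quick test
    )
    dataset = HumanML3D(opt, split="train", mode="train")
    caption, motion, m_length = dataset[0]
    print(caption, motion.shape, m_length)  # motion is padded to (196, 263)

    loader = data.DataLoader(dataset, batch_size=32, shuffle=True)
    captions, motions, m_lengths = next(iter(loader))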