| """Perform inference of one model on a dataset and measure time and energy.""" | |
| from __future__ import annotations | |
| import os | |
| import json | |
| import copy | |
| import atexit | |
| from typing import Generator, Literal, Iterable, Dict | |
| import gc | |
| import numpy as np | |
| import tyro | |
| import torch | |
| import rich | |
| from rich.table import Table | |
| from fastchat.serve.inference import prepare_logits_processor | |
| from fastchat.model.model_adapter import load_model, get_conversation_template | |
| from zeus.monitor import ZeusMonitor | |

SYSTEM_PROMPTS = {
    "chat": (
        "A chat between a human user (prompter) and an artificial intelligence (AI) assistant. "
        "The assistant gives helpful, detailed, and polite answers to the user's questions. "
    ),
    "chat-concise": (
        "A chat between a human user (prompter) and an artificial intelligence (AI) assistant. "
        "The assistant gives helpful, detailed, and polite answers to the user's questions. "
        "The assistant's answers are very concise. "
    ),
    "instruct": (
        "Below is an instruction that describes a task. "
        "Write a response that appropriately completes the request. "
    ),
    "instruct-concise": (
        "Below is an instruction that describes a task. "
        "Write a response that appropriately completes the request. "
        "The response should be very concise. "
    ),
}
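# The `task` argument of `main` below selects one of these system prompts;
# the "-concise" variants additionally ask the model for shorter responses.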

def is_partial_stop(output: str, stop_str: str):
    """Check whether the output contains a partial stop str."""
    for i in range(0, min(len(output), len(stop_str))):
        if stop_str.startswith(output[-i:]):
            return True
    return False

def generate_stream(
    model,
    tokenizer,
    params: Dict,
    device: str,
    context_len: int = 2048,
):
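    """Generate text for a batch of prompts, yielding results as requests stop.

    `params["prompt"]` holds the list of prompt strings. Each yielded dict has
    "text" (the decoded outputs for the whole batch) and "stop_length" (pairs of
    request index and number of generated tokens for requests that stopped).
    """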
    # Read parameters
    prompts = params["prompt"]
    temperature = float(params.get("temperature", 1.0))
    repetition_penalty = float(params.get("repetition_penalty", 1.0))
    top_p = float(params.get("top_p", 1.0))
    top_k = int(params.get("top_k", -1))  # -1 means disable
    max_new_tokens = int(params.get("max_new_tokens", 256))
    stop_str = params.get("stop", None)
    stop_token_ids = params.get("stop_token_ids", None) or []
    stop_token_ids.append(tokenizer.eos_token_id)
    batch_size = len(prompts)

    # Left-pad prompts with the EOS token so all inputs in the batch have the same length.
    tokenizer.padding_side = "left"
    tokenizer.pad_token = tokenizer.eos_token
    logits_processor = prepare_logits_processor(
        temperature, repetition_penalty, top_p, top_k
    )

    input_ids = tokenizer(prompts, padding=True).input_ids
    output_ids = list(input_ids)

    if model.config.is_encoder_decoder:
        max_src_len = context_len
    else:  # truncate
        max_src_len = context_len - max_new_tokens - 8
    input_ids = [input_id[-max_src_len:] for input_id in input_ids]
    input_len = len(input_ids[0])

    if model.config.is_encoder_decoder:
        encoder_output = model.encoder(
            input_ids=torch.as_tensor(input_ids, device=device)
        )[0]
        start_ids = torch.as_tensor(
            [[model.generation_config.decoder_start_token_id] for _ in range(batch_size)],
            dtype=torch.int64,
            device=device,
        )
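    # Generation loop: the first iteration runs the full (padded) prompts through
    # the model to prime the KV cache ("prefill"); every later iteration feeds only
    # the single token sampled in the previous step and reuses `past_key_values`.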
    past_key_values = out = None
    stopped = np.array(batch_size * [False])
    for i in range(max_new_tokens):
        if i == 0:  # prefill
            if model.config.is_encoder_decoder:
                out = model.decoder(
                    input_ids=start_ids,
                    encoder_hidden_states=encoder_output,
                    use_cache=True,
                )
                logits = model.lm_head(out[0])
            else:
                out = model(torch.as_tensor(input_ids, device=device), use_cache=True)
                logits = out.logits
            past_key_values = out.past_key_values
        else:  # decoding
            if model.config.is_encoder_decoder:
                out = model.decoder(
                    input_ids=torch.as_tensor(
                        [[token[0]] for token in tokens], device=device
                    ),
                    encoder_hidden_states=encoder_output,
                    use_cache=True,
                    past_key_values=past_key_values,
                )
                logits = model.lm_head(out[0])
            else:
                out = model(
                    input_ids=torch.as_tensor(
                        [[token[0]] for token in tokens], device=device
                    ),
                    use_cache=True,
                    past_key_values=past_key_values,
                )
                logits = out.logits
            past_key_values = out.past_key_values
        if logits_processor:
            if repetition_penalty > 1.0:
                tmp_output_ids = torch.as_tensor(output_ids, device=logits.device)
            else:
                tmp_output_ids = None
            last_token_logits = logits_processor(tmp_output_ids, logits[:, -1, :])
        else:
            last_token_logits = logits[:, -1, :]

        if device == "mps":
            # Switch to CPU to avoid some bugs in the mps backend.
            last_token_logits = last_token_logits.float().to("cpu")

        if temperature < 1e-5 or top_p < 1e-8:  # greedy
            _, indices = torch.topk(last_token_logits, 2)
            tokens = [[int(token) for token in query] for query in indices.tolist()]
        else:
            probs = torch.softmax(last_token_logits, dim=-1)
            indices = torch.multinomial(probs, num_samples=2)
            tokens = [[int(token) for token in query] for query in indices.tolist()]
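        # Two candidate tokens are kept per request (top-2 / two samples), but
        # only the first one (`token[0]`) is consumed below.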
        old_stopped = stopped
        stopped = np.logical_or(
            old_stopped, np.array([token[0] in stop_token_ids for token in tokens])
        )
        output_ids = [ids + [token[0]] for ids, token in zip(output_ids, tokens)]

        def slice(s, pos):
            return s[:pos]

        vec_slice = np.vectorize(slice, otypes=[str])
        vec_is_partial_stop = np.vectorize(is_partial_stop)
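        # Vectorized helpers: `vec_slice` truncates each decoded string at its stop
        # position, and `vec_is_partial_stop` flags outputs that currently end in a
        # prefix of the stop string (so we hold off yielding them).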
        # Yield the output tokens
        if any(stopped):
            tmp_output_ids = [ids[input_len:] for ids in output_ids]
            rfind_start = 0
            output = tokenizer.batch_decode(
                tmp_output_ids,
                skip_special_tokens=True,
                spaces_between_special_tokens=False,
                clean_up_tokenization_spaces=True,
            )
            output = np.array(output)

            partially_stopped = np.array(len(output_ids) * [False])
            different_indices = np.empty(shape=(0,))
            if stop_str:
                if isinstance(stop_str, str):
                    pos_array = np.char.rfind(output, stop_str, rfind_start)
                    find_stop = pos_array != -1
                    output[find_stop] = vec_slice(output[find_stop], pos_array[find_stop])
                    stopped = find_stop
                    partially_stopped = vec_is_partial_stop(output, stop_str)
                elif isinstance(stop_str, Iterable):
                    for each_stop in stop_str:
                        pos_array = np.char.rfind(output, each_stop, rfind_start)
                        find_stop = pos_array != -1
                        output[find_stop] = vec_slice(output[find_stop], pos_array[find_stop])
                        stopped = find_stop
                        partially_stopped = partially_stopped | vec_is_partial_stop(output, each_stop)
                else:
                    raise ValueError("Invalid stop field type.")

            # Prevent yielding a partial stop sequence.
            if not any(partially_stopped):
                # Indicates which requests in the batch stopped on this step.
                different_indices = np.where(stopped != old_stopped)[0]
                stop_length = np.array([(j, i + 1) for j in different_indices])
                yield {
                    "text": output,
                    "stop_length": stop_length,
                }
        if all(stopped):
            break

    # Requests that never hit a stop condition ran for the full `max_new_tokens`.
    false_indices = np.where(~stopped)[0]
    if not any(stopped):
        tmp_output_ids = [ids[input_len:] for ids in output_ids]
        output = tokenizer.batch_decode(
            tmp_output_ids,
            skip_special_tokens=True,
            spaces_between_special_tokens=False,
            clean_up_tokenization_spaces=True,
        )
    stop_length = np.array([(i, max_new_tokens) for i in false_indices])
    yield {
        "text": output,
        "stop_length": stop_length,
    }

    # Clean up.
    del past_key_values, out
    gc.collect()
    torch.cuda.empty_cache()
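
# Illustrative sketch of driving `generate_stream` directly (the `main` function
# below does the same thing, with a ZeusMonitor measurement window around it).
# The prompt text is just a made-up example:
#
#     params = {
#         "prompt": ["Hello, how are you?"],
#         "temperature": 0.7,
#         "max_new_tokens": 32,
#         "stop": None,
#         "stop_token_ids": None,
#     }
#     for chunk in generate_stream(model, tokenizer, params, device="cuda"):
#         print(chunk["text"], chunk["stop_length"])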

def main(
    model_path: str,
    input_file: str = "sharegpt/sg_90k_part1_html_cleaned_lang_first_sampled_sorted.json",
    output_dir: str = "data",
    device_index: int = 0,
    task: Literal[tuple(SYSTEM_PROMPTS)] = "chat",  # type: ignore
    load_8bit: bool = False,
    temperature: float = 0.7,
    repetition_penalty: float = 1.0,
    max_new_tokens: int = 512,
    batch: int = 1,
) -> None:
    """Run benchmarking for one model on the entire input file.

    Args:
        model_path: Path to or Huggingface Hub Id of the model.
        input_file: Path to the input JSON file. Assumed to be our cleaned ShareGPT data.
            (Default: "sharegpt/sg_90k_part1_html_cleaned_lang_first_sampled_sorted.json")
        output_dir: Path to the output directory. (Default: "data")
        device_index: Index of the GPU to use for inference. (Default: 0)
        task: Type of task to perform inference on. (Default: "chat")
        load_8bit: Whether to load the model in 8-bit mode. (Default: False)
        temperature: Temperature to use for sampling. (Default: 0.7)
        repetition_penalty: Repetition penalty to use for the model. (Default: 1.0)
        max_new_tokens: Maximum number of tokens to generate, ignoring the prompt. (Default: 512)
        batch: Number of prompts to run through the model at once. (Default: 1)
    """
    # NOTE(JW): ChatGLM is implemented as a special case in FastChat inference.
    # Also, it's primarily a model that's fine-tuned for Chinese, so it doesn't
    # make sense to prompt it in English and talk about its verbosity.
    if "chatglm" in model_path.lower():
        raise ValueError("ChatGLM is not supported.")

    # Get a Rich Console instance.
    console = rich.get_console()

    # Print out what we're about to do.
    # For the output directory name, keep only the last two path components of the
    # model path and join them with a double dash.
    if model_path.endswith("/"):
        model_path = model_path[:-1]
    model_name_cleaned = "--".join(model_path.split("/")[-2:])
    output_dir = f"{output_dir}/{task}/{model_name_cleaned}"
    output_csv_path = f"{output_dir}/benchmark_batch_{batch}.json"
    config_json_path = f"{output_dir}/config.json"
    table = Table(title="Benchmark")
    table.add_column("Configuration")
    table.add_column("Value")
    table.add_row("Model", f"{model_name_cleaned} (path: {model_path})")
    table.add_row("Input", input_file)
    table.add_row("Device", f"cuda:{device_index}")
    table.add_row("Task", task)
    table.add_row("8-bit", str(load_8bit))
    table.add_row("Temperature", f"{temperature:.2f}")
    table.add_row("Repetition Penalty", f"{repetition_penalty:.2f}")
    table.add_row("Max New Tokens", str(max_new_tokens))
    table.add_row("Output CSV", output_csv_path)
    table.add_row("Config JSON", config_json_path)
    console.print(table)
    # Set the device.
    torch.cuda.set_device(f"cuda:{device_index}")

    # Load the model (Huggingface PyTorch) and tokenizer (Huggingface).
    model, tokenizer = load_model(
        model_path=model_path,
        device="cuda",
        num_gpus=1,
        max_gpu_memory=None,
        load_8bit=load_8bit,
        cpu_offloading=False,
        gptq_config=None,
        debug=False,
    )

    # Chats are accumulated in a conversation helper object.
    conv_base = get_conversation_template(model_path)

    # Standardize the system prompt for every model.
    if "llama-2" in model_path.lower():
        conv_base.system = f"<s>[INST] <<SYS>>\n{SYSTEM_PROMPTS[task]}\n<</SYS>>\n\n"
    elif "stablelm" in model_path.lower():
        conv_base.system = f"""<|SYSTEM|># {SYSTEM_PROMPTS[task]}\n"""
    else:
        conv_base.system = SYSTEM_PROMPTS[task]
    conv_base.messages = []
    conv_base.offset = 0

    gen_params = {
        "model": model_path,
        "prompt": "EMPTY",
        "temperature": temperature,
        # The key must match the "repetition_penalty" parameter that `generate_stream` reads.
        "repetition_penalty": repetition_penalty,
        "max_new_tokens": max_new_tokens,
        "stop": conv_base.stop_str,
        "stop_token_ids": conv_base.stop_token_ids,
    }
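    # `gen_params["prompt"]` is a placeholder here; it is overwritten with the real
    # batch of prompts inside the benchmarking loop below.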
    monitor = ZeusMonitor(gpu_indices=[torch.cuda.current_device()])

    # Output files.
    os.makedirs(output_dir, exist_ok=True)
    output_json = open(output_csv_path, "w")
    output_json.write("[\n")
    output_json.flush()
    # Conclude the JSON file format with a closing bracket. Using `atexit` will
    # handle all cases of the program exiting, including Ctrl-C and errors.
    atexit.register(lambda: output_json.write("\n]\n"))

    # Dump the configuration to a JSON file.
    with open(config_json_path, "w") as config_json:
        json.dump(
            {
                "model_path": model_path,
                "input_file": input_file,
                "device_index": device_index,
                "task": task,
                "load_8bit": load_8bit,
                "temperature": temperature,
                "repetition_penalty": repetition_penalty,
                "max_new_tokens": max_new_tokens,
            },
            config_json,
            indent=4,
        )
        config_json.write("\n")
    def dataloader(input_file: str) -> Generator[tuple[bool, str], None, None]:
        """Yields a tuple of whether this is a warmup run and the input prompt."""
        for _ in range(3 * batch):
            yield True, "Say something long and random. I don't care about the content."
        for item in json.load(open(input_file, "r")):
            input_prompt = item["conversations"][0]["value"]
            yield False, input_prompt
    # Warm up the GPU with some random prompts, then forward through all the
    # prompts in the input file, `batch` prompts at a time.
    is_first = True
    convs = []
    prompts = []
    data_iter = iter(dataloader(input_file))
    end_of_file = False  # flag to track the end of the file
    while True:
        try:
            is_warmup, input_prompt = next(data_iter)
        except StopIteration:
            end_of_file = True  # no more data

        # Construct the input prompt and accumulate it into the current batch.
        if not end_of_file:
            conv = copy.deepcopy(conv_base)
            conv.append_message(conv.roles[0], input_prompt)
            conv.append_message(conv.roles[1], "")
            prompt = conv.get_prompt()
            prompts.append(prompt)
            convs.append(conv)
            # Keep accumulating until we have a full batch.
            if len(convs) < batch:
                continue
        gen_params["prompt"] = prompts

        if end_of_file and len(prompts) == 0:
            break
        # Print the input prompts.
        for i in range(len(convs)):
            console.print(f"\n[u cyan]{'Warmup ' if is_warmup else ''}Prompt[/u cyan](batch_{i}):")
            console.print(prompts[i].strip() + "\n", markup=False)

        # Generate the output from the model.
        output_stream = generate_stream(model, tokenizer, gen_params, device="cuda", context_len=2048)
        output = {}
        batch_token_len = {}

        #################################################
        # Inference and measurement zone!
        #################################################
        monitor.begin_window("inference")
        for output in output_stream:
            stop_length = output["stop_length"]
            for it in stop_length:
                batch_token_len[it[0]] = it[1]
        measurements = monitor.end_window("inference")
        #################################################
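        # `measurements` covers only the generation loop above: `measurements.time`
        # is the wall-clock latency of the window and `measurements.total_energy`
        # is the total GPU energy consumed during it, as reported by Zeus.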
        # Record numbers.
        output_text = output["text"]
        if not is_warmup:
            total_length = int(sum(batch_token_len.values()))  # number of valid tokens
            response_length = float(total_length) / len(convs)
            latency = measurements.time
            throughput = response_length / latency
            energy = measurements.total_energy
            output = {
                "model": model_name_cleaned,
                "batch": len(convs),
                "throughput": throughput,
                "response_length": response_length,
                "latency": latency,
                "energy": energy,
                "input": [prompt.strip() for prompt in prompts],
                "output": [output_text[i][:batch_token_len[i]].strip() for i in range(len(convs))],
            }
            output_str = json.dumps(output, indent=4)
            if not is_first:
                output_json.write(",\n" + output_str)
            else:
                is_first = False
                output_json.write(output_str)
            output_json.flush()
        # Print the responses.
        for i in range(len(convs)):
            console.print(f"\n[u cyan]{'Warmup ' if is_warmup else ''}Response[/u cyan](batch_{i}):")
            console.print(output_text[i][:batch_token_len[i]].strip() + "\n", markup=False)

        # Print the measurement.
        console.print(measurements)

        # Reset for the next batch.
        convs = []
        prompts = []

        if end_of_file:
            break


if __name__ == "__main__":
    tyro.cli(main)
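
# Example invocation via tyro's auto-generated CLI (the script filename and model
# id are placeholders for illustration; tyro exposes each keyword argument of
# `main` as a kebab-case flag):
#
#     python benchmark.py \
#         --model-path lmsys/vicuna-7b-v1.3 \
#         --input-file sharegpt/sg_90k_part1_html_cleaned_lang_first_sampled_sorted.json \
#         --batch 4 --task chat-concise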