import os
import gc
import psutil
import cachetools
from pydantic import BaseModel
from llama_cpp import Llama
from concurrent.futures import ThreadPoolExecutor, as_completed
import re
import httpx
import asyncio
import gradio as gr
import torch
from dotenv import load_dotenv
from fastapi import FastAPI
from fastapi.responses import JSONResponse
import uvicorn
from threading import Thread
import gptcache
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.feature_extraction.text import TfidfVectorizer

load_dotenv()
HUGGINGFACE_TOKEN = os.getenv("HUGGINGFACE_TOKEN")

# Expose the token under the variable name huggingface_hub reads, so that the
# model downloads below are authenticated.
# NOTE(review): Llama.from_pretrained appears to forward unknown keyword
# arguments (such as use_auth_token) to the Llama constructor rather than to
# the Hub download -- confirm against the installed llama-cpp-python version.
# setdefault keeps any HF_TOKEN the environment already provides.
if HUGGINGFACE_TOKEN:
    os.environ.setdefault("HF_TOKEN", HUGGINGFACE_TOKEN)

# Small TTL cache (100 entries, 60 s lifetime).
cache = cachetools.TTLCache(maxsize=100, ttl=60)

# Keys of global_data that all start out as independent empty dicts.
_PER_MODEL_KEYS = (
    'model_metadata', 'eos', 'pad', 'padding', 'unk', 'bos', 'sep', 'cls',
    'mask', 'eot', 'eom', 'lf', 'max_tokens', 'tokenizers', 'model_params',
    'model_size', 'model_ftype', 'n_ctx_train', 'n_embd', 'n_layer', 'n_head',
    'n_head_kv', 'n_rot', 'n_swa', 'n_embd_head_k', 'n_embd_head_v', 'n_gqa',
    'n_embd_k_gqa', 'n_embd_v_gqa', 'f_norm_eps', 'f_norm_rms_eps',
    'f_clamp_kqv', 'f_max_alibi_bias', 'f_logit_scale', 'n_ff', 'n_expert',
    'n_expert_used', 'causal_attn', 'pooling_type', 'rope_type',
    'rope_scaling', 'freq_base_train', 'freq_scale_train', 'n_ctx_orig_yarn',
    'rope_finetuned', 'ssm_d_conv', 'ssm_d_inner', 'ssm_d_state',
    'ssm_dt_rank', 'ssm_dt_b_c_rms', 'vocab_type', 'model_type',
)

# Shared module state: loaded models plus token tables and per-model slots.
global_data = {
    'models': {},
    # NOTE(review): several entries below are empty strings and 'eos' mixes
    # '-' and '_' -- left untouched because these are runtime values; verify.
    'tokensxx': {
        'eos': '<|end_of-text|>',
        'pad': '',
        'padding': '',
        'unk': '',
        'bos': '<|begin_of_text|>',
        'sep': '<|sep|>',
        'cls': '<|cls|>',
        'mask': '',
        'eot': '<|eot_id|>',
        'eom': '<|eom_id|>',
        'lf': '<|0x0A|>',
    },
    'tokens': {
        'eos': 'eos_token',
        'pad': 'pad_token',
        'padding': 'padding_token',
        'unk': 'unk_token',
        'bos': 'bos_token',
        'sep': 'sep_token',
        'cls': 'cls_token',
        'mask': 'mask_token',
    },
    # One fresh dict per key (the comprehension creates a new {} each time).
    **{key: {} for key in _PER_MODEL_KEYS},
}

# Hugging Face repositories / GGUF files to load, keyed by a short name.
model_configs = [
    {"repo_id": "Hjgugugjhuhjggg/testing_semifinal-Q2_K-GGUF", "filename": "testing_semifinal-q2_k.gguf", "name": "testing"},
    {"repo_id": "bartowski/Llama-3.2-3B-Instruct-uncensored-GGUF", "filename": "Llama-3.2-3B-Instruct-uncensored-Q2_K.gguf", "name": "Llama-3.2-3B-Instruct"},
    {"repo_id": "Ffftdtd5dtft/Meta-Llama-3.1-70B-Q2_K-GGUF", "filename": "meta-llama-3.1-70b-q2_k.gguf", "name": "Meta-Llama-3.1-70B"},
    {"repo_id": "bartowski/QwQ-32B-Preview-GGUF", "filename": "QwQ-32B-Preview-Q2_K.gguf", "name": "QwQ-32B-Preview"},
    {"repo_id": "Ffftdtd5dtft/Codestral-22B-v0.1-Q2_K-GGUF", "filename": "codestral-22b-v0.1-q2_k.gguf", "name": "Codestral-22B-v0.1"},
    {"repo_id": "Ffftdtd5dtft/WizardLM-13B-Uncensored-Q2_K-GGUF", "filename": "wizardlm-13b-uncensored-q2_k.gguf", "name": "WizardLM-13B-Uncensored"},
    {"repo_id": "Ffftdtd5dtft/Qwen2-Math-72B-Instruct-Q2_K-GGUF", "filename": "qwen2-math-72b-instruct-q2_k.gguf", "name": "Qwen2-Math-72B-Instruct"},
    {"repo_id": "Ffftdtd5dtft/Phi-3-mini-128k-instruct-Q2_K-GGUF", "filename": "phi-3-mini-128k-instruct-q2_k.gguf", "name": "Phi-3-mini-128k"},
    {"repo_id": "Ffftdtd5dtft/DeepSeek-Coder-V2-Lite-Instruct-Q2_K-GGUF", "filename": "deepseek-coder-v2-lite-instruct-q2_k.gguf", "name": "DeepSeek-Coder-V2-Lite"},
    {"repo_id": "Ffftdtd5dtft/Mistral-Nemo-Instruct-2407-Q2_K-GGUF", "filename": "mistral-nemo-instruct-2407-q2_k.gguf", "name": "Mistral-Nemo-Instruct-2407"},
]


class ModelManager:
    """Load the models listed in ``model_configs`` and keep them by name."""

    def __init__(self, n_threads=20):
        """Create an empty manager.

        Args:
            n_threads: Value passed as ``n_threads`` to every model that is
                loaded. Defaults to 20, the value previously hard-coded.
        """
        self.models = {}
        self.n_threads = n_threads

    def load_model(self, model_config):
        """Load one model unless a model with the same name is already held.

        Args:
            model_config: Mapping with ``'name'``, ``'repo_id'`` and
                ``'filename'`` keys (see ``model_configs``).

        Loading is best-effort: any failure is printed and the model is
        simply left out of ``self.models``.
        """
        name = model_config['name']
        if name in self.models:
            return
        try:
            self.models[name] = Llama.from_pretrained(
                repo_id=model_config['repo_id'],
                filename=model_config['filename'],
                use_auth_token=HUGGINGFACE_TOKEN,
                n_threads=self.n_threads,
                use_gpu=False,
            )
        except Exception as e:
            print(f"Error loading model {model_config['name']}: {e}")

    def load_all_models(self):
        """Load every entry of ``model_configs`` concurrently.

        Returns:
            The ``self.models`` dict (name -> loaded model). Models that
            failed to load are absent.
        """
        # Leaving the ``with`` block waits for all submitted loads to finish.
        with ThreadPoolExecutor() as executor:
            for config in model_configs:
                executor.submit(self.load_model, config)
        return self.models


model_manager = ModelManager()
global_data['models'] = model_manager.load_all_models()


def release_resources():
    """Empty the CUDA cache (when CUDA is available) and run the Python GC.

    Best-effort: any failure is printed rather than raised.
    """
    try:
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        gc.collect()
    except Exception as e:
        print(f"Failed to release resources: {e}")


def resource_manager(max_ram_percent=1, max_cpu_percent=1, max_gpu_percent=1,
                     poll_interval=1.0):
    """Poll RAM / CPU / GPU usage forever and release resources when high.

    Args:
        max_ram_percent: System RAM usage (percent) above which
            ``release_resources`` is called.
        max_cpu_percent: CPU usage (percent) above which the process
            niceness is queried.
        max_gpu_percent: Share of GPU memory allocated through torch
            (percent) above which ``release_resources`` is called.
        poll_interval: Seconds to sleep between checks. Without a pause the
            loop spins continuously and itself consumes a full CPU core.

    This function never returns; run it in a daemon thread.
    """
    import time  # local import: the file's import block does not provide it

    while True:
        try:
            if psutil.virtual_memory().percent > max_ram_percent:
                release_resources()

            if psutil.cpu_percent() > max_cpu_percent:
                # NOTE(review): called without an argument this only reads
                # the niceness value; it does not lower the priority.
                # Confirm the intended value before passing one.
                psutil.Process(os.getpid()).nice()

            if torch.cuda.is_available():
                gpu = torch.cuda.current_device()
                mib = 1024 * 1024
                gpu_mem = torch.cuda.memory_allocated(gpu) / mib
                total_gpu_mem = torch.cuda.get_device_properties(gpu).total_memory / mib
                if (gpu_mem / total_gpu_mem) * 100 > max_gpu_percent:
                    release_resources()
        except Exception as e:
            print(f"Error in resource manager: {e}")
        time.sleep(poll_interval)


def run_resource_manager():
    """Thread entry point for the resource monitor."""
    resource_manager()


Thread(target=run_resource_manager, daemon=True).start()


def normalize_input(input_text):
    """Return ``input_text`` with leading and trailing whitespace removed."""
    return input_text.strip()


def remove_duplicates(text):
    """Drop repeated lines from ``text``, keeping first occurrences in order."""
    # dict.fromkeys preserves insertion order and discards later duplicates.
    return '\n'.join(dict.fromkeys(text.split('\n')))


def get_best_response(responses):
    """Pick the response most similar, by TF-IDF cosine, to all the others.

    Args:
        responses: List of response strings (one per model).

    Returns:
        The response whose summed cosine similarity to every response is
        largest. Returns ``""`` when there is no non-empty response, and the
        first non-empty response when TF-IDF cannot build a vocabulary.
    """
    non_empty = [r for r in responses if r and r.strip()]
    if not non_empty:
        return ""
    try:
        tfidf = TfidfVectorizer().fit_transform(responses)
    except ValueError:
        # Raised when no response contains a token the vectorizer keeps.
        return non_empty[0]
    similarity_matrix = cosine_similarity(tfidf)
    best_response_index = similarity_matrix.sum(axis=1).argmax()
    return responses[best_response_index]


def _truncate_words(text, max_words):
    """Return ``text`` cut after its first ``max_words`` whitespace-separated words."""
    end = 0
    for index, match in enumerate(re.finditer(r'\S+', text)):
        if index == max_words:
            # A word beyond the limit exists: cut right after the last kept one.
            return text[:end]
        end = match.end()
    return text


async def generate_model_response(model, inputs, max_tokens=2048):
    """Run one model on ``inputs`` and return its de-duplicated text.

    Args:
        model: Callable returning a mapping shaped like
            ``{'choices': [{'text': ...}]}``.
        inputs: Prompt passed to ``model``.
        max_tokens: Maximum number of whitespace-separated words to keep
            (the limit is counted in words, not model tokens).

    Returns:
        The generated text, or ``""`` if the model call or result parsing
        fails (the error is printed).
    """
    try:
        # The model call is blocking; run it in the default executor so that
        # several models can work at once and the event loop stays free.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(None, model, inputs)
        text = remove_duplicates(response['choices'][0]['text'])
    except Exception as e:
        print(f"Error generating model response: {e}")
        return ""
    return _truncate_words(text, max_tokens)


async def process_message(message):
    """Query every loaded model with ``message`` and return the best answer.

    Args:
        message: Raw user message.

    Returns:
        The response selected by ``get_best_response``.
    """
    inputs = normalize_input(message)
    responses = await asyncio.gather(
        *(generate_model_response(model, inputs)
          for model in global_data['models'].values())
    )
    return get_best_response(list(responses))


app = FastAPI()


class GenerateRequest(BaseModel):
    """Request body of ``POST /generate``."""

    message: str


@app.post("/generate")
async def generate(request: GenerateRequest):
    """Return the best model response for ``request.message`` as JSON.

    On success the body is ``{"response": ...}``; on failure it is
    ``{"error": ...}`` with HTTP status 500.
    """
    try:
        response = await process_message(request.message)
        return JSONResponse(content={"response": response})
    except Exception as e:
        return JSONResponse(content={"error": str(e)}, status_code=500)


def run_uvicorn():
    """Serve the FastAPI app on 0.0.0.0:7860 (blocks until shutdown)."""
    uvicorn.run(app, host="0.0.0.0", port=7860)


iface = gr.Interface(
    fn=process_message,
    inputs=gr.Textbox(lines=2, placeholder="Enter your message here..."),
    outputs=gr.Markdown(),
    title="Multi-Model LLM API (CPU Optimized)",
    description=""
)


def run_gradio():
    """Start the Gradio UI on port 7862 without blocking the caller."""
    iface.launch(server_port=7862, prevent_thread_lock=True)


if __name__ == "__main__":
    # Gradio is launched with prevent_thread_lock=True, so this call returns
    # and uvicorn can then occupy the main thread. That keeps the process
    # alive without a separate idle event loop.
    run_gradio()
    run_uvicorn()