""" Minimal OpenAI-compatible local server that serves /LiquidAI/LFM2-1.2B via Hugging Face Transformers on CPU and exposes a subset of the OpenAI REST API (chat/completions, models). Save as local_openai_compatible_server.py and run: pip install -r requirements.txt python local_openai_compatible_server.py Or run with uvicorn directly (recommended for production/dev): uvicorn local_openai_compatible_server:app --host 0.0.0.0 --port 7860 Requirements (requirements.txt): fastapi "uvicorn[standard]" transformers torch Notes: - CPU-only: model loads on CPU (may be slow for a 1.2B model depending on your machine). - Model repo id used: "/LiquidAI/LFM2-1.2B" — adjust if you have a different path or local copy. - This provides a simplified compatibility layer. It is NOT feature-complete with OpenAI's API but implements common fields: messages, max_tokens, temperature, top_p, n, stop, stream (basic). """ from fastapi import FastAPI, Request, HTTPException from fastapi.responses import JSONResponse, StreamingResponse, PlainTextResponse from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from typing import List, Optional, Any, Dict import torch from transformers import AutoTokenizer, AutoModelForCausalLM import time import json import uuid # ----------------------------- # Configuration # ----------------------------- MODEL_ID = "/LiquidAI/LFM2-1.2B" # change to your model location or HF repo HOST = "0.0.0.0" PORT = 7860 DEVICE = torch.device("cpu") # CPU-only as requested DEFAULT_MAX_TOKENS = 256 # ----------------------------- # Load model & tokenizer # ----------------------------- print(f"Loading tokenizer and model '{MODEL_ID}' on device {DEVICE} (CPU-only)... this may take a while") try: tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, use_fast=True) model = AutoModelForCausalLM.from_pretrained(MODEL_ID, torch_dtype=torch.float32) model.to(DEVICE) model.eval() except Exception as e: raise RuntimeError(f"Failed to load model/tokenizer for '{MODEL_ID}': {e}") # If tokenizer has no pad/eos, try to set sensible defaults if tokenizer.pad_token_id is None: if tokenizer.eos_token_id is not None: tokenizer.pad_token_id = tokenizer.eos_token_id # ----------------------------- # FastAPI app # ----------------------------- app = FastAPI(title="Local OpenAI-compatible server (transformers)", version="0.1") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # ----------------------------- # Pydantic models (request bodies) # ----------------------------- class Message(BaseModel): role: str content: str class ChatCompletionRequest(BaseModel): model: Optional[str] = MODEL_ID messages: List[Message] max_tokens: Optional[int] = DEFAULT_MAX_TOKENS temperature: Optional[float] = 0.0 top_p: Optional[float] = 1.0 n: Optional[int] = 1 stop: Optional[List[str]] = None stream: Optional[bool] = False # ----------------------------- # Helpers # ----------------------------- def build_prompt_from_messages(messages: List[Dict[str, Any]]) -> str: # Simple conversational prompt formatting. Adjust to suit model's expected format. parts = [] for m in messages: role = m.get("role", "user") content = m.get("content", "") if role == "system": parts.append(f"<|system|> {content}\n") elif role == "user": parts.append(f"User: {content}\n") elif role == "assistant": parts.append(f"Assistant: {content}\n") else: parts.append(f"{role}: {content}\n") parts.append("Assistant: ") return "".join(parts) def apply_stop_sequences(text: str, stops: Optional[List[str]]) -> str: if not stops: return text idx = None for s in stops: if s == "": continue pos = text.find(s) if pos != -1: if idx is None or pos < idx: idx = pos if idx is not None: return text[:idx] return text # ----------------------------- # Endpoints # ----------------------------- @app.get("/", response_class=PlainTextResponse) async def root(): return "Local OpenAI-compatible server running. Use /v1/chat/completions or /v1/models" @app.get("/v1/models") async def list_models(): return {"data": [{"id": MODEL_ID, "object": "model"}], "object": "list"} @app.post("/v1/chat/completions") async def chat_completions(request: Request, body: ChatCompletionRequest): # Basic validation if body.model is None or body.model != MODEL_ID: # Allow the default model but warn if mismatched raise HTTPException(status_code=400, detail={"error": "invalid_model", "message": f"Only model {MODEL_ID} is available on this server."}) prompt = build_prompt_from_messages([m.dict() for m in body.messages]) # Tokenize inputs = tokenizer(prompt, return_tensors="pt") input_ids = inputs["input_ids"].to(DEVICE) input_len = input_ids.shape[-1] # Generation settings gen_kwargs = { "max_new_tokens": body.max_tokens, "do_sample": bool(body.temperature and body.temperature > 0.0), "temperature": float(body.temperature or 0.0), "top_p": float(body.top_p or 1.0), "num_return_sequences": int(body.n or 1), "pad_token_id": tokenizer.pad_token_id or tokenizer.eos_token_id, # note: on CPU large models may be slow } # Synchronous generation with torch.no_grad(): outputs = model.generate(input_ids, **gen_kwargs) choices = [] for i, out_ids in enumerate(outputs): full_text = tokenizer.decode(out_ids, skip_special_tokens=True) # Attempt to strip the prompt prefix to return only generated reply # find the last occurrence of the prompt in full_text (best-effort) stripped = full_text try: # prefer exact match; fallback to trimming by token count if prompt.strip() and prompt in full_text: stripped = full_text.split(prompt, 1)[1] else: # fallback: remove first input_len tokens from decoded sequence decoded_all = full_text # naive fallback: no-op (we keep the full_text) stripped = decoded_all except Exception: stripped = full_text # apply stop sequences stripped = apply_stop_sequences(stripped, body.stop) # build choice structure similar to OpenAI choice = { "index": i, "message": {"role": "assistant", "content": stripped}, "finish_reason": "stop" if body.stop else "length", } choices.append(choice) # approximate token usage completion_tokens = max(0, (outputs.shape[-1] - input_len) if outputs is not None else 0) usage = {"prompt_tokens": int(input_len), "completion_tokens": int(completion_tokens), "total_tokens": int(input_len + completion_tokens)} response = { "id": str(uuid.uuid4()), "object": "chat.completion", "created": int(time.time()), "model": body.model, "choices": choices, "usage": usage, } # Streaming: rudimentary implementation that streams chunks of the final text as SSE if body.stream: # Only support streaming a single response (n > 1 will still stream the first) text_to_stream = choices[0]["message"]["content"] def event_stream(): # send a few small chunks chunk_size = 128 for start in range(0, len(text_to_stream), chunk_size): chunk = text_to_stream[start:start+chunk_size] payload = {"id": response["id"], "object": "chat.completion.chunk", "choices": [{"delta": {"content": chunk}, "index": 0}]} yield f"data: {json.dumps(payload)}\n\n" # final done message done_payload = {"id": response["id"], "object": "chat.completion.chunk", "choices": [{"delta": {}, "index": 0}], "done": True} yield f"data: {json.dumps(done_payload)}\n\n" return StreamingResponse(event_stream(), media_type="text/event-stream") return JSONResponse(response) # A convenience POST /v1/completions that accepts 'prompt' (legacy completions API) class CompletionRequest(BaseModel): model: Optional[str] = MODEL_ID prompt: Optional[str] = "" max_tokens: Optional[int] = DEFAULT_MAX_TOKENS temperature: Optional[float] = 0.0 top_p: Optional[float] = 1.0 n: Optional[int] = 1 stop: Optional[List[str]] = None stream: Optional[bool] = False @app.post("/v1/completions") async def completions(req: CompletionRequest): # wrap prompt into the chat-format for our generator messages = [Message(role="user", content=req.prompt)] chat_req = ChatCompletionRequest(model=req.model, messages=messages, max_tokens=req.max_tokens, temperature=req.temperature, top_p=req.top_p, n=req.n, stop=req.stop, stream=req.stream) # call the chat_completions handler directly return await chat_completions(Request(scope={}), chat_req) # ----------------------------- # If executed directly, run uvicorn # ----------------------------- if __name__ == "__main__": import uvicorn uvicorn.run("local_openai_compatible_server:app", host=HOST, port=PORT, log_level="info")