"""FastAPI service exposing text, image and image-to-text generation endpoints.

Endpoints defined in this module:

* ``POST /Genera``        - text generation (direct, or fanned out when
  ``asincrono`` is true).
* ``POST /GeneraAsync``   - always fans the request out to ``/Genera``.
* ``POST /Immagine``      - image generation through a Gradio Space.
* ``POST /Image_To_Text`` - visual question answering on a base64 image.
* ``POST /PostSpazio``    - generic proxy to a caller-chosen Gradio Space.
* ``GET  /``              - welcome message.
"""

import asyncio
import base64
import json
import os
import random
import socket
import tempfile
import time
from datetime import datetime
from enum import Enum
from io import BytesIO
from types import SimpleNamespace

import aiohttp
import requests
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware  # CORS middleware
from gradio_client import Client
from huggingface_hub import InferenceClient
from PIL import Image
from pydantic import BaseModel

# --------------------------------------------------- FastAPI server definition ------------------------------------------------------

# URL of this service's own /Genera endpoint, used as the fan-out target.
GENERA_URL = "https://matteoscript-fastapi.hf.space/Genera"

# Marker the model is asked to emit when the instructions do not cover the
# request; responses containing it are dropped by the pertinence filter.
NOT_FOUND_MARKER = "NOTFOUND"

# Upper bound (seconds) for outbound `requests` calls so a stalled remote
# host cannot block a worker forever.
HTTP_TIMEOUT_SECONDS = 30

app = FastAPI()
client = InferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


class InputData(BaseModel):
    """Request body of ``/Genera``."""

    input: str
    systemRole: str = ''
    systemStyle: str = ''
    # Either literal instruction text or an http(s) URL whose body is fetched.
    instruction: str = ''
    temperature: float = 0.7
    max_new_tokens: int = 2000
    top_p: float = 0.95
    repetition_penalty: float = 1.0
    # When true the request is split and fanned out to GENERA_URL.
    asincrono: bool = False
    # Number of requests issued for every generated payload.
    NumeroGenerazioni: int = 1
    # Separator used to split `input`, `systemRole` and `instruction`.
    StringaSplit: str = '********'
    # Maximum number of characters per instruction chunk.
    NumeroCaratteriSplitInstruction: int = 30000
    EliminaRisposteNonPertinenti: bool = False
    UnificaRispostaPertinente: bool = False
    telegramChatId: str = ''
    telegramUrlBot: str = ''
    telegramUrlPost: str = ''


class InputDataAsync(InputData):
    """Request body of ``/GeneraAsync``."""

    test: str = ''


class PostSpazio(BaseModel):
    """Request body of ``/PostSpazio``."""

    nomeSpazio: str
    input: str = ''
    api_name: str = "/chat"


def LoggaTesto(log_type, data, serializza=True):
    """Print a timestamped log entry to stdout.

    Args:
        log_type: Label shown in the banner line.
        data: Payload to log.
        serializza: When true, ``data`` is rendered as indented JSON;
            otherwise it is interpolated as-is.
    """
    if serializza:
        # default=str keeps logging from raising on non-JSON-native values.
        formatted_data = json.dumps(data, indent=2, default=str)
    else:
        formatted_data = data
    print(f"\n{datetime.now()}: ---------------------------------------------------------------| {log_type} |--------------------------------------------------------------\n{formatted_data}")


# --------------------------------------------------- TEXT generation ------------------------------------------------------

@app.post("/Genera")
def generate_text(request: Request, input_data: InputData):
    """Generate text directly, or fan the request out when ``asincrono`` is set.

    Returns:
        ``{"response": ...}`` where the value is the generated string in the
        direct path, or the list of worker results in the fan-out path.
    """
    if not input_data.asincrono:
        return _genera_sincrono(input_data)
    return _genera_asincrono(input_data)


def _genera_sincrono(input_data):
    """Run a single generation and optionally forward it to Telegram."""
    LoggaTesto("INPUT", input_data.input, False)
    input_text = generate_input_text(input_data)
    generated_response = generate(
        input_text,
        [],
        input_data.temperature,
        input_data.max_new_tokens,
        input_data.top_p,
        input_data.repetition_penalty,
    )
    if input_data.telegramChatId and input_data.telegramUrlBot and input_data.telegramUrlPost:
        try:
            asyncio.run(call_telegram_api(input_data, generated_response))
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            # The notification is best-effort: a failed delivery must not
            # discard a response that has already been generated.
            LoggaTesto("ERRORE TELEGRAM", repr(e), False)
    LoggaTesto("RISPOSTA", {"response": generated_response}, False)
    return {"response": generated_response}


def _testo_risposta(item):
    """Return the text of one worker result (JSON dict or plain string)."""
    if isinstance(item, dict):
        return str(item.get("response", ""))
    return str(item)


def _genera_asincrono(input_data):
    """Fan the request out to GENERA_URL, then optionally filter and merge."""
    # The forwarded requests must take the direct path on the receiving side.
    input_data.asincrono = False
    if input_data.EliminaRisposteNonPertinenti:
        # The marker named here must be the one the filter below looks for.
        msgEliminaRisposteNonPertinenti = (
            " (Rispondi solo sulla base delle ISTRUZIONI che hai ricevuto. "
            "se non trovi corrispondenza tra RICHIESTA e ISTRUZIONI rispondi con "
            f"{NOT_FOUND_MARKER}!!!)"
        )
        input_data.input = input_data.input + msgEliminaRisposteNonPertinenti
        input_data.systemRole = input_data.systemRole + msgEliminaRisposteNonPertinenti

    result_data = asyncio.run(GeneraTestoAsync(GENERA_URL, input_data))
    LoggaTesto("RISPOSTA ASINCRONA", {"response": result_data})

    if input_data.EliminaRisposteNonPertinenti:
        result_data = [
            item for item in result_data
            if NOT_FOUND_MARKER not in _testo_risposta(item)
        ]

    if input_data.UnificaRispostaPertinente:
        input_data.input = f'''Metti insieme le seguenti risposte. Basati solo su questo TESTO e non AGGIUNGERE ALTRO!!!!: {result_data}'''
        input_data.systemRole = ''
        input_data.systemStyle = 'Rispondi in ITALIANO'
        input_data.instruction = ''
        result_data = asyncio.run(GeneraTestoAsync(GENERA_URL, input_data))
        LoggaTesto("RISPOSTA ASINCRONA UNIFICATA", {"response": result_data})

    return {"response": result_data}


def call_telegram_api_OLD(input_data, text):
    """Blocking variant of `call_telegram_api`, kept for reference."""
    payload = {
        "chat_id": input_data.telegramChatId,
        "text": text,
        "telegramUrl": input_data.telegramUrlBot
    }
    response = requests.post(
        input_data.telegramUrlPost, json=payload, timeout=HTTP_TIMEOUT_SECONDS
    )
    if response.status_code == 200:
        print("Invio messaggio TELEGRAM")
    else:
        print("Errore nella richiesta POST. Codice di stato:", response.status_code)


async def call_telegram_api(input_data, text):
    """POST ``text`` to ``telegramUrlPost`` with the chat id and bot URL."""
    payload = {
        "chat_id": input_data.telegramChatId,
        "text": text,
        "telegramUrl": input_data.telegramUrlBot
    }
    async with aiohttp.ClientSession() as session:
        async with session.post(input_data.telegramUrlPost, json=payload) as response:
            # Read the body so the response is fully consumed before closing.
            await response.text()


def _risolvi_instruction(input_data):
    """Replace an ``http...`` instruction with the text fetched from that URL.

    On any request failure the instruction becomes an empty string.
    """
    if not input_data.instruction.startswith("http"):
        return
    # NOTE(review): the URL comes from the request body, so this is a
    # server-side fetch of a caller-controlled address (SSRF surface).
    try:
        resp = requests.get(input_data.instruction, timeout=HTTP_TIMEOUT_SECONDS)
        resp.raise_for_status()  # raise on HTTP error statuses
        input_data.instruction = resp.text
    except requests.exceptions.RequestException as e:
        LoggaTesto("ERRORE INSTRUCTION", repr(e), False)
        input_data.instruction = ""


def generate_input_text(input_data):
    """Build the prompt text sent to the model.

    When a system role, style or instruction is present they are wrapped,
    together with the user input, in a JSON-like envelope; otherwise the raw
    input is returned. Values are interpolated verbatim (not JSON-escaped).
    """
    _risolvi_instruction(input_data)
    if not (input_data.systemRole or input_data.systemStyle or input_data.instruction):
        return input_data.input
    return (
        f' {{ "input": {{ "role": "system", '
        f'"content": "{input_data.systemRole}", '
        f'"style": "{input_data.systemStyle}" }}, '
        f'"messages": [ {{ "role": "instructions", '
        f'"content": "{input_data.instruction} "("{input_data.systemStyle}")" }}, '
        f'{{ "role": "user", "content": "{input_data.input}" }} ] }} '
    )


def generate(prompt, history, temperature=0.7, max_new_tokens=30000, top_p=0.95, repetition_penalty=1.0):
    """Call the module-level inference client and return its text output.

    Args:
        prompt: Message for the current turn.
        history: Iterable of ``(user_prompt, bot_response)`` pairs.
        temperature: Sampling temperature, floored at 0.01.
        max_new_tokens: Generation length limit.
        top_p: Nucleus sampling parameter.
        repetition_penalty: Repetition penalty forwarded to the client.
    """
    generate_kwargs = dict(
        temperature=max(float(temperature), 1e-2),
        max_new_tokens=max_new_tokens,
        top_p=float(top_p),
        repetition_penalty=repetition_penalty,
        do_sample=True,
        seed=random.randint(0, 10**7),
    )
    formatted_prompt = format_prompt(prompt, history)
    return client.text_generation(formatted_prompt, **generate_kwargs, stream=False, details=False)


def format_prompt(message, history):
    """Render history and the new message in ``[INST] ... [/INST]`` form.

    The final turn is prefixed with the current timestamp in brackets.
    """
    parts = [
        f"[INST] {user_prompt} [/INST] {bot_response} "
        for user_prompt, bot_response in history
    ]
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
    parts.append(f"[{now}] [INST] {message} [/INST]")
    return "".join(parts)


# --------------------------------------------------- ASYNC TEXT generation ------------------------------------------------------

@app.post("/GeneraAsync")
def generate_textAsync(request: Request, input_data: InputDataAsync):
    """Fan the request out to GENERA_URL and return the list of results."""
    result_data = asyncio.run(GeneraTestoAsync(GENERA_URL, input_data))
    return {"response": result_data}


async def make_request(session, token, data, url, index, semaphore, max_retries=3):
    """POST one payload to ``url`` with retries, bounded by ``semaphore``.

    Args:
        session: Open ``aiohttp.ClientSession``.
        token: Bearer token; when falsy no Authorization header is sent.
        data: Payload dict; it is copied, never modified.
        url: Target endpoint.
        index: Generation index; selects the ``max_new_tokens`` variation.
        semaphore: Limits the number of concurrent requests.
        max_retries: Maximum number of attempts.

    Returns:
        The decoded JSON body, or the raw text when the body is not JSON.

    Raises:
        Exception: When every attempt failed or a 502/504 stopped the retries.
    """
    async with semaphore:
        headers = {'Content-Type': 'application/json'}
        if token:
            headers['Authorization'] = 'Bearer ' + token

        # Work on a copy: the same dict is shared by every generation of a
        # payload, so in-place edits would compound across tasks.
        payload = dict(data)
        variante = (int(index) + 1) % 3
        if variante == 2:
            payload['max_new_tokens'] = max(200, payload['max_new_tokens'] - 200)
        elif variante == 0:
            payload['max_new_tokens'] = payload['max_new_tokens'] + 200

        last_error = None
        for _ in range(max_retries):
            try:
                async with session.post(url, headers=headers, json=payload) as response:
                    response.raise_for_status()
                    try:
                        return await response.json()
                    except aiohttp.ContentTypeError:
                        return await response.text()
            except (asyncio.TimeoutError, aiohttp.ClientError) as e:
                last_error = e
                LoggaTesto("ERRORE ASYNC", repr(e), False)
                # Gateway errors are not retried.
                if isinstance(e, aiohttp.ClientResponseError) and e.status in (502, 504):
                    break
                await asyncio.sleep(3)
        raise Exception("Max retries reached or skipping retries. Unable to make the request.") from last_error


async def CreaListaInput(input_data):
    """Expand one request into the list of payload dicts to send.

    An instruction that parses as a non-empty JSON list of dicts carrying a
    ``"Titolo"`` key is split per entry; anything else is split as plain text.
    """
    _risolvi_instruction(input_data)
    try:
        lista_dizionari = json.loads(input_data.instruction)
    except json.JSONDecodeError:
        return DividiInstructionText(input_data)
    if (
        isinstance(lista_dizionari, list)
        and lista_dizionari
        and isinstance(lista_dizionari[0], dict)
        and "Titolo" in lista_dizionari[0]
    ):
        return DividiInstructionJSON(lista_dizionari, input_data)
    return DividiInstructionText(input_data)


def split_at_space_or_dot(input_string, length):
    """Return the index at which to cut ``input_string``.

    The cut falls just after the right-most delimiter found before
    ``length``. A string that already fits within ``length`` is not cut, and
    ``length + 1`` is returned when no delimiter is found.
    """
    if 0 <= len(input_string) <= length:
        return len(input_string)
    delimiters = ['\n\n', '.\n', ';\n', '.', ' ']
    positions = [input_string.rfind(d, 0, length) for d in delimiters]
    valid_positions = [pos for pos in positions if pos >= 0]
    lastpos = max(valid_positions) if valid_positions else length
    return int(lastpos) + 1


def _crea_richieste(input_data, instruction_values, strip_fields):
    """Build one payload per (systemRole, input, instruction) combination.

    Args:
        input_data: Source request.
        instruction_values: Ready-made ``instruction`` strings.
        strip_fields: Whether to strip the split ``input``/``systemRole`` parts.
    """
    input_strings = input_data.input.split(input_data.StringaSplit)
    systemRole_strings = input_data.systemRole.split(input_data.StringaSplit)
    ListaInput = []
    for systemRole_string in systemRole_strings:
        for input_string in input_strings:
            for instruction_value in instruction_values:
                ListaInput.append({
                    'input': input_string.strip() if strip_fields else input_string,
                    'instruction': instruction_value,
                    'temperature': input_data.temperature,
                    'max_new_tokens': input_data.max_new_tokens,
                    'top_p': input_data.top_p,
                    'repetition_penalty': input_data.repetition_penalty,
                    'systemRole': systemRole_string.strip() if strip_fields else systemRole_string,
                    'systemStyle': input_data.systemStyle,
                    'telegramChatId': input_data.telegramChatId,
                    'telegramUrlBot': input_data.telegramUrlBot,
                    'telegramUrlPost': input_data.telegramUrlPost
                })
    return ListaInput


def DividiInstructionJSON(lista_dizionari, input_data):
    """Split ``{"Titolo", "Testo"}`` entries into chunks and build payloads.

    Texts longer than ``NumeroCaratteriSplitInstruction`` are cut into
    overlapping chunks: the next chunk resumes from the cut point computed
    with a window 100 characters shorter.
    """
    limite = input_data.NumeroCaratteriSplitInstruction
    nuova_lista_dizionari = []
    for dizionario in lista_dizionari:
        titolo = dizionario["Titolo"]
        testo_completo = dizionario["Testo"]
        while len(testo_completo) > limite:
            indice_divisione = max(1, split_at_space_or_dot(testo_completo, limite))
            if limite > 100:
                indice_divisione_precedente = max(
                    1, split_at_space_or_dot(testo_completo, limite - 100)
                )
            else:
                # No room for an overlap window; resuming at the cut point
                # guarantees the remaining text keeps shrinking.
                indice_divisione_precedente = indice_divisione
            sottostringa = testo_completo[:indice_divisione].strip()
            testo_completo = testo_completo[indice_divisione_precedente:].strip()
            nuova_lista_dizionari.append({"Titolo": titolo, "Testo": sottostringa})
        if len(testo_completo) > 0:
            nuova_lista_dizionari.append({"Titolo": titolo, "Testo": testo_completo})

    instruction_values = [str(dizionario) for dizionario in nuova_lista_dizionari]
    return _crea_richieste(input_data, instruction_values, strip_fields=False)


def DividiInstructionText(input_data):
    """Split a plain-text instruction into chunks and build payloads.

    The text is first cut to ``NumeroCaratteriSplitInstruction`` characters
    per chunk, then every chunk is further split on ``StringaSplit``.
    """
    input_str = input_data.instruction
    StringaSplit = input_data.StringaSplit
    limite = input_data.NumeroCaratteriSplitInstruction

    sottostringhe = []
    if len(input_str) > limite:
        indice_inizio = 0
        while indice_inizio < len(input_str):
            # max(1, ...) guarantees forward progress for any limit value.
            lunghezza_sottostringa = max(
                1, split_at_space_or_dot(input_str[indice_inizio:], limite)
            )
            sottostringhe.append(
                input_str[indice_inizio:indice_inizio + lunghezza_sottostringa].strip()
            )
            indice_inizio += lunghezza_sottostringa
    else:
        sottostringhe.append(input_str)

    # Joining then splitting also honours separators already in the text.
    instruction_strings = StringaSplit.join(sottostringhe).split(StringaSplit)
    instruction_values = [str([s.strip()]) for s in instruction_strings]
    return _crea_richieste(input_data, instruction_values, strip_fields=True)


async def GeneraTestoAsync(url, input_data):
    """Send every payload ``NumeroGenerazioni`` times and gather the results.

    The bearer token is read from the ``TOKEN`` environment variable and at
    most 20 requests run concurrently.
    """
    token = os.getenv('TOKEN')
    semaphore = asyncio.Semaphore(20)
    async with aiohttp.ClientSession() as session:
        tasks = []
        ListaInput = await CreaListaInput(input_data)
        for data in ListaInput:
            LoggaTesto("RICHIESTA ASINCRONA", data)
            tasks.extend(
                make_request(session, token, data, url, index, semaphore)
                for index in range(input_data.NumeroGenerazioni)
            )
            # NOTE(review): the coroutines only start inside gather() below,
            # so this pause merely spaces out the log entries.
            await asyncio.sleep(0.1)
        return await asyncio.gather(*tasks)


async def generate_text_internal(datajson):
    """Generate text in-process from a payload dict (no HTTP round trip)."""
    data = SimpleNamespace(**datajson)
    input_text = generate_input_text(data)
    # Keep prompt length + new tokens under 29500, never below one token.
    max_new_tokens = max(1, min(data.max_new_tokens, 29500 - len(input_text)))
    return generate(
        input_text,
        [],
        data.temperature,
        max_new_tokens,
        data.top_p,
        data.repetition_penalty,
    )


# --------------------------------------------------- IMAGE generation ------------------------------------------------------

# Style presets: "descrizione" is a template with a {prompt} placeholder and
# "negativePrompt" is the matching negative prompt.
style_image = {
    "PROFESSIONAL-PHOTO": {
        "descrizione": "Professional photo {prompt} . Vivid colors, Mirrorless, 35mm lens, f/1.8 aperture, ISO 100, natural daylight",
        "negativePrompt": "out of frame, lowres, text, error, cropped, worst quality, low quality, jpeg artifacts, ugly, duplicate, morbid, mutilated, out of frame, extra fingers, mutated hands, poorly drawn hands, poorly drawn face, mutation, deformed, blurry, bad anatomy, bad proportions, extra limbs, cloned face, disfigured, gross proportions, malformed limbs, missing arms, missing legs, extra arms, extra legs, fused fingers, too many fingers, long neck, username, watermark, signature"
    },
    "CINEMATIC-PHOTO": {
        "descrizione": "cinematic photo {prompt} . 35mm photograph, film, bokeh, professional, 4k, highly detailed",
        "negativePrompt": "drawing, painting, crayon, sketch, graphite, impressionist, noisy, blurry, soft, deformed, ugly"
    },
    "CINEMATIC-PORTRAIT": {
        "descrizione": "cinematic portrait {prompt} 8k, ultra realistic, good vibes, vibrant",
        "negativePrompt": "drawing, painting, crayon, sketch, graphite, impressionist, noisy, blurry, soft, deformed, ugly"
    },
    "LINE-ART-DRAWING": {
        "descrizione": "line art drawing {prompt} . professional, sleek, modern, minimalist, graphic, line art, vector graphics",
        "negativePrompt": "anime, photorealistic, 35mm film, deformed, glitch, blurry, noisy, off-center, deformed, cross-eyed, closed eyes, bad anatomy, ugly, disfigured, mutated, realism, realistic, impressionism, expressionism, oil, acrylic"
    },
    "COMIC": {
        "descrizione": "comic {prompt} . graphic illustration, comic art, graphic novel art, vibrant, highly detailed",
        "negativePrompt": "photograph, deformed, glitch, noisy, realistic, stock photo"
    },
    "ADVERTISING-POSTER-STYLE": {
        "descrizione": "advertising poster style {prompt} . Professional, modern, product-focused, commercial, eye-catching, highly detailed",
        "negativePrompt": "noisy, blurry, amateurish, sloppy, unattractive"
    },
    "RETAIL-PACKAGING-STYLE": {
        "descrizione": "retail packaging style {prompt} . vibrant, enticing, commercial, product-focused, eye-catching, professional, highly detailed",
        "negativePrompt": "noisy, blurry, amateurish, sloppy, unattractive"
    },
    "GRAFFITI-STYLE": {
        "descrizione": "graffiti style {prompt} . street art, vibrant, urban, detailed, tag, mural",
        "negativePrompt": "ugly, deformed, noisy, blurry, low contrast, realism, photorealistic"
    },
    "POP-ART-STYLE": {
        "descrizione": "pop Art style {prompt} . bright colors, bold outlines, popular culture themes, ironic or kitsch",
        "negativePrompt": "ugly, deformed, noisy, blurry, low contrast, realism, photorealistic, minimalist"
    },
    "ISOMETRIC-STYLE": {
        "descrizione": "isometric style {prompt} . vibrant, beautiful, crisp, detailed, ultra detailed, intricate",
        "negativePrompt": "deformed, mutated, ugly, disfigured, blur, blurry, noise, noisy, realistic, photographic"
    },
    "LOW-POLY-STYLE": {
        "descrizione": "low-poly style {prompt}. ambient occlusion, low-poly game art, polygon mesh, jagged, blocky, wireframe edges, centered composition",
        "negativePrompt": "noisy, sloppy, messy, grainy, highly detailed, ultra textured, photo"
    },
    "CLAYMATION-STYLE": {
        "descrizione": "claymation style {prompt} . sculpture, clay art, centered composition, play-doh",
        "negativePrompt": ""
    },
    "PROFESSIONAL-3D-MODEL": {
        "descrizione": "professional 3d model {prompt} . octane render, highly detailed, volumetric, dramatic lighting",
        "negativePrompt": "ugly, deformed, noisy, low poly, blurry, painting"
    },
    "ANIME-ARTWORK": {
        "descrizione": "anime artwork {prompt} . anime style, key visual, vibrant, studio anime, highly detailed",
        "negativePrompt": "photo, deformed, black and white, realism, disfigured, low contrast"
    },
    "ETHEREAL-FANTASY-CONCEPT-ART": {
        "descrizione": "ethereal fantasy concept art of {prompt} . magnificent, celestial, ethereal, painterly, epic, majestic, magical, fantasy art, cover art, dreamy",
        "negativePrompt": "photographic, realistic, realism, 35mm film, dslr, cropped, frame, text, deformed, glitch, noise, noisy, off-center, deformed, cross-eyed, closed eyes, bad anatomy, ugly, disfigured, sloppy, duplicate, mutated, black and white"
    },
    "CYBERNETIC-STYLE": {
        "descrizione": "cybernetic style {prompt} . futuristic, technological, cybernetic enhancements, robotics, artificial intelligence themes",
        "negativePrompt": "ugly, deformed, noisy, blurry, low contrast, realism, photorealistic, historical, medieval"
    },
    "FUTURISTIC-STYLE": {
        "descrizione": "futuristic style {prompt} . sleek, modern, ultramodern, high tech, detailed",
        "negativePrompt": "ugly, deformed, noisy, blurry, low contrast, realism, photorealistic, vintage, antique"
    },
    "SCI-FI-STYLE": {
        "descrizione": "sci-fi style {prompt} . futuristic, technological, alien worlds, space themes, advanced civilizations",
        "negativePrompt": "ugly, deformed, noisy, blurry, low contrast, realism, photorealistic, historical, medieval"
    },
    "DIGITAL-ART": {
        "descrizione": "Digital Art {prompt} . vibrant, cute, digital, handmade",
        "negativePrompt": ""
    },
    "SIMPLE-LOGO": {
        "descrizione": "Minimalist Logo {prompt} . material design, primary colors, stylized, minimalist",
        "negativePrompt": "3D, high detail, noise, grainy, blurry, painting, drawing, photo, disfigured"
    },
    "MINIMALISTIC-LOGO": {
        "descrizione": "Ultra-minimalist Material Design logo for a BRAND: {prompt} . simple, few colors, clean lines, minimal details, modern color palette, no shadows",
        "negativePrompt": "3D, high detail, noise, grainy, blurry, painting, drawing, photo, disfigured"
    }
}


class InputImage(BaseModel):
    """Request body of ``/Immagine``."""

    input: str
    negativePrompt: str = ''
    # Key of `style_image`, or 'RANDOM' to pick one at random.
    style: str = ''
    steps: int = 25
    cfg: int = 6
    seed: int = -1
    # Selects the second image backend when true. The annotation is required:
    # Pydantic v2 rejects non-annotated class attributes.
    variante: bool = False


def _applica_stile(input_data):
    """Rewrite prompt and negative prompt according to the requested style.

    Unknown style names leave the request untouched.
    """
    if not input_data.style:
        return
    print(input_data.style)
    if input_data.style == 'RANDOM':
        style_key = random.choice(list(style_image))
    elif input_data.style in style_image:
        style_key = input_data.style
    else:
        return
    style_info = style_image[style_key]
    input_data.input = style_info["descrizione"].format(prompt=input_data.input)
    input_data.negativePrompt = style_info["negativePrompt"]


@app.post("/Immagine")
def generate_image(request: Request, input_data: InputImage):
    """Generate an image through a Gradio Space and return it base64-encoded.

    Returns:
        ``{"response": <base64 string>}`` on success, or ``{"error": ...}``
        after five failed attempts.
    """
    # Alternative Space: https://manjushri-sdxl-1-0.hf.space/
    _applica_stile(input_data)

    max_attempts = 5
    for attempt in range(1, max_attempts + 1):
        try:
            print(input_data.input)
            if not input_data.variante:
                # Alternative Space: AP123/SDXL-Lightning
                space_client = Client("ByteDance/SDXL-Lightning")
                result = space_client.predict(
                    input_data.input,
                    "8-Step",
                    api_name="/generate_image"
                )
                image_url = result
            else:
                # Alternative Space: https://playgroundai-playground-v2-5.hf.space/--replicas/9kuov/
                space_client = Client("https://choimirai-playground-v2-5.hf.space/--replicas/bgsav/")
                result = space_client.predict(
                    input_data.input,           # str in 'Prompt' Textbox component
                    input_data.negativePrompt,  # str in 'Negative prompt' Textbox component
                    True,                       # bool in 'Use negative prompt' Checkbox component
                    0,                          # float (numeric value between 0 and 2147483647) in 'Seed' Slider component
                    1024,                       # float (numeric value between 256 and 1536) in 'Width' Slider component
                    1024,                       # float (numeric value between 256 and 1536) in 'Height' Slider component
                    3,                          # float (numeric value between 0.1 and 20) in 'Guidance Scale' Slider component
                    True,                       # bool in 'Randomize seed' Checkbox component
                    api_name="/run"
                )
                image_url = result[0][0]['image']
            print(image_url)
            # The result is used as a local file path.
            with open(image_url, 'rb') as img_file:
                img_base64 = base64.b64encode(img_file.read()).decode('utf-8')
            return {"response": img_base64}
        except requests.exceptions.HTTPError:
            # NOTE(review): only `requests` HTTP errors are retried; confirm
            # which exception types the Gradio client actually raises.
            time.sleep(1)
            if attempt >= max_attempts:
                return {"error": "Errore interno del server persistente!"}
    return {"error": "Numero massimo di tentativi raggiunto"}


# --------------------------------------------------- IMAGE TO TEXT ------------------------------------------------------

class InputImageToText(BaseModel):
    """Request body of ``/Image_To_Text``."""

    base64: str
    input: str = ''


def base64_in_immagine(dati_base64, nome_file="/tmp/img.jpg"):
    """Decode a base64 image and save it to ``nome_file``.

    Args:
        dati_base64: Base64-encoded image bytes.
        nome_file: Destination path; defaults to ``/tmp/img.jpg``.

    Returns:
        The path the image was written to.
    """
    immagine = base64.b64decode(dati_base64)
    immagine_pil = Image.open(BytesIO(immagine))
    # Modes such as RGBA or P cannot be written as JPEG; convert them first.
    if immagine_pil.mode not in ("RGB", "L", "CMYK"):
        immagine_pil = immagine_pil.convert("RGB")
    immagine_pil.save(nome_file)
    return nome_file


@app.post("/Image_To_Text")
def image_to_text(request: Request, input_data: InputImageToText):
    """Answer a question about a base64-encoded image.

    An empty question defaults to ``'Describe the image'``.
    """
    # A per-request file keeps concurrent requests from overwriting each
    # other's image.
    descrittore, percorso_immagine = tempfile.mkstemp(suffix=".jpg")
    os.close(descrittore)
    try:
        base64_in_immagine(input_data.base64, percorso_immagine)
        if input_data.input == '':
            input_data.input = 'Describe the image'
        Version = 1
        if Version == 1:
            space_client = Client("https://vikhyatk-moondream1.hf.space/--replicas/av7ct/")
            result = space_client.predict(
                percorso_immagine,
                input_data.input,
                api_name="/answer_question"
            )
        else:
            space_client = Client("vikhyatk/moondream2")
            result = space_client.predict(
                percorso_immagine,
                input_data.input,
                api_name="/answer_question_1"
            )
    finally:
        try:
            os.remove(percorso_immagine)
        except OSError:
            pass
    LoggaTesto("IMMAGINE", {"response": result}, False)
    return {"response": result}


# --------------------------------------------------- PostSpazio API ------------------------------------------------------

@app.post("/PostSpazio")
def generate_postspazio(request: Request, input_data: PostSpazio):
    """Forward ``input`` to the ``api_name`` endpoint of a Gradio Space."""
    # NOTE(review): the Space name is taken from the request body, so callers
    # can make this server contact an arbitrary Space.
    space_client = Client(input_data.nomeSpazio)
    result = space_client.predict(
        input_data.input,
        api_name=input_data.api_name
    )
    return {"response": result}


@app.get("/")
def read_general():
    """Return the welcome message with a pointer to the API docs."""
    return {"response": "Benvenuto. Per maggiori info: https://matteoscript-fastapi.hf.space/docs"}