from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware  # Importa il middleware CORS
from pydantic import BaseModel
from huggingface_hub import InferenceClient
from datetime import datetime
from gradio_client import Client
import base64
import requests
import os
import socket
import time
from enum import Enum
import random
import aiohttp
import asyncio
import json
from types import SimpleNamespace
from io import BytesIO
from PIL import Image

#--------------------------------------------------- Definizione Server FAST API ------------------------------------------------------
app = FastAPI()
client = InferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1")


class InputData(BaseModel):
    input: str
    systemRole: str = '' 
    systemStyle: str = ''
    instruction: str = ''
    temperature: float = 0.7
    max_new_tokens: int = 2000
    top_p: float = 0.95
    repetition_penalty: float = 1.0
    asincrono: bool = False
    NumeroGenerazioni: int = 1
    StringaSplit: str = '********'
    NumeroCaratteriSplitInstruction: int = 30000
    EliminaRisposteNonPertinenti: bool = False
    UnificaRispostaPertinente: bool = False
    telegramChatId: str = ''
    telegramUrlBot: str = ''
    telegramUrlPost: str = ''

class InputDataAsync(InputData):
    test: str = ''
class PostSpazio(BaseModel):
    nomeSpazio: str
    input: str = '' 
    api_name: str = "/chat"

def LoggaTesto(log_type, data, serializza=True):
    if serializza:
        formatted_data = json.dumps(data, indent=2)
        formatted_data = data
    print(f"\n{}: ---------------------------------------------------------------| {log_type} |--------------------------------------------------------------\n{formatted_data}")
#--------------------------------------------------- Generazione TESTO ------------------------------------------------------"/Genera")
def generate_text(request: Request, input_data: InputData):
    if not input_data.asincrono: 
        LoggaTesto("INPUT", input_data.input, False)
        temperature = input_data.temperature
        max_new_tokens = input_data.max_new_tokens
        top_p = input_data.top_p
        repetition_penalty = input_data.repetition_penalty
        input_text = generate_input_text(input_data)  
        max_new_tokens = min(max_new_tokens, 29500 - len(input_text))
        history = []
        generated_response = generate(input_text, history, temperature, max_new_tokens, top_p, repetition_penalty)
        if input_data.telegramChatId != '' and input_data.telegramUrlBot != '' and input_data.telegramUrlPost != '': 
  , generated_response))
        LoggaTesto("RISPOSTA", {"response": generated_response}, False)
        return {"response": generated_response}
        #return json.dumps({"response": generated_response})
        input_data.asincrono = False
        if input_data.EliminaRisposteNonPertinenti:
            msgEliminaRisposteNonPertinenti = " (Rispondi solo sulla base delle ISTRUZIONI che hai ricevuto. se non trovi corrispondenza tra RICHIESTA e ISTRUZIONI rispondi con <NOTFOUND>!!!)"
            input_data.input = input_data.input + msgEliminaRisposteNonPertinenti
            input_data.systemRole = input_data.systemRole + msgEliminaRisposteNonPertinenti
        result_data ="", input_data))
        #result_data = result_data.replace('"', '')
        LoggaTesto("RISPOSTA ASINCRONA", {"response": result_data})
        if input_data.EliminaRisposteNonPertinenti:
            result_data = [item for item in result_data if "NOTFOUND" not in item["response"]]
        if input_data.UnificaRispostaPertinente:
            input_data.input= f'''Metti insieme le seguenti risposte. Basati solo su questo TESTO e non AGGIUNGERE ALTRO!!!!: {result_data}'''
            input_data.systemRole = '' 
            input_data.systemStyle = 'Rispondi in ITALIANO'
            input_data.instruction =''
            result_data ="", input_data))
            #result_data = result_data.replace('"', '')
            LoggaTesto("RISPOSTA ASINCRONA UNIFICATA", {"response": result_data})
        return {"response": result_data}
        #return json.dumps({"response": result_data})

def call_telegram_api_OLD(input_data, text): 
    payload = {
        "chat_id": input_data.telegramChatId,
        "text": text,
        "telegramUrl": input_data.telegramUrlBot
    response =, json=payload)
    if response.status_code == 200:
        print("Invio messaggio TELEGRAM")
        print("Errore nella richiesta POST. Codice di stato:", response.status_code)    

async def call_telegram_api(input_data, text):
    payload = {
        "chat_id": input_data.telegramChatId,
        "text": text,
        "telegramUrl": input_data.telegramUrlBot
    async with aiohttp.ClientSession() as session:
        async with, json=payload) as response:
            response_text = await response.text()
def generate_input_text(input_data):
    if input_data.instruction.startswith("http"):
            resp = requests.get(input_data.instruction)
            resp.raise_for_status()  # Lancia un'eccezione per errori HTTP
            input_data.instruction = resp.text
        except requests.exceptions.RequestException as e:
            input_data.instruction = ""
    history = [] 
    if input_data.systemRole != "" or input_data.systemStyle != "" or input_data.instruction != "":
        input_text = f'''
            "input": {{
                "role": "system",
                "content": "{input_data.systemRole}", 
                "style": "{input_data.systemStyle}"
            "messages": [
                    "role": "instructions",
                    "content": "{input_data.instruction} "("{input_data.systemStyle}")"
                    "role": "user",
                    "content": "{input_data.input}"
        input_text = input_data.input   
    return input_text

def generate(prompt, history, temperature=0.7, max_new_tokens=30000, top_p=0.95, repetition_penalty=1.0):
    temperature = float(temperature)
    if temperature < 1e-2:
        temperature = 1e-2
    top_p = float(top_p)
    generate_kwargs = dict(
        seed=random.randint(0, 10**7),
    formatted_prompt = format_prompt(prompt, history)
    output = client.text_generation(formatted_prompt, **generate_kwargs, stream=False, details=False)
    return output
def format_prompt(message, history):
    prompt = "<s>"
    for user_prompt, bot_response in history:
        prompt += f"[INST] {user_prompt} [/INST]"
        prompt += f" {bot_response}</s> "
    now ="%Y-%m-%d %H:%M:%S.%f")
    prompt += f"[{now}] [INST] {message} [/INST]"
    return prompt

#--------------------------------------------------- Generazione TESTO ASYNC ------------------------------------------------------"/GeneraAsync")
def generate_textAsync(request: Request, input_data: InputDataAsync):
    result_data ="", input_data))
    return {"response": result_data}

async def make_request(session, token, data, url, index, semaphore, max_retries=3):
    async with semaphore:
        headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + token
        if (int(index)+1) % 3 == 1:
            data['max_new_tokens'] = data['max_new_tokens'] 
        elif (int(index)+1) % 3 == 2:
            data['max_new_tokens'] = max(200, data['max_new_tokens'] - 200)
            data['max_new_tokens'] = data['max_new_tokens'] + 200 
        for _ in range(max_retries):
                async with, headers=headers, json=data) as response:
                        result_data = await response.json()
                    except aiohttp.ContentTypeError:
                        result_data = await response.text()
                    return result_data
            except (asyncio.TimeoutError, aiohttp.ClientError, requests.exceptions.HTTPError) as e:
                LoggaTesto("ERRORE ASYNC", {e}, False)
                if isinstance(e, (asyncio.TimeoutError, requests.exceptions.HTTPError)) and e.response.status in [502, 504]:
                await asyncio.sleep(3)
        raise Exception("Max retries reached or skipping retries. Unable to make the request.")

async def CreaListaInput(input_data):
    if input_data.instruction.startswith("http"):
            resp = requests.get(input_data.instruction)
            input_data.instruction = resp.text
        except requests.exceptions.RequestException as e:
            input_data.instruction = ""
        lista_dizionari = []
        nuova_lista_dizionari = []
        lista_dizionari = json.loads(input_data.instruction)
        if lista_dizionari and "Titolo" in lista_dizionari[0]:
            nuova_lista_dizionari = DividiInstructionJSON(lista_dizionari, input_data)
            nuova_lista_dizionari = DividiInstructionText(input_data)
    except json.JSONDecodeError:
        nuova_lista_dizionari = DividiInstructionText(input_data)
    return nuova_lista_dizionari

def split_at_space_or_dot(input_string, length):
    delimiters = ['\n\n', '.\n', ';\n', '.', ' ']
    positions = [input_string.rfind(d, 0, length) for d in delimiters]
    valid_positions = [pos for pos in positions if pos >= 0]
    lastpos = max(valid_positions) if valid_positions else length
    indice_divisione = int(lastpos)
    return indice_divisione + 1

def DividiInstructionJSON(lista_dizionari, input_data):
    ListaInput = []
    nuova_lista_dizionari = []
    for dizionario in lista_dizionari:
        titolo = dizionario["Titolo"]
        testo_completo = dizionario["Testo"]
        while len(testo_completo) > input_data.NumeroCaratteriSplitInstruction:
            indice_divisione = split_at_space_or_dot(testo_completo, input_data.NumeroCaratteriSplitInstruction)
            indice_divisione_precedente = split_at_space_or_dot(testo_completo, input_data.NumeroCaratteriSplitInstruction-100)
            sottostringa = testo_completo[:indice_divisione].strip()
            testo_completo = testo_completo[indice_divisione_precedente:].strip()
            nuovo_dizionario = {"Titolo": titolo, "Testo": sottostringa}

        if len(testo_completo) > 0:
            nuovo_dizionario = {"Titolo": titolo, "Testo": testo_completo}

    input_strings = input_data.input.split(input_data.StringaSplit)
    systemRole_strings = input_data.systemRole.split(input_data.StringaSplit)
    for systemRole_string in systemRole_strings:
        for input_string in input_strings:
            for dizionario in nuova_lista_dizionari:
                data = {
                    'input': input_string,
                    'instruction': str(dizionario),
                    'temperature': input_data.temperature,
                    'max_new_tokens': input_data.max_new_tokens,
                    'top_p': input_data.top_p,
                    'repetition_penalty': input_data.repetition_penalty,
                    'systemRole': systemRole_string,
                    'systemStyle': input_data.systemStyle,
                    'telegramChatId': input_data.telegramChatId,
                    'telegramUrlBot': input_data.telegramUrlBot,
                    'telegramUrlPost': input_data.telegramUrlPost
    return ListaInput

def DividiInstructionText(input_data): 
    ListaInput = []
    input_str = input_data.instruction
    StringaSplit = input_data.StringaSplit
    sottostringhe = []
    indice_inizio = 0
    if len(input_str) > input_data.NumeroCaratteriSplitInstruction:
        while indice_inizio < len(input_str):
            lunghezza_sottostringa = split_at_space_or_dot(input_str[indice_inizio:], input_data.NumeroCaratteriSplitInstruction)
            sottostringhe.append(input_str[indice_inizio:indice_inizio + lunghezza_sottostringa].strip())
            indice_inizio += lunghezza_sottostringa
    testoSeparato = StringaSplit.join(sottostringhe)
    instruction_strings = testoSeparato.split(StringaSplit)
    input_strings = input_data.input.split(input_data.StringaSplit)
    systemRole_strings = input_data.systemRole.split(input_data.StringaSplit)
    for systemRole_string in systemRole_strings:
        for input_string in input_strings:
            for instruction_string in instruction_strings:
                data = {
                    'input': input_string.strip(),
                    'instruction': str([instruction_string.strip()]), 
                    'temperature': input_data.temperature,
                    'max_new_tokens': input_data.max_new_tokens,
                    'top_p': input_data.top_p,
                    'repetition_penalty': input_data.repetition_penalty,
                    'systemRole': systemRole_string.strip(),
                    'systemStyle': input_data.systemStyle,
                    'telegramChatId': input_data.telegramChatId,
                    'telegramUrlBot': input_data.telegramUrlBot,
                    'telegramUrlPost': input_data.telegramUrlPost
    return ListaInput

async def GeneraTestoAsync(url, input_data):
    token = os.getenv('TOKEN')
    semaphore = asyncio.Semaphore(20) 
    async with aiohttp.ClientSession() as session:
        tasks = []
        ListaInput = await CreaListaInput(input_data)
        for data in ListaInput:
            LoggaTesto("RICHIESTA ASINCRONA", data)
            tasks.extend([make_request(session, token, data, url, index, semaphore) for index in range(input_data.NumeroGenerazioni)])
            #tasks.extend([generate_text_internal(data) for _ in range(input_data.NumeroGenerazioni)])
            await asyncio.sleep(0.1)
        return await asyncio.gather(*tasks)

async def generate_text_internal(datajson):
    data = SimpleNamespace(**datajson)
    temperature = data.temperature
    max_new_tokens = data.max_new_tokens
    top_p = data.top_p
    repetition_penalty = data.repetition_penalty
    input_text = generate_input_text(data)
    max_new_tokens = min(max_new_tokens, 29500 - len(input_text))
    history = []
    generated_response = generate(input_text, history, temperature, max_new_tokens, top_p, repetition_penalty)
    return generated_response        

#--------------------------------------------------- Generazione IMMAGINE ------------------------------------------------------
style_image = {
        "descrizione": "Professional photo {prompt} . Vivid colors, Mirrorless, 35mm lens, f/1.8 aperture, ISO 100, natural daylight",
        "negativePrompt": "out of frame, lowres, text, error, cropped, worst quality, low quality, jpeg artifacts, ugly, duplicate, morbid, mutilated, out of frame, extra fingers, mutated hands, poorly drawn hands, poorly drawn face, mutation, deformed, blurry, bad anatomy, bad proportions, extra limbs, cloned face, disfigured, gross proportions, malformed limbs, missing arms, missing legs, extra arms, extra legs, fused fingers, too many fingers, long neck, username, watermark, signature"
        "descrizione": "cinematic photo {prompt} . 35mm photograph, film, bokeh, professional, 4k, highly detailed",
        "negativePrompt": "drawing, painting, crayon, sketch, graphite, impressionist, noisy, blurry, soft, deformed, ugly"
        "descrizione": "cinematic portrait {prompt} 8k, ultra realistic, good vibes, vibrant",
        "negativePrompt": "drawing, painting, crayon, sketch, graphite, impressionist, noisy, blurry, soft, deformed, ugly"
        "descrizione": "line art drawing {prompt} . professional, sleek, modern, minimalist, graphic, line art, vector graphics",
        "negativePrompt": "anime, photorealistic, 35mm film, deformed, glitch, blurry, noisy, off-center, deformed, cross-eyed, closed eyes, bad anatomy, ugly, disfigured, mutated, realism, realistic, impressionism, expressionism, oil, acrylic"
    "COMIC": {
        "descrizione": "comic {prompt} . graphic illustration, comic art, graphic novel art, vibrant, highly detailed",
        "negativePrompt": "photograph, deformed, glitch, noisy, realistic, stock photo"
        "descrizione": "advertising poster style {prompt} . Professional, modern, product-focused, commercial, eye-catching, highly detailed",
        "negativePrompt": "noisy, blurry, amateurish, sloppy, unattractive"
        "descrizione": "retail packaging style {prompt} . vibrant, enticing, commercial, product-focused, eye-catching, professional, highly detailed",
        "negativePrompt": "noisy, blurry, amateurish, sloppy, unattractive"
        "descrizione": "graffiti style {prompt} . street art, vibrant, urban, detailed, tag, mural",
        "negativePrompt": "ugly, deformed, noisy, blurry, low contrast, realism, photorealistic"
    "POP-ART-STYLE": {
        "descrizione": "pop Art style {prompt} . bright colors, bold outlines, popular culture themes, ironic or kitsch",
        "negativePrompt": "ugly, deformed, noisy, blurry, low contrast, realism, photorealistic, minimalist"
        "descrizione": "isometric style {prompt} . vibrant, beautiful, crisp, detailed, ultra detailed, intricate",
        "negativePrompt": "deformed, mutated, ugly, disfigured, blur, blurry, noise, noisy, realistic, photographic"
        "descrizione": "low-poly style {prompt}. ambient occlusion, low-poly game art, polygon mesh, jagged, blocky, wireframe edges, centered composition",
        "negativePrompt": "noisy, sloppy, messy, grainy, highly detailed, ultra textured, photo"
        "descrizione": "claymation style {prompt} . sculpture, clay art, centered composition, play-doh",
        "negativePrompt": ""
        "descrizione": "professional 3d model {prompt} . octane render, highly detailed, volumetric, dramatic lighting",
        "negativePrompt": "ugly, deformed, noisy, low poly, blurry, painting"
        "descrizione": "anime artwork {prompt} . anime style, key visual, vibrant, studio anime, highly detailed",
        "negativePrompt": "photo, deformed, black and white, realism, disfigured, low contrast"
        "descrizione": "ethereal fantasy concept art of {prompt} . magnificent, celestial, ethereal, painterly, epic, majestic, magical, fantasy art, cover art, dreamy",
        "negativePrompt": "photographic, realistic, realism, 35mm film, dslr, cropped, frame, text, deformed, glitch, noise, noisy, off-center, deformed, cross-eyed, closed eyes, bad anatomy, ugly, disfigured, sloppy, duplicate, mutated, black and white"
        "descrizione": "cybernetic style {prompt} . futuristic, technological, cybernetic enhancements, robotics, artificial intelligence themes",
        "negativePrompt": "ugly, deformed, noisy, blurry, low contrast, realism, photorealistic, historical, medieval"
        "descrizione": "futuristic style {prompt} . sleek, modern, ultramodern, high tech, detailed",
        "negativePrompt": "ugly, deformed, noisy, blurry, low contrast, realism, photorealistic, vintage, antique"
    "SCI-FI-STYLE": {
        "descrizione": "sci-fi style {prompt} . futuristic, technological, alien worlds, space themes, advanced civilizations",
        "negativePrompt": "ugly, deformed, noisy, blurry, low contrast, realism, photorealistic, historical, medieval"
    "DIGITAL-ART": {
        "descrizione": "Digital Art {prompt} . vibrant, cute, digital, handmade",
        "negativePrompt": ""
    "SIMPLE-LOGO": {
        "descrizione": "Minimalist Logo {prompt} . material design, primary colors, stylized, minimalist",
        "negativePrompt": "3D, high detail, noise, grainy, blurry, painting, drawing, photo, disfigured"
        "descrizione": "Ultra-minimalist Material Design logo for a BRAND: {prompt} . simple, few colors, clean lines, minimal details, modern color palette, no shadows",
        "negativePrompt": "3D, high detail, noise, grainy, blurry, painting, drawing, photo, disfigured"

class InputImage(BaseModel):
    input: str
    negativePrompt: str = '' 
    style: str = ''
    steps: int = 25
    cfg: int = 6
    seed: int = -1
    variante = False"/Immagine")
def generate_image(request: Request, input_data: InputImage):
    #client = Client("")
        if == 'RANDOM':
            random_style = random.choice(list(style_image.keys()))
            style_info = style_image[random_style]
            input_data.input = style_info["descrizione"].format(prompt=input_data.input)
            input_data.negativePrompt = style_info["negativePrompt"]           
        elif in style_image:
            style_info = style_image[]
            input_data.input = style_info["descrizione"].format(prompt=input_data.input)
            input_data.negativePrompt = style_info["negativePrompt"]
    max_attempts = 5
    attempt = 0
    while attempt < max_attempts:
            if input_data.variante == False:
                #client = Client("AP123/SDXL-Lightning")
                client = Client("ByteDance/SDXL-Lightning")
                result = client.predict(
                image_url = result
                #client = Client("")
                client = Client("")
                result = client.predict(
                    input_data.input,	# str  in 'Prompt' Textbox component                    
                    input_data.negativePrompt,	# str  in 'Negative prompt' Textbox component
                    True,	# bool  in 'Use negative prompt' Checkbox component
                    0,	# float (numeric value between 0 and 2147483647) in 'Seed' Slider component
                    1024,	# float (numeric value between 256 and 1536) in 'Width' Slider component
                    1024,	# float (numeric value between 256 and 1536) in 'Height' Slider component
                    3,	# float (numeric value between 0.1 and 20) in 'Guidance Scale' Slider component
                    True,	# bool  in 'Randomize seed' Checkbox component
                image_url = result[0][0]['image']
            with open(image_url, 'rb') as img_file:
                img_binary =
                img_base64 = base64.b64encode(img_binary).decode('utf-8')
            return {"response": img_base64}
        except requests.exceptions.HTTPError as e:
            attempt += 1
            if attempt < max_attempts:
                return {"error": "Errore interno del server persistente!"}
    return {"error": "Numero massimo di tentativi raggiunto"}

#--------------------------------------------------- IMAGE TO TEXT ------------------------------------------------------
class InputImageToText(BaseModel): 
    base64: str
    input: str = ''

def base64_in_immagine(dati_base64):
    immagine = base64.b64decode(dati_base64)
    immagine_pil =
    nome_file = "/tmp/img.jpg""/Image_To_Text")
def image_to_text(request: Request, input_data: InputImageToText):
    if input_data.input == '':
        input_data.input = 'Describe the image'
    Version = 1
    if Version == 1:
        client = Client("")
        result = client.predict(
        client = Client("vikhyatk/moondream2")
        result = client.predict(
    LoggaTesto("IMMAGINE", {"response": result}, False)
    return {"response": result}
#--------------------------------------------------- API PostSpazio ------------------------------------------------------"/PostSpazio")
def generate_postspazio(request: Request, input_data: PostSpazio):
    client = Client(input_data.nomeSpazio)
    result = client.predict(
    return {"response": result}

def read_general(): 
    return {"response": "Benvenuto. Per maggiori info:"}