AI / main.py
MatteoScript's picture
Update main.py
c4ffce6 verified
raw
history blame
21.8 kB
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware # Importa il middleware CORS
from pydantic import BaseModel
from huggingface_hub import InferenceClient
from datetime import datetime
from gradio_client import Client
import base64
import requests
import os
import socket
import time
from enum import Enum
import random
import aiohttp
import asyncio
import json
#--------------------------------------------------- Definizione Server FAST API ------------------------------------------------------
app = FastAPI()
client = InferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
class InputData(BaseModel):
input: str
systemRole: str = ''
systemStyle: str = ''
instruction: str = ''
temperature: float = 0.7
max_new_tokens: int = 2000
top_p: float = 0.95
repetition_penalty: float = 1.0
asincrono: bool = False
NumeroGenerazioni: int = 1
StringaSplit: str = '********'
NumeroCaratteriSplitInstruction: int = 30000
EliminaRisposteNonPertinenti: bool = False
UnificaRispostaPertinente: bool = False
class InputDataAsync(InputData):
test: str = ''
class PostSpazio(BaseModel):
nomeSpazio: str
input: str = ''
api_name: str = "/chat"
def LoggaTesto(log_type, data, serializza=True):
if serializza:
formatted_data = json.dumps(data, indent=2)
else:
formatted_data = data
print(f"\n{datetime.now()}: ---------------------------------------------------------------| {log_type} |--------------------------------------------------------------\n{formatted_data}")
#--------------------------------------------------- Generazione TESTO ------------------------------------------------------
@app.post("/Genera")
def generate_text(request: Request, input_data: InputData):
if not input_data.asincrono:
temperature = input_data.temperature
max_new_tokens = input_data.max_new_tokens
top_p = input_data.top_p
repetition_penalty = input_data.repetition_penalty
input_text = generate_input_text(input_data)
LoggaTesto("RICHIESTA SINCRONA", input_text, False)
max_new_tokens = min(max_new_tokens, 29500 - len(input_text))
history = []
generated_response = generate(input_text, history, temperature, max_new_tokens, top_p, repetition_penalty)
LoggaTesto("RISPOSTA SINCRONA", {"response": generated_response})
return {"response": generated_response}
else:
input_data.asincrono = False
if input_data.EliminaRisposteNonPertinenti:
msgEliminaRisposteNonPertinenti = " (Rispondi solo sulla base delle ISTRUZIONI che hai ricevuto. se non trovi corrispondenza tra RICHIESTA e ISTRUZIONI rispondi con <NOTFOUND>!!!)"
input_data.input = input_data.input + msgEliminaRisposteNonPertinenti
input_data.systemRole = input_data.systemRole + msgEliminaRisposteNonPertinenti
result_data = asyncio.run(GeneraTestoAsync("https://matteoscript-fastapi.hf.space/Genera", input_data))
LoggaTesto("RISPOSTA ASINCRONA FINALE", {"response": result_data})
if input_data.EliminaRisposteNonPertinenti:
result_data = [item for item in result_data if "NOTFOUND" not in item["response"]]
if input_data.UnificaRispostaPertinente:
input_data.instruction = f'''{result_data}'''
result_data = asyncio.run(GeneraTestoAsync("https://matteoscript-fastapi.hf.space/Genera", input_data))
return {"response": result_data}
def generate_input_text(input_data):
if input_data.instruction.startswith("http"):
try:
resp = requests.get(input_data.instruction)
resp.raise_for_status() # Lancia un'eccezione per errori HTTP
input_data.instruction = resp.text
except requests.exceptions.RequestException as e:
input_data.instruction = ""
history = []
if input_data.systemRole != "" or input_data.systemStyle != "" or input_data.instruction != "":
input_text = f'''
{{
"input": {{
"role": "system",
"content": "{input_data.systemRole}",
"style": "{input_data.systemStyle}"
}},
"messages": [
{{
"role": "instructions",
"content": "{input_data.instruction} "("{input_data.systemStyle}")"
}},
{{
"role": "user",
"content": "{input_data.input}"
}}
]
}}
'''
else:
input_text = input_data.input
return input_text
def generate(prompt, history, temperature=0.7, max_new_tokens=30000, top_p=0.95, repetition_penalty=1.0):
temperature = float(temperature)
if temperature < 1e-2:
temperature = 1e-2
top_p = float(top_p)
generate_kwargs = dict(
temperature=temperature,
max_new_tokens=max_new_tokens,
top_p=top_p,
repetition_penalty=repetition_penalty,
do_sample=True,
seed=random.randint(0, 10**7),
)
formatted_prompt = format_prompt(prompt, history)
output = client.text_generation(formatted_prompt, **generate_kwargs, stream=False, details=False)
return output
def format_prompt(message, history):
prompt = "<s>"
for user_prompt, bot_response in history:
prompt += f"[INST] {user_prompt} [/INST]"
prompt += f" {bot_response}</s> "
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
prompt += f"[{now}] [INST] {message} [/INST]"
return prompt
#--------------------------------------------------- Generazione TESTO ASYNC ------------------------------------------------------
@app.post("/GeneraAsync")
def generate_textAsync(request: Request, input_data: InputDataAsync):
result_data = asyncio.run(GeneraTestoAsync("https://matteoscript-fastapi.hf.space/Genera", input_data))
return {"response": result_data}
async def make_request(session, token, data, url, max_retries=3):
headers = {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + token
}
for _ in range(max_retries):
try:
async with session.post(url, headers=headers, json=data) as response:
response.raise_for_status()
try:
result_data = await response.json()
except aiohttp.ContentTypeError:
result_data = await response.text()
return result_data
except (asyncio.TimeoutError, aiohttp.ClientError, requests.exceptions.HTTPError) as e:
LoggaTesto("ERRORE ASYNC", {e})
if isinstance(e, (asyncio.TimeoutError, requests.exceptions.HTTPError)) and e.response.status in [502, 504]:
break
await asyncio.sleep(1)
raise Exception("Max retries reached or skipping retries. Unable to make the request.")
async def CreaListaInput(input_data):
if input_data.instruction.startswith("http"):
try:
resp = requests.get(input_data.instruction)
resp.raise_for_status()
input_data.instruction = resp.text
except requests.exceptions.RequestException as e:
input_data.instruction = ""
try:
lista_dizionari = []
nuova_lista_dizionari = []
lista_dizionari = json.loads(input_data.instruction)
if lista_dizionari and "Titolo" in lista_dizionari[0]:
nuova_lista_dizionari = DividiInstructionJSON(lista_dizionari, input_data)
else:
nuova_lista_dizionari = DividiInstructionText(input_data)
except json.JSONDecodeError:
nuova_lista_dizionari = DividiInstructionText(input_data)
return nuova_lista_dizionari
def split_at_space_or_dot(input_string, length):
delimiters = ['\n\n', '.\n', ';\n', '.', ' ']
positions = [input_string.rfind(d, 0, length) for d in delimiters]
valid_positions = [pos for pos in positions if pos >= 0]
lastpos = max(valid_positions) if valid_positions else length
indice_divisione = int(lastpos)
return indice_divisione + 1
def DividiInstructionJSON(lista_dizionari, input_data):
ListaInput = []
nuova_lista_dizionari = []
for dizionario in lista_dizionari:
titolo = dizionario["Titolo"]
testo_completo = dizionario["Testo"]
while len(testo_completo) > input_data.NumeroCaratteriSplitInstruction:
indice_divisione = split_at_space_or_dot(testo_completo, input_data.NumeroCaratteriSplitInstruction)
indice_divisione_precedente = split_at_space_or_dot(testo_completo, input_data.NumeroCaratteriSplitInstruction-100)
sottostringa = testo_completo[:indice_divisione].strip()
testo_completo = testo_completo[indice_divisione_precedente:].strip()
nuovo_dizionario = {"Titolo": titolo, "Testo": sottostringa}
nuova_lista_dizionari.append(nuovo_dizionario)
if len(testo_completo) > 0:
nuovo_dizionario = {"Titolo": titolo, "Testo": testo_completo}
nuova_lista_dizionari.append(nuovo_dizionario)
input_strings = input_data.input.split(input_data.StringaSplit)
for input_string in input_strings:
for dizionario in nuova_lista_dizionari:
data = {
'input': input_string,
'instruction': str(dizionario),
'temperature': input_data.temperature,
'max_new_tokens': input_data.max_new_tokens,
'top_p': input_data.top_p,
'repetition_penalty': input_data.repetition_penalty,
'systemRole': input_data.systemRole,
'systemStyle': input_data.systemStyle
}
ListaInput.append(data)
return ListaInput
def DividiInstructionText(input_data):
ListaInput = []
input_str = input_data.instruction
StringaSplit = input_data.StringaSplit
sottostringhe = []
indice_inizio = 0
if len(input_str) > input_data.NumeroCaratteriSplitInstruction:
while indice_inizio < len(input_str):
lunghezza_sottostringa = split_at_space_or_dot(input_str[indice_inizio:], input_data.NumeroCaratteriSplitInstruction)
sottostringhe.append(input_str[indice_inizio:indice_inizio + lunghezza_sottostringa].strip())
indice_inizio += lunghezza_sottostringa
else:
sottostringhe.append(input_str)
testoSeparato = StringaSplit.join(sottostringhe)
instruction_strings = testoSeparato.split(StringaSplit)
input_strings = input_data.input.split(input_data.StringaSplit)
for input_string in input_strings:
for instruction_string in instruction_strings:
data = {
'input': input_string.strip(),
'instruction': str([instruction_string.strip()]),
'temperature': input_data.temperature,
'max_new_tokens': input_data.max_new_tokens,
'top_p': input_data.top_p,
'repetition_penalty': input_data.repetition_penalty,
'systemRole': input_data.systemRole,
'systemStyle': input_data.systemStyle
}
ListaInput.append(data)
return ListaInput
async def GeneraTestoAsync(url, input_data):
token = os.getenv('TOKEN')
async with aiohttp.ClientSession() as session:
tasks = []
ListaInput = await CreaListaInput(input_data)
for data in ListaInput:
LoggaTesto("RICHIESTA ASINCRONA", data)
tasks.extend([make_request(session, token, data, url) for _ in range(input_data.NumeroGenerazioni)])
return await asyncio.gather(*tasks)
#--------------------------------------------------- Generazione IMMAGINE ------------------------------------------------------
style_image = {
"PROFESSIONAL-PHOTO": {
"descrizione": "Professional photo {prompt} . Vivid colors, Mirrorless, 35mm lens, f/1.8 aperture, ISO 100, natural daylight",
"negativePrompt": "out of frame, lowres, text, error, cropped, worst quality, low quality, jpeg artifacts, ugly, duplicate, morbid, mutilated, out of frame, extra fingers, mutated hands, poorly drawn hands, poorly drawn face, mutation, deformed, blurry, bad anatomy, bad proportions, extra limbs, cloned face, disfigured, gross proportions, malformed limbs, missing arms, missing legs, extra arms, extra legs, fused fingers, too many fingers, long neck, username, watermark, signature"
},
"CINEMATIC-PHOTO": {
"descrizione": "cinematic photo {prompt} . 35mm photograph, film, bokeh, professional, 4k, highly detailed",
"negativePrompt": "drawing, painting, crayon, sketch, graphite, impressionist, noisy, blurry, soft, deformed, ugly"
},
"CINEMATIC-PORTRAIT": {
"descrizione": "cinematic portrait {prompt} 8k, ultra realistic, good vibes, vibrant",
"negativePrompt": "drawing, painting, crayon, sketch, graphite, impressionist, noisy, blurry, soft, deformed, ugly"
},
"LINE-ART-DRAWING": {
"descrizione": "line art drawing {prompt} . professional, sleek, modern, minimalist, graphic, line art, vector graphics",
"negativePrompt": "anime, photorealistic, 35mm film, deformed, glitch, blurry, noisy, off-center, deformed, cross-eyed, closed eyes, bad anatomy, ugly, disfigured, mutated, realism, realistic, impressionism, expressionism, oil, acrylic"
},
"COMIC": {
"descrizione": "comic {prompt} . graphic illustration, comic art, graphic novel art, vibrant, highly detailed",
"negativePrompt": "photograph, deformed, glitch, noisy, realistic, stock photo"
},
"ADVERTISING-POSTER-STYLE": {
"descrizione": "advertising poster style {prompt} . Professional, modern, product-focused, commercial, eye-catching, highly detailed",
"negativePrompt": "noisy, blurry, amateurish, sloppy, unattractive"
},
"RETAIL-PACKAGING-STYLE": {
"descrizione": "retail packaging style {prompt} . vibrant, enticing, commercial, product-focused, eye-catching, professional, highly detailed",
"negativePrompt": "noisy, blurry, amateurish, sloppy, unattractive"
},
"GRAFFITI-STYLE": {
"descrizione": "graffiti style {prompt} . street art, vibrant, urban, detailed, tag, mural",
"negativePrompt": "ugly, deformed, noisy, blurry, low contrast, realism, photorealistic"
},
"POP-ART-STYLE": {
"descrizione": "pop Art style {prompt} . bright colors, bold outlines, popular culture themes, ironic or kitsch",
"negativePrompt": "ugly, deformed, noisy, blurry, low contrast, realism, photorealistic, minimalist"
},
"ISOMETRIC-STYLE": {
"descrizione": "isometric style {prompt} . vibrant, beautiful, crisp, detailed, ultra detailed, intricate",
"negativePrompt": "deformed, mutated, ugly, disfigured, blur, blurry, noise, noisy, realistic, photographic"
},
"LOW-POLY-STYLE": {
"descrizione": "low-poly style {prompt}. ambient occlusion, low-poly game art, polygon mesh, jagged, blocky, wireframe edges, centered composition",
"negativePrompt": "noisy, sloppy, messy, grainy, highly detailed, ultra textured, photo"
},
"CLAYMATION-STYLE": {
"descrizione": "claymation style {prompt} . sculpture, clay art, centered composition, play-doh",
"negativePrompt": ""
},
"PROFESSIONAL-3D-MODEL": {
"descrizione": "professional 3d model {prompt} . octane render, highly detailed, volumetric, dramatic lighting",
"negativePrompt": "ugly, deformed, noisy, low poly, blurry, painting"
},
"ANIME-ARTWORK": {
"descrizione": "anime artwork {prompt} . anime style, key visual, vibrant, studio anime, highly detailed",
"negativePrompt": "photo, deformed, black and white, realism, disfigured, low contrast"
},
"ETHEREAL-FANTASY-CONCEPT-ART": {
"descrizione": "ethereal fantasy concept art of {prompt} . magnificent, celestial, ethereal, painterly, epic, majestic, magical, fantasy art, cover art, dreamy",
"negativePrompt": "photographic, realistic, realism, 35mm film, dslr, cropped, frame, text, deformed, glitch, noise, noisy, off-center, deformed, cross-eyed, closed eyes, bad anatomy, ugly, disfigured, sloppy, duplicate, mutated, black and white"
},
"CYBERNETIC-STYLE": {
"descrizione": "cybernetic style {prompt} . futuristic, technological, cybernetic enhancements, robotics, artificial intelligence themes",
"negativePrompt": "ugly, deformed, noisy, blurry, low contrast, realism, photorealistic, historical, medieval"
},
"FUTURISTIC-STYLE": {
"descrizione": "futuristic style {prompt} . sleek, modern, ultramodern, high tech, detailed",
"negativePrompt": "ugly, deformed, noisy, blurry, low contrast, realism, photorealistic, vintage, antique"
},
"SCI-FI-STYLE": {
"descrizione": "sci-fi style {prompt} . futuristic, technological, alien worlds, space themes, advanced civilizations",
"negativePrompt": "ugly, deformed, noisy, blurry, low contrast, realism, photorealistic, historical, medieval"
},
"DIGITAL-ART": {
"descrizione": "Digital Art {prompt} . vibrant, cute, digital, handmade",
"negativePrompt": ""
},
"SIMPLE-LOGO": {
"descrizione": "Minimalist Logo {prompt} . material design, primary colors, stylized, minimalist",
"negativePrompt": "3D, high detail, noise, grainy, blurry, painting, drawing, photo, disfigured"
},
"MINIMALISTIC-LOGO": {
"descrizione": "Ultra-minimalist Material Design logo for a BRAND: {prompt} . simple, few colors, clean lines, minimal details, modern color palette, no shadows",
"negativePrompt": "3D, high detail, noise, grainy, blurry, painting, drawing, photo, disfigured"
}
}
class InputImage(BaseModel):
input: str
negativePrompt: str = ''
style: str = ''
steps: int = 25
cfg: int = 6
seed: int = -1
@app.post("/Immagine")
def generate_image(request: Request, input_data: InputImage):
client = Client("https://manjushri-sdxl-1-0.hf.space/")
if input_data.style:
print(input_data.style)
if input_data.style == 'RANDOM':
random_style = random.choice(list(style_image.keys()))
style_info = style_image[random_style]
input_data.input = style_info["descrizione"].format(prompt=input_data.input)
input_data.negativePrompt = style_info["negativePrompt"]
elif input_data.style in style_image:
style_info = style_image[input_data.style]
input_data.input = style_info["descrizione"].format(prompt=input_data.input)
input_data.negativePrompt = style_info["negativePrompt"]
max_attempts = 2
attempt = 0
while attempt < max_attempts:
try:
result = client.predict(
input_data.input, # str in 'What you want the AI to generate. 77 Token Limit. A Token is Any Word, Number, Symbol, or Punctuation. Everything Over 77 Will Be Truncated!' Textbox component
input_data.negativePrompt, # str in 'What you Do Not want the AI to generate. 77 Token Limit' Textbox component
1024, # int | float (numeric value between 512 and 1024) in 'Height' Slider component
1024, # int | float (numeric value between 512 and 1024) in 'Width' Slider component
input_data.cfg, # int | float (numeric value between 1 and 15) in 'Guidance Scale: How Closely the AI follows the Prompt' Slider component
input_data.steps, # int | float (numeric value between 25 and 100) in 'Number of Iterations' Slider component
0, # int | float (numeric value between 0 and 999999999999999999) in 'Seed: 0 is Random' Slider component
"Yes", # str in 'Upscale?' Radio component
"", # str in 'Embedded Prompt' Textbox component
"", # str in 'Embedded Negative Prompt' Textbox component
0.99, # int | float (numeric value between 0.7 and 0.99) in 'Refiner Denoise Start %' Slider component
100, # int | float (numeric value between 1 and 100) in 'Refiner Number of Iterations %' Slider component
api_name="/predict"
)
image_url = result[0]
print(image_url)
with open(image_url, 'rb') as img_file:
img_binary = img_file.read()
img_base64 = base64.b64encode(img_binary).decode('utf-8')
return {"response": img_base64}
except requests.exceptions.HTTPError as e:
time.sleep(1)
attempt += 1
if attempt < max_attempts:
continue
else:
return {"error": "Errore interno del server persistente"}
return {"error": "Numero massimo di tentativi raggiunto"}
#--------------------------------------------------- API PostSpazio ------------------------------------------------------
@app.post("/PostSpazio")
def generate_postspazio(request: Request, input_data: PostSpazio):
client = Client(input_data.nomeSpazio)
result = client.predict(
input_data.input,
api_name=input_data.api_name
)
return {"response": result}
@app.get("/")
def read_general():
return {"response": "Benvenuto. Per maggiori info: https://matteoscript-fastapi.hf.space/docs"}