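# NOTE: the lines below are leftover header text from the Hugging Face file
# viewer; they are kept as comments so the module remains importable.
# AI / main.py
# MatteoScript's picture
# Update main.py
# 0650f0c verified
# raw
# history blame
# 26.4 kB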
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware # Importa il middleware CORS
from pydantic import BaseModel
from huggingface_hub import InferenceClient
from datetime import datetime
from gradio_client import Client
import base64
import requests
import os
import socket
import time
from enum import Enum
import random
import aiohttp
import asyncio
import json
from types import SimpleNamespace
from io import BytesIO
from PIL import Image
#--------------------------------------------------- Definizione Server FAST API ------------------------------------------------------
# Application object that every route in this module is registered on.
app = FastAPI()
# Module-level Hugging Face inference client; generate() calls
# client.text_generation() on it.
client = InferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1")
# CORS is left fully open: every origin, method and header is accepted.
# NOTE(review): a wildcard origin together with allow_credentials=True is very
# permissive — confirm this is intended for the deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
class InputData(BaseModel):
    """Request body of the /Genera text-generation endpoint."""

    # User request; the fan-out helpers split it on StringaSplit.
    input: str
    # Optional system role / style / instructions embedded in the prompt
    # template. An instruction starting with "http" is fetched and replaced
    # by the response body.
    systemRole: str = ''
    systemStyle: str = ''
    instruction: str = ''
    # Sampling parameters forwarded to generate().
    temperature: float = 0.7
    max_new_tokens: int = 2000
    top_p: float = 0.95
    repetition_penalty: float = 1.0
    # When True, /Genera takes the fan-out branch instead of answering directly.
    asincrono: bool = False
    # Number of requests issued for every generated payload in fan-out mode.
    NumeroGenerazioni: int = 1
    # Separator used to split input, systemRole and instruction into parts.
    StringaSplit: str = '********'
    # Maximum number of instruction characters per chunk.
    NumeroCaratteriSplitInstruction: int = 30000
    # Fan-out only: ask the model to answer <NOTFOUND> when off-topic and drop
    # the answers that contain "NOTFOUND".
    EliminaRisposteNonPertinenti: bool = False
    # Fan-out only: run a second generation that merges the collected answers.
    UnificaRispostaPertinente: bool = False
    # When all three are non-empty, the direct answer is also posted to
    # telegramUrlPost.
    telegramChatId: str = ''
    telegramUrlBot: str = ''
    telegramUrlPost: str = ''
class InputDataAsync(InputData):
    """Request body of /GeneraAsync; InputData plus one extra field."""

    # Not read anywhere in the visible code.
    test: str = ''
class PostSpazio(BaseModel):
    """Request body of /PostSpazio, which proxies a call to a Gradio Space."""

    # Value handed to gradio_client.Client(...).
    nomeSpazio: str
    # Single positional argument passed to Client.predict().
    input: str = ''
    # Named endpoint of the Space to invoke.
    api_name: str = "/chat"
def LoggaTesto(log_type, data, serializza=True):
    """Print a timestamped, banner-delimited log entry to stdout.

    Args:
        log_type: Label shown in the middle of the banner line.
        data: Payload to log.
        serializza: When true, ``data`` is rendered as indented JSON;
            otherwise it is interpolated as-is.

    Logging must never abort the request being served, so values that JSON
    cannot encode are stringified, and if encoding still fails (for example
    non-string dict keys or circular references) ``repr(data)`` is logged.
    """
    if serializza:
        try:
            formatted_data = json.dumps(data, indent=2, default=str)
        except (TypeError, ValueError):
            formatted_data = repr(data)
    else:
        formatted_data = data
    print(f"\n{datetime.now()}: ---------------------------------------------------------------| {log_type} |--------------------------------------------------------------\n{formatted_data}")
#--------------------------------------------------- Generazione TESTO ------------------------------------------------------
@app.post("/Genera")
def generate_text(request: Request, input_data: InputData):
    """Handle POST /Genera.

    With ``asincrono`` false the prompt is built, sent to the model once,
    optionally forwarded to Telegram, and returned as ``{"response": str}``.

    With ``asincrono`` true the request is fanned out through
    GeneraTestoAsync and ``{"response": ...}`` carries whatever that call
    returns, optionally filtered on "NOTFOUND" and/or merged by a second
    fan-out pass.
    """
    if input_data.asincrono:
        endpoint = "https://matteoscript-fastapi.hf.space/Genera"
        input_data.asincrono = False
        if input_data.EliminaRisposteNonPertinenti:
            # The same hint is appended to both the request and the role.
            suffisso = " (Rispondi solo sulla base delle ISTRUZIONI che hai ricevuto. se non trovi corrispondenza tra RICHIESTA e ISTRUZIONI rispondi con <NOTFOUND>!!!)"
            input_data.input += suffisso
            input_data.systemRole += suffisso
        risposte = asyncio.run(GeneraTestoAsync(endpoint, input_data))
        LoggaTesto("RISPOSTA ASINCRONA", {"response": risposte})
        if input_data.EliminaRisposteNonPertinenti:
            pertinenti = []
            for item in risposte:
                if "NOTFOUND" not in item["response"]:
                    pertinenti.append(item)
            risposte = pertinenti
        if input_data.UnificaRispostaPertinente:
            # Second pass: ask the model to merge the collected answers.
            input_data.input = f'''Metti insieme le seguenti risposte. Basati solo su questo TESTO e non AGGIUNGERE ALTRO!!!!: {risposte}'''
            input_data.systemRole = ''
            input_data.systemStyle = 'Rispondi in ITALIANO'
            input_data.instruction = ''
            risposte = asyncio.run(GeneraTestoAsync(endpoint, input_data))
            LoggaTesto("RISPOSTA ASINCRONA UNIFICATA", {"response": risposte})
        return {"response": risposte}

    # Direct path: one prompt, one model call.
    LoggaTesto("INPUT", input_data.input, False)
    parametri = (
        input_data.temperature,
        input_data.max_new_tokens,
        input_data.top_p,
        input_data.repetition_penalty,
    )
    prompt = generate_input_text(input_data)
    risposta = generate(prompt, [], *parametri)
    campi_telegram = (
        input_data.telegramChatId,
        input_data.telegramUrlBot,
        input_data.telegramUrlPost,
    )
    if all(campo != '' for campo in campi_telegram):
        asyncio.run(call_telegram_api(input_data, risposta))
    LoggaTesto("RISPOSTA", {"response": risposta}, False)
    return {"response": risposta}
def call_telegram_api_OLD(input_data, text):
    """Blocking variant of the Telegram notification (legacy).

    Posts ``text`` with the chat id and bot URL from ``input_data`` to
    ``input_data.telegramUrlPost`` and prints whether the endpoint answered
    with HTTP 200.
    """
    response = requests.post(
        input_data.telegramUrlPost,
        json={
            "chat_id": input_data.telegramChatId,
            "text": text,
            "telegramUrl": input_data.telegramUrlBot,
        },
    )
    if response.status_code != 200:
        print("Errore nella richiesta POST. Codice di stato:", response.status_code)
        return
    print("Invio messaggio TELEGRAM")
async def call_telegram_api(input_data, text):
    """Forward ``text`` to the Telegram relay configured on the request.

    Args:
        input_data: Object exposing ``telegramChatId``, ``telegramUrlBot``
            and ``telegramUrlPost``.
        text: Message body to deliver.

    The JSON payload is posted to ``input_data.telegramUrlPost``. The outcome
    is printed with the same messages as the blocking legacy helper, so a
    rejected notification no longer goes unnoticed. Connection errors still
    propagate to the caller, as before.
    """
    payload = {
        "chat_id": input_data.telegramChatId,
        "text": text,
        "telegramUrl": input_data.telegramUrlBot
    }
    async with aiohttp.ClientSession() as session:
        async with session.post(input_data.telegramUrlPost, json=payload) as response:
            # Drain the body so the connection is released cleanly.
            await response.text()
            if response.status == 200:
                print("Invio messaggio TELEGRAM")
            else:
                print("Errore nella richiesta POST. Codice di stato:", response.status)
def generate_input_text(input_data):
    """Build the prompt text for one request.

    If ``input_data.instruction`` starts with "http" it is treated as a URL:
    the page body replaces the instruction, or an empty string does when the
    download fails. ``input_data`` is modified in place in that case.

    Returns the bare ``input_data.input`` when role, style and instruction are
    all empty; otherwise a JSON-like template embedding all of them. Values
    are interpolated verbatim, without escaping.
    """
    if input_data.instruction.startswith("http"):
        try:
            pagina = requests.get(input_data.instruction)
            pagina.raise_for_status()  # turn HTTP error statuses into exceptions
            input_data.instruction = pagina.text
        except requests.exceptions.RequestException:
            input_data.instruction = ""

    nessun_contesto = (
        input_data.systemRole == ""
        and input_data.systemStyle == ""
        and input_data.instruction == ""
    )
    if nessun_contesto:
        return input_data.input

    return f'''
{{
"input": {{
"role": "system",
"content": "{input_data.systemRole}",
"style": "{input_data.systemStyle}"
}},
"messages": [
{{
"role": "instructions",
"content": "{input_data.instruction} "("{input_data.systemStyle}")"
}},
{{
"role": "user",
"content": "{input_data.input}"
}}
]
}}
'''
def generate(prompt, history, temperature=0.7, max_new_tokens=30000, top_p=0.95, repetition_penalty=1.0):
    """Run one sampled text generation on the module-level client.

    Args:
        prompt: Text of the current turn.
        history: Iterable of (user, bot) pairs passed to format_prompt.
        temperature: Sampling temperature; values below 0.01 are raised to 0.01.
        max_new_tokens, top_p, repetition_penalty: Forwarded to the client.

    Returns:
        Whatever ``client.text_generation`` returns for a non-streaming,
        no-details call.
    """
    # Temperatures below 0.01 are clamped up to 0.01.
    sampling_temperature = max(float(temperature), 1e-2)
    nucleus = float(top_p)
    # A fresh random seed is drawn for every call.
    seed = random.randint(0, 10**7)
    return client.text_generation(
        format_prompt(prompt, history),
        temperature=sampling_temperature,
        max_new_tokens=max_new_tokens,
        top_p=nucleus,
        repetition_penalty=repetition_penalty,
        do_sample=True,
        seed=seed,
        stream=False,
        details=False,
    )
def format_prompt(message, history):
    """Render the history and the new message in [INST] chat markup.

    Each (user, bot) pair in ``history`` becomes
    ``[INST] user [/INST] bot</s> ``. The new message is appended as a final
    ``[INST]`` block prefixed with the current local timestamp in brackets.
    """
    past_turns = "".join(
        f"[INST] {user_prompt} [/INST] {bot_response}</s> "
        for user_prompt, bot_response in history
    )
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
    return f"<s>{past_turns}[{timestamp}] [INST] {message} [/INST]"
#--------------------------------------------------- Generazione TESTO ASYNC ------------------------------------------------------
@app.post("/GeneraAsync")
def generate_textAsync(request: Request, input_data: InputDataAsync):
    """Handle POST /GeneraAsync: fan the request out and return the raw results."""
    target_url = "https://matteoscript-fastapi.hf.space/Genera"
    return {"response": asyncio.run(GeneraTestoAsync(target_url, input_data))}
async def make_request(session, token, data, url, index, semaphore, max_retries=3):
    """POST one generation payload to ``url``, retrying transient failures.

    Args:
        session: Open ``aiohttp.ClientSession`` used for the request.
        token: Bearer token; the Authorization header is omitted when it is
            empty or ``None`` (previously this raised ``TypeError``).
        data: Payload dict. It is copied before being adjusted, because the
            same dict is shared by every repetition of a payload and the
            repetitions run concurrently.
        url: Target endpoint.
        index: Repetition number. It cycles ``max_new_tokens`` through
            unchanged / minus 200 (never below 200) / plus 200.
        semaphore: Limits how many requests are in flight at once.
        max_retries: Maximum number of attempts.

    Returns:
        The decoded JSON body, or the raw text when the response is not JSON.

    Raises:
        Exception: When every attempt failed, or the server answered 502/504
            (those statuses are not retried).
    """
    async with semaphore:
        headers = {'Content-Type': 'application/json'}
        if token:
            headers['Authorization'] = 'Bearer ' + token

        # Work on a private copy so concurrent repetitions do not see each
        # other's max_new_tokens adjustment.
        payload = dict(data)
        variante = (int(index) + 1) % 3
        if variante == 2:
            payload['max_new_tokens'] = max(200, payload['max_new_tokens'] - 200)
        elif variante == 0:
            payload['max_new_tokens'] = payload['max_new_tokens'] + 200

        for attempt in range(max_retries):
            try:
                async with session.post(url, headers=headers, json=payload) as response:
                    response.raise_for_status()
                    try:
                        return await response.json()
                    except aiohttp.ContentTypeError:
                        return await response.text()
            except (asyncio.TimeoutError, aiohttp.ClientError) as e:
                LoggaTesto("ERRORE ASYNC", {e}, False)
                # aiohttp reports HTTP error statuses as ClientResponseError,
                # which carries .status. Timeouts have no response attached,
                # so the status must only be read on that exception type.
                if isinstance(e, aiohttp.ClientResponseError) and e.status in (502, 504):
                    break
                # Sleeping after the final attempt would only delay the error.
                if attempt + 1 < max_retries:
                    await asyncio.sleep(3)
    raise Exception("Max retries reached or skipping retries. Unable to make the request.")
async def CreaListaInput(input_data):
    """Turn one request into the list of payloads to fan out.

    If ``input_data.instruction`` starts with "http" it is downloaded first
    (an empty string is used when the download fails). The instruction is then
    split with DividiInstructionJSON when it is a non-empty JSON list whose
    items are all objects with "Titolo" and "Testo" keys, and with
    DividiInstructionText in every other case.

    Any other valid JSON (an object, a number, a list of strings, ...) used to
    raise ``KeyError``/``TypeError`` here; it is now handled as plain text.

    Returns:
        The list of payload dicts built by the chosen splitter.
    """
    if input_data.instruction.startswith("http"):
        try:
            # NOTE: blocking call inside a coroutine; it stalls the event loop
            # this coroutine runs on while the download is in progress.
            resp = requests.get(input_data.instruction)
            resp.raise_for_status()
            input_data.instruction = resp.text
        except requests.exceptions.RequestException:
            input_data.instruction = ""

    try:
        lista_dizionari = json.loads(input_data.instruction)
    except json.JSONDecodeError:
        return DividiInstructionText(input_data)

    is_titled_list = (
        isinstance(lista_dizionari, list)
        and bool(lista_dizionari)
        and all(
            isinstance(voce, dict) and "Titolo" in voce and "Testo" in voce
            for voce in lista_dizionari
        )
    )
    if is_titled_list:
        return DividiInstructionJSON(lista_dizionari, input_data)
    return DividiInstructionText(input_data)
def split_at_space_or_dot(input_string, length):
    """Return the cut index for a chunk of at most about ``length`` characters.

    The result is one past the right-most start of any delimiter (blank line,
    ".\\n", ";\\n", "." or a space) found within the first ``length``
    characters. When none is found the result is ``length + 1``.
    """
    best = -1
    for delimiter in ('\n\n', '.\n', ';\n', '.', ' '):
        best = max(best, input_string.rfind(delimiter, 0, length))
    if best < 0:
        # No delimiter in range: fall back to the requested length.
        best = length
    return int(best) + 1
def DividiInstructionJSON(lista_dizionari, input_data):
    """Build fan-out payloads from a list of {"Titolo", "Testo"} entries.

    Texts longer than ``input_data.NumeroCaratteriSplitInstruction`` are cut
    into chunks. The next chunk restarts from a cut point computed with a
    limit 100 characters shorter, so consecutive chunks overlap. One payload
    is produced for every (systemRole part, input part, chunk) combination,
    where the parts come from splitting on ``input_data.StringaSplit``.
    """
    limite = input_data.NumeroCaratteriSplitInstruction

    blocchi = []
    for voce in lista_dizionari:
        titolo = voce["Titolo"]
        residuo = voce["Testo"]
        while len(residuo) > limite:
            taglio = split_at_space_or_dot(residuo, limite)
            ripartenza = split_at_space_or_dot(residuo, limite - 100)
            blocchi.append({"Titolo": titolo, "Testo": residuo[:taglio].strip()})
            residuo = residuo[ripartenza:].strip()
        if len(residuo) > 0:
            blocchi.append({"Titolo": titolo, "Testo": residuo})

    richieste = input_data.input.split(input_data.StringaSplit)
    ruoli = input_data.systemRole.split(input_data.StringaSplit)
    return [
        {
            'input': richiesta,
            'instruction': str(blocco),
            'temperature': input_data.temperature,
            'max_new_tokens': input_data.max_new_tokens,
            'top_p': input_data.top_p,
            'repetition_penalty': input_data.repetition_penalty,
            'systemRole': ruolo,
            'systemStyle': input_data.systemStyle,
            'telegramChatId': input_data.telegramChatId,
            'telegramUrlBot': input_data.telegramUrlBot,
            'telegramUrlPost': input_data.telegramUrlPost,
        }
        for ruolo in ruoli
        for richiesta in richieste
        for blocco in blocchi
    ]
def DividiInstructionText(input_data):
    """Build fan-out payloads from a plain-text instruction.

    The instruction is cut into chunks of roughly
    ``input_data.NumeroCaratteriSplitInstruction`` characters at the cut
    points chosen by split_at_space_or_dot. Chunks are then split again on
    ``input_data.StringaSplit``, so a caller can also pre-split the text by
    hand. One payload is produced for every (systemRole part, input part,
    instruction part) combination.

    A remainder that already fits within the limit is kept as one chunk.
    Previously it was cut again at its last delimiter, which yielded an extra
    tiny chunk (and an extra request) whenever the text did not end on a
    delimiter. The JSON splitter already behaves this way.

    Returns:
        List of payload dicts; every text field is stripped of surrounding
        whitespace.
    """
    testo = input_data.instruction
    separatore = input_data.StringaSplit
    limite = input_data.NumeroCaratteriSplitInstruction

    sottostringhe = []
    indice_inizio = 0
    while len(testo) - indice_inizio > limite:
        lunghezza = split_at_space_or_dot(testo[indice_inizio:], limite)
        # Guarantee forward progress even for degenerate limits.
        lunghezza = max(1, lunghezza)
        sottostringhe.append(testo[indice_inizio:indice_inizio + lunghezza].strip())
        indice_inizio += lunghezza
    # Keep the remainder whole; an unsplit (possibly empty) text still yields
    # exactly one chunk, as before.
    if indice_inizio == 0 or indice_inizio < len(testo):
        sottostringhe.append(testo[indice_inizio:])

    instruction_strings = separatore.join(sottostringhe).split(separatore)
    input_strings = input_data.input.split(separatore)
    systemRole_strings = input_data.systemRole.split(separatore)

    return [
        {
            'input': input_string.strip(),
            'instruction': str([instruction_string.strip()]),
            'temperature': input_data.temperature,
            'max_new_tokens': input_data.max_new_tokens,
            'top_p': input_data.top_p,
            'repetition_penalty': input_data.repetition_penalty,
            'systemRole': systemRole_string.strip(),
            'systemStyle': input_data.systemStyle,
            'telegramChatId': input_data.telegramChatId,
            'telegramUrlBot': input_data.telegramUrlBot,
            'telegramUrlPost': input_data.telegramUrlPost,
        }
        for systemRole_string in systemRole_strings
        for input_string in input_strings
        for instruction_string in instruction_strings
    ]
async def GeneraTestoAsync(url, input_data):
    """Fan one request out to ``url`` and collect every response.

    The request is expanded into payloads by CreaListaInput; each payload is
    sent ``input_data.NumeroGenerazioni`` times through make_request, with at
    most 20 requests in flight. The bearer token comes from the TOKEN
    environment variable.

    Returns:
        List of results in submission order, as returned by asyncio.gather.
    """
    token = os.getenv('TOKEN')
    limiter = asyncio.Semaphore(20)
    async with aiohttp.ClientSession() as session:
        pending = []
        for payload in await CreaListaInput(input_data):
            LoggaTesto("RICHIESTA ASINCRONA", payload)
            for index in range(input_data.NumeroGenerazioni):
                pending.append(make_request(session, token, payload, url, index, limiter))
            # Short pause between payloads while the batch is being assembled.
            await asyncio.sleep(0.1)
        return await asyncio.gather(*pending)
async def generate_text_internal(datajson):
    """Generate text for one payload dict without going through HTTP.

    The dict is exposed as attributes, the prompt is built with
    generate_input_text, and ``max_new_tokens`` is capped at 29500 minus the
    prompt length in characters before calling generate.
    """
    data = SimpleNamespace(**datajson)
    temperature, requested_tokens, top_p, repetition_penalty = (
        data.temperature,
        data.max_new_tokens,
        data.top_p,
        data.repetition_penalty,
    )
    prompt = generate_input_text(data)
    token_budget = min(requested_tokens, 29500 - len(prompt))
    return generate(prompt, [], temperature, token_budget, top_p, repetition_penalty)
#--------------------------------------------------- Generazione IMMAGINE ------------------------------------------------------
# Prompt presets for the /Immagine endpoint, keyed by style name.
# Each entry holds:
#   "descrizione":    template whose {prompt} placeholder is filled with the
#                     user's text via str.format in generate_image;
#   "negativePrompt": negative prompt copied onto the request for that style.
style_image = {
    "PROFESSIONAL-PHOTO": {
        "descrizione": "Professional photo {prompt} . Vivid colors, Mirrorless, 35mm lens, f/1.8 aperture, ISO 100, natural daylight",
        "negativePrompt": "out of frame, lowres, text, error, cropped, worst quality, low quality, jpeg artifacts, ugly, duplicate, morbid, mutilated, out of frame, extra fingers, mutated hands, poorly drawn hands, poorly drawn face, mutation, deformed, blurry, bad anatomy, bad proportions, extra limbs, cloned face, disfigured, gross proportions, malformed limbs, missing arms, missing legs, extra arms, extra legs, fused fingers, too many fingers, long neck, username, watermark, signature"
    },
    "CINEMATIC-PHOTO": {
        "descrizione": "cinematic photo {prompt} . 35mm photograph, film, bokeh, professional, 4k, highly detailed",
        "negativePrompt": "drawing, painting, crayon, sketch, graphite, impressionist, noisy, blurry, soft, deformed, ugly"
    },
    "CINEMATIC-PORTRAIT": {
        "descrizione": "cinematic portrait {prompt} 8k, ultra realistic, good vibes, vibrant",
        "negativePrompt": "drawing, painting, crayon, sketch, graphite, impressionist, noisy, blurry, soft, deformed, ugly"
    },
    "LINE-ART-DRAWING": {
        "descrizione": "line art drawing {prompt} . professional, sleek, modern, minimalist, graphic, line art, vector graphics",
        "negativePrompt": "anime, photorealistic, 35mm film, deformed, glitch, blurry, noisy, off-center, deformed, cross-eyed, closed eyes, bad anatomy, ugly, disfigured, mutated, realism, realistic, impressionism, expressionism, oil, acrylic"
    },
    "COMIC": {
        "descrizione": "comic {prompt} . graphic illustration, comic art, graphic novel art, vibrant, highly detailed",
        "negativePrompt": "photograph, deformed, glitch, noisy, realistic, stock photo"
    },
    "ADVERTISING-POSTER-STYLE": {
        "descrizione": "advertising poster style {prompt} . Professional, modern, product-focused, commercial, eye-catching, highly detailed",
        "negativePrompt": "noisy, blurry, amateurish, sloppy, unattractive"
    },
    "RETAIL-PACKAGING-STYLE": {
        "descrizione": "retail packaging style {prompt} . vibrant, enticing, commercial, product-focused, eye-catching, professional, highly detailed",
        "negativePrompt": "noisy, blurry, amateurish, sloppy, unattractive"
    },
    "GRAFFITI-STYLE": {
        "descrizione": "graffiti style {prompt} . street art, vibrant, urban, detailed, tag, mural",
        "negativePrompt": "ugly, deformed, noisy, blurry, low contrast, realism, photorealistic"
    },
    "POP-ART-STYLE": {
        "descrizione": "pop Art style {prompt} . bright colors, bold outlines, popular culture themes, ironic or kitsch",
        "negativePrompt": "ugly, deformed, noisy, blurry, low contrast, realism, photorealistic, minimalist"
    },
    "ISOMETRIC-STYLE": {
        "descrizione": "isometric style {prompt} . vibrant, beautiful, crisp, detailed, ultra detailed, intricate",
        "negativePrompt": "deformed, mutated, ugly, disfigured, blur, blurry, noise, noisy, realistic, photographic"
    },
    "LOW-POLY-STYLE": {
        "descrizione": "low-poly style {prompt}. ambient occlusion, low-poly game art, polygon mesh, jagged, blocky, wireframe edges, centered composition",
        "negativePrompt": "noisy, sloppy, messy, grainy, highly detailed, ultra textured, photo"
    },
    "CLAYMATION-STYLE": {
        "descrizione": "claymation style {prompt} . sculpture, clay art, centered composition, play-doh",
        "negativePrompt": ""
    },
    "PROFESSIONAL-3D-MODEL": {
        "descrizione": "professional 3d model {prompt} . octane render, highly detailed, volumetric, dramatic lighting",
        "negativePrompt": "ugly, deformed, noisy, low poly, blurry, painting"
    },
    "ANIME-ARTWORK": {
        "descrizione": "anime artwork {prompt} . anime style, key visual, vibrant, studio anime, highly detailed",
        "negativePrompt": "photo, deformed, black and white, realism, disfigured, low contrast"
    },
    "ETHEREAL-FANTASY-CONCEPT-ART": {
        "descrizione": "ethereal fantasy concept art of {prompt} . magnificent, celestial, ethereal, painterly, epic, majestic, magical, fantasy art, cover art, dreamy",
        "negativePrompt": "photographic, realistic, realism, 35mm film, dslr, cropped, frame, text, deformed, glitch, noise, noisy, off-center, deformed, cross-eyed, closed eyes, bad anatomy, ugly, disfigured, sloppy, duplicate, mutated, black and white"
    },
    "CYBERNETIC-STYLE": {
        "descrizione": "cybernetic style {prompt} . futuristic, technological, cybernetic enhancements, robotics, artificial intelligence themes",
        "negativePrompt": "ugly, deformed, noisy, blurry, low contrast, realism, photorealistic, historical, medieval"
    },
    "FUTURISTIC-STYLE": {
        "descrizione": "futuristic style {prompt} . sleek, modern, ultramodern, high tech, detailed",
        "negativePrompt": "ugly, deformed, noisy, blurry, low contrast, realism, photorealistic, vintage, antique"
    },
    "SCI-FI-STYLE": {
        "descrizione": "sci-fi style {prompt} . futuristic, technological, alien worlds, space themes, advanced civilizations",
        "negativePrompt": "ugly, deformed, noisy, blurry, low contrast, realism, photorealistic, historical, medieval"
    },
    "DIGITAL-ART": {
        "descrizione": "Digital Art {prompt} . vibrant, cute, digital, handmade",
        "negativePrompt": ""
    },
    "SIMPLE-LOGO": {
        "descrizione": "Minimalist Logo {prompt} . material design, primary colors, stylized, minimalist",
        "negativePrompt": "3D, high detail, noise, grainy, blurry, painting, drawing, photo, disfigured"
    },
    "MINIMALISTIC-LOGO": {
        "descrizione": "Ultra-minimalist Material Design logo for a BRAND: {prompt} . simple, few colors, clean lines, minimal details, modern color palette, no shadows",
        "negativePrompt": "3D, high detail, noise, grainy, blurry, painting, drawing, photo, disfigured"
    }
}
class InputImage(BaseModel):
    """Request body of the /Immagine image-generation endpoint."""

    # Text prompt; rewritten through the style template when a style applies.
    input: str
    # Negative prompt; overwritten by the style preset when a style applies.
    negativePrompt: str = ''
    # Key of a style preset, 'RANDOM' for a random preset, or '' for none.
    style: str = ''
    # Not read by generate_image in the visible code.
    steps: int = 25
    cfg: int = 6
    seed: int = -1
    # False selects the SDXL-Lightning backend, True the Playground v2.5 one.
    # The annotation is required: pydantic v2 refuses to build a model that
    # has a non-annotated attribute, and with it the field is validated as a
    # bool on every pydantic version.
    variante: bool = False
@app.post("/Immagine")
def generate_image(request: Request, input_data: InputImage):
    """Handle POST /Immagine: generate an image and return it base64-encoded.

    A non-empty ``style`` that is 'RANDOM' or a known preset rewrites the
    prompt and negative prompt from ``style_image``. The image is produced by
    one of two Gradio Spaces, chosen by ``variante``. The call is retried up
    to five times on ``requests.exceptions.HTTPError``.

    Returns:
        ``{"response": <base64 string>}`` on success, ``{"error": ...}`` when
        every attempt failed.
    """
    requested_style = input_data.style
    if requested_style:
        print(requested_style)
        if requested_style == 'RANDOM':
            preset_key = random.choice(list(style_image.keys()))
        elif requested_style in style_image:
            preset_key = requested_style
        else:
            preset_key = None
        if preset_key is not None:
            preset = style_image[preset_key]
            input_data.input = preset["descrizione"].format(prompt=input_data.input)
            input_data.negativePrompt = preset["negativePrompt"]

    max_attempts = 5
    for tentativo in range(1, max_attempts + 1):
        try:
            print(input_data.input)
            if input_data.variante == False:
                space = Client("ByteDance/SDXL-Lightning")
                image_url = space.predict(
                    input_data.input,
                    "8-Step",
                    api_name="/generate_image"
                )
            else:
                space = Client("https://choimirai-playground-v2-5.hf.space/--replicas/bgsav/")
                gallery = space.predict(
                    input_data.input,           # prompt
                    input_data.negativePrompt,  # negative prompt
                    True,                       # use negative prompt
                    0,                          # seed
                    1024,                       # width
                    1024,                       # height
                    3,                          # guidance scale
                    True,                       # randomize seed
                    api_name="/run"
                )
                image_url = gallery[0][0]['image']
            print(image_url)
            # The client hands back a local file path; read and encode it.
            with open(image_url, 'rb') as img_file:
                encoded = base64.b64encode(img_file.read()).decode('utf-8')
            return {"response": encoded}
        except requests.exceptions.HTTPError:
            time.sleep(1)
            if tentativo == max_attempts:
                return {"error": "Errore interno del server persistente!"}
    return {"error": "Numero massimo di tentativi raggiunto"}
#--------------------------------------------------- IMAGE TO TEXT ------------------------------------------------------
class InputImageToText(BaseModel):
    """Request body of the /Image_To_Text endpoint."""

    # Base64-encoded image; decoded and written to disk by base64_in_immagine.
    base64: str
    # Question about the image; '' is replaced with 'Describe the image'.
    input: str = ''
def base64_in_immagine(dati_base64):
    """Decode a base64 image and write it to /tmp/img.jpg.

    Args:
        dati_base64: Base64-encoded bytes of an image Pillow can open.

    The target is a JPEG file, and the JPEG writer rejects images with an
    alpha channel or a palette (typical for PNG uploads) with ``OSError``.
    Such images are converted to RGB first. The source image is closed once
    it has been written.
    """
    immagine = base64.b64decode(dati_base64)
    nome_file = "/tmp/img.jpg"
    with Image.open(BytesIO(immagine)) as sorgente:
        if sorgente.mode in ("RGB", "L", "CMYK"):
            da_salvare = sorgente
        else:
            # RGBA, LA, P, ... cannot be written as JPEG.
            da_salvare = sorgente.convert("RGB")
        da_salvare.save(nome_file)
@app.post("/Image_To_Text")
def image_to_text(request: Request, input_data: InputImageToText):
    """Handle POST /Image_To_Text: ask a vision model about an uploaded image.

    The image is written to /tmp/img.jpg and submitted together with the
    question (default 'Describe the image') to a moondream Gradio Space.

    Returns:
        ``{"response": <value returned by the Space>}``.
    """
    base64_in_immagine(input_data.base64)
    if input_data.input == '':
        input_data.input = 'Describe the image'

    # Hard-wired backend switch: 1 selects moondream1, anything else moondream2.
    Version = 1
    spazio, endpoint = (
        ("https://vikhyatk-moondream1.hf.space/--replicas/av7ct/", "/answer_question")
        if Version == 1
        else ("vikhyatk/moondream2", "/answer_question_1")
    )
    result = Client(spazio).predict(
        "/tmp/img.jpg",
        input_data.input,
        api_name=endpoint
    )
    LoggaTesto("IMMAGINE", {"response": result}, False)
    return {"response": result}
#--------------------------------------------------- API PostSpazio ------------------------------------------------------
@app.post("/PostSpazio")
def generate_postspazio(request: Request, input_data: PostSpazio):
    """Handle POST /PostSpazio: call a named endpoint of a Gradio Space.

    Connects to ``input_data.nomeSpazio``, invokes ``input_data.api_name``
    with ``input_data.input`` as the only argument, and returns the result
    under "response".
    """
    space = Client(input_data.nomeSpazio)
    return {"response": space.predict(input_data.input, api_name=input_data.api_name)}
@app.get("/")
def read_general():
    """Handle GET /: return a welcome message pointing at the API docs."""
    messaggio = "Benvenuto. Per maggiori info: https://matteoscript-fastapi.hf.space/docs"
    return {"response": messaggio}