Spaces:
Sleeping
Sleeping
import gradio as gr | |
from collections import defaultdict | |
import random | |
import pandas as pd | |
import os | |
import cloudinary | |
import cloudinary.uploader | |
from cloudinary.utils import cloudinary_url | |
import io | |
import time | |
import requests | |
# Configuration | |
cloudinary.config( | |
cloud_name = "dnzirogvb", | |
api_key = os.environ["api_key"] , | |
api_secret = os.environ["api_secret"] , | |
secure=True | |
) | |
from datetime import datetime | |
import pytz | |
# Define the GMT+7 timezone | |
gmt_plus7 = pytz.timezone('Etc/GMT-7') # 'Etc/GMT-7' represents GMT+7 | |
def authenticate(user_id): | |
url = "https://intern-api.imtaedu.com/api/subnets/1/authenticate" | |
headers = { | |
"Content-Type": "application/json", | |
"Accept": "application/json", | |
"X-Public-Api-Key": os.environ['ADMIN'] | |
} | |
payload = { "token": user_id } | |
response = requests.post(url, json=payload, headers=headers) | |
return response.status_code == 200 | |
def login(username, state, package): | |
state[0] = username | |
#package[0] = get_next_package(user_id=username) | |
#temp | |
gr.Info("Login successfully. Welcome!") | |
return f"Welcome, {username}!", gr.update(visible=False), gr.update(visible=True) | |
#temp | |
# Authenticate user | |
if authenticate(username): | |
#user_sessions[username] = True | |
gr.Info("Login successfully. Welcome!") | |
return gr.update(visible=False), gr.update(visible=True), get_next_en_text(username), package[0][2] | |
else: | |
raise gr.Error("Username or Password is invalid! Try again!") | |
return "Invalid username or password.", gr.update(visible=True), gr.update(visible=False), "", "" | |
def logout(username): | |
# Log out user and reset session | |
if username in user_sessions: | |
del user_sessions[username] | |
gr.Warning("You need to login to access this subnet and do task ⛔️!", duration=5) | |
return "Logged out. Please log in again.", gr.update(visible=True), gr.update(visible=False) | |
def get_image_bytes(image): | |
if image is None: | |
return "No image uploaded!" | |
# Convert the image to bytes | |
img_byte_arr = io.BytesIO() | |
image.save(img_byte_arr, format='PNG') # Save as PNG or desired format | |
img_bytes = img_byte_arr.getvalue() | |
return img_bytes | |
# Define a function to handle image and text caption | |
def process_image_and_caption(username_input, image, caption): | |
if image is None: | |
gr.Warning("No image uploaded!", duration=5) | |
return "No image uploaded!", image, caption | |
print("here is cation" + caption) | |
print("here is cation type" + str(type(caption))) | |
if caption is None: | |
gr.Warning("No caption uploaded!", duration=5) | |
return "No caption uploaded!", image, caption | |
# Get current time in GMT+7 | |
current_time = datetime.now(gmt_plus7) | |
# Print the time | |
time = current_time.strftime('-%Y-%m-%d-%H-%M-%S') | |
name = username_input + time | |
# Upload an image | |
img_bytes = get_image_bytes(image) | |
upload_image_result = cloudinary.uploader.upload(img_bytes, public_id = name, asset_folder = "image_for_captioning") | |
# Upload caption | |
# Convert to bytes | |
caption_bytes = caption.encode('utf-8') | |
# Create a byte array buffer (like a binary file stream) | |
byte_buffer = io.BytesIO(caption_bytes) | |
upload_caption_result = cloudinary.uploader.upload(byte_buffer, public_id = name, asset_folder = "caption", resource_type = "raw") | |
#send_score(username_input, 0.1) | |
gr.Info("Submit successfully") | |
return f'You just upload an Image with Caption: "{caption}" \nIntime {time}', None, None | |
def send_score(user_id, score): | |
max_retries = 10 | |
while max_retries > 0: | |
url = "https://intern-api.imtaedu.com/api/subnets/2/grade" | |
payload = { | |
"token": user_id, | |
"comment": "Good job!", | |
"grade": score, | |
"submitted_at": "2021-01-01 00:00:00", | |
"graded_at": "2021-01-01 00:00:00" | |
} | |
headers = { | |
"Content-Type": "application/json", | |
"Accept": "application/json", | |
"X-Public-Api-Key": os.environ['ADMIN'] | |
} | |
response = requests.post(url, json=payload, headers=headers) | |
if response.status_code == 200: | |
return True | |
print(response) | |
max_retries -= 1 | |
return False | |
# Define the Gradio interface | |
with gr.Blocks() as demo: | |
state = gr.State([None]) | |
package = gr.State([None]) | |
# Login section | |
with gr.Column(visible=True) as login_section: | |
username_input = gr.Textbox(placeholder="Enter your token", label="Token ID", type="password") | |
login_button = gr.Button("Login") | |
login_output = gr.Textbox(label="Login Status", interactive=False) | |
# Upload section (initially hidden) | |
with gr.Column(visible=False) as upload_section: | |
with gr.Row(): | |
image_input = gr.Image(type="pil", label="Upload Image") | |
caption_input = gr.Textbox(label="Enter Caption") | |
submit_button = gr.Button("Submit") | |
output = gr.Textbox(label="Output") | |
# Button functions | |
submit_button.click( | |
process_image_and_caption, | |
inputs=[username_input, image_input, caption_input], | |
outputs=[output, image_input, caption_input] | |
) | |
caption_input.submit( | |
process_image_and_caption, | |
inputs=[username_input, image_input, caption_input], | |
outputs=[output, image_input, caption_input] | |
) | |
username_input.submit( | |
login, inputs=[username_input, state, package], outputs=[login_output, login_section, upload_section] #, translation_section, en_input, vi_input] | |
) | |
login_button.click( | |
login, inputs=[username_input, state, package], outputs=[login_output, login_section, upload_section] #, translation_section, en_input, vi_input] | |
) | |
demo.launch(debug=True) |