smallville / worker.py
CazC's picture
Refactor code for improved performance and readability
5a2deaa
raw
history blame
2.95 kB
import subprocess
from supabase import create_client, Client
from imgGen import generateTransparentImage
import sys
sys.path.append('./TripoSR')
import TripoSR.obj_gen as obj_gen
import os
from dotenv import load_dotenv
import time
load_dotenv()
url: str = os.environ.get("SUPABASE_URL")
key: str = os.environ.get("SUPABASE_KEY")
supabase: Client = create_client(url, key)
def check_queue():
try:
tasks = supabase.table("Tasks").select("*").eq("status", "pending").execute()
assert len(tasks.data) > 0
if len(tasks.data) > 0:
return tasks.data[0]
else:
return None
except Exception as e:
print(f"Error checking queue: {e}")
return None
def generate_image(text):
try:
img = generateTransparentImage(text)
return img
except Exception as e:
print(f"Error generating image: {e}")
return None
def create_obj_file(img, task_id):
try:
obj_gen.generate_obj_from_image(img, 'task_'+str(task_id)+'.obj')
except Exception as e:
print(f"Error creating obj file: {e}")
supabase.table("Tasks").update({"status": "error"}).eq("id", task_id).execute()
def send_back_to_supabase(task_id):
# check that a file was created
if os.path.exists('task_'+str(task_id)+'.obj'):
try:
with open('task_'+str(task_id)+'.obj', 'rb') as file:
data = file.read()
supabase.storage.from_('Results').upload('task_'+str(task_id)+'.obj', data)
public_url = supabase.storage.from_('Results').get_public_url('task_'+str(task_id)+'.obj')
supabase.table("Tasks").update({"status": "complete","result":public_url}).eq("id", task_id).execute()
os.remove('task_'+str(task_id)+'.obj')
except Exception as e:
print(f"Error sending file back to Supabase: {e}")
supabase.table("Tasks").update({"status": "error"}).eq("id", task_id).execute()
else:
print(f"Error: No file was created for task {task_id}")
def worker():
while True:
task = check_queue()
if task:
supabase.table("Tasks").update({"status": "processing"}).eq("id", task['id']).execute()
print(f"Processing task {task['id']}")
img = generate_image(task["text"])
if img:
print(f"Image generated for task {task['id']}")
create_obj_file(img,task["id"])
send_back_to_supabase(task["id"])
print(f"Task {task['id']} completed")
else:
print(f"Error generating image for task {task['id']}")
supabase.table("Tasks").update({"status": "error"}).eq("id", task['id']).execute()
else:
print("No pending tasks in the queue")
time.sleep(2) # Add a 2 second delay between checks
if __name__ == "__main__":
worker()