Spaces:
Paused
Paused
from datetime import datetime | |
from typing import Dict, Optional, List | |
from supabase_conex import SupabaseConnection | |
class UserManager: | |
"""Gerenciador de usuários usando Supabase""" | |
def __init__(self): | |
self.supabase = SupabaseConnection().get_client() | |
self.users_table = 'users' | |
def authenticate_user(self, email: str, password: str) -> Optional[Dict]: | |
"""Autentica um usuário""" | |
try: | |
# Autenticar usuário | |
auth_response = self.supabase.auth.sign_in_with_password({ | |
"email": email, | |
"password": password | |
}) | |
if auth_response: | |
# Usar admin_client para evitar recursão nas políticas | |
admin_client = SupabaseConnection().get_admin_client() | |
# Buscar dados completos do usuário | |
user_response = admin_client.from_('users')\ | |
.select('*')\ | |
.eq('email', email)\ | |
.single()\ | |
.execute() | |
if user_response.data: | |
print(f"=== Login realizado com sucesso ===") | |
print(f"Usuário: {user_response.data['nome']} ({user_response.data['email']})") | |
# Verificar se tem assistente | |
if not user_response.data.get('assistant_name'): | |
print("✗ Usuário não possui assistente configurado") | |
return user_response.data | |
return None | |
except Exception as e: | |
print(f"Erro na autenticação: {str(e)}") | |
raise | |
def get_user(self, user_id: str) -> Optional[Dict]: | |
"""Busca um usuário pelo ID""" | |
try: | |
# Usar admin_client para bypass nas políticas | |
admin_client = SupabaseConnection().get_admin_client() | |
response = admin_client.from_(self.users_table)\ | |
.select('*')\ | |
.eq('id', user_id)\ | |
.execute() | |
return response.data[0] if response.data else None | |
except Exception as e: | |
raise Exception(f"Erro ao buscar usuário: {str(e)}") | |
def get_current_user(self) -> Optional[Dict]: | |
"""Retorna o usuário atual""" | |
try: | |
session = self.supabase.auth.get_session() | |
if not session: | |
return None | |
return self.get_user(session.user.id) | |
except Exception as e: | |
print(f"Erro ao buscar usuário atual: {str(e)}") | |
return None | |
def is_admin(self, user_id: str) -> bool: | |
"""Verifica se um usuário é admin""" | |
try: | |
user = self.get_user(user_id) | |
return user.get('is_admin', False) if user else False | |
except Exception as e: | |
print(f"Erro ao verificar admin: {str(e)}") | |
return False | |
# Exemplo de uso | |
if __name__ == "__main__": | |
user_manager = UserManager() | |