Spaces:
Build error
Build error
File size: 7,912 Bytes
0821095 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 |
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
SmoLAgent Parser - Extrait le premier modèle de chaque leaderboard Hugging Face
en utilisant Playwright et smolagents.
"""
import json
import os
import asyncio
from pathlib import Path
from typing import List, Dict, Any, Optional
from dotenv import load_dotenv
from huggingface_hub import login
from playwright.async_api import async_playwright
from smolagents import CodeAgent
from smolagents.models import HfApiModel
from smolagents.tools import Tool
# Charger les variables d'environnement depuis le fichier .env
load_dotenv()
# Récupérer le token Hugging Face
hf_token = os.getenv("HUGGING_FACE_HUB_TOKEN")
# Charger les leaderboards depuis le fichier JSON
def load_leaderboards() -> List[str]:
"""Charger les URLs des leaderboards depuis le fichier JSON."""
with open("leaderboards.json", "r") as f:
return json.load(f)
# Définir un outil pour utiliser Playwright
class PlaywrightBrowserTool(Tool):
"""Outil pour interagir avec un navigateur web via Playwright."""
name = "browser"
description = "Outil pour interagir avec un navigateur web via Playwright."
inputs = {
"goto": {
"url": {
"type": "string",
"description": "L'URL vers laquelle naviguer"
}
},
"get_content": {},
"get_title": {},
"take_screenshot": {
"path": {
"type": "string",
"description": "Le chemin où enregistrer la capture d'écran"
}
},
"run_js": {
"script": {
"type": "string",
"description": "Le code JavaScript à exécuter dans le contexte de la page"
}
},
"wait_for": {
"selector": {
"type": "string",
"description": "Le sélecteur CSS à attendre"
},
"timeout": {
"type": "integer",
"description": "Le temps maximum d'attente en millisecondes"
}
},
"click": {
"selector": {
"type": "string",
"description": "Le sélecteur CSS de l'élément à cliquer"
}
},
"fill": {
"selector": {
"type": "string",
"description": "Le sélecteur CSS du champ de formulaire"
},
"value": {
"type": "string",
"description": "La valeur à remplir dans le champ de formulaire"
}
}
}
output_type = "any"
def __init__(self, page):
self.page = page
async def goto(self, url: str) -> str:
"""Naviguer vers une URL."""
await self.page.goto(url, wait_until="networkidle", timeout=60000)
return f"Navigué vers {url}"
async def get_content(self) -> str:
"""Obtenir le contenu HTML de la page."""
return await self.page.content()
async def get_title(self) -> str:
"""Obtenir le titre de la page."""
return await self.page.title()
async def take_screenshot(self, path: str = "screenshot.png") -> str:
"""Prendre une capture d'écran de la page."""
await self.page.screenshot(path=path)
return f"Capture d'écran enregistrée dans {path}"
async def run_js(self, script: str) -> Any:
"""Exécuter du JavaScript dans le contexte de la page."""
return await self.page.evaluate(script)
async def wait_for(self, selector: str, timeout: int = 30000) -> str:
"""Attendre qu'un élément correspondant au sélecteur apparaisse."""
await self.page.wait_for_selector(selector, timeout=timeout)
return f"Élément avec le sélecteur '{selector}' trouvé"
async def click(self, selector: str) -> str:
"""Cliquer sur un élément correspondant au sélecteur."""
await self.page.click(selector)
return f"Cliqué sur l'élément avec le sélecteur '{selector}'"
async def fill(self, selector: str, value: str) -> str:
"""Remplir un champ de formulaire."""
await self.page.fill(selector, value)
return f"Rempli '{value}' dans l'élément avec le sélecteur '{selector}'"
async def extract_first_model(url: str) -> Optional[Dict[str, Any]]:
"""
Extraire le premier modèle d'un leaderboard en utilisant un agent.
Args:
url: L'URL du leaderboard
Returns:
Un dictionnaire contenant les informations sur le premier modèle, ou None si l'extraction a échoué
"""
async with async_playwright() as p:
browser = await p.chromium.launch(headless=False) # Mettre à True pour la production
page = await browser.new_page()
try:
# Créer l'outil Playwright
browser_tool = PlaywrightBrowserTool(page)
# Créer l'agent
agent = CodeAgent(
tools=[browser_tool],
model=HfApiModel()
)
# Exécuter l'agent
prompt = f"""
Extrais les informations sur le premier modèle du leaderboard à l'URL suivante: {url}
Utilise l'outil browser pour naviguer sur la page et extraire les informations suivantes:
- Nom du modèle
- Score
- Position/rang
- Créateur/auteur
Retourne les informations sous forme de dictionnaire Python.
"""
result = await agent.run(prompt)
print(f"Résultat brut de l'agent: {result}")
# Essayer de parser le résultat comme un dictionnaire
try:
# L'agent peut retourner une représentation textuelle d'un dictionnaire
if isinstance(result, str):
# Essayer de trouver une structure de dictionnaire dans la chaîne
import re
dict_match = re.search(r'\{.*\}', result, re.DOTALL)
if dict_match:
dict_str = dict_match.group(0)
# Remplacer les guillemets simples par des guillemets doubles pour un JSON valide
dict_str = dict_str.replace("'", '"')
return json.loads(dict_str)
return {"raw_result": result}
return result
except Exception as e:
print(f"Erreur lors du parsing du résultat: {e}")
return {"raw_result": str(result)}
except Exception as e:
print(f"Erreur lors de l'extraction des données de {url}: {e}")
await page.screenshot(path=f"error_{url.replace('://', '_').replace('/', '_')}.png")
return {"error": str(e)}
finally:
await browser.close()
async def main():
"""Fonction principale pour traiter tous les leaderboards."""
# Se connecter à Hugging Face
if hf_token:
print("Token Hugging Face trouvé dans le fichier .env")
login(token=hf_token)
print("Connexion à Hugging Face réussie!")
else:
print("Erreur: Token Hugging Face non trouvé dans le fichier .env")
return
leaderboards = load_leaderboards()
results = {}
for url in leaderboards:
print(f"Traitement du leaderboard: {url}")
result = await extract_first_model(url)
results[url] = result
print(f"Résultat: {result}")
# Sauvegarder les résultats dans un fichier JSON
with open("results_smolagent.json", "w") as f:
json.dump(results, f, indent=2)
print(f"Résultats sauvegardés dans results_smolagent.json")
if __name__ == "__main__":
asyncio.run(main()) |