import hashlib import os from glob import glob import laion_clap from diskcache import Cache from qdrant_client import QdrantClient from qdrant_client.http import models from tqdm import tqdm # Utiliser les variables d'environnement pour la configuration QDRANT_HOST = os.getenv('QDRANT_HOST', 'localhost') QDRANT_PORT = int(os.getenv('QDRANT_PORT', 6333)) # Functions utils def get_md5(fpath): with open(fpath, "rb") as f: file_hash = hashlib.md5() while chunk := f.read(8192): file_hash.update(chunk) return file_hash.hexdigest() # PARAMETERS CACHE_FOLDER = '/home/nahia/data/audio/' KAGGLE_TRAIN_PATH = '/home/nahia/Documents/audio/actor/Actor_01/' # Charger le modèle CLAP print("[INFO] Loading the model...") model_name = 'music_speech_epoch_15_esc_89.25.pt' model = laion_clap.CLAP_Module(enable_fusion=False) model.load_ckpt() # télécharger le checkpoint préentraîné par défaut # Initialiser le cache os.makedirs(CACHE_FOLDER, exist_ok=True) cache = Cache(CACHE_FOLDER) # Embarquer les fichiers audio audio_files = [p for p in glob(os.path.join(KAGGLE_TRAIN_PATH, '*.wav'))] audio_embeddings = [] chunk_size = 100 total_chunks = int(len(audio_files) / chunk_size) # Utiliser tqdm pour une barre de progression for i in tqdm(range(0, len(audio_files), chunk_size), total=total_chunks): chunk = audio_files[i:i + chunk_size] # Obtenir un chunk de fichiers audio chunk_embeddings = [] for audio_file in chunk: # Calculer un hash unique pour le fichier audio file_key = get_md5(audio_file) if file_key in cache: # Si l'embedding pour ce fichier est en cache, le récupérer embedding = cache[file_key] else: # Sinon, calculer l'embedding et le mettre en cache embedding = model.get_audio_embedding_from_filelist(x=[audio_file], use_tensor=False)[ 0] # Assumer que le modèle retourne une liste cache[file_key] = embedding chunk_embeddings.append(embedding) audio_embeddings.extend(chunk_embeddings) # Fermer le cache quand terminé cache.close() # Créer une collection qdrant client = QdrantClient(QDRANT_HOST, port=QDRANT_PORT) print("[INFO] Client created...") print("[INFO] Creating qdrant data collection...") client.create_collection( collection_name="demo_db7", vectors_config=models.VectorParams( size=audio_embeddings[0].shape[0], distance=models.Distance.COSINE ), ) # Créer des enregistrements Qdrant à partir des embeddings records = [] for idx, (audio_path, embedding) in enumerate(zip(audio_files, audio_embeddings)): record = models.PointStruct( id=idx, vector=embedding, payload={"audio_path": audio_path, "style": audio_path.split('/')[-2]} ) records.append(record) # Téléverser les enregistrements dans la collection Qdrant print("[INFO] Uploading data records to data collection...") client.upload_points(collection_name="demo_db7", points=records) print("[INFO] Successfully uploaded data records to data collection!")