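"""Copy Hugging Face datasets from the TheFinAI account to the tfrere account.

For each name in DATASET_NAMES the script ensures the destination repository
exists (creating it as a private dataset if needed), downloads a snapshot of
the source dataset, and re-uploads it to the destination. Authentication uses
the HF_TOKEN value loaded from the project's .env file.
"""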
import os
import tempfile
import logging
from pathlib import Path

from huggingface_hub import HfApi, snapshot_download, create_repo
from dotenv import load_dotenv

# Source and destination accounts on the Hugging Face Hub
SOURCE_USERNAME = "TheFinAI"
DESTINATION_USERNAME = "tfrere"

# Resolve the project root relative to this file
BACKEND_DIR = Path(__file__).parent.parent
ROOT_DIR = BACKEND_DIR.parent

# Load environment variables (including HF_TOKEN) from the project's .env file
load_dotenv(ROOT_DIR / ".env")
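# The .env file is expected to define the Hub token, e.g. (placeholder value):
#   HF_TOKEN=hf_xxxxxxxxxxxxxxxxx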

# Log bare messages so the progress output stays readable
logging.basicConfig(
    level=logging.INFO,
    format='%(message)s'
)
logger = logging.getLogger(__name__)

# Names of the datasets to copy from the source account
DATASET_NAMES = [
    "votes",
    "results",
    "requests",
    "greek-contents",
    "maintainers-highlight",
]

# (name, source repo id, destination repo id) triples
DATASETS = [
    (name, f"{SOURCE_USERNAME}/{name}", f"{DESTINATION_USERNAME}/{name}")
    for name in DATASET_NAMES
]

api = HfApi()


def ensure_repo_exists(repo_id, token):
    """Ensure the repository exists; create it as a private dataset if it doesn't."""
    try:
        api.repo_info(repo_id=repo_id, repo_type="dataset")
        logger.info(f"✅ Repository {repo_id} already exists")
    except Exception:
        # repo_info raised, so the repo is missing (or inaccessible): create it
        logger.info(f"Creating repository {repo_id}...")
        create_repo(
            repo_id=repo_id,
            repo_type="dataset",
            token=token,
            private=True
        )
        logger.info(f"✅ Repository {repo_id} created")


def process_dataset(dataset_info, token):
    """Copy a single dataset from the source repo to the destination repo."""
    name, source_dataset, destination_dataset = dataset_info
    try:
        logger.info(f"\n📥 Processing dataset: {name}")

        # Make sure the destination repository exists before uploading
        ensure_repo_exists(destination_dataset, token)

        # Work inside a temporary directory that is cleaned up automatically
        with tempfile.TemporaryDirectory() as temp_dir:
            try:
                logger.info(f"Listing files in {source_dataset}...")
                files = api.list_repo_files(source_dataset, repo_type="dataset")
                logger.info(f"Detected structure: {len(files)} files")

                # Download a full snapshot of the source dataset
                logger.info(f"Downloading from {source_dataset}...")
                local_dir = snapshot_download(
                    repo_id=source_dataset,
                    repo_type="dataset",
                    local_dir=temp_dir,
                    token=token
                )
                logger.info("✅ Download complete")

                # Re-upload the snapshot to the destination repository
                logger.info(f"📤 Uploading to {destination_dataset}...")
                api.upload_folder(
                    folder_path=local_dir,
                    repo_id=destination_dataset,
                    repo_type="dataset",
                    token=token
                )
                logger.info(f"✅ {name} copied successfully!")
                return True

            except Exception as e:
                logger.error(f"❌ Error processing {name}: {str(e)}")
                return False

    except Exception as e:
        logger.error(f"❌ Error for {name}: {str(e)}")
        return False


def copy_datasets():
    try:
        logger.info("🔍 Checking authentication...")
        token = os.getenv("HF_TOKEN")
        if not token:
            raise ValueError("HF_TOKEN not found in .env file")

        # Copy every dataset and record whether each one succeeded
        results = []
        for dataset_info in DATASETS:
            success = process_dataset(dataset_info, token)
            results.append((dataset_info[0], success))

        # Report a per-dataset summary at the end of the run
        logger.info("\n📊 Final summary:")
        for dataset, success in results:
            status = "✅ Success" if success else "❌ Failure"
            logger.info(f"{dataset}: {status}")

    except Exception as e:
        logger.error(f"❌ Global error: {str(e)}")


if __name__ == "__main__":
    copy_datasets()
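
# Usage sketch (the script path below is assumed, not taken from the project):
#   with HF_TOKEN set in <repo root>/.env, run e.g.:
#   python backend/scripts/copy_datasets.py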