aideml / aide /utils /__init__.py
Dixing Xu
Init...
2fc6b05 unverified
import logging
import shutil
import zipfile
from pathlib import Path
# Module-level logger registered under the name "aide"; used below for debug
# output while archives are being extracted.
logger = logging.getLogger("aide")
def copytree(src: Path, dst: Path, use_symlinks=True):
    """
    Copy contents of `src` to `dst`. Unlike shutil.copytree, the dst dir can exist.
    If src is a file, only that file will be copied. Optionally uses symlinks instead of copying.

    Only the top level is merged: every direct child of `src` is placed into `dst`
    under its own name, and none of those names may already exist in `dst`.

    Args:
        src (Path): source directory (or a single file)
        dst (Path): destination directory, must already exist
        use_symlinks (bool): if True, create symlinks in `dst` that point at the
            source entries instead of copying them

    Raises:
        AssertionError: if `dst` is not a directory, or if an entry with the same
            name as a source item already exists in `dst`
    """
    assert dst.is_dir()

    # A single file is treated like a directory that contains only that file.
    items = [src] if src.is_file() else list(src.iterdir())

    for item in items:
        dest_f = dst / item.name
        assert not dest_f.exists(), dest_f

        if use_symlinks:
            # A relative symlink target is interpreted relative to the directory
            # holding the link, not the current working directory, so a relative
            # `src` would yield dangling links. Always link to the resolved path.
            dest_f.symlink_to(item.resolve())
        elif item.is_dir():
            shutil.copytree(item, dest_f)
        else:
            shutil.copyfile(item, dest_f)
def clean_up_dataset(path: Path):
    """
    Remove macOS archive artifacts found anywhere below `path`.

    Deletes every directory named `__MACOSX` (including its contents) and every
    regular file named `.DS_Store`. Entries with those names of the other kind
    (a file called `__MACOSX`, a directory called `.DS_Store`) are left alone.

    Args:
        path (Path): root directory to clean, searched recursively
    """
    # Collect the matches before deleting anything so the recursive walk never
    # runs over a tree that is being modified underneath it. A match nested in
    # an already removed `__MACOSX` dir no longer exists, which the is_dir() /
    # is_file() re-checks skip.
    for junk_dir in list(path.rglob("__MACOSX")):
        if junk_dir.is_dir():
            shutil.rmtree(junk_dir)

    for junk_file in list(path.rglob(".DS_Store")):
        if junk_file.is_file():
            junk_file.unlink()
def _hoist_only_child(child: Path, parent: Path):
    """Replace the directory `parent` with `child`, its only entry (file or dir)."""
    # The child carries the parent's name, so it cannot simply be moved one
    # level up: park it next to the parent under a temporary name, remove the
    # now empty parent, then give the child the parent's path. The temporary
    # name is appended to the full name (instead of replacing the suffix) so
    # that e.g. `train.csv` and `train.txt` never share a temporary path.
    parked = child.rename(parent.with_name(parent.name + ".__tmp_rename"))
    parent.rmdir()
    parked.rename(parent)


def extract_archives(path: Path):
    """
    unzips all .zip files within `path` and cleans up task dir

    Every `<name>.zip` is extracted into a sibling `<name>` directory, macOS
    artifacts are removed from the extracted files and the zip is deleted.
    If the archive holds nothing but a single file/dir that is itself called
    `<name>`, that item replaces the `<name>` directory to avoid `<name>/<name>`.
    Zips found inside a freshly extracted directory are extracted as well.

    A zip whose output path already exists is skipped; it is deleted only when
    the existing item is a file with a suffix (assumed to be the zip's content).

    Args:
        path (Path): directory that is searched recursively for .zip files
    """
    # Snapshot the archives up front: the loop creates directories and deletes
    # zips, and what a lazy rglob() reports for a tree that changes while it is
    # being walked depends on the glob implementation.
    for zip_f in sorted(path.rglob("*.zip")):
        f_out_dir = zip_f.with_suffix("")

        # special case: the intended output path already exists (maybe data has already been extracted by user)
        if f_out_dir.exists():
            logger.debug(
                f"Skipping {zip_f} as an item with the same name already exists."
            )
            # if it's a file, it's probably exactly the same as in the zip -> remove the zip
            # [TODO] maybe add an extra check to see if zip file content matches the colliding file
            if f_out_dir.is_file() and f_out_dir.suffix != "":
                zip_f.unlink()
            continue

        logger.debug(f"Extracting: {zip_f}")
        f_out_dir.mkdir(exist_ok=True)
        with zipfile.ZipFile(zip_f, "r") as zip_ref:
            zip_ref.extractall(f_out_dir)

        # remove any unwanted files
        clean_up_dataset(f_out_dir)

        contents = list(f_out_dir.iterdir())

        # special case: the zip contains a single dir/file with the same name as the zip
        if len(contents) == 1 and contents[0].name == f_out_dir.name:
            sub_item = contents[0]
            # Both cases hoist the item by renaming it. For a dir this has the
            # same result as moving its children up one level, but it cannot
            # collide with a child that is again named like the dir itself.
            if sub_item.is_dir():
                logger.debug(f"Special handling (child is dir) enabled for: {zip_f}")
                _hoist_only_child(sub_item, f_out_dir)
            elif sub_item.is_file():
                logger.debug(f"Special handling (child is file) enabled for: {zip_f}")
                _hoist_only_child(sub_item, f_out_dir)

        zip_f.unlink()

        # Archives packed inside this one are not part of the snapshot above,
        # so handle them explicitly.
        if f_out_dir.is_dir():
            extract_archives(f_out_dir)
def preproc_data(path: Path):
    """
    Prepare the data directory `path` in two passes: first extract the zip
    archives it contains, then remove macOS artifacts from the whole tree.

    Args:
        path (Path): root of the data directory to process
    """
    for step in (extract_archives, clean_up_dataset):
        step(path)