Spaces:
Running
Running
import logging | |
import os | |
import shutil | |
import uuid | |
from pathlib import Path | |
from typing import Optional | |
from open_webui.apps.webui.models.files import FileForm, FileModel, Files | |
from open_webui.config import UPLOAD_DIR | |
from open_webui.constants import ERROR_MESSAGES | |
from open_webui.env import SRC_LOG_LEVELS | |
from fastapi import APIRouter, Depends, File, HTTPException, UploadFile, status | |
from fastapi.responses import FileResponse | |
from open_webui.utils.utils import get_admin_user, get_verified_user | |
log = logging.getLogger(__name__) | |
log.setLevel(SRC_LOG_LEVELS["MODELS"]) | |
router = APIRouter() | |
############################ | |
# Upload File | |
############################ | |
def upload_file(file: UploadFile = File(...), user=Depends(get_verified_user)):
    """Save an uploaded file under ``UPLOAD_DIR`` and register it in ``Files``.

    The file is written to ``{UPLOAD_DIR}/{uuid4}_{basename}``, where
    ``basename`` is the client-supplied filename reduced with
    ``os.path.basename``. The original basename, content type, byte size and
    on-disk path are recorded in the record's ``meta``.

    Args:
        file: The uploaded file from the request.
        user: The verified user; ``user.id`` is passed to the insert call.

    Returns:
        The value returned by ``Files.insert_new_file`` when it is truthy.

    Raises:
        HTTPException: 400 with "Error uploading file" when the insert returns
            a falsy value, or 400 carrying the underlying error when saving or
            inserting raises.
    """
    log.info(f"file.content_type: {file.content_type}")
    try:
        # Keep only the final path component of the client-supplied name so it
        # cannot point outside UPLOAD_DIR.
        unsanitized_filename = file.filename
        filename = os.path.basename(unsanitized_filename)

        # Prefix the stored name with a fresh uuid; keep the bare name for meta.
        file_id = str(uuid.uuid4())
        name = filename
        filename = f"{file_id}_{filename}"
        file_path = f"{UPLOAD_DIR}/{filename}"

        contents = file.file.read()
        # The context manager closes the handle; no explicit close() is needed.
        with open(file_path, "wb") as f:
            f.write(contents)

        record = Files.insert_new_file(
            user.id,
            FileForm(
                **{
                    "id": file_id,
                    "filename": filename,
                    "meta": {
                        "name": name,
                        "content_type": file.content_type,
                        "size": len(contents),
                        "path": file_path,
                    },
                }
            ),
        )
    except Exception as e:
        log.exception(e)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT(e),
        )

    # Checked outside the try block so this HTTPException is not caught and
    # re-wrapped by the generic handler above.
    if not record:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT("Error uploading file"),
        )
    return record
############################ | |
# List Files | |
############################ | |
async def list_files(user=Depends(get_verified_user)):
    """Return the file records visible to the caller.

    Users whose ``role`` is ``"admin"`` get the result of ``Files.get_files()``;
    everyone else gets ``Files.get_files_by_user_id(user.id)``.
    """
    if user.role != "admin":
        return Files.get_files_by_user_id(user.id)
    return Files.get_files()
############################ | |
# Delete All Files | |
############################ | |
async def delete_all_files(user=Depends(get_admin_user)):
    """Delete all file records, then empty ``UPLOAD_DIR`` on a best-effort basis.

    The directory cleanup never fails the request: problems removing
    individual entries, or listing the directory itself, are logged and the
    success message is still returned.

    Returns:
        ``{"message": "All files deleted successfully"}`` when
        ``Files.delete_all_files()`` returns a truthy value.

    Raises:
        HTTPException: 400 when ``Files.delete_all_files()`` returns a falsy
            value; nothing on disk is touched in that case.
    """
    result = Files.delete_all_files()
    if not result:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT("Error deleting files"),
        )

    folder = f"{UPLOAD_DIR}"
    try:
        if os.path.exists(folder):
            # Remove every entry inside the upload directory, but keep the
            # directory itself.
            for filename in os.listdir(folder):
                file_path = os.path.join(folder, filename)
                try:
                    if os.path.isfile(file_path) or os.path.islink(file_path):
                        os.unlink(file_path)  # Remove the file or link
                    elif os.path.isdir(file_path):
                        shutil.rmtree(file_path)  # Remove the directory
                except Exception as e:
                    # One failing entry must not stop the rest of the cleanup.
                    log.exception(f"Failed to delete {file_path}. Reason: {e}")
        else:
            log.warning(f"The directory {folder} does not exist")
    except Exception as e:
        log.exception(f"Failed to process the directory {folder}. Reason: {e}")

    return {"message": "All files deleted successfully"}
############################ | |
# Get File By Id | |
############################ | |
async def get_file_by_id(id: str, user=Depends(get_verified_user)):
    """Return the file record for ``id`` if the caller may see it.

    A record is visible when its ``user_id`` equals the caller's ``id`` or the
    caller's ``role`` is ``"admin"``.

    Raises:
        HTTPException: 404 when the record does not exist or is not visible
            to the caller (both cases produce the same response).
    """
    record = Files.get_file_by_id(id)
    visible = record and (record.user_id == user.id or user.role == "admin")
    if not visible:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )
    return record
############################ | |
# Get File Content By Id | |
############################ | |
async def get_file_content_by_id(id: str, user=Depends(get_verified_user)):
    """Serve the stored content of file ``id`` if the caller may see it.

    A record is visible when its ``user_id`` equals the caller's ``id`` or the
    caller's ``role`` is ``"admin"``. The content is read from the path stored
    in the record's ``meta["path"]``.

    Returns:
        A ``FileResponse`` for the stored path.

    Raises:
        HTTPException: 404 when the record does not exist, is not visible to
            the caller, has no stored path, or the path is not a file on disk.
    """
    file = Files.get_file_by_id(id)
    if not file or (file.user_id != user.id and user.role != "admin"):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    # A record without a stored path is treated like a missing file instead of
    # surfacing a KeyError/TypeError as a server error.
    stored_path = (file.meta or {}).get("path")
    if not stored_path:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    file_path = Path(stored_path)
    # Only serve the path if it still exists on disk as a regular file.
    if not file_path.is_file():
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    log.debug(f"file_path: {file_path}")
    return FileResponse(file_path)
async def get_file_content_by_id(id: str, user=Depends(get_verified_user)):
    """Serve the stored content of file ``id`` if the caller may see it.

    NOTE(review): an earlier definition in this module has the same name, so
    at module level this one rebinds it. No route decorators are visible
    here; confirm both definitions are registered under distinct routes.

    A record is visible when its ``user_id`` equals the caller's ``id`` or the
    caller's ``role`` is ``"admin"``. The content is read from the path stored
    in the record's ``meta["path"]``.

    Returns:
        A ``FileResponse`` for the stored path.

    Raises:
        HTTPException: 404 when the record does not exist, is not visible to
            the caller, has no stored path, or the path is not a file on disk.
    """
    file = Files.get_file_by_id(id)
    if not file or (file.user_id != user.id and user.role != "admin"):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    # A record without a stored path is treated like a missing file instead of
    # surfacing a KeyError/TypeError as a server error.
    stored_path = (file.meta or {}).get("path")
    if not stored_path:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    file_path = Path(stored_path)
    # Only serve the path if it still exists on disk as a regular file.
    if not file_path.is_file():
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    log.debug(f"file_path: {file_path}")
    return FileResponse(file_path)
############################ | |
# Delete File By Id | |
############################ | |
async def delete_file_by_id(id: str, user=Depends(get_verified_user)):
    """Delete file ``id`` if the caller may see it, including its stored bytes.

    A record is visible when its ``user_id`` equals the caller's ``id`` or the
    caller's ``role`` is ``"admin"``. After the record is deleted, the file at
    ``meta["path"]`` is removed on a best-effort basis so uploads do not
    accumulate on disk; a failure there is logged and does not fail the
    request.

    Returns:
        ``{"message": "File deleted successfully"}`` on success.

    Raises:
        HTTPException: 404 when the record does not exist or is not visible to
            the caller; 400 when ``Files.delete_file_by_id`` returns a falsy
            value.
    """
    file = Files.get_file_by_id(id)
    if not file or (file.user_id != user.id and user.role != "admin"):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    result = Files.delete_file_by_id(id)
    if not result:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT("Error deleting file"),
        )

    stored_path = (file.meta or {}).get("path")
    if stored_path:
        try:
            target = Path(stored_path).resolve()
            upload_root = Path(UPLOAD_DIR).resolve()
            # Only remove regular files that live inside the upload directory,
            # so a stored path can never cause a deletion elsewhere.
            if upload_root in target.parents and target.is_file():
                target.unlink()
        except Exception as e:
            log.exception(f"Failed to delete {stored_path}. Reason: {e}")

    return {"message": "File deleted successfully"}