import streamlit as st
import cv2
import numpy as np
import tempfile
import os
from pathlib import Path
from typing import Optional, Tuple
from moviepy.editor import VideoFileClip
import torch
from PIL import Image
# ==============================
# Streamlit page config & Custom CSS
# ==============================
st.set_page_config(
page_title="Ansim Blur - Face Privacy Protection",
    page_icon="🔒",
layout="wide",
initial_sidebar_state="expanded"
)
# Custom CSS - tweaks for a stable layout
st.markdown("""
""", unsafe_allow_html=True)
# ==============================
# Header Section
# ==============================
st.markdown("
๐ Ansim Blur
", unsafe_allow_html=True)
st.markdown("Advanced Face Privacy Protection
", unsafe_allow_html=True)
# Center-align the Discord badge
st.markdown("""
""", unsafe_allow_html=True)
# Stats cards - pinned inside a container
stats_container = st.container()
with stats_container:
col1, col2, col3, col4 = st.columns(4)
with col1:
st.markdown("""""", unsafe_allow_html=True)
with col2:
st.markdown("""""", unsafe_allow_html=True)
with col3:
st.markdown("""""", unsafe_allow_html=True)
with col4:
st.markdown("""""", unsafe_allow_html=True)
st.markdown("---")
# ==============================
# Model loader
# ==============================
@st.cache_resource(show_spinner=False)
def load_model(model_path: str = "yolov8-face-hf.pt", device: Optional[str] = None):
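    """Load the YOLO face model once (st.cache_resource) and move it to the best available device."""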
from ultralytics import YOLO
if device is None:
if torch.cuda.is_available():
device = "cuda"
elif torch.backends.mps.is_available():
device = "mps"
else:
device = "cpu"
model = YOLO(model_path)
model.to(device)
return model, device
with st.spinner("Loading AI model..."):
model, device = load_model()
# ==============================
# Sidebar - pinned settings
# ==============================
with st.sidebar:
st.markdown("## โ๏ธ Configuration")
st.info(f"Device: **{device.upper()}**")
st.markdown("### Detection Settings")
conf = st.slider("Confidence Threshold", 0.05, 0.9, 0.25, 0.01)
iou = st.slider("NMS IoU", 0.1, 0.9, 0.45, 0.01)
expand_ratio = st.slider("Box Expansion", 0.0, 0.5, 0.05, 0.01)
st.markdown("### Blur Settings")
mode_choice = st.selectbox("Style", ["Gaussian Blur", "Mosaic Effect"])
if mode_choice == "Gaussian Blur":
blur_kernel = st.slider("Blur Intensity", 15, 151, 51, 2)
mosaic = 15
else:
mosaic = st.slider("Mosaic Size", 5, 40, 15, 1)
blur_kernel = 51
use_half = st.checkbox("Half Precision (CUDA)", value=False)
# ==============================
# Helper functions
# ==============================
def _ensure_odd(x: int) -> int:
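    """Gaussian kernel sizes must be odd, so bump even values up by one."""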
return x if x % 2 == 1 else x + 1
def _choose_writer_size(w: int, h: int) -> Tuple[int, int]:
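    """Round width/height down to even values; many video codecs reject odd dimensions."""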
return (w if w % 2 == 0 else w - 1, h if h % 2 == 0 else h - 1)
def _apply_anonymization(face_roi: np.ndarray, mode: str, blur_kernel: int, mosaic: int = 15) -> np.ndarray:
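    """Anonymize a face ROI with either a Gaussian blur or a mosaic effect."""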
if face_roi.size == 0:
return face_roi
if mode == "Gaussian Blur":
k = _ensure_odd(max(blur_kernel, 15))
return cv2.GaussianBlur(face_roi, (k, k), 0)
else:
m = max(2, mosaic)
h, w = face_roi.shape[:2]
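        # Mosaic: shrink the ROI by the block size, then scale it back up with
        # nearest-neighbor interpolation to produce visible pixel blocks.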
face_small = cv2.resize(face_roi, (max(1, w // m), max(1, h // m)), interpolation=cv2.INTER_LINEAR)
return cv2.resize(face_small, (w, h), interpolation=cv2.INTER_NEAREST)
def blur_faces_image(image_bgr, conf, iou, expand_ratio, mode, blur_kernel, mosaic, use_half):
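    """Detect faces in a BGR image and anonymize each detected region in place."""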
h, w = image_bgr.shape[:2]
face_count = 0
    with torch.no_grad():
        # Ultralytics' predict() accepts a `half` flag for FP16 inference; using it
        # avoids mutating torch's global default dtype, which leaks into other code.
        results = model.predict(
            image_bgr, conf=conf, iou=iou, verbose=False,
            device=device, half=(use_half and device == "cuda"),
        )
for r in results:
boxes = r.boxes.xyxy.cpu().numpy() if hasattr(r.boxes, "xyxy") else []
        face_count += len(boxes)
for x1, y1, x2, y2 in boxes:
x1, y1, x2, y2 = map(int, [x1, y1, x2, y2])
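            # Optionally expand the box so hairline and chin are covered too.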
if expand_ratio > 0:
bw = x2 - x1
bh = y2 - y1
dx = int(bw * expand_ratio)
dy = int(bh * expand_ratio)
x1 -= dx; y1 -= dy; x2 += dx; y2 += dy
x1 = max(0, min(w, x1))
x2 = max(0, min(w, x2))
y1 = max(0, min(h, y1))
y2 = max(0, min(h, y2))
if x2 <= x1 or y2 <= y1:
continue
roi = image_bgr[y1:y2, x1:x2]
image_bgr[y1:y2, x1:x2] = _apply_anonymization(roi, mode, blur_kernel, mosaic)
return image_bgr, face_count
def blur_faces_video(input_path, output_path, conf, iou, expand_ratio, mode, blur_kernel, mosaic, update_callback, use_half):
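    """Blur faces frame by frame, then remux the original audio onto the result."""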
cap = cv2.VideoCapture(input_path)
if not cap.isOpened():
raise IOError("Cannot open video")
in_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
in_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) or 0
out_w, out_h = _choose_writer_size(in_w, in_h)
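    # Encode a silent intermediate file first; the audio is merged back with moviepy below.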
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
temp_video_path = str(Path(output_path).with_name("blurred_temp_video.mp4"))
out = cv2.VideoWriter(temp_video_path, fourcc, fps, (out_w, out_h))
idx = 0
total_faces = 0
try:
while True:
ret, frame = cap.read()
if not ret:
break
frame = cv2.resize(frame, (out_w, out_h))
            with torch.no_grad():
                # Same FP16 handling as the image path: pass the `half` flag
                # instead of toggling torch's global default dtype every frame.
                results = model.predict(
                    frame, conf=conf, iou=iou, verbose=False,
                    device=device, half=(use_half and device == "cuda"),
                )
h, w = frame.shape[:2]
            r0 = results[0] if len(results) else None
            boxes = r0.boxes.xyxy.cpu().numpy() if (r0 is not None and hasattr(r0, "boxes")) else []
total_faces += len(boxes)
for b in boxes:
x1, y1, x2, y2 = map(int, b)
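                # Same optional box expansion as the image path.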
if expand_ratio > 0:
bw = x2 - x1
bh = y2 - y1
dx = int(bw * expand_ratio)
dy = int(bh * expand_ratio)
x1 -= dx; y1 -= dy; x2 += dx; y2 += dy
x1 = max(0, min(w, x1))
x2 = max(0, min(w, x2))
y1 = max(0, min(h, y1))
y2 = max(0, min(h, y2))
if x2 <= x1 or y2 <= y1:
continue
roi = frame[y1:y2, x1:x2]
frame[y1:y2, x1:x2] = _apply_anonymization(roi, mode, blur_kernel, mosaic)
out.write(frame)
idx += 1
if update_callback and frames > 0:
update_callback(min(0.98, idx / frames), idx, frames, total_faces)
finally:
cap.release()
out.release()
try:
if update_callback:
update_callback(0.99, idx, frames, total_faces)
        # Remux the original audio track onto the silent processed video.
        original = VideoFileClip(input_path)
        processed = VideoFileClip(temp_video_path).set_audio(original.audio)
        processed.write_videofile(
            output_path,
            codec="libx264",
            audio_codec="aac",
            threads=1,
            logger=None,
        )
        processed.close()
        original.close()
        os.remove(temp_video_path)  # the intermediate silent file is no longer needed
if update_callback:
update_callback(1.0, idx, frames, total_faces)
return output_path, total_faces
except Exception as e:
print("Audio merging failed:", e)
return temp_video_path, total_faces
# ==============================
# Main Interface - fixed layout
# ==============================
tab1, tab2 = st.tabs(["📸 Image Processing", "🎬 Video Processing"])
with tab1:
    # Create a pinned container for the layout
main_container = st.container()
with main_container:
        # Two fixed columns
col1, col2 = st.columns(2, gap="large")
        # Left column - input
with col1:
st.markdown("### Input")
            # File upload container
upload_container = st.container()
with upload_container:
uploaded_file = st.file_uploader(
"Choose an image",
type=["jpg", "png", "jpeg"],
key="img_upload"
)
            # Original image display area (fixed height)
original_placeholder = st.empty()
info_placeholder = st.empty()
if uploaded_file:
file_bytes = np.asarray(bytearray(uploaded_file.read()), dtype=np.uint8)
image = cv2.imdecode(file_bytes, cv2.IMREAD_COLOR)
original_placeholder.image(cv2.cvtColor(image, cv2.COLOR_BGR2RGB), caption="Original", use_container_width=True)
h, w = image.shape[:2]
                info_placeholder.info(f"Size: {w} × {h} pixels")
else:
                # Keep the empty space reserved
original_placeholder.markdown("", unsafe_allow_html=True)
info_placeholder.empty()
        # Right column - result
with col2:
st.markdown("### Result")
            # Button container
button_container = st.container()
            # Result image display area (fixed height)
result_placeholder = st.empty()
success_placeholder = st.empty()
download_placeholder = st.empty()
with button_container:
if uploaded_file:
                    if st.button("🚀 Process Image", type="primary", use_container_width=True):
with st.spinner("Processing..."):
result, face_count = blur_faces_image(
image.copy(), conf, iou, expand_ratio,
mode_choice, blur_kernel, mosaic, use_half
)
result_placeholder.image(cv2.cvtColor(result, cv2.COLOR_BGR2RGB), caption="Processed", use_container_width=True)
success_placeholder.success(f"Blurred {face_count} face(s)")
_, buffer = cv2.imencode('.jpg', result)
download_placeholder.download_button(
"โฌ๏ธ Download",
data=buffer.tobytes(),
file_name="blurred.jpg",
mime="image/jpeg",
use_container_width=True
)
else:
result_placeholder.markdown("", unsafe_allow_html=True)
with tab2:
video_container = st.container()
with video_container:
col1, col2 = st.columns(2, gap="large")
with col1:
st.markdown("### Input Video")
video_upload = st.file_uploader(
"Choose a video",
type=["mp4", "avi", "mov", "mkv"],
key="video_upload"
)
video_placeholder = st.empty()
if video_upload:
video_placeholder.video(video_upload)
else:
video_placeholder.markdown("", unsafe_allow_html=True)
with col2:
st.markdown("### Processed Video")
process_button = st.empty()
progress_placeholder = st.empty()
stats_placeholder = st.empty()
result_video_placeholder = st.empty()
download_video_placeholder = st.empty()
if video_upload:
                if process_button.button("🎬 Process Video", type="primary", use_container_width=True):
# Save uploaded file
input_path = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4").name
with open(input_path, "wb") as f:
f.write(video_upload.read())
output_path = str(Path(tempfile.gettempdir()) / "blurred_video.mp4")
def update_progress(value, current_frame=0, total_frames=0, faces=0):
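                        """Report per-frame progress back to the Streamlit placeholders."""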
percent = int(value * 100)
progress_placeholder.progress(value)
if total_frames > 0:
                            stats_placeholder.info(f"📊 Frame: {current_frame}/{total_frames} | Progress: {percent}% | Faces: {faces}")
try:
final_output, total_faces = blur_faces_video(
input_path, output_path,
conf=conf, iou=iou, expand_ratio=expand_ratio,
mode=mode_choice, blur_kernel=blur_kernel,
mosaic=mosaic,
update_callback=update_progress, use_half=use_half
)
                        stats_placeholder.success(f"✅ Complete! Blurred {total_faces} face(s).")
result_video_placeholder.video(final_output)
with open(final_output, "rb") as file:
download_video_placeholder.download_button(
"โฌ๏ธ Download Video",
file,
file_name="blurred_video.mp4",
mime="video/mp4",
use_container_width=True
)
except Exception as e:
                        stats_placeholder.error(f"❌ Error: {e}")
finally:
if os.path.exists(input_path):
os.remove(input_path)
else:
                result_video_placeholder.markdown("Processed video will appear here", unsafe_allow_html=True)