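"""Ansim Blur - Streamlit app for face privacy protection.

Detects faces with a YOLOv8 face model and anonymizes them with a Gaussian
blur or mosaic effect, for both images and videos. The UI pins placeholders
(st.empty / st.container) so the layout does not shift while results render.

Run with:  streamlit run app.py
Assumed dependencies (not pinned in this file): streamlit, opencv-python,
numpy, torch, ultralytics, and moviepy 1.x (the `moviepy.editor` import
below is the 1.x API).
"""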
import streamlit as st
import cv2
import numpy as np
import tempfile
import os
from pathlib import Path
from typing import Optional, Tuple
from moviepy.editor import VideoFileClip
import torch
# ==============================
# Streamlit page config & Custom CSS
# ==============================
st.set_page_config(
    page_title="Ansim Blur - Face Privacy Protection",
    page_icon="🔒",
    layout="wide",
    initial_sidebar_state="expanded"
)
# Custom CSS - tweaks to keep the layout stable
st.markdown("""
<style>
/* ์ปจํ…Œ์ด๋„ˆ ๊ณ ์ • ๋†’์ด ์„ค์ •์œผ๋กœ ํ”๋“ค๋ฆผ ๋ฐฉ์ง€ */
.image-container {
min-height: 400px;
display: flex;
align-items: center;
justify-content: center;
}
/* ๋ฉ”์ธ ์ปจํ…Œ์ด๋„ˆ ์•ˆ์ •ํ™” */
.main .block-container {
max-width: 1400px;
padding-top: 2rem;
padding-bottom: 2rem;
}
/* ์ปฌ๋Ÿผ ๊ณ ์ • */
[data-testid="column"] {
min-height: 500px;
}
/* ์ด๋ฏธ์ง€ ์—…๋กœ๋” ์˜์—ญ ๊ณ ์ • */
[data-testid="stFileUploader"] {
min-height: 150px;
}
/* ๋ฒ„ํŠผ ์˜์—ญ ๊ณ ์ • */
.stButton {
min-height: 60px;
}
/* ํ”„๋กœ๊ทธ๋ ˆ์Šค ๋ฐ” ์˜์—ญ ๊ณ ์ • */
.stProgress {
min-height: 30px;
}
/* ํ—ค๋” ์Šคํƒ€์ผ๋ง */
h1 {
background: linear-gradient(120deg, #a855f7, #ec4899);
-webkit-background-clip: text;
-webkit-text-fill-color: transparent;
font-size: 3rem !important;
font-weight: 700 !important;
text-align: center;
margin-bottom: 1rem !important;
}
/* ์นด๋“œ ์Šคํƒ€์ผ */
.stat-card {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
padding: 1rem;
border-radius: 12px;
color: white;
text-align: center;
height: 100px;
display: flex;
flex-direction: column;
justify-content: center;
box-shadow: 0 4px 15px rgba(0,0,0,0.1);
}
.stat-number {
font-size: 2rem;
margin-bottom: 0.3rem;
}
.stat-label {
font-size: 0.85rem;
opacity: 0.95;
}
/* ๋ฒ„ํŠผ ์Šคํƒ€์ผ ๊ฐœ์„  */
.stButton > button {
background: linear-gradient(135deg, #a855f7 0%, #ec4899 100%);
color: white;
border: none;
padding: 0.7rem 1.5rem;
font-size: 1rem;
font-weight: 600;
border-radius: 25px;
width: 100%;
transition: transform 0.2s;
}
.stButton > button:hover {
transform: translateY(-2px);
}
/* ์‚ฌ์ด๋“œ๋ฐ” ์Šคํƒ€์ผ */
.css-1d391kg {
background-color: #f8f7ff;
}
/* Info ๋ฐ•์Šค */
.info-box {
background: #f0f4ff;
border-left: 4px solid #667eea;
padding: 1rem;
border-radius: 8px;
margin: 1rem 0;
}
</style>
""", unsafe_allow_html=True)
# ==============================
# Header Section
# ==============================
st.markdown("<h1>๐Ÿ”’ Ansim Blur</h1>", unsafe_allow_html=True)
st.markdown("<p style='text-align: center; color: #6b7280; margin-bottom: 1rem;'>Advanced Face Privacy Protection</p>", unsafe_allow_html=True)
# Center the Discord badge
st.markdown("""
<div style='text-align: center; margin-bottom: 2rem;'>
<a href="https://discord.gg/openfreeai" target="_blank">
<img src="https://img.shields.io/static/v1?label=Discord&message=Openfree%20AI&color=%230000ff&labelColor=%23800080&logo=discord&logoColor=white&style=for-the-badge" alt="Discord badge">
</a>
</div>
""", unsafe_allow_html=True)
# Stats cards - pinned inside a container
stats_container = st.container()
with stats_container:
    col1, col2, col3, col4 = st.columns(4)
    with col1:
        st.markdown("""<div class='stat-card'><div class='stat-number'>🖼️</div><div class='stat-label'>Image Support</div></div>""", unsafe_allow_html=True)
    with col2:
        st.markdown("""<div class='stat-card'><div class='stat-number'>🎥</div><div class='stat-label'>Video Processing</div></div>""", unsafe_allow_html=True)
    with col3:
        st.markdown("""<div class='stat-card'><div class='stat-number'>⚡</div><div class='stat-label'>Real-time</div></div>""", unsafe_allow_html=True)
    with col4:
        st.markdown("""<div class='stat-card'><div class='stat-number'>🛡️</div><div class='stat-label'>Privacy First</div></div>""", unsafe_allow_html=True)
st.markdown("---")
# ==============================
# Model loader
# ==============================
@st.cache_resource(show_spinner=False)
def load_model(model_path: str = "yolov8-face-hf.pt", device: Optional[str] = None):
    # Imported lazily so the page can start rendering before ultralytics loads.
    from ultralytics import YOLO
    # Prefer CUDA, then Apple Silicon (MPS), then fall back to CPU.
    if device is None:
        if torch.cuda.is_available():
            device = "cuda"
        elif torch.backends.mps.is_available():
            device = "mps"
        else:
            device = "cpu"
    model = YOLO(model_path)
    model.to(device)
    return model, device
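# st.cache_resource keeps one model instance per server process, so
# Streamlit's script reruns (every widget interaction) reuse the loaded
# weights instead of reloading them.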
with st.spinner("Loading AI model..."):
    model, device = load_model()
# ==============================
# Sidebar - settings
# ==============================
with st.sidebar:
    st.markdown("## ⚙️ Configuration")
    st.info(f"Device: **{device.upper()}**")
    st.markdown("### Detection Settings")
    conf = st.slider("Confidence Threshold", 0.05, 0.9, 0.25, 0.01)
    iou = st.slider("NMS IoU", 0.1, 0.9, 0.45, 0.01)
    expand_ratio = st.slider("Box Expansion", 0.0, 0.5, 0.05, 0.01)
    st.markdown("### Blur Settings")
    mode_choice = st.selectbox("Style", ["Gaussian Blur", "Mosaic Effect"])
    if mode_choice == "Gaussian Blur":
        blur_kernel = st.slider("Blur Intensity", 15, 151, 51, 2)
        mosaic = 15
    else:
        mosaic = st.slider("Mosaic Size", 5, 40, 15, 1)
        blur_kernel = 51
    use_half = st.checkbox("Half Precision (CUDA)", value=False)
# ==============================
# Helper functions
# ==============================
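# OpenCV's GaussianBlur requires an odd kernel size, and common H.264
# encoders expect even frame dimensions; the two helpers below normalize both.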
def _ensure_odd(x: int) -> int:
    """Round up to the nearest odd number."""
    return x if x % 2 == 1 else x + 1

def _choose_writer_size(w: int, h: int) -> Tuple[int, int]:
    """Round dimensions down to even values for the video writer."""
    return (w if w % 2 == 0 else w - 1, h if h % 2 == 0 else h - 1)

def _apply_anonymization(face_roi: np.ndarray, mode: str, blur_kernel: int, mosaic: int = 15) -> np.ndarray:
    """Blur or pixelate a face region and return the anonymized ROI."""
    if face_roi.size == 0:
        return face_roi
    if mode == "Gaussian Blur":
        k = _ensure_odd(max(blur_kernel, 15))
        return cv2.GaussianBlur(face_roi, (k, k), 0)
    else:
        # Mosaic: downscale by the mosaic factor, then upscale with
        # nearest-neighbor interpolation so the blocks stay visible.
        m = max(2, mosaic)
        h, w = face_roi.shape[:2]
        face_small = cv2.resize(face_roi, (max(1, w // m), max(1, h // m)), interpolation=cv2.INTER_LINEAR)
        return cv2.resize(face_small, (w, h), interpolation=cv2.INTER_NEAREST)
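# blur_faces_image runs one YOLO inference pass, optionally expands each
# detected box by `expand_ratio`, clamps it to the image bounds, and
# anonymizes the region in place. Callers that need the original should pass
# a copy (the image tab below calls it with image.copy()).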
def blur_faces_image(image_bgr, conf, iou, expand_ratio, mode, blur_kernel, mosaic, use_half):
    h, w = image_bgr.shape[:2]
    face_count = 0
    with torch.no_grad():
        # Half-precision toggle: switching the default dtype only affects
        # newly created tensors, not the already-loaded model weights.
        if use_half and device == "cuda":
            torch.set_default_dtype(torch.float16)
        results = model.predict(image_bgr, conf=conf, iou=iou, verbose=False, device=device)
        if use_half and device == "cuda":
            torch.set_default_dtype(torch.float32)
    for r in results:
        boxes = r.boxes.xyxy.cpu().numpy() if hasattr(r.boxes, "xyxy") else []
        face_count += len(boxes)
        for x1, y1, x2, y2 in boxes:
            x1, y1, x2, y2 = map(int, [x1, y1, x2, y2])
            # Optionally grow the box so hairlines and chins are covered too.
            if expand_ratio > 0:
                bw = x2 - x1
                bh = y2 - y1
                dx = int(bw * expand_ratio)
                dy = int(bh * expand_ratio)
                x1 -= dx; y1 -= dy; x2 += dx; y2 += dy
            # Clamp to the image bounds and skip degenerate boxes.
            x1 = max(0, min(w, x1))
            x2 = max(0, min(w, x2))
            y1 = max(0, min(h, y1))
            y2 = max(0, min(h, y2))
            if x2 <= x1 or y2 <= y1:
                continue
            roi = image_bgr[y1:y2, x1:x2]
            image_bgr[y1:y2, x1:x2] = _apply_anonymization(roi, mode, blur_kernel, mosaic)
    return image_bgr, face_count
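# blur_faces_video writes anonymized frames to a temporary mp4v file with
# OpenCV (which drops the audio track), then remuxes the original audio back
# in with moviepy/libx264. If that remux fails, the silent temp file is
# returned as a fallback.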
def blur_faces_video(input_path, output_path, conf, iou, expand_ratio, mode, blur_kernel, mosaic, update_callback, use_half):
    cap = cv2.VideoCapture(input_path)
    if not cap.isOpened():
        raise IOError("Cannot open video")
    in_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    in_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
    frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) or 0
    out_w, out_h = _choose_writer_size(in_w, in_h)
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    temp_video_path = str(Path(output_path).with_name("blurred_temp_video.mp4"))
    out = cv2.VideoWriter(temp_video_path, fourcc, fps, (out_w, out_h))
    idx = 0
    total_faces = 0
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame = cv2.resize(frame, (out_w, out_h))
            with torch.no_grad():
                if use_half and device == "cuda":
                    torch.set_default_dtype(torch.float16)
                results = model.predict(frame, conf=conf, iou=iou, verbose=False, device=device)
                if use_half and device == "cuda":
                    torch.set_default_dtype(torch.float32)
            h, w = frame.shape[:2]
            r0 = results[0] if len(results) else None
            boxes = r0.boxes.xyxy.cpu().numpy() if (r0 is not None and hasattr(r0, "boxes")) else []
            total_faces += len(boxes)
            for b in boxes:
                x1, y1, x2, y2 = map(int, b)
                if expand_ratio > 0:
                    bw = x2 - x1
                    bh = y2 - y1
                    dx = int(bw * expand_ratio)
                    dy = int(bh * expand_ratio)
                    x1 -= dx; y1 -= dy; x2 += dx; y2 += dy
                x1 = max(0, min(w, x1))
                x2 = max(0, min(w, x2))
                y1 = max(0, min(h, y1))
                y2 = max(0, min(h, y2))
                if x2 <= x1 or y2 <= y1:
                    continue
                roi = frame[y1:y2, x1:x2]
                frame[y1:y2, x1:x2] = _apply_anonymization(roi, mode, blur_kernel, mosaic)
            out.write(frame)
            idx += 1
            if update_callback and frames > 0:
                update_callback(min(0.98, idx / frames), idx, frames, total_faces)
    finally:
        cap.release()
        out.release()
    try:
        if update_callback:
            update_callback(0.99, idx, frames, total_faces)
        # Re-encode with libx264 and copy the original audio track back in.
        original = VideoFileClip(input_path)
        processed = VideoFileClip(temp_video_path).set_audio(original.audio)
        processed.write_videofile(
            output_path,
            codec="libx264",
            audio_codec="aac",
            threads=1,
            logger=None
        )
        if update_callback:
            update_callback(1.0, idx, frames, total_faces)
        return output_path, total_faces
    except Exception as e:
        print("Audio merging failed:", e)
        return temp_video_path, total_faces
# ==============================
# Main Interface - pinned layout
# ==============================
tab1, tab2 = st.tabs(["📸 Image Processing", "🎬 Video Processing"])
with tab1:
    # Create a pinned outer container
    main_container = st.container()
    with main_container:
        # Two fixed columns
        col1, col2 = st.columns(2, gap="large")
        # Left column - input
        with col1:
            st.markdown("### Input")
            # File uploader container
            upload_container = st.container()
            with upload_container:
                uploaded_file = st.file_uploader(
                    "Choose an image",
                    type=["jpg", "png", "jpeg"],
                    key="img_upload"
                )
            # Original image display area (fixed height)
            original_placeholder = st.empty()
            info_placeholder = st.empty()
            if uploaded_file:
                file_bytes = np.asarray(bytearray(uploaded_file.read()), dtype=np.uint8)
                image = cv2.imdecode(file_bytes, cv2.IMREAD_COLOR)
                original_placeholder.image(cv2.cvtColor(image, cv2.COLOR_BGR2RGB), caption="Original", use_container_width=True)
                h, w = image.shape[:2]
                info_placeholder.info(f"Size: {w} × {h} pixels")
            else:
                # Keep the space reserved so the layout doesn't jump
                original_placeholder.markdown("<div class='image-container'><p style='text-align:center;color:#999;'>No image uploaded</p></div>", unsafe_allow_html=True)
                info_placeholder.empty()
        # Right column - result
        with col2:
            st.markdown("### Result")
            # Button container
            button_container = st.container()
            # Result image display area (fixed height)
            result_placeholder = st.empty()
            success_placeholder = st.empty()
            download_placeholder = st.empty()
            with button_container:
                if uploaded_file:
                    if st.button("🔍 Process Image", type="primary", use_container_width=True):
                        with st.spinner("Processing..."):
                            result, face_count = blur_faces_image(
                                image.copy(), conf, iou, expand_ratio,
                                mode_choice, blur_kernel, mosaic, use_half
                            )
                        result_placeholder.image(cv2.cvtColor(result, cv2.COLOR_BGR2RGB), caption="Processed", use_container_width=True)
                        success_placeholder.success(f"Blurred {face_count} face(s)")
                        _, buffer = cv2.imencode('.jpg', result)
                        download_placeholder.download_button(
                            "⬇️ Download",
                            data=buffer.tobytes(),
                            file_name="blurred.jpg",
                            mime="image/jpeg",
                            use_container_width=True
                        )
                else:
                    result_placeholder.markdown("<div class='image-container'><p style='text-align:center;color:#999;'>Results will appear here</p></div>", unsafe_allow_html=True)
with tab2:
    video_container = st.container()
    with video_container:
        col1, col2 = st.columns(2, gap="large")
        with col1:
            st.markdown("### Input Video")
            video_upload = st.file_uploader(
                "Choose a video",
                type=["mp4", "avi", "mov", "mkv"],
                key="video_upload"
            )
            video_placeholder = st.empty()
            if video_upload:
                video_placeholder.video(video_upload)
            else:
                video_placeholder.markdown("<div class='image-container'><p style='text-align:center;color:#999;'>No video uploaded</p></div>", unsafe_allow_html=True)
        with col2:
            st.markdown("### Processed Video")
            process_button = st.empty()
            progress_placeholder = st.empty()
            stats_placeholder = st.empty()
            result_video_placeholder = st.empty()
            download_video_placeholder = st.empty()
            if video_upload:
                if process_button.button("🎬 Process Video", type="primary", use_container_width=True):
                    # Save uploaded file
                    input_path = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4").name
                    with open(input_path, "wb") as f:
                        f.write(video_upload.read())
                    output_path = str(Path(tempfile.gettempdir()) / "blurred_video.mp4")

                    def update_progress(value, current_frame=0, total_frames=0, faces=0):
                        percent = int(value * 100)
                        progress_placeholder.progress(value)
                        if total_frames > 0:
                            stats_placeholder.info(f"📊 Frame: {current_frame}/{total_frames} | Progress: {percent}% | Faces: {faces}")

                    try:
                        final_output, total_faces = blur_faces_video(
                            input_path, output_path,
                            conf=conf, iou=iou, expand_ratio=expand_ratio,
                            mode=mode_choice, blur_kernel=blur_kernel,
                            mosaic=mosaic,
                            update_callback=update_progress, use_half=use_half
                        )
                        stats_placeholder.success(f"✅ Complete! Blurred {total_faces} faces.")
                        result_video_placeholder.video(final_output)
                        with open(final_output, "rb") as file:
                            download_video_placeholder.download_button(
                                "⬇️ Download Video",
                                file,
                                file_name="blurred_video.mp4",
                                mime="video/mp4",
                                use_container_width=True
                            )
                    except Exception as e:
                        stats_placeholder.error(f"❌ Error: {e}")
                    finally:
                        if os.path.exists(input_path):
                            os.remove(input_path)
            else:
                result_video_placeholder.markdown("<div class='image-container'><p style='text-align:center;color:#999;'>Processed video will appear here</p></div>", unsafe_allow_html=True)