import streamlit as st
import requests
import hashlib
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from PIL import Image
from pefile import PE, PEFormatError
import os
import re
# VirusTotal API details -- read the key from the environment rather than
# hard-coding a secret in source control
VIRUSTOTAL_API_KEY = os.environ.get("VIRUSTOTAL_API_KEY", "")
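# Alternative sketch: Streamlit can also load secrets from
# .streamlit/secrets.toml via st.secrets. This assumes you have created that
# file yourself with a VIRUSTOTAL_API_KEY entry -- the key name is this app's
# convention, not a Streamlit built-in:
#
#   # .streamlit/secrets.toml
#   # VIRUSTOTAL_API_KEY = "your-key-here"
#
#   VIRUSTOTAL_API_KEY = st.secrets["VIRUSTOTAL_API_KEY"]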
# streamlit framework
st.set_page_config(
    page_title="OxThreat",
    page_icon="🦠",
    layout="wide"
)
# Function to calculate the file's SHA-256 hash
def get_file_hash(file):
    file.seek(0)  # Reset file pointer to the beginning
    file_hash = hashlib.sha256(file.read()).hexdigest()
    file.seek(0)  # Reset again so later readers see the whole file
    return file_hash
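# A minimal sketch of a chunked variant for large uploads: hashlib's update()
# hashes incrementally, so the whole file never has to sit in memory at once.
# The 1 MiB chunk size is an arbitrary assumption, and this helper is purely
# illustrative -- the app itself calls get_file_hash() above.
def get_file_hash_chunked(file, chunk_size=1024 * 1024):
    file.seek(0)
    digest = hashlib.sha256()
    for chunk in iter(lambda: file.read(chunk_size), b""):
        digest.update(chunk)
    file.seek(0)
    return digest.hexdigest()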
# Function to look up the file hash using the VirusTotal v3 API
def virustotal_analysis(file_hash):
    url = f"https://www.virustotal.com/api/v3/files/{file_hash}"
    headers = {"x-apikey": VIRUSTOTAL_API_KEY}
    response = requests.get(url, headers=headers, timeout=30)
    if response.status_code == 200:
        return response.json()
    else:
        st.error("Error with VirusTotal API request. Please check your API key or the file hash.")
        return None
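# A hash lookup only succeeds when VirusTotal has already seen the file. As a
# hedged sketch, the v3 API also accepts direct uploads (POST /files with a
# multipart "file" field). This helper is illustrative and is not wired into
# the app; polling the returned analysis id is left out for brevity.
def virustotal_upload(file_bytes, file_name):
    url = "https://www.virustotal.com/api/v3/files"
    headers = {"x-apikey": VIRUSTOTAL_API_KEY}
    files = {"file": (file_name, file_bytes)}
    response = requests.post(url, headers=headers, files=files, timeout=60)
    if response.status_code == 200:
        return response.json()  # contains an analysis id to poll
    return None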
# Function to extract metadata from PE files
def extract_metadata(file):
    try:
        pe = PE(data=file.read())
        metadata = {
            "Number of Sections": pe.FILE_HEADER.NumberOfSections,
            "Time Date Stamp": pe.FILE_HEADER.TimeDateStamp,
            "Characteristics": pe.FILE_HEADER.Characteristics,
        }
        return metadata
    except PEFormatError:
        st.error("Uploaded file is not a valid PE format.")
        return None
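# TimeDateStamp above is a raw Unix epoch and Characteristics is a bit mask,
# so the metadata table shows bare integers. A small sketch for readers who
# want a human-readable build time; datetime is standard library, and wiring
# this into extract_metadata() is left to the reader.
from datetime import datetime, timezone

def format_time_date_stamp(stamp):
    # Convert a raw epoch (e.g. pe.FILE_HEADER.TimeDateStamp) to a UTC string
    return datetime.fromtimestamp(stamp, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")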
# Function to analyze log files: collect IP addresses, domains, HTTP-style
# headers and session IDs from the raw log text
def analyze_log_file(log_content):
    # Data storage structures for IPs, Domains, Headers, Sessions
    ip_data = []
    domain_data = []
    header_data = []
    session_data = []

    # Regular expressions for matching
    ip_regex = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b')
    domain_regex = re.compile(r'\b[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}\b')
    header_regex = re.compile(r'(User-Agent|Content-Type|Authorization):\s*(.*)', re.IGNORECASE)
    session_regex = re.compile(r'SessionID:\s*([a-zA-Z0-9]+)')

    log_entries = []
    for line in log_content.splitlines():
        # Match IPs
        ips = ip_regex.findall(line)
        if ips:
            ip_data.extend(ips)
        # Match Domains
        domains = domain_regex.findall(line)
        if domains:
            domain_data.extend(domains)
        # Match Headers
        headers = header_regex.findall(line)
        if headers:
            header_data.extend(headers)
        # Match Sessions
        sessions = session_regex.findall(line)
        if sessions:
            session_data.extend(sessions)
        log_entries.append(line)

    # Convert the raw lines and the captured data to DataFrames
    log_df = pd.DataFrame(log_entries, columns=["Log Entries"])
    ip_df = pd.DataFrame(ip_data, columns=["IP Addresses"])
    domain_df = pd.DataFrame(domain_data, columns=["Domains"])
    header_df = pd.DataFrame(header_data, columns=["Header Name", "Header Value"])
    session_df = pd.DataFrame(session_data, columns=["Session IDs"])

    # Summary of findings
    return {
        "log_dataframe": log_df,
        "ip_dataframe": ip_df,
        "domain_dataframe": domain_df,
        "header_dataframe": header_df,
        "session_dataframe": session_df,
    }
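# A quick illustration of what the regexes above capture, using a made-up log
# line (the "SessionID:" and header formats are this app's assumptions about
# how its logs look, not a standard):
#
#   line = "2024-01-01 ERROR 10.0.0.5 fetched evil.example.com " \
#          "User-Agent: curl/8.0 SessionID: abc123"
#   analyze_log_file(line)["ip_dataframe"]       # -> one row: 10.0.0.5
#   analyze_log_file(line)["session_dataframe"]  # -> one row: abc123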
# Function to create charts from VirusTotal results
def create_virus_total_charts(virus_total_results):
    if not virus_total_results:
        return None
    stats = virus_total_results['data']['attributes']['last_analysis_stats']
    labels = list(stats.keys())
    values = list(stats.values())
    fig, ax = plt.subplots(figsize=(10, 5))
    sns.barplot(x=labels, y=values, palette="viridis", ax=ax)
    ax.set_title("VirusTotal Analysis Results", fontsize=16, fontweight='bold')
    ax.set_xlabel("Analysis Types", fontsize=14)
    ax.set_ylabel("Count", fontsize=14)
    return fig
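# For reference, last_analysis_stats is a flat dict of counters. In v3
# responses it typically holds keys such as "malicious", "suspicious",
# "undetected" and "harmless" (the exact set can vary by report), e.g.:
#
#   {"malicious": 3, "suspicious": 0, "undetected": 60, "harmless": 10}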
# Function to create detailed tables from JSON data
def create_detailed_table(data, title):
    st.write(f"### {title}")
    # Normalize the JSON data into a flat DataFrame
    df = pd.json_normalize(data)
    if df.empty:
        st.write("No data available.")
    else:
        # Light styling: colour gradient plus N/A for missing values
        styled_df = df.style.background_gradient(cmap='viridis') \
                            .format(na_rep='N/A', precision=2)
        st.dataframe(styled_df)
# Function to display the analysis results on the dashboard
def display_analysis_results(metadata, virus_total_results, log_analysis=None):
    st.write("## Analysis Results")

    # Metadata
    if metadata:
        create_detailed_table(metadata, "📄 PE File Metadata")

    # VirusTotal results
    if virus_total_results:
        create_detailed_table(virus_total_results['data'], "🦠 VirusTotal Results")
        st.write("#### 📊 VirusTotal Analysis Stats")
        fig = create_virus_total_charts(virus_total_results)
        if fig:
            st.pyplot(fig)

    # Log analysis: analyze_log_file() returns a dict of DataFrames
    if log_analysis is not None:
        st.write("### 📊 Log Analysis")

        # First row: IP addresses and domains
        col1, col2 = st.columns(2)
        with col1:
            st.write("**IP Addresses:**")
            st.dataframe(log_analysis.get("ip_dataframe"))
        with col2:
            st.write("**Domains:**")
            st.dataframe(log_analysis.get("domain_dataframe"))

        # Second row: log entries, session IDs and headers
        col3, col4, col5 = st.columns([2, 1, 1])
        with col3:
            st.write("**Log Entries:**")
            st.dataframe(log_analysis.get("log_dataframe"))
        with col4:
            st.write("**Session IDs:**")
            if not log_analysis.get("session_dataframe").empty:
                st.dataframe(log_analysis.get("session_dataframe"))
            else:
                st.write("No session IDs found.")
        with col5:
            st.write("**Headers:**")
            if not log_analysis.get("header_dataframe").empty:
                st.dataframe(log_analysis.get("header_dataframe"))
            else:
                st.write("No headers found.")
# Main page of the Streamlit app
def main_page():
    st.title("🦠 Malware Analysis Tool")
    st.markdown("---")
    st.image('ui/antivirus.png', width=200, use_column_width='always')
    if st.button("Go to File Analysis 🗂️"):
        st.session_state.page = "file_analysis"
        st.experimental_rerun()
# File analysis page where the user can upload files for analysis
def file_analysis_page():
    st.title("🔍 File Analysis Dashboard")
    st.markdown("---")
    st.image('ui/antivirus.png', width=80, use_column_width='none')
    uploaded_file = st.file_uploader("Upload any file for analysis", type=["exe", "dll", "log", "pdf", "png", "jpg", "jpeg", "gif", "txt", "zip", "rar", "apk"])
    if uploaded_file:
        file_hash = get_file_hash(uploaded_file)
        st.write(f"SHA-256 Hash: {file_hash}")
        file_extension = uploaded_file.name.split('.')[-1].lower()
        # Handle different file types
        if file_extension in ['png', 'jpg', 'jpeg', 'gif']:
            st.write("### 🖼️ Image Preview")
            image = Image.open(uploaded_file)
            image.thumbnail((150, 150))  # Resize for preview
            st.image(image, caption='Uploaded Image', use_column_width=True)
            metadata = None
            virus_total_results = None
            log_analysis = None
        elif file_extension == 'pdf':
            st.write("### 📄 PDF File")
            st.write("PDF preview is not supported. Please use other tools to view.")
            st.download_button(label="Download PDF", data=uploaded_file, file_name=uploaded_file.name)
            metadata = None
            virus_total_results = None
            log_analysis = None
        elif file_extension in ['txt', 'log']:
            st.write("### 📝 Log File Content")
            log_content = uploaded_file.getvalue().decode("utf-8")
            log_analysis = analyze_log_file(log_content)
            metadata = None
            virus_total_results = None
        elif file_extension in ['zip', 'rar']:
            st.write("### 📦 Compressed File")
            st.write("Compressed files require further extraction and analysis.")
            metadata = None
            virus_total_results = None
            log_analysis = None
        elif file_extension in ['apk', 'exe', 'dll']:
            # Save the upload to disk temporarily so it can be re-read safely
            file_path = f"./temp/{uploaded_file.name}"
            os.makedirs(os.path.dirname(file_path), exist_ok=True)
            with open(file_path, "wb") as f:
                f.write(uploaded_file.getbuffer())
            try:
                with open(file_path, "rb") as file:
                    file_hash = get_file_hash(file)
                    # Only exe/dll are PE files; APKs are ZIP archives, so
                    # skip PE metadata extraction for them
                    metadata = extract_metadata(file) if file_extension in ['exe', 'dll'] else None
                    virus_total_results = virustotal_analysis(file_hash)
            finally:
                # Clean up the temporary copy
                os.remove(file_path)
            log_analysis = None
        else:
            st.error("Unsupported file type.")
            metadata = None
            virus_total_results = None
            log_analysis = None
        display_analysis_results(metadata, virus_total_results, log_analysis)
# Initialize session state for page navigation
if 'page' not in st.session_state:
    st.session_state.page = "main"

# Routing based on page state
if st.session_state.page == "main":
    main_page()
elif st.session_state.page == "file_analysis":
    file_analysis_page()