import streamlit as st import requests import hashlib import pandas as pd import json import matplotlib.pyplot as plt import seaborn as sns from io import BytesIO from PIL import Image from pefile import PE, PEFormatError import os import re # VirusTotal API details VIRUSTOTAL_API_KEY = 'ed48e6407e0b7975be7d19c797e1217f500183c9ae84d1119af8628ba4c98c3d' # streamlit framework st.set_page_config( page_title="OxThreat", page_icon="🔍", layout="wide" ) # Function to calculate the file's SHA-256 hash def get_file_hash(file): file.seek(0) # Reset file pointer to the beginning file_hash = hashlib.sha256(file.read()).hexdigest() file.seek(0) # Reset file pointer to the beginning return file_hash # Function to analyze the file using VirusTotal def virustotal_analysis(file_hash): url = f"https://www.virustotal.com/api/v3/files/{file_hash}" headers = {"x-apikey": VIRUSTOTAL_API_KEY} response = requests.get(url, headers=headers) if response.status_code == 200: return response.json() else: st.error("Error with VirusTotal API request. Please check your API key or the file hash.") return None # Function to extract metadata from PE files def extract_metadata(file): try: pe = PE(data=file.read()) metadata = { "Number of Sections": pe.FILE_HEADER.NumberOfSections, "Time Date Stamp": pe.FILE_HEADER.TimeDateStamp, "Characteristics": pe.FILE_HEADER.Characteristics, } return metadata except PEFormatError: st.error("Uploaded file is not a valid PE format.") return None # Function to analyze log files def analyze_log_file(log_content): errors = re.findall(r'ERROR.*', log_content) return pd.DataFrame(errors, columns=["Errors"]) # Function to create charts from VirusTotal results def create_virus_total_charts(virus_total_results): if not virus_total_results: return None stats = virus_total_results['data']['attributes']['last_analysis_stats'] labels = list(stats.keys()) values = list(stats.values()) fig, ax = plt.subplots(figsize=(10, 5)) sns.barplot(x=labels, y=values, palette="viridis", ax=ax) ax.set_title("VirusTotal Analysis Results", fontsize=16, fontweight='bold') ax.set_xlabel("Analysis Types", fontsize=14) ax.set_ylabel("Count", fontsize=14) return fig # Function to create detailed tables from JSON data def create_detailed_table(data, title): st.write(f"### {title}") # Normalize JSON data into a DataFrame df = pd.json_normalize(data) # Debug: Show raw data and DataFrame st.write("Raw Data:", data) if df.empty: st.write("No data available.") else: # Apply minimal styling for debugging styled_df = df.style.background_gradient(cmap='viridis') \ .format(na_rep='N/A', precision=2) # Display the styled DataFrame st.dataframe(styled_df) # Function to display the analysis results on the dashboard def display_analysis_results(metadata, virus_total_results, log_analysis=None): st.write("## Analysis Results") col1, col2 = st.columns([2, 1]) # Metadata with col1: if metadata: st.write("### 📂 PE File Metadata") create_detailed_table(metadata, "PE File Metadata") # VirusTotal Results with col1: if virus_total_results: st.write("### 🦠 VirusTotal Results") create_detailed_table(virus_total_results['data'], "VirusTotal Results") st.write("#### 📊 VirusTotal Analysis Stats") fig = create_virus_total_charts(virus_total_results) if fig: st.pyplot(fig) # Log Analysis with col2: if log_analysis is not None: st.write("### 📝 Log Analysis") st.table(log_analysis) # Main page of the Streamlit app def main_page(): st.title("🦠 Malware Analysis Tool") st.markdown("---") st.image('ui/antivirus.png', width=200, use_column_width='always') if st.button("Go to File Analysis 🗂️"): st.session_state.page = "file_analysis" st.experimental_rerun() # File analysis page where the user can upload files for analysis def file_analysis_page(): st.title("🔍 File Analysis Dashboard") st.markdown("---") st.image('ui/antivirus.png', width=80, use_column_width='none') uploaded_file = st.file_uploader("Upload any file for analysis", type=["exe", "dll", "log", "pdf", "png", "jpg", "jpeg", "gif", "txt", "zip", "rar", "apk"]) if uploaded_file: file_hash = get_file_hash(uploaded_file) st.write(f"SHA-256 Hash: {file_hash}") file_extension = uploaded_file.name.split('.')[-1].lower() # Handle different file types if file_extension in ['png', 'jpg', 'jpeg', 'gif']: st.write("### 📄 Image Preview") image = Image.open(uploaded_file) image.thumbnail((150, 150)) # Resize for preview st.image(image, caption='Uploaded Image', use_column_width=True) metadata = None virus_total_results = None log_analysis = None elif file_extension == 'pdf': st.write("### 📄 PDF File") st.write("PDF preview is not supported. Please use other tools to view.") st.download_button(label="Download PDF", data=uploaded_file, file_name=uploaded_file.name) metadata = None virus_total_results = None log_analysis = None elif file_extension in ['txt', 'log']: st.write("### 📝 Log File Content") log_content = uploaded_file.getvalue().decode("utf-8") log_analysis = analyze_log_file(log_content) metadata = None virus_total_results = None elif file_extension in ['zip', 'rar']: st.write("### 📦 Compressed File") st.write("Compressed files require further extraction and analysis.") metadata = None virus_total_results = None log_analysis = None elif file_extension in ['apk', 'exe', 'dll']: # Save uploaded file temporarily file_path = f"./temp/{uploaded_file.name}" os.makedirs(os.path.dirname(file_path), exist_ok=True) with open(file_path, "wb") as f: f.write(uploaded_file.getbuffer()) try: with open(file_path, "rb") as file: file_hash = get_file_hash(file) metadata = extract_metadata(file) virus_total_results = virustotal_analysis(file_hash) finally: # Clean up os.remove(file_path) log_analysis = None else: st.error("Unsupported file type.") metadata = None virus_total_results = None log_analysis = None display_analysis_results(metadata, virus_total_results, log_analysis) # Initialize session state for page navigation if 'page' not in st.session_state: st.session_state.page = "main" # Routing based on page state if st.session_state.page == "main": main_page() elif st.session_state.page == "file_analysis": file_analysis_page() def analyze_log_file(log_content): # Data storage structures for IPs, Domains, Headers, Sessions ip_data = [] domain_data = [] header_data = [] session_data = [] # Regular expressions for matching ip_regex = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b') domain_regex = re.compile(r'\b[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}\b') header_regex = re.compile(r'(User-Agent|Content-Type|Authorization):\s*(.*)', re.IGNORECASE) session_regex = re.compile(r'SessionID:\s*([a-zA-Z0-9]+)') log_entries = [] for line in log_content.splitlines(): # Match IPs ips = ip_regex.findall(line) if ips: ip_data.extend(ips) # Match Domains domains = domain_regex.findall(line) if domains: domain_data.extend(domains) # Match Headers headers = header_regex.findall(line) if headers: header_data.extend(headers) # Match Sessions sessions = session_regex.findall(line) if sessions: session_data.extend(sessions) log_entries.append(line) # Convert to DataFrame log_df = pd.DataFrame(log_entries, columns=["Log Entries"]) # Additional DataFrames for captured data ip_df = pd.DataFrame(ip_data, columns=["IP Addresses"]) domain_df = pd.DataFrame(domain_data, columns=["Domains"]) header_df = pd.DataFrame(header_data, columns=["Header Name", "Header Value"]) session_df = pd.DataFrame(session_data, columns=["Session IDs"]) # Summary of findings summary = { "log_dataframe": log_df, "ip_dataframe": ip_df, "domain_dataframe": domain_df, "header_dataframe": header_df, "session_dataframe": session_df } return summary # Log Analysis Section if log_analysis is not None: st.write("### 📝 Log Analysis") # First row: IP Addresses and Domains col1, col2 = st.columns(2) with col1: st.write("**IP Addresses:**") st.dataframe(log_analysis.get("ip_dataframe")) with col2: st.write("**Domains:**") st.dataframe(log_analysis.get("domain_dataframe")) # Second row: Log Entries, Session IDs, Headers col3, col4, col5 = st.columns([2, 1, 1]) with col3: st.write("**Log Entries:**") st.dataframe(log_analysis.get("log_dataframe")) with col4: st.write("**Session IDs:**") if not log_analysis.get("session_dataframe").empty: st.dataframe(log_analysis.get("session_dataframe")) else: st.write("No session IDs found.") with col5: st.write("**Headers:**") if not log_analysis.get("header_dataframe").empty: st.dataframe(log_analysis.get("header_dataframe")) else: st.write("No headers found.")