# jzou19950715's picture
# Update app.py
# c166129 verified
"""
Blockchain Wallet Analyzer - A tool for analyzing Ethereum wallet contents and NFT holdings.
This module provides a complete implementation of a blockchain wallet analysis tool
with a Gradio web interface. It includes wallet analysis, NFT tracking,
interactive chat capabilities using the OpenAI API,
and NFT image rendering from OpenSea via PIL objects.
Author: Claude
Date: January 2025
"""
from __future__ import annotations
import os
import re
import json
import time
import logging
import asyncio
import base64
from typing import List, Dict, Tuple, Any, Optional, TypeVar
from datetime import datetime
from decimal import Decimal
from dataclasses import dataclass
from pathlib import Path
from io import BytesIO
import aiohttp
import openai
import gradio as gr
from PIL import Image
from tenacity import retry, stop_after_attempt, wait_exponential
# Shared typing helpers: one generic placeholder plus the two structural
# aliases that appear in signatures throughout this module.
T = TypeVar("T")
WalletData = Dict[str, Any]
ChatHistory = List[Tuple[str, str]]

# Root logging setup: every record is written both to a log file in the
# working directory and to the console stream.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("blockchain_analyzer.log"),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)
class ConfigError(Exception):
    """Signals a problem with the application's configuration."""
class APIError(Exception):
    """Signals that a call to an external API failed."""
class ValidationError(Exception):
    """Signals that user-supplied input did not pass validation."""
@dataclass
class Config:
    """Central settings for the wallet analyzer.

    Every field has a default, so each value is also reachable directly on
    the class (for example ``Config.MAX_RETRIES``) without creating an
    instance.
    """

    # Persona and ground rules sent to the chat model as a system message.
    # The dog emoji was previously stored as mis-decoded bytes; it is now the
    # intended character.
    SYSTEM_PROMPT: str = """
You are LOSS DOG 🐕 (Learning & Observing Smart Systems Digital Output Generator),
an adorable blockchain-sniffing puppy!
Your personality:
- Friendly and enthusiastic
- Explain findings in fun, simple ways
- Provide NFT images from OpenSea when possible
Instructions:
- You have access to detailed wallet data in your context
- Use this data to provide specific answers about holdings
- Reference exact numbers and collections when discussing NFTs
- Compare wallets if multiple are available
"""
    # Endpoint that all Etherscan requests are sent to.
    ETHERSCAN_BASE_URL: str = "https://api.etherscan.io/api"
    # "0x" followed by 40 hex digits. Deliberately unanchored so it can also
    # be used to find an address inside free text.
    ETHEREUM_ADDRESS_REGEX: str = r"0x[a-fA-F0-9]{40}"
    # Minimum spacing between Etherscan requests, in seconds.
    RATE_LIMIT_DELAY: float = 0.2  # 5 requests per second
    # Number of attempts made for a single Etherscan request.
    MAX_RETRIES: int = 3
    # Chat-completion model name and sampling parameters.
    OPENAI_MODEL: str = "gpt-4o-mini"  # Example updated model
    MAX_TOKENS: int = 4000
    TEMPERATURE: float = 0.7
    # Number of most recent chat entries replayed to the model.
    HISTORY_LIMIT: int = 5
#### Part 2: Wallet Analyzer (Etherscan)
class WalletAnalyzer:
    """Analyzes Ethereum wallet contents using the Etherscan API.

    Meant to be used as an async context manager: entering creates the
    ``aiohttp`` session every request needs, leaving closes it again.
    """

    # Etherscan answers an empty listing with status "0". These messages mean
    # "there is nothing to list", not "the request failed".
    _EMPTY_RESULT_MESSAGES = ("No transactions found", "No records found")

    def __init__(self, api_key: str):
        """Remember the API key; the HTTP session is opened in ``__aenter__``."""
        self.api_key = api_key
        self.base_url = Config.ETHERSCAN_BASE_URL
        self.session: Optional[aiohttp.ClientSession] = None
        self.last_request_time = 0.0

    async def __aenter__(self) -> WalletAnalyzer:
        """Open the HTTP session and return this analyzer."""
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Close the HTTP session if one is open."""
        if self.session:
            await self.session.close()
            self.session = None

    @retry(
        stop=stop_after_attempt(Config.MAX_RETRIES),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        # Surface the last APIError itself rather than tenacity's RetryError,
        # so callers see a readable message.
        reraise=True,
    )
    async def _fetch_data(self, params: Dict[str, str]) -> Dict[str, Any]:
        """Send one GET request to Etherscan and return the decoded JSON.

        Args:
            params: Query parameters without the API key. The mapping is not
                modified.

        Returns:
            The decoded response. An empty listing (status "0" with a
            "No transactions found" style message) is returned as-is so that
            callers can treat it as "no items".

        Raises:
            APIError: No session is open, the HTTP status is not 200, the
                network call fails, or Etherscan reports an error.
        """
        if not self.session:
            raise APIError("No active session. Use context manager.")
        await self._rate_limit()
        # Copy so the caller's dict never ends up carrying the API key.
        query = {**params, "apikey": self.api_key}
        try:
            async with self.session.get(self.base_url, params=query) as response:
                if response.status != 200:
                    raise APIError(f"Etherscan request failed: {response.status}")
                data = await response.json()
        except APIError:
            # Already in its final form; do not wrap it a second time.
            raise
        except aiohttp.ClientError as e:
            raise APIError(f"Network error: {str(e)}") from e
        except Exception as e:
            raise APIError(f"Unexpected error: {str(e)}") from e

        if not isinstance(data, dict):
            raise APIError("Unexpected error: malformed Etherscan response")
        if data.get("status") == "0":
            err = data.get("message", "Unknown Etherscan error")
            if err in self._EMPTY_RESULT_MESSAGES:
                return data
            result = data.get("result")
            # Failure details (for example the rate-limit notice) may be
            # carried in "result" rather than in "message".
            detail = result if isinstance(result, str) else ""
            if "Max rate limit reached" in f"{err} {detail}":
                raise APIError("Etherscan rate limit exceeded")
            if detail:
                raise APIError(f"Etherscan error: {err} ({detail})")
            raise APIError(f"Etherscan error: {err}")
        return data

    async def _rate_limit(self) -> None:
        """Sleep just long enough to keep ``Config.RATE_LIMIT_DELAY`` between requests."""
        # A monotonic clock is used so wall-clock adjustments cannot distort
        # the measured interval.
        elapsed = time.monotonic() - self.last_request_time
        if elapsed < Config.RATE_LIMIT_DELAY:
            await asyncio.sleep(Config.RATE_LIMIT_DELAY - elapsed)
        self.last_request_time = time.monotonic()

    @staticmethod
    def _validate_address(address: str) -> bool:
        """Return True only when the whole string is a well-formed address."""
        # fullmatch: the shared pattern has no anchors, so a plain match would
        # accept an address followed by arbitrary extra characters.
        return isinstance(address, str) and bool(
            re.fullmatch(Config.ETHEREUM_ADDRESS_REGEX, address)
        )

    async def get_portfolio_data(self, address: str) -> WalletData:
        """Collect the ETH balance, ERC-20 holdings and NFT holdings of a wallet.

        Returns:
            A dictionary with the keys ``address``, ``last_updated`` (ISO
            timestamp), ``eth_balance`` (float), ``tokens`` (list of ERC-20
            entries) and ``nft_collections`` (collections with contract and
            token id per item).

        Raises:
            ValidationError: ``address`` is not a well-formed address.
            APIError: An Etherscan request failed.
        """
        if not self._validate_address(address):
            raise ValidationError(f"Invalid Ethereum address: {address}")
        logger.info(f"Fetching portfolio data for {address}")
        eth_balance = await self._get_eth_balance(address)
        tokens = await self._get_token_holdings(address)
        nft_colls = await self._get_nft_holdings(address)
        return {
            "address": address,
            "last_updated": datetime.now().isoformat(),
            "eth_balance": float(eth_balance),
            "tokens": tokens,
            "nft_collections": nft_colls,
        }

    async def _get_eth_balance(self, address: str) -> Decimal:
        """Return the wallet's balance in ETH (the API result divided by 1e18)."""
        params = {
            "module": "account",
            "action": "balance",
            "address": address,
            "tag": "latest",
        }
        data = await self._fetch_data(params)
        return Decimal(data["result"]) / Decimal("1e18")

    async def _get_token_holdings(self, address: str) -> List[Dict[str, Any]]:
        """Derive ERC-20 balances by netting the returned transfer events.

        Only tokens whose net balance is positive are returned, each as
        ``{"name", "symbol", "balance"}``.
        """
        params = {
            "module": "account",
            "action": "tokentx",
            "address": address,
            "sort": "desc",
        }
        data = await self._fetch_data(params)
        transfers = data.get("result")
        if not isinstance(transfers, list):
            return []

        wallet = address.lower()
        token_map: Dict[str, Dict[str, Any]] = {}
        for tx in transfers:
            contract = tx["contractAddress"]
            entry = token_map.get(contract)
            if entry is None:
                entry = token_map[contract] = {
                    "name": tx["tokenName"],
                    "symbol": tx["tokenSymbol"],
                    # A missing or empty decimals field is treated as 0
                    # instead of crashing int().
                    "decimals": int(tx.get("tokenDecimal") or 0),
                    "balance": Decimal(0),
                }
            amount = Decimal(tx["value"]) / (Decimal(10) ** entry["decimals"])
            # Two independent checks: a transfer from the wallet to itself is
            # both a credit and a debit and must net to zero.
            if tx["to"].lower() == wallet:
                entry["balance"] += amount
            if tx["from"].lower() == wallet:
                entry["balance"] -= amount
        return [
            {
                "name": entry["name"],
                "symbol": entry["symbol"],
                "balance": float(entry["balance"]),
            }
            for entry in token_map.values()
            if entry["balance"] > 0
        ]

    async def _get_nft_holdings(self, address: str) -> Dict[str, Any]:
        """Work out which NFTs the wallet currently holds.

        Returns:
            ``{"collections": [{"collection_name": str,
            "items": [{"contract": str, "token_id": str}, ...]}, ...]}``
            so images can later be looked up by contract and token id.
        """
        params = {
            "module": "account",
            "action": "tokennfttx",
            "address": address,
            "sort": "desc",
        }
        data = await self._fetch_data(params)
        transfers = data.get("result")
        if not isinstance(transfers, list):
            return {"collections": []}

        wallet = address.lower()
        owned: Dict[Tuple[str, str], Dict[str, str]] = {}
        settled = set()
        # The listing is requested newest-first, so the first transfer seen
        # for a token is its latest one and alone decides ownership. Replaying
        # older transfers afterwards would resurrect tokens already sent away.
        for tx in transfers:
            key = (tx["contractAddress"], tx["tokenID"])
            if key in settled:
                continue
            settled.add(key)
            if tx["to"].lower() == wallet:
                owned[key] = {
                    "contract": tx["contractAddress"],
                    "token_id": tx["tokenID"],
                    "collection_name": tx.get("tokenName", "Unknown Collection"),
                }

        # Group the surviving tokens by collection name.
        grouped: Dict[str, List[Dict[str, str]]] = {}
        for item in owned.values():
            grouped.setdefault(item["collection_name"], []).append(
                {"contract": item["contract"], "token_id": item["token_id"]}
            )
        return {
            "collections": [
                {"collection_name": name, "items": items}
                for name, items in grouped.items()
            ]
        }
#### Part 3: OpenSea & PIL Conversion
async def fetch_nft_metadata(opensea_key: str, contract: str, token_id: str) -> Dict[str, Any]:
    """Fetch an NFT's display name and image URL from the OpenSea v2 API.

    Args:
        opensea_key: API key sent as ``X-API-KEY``; omitted when empty.
        contract: Contract address of the NFT.
        token_id: Token id within that contract.

    Returns:
        ``{"name": str, "image_url": str}`` on success, otherwise
        ``{"error": str}``. Failures are always reported through the return
        value, never raised, so one bad NFT cannot abort a whole analysis.
    """
    url = f"https://api.opensea.io/api/v2/chain/ethereum/contract/{contract}/nfts/{token_id}"
    headers = {"X-API-KEY": opensea_key} if opensea_key else {}
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, headers=headers) as resp:
                if resp.status == 403:
                    return {"error": "403 Forbidden: OpenSea API key issue"}
                if resp.status == 404:
                    return {"error": f"404 Not Found: {contract} #{token_id}"}
                if resp.status != 200:
                    # Any other status (throttling, server errors, ...) would
                    # otherwise be parsed as if it were NFT data.
                    return {"error": f"OpenSea request failed: HTTP {resp.status}"}
                try:
                    data = await resp.json()
                except Exception as e:
                    return {"error": f"OpenSea JSON parse error: {str(e)}"}
    except (aiohttp.ClientError, asyncio.TimeoutError) as e:
        return {"error": f"OpenSea network error: {str(e)}"}

    nft_obj = data.get("nft") if isinstance(data, dict) else None
    if not isinstance(nft_obj, dict):
        nft_obj = {}
    # "or" rather than a .get() default: a field that is present but null
    # must fall back as well.
    name = nft_obj.get("name") or f"NFT #{token_id}"
    image_url = nft_obj.get("image_url") or ""
    return {"name": name, "image_url": image_url}
async def fetch_image_as_pil(url: str) -> Optional[Image.Image]:
    """Download an image and return it as a fully decoded PIL image.

    Returns:
        The image, or ``None`` when the URL is empty, the download fails, the
        response status is not 200, or the bytes cannot be decoded.
    """
    if not url:
        return None
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                if resp.status != 200:
                    return None
                raw_bytes = await resp.read()
    except (aiohttp.ClientError, asyncio.TimeoutError, ValueError) as e:
        # Unreachable hosts and URLs the HTTP client cannot handle end up
        # here; the image is skipped instead of failing the caller.
        logger.error(f"Error downloading image: {e}")
        return None
    try:
        img = Image.open(BytesIO(raw_bytes))
        # Image.open only reads the header; load() forces the decode now so
        # corrupt data is caught here rather than later when it is displayed.
        img.load()
        return img
    except Exception as e:
        logger.error(f"Error converting to PIL: {e}")
        return None
#### Part 4: ChatInterface
class ChatInterface:
    """Handles chat logic with Etherscan (wallet data), OpenSea (NFT images), and OpenAI (chat)."""

    def __init__(self, openai_key: str, etherscan_key: str, opensea_key: str):
        """Store the three API keys and start with an empty wallet context."""
        self.openai_key = openai_key
        self.etherscan_key = etherscan_key
        self.opensea_key = opensea_key
        # Maps each analyzed address to the portfolio data fetched for it.
        self.context: Dict[str, Any] = {}
        openai.api_key = openai_key

    @staticmethod
    def _validate_api_keys(openai_key: str, etherscan_key: str, opensea_key: str) -> Tuple[bool, str]:
        """Check all three keys and return ``(is_valid, message)``.

        OpenAI and Etherscan are verified with one real request each; the
        OpenSea key is only checked for being non-empty.
        """
        # Cheapest check first, so an obviously missing key never costs a
        # network round trip or an OpenAI request.
        if not (opensea_key or "").strip():
            return False, "OpenSea API key is empty!"
        try:
            # Check OpenAI
            client = openai.OpenAI(api_key=openai_key)
            client.chat.completions.create(
                model=Config.OPENAI_MODEL,
                messages=[{"role": "user", "content": "test"}],
                max_tokens=1,
            )

            # Check Etherscan
            async def check_etherscan():
                async with WalletAnalyzer(etherscan_key) as analyzer:
                    params = {"module": "stats", "action": "ethsupply"}
                    await analyzer._fetch_data(params)

            asyncio.run(check_etherscan())
            return True, "All API keys validated!"
        except Exception as e:
            return False, f"API key validation failed: {str(e)}"

    def _format_context_message(self) -> str:
        """Render the stored wallet data as text for a system message.

        Returns an empty string when no wallet has been analyzed yet.
        """
        if not self.context:
            return ""
        lines = ["Current Wallet Data:"]
        for addr, wdata in self.context.items():
            lines.append(f"Wallet {addr[:8]}...{addr[-6:]}:")
            lines.append(f" ETH Balance: {wdata['eth_balance']:.4f}")
            lines.append(f" # of Tokens: {len(wdata['tokens'])}")
            nft_data = wdata["nft_collections"]
            if "collections" in nft_data:
                lines.append(" NFT Collections:")
                for c in nft_data["collections"]:
                    lines.append(f" - {c['collection_name']}: {len(c['items'])} NFT(s)")
        return "\n".join(lines)

    @staticmethod
    def _history_to_messages(history: List[Tuple[str, str]]) -> List[Dict[str, str]]:
        """Convert (user, assistant) pairs into OpenAI chat messages.

        A side that is ``None`` is skipped, so no message with empty content
        is produced for it.
        """
        messages: List[Dict[str, str]] = []
        for usr, ans in history:
            if usr is not None:
                messages.append({"role": "user", "content": usr})
            if ans is not None:
                messages.append({"role": "assistant", "content": ans})
        return messages

    def _build_prompt(self, message: str, history: ChatHistory) -> List[Dict[str, str]]:
        """Assemble system prompt, wallet context, recent history and the new message."""
        messages: List[Dict[str, str]] = [
            {"role": "system", "content": Config.SYSTEM_PROMPT}
        ]
        context_str = self._format_context_message()
        # Without wallet data there is nothing to say; skip the message
        # rather than send a system entry with empty content.
        if context_str:
            messages.append({"role": "system", "content": context_str})
        # NOTE(review): the history slice already contains this turn's status
        # entries, so the current message reaches the model more than once;
        # kept as in the original pending a decision on the intended prompt.
        messages.extend(self._history_to_messages(history[-Config.HISTORY_LIMIT:]))
        messages.append({"role": "user", "content": message})
        return messages

    def _request_completion(self, messages: List[Dict[str, str]]) -> str:
        """Run one blocking chat completion and return the reply text."""
        client = openai.OpenAI(api_key=self.openai_key)
        resp = client.chat.completions.create(
            model=Config.OPENAI_MODEL,
            messages=messages,
            temperature=Config.TEMPERATURE,
            max_tokens=Config.MAX_TOKENS,
        )
        return resp.choices[0].message.content or ""

    async def _analyze_wallet(
        self,
        eth_address: str,
        message: str,
        history: ChatHistory,
        nft_images: List[Image.Image],
    ) -> None:
        """Fetch wallet data, post a summary to the chat and collect NFT images.

        ``history`` and ``nft_images`` are extended in place so that anything
        gathered before a failure is kept by the caller.
        """
        async with WalletAnalyzer(self.etherscan_key) as analyzer:
            wallet_data = await analyzer.get_portfolio_data(eth_address)
        self.context[eth_address] = wallet_data

        collections = wallet_data["nft_collections"].get("collections", [])
        total_nfts = sum(len(c["items"]) for c in collections)
        summary = [
            f"📊 Summary for {eth_address[:8]}...{eth_address[-6:]}",
            f"ETH: {wallet_data['eth_balance']:.4f}",
            f"Tokens: {len(wallet_data['tokens'])}",
            f"NFTs: {total_nfts}",
        ]
        history.append((message, "\n".join(summary)))

        # Images are fetched for a small sample only: the first two
        # collections, two items each.
        for coll in collections[:2]:
            for item in coll["items"][:2]:
                meta = await fetch_nft_metadata(self.opensea_key, item["contract"], item["token_id"])
                if "error" in meta:
                    logger.warning(f"OpenSea metadata error: {meta['error']}")
                    continue
                image_url = meta["image_url"]
                if not image_url:
                    logger.info(f"No image for {meta['name']}")
                    continue
                pil_img = await fetch_image_as_pil(image_url)
                if pil_img:
                    nft_images.append(pil_img)
                    found_msg = f"Found NFT image: {meta['name']} (contract {item['contract'][:8]}...{item['contract'][-6:]}, id={item['token_id']})"
                    history.append((message, found_msg))

    async def process_message(
        self,
        message: str,
        history: Optional[ChatHistory] = None
    ) -> Tuple[ChatHistory, Dict[str, Any], List[Image.Image]]:
        """Handle one user message.

        1) Detect an Ethereum address in the message
        2) If found, fetch wallet data via Etherscan and NFT images via OpenSea
        3) Ask the chat model for a reply

        Returns:
            ``(history, context, images)``: the chat history with this turn's
            entries appended, the wallet context, and the PIL images gathered
            during this call. Errors are reported as chat entries, not raised.
        """
        if history is None:
            history = []
        if not message.strip():
            return history, self.context, []

        nft_images: List[Image.Image] = []
        # Word boundaries keep the pattern from matching the leading part of
        # a longer hex string such as a transaction hash.
        match = re.search(rf"\b{Config.ETHEREUM_ADDRESS_REGEX}\b", message)
        if match:
            eth_address = match.group(0)
            history.append((message, f"Analyzing {eth_address}..."))
            try:
                await self._analyze_wallet(eth_address, message, history, nft_images)
            except Exception as e:
                err = f"Error analyzing {eth_address}: {str(e)}"
                logger.error(err)
                history.append((message, err))

        try:
            prompt = self._build_prompt(message, history)
            # The OpenAI client used here is synchronous; running it in a
            # worker thread keeps the event loop free for other requests.
            final_msg = await asyncio.to_thread(self._request_completion, prompt)
        except Exception as e:
            logger.error(f"OpenAI error: {e}")
            history.append((message, f"OpenAI error: {str(e)}"))
        else:
            history.append((message, final_msg))
        # Images are returned even when the chat call failed: they were
        # fetched successfully and belong to this turn.
        return history, self.context, nft_images

    def clear_context(self) -> Tuple[Dict[str, Any], List[Tuple[str, str]]]:
        """Forget all wallet data and return an empty context and empty chat."""
        self.context = {}
        return {}, []
#### Part 5: Gradio UI & Launch
class GradioInterface:
    """Manages the Gradio UI for Hugging Face Space, with top-right NFT gallery as PIL images.

    NOTE(review): ``self.chat_interface`` lives on this object, so the API
    keys and wallet context are presumably shared by every browser session of
    the running app; confirm whether per-session state is wanted.
    """

    def __init__(self):
        """Build the UI; no ``ChatInterface`` exists until keys are validated."""
        self.chat_interface: Optional[ChatInterface] = None
        self.demo = self._create_interface()

    def _create_interface(self) -> gr.Blocks:
        """Lay out all components, wire their event handlers and return the Blocks app."""
        with gr.Blocks(theme=gr.themes.Soft()) as demo:
            gr.Markdown(
                "\n"
                "# 🐕 LOSS DOG: Blockchain Wallet Analyzer (NFT Images, top-right)\n"
                "- Enter your **OpenAI**, **Etherscan**, **OpenSea** keys below\n"
                "- Validate, then chat with your Ethereum address\n"
                "- NFT images will appear as **PIL** objects in the top-right\n"
            )
            with gr.Row():
                openai_key = gr.Textbox(
                    label="OpenAI API Key",
                    type="password",
                    placeholder="Enter your OpenAI API key...",
                )
                etherscan_key = gr.Textbox(
                    label="Etherscan API Key",
                    type="password",
                    placeholder="Enter your Etherscan API key...",
                )
                opensea_key = gr.Textbox(
                    label="OpenSea API Key",
                    type="password",
                    placeholder="Enter your OpenSea API key...",
                )
            validation_status = gr.Textbox(label="Validation Status", interactive=False)
            validate_btn = gr.Button("Validate API Keys", variant="primary")
            with gr.Row():
                # Main Chat Column
                with gr.Column(scale=2):
                    chatbot = gr.Chatbot(label="Chat History", height=420, value=[])
                    with gr.Row():
                        # Input and button start disabled and are switched on
                        # by validate_keys once the keys check out.
                        msg_input = gr.Textbox(
                            label="Message",
                            placeholder="Enter an ETH address or question...",
                            interactive=False,
                        )
                        send_btn = gr.Button("Send", variant="primary", interactive=False)
                # Top-Right: NFT Gallery (PIL images)
                with gr.Column(scale=1):
                    nft_gallery = gr.Gallery(
                        label="NFT Images (Top-Right)",
                        columns=2,
                    )
                    wallet_context = gr.JSON(
                        label="Active Wallet Context",
                        value={},
                    )
                    clear_btn = gr.Button("Clear Context")

            def validate_keys(openai_k: str, etherscan_k: str, opensea_k: str) -> Tuple[str, Any, Any]:
                """Validate user-provided keys; enable chat if all pass."""
                is_valid, msg = ChatInterface._validate_api_keys(openai_k, etherscan_k, opensea_k)
                if is_valid:
                    self.chat_interface = ChatInterface(openai_k, etherscan_k, opensea_k)
                icon = "✅" if is_valid else "❌"
                return (
                    f"{icon} {msg}",
                    gr.update(interactive=is_valid),
                    gr.update(interactive=is_valid),
                )

            validate_btn.click(
                fn=validate_keys,
                inputs=[openai_key, etherscan_key, opensea_key],
                outputs=[validation_status, msg_input, send_btn],
            )

            def clear_all():
                """Clear wallet context and chat."""
                if self.chat_interface:
                    return self.chat_interface.clear_context()
                return {}, []

            clear_btn.click(
                fn=clear_all,
                inputs=[],
                outputs=[wallet_context, chatbot],
            )

            async def handle_message(
                message: str,
                chat_hist: List[Tuple[str, str]],
                context: Dict[str, Any]
            ) -> Tuple[List[Tuple[str, str]], Dict[str, Any], List[Image.Image]]:
                """Process user input, return updated chat, context, list of PIL images."""
                if not self.chat_interface:
                    # Nothing can be processed yet; leave what is on screen
                    # untouched instead of wiping the chat.
                    return chat_hist or [], context or {}, []
                try:
                    return await self.chat_interface.process_message(message, chat_hist)
                except Exception as e:
                    logger.error(f"Error in handle_message: {e}")
                    if chat_hist is None:
                        chat_hist = []
                    chat_hist.append((message, f"Error: {str(e)}"))
                    return chat_hist, context, []

            # Chat flow: pressing Enter and clicking Send behave identically;
            # both clear the input box afterwards.
            for trigger in (msg_input.submit, send_btn.click):
                trigger(
                    fn=handle_message,
                    inputs=[msg_input, chatbot, wallet_context],
                    outputs=[chatbot, wallet_context, nft_gallery],
                ).then(
                    lambda: gr.update(value=""),
                    None,
                    [msg_input],
                )
        return demo

    def launch(self):
        """Enable request queuing and start serving the app."""
        self.demo.queue()
        self.demo.launch()
def main():
    """Entry point for the Hugging Face Space: log the startup, then serve the UI."""
    logger.info("Launching LOSS DOG with PIL-based NFT images (top-right).")
    app = GradioInterface()
    app.launch()


# Run the app only when this file is executed as a script.
if __name__ == "__main__":
    main()