"""
Blockchain Wallet Analyzer - A tool for analyzing Ethereum wallet contents and NFT holdings.

This module provides a complete implementation of a blockchain wallet analysis tool
with a Gradio web interface. It includes wallet analysis, NFT tracking,
interactive chat capabilities using the OpenAI API,
and NFT image rendering from OpenSea via PIL objects.

Author: Claude
Date: January 2025
"""
|
|
|
from __future__ import annotations

import asyncio
import base64
import json
import logging
import os
import re
import time
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal
from io import BytesIO
from pathlib import Path
from typing import List, Dict, Tuple, Any, Optional, TypeVar

import aiohttp
import gradio as gr
import openai
from PIL import Image
from tenacity import retry, stop_after_attempt, wait_exponential
|
|
|
|
|
# Generic type variable for helper signatures.
T = TypeVar('T')

# Portfolio snapshot for one wallet, as returned by
# WalletAnalyzer.get_portfolio_data().
WalletData = Dict[str, Any]

# Chat transcript as a list of (user_message, assistant_reply) pairs.
ChatHistory = List[Tuple[str, str]]
|
|
|
|
|
# Configure the root logger once at import time: INFO and above go both to a
# log file in the working directory and to the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        # Explicit UTF-8 so non-ASCII text (NFT names, emoji) can always be
        # written, whatever the platform's default encoding is.
        logging.FileHandler('blockchain_analyzer.log', encoding='utf-8'),
        logging.StreamHandler(),
    ],
)

# Module-level logger used throughout this file.
logger = logging.getLogger(__name__)
|
|
|
class ConfigError(Exception):
    """Raised when there's an error in configuration."""
|
|
|
class APIError(Exception):
    """Raised when there's an error in API calls."""
|
|
|
class ValidationError(Exception):
    """Raised when there's an error in input validation."""
|
|
|
@dataclass
class Config:
    """Application configuration settings.

    Every field has a default, so the values can be read straight off the
    class (``Config.MAX_RETRIES``) without creating an instance.
    """

    # System prompt sent to the chat model on every request.
    SYSTEM_PROMPT: str = """
    You are LOSS DOG 🐕 (Learning & Observing Smart Systems Digital Output Generator),
    an adorable blockchain-sniffing puppy!
    Your personality:
    - Friendly and enthusiastic
    - Explain findings in fun, simple ways
    - Provide NFT images from OpenSea when possible

    Instructions:
    - You have access to detailed wallet data in your context
    - Use this data to provide specific answers about holdings
    - Reference exact numbers and collections when discussing NFTs
    - Compare wallets if multiple are available
    """

    # NOTE(review): this is the original (V1) Etherscan endpoint; Etherscan
    # also offers a V2 API that takes a chain id - confirm V1 is still served.
    ETHERSCAN_BASE_URL: str = "https://api.etherscan.io/api"

    # "0x" followed by exactly 40 hex digits. The pattern is unanchored, so
    # callers decide whether to search or to match the whole string.
    ETHEREUM_ADDRESS_REGEX: str = r"0x[a-fA-F0-9]{40}"

    # Minimum gap, in seconds, between two Etherscan requests.
    RATE_LIMIT_DELAY: float = 0.2

    # Number of attempts for one Etherscan request.
    MAX_RETRIES: int = 3

    # OpenAI chat-completion settings.
    OPENAI_MODEL: str = "gpt-4o-mini"
    MAX_TOKENS: int = 4000
    TEMPERATURE: float = 0.7

    # How many trailing chat-history entries are replayed to the model.
    HISTORY_LIMIT: int = 5
|
|
|
|
|
class WalletAnalyzer:
    """Analyzes Ethereum wallet contents using Etherscan API.

    Instances are async context managers: entering opens the aiohttp session
    that every request needs, and leaving closes it::

        async with WalletAnalyzer(api_key) as analyzer:
            data = await analyzer.get_portfolio_data(address)
    """

    def __init__(self, api_key: str):
        """Store the Etherscan API key; the HTTP session is opened on enter."""
        self.api_key = api_key
        self.base_url = Config.ETHERSCAN_BASE_URL
        self.session: Optional[aiohttp.ClientSession] = None
        # Wall-clock time of the most recent request, read by _rate_limit().
        self.last_request_time = 0

    async def __aenter__(self) -> WalletAnalyzer:
        """Open the HTTP session and return this analyzer."""
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Close the HTTP session, if one is open."""
        if self.session:
            await self.session.close()
            self.session = None

    @retry(
        stop=stop_after_attempt(Config.MAX_RETRIES),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        # Surface the last APIError itself instead of tenacity's RetryError,
        # so callers see a readable message and can catch APIError.
        reraise=True,
    )
    async def _fetch_data(self, params: Dict[str, str]) -> Dict[str, Any]:
        """Fetch data from Etherscan with retry logic.

        Args:
            params: Query parameters for the request. The API key is added to
                a copy, so the caller's dict is left untouched.

        Returns:
            The decoded JSON response. An empty "No transactions found"
            answer is returned as-is (status "0", empty result) rather than
            raised, because it is a normal outcome for an unused wallet.

        Raises:
            APIError: no open session, a non-200 response, a network failure,
                or an error reported by Etherscan.
        """
        if not self.session:
            raise APIError("No active session. Use context manager.")

        await self._rate_limit()
        query = {**params, "apikey": self.api_key}
        try:
            async with self.session.get(self.base_url, params=query) as response:
                if response.status != 200:
                    raise APIError(f"Etherscan request failed: {response.status}")
                data = await response.json()
        except APIError:
            # Already the right type; do not re-wrap it as "Unexpected error".
            raise
        except aiohttp.ClientError as e:
            raise APIError(f"Network error: {str(e)}") from e
        except Exception as e:
            raise APIError(f"Unexpected error: {str(e)}") from e

        if data.get("status") == "0":
            message = data.get("message", "Unknown Etherscan error")
            detail = data.get("result")
            if "No transactions found" in str(message):
                return data
            # Etherscan may put the explanatory text in "result" rather than
            # "message", so look at both.
            if "Max rate limit reached" in f"{message} {detail}":
                raise APIError("Etherscan rate limit exceeded")
            if isinstance(detail, str) and detail:
                raise APIError(f"Etherscan error: {message} ({detail})")
            raise APIError(f"Etherscan error: {message}")
        return data

    async def _rate_limit(self) -> None:
        """Simple rate limiting for Etherscan free tier.

        Sleeps just long enough that consecutive requests are at least
        ``Config.RATE_LIMIT_DELAY`` seconds apart.
        """
        elapsed = time.time() - self.last_request_time
        if elapsed < Config.RATE_LIMIT_DELAY:
            await asyncio.sleep(Config.RATE_LIMIT_DELAY - elapsed)
        self.last_request_time = time.time()

    @staticmethod
    def _validate_address(address: str) -> bool:
        """Validate Ethereum address format.

        The whole string must be an address; a valid prefix followed by extra
        characters is rejected.
        """
        if not isinstance(address, str):
            return False
        return re.fullmatch(Config.ETHEREUM_ADDRESS_REGEX, address) is not None

    async def get_portfolio_data(self, address: str) -> WalletData:
        """
        Return a dictionary with:
          - address
          - last_updated
          - eth_balance
          - tokens (list of ERC-20)
          - nft_collections (with contract & token_id)

        Raises:
            ValidationError: ``address`` is not a well-formed Ethereum address.
            APIError: an Etherscan request failed.
        """
        if not self._validate_address(address):
            raise ValidationError(f"Invalid Ethereum address: {address}")

        logger.info(f"Fetching portfolio data for {address}")
        eth_balance = await self._get_eth_balance(address)
        tokens = await self._get_token_holdings(address)
        nft_colls = await self._get_nft_holdings(address)

        return {
            "address": address,
            "last_updated": datetime.now().isoformat(),
            "eth_balance": float(eth_balance),
            "tokens": tokens,
            "nft_collections": nft_colls,
        }

    async def _get_eth_balance(self, address: str) -> Decimal:
        """Return the wallet's ETH balance (the API reports it in wei)."""
        params = {
            "module": "account",
            "action": "balance",
            "address": address,
            "tag": "latest",
        }
        data = await self._fetch_data(params)
        return Decimal(data["result"]) / Decimal("1e18")

    async def _get_token_holdings(self, address: str) -> List[Dict[str, Any]]:
        """Fetch ERC-20 tokens for address.

        Balances are reconstructed by summing the transfer history: incoming
        transfers add, outgoing transfers subtract. Only tokens with a
        positive balance are returned, each as name / symbol / balance.
        """
        params = {
            "module": "account",
            "action": "tokentx",
            "address": address,
            "sort": "desc",
        }
        data = await self._fetch_data(params)

        wallet = address.lower()
        token_map: Dict[str, Dict[str, Any]] = {}
        for tx in data.get("result") or []:
            contract = tx["contractAddress"]
            entry = token_map.get(contract)
            if entry is None:
                entry = token_map[contract] = {
                    "name": tx["tokenName"],
                    "symbol": tx["tokenSymbol"],
                    # A missing or empty decimals field would crash int();
                    # fall back to 0 (raw units) instead of failing the wallet.
                    "decimals": int(tx.get("tokenDecimal") or 0),
                    "balance": Decimal(0),
                }
            amount = Decimal(tx["value"]) / (Decimal(10) ** entry["decimals"])
            # Two independent checks: a self-transfer is both a credit and a
            # debit and must net to zero.
            if tx["to"].lower() == wallet:
                entry["balance"] += amount
            if tx["from"].lower() == wallet:
                entry["balance"] -= amount

        return [
            {
                "name": entry["name"],
                "symbol": entry["symbol"],
                "balance": float(entry["balance"]),
            }
            for entry in token_map.values()
            if entry["balance"] > 0
        ]

    async def _get_nft_holdings(self, address: str) -> Dict[str, Any]:
        """
        Return { "collections": [ { "collection_name": str, "items": [{ contract, token_id }, ...] }, ... ] }
        so we can fetch images from OpenSea by contract + token_id.

        Ownership is derived by replaying the transfer history oldest-first:
        a transfer to the wallet adds the token, a transfer from it removes
        the token. Collections and items are listed most recently acquired
        first.
        """
        params = {
            "module": "account",
            "action": "tokennfttx",
            "address": address,
            # Replay must be chronological; newest-first would re-add tokens
            # that were later transferred away.
            "sort": "asc",
        }
        data = await self._fetch_data(params)

        if data.get("status") != "1" or "result" not in data:
            return {"collections": []}

        wallet = address.lower()
        owned: Dict[str, Dict[str, str]] = {}
        for tx in data["result"]:
            contract = tx["contractAddress"]
            token_id = tx["tokenID"]
            key = f"{contract}_{token_id}"

            # Remove first, then add, so a self-transfer leaves the token
            # owned and moves it to the "most recent" end of the dict.
            if tx["from"].lower() == wallet:
                owned.pop(key, None)
            if tx["to"].lower() == wallet:
                owned[key] = {
                    "contract": contract,
                    "token_id": token_id,
                    "collection_name": tx.get("tokenName") or "Unknown Collection",
                }

        # Group by collection name, newest acquisitions first.
        grouped: Dict[str, List[Dict[str, str]]] = defaultdict(list)
        for item in reversed(list(owned.values())):
            grouped[item["collection_name"]].append({
                "contract": item["contract"],
                "token_id": item["token_id"],
            })

        return {
            "collections": [
                {"collection_name": name, "items": items}
                for name, items in grouped.items()
            ]
        }
|
|
|
|
|
async def fetch_nft_metadata(opensea_key: str, contract: str, token_id: str) -> Dict[str, Any]:
    """
    Fetch NFT metadata (including image_url) from OpenSea v2.

    Returns { "name": str, "image_url": str } or { "error": str }.

    Failures are always reported through the ``"error"`` key, never raised:
    that covers network errors, non-200 responses and unparseable bodies, so
    one bad NFT cannot abort a caller that is looping over many of them.
    """
    url = f"https://api.opensea.io/api/v2/chain/ethereum/contract/{contract}/nfts/{token_id}"
    headers = {"X-API-KEY": opensea_key} if opensea_key else {}

    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, headers=headers) as resp:
                if resp.status == 403:
                    return {"error": "403 Forbidden: OpenSea API key issue"}
                if resp.status == 404:
                    return {"error": f"404 Not Found: {contract} #{token_id}"}
                if resp.status != 200:
                    # e.g. 401, 429 or 5xx: the body is not NFT metadata.
                    return {"error": f"OpenSea request failed: HTTP {resp.status}"}
                try:
                    data = await resp.json()
                except Exception as e:
                    return {"error": f"OpenSea JSON parse error: {str(e)}"}
    except (aiohttp.ClientError, asyncio.TimeoutError) as e:
        return {"error": f"OpenSea network error: {str(e)}"}

    nft_obj = data.get("nft") if isinstance(data, dict) else None
    if not isinstance(nft_obj, dict):
        nft_obj = {}

    # "or" rather than a .get() default: the fields can be present but null.
    name = nft_obj.get("name") or f"NFT #{token_id}"
    image_url = nft_obj.get("image_url") or ""
    return {"name": name, "image_url": image_url}
|
|
|
async def fetch_image_as_pil(url: str) -> Optional[Image.Image]:
    """
    Download an image from a URL and return as a PIL Image.

    Returns None, and never raises, when the URL is empty, the download
    fails, the server answers with a non-200 status, or the bytes cannot be
    opened as an image.
    """
    if not url:
        return None

    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                if resp.status != 200:
                    return None
                raw_bytes = await resp.read()
    except (aiohttp.ClientError, asyncio.TimeoutError, ValueError) as e:
        # ValueError is included for malformed or non-HTTP URLs
        # (e.g. ipfs://), which may be rejected before any request is sent.
        logger.error(f"Error downloading image: {e}")
        return None

    try:
        return Image.open(BytesIO(raw_bytes))
    except Exception as e:
        logger.error(f"Error converting to PIL: {e}")
        return None
|
|
|
|
|
class ChatInterface:
    """Handles chat logic with Etherscan (wallet data), OpenSea (NFT images), and OpenAI (chat)."""

    # Caps on how many NFT images are fetched per analyzed wallet, to bound
    # the number of OpenSea requests triggered by one message.
    _MAX_PREVIEW_COLLECTIONS = 2
    _MAX_PREVIEW_ITEMS = 2

    def __init__(self, openai_key: str, etherscan_key: str, opensea_key: str):
        """Store the three API keys and start with an empty wallet context."""
        self.openai_key = openai_key
        self.etherscan_key = etherscan_key
        self.opensea_key = opensea_key
        # Maps each analyzed wallet address to its portfolio data.
        self.context: Dict[str, Any] = {}
        openai.api_key = openai_key

    @staticmethod
    def _validate_api_keys(openai_key: str, etherscan_key: str, opensea_key: str) -> Tuple[bool, str]:
        """
        Validate all keys. We'll do a minimal check for OpenSea (non-empty).

        Returns:
            ``(True, message)`` when every check passes, otherwise
            ``(False, reason)``. Never raises.
        """
        # Cheapest check first: no network round-trips if a key is missing.
        if not (opensea_key or "").strip():
            return False, "OpenSea API key is empty!"

        try:
            # OpenAI: a one-token completion proves the key is accepted.
            client = openai.OpenAI(api_key=openai_key)
            client.chat.completions.create(
                model=Config.OPENAI_MODEL,
                messages=[{"role": "user", "content": "test"}],
                max_tokens=1,
            )

            # Etherscan: any request that needs a key will do.
            async def check_etherscan():
                async with WalletAnalyzer(etherscan_key) as analyzer:
                    params = {"module": "stats", "action": "ethsupply"}
                    await analyzer._fetch_data(params)

            asyncio.run(check_etherscan())

            return True, "All API keys validated!"
        except Exception as e:
            return False, f"API key validation failed: {str(e)}"

    def _format_context_message(self) -> str:
        """Format wallet data for GPT system prompt.

        Returns an empty string when no wallet has been analyzed yet.
        """
        if not self.context:
            return ""

        lines = ["Current Wallet Data:"]
        for addr, wdata in self.context.items():
            lines.append(f"Wallet {addr[:8]}...{addr[-6:]}:")
            lines.append(f" ETH Balance: {wdata['eth_balance']:.4f}")
            lines.append(f" # of Tokens: {len(wdata['tokens'])}")

            nft_data = wdata["nft_collections"]
            if "collections" in nft_data:
                lines.append(" NFT Collections:")
                for c in nft_data["collections"]:
                    lines.append(f" - {c['collection_name']}: {len(c['items'])} NFT(s)")
        return "\n".join(lines)

    async def process_message(
        self,
        message: str,
        history: Optional[ChatHistory] = None
    ) -> Tuple[ChatHistory, Dict[str, Any], List[Image.Image]]:
        """
        1) Detect Ethereum address
        2) Etherscan for wallet data
        3) For each NFT, fetch from OpenSea, convert to PIL
        4) Return images + chat response

        Args:
            message: The user's chat input.
            history: Previous (user, assistant) pairs; not modified in place.

        Returns:
            ``(updated_history, wallet_context, nft_images)``. Images that
            were fetched are returned even when the OpenAI call fails.
        """
        # Work on a copy so the caller's list is never mutated.
        history = list(history) if history else []

        if not message.strip():
            return history, self.context, []

        # Snapshot the earlier conversation before this turn's progress lines
        # are appended; otherwise the current message would be replayed to
        # the model once per progress line.
        prior_turns = history[-Config.HISTORY_LIMIT:]
        nft_images: List[Image.Image] = []

        # Word boundaries keep the prefix of a longer hex string (such as a
        # transaction hash) from being mistaken for an address.
        match = re.search(rf"\b{Config.ETHEREUM_ADDRESS_REGEX}\b", message)
        if match:
            nft_images = await self._analyze_wallet(match.group(0), message, history)

        try:
            final_msg = await self._ask_openai(message, prior_turns)
        except Exception as e:
            logger.error(f"OpenAI error: {e}")
            err_chat = f"OpenAI error: {str(e)}"
            history.append((message, err_chat))
            return history, self.context, nft_images

        history.append((message, final_msg))
        return history, self.context, nft_images

    async def _analyze_wallet(
        self,
        eth_address: str,
        message: str,
        history: ChatHistory
    ) -> List[Image.Image]:
        """Load one wallet into the context, append progress lines to ``history`` and return preview images."""
        images: List[Image.Image] = []
        history.append((message, f"Analyzing {eth_address}..."))

        try:
            async with WalletAnalyzer(self.etherscan_key) as analyzer:
                wallet_data = await analyzer.get_portfolio_data(eth_address)
            self.context[eth_address] = wallet_data

            collections = wallet_data["nft_collections"].get("collections", [])
            total_nfts = sum(len(c["items"]) for c in collections)
            summary = [
                f"🐕 Summary for {eth_address[:8]}...{eth_address[-6:]}",
                f"ETH: {wallet_data['eth_balance']:.4f}",
                f"Tokens: {len(wallet_data['tokens'])}",
                f"NFTs: {total_nfts}",
            ]
            history.append((message, "\n".join(summary)))

            for coll in collections[:self._MAX_PREVIEW_COLLECTIONS]:
                for item in coll["items"][:self._MAX_PREVIEW_ITEMS]:
                    await self._add_preview(item, message, history, images)
        except Exception as e:
            err = f"Error analyzing {eth_address}: {str(e)}"
            logger.error(err)
            history.append((message, err))

        return images

    async def _add_preview(
        self,
        item: Dict[str, str],
        message: str,
        history: ChatHistory,
        images: List[Image.Image]
    ) -> None:
        """Fetch one NFT's image; on success append it to ``images`` and note it in ``history``."""
        try:
            meta = await fetch_nft_metadata(self.opensea_key, item["contract"], item["token_id"])
            if "error" in meta:
                logger.warning(f"OpenSea metadata error: {meta['error']}")
                return
            image_url = meta["image_url"]
            if not image_url:
                logger.info(f"No image for {meta['name']}")
                return
            pil_img = await fetch_image_as_pil(image_url)
        except Exception as e:
            # Best effort: one failing NFT must not stop the other previews.
            logger.warning(f"Skipping NFT preview: {e}")
            return

        if pil_img:
            images.append(pil_img)
            found_msg = f"Found NFT image: {meta['name']} (contract {item['contract'][:8]}...{item['contract'][-6:]}, id={item['token_id']})"
            history.append((message, found_msg))

    async def _ask_openai(self, message: str, prior_turns: ChatHistory) -> str:
        """Send the prompt, wallet context, earlier turns and ``message`` to OpenAI and return the reply text."""
        messages = [{"role": "system", "content": Config.SYSTEM_PROMPT}]

        context_str = self._format_context_message()
        if context_str:
            messages.append({"role": "system", "content": context_str})

        previous_user = None
        for usr, ans in prior_turns:
            # Progress lines repeat the same user message; send it only once.
            if usr and usr != previous_user:
                messages.append({"role": "user", "content": usr})
            previous_user = usr
            if ans:
                messages.append({"role": "assistant", "content": ans})

        messages.append({"role": "user", "content": message})

        client = openai.OpenAI(api_key=self.openai_key)
        # The client call is synchronous; run it in a worker thread so the
        # event loop is not blocked while waiting for the response.
        loop = asyncio.get_running_loop()
        resp = await loop.run_in_executor(
            None,
            lambda: client.chat.completions.create(
                model=Config.OPENAI_MODEL,
                messages=messages,
                temperature=Config.TEMPERATURE,
                max_tokens=Config.MAX_TOKENS,
            ),
        )
        return resp.choices[0].message.content or ""

    def clear_context(self) -> Tuple[Dict[str, Any], List[Tuple[str, str]]]:
        """Clear the wallet context and chat.

        Returns an empty context and an empty chat history.
        """
        self.context = {}
        return {}, []
|
|
|
|
|
class GradioInterface:
    """Manages the Gradio UI for Hugging Face Space, with top-right NFT gallery as PIL images."""

    def __init__(self):
        """Build the UI; the chat backend is created once the keys validate."""
        # NOTE(review): the ChatInterface (API keys plus wallet context) is
        # stored on this object, so it appears to be shared by every browser
        # session of the app - consider per-session state before deploying
        # publicly.
        self.chat_interface: Optional[ChatInterface] = None
        self.demo = self._create_interface()

    def _create_interface(self) -> gr.Blocks:
        """Create the Blocks layout and wire up its event handlers."""
        with gr.Blocks(theme=gr.themes.Soft()) as demo:
            gr.Markdown("""
            # 🐕 LOSS DOG: Blockchain Wallet Analyzer (NFT Images, top-right)

            - Enter your **OpenAI**, **Etherscan**, **OpenSea** keys below
            - Validate, then chat with your Ethereum address
            - NFT images will appear as **PIL** objects in the top-right
            """)

            with gr.Row():
                openai_key = gr.Textbox(
                    label="OpenAI API Key",
                    type="password",
                    placeholder="Enter your OpenAI API key..."
                )
                etherscan_key = gr.Textbox(
                    label="Etherscan API Key",
                    type="password",
                    placeholder="Enter your Etherscan API key..."
                )
                opensea_key = gr.Textbox(
                    label="OpenSea API Key",
                    type="password",
                    placeholder="Enter your OpenSea API key..."
                )

            validation_status = gr.Textbox(label="Validation Status", interactive=False)
            validate_btn = gr.Button("Validate API Keys", variant="primary")

            with gr.Row():
                # Left: chat. The input stays disabled until the keys validate.
                with gr.Column(scale=2):
                    chatbot = gr.Chatbot(label="Chat History", height=420, value=[])
                    with gr.Row():
                        msg_input = gr.Textbox(
                            label="Message",
                            placeholder="Enter an ETH address or question...",
                            interactive=False
                        )
                        send_btn = gr.Button("Send", variant="primary", interactive=False)

                # Right: NFT gallery, wallet context and the clear button.
                with gr.Column(scale=1):
                    nft_gallery = gr.Gallery(
                        label="NFT Images (Top-Right)",
                        columns=2
                    )
                    wallet_context = gr.JSON(
                        label="Active Wallet Context",
                        value={}
                    )
                    clear_btn = gr.Button("Clear Context")

            def validate_keys(openai_k: str, etherscan_k: str, opensea_k: str) -> Tuple[str, gr.update, gr.update]:
                """Validate user-provided keys; enable chat if all pass."""
                is_valid, msg = ChatInterface._validate_api_keys(openai_k, etherscan_k, opensea_k)
                if is_valid:
                    self.chat_interface = ChatInterface(openai_k, etherscan_k, opensea_k)
                    return (
                        f"✅ {msg}",
                        gr.update(interactive=True),
                        gr.update(interactive=True)
                    )
                return (
                    f"❌ {msg}",
                    gr.update(interactive=False),
                    gr.update(interactive=False)
                )

            validate_btn.click(
                fn=validate_keys,
                inputs=[openai_key, etherscan_key, opensea_key],
                outputs=[validation_status, msg_input, send_btn]
            )

            def clear_all():
                """Clear wallet context and chat."""
                if self.chat_interface:
                    return self.chat_interface.clear_context()
                return {}, []

            clear_btn.click(
                fn=clear_all,
                inputs=[],
                outputs=[wallet_context, chatbot]
            )

            async def handle_message(
                message: str,
                chat_hist: List[Tuple[str, str]],
                context: Dict[str, Any]
            ) -> Tuple[List[Tuple[str, str]], Dict[str, Any], List[Image.Image]]:
                """Process user input, return updated chat, context, list of PIL images."""
                if not self.chat_interface:
                    # Keys not validated yet: leave the visible state alone
                    # instead of wiping the chat and context.
                    return chat_hist or [], context or {}, []

                try:
                    new_history, new_context, images = await self.chat_interface.process_message(message, chat_hist)
                    return new_history, new_context, images
                except Exception as e:
                    logger.error(f"Error in handle_message: {e}")
                    if chat_hist is None:
                        chat_hist = []
                    chat_hist.append((message, f"Error: {str(e)}"))
                    return chat_hist, context, []

            # Pressing Enter and clicking Send do the same thing: run the
            # handler, then empty the input box.
            for trigger in (msg_input.submit, send_btn.click):
                trigger(
                    fn=handle_message,
                    inputs=[msg_input, chatbot, wallet_context],
                    outputs=[chatbot, wallet_context, nft_gallery]
                ).then(
                    lambda: gr.update(value=""),
                    None,
                    [msg_input]
                )

        return demo

    def launch(self):
        """Enable request queuing and start the Gradio server."""
        self.demo.queue()
        self.demo.launch()
|
|
|
def main():
    """Main entry point for Hugging Face Space.

    Logs a start-up message, builds the Gradio interface and launches it.
    """
    logger.info("Launching LOSS DOG with PIL-based NFT images (top-right).")
    interface = GradioInterface()
    interface.launch()
|
|
|
# Start the app only when this file is run as a script, not when imported.
if __name__ == "__main__":
    main()