# PyscoutAI / deepinfra_client.py
# (Hugging Face upload metadata: PyScoutAI — "Upload 15 files", commit ead2510 verified)
import requests
import json
import os
import random
import time
from typing import Any, Dict, Optional, Generator, Union, List
import warnings
from fake_useragent import UserAgent
from requests_ip_rotator import ApiGateway
from proxy_finder import ProxyFinder
class IPRotator:
    """Manages AWS API Gateway rotation across multiple regions.

    Each region gets its own ``ApiGateway`` instance; sessions are drawn
    from a randomly chosen gateway so outbound requests leave via
    different IP addresses.
    """

    def __init__(self, target_url="deepinfra.com", regions=None):
        """Initialize with a target URL and an optional region list.

        Args:
            target_url: Hostname the gateways will front.
            regions: AWS region names; defaults to a spread of four regions.
        """
        self.target_url = target_url
        self.regions = regions or ["us-east-1", "us-west-1", "eu-west-1", "ap-southeast-1"]
        self.gateways = {}  # region name -> started ApiGateway

    def setup(self):
        """Start an API gateway in every configured region.

        Regions that fail to start are skipped with a printed warning.

        Raises:
            RuntimeError: If no gateway could be started at all.
        """
        for region in self.regions:
            try:
                gateway = ApiGateway(self.target_url, region=region)
                gateway.start()
                self.gateways[region] = gateway
            except Exception as e:  # AWS setup has many failure modes; skip region
                print(f"Failed to set up gateway in {region}: {str(e)}")
        if not self.gateways:
            raise RuntimeError("Failed to set up any API gateways for IP rotation")

    def get_session(self):
        """Return a session from a randomly chosen gateway.

        Falls back to a plain requests.Session when no gateway is available.
        """
        if not self.gateways:
            return requests.Session()
        # Choose a random gateway
        region = random.choice(list(self.gateways.keys()))
        gateway = self.gateways[region]
        return gateway.get_session()

    def shutdown(self):
        """Best-effort shutdown of every gateway; errors are swallowed."""
        for gateway in self.gateways.values():
            try:
                gateway.shutdown()
            except Exception:  # cleanup must never propagate
                pass
class ProxyManager:
    """Holds a pool of proxies and serves random picks for HTTP requests."""

    def __init__(self, proxies=None):
        """Start with the supplied proxy list, or an empty pool."""
        self.proxies = proxies or []

    def add_proxy(self, proxy):
        """Append a single proxy to the pool."""
        self.proxies.append(proxy)

    def get_random(self):
        """Pick a random proxy from the pool, or None when the pool is empty."""
        return random.choice(self.proxies) if self.proxies else None
class DeepInfraClient:
    """
    A client for DeepInfra API with OpenAI-compatible interface and enhanced features.

    Optional evasion features: random User-Agent headers (fake_useragent),
    proxy rotation (proxy_finder) and IP rotation through AWS API Gateway
    (requests_ip_rotator). Resources are exposed OpenAI-style via
    ``.models``, ``.chat`` and ``.completions``.
    """

    # Known model IDs surfaced by Models.list(); kept as a class constant so
    # listing models needs no network call.
    AVAILABLE_MODELS = [
        "deepseek-ai/DeepSeek-R1-Turbo",
        "deepseek-ai/DeepSeek-R1",
        "deepseek-ai/DeepSeek-R1-Distill-Llama-70B",
        "deepseek-ai/DeepSeek-V3",
        "meta-llama/Llama-3.3-70B-Instruct-Turbo",
        "mistralai/Mistral-Small-24B-Instruct-2501",
        "deepseek-ai/DeepSeek-R1-Distill-Qwen-32B",
        "microsoft/phi-4",
        "meta-llama/Meta-Llama-3.1-70B-Instruct",
        "meta-llama/Meta-Llama-3.1-8B-Instruct",
        "meta-llama/Meta-Llama-3.1-405B-Instruct",
        "meta-llama/Meta-Llama-3.1-8B-Instruct-Turbo",
        "meta-llama/Meta-Llama-3.1-70B-Instruct-Turbo",
        "Qwen/Qwen2.5-Coder-32B-Instruct",
        "nvidia/Llama-3.1-Nemotron-70B-Instruct",
        "Qwen/Qwen2.5-72B-Instruct",
        "meta-llama/Llama-3.2-90B-Vision-Instruct",
        "meta-llama/Llama-3.2-11B-Vision-Instruct",
        "Gryphe/MythoMax-L2-13b",
        "NousResearch/Hermes-3-Llama-3.1-405B",
        "NovaSky-AI/Sky-T1-32B-Preview",
        "Qwen/Qwen2.5-7B-Instruct",
        "Sao10K/L3.1-70B-Euryale-v2.2",
        "Sao10K/L3.3-70B-Euryale-v2.3",
        "google/gemma-2-27b-it",
        "google/gemma-2-9b-it",
        "meta-llama/Llama-3.2-1B-Instruct",
        "meta-llama/Llama-3.2-3B-Instruct",
        "meta-llama/Meta-Llama-3-70B-Instruct",
        "meta-llama/Meta-Llama-3-8B-Instruct",
        "mistralai/Mistral-Nemo-Instruct-2407",
        "mistralai/Mistral-7B-Instruct-v0.3",
        "mistralai/Mixtral-8x7B-Instruct-v0.1"
    ]

    def __init__(
        self,
        api_key: Optional[str] = None,
        base_url: str = "https://api.deepinfra.com/v1",
        timeout: int = 30,
        max_tokens: int = 2049,
        model: str = "meta-llama/Llama-3.3-70B-Instruct-Turbo",
        use_random_user_agent: bool = True,
        use_proxy_rotation: bool = True,
        use_ip_rotation: bool = True,
        proxy_types: Optional[List[str]] = None
    ):
        """Initialize the DeepInfraClient.

        Args:
            api_key: Optional bearer token, sent as an Authorization header.
            base_url: API root URL; endpoint paths are joined onto it.
            timeout: Per-request timeout in seconds.
            max_tokens: Default generation limit for completions.
            model: Default model ID for completions.
            use_random_user_agent: Use a fresh random User-Agent per session.
            use_proxy_rotation: Route requests through ProxyFinder proxies.
            use_ip_rotation: Route requests through AWS API Gateway IPs.
            proxy_types: Proxy schemes to search for; defaults to
                ['http', 'socks5'].
        """
        self.base_url = base_url
        self.api_key = api_key
        self.model = model
        self.timeout = timeout
        self.max_tokens = max_tokens
        self.use_random_user_agent = use_random_user_agent
        self.use_ip_rotation = use_ip_rotation
        self.use_proxy_rotation = use_proxy_rotation
        self.proxy_types = proxy_types or ['http', 'socks5']  # Default proxy types
        # User-agent generator (also used when refreshing sessions)
        self.user_agent = UserAgent()
        # Set up proxy finder and fetch an initial proxy pool if enabled
        self.proxy_finder = None
        if self.use_proxy_rotation:
            self.proxy_finder = ProxyFinder(verbose=False)
            self.proxy_finder.get_proxies(self.proxy_types)
        # Set up the IP rotator; degrade gracefully if AWS setup fails
        self.ip_rotator = None
        if use_ip_rotation:
            try:
                self.ip_rotator = IPRotator(target_url="deepinfra.com")
                self.ip_rotator.setup()
            except Exception as e:
                print(f"Failed to set up IP rotation: {e}. Continuing without IP rotation.")
                self.ip_rotator = None
        # Headers with a random or fixed user agent
        self.headers = self._create_headers()
        # Session comes from a gateway when IP rotation is active
        if self.use_ip_rotation and self.ip_rotator:
            self.session = self.ip_rotator.get_session()
        else:
            self.session = requests.Session()
        self.session.headers.update(self.headers)
        # Apply an initial proxy if proxy rotation is enabled
        if self.use_proxy_rotation and self.proxy_finder:
            self._apply_random_proxy()
        # OpenAI-style resource objects
        self.models = Models(self)
        self.chat = ChatCompletions(self)
        self.completions = Completions(self)

    def _create_headers(self) -> Dict[str, str]:
        """Create HTTP request headers, optionally with a random user agent.

        Adds an Authorization header only when an API key is configured.
        """
        user_agent = self.user_agent.random if self.use_random_user_agent else \
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36'
        headers = {
            'User-Agent': user_agent,
            'Accept-Language': 'en-US,en;q=0.9',
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
            'Content-Type': 'application/json',
            'Origin': 'https://deepinfra.com',
            'Referer': 'https://deepinfra.com/',
            'Sec-Fetch-Dest': 'empty',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Site': 'same-site',
            'X-Deepinfra-Source': 'web-embed',
            'accept': 'text/event-stream',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"macOS"'
        }
        if self.api_key:
            headers['Authorization'] = f"Bearer {self.api_key}"
        return headers

    def _apply_random_proxy(self) -> bool:
        """Apply a random proxy of a preferred type to the current session.

        Tries self.proxy_types in order and uses the first proxy found.

        Returns:
            True if a proxy was applied, False otherwise.
        """
        if not self.proxy_finder:
            return False
        # URL scheme for each supported proxy type (http and https proxies
        # are both addressed with an http:// proxy URL)
        schemes = {'http': 'http', 'https': 'http', 'socks4': 'socks4', 'socks5': 'socks5'}
        for proxy_type in self.proxy_types:
            proxy = self.proxy_finder.get_random_proxy(proxy_type)
            if proxy and proxy_type in schemes:
                proxy_url = f"{schemes[proxy_type]}://{proxy}"
                self.session.proxies.update({
                    "http": proxy_url,
                    "https": proxy_url
                })
                return True
        # No usable proxy found
        return False

    def refresh_session(self):
        """Refresh the session: new user agent, and new proxy/IP if enabled."""
        if self.use_random_user_agent:
            self.headers['User-Agent'] = self.user_agent.random
        # Apply a random proxy if proxy rotation is enabled
        if self.use_proxy_rotation and self.proxy_finder:
            # If none of the cached proxies work, fetch a new batch and retry once
            if not self._apply_random_proxy():
                self.proxy_finder.get_proxies(self.proxy_types)
                self._apply_random_proxy()
        # Rotate to a session from another gateway if IP rotation is enabled
        if self.use_ip_rotation and self.ip_rotator:
            self.session = self.ip_rotator.get_session()
        self.session.headers.update(self.headers)

    def _request(self, method: str, endpoint: str, **kwargs) -> "requests.Response":
        """Make an HTTP request with automatic retry and session rotation.

        Retries up to 3 times with linearly increasing backoff, refreshing
        the session (headers/proxy/IP) between attempts.

        Raises:
            requests.RequestException: The last failure, if all attempts fail.
        """
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        max_retries = 3
        retry_delay = 1
        for attempt in range(max_retries):
            try:
                response = self.session.request(method, url, **kwargs)
                response.raise_for_status()
                return response
            except requests.RequestException:
                if attempt < max_retries - 1:
                    self.refresh_session()
                    time.sleep(retry_delay * (attempt + 1))
                    continue
                raise  # re-raise the last failure with its original traceback

    def refresh_proxies(self):
        """Fetch a fresh proxy pool and apply one.

        Returns:
            True if proxy rotation is configured, False otherwise.
        """
        if self.proxy_finder:
            self.proxy_finder.get_proxies(self.proxy_types)
            self._apply_random_proxy()
            return True
        return False

    def __del__(self):
        """Best-effort cleanup of IP-rotation gateways on garbage collection."""
        # getattr guard: __init__ may have failed before ip_rotator was set
        if getattr(self, 'ip_rotator', None):
            try:
                self.ip_rotator.shutdown()
            except Exception:  # never raise from a finalizer
                pass
class Models:
    """Model listing resource, mirroring OpenAI's /v1/models endpoint."""

    def __init__(self, client: "DeepInfraClient"):
        self.client = client

    def list(self) -> Dict[str, Any]:
        """Get available models, similar to OpenAI's /v1/models endpoint.

        The listing is built from the client's static AVAILABLE_MODELS
        constant; no network call is made.
        """
        return {
            "object": "list",
            "data": [
                {
                    "id": model_id,
                    "object": "model",
                    "created": 1677610602,  # fixed placeholder timestamp for all entries
                    "owned_by": "deepinfra"
                }
                for model_id in self.client.AVAILABLE_MODELS
            ]
        }
class ChatCompletions:
    """Chat completion resource, mirroring OpenAI's chat/completions endpoint."""

    def __init__(self, client: "DeepInfraClient"):
        self.client = client

    def create(
        self,
        messages: List[Dict[str, str]],
        model: str = None,
        temperature: float = 0.7,
        max_tokens: int = None,
        stream: bool = False,
        **kwargs
    ) -> Union[Dict[str, Any], Generator[Dict[str, Any], None, None]]:
        """Create a chat completion, similar to OpenAI's chat/completions endpoint.

        Args:
            messages: Chat history as [{"role": ..., "content": ...}, ...].
            model: Model ID; defaults to the client's configured model.
            temperature: Sampling temperature.
            max_tokens: Generation limit; defaults to the client's setting.
            stream: When True, return a generator of SSE chunks instead of
                a single response dict.
            **kwargs: Extra payload fields, passed through (None values dropped).
        """
        model = model or self.client.model
        max_tokens = max_tokens or self.client.max_tokens
        url = "openai/chat/completions"
        # Prepare the payload for the API request
        payload = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
            "max_tokens": max_tokens,
            "stream": stream
        }
        # Add any additional parameters
        payload.update({k: v for k, v in kwargs.items() if v is not None})
        if stream:
            return self._handle_stream(url, payload)
        return self._handle_request(url, payload)

    @staticmethod
    def _format_error(e, prefix: str) -> str:
        """Build a readable error message, preferring the API's error body."""
        message = f"{prefix}: {str(e)}"
        if getattr(e, 'response', None) is not None:
            try:
                error_data = e.response.json()
                if 'error' in error_data:
                    message = f"API error: {error_data['error']}"
            except ValueError:  # response body was not valid JSON
                message = f"API error: {e.response.text}"
        return message

    def _handle_request(self, url: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Handle a non-streaming request and return the parsed JSON response.

        Raises:
            Exception: With a formatted message if the request fails.
        """
        try:
            response = self.client._request(
                "POST",
                url,
                json=payload,
                timeout=self.client.timeout
            )
            return response.json()
        except requests.RequestException as e:
            raise Exception(self._format_error(e, "Request failed")) from e

    def _handle_stream(self, url: str, payload: Dict[str, Any]) -> Generator[Dict[str, Any], None, None]:
        """Handle a streaming request, yielding parsed SSE JSON chunks.

        Stops at the "[DONE]" sentinel; malformed chunks are skipped.

        Raises:
            Exception: With a formatted message if the request fails.
        """
        try:
            response = self.client._request(
                "POST",
                url,
                json=payload,
                stream=True,
                timeout=self.client.timeout
            )
            for line in response.iter_lines(decode_unicode=True):
                if not line:
                    continue
                line = line.strip()
                if not line.startswith("data: "):
                    continue
                json_str = line[6:]
                if json_str == "[DONE]":
                    break
                try:
                    yield json.loads(json_str)
                except json.JSONDecodeError:
                    continue  # skip malformed chunks rather than aborting the stream
        except requests.RequestException as e:
            raise Exception(self._format_error(e, "Stream request failed")) from e
class Completions:
    """Legacy text-completion resource; delegates to the chat endpoint."""

    def __init__(self, client: DeepInfraClient):
        self.client = client

    def create(
        self,
        prompt: str,
        model: str = None,
        temperature: float = 0.7,
        max_tokens: int = None,
        stream: bool = False,
        **kwargs
    ) -> Union[Dict[str, Any], Generator[Dict[str, Any], None, None]]:
        """Create a completion, similar to OpenAI's completions endpoint.

        The prompt is wrapped as a single user message and forwarded to the
        chat completion resource, since the backend serves chat models.
        """
        chat_messages = [{"role": "user", "content": prompt}]
        return self.client.chat.create(
            messages=chat_messages,
            model=model,
            temperature=temperature,
            max_tokens=max_tokens,
            stream=stream,
            **kwargs,
        )
if __name__ == "__main__":
    # Demo: exercise the client with all rotation features enabled.
    # NOTE(review): requires network access, AWS credentials for IP rotation,
    # and the third-party `rich` package.
    from rich import print
    # Example with random user agent, proxy rotation and IP rotation
    client = DeepInfraClient(
        use_random_user_agent=True,
        use_ip_rotation=True,
        use_proxy_rotation=True,
        proxy_types=['http', 'socks5']
    )
    # Get available models (static listing, no network call)
    models_response = client.models.list()
    print("Available models:")
    for model in models_response["data"][:5]:  # Print first 5 models
        print(f"- {model['id']}")
    print("...")
    # Non-streaming chat completion
    chat_response = client.chat.create(
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "Write a short poem about AI"}
        ]
    )
    print("\nNon-streaming response:")
    print(chat_response["choices"][0]["message"]["content"])
    # Rotate to fresh proxies/headers, then stream a second completion
    print("\nRefreshing proxies and making another request...")
    client.refresh_proxies()
    client.refresh_session()
    streaming_response = client.chat.create(
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "Tell me about the future of AI in 3 sentences."}
        ],
        stream=True
    )
    print("\nStreaming response:")
    full_response = ""
    for chunk in streaming_response:
        # Each chunk follows the OpenAI streaming delta format
        if 'choices' in chunk and len(chunk['choices']) > 0:
            delta = chunk['choices'][0].get('delta', {})
            if 'content' in delta:
                content = delta['content']
                print(content, end='', flush=True)
                full_response += content
    print("\n")