"""OpenAI-style client for the DeepInfra API.

The client can optionally rotate its User-Agent header, route traffic through
proxies supplied by ``ProxyFinder`` and obtain sessions from AWS API Gateway
endpoints managed by ``IPRotator``.
"""

import json
import os
import random
import time
import warnings
from typing import Any, Dict, Optional, Generator, Union, List

import requests
from fake_useragent import UserAgent
from requests_ip_rotator import ApiGateway

from proxy_finder import ProxyFinder


class IPRotator:
    """Manages AWS API Gateway rotation for multiple regions."""

    def __init__(self, target_url="deepinfra.com", regions=None):
        """Store the target URL and the regions to create gateways in.

        Args:
            target_url: Value handed to ``ApiGateway`` as the site to front.
            regions: Region names; a built-in list of four regions is used
                when this is ``None`` or empty.
        """
        self.target_url = target_url
        self.regions = regions or ["us-east-1", "us-west-1", "eu-west-1", "ap-southeast-1"]
        self.gateways = {}

    def setup(self):
        """Start one gateway per region, keeping those that start successfully.

        Raises:
            RuntimeError: If no gateway could be started in any region.
        """
        for region in self.regions:
            try:
                # NOTE(review): confirm that the installed requests_ip_rotator
                # version accepts ``region=`` here and exposes ``get_session()``
                # (used in ``get_session`` below) -- TODO verify.
                gateway = ApiGateway(self.target_url, region=region)
                gateway.start()
            except Exception as e:
                # A single failing region must not prevent the others.
                print(f"Failed to set up gateway in {region}: {e}")
            else:
                self.gateways[region] = gateway

        if not self.gateways:
            raise RuntimeError("Failed to set up any API gateways for IP rotation")

    def get_session(self):
        """Return a session from a randomly chosen gateway.

        Falls back to a plain ``requests.Session`` when no gateway is
        registered.
        """
        if not self.gateways:
            return requests.Session()

        gateway = random.choice(list(self.gateways.values()))
        return gateway.get_session()

    def shutdown(self):
        """Shut down every registered gateway (best effort) and forget them."""
        for region, gateway in self.gateways.items():
            try:
                gateway.shutdown()
            except Exception as e:
                # Best effort: keep going so the remaining gateways are
                # still shut down.
                print(f"Failed to shut down gateway in {region}: {e}")
        # Gateways that were shut down must not be handed out again.
        self.gateways.clear()


class ProxyManager:
    """Manages proxy rotation for HTTP requests."""

    def __init__(self, proxies=None):
        """Initialize with a list of proxies, or an empty list if none given."""
        self.proxies = proxies or []

    def add_proxy(self, proxy):
        """Append ``proxy`` to the managed list."""
        self.proxies.append(proxy)

    def get_random(self):
        """Return a random proxy, or ``None`` when the list is empty."""
        if not self.proxies:
            return None
        return random.choice(self.proxies)


class DeepInfraClient:
    """A client for the DeepInfra API with an OpenAI-compatible interface.

    Resources are exposed as ``client.models``, ``client.chat`` and
    ``client.completions``.  The client can be used as a context manager;
    leaving the ``with`` block calls :meth:`close`.
    """

    AVAILABLE_MODELS = [
        "deepseek-ai/DeepSeek-R1-Turbo",
        "deepseek-ai/DeepSeek-R1",
        "deepseek-ai/DeepSeek-R1-Distill-Llama-70B",
        "deepseek-ai/DeepSeek-V3",
        "meta-llama/Llama-3.3-70B-Instruct-Turbo",
        "mistralai/Mistral-Small-24B-Instruct-2501",
        "deepseek-ai/DeepSeek-R1-Distill-Qwen-32B",
        "microsoft/phi-4",
        "meta-llama/Meta-Llama-3.1-70B-Instruct",
        "meta-llama/Meta-Llama-3.1-8B-Instruct",
        "meta-llama/Meta-Llama-3.1-405B-Instruct",
        "meta-llama/Meta-Llama-3.1-8B-Instruct-Turbo",
        "meta-llama/Meta-Llama-3.1-70B-Instruct-Turbo",
        "Qwen/Qwen2.5-Coder-32B-Instruct",
        "nvidia/Llama-3.1-Nemotron-70B-Instruct",
        "Qwen/Qwen2.5-72B-Instruct",
        "meta-llama/Llama-3.2-90B-Vision-Instruct",
        "meta-llama/Llama-3.2-11B-Vision-Instruct",
        "Gryphe/MythoMax-L2-13b",
        "NousResearch/Hermes-3-Llama-3.1-405B",
        "NovaSky-AI/Sky-T1-32B-Preview",
        "Qwen/Qwen2.5-7B-Instruct",
        "Sao10K/L3.1-70B-Euryale-v2.2",
        "Sao10K/L3.3-70B-Euryale-v2.3",
        "google/gemma-2-27b-it",
        "google/gemma-2-9b-it",
        "meta-llama/Llama-3.2-1B-Instruct",
        "meta-llama/Llama-3.2-3B-Instruct",
        "meta-llama/Meta-Llama-3-70B-Instruct",
        "meta-llama/Meta-Llama-3-8B-Instruct",
        "mistralai/Mistral-Nemo-Instruct-2407",
        "mistralai/Mistral-7B-Instruct-v0.3",
        "mistralai/Mixtral-8x7B-Instruct-v0.1"
    ]

    # User agent sent when random user agents are disabled.
    _DEFAULT_USER_AGENT = (
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 '
        '(KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36'
    )

    # Proxy type -> URL scheme used when registering the proxy on a session.
    # 'https' proxies are addressed through the http:// scheme as well.
    _PROXY_SCHEMES = {
        'http': 'http',
        'https': 'http',
        'socks4': 'socks4',
        'socks5': 'socks5',
    }

    def __init__(
        self,
        api_key: Optional[str] = None,
        base_url: str = "https://api.deepinfra.com/v1",
        timeout: int = 30,
        max_tokens: int = 2049,
        model: str = "meta-llama/Llama-3.3-70B-Instruct-Turbo",
        use_random_user_agent: bool = True,
        use_proxy_rotation: bool = True,
        use_ip_rotation: bool = True,
        proxy_types: Optional[List[str]] = None
    ):
        """Initialize the DeepInfraClient.

        Args:
            api_key: Optional bearer token added as the Authorization header.
            base_url: Root URL that endpoints are appended to.
            timeout: Request timeout in seconds.
            max_tokens: Default ``max_tokens`` for completions.
            model: Default model identifier.
            use_random_user_agent: Pick a random User-Agent instead of the
                fixed default one.
            use_proxy_rotation: Fetch proxies via ``ProxyFinder`` and route
                the session through one of them.
            use_ip_rotation: Try to set up ``IPRotator``; on failure a message
                is printed and the client continues without IP rotation.
            proxy_types: Proxy types in order of preference; defaults to
                ``['http', 'socks5']``.
        """
        self.base_url = base_url
        self.api_key = api_key
        self.model = model
        self.timeout = timeout
        self.max_tokens = max_tokens
        self.use_random_user_agent = use_random_user_agent
        self.use_ip_rotation = use_ip_rotation
        self.use_proxy_rotation = use_proxy_rotation
        self.proxy_types = proxy_types or ['http', 'socks5']  # Default proxy types

        # Initialize user agent generator
        self.user_agent = UserAgent()

        # Set up proxy finder and get initial proxies if proxy rotation is enabled
        self.proxy_finder = None
        if self.use_proxy_rotation:
            self.proxy_finder = ProxyFinder(verbose=False)
            self.proxy_finder.get_proxies(self.proxy_types)

        # Set up IP rotator if enabled
        self.ip_rotator = None
        if use_ip_rotation:
            try:
                rotator = IPRotator(target_url="deepinfra.com")
                rotator.setup()
            except Exception as e:
                print(f"Failed to set up IP rotation: {e}. Continuing without IP rotation.")
            else:
                self.ip_rotator = rotator

        # Set up headers with random or fixed user agent
        self.headers = self._create_headers()

        # Initialize session based on available rotation methods
        if self.use_ip_rotation and self.ip_rotator:
            self.session = self.ip_rotator.get_session()
        else:
            self.session = requests.Session()

        self.session.headers.update(self.headers)

        # Apply proxy if proxy rotation is enabled
        if self.use_proxy_rotation and self.proxy_finder:
            self._apply_random_proxy()

        # Resources
        self.models = Models(self)
        self.chat = ChatCompletions(self)
        self.completions = Completions(self)

    def __enter__(self) -> "DeepInfraClient":
        """Return the client itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Release the client's resources when the ``with`` block ends."""
        self.close()

    def _create_headers(self) -> Dict[str, str]:
        """Create headers for the HTTP request, optionally with a random user agent."""
        if self.use_random_user_agent:
            user_agent = self.user_agent.random
        else:
            user_agent = self._DEFAULT_USER_AGENT

        headers = {
            'User-Agent': user_agent,
            'Accept-Language': 'en-US,en;q=0.9',
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
            'Content-Type': 'application/json',
            'Origin': 'https://deepinfra.com',
            'Referer': 'https://deepinfra.com/',
            'Sec-Fetch-Dest': 'empty',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Site': 'same-site',
            'X-Deepinfra-Source': 'web-embed',
            'accept': 'text/event-stream',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"macOS"'
        }

        if self.api_key:
            headers['Authorization'] = f"Bearer {self.api_key}"

        return headers

    def _apply_random_proxy(self):
        """Apply a random proxy to the current session.

        Proxy types are tried in the order given by ``self.proxy_types``;
        types without a known URL scheme are skipped.

        Returns:
            ``True`` if a proxy was registered on the session, else ``False``.
        """
        if not self.proxy_finder:
            return False

        for proxy_type in self.proxy_types:
            scheme = self._PROXY_SCHEMES.get(proxy_type)
            if scheme is None:
                continue

            # presumably returns "host:port" or a falsy value -- verify
            # against ProxyFinder.
            proxy = self.proxy_finder.get_random_proxy(proxy_type)
            if proxy:
                proxy_url = f"{scheme}://{proxy}"
                self.session.proxies.update({
                    "http": proxy_url,
                    "https": proxy_url
                })
                return True

        # No proxy found for any of the preferred types
        return False

    def refresh_session(self):
        """Refresh the session with new headers and possibly a new proxy or IP."""
        if self.use_random_user_agent:
            self.headers['User-Agent'] = self.user_agent.random

        # Rotate the session first: applying the proxy before swapping the
        # session would configure a session that is about to be discarded.
        if self.use_ip_rotation and self.ip_rotator:
            previous_session = self.session
            self.session = self.ip_rotator.get_session()
            if previous_session is not self.session:
                previous_session.close()

        self.session.headers.update(self.headers)

        # Apply a random proxy if proxy rotation is enabled
        if self.use_proxy_rotation and self.proxy_finder:
            if not self._apply_random_proxy():
                # Nothing usable left: fetch a fresh proxy list and retry once.
                self.proxy_finder.get_proxies(self.proxy_types)
                self._apply_random_proxy()

    def _request(self, method: str, endpoint: str, **kwargs) -> requests.Response:
        """Make an HTTP request with automatic retry and proxy/user-agent rotation.

        Args:
            method: HTTP method name.
            endpoint: Path appended to ``self.base_url``.
            **kwargs: Forwarded to ``Session.request``; ``timeout`` defaults
                to ``self.timeout`` when not supplied.

        Returns:
            The first response with a successful status.

        Raises:
            requests.RequestException: The error of the last attempt, once
                all retries are exhausted.
        """
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        kwargs.setdefault("timeout", self.timeout)
        max_retries = 3
        retry_delay = 1

        for attempt in range(max_retries):
            try:
                response = self.session.request(method, url, **kwargs)
                response.raise_for_status()
                return response
            except requests.RequestException as e:
                if attempt == max_retries - 1:
                    # Bare raise keeps the original traceback intact.
                    raise
                failed_response = getattr(e, "response", None)
                if failed_response is not None:
                    # Release the connection of the failed attempt.
                    failed_response.close()
                self.refresh_session()
                # Linear back-off: 1s, 2s, ...
                time.sleep(retry_delay * (attempt + 1))

    def refresh_proxies(self):
        """Fetch a new proxy list and apply one of the proxies.

        Returns:
            ``True`` if a proxy finder is configured, else ``False``.
        """
        if not self.proxy_finder:
            return False
        self.proxy_finder.get_proxies(self.proxy_types)
        self._apply_random_proxy()
        return True

    def close(self) -> None:
        """Shut down the IP rotator (if any) and close the HTTP session.

        Safe to call more than once and on a partially initialized client.
        """
        # getattr: __init__ may have failed before these were assigned.
        rotator = getattr(self, "ip_rotator", None)
        if rotator is not None:
            self.ip_rotator = None
            rotator.shutdown()

        session = getattr(self, "session", None)
        if session is not None:
            session.close()

    def __del__(self):
        """Clean up resources on deletion (best effort)."""
        try:
            self.close()
        except Exception:
            # Errors cannot be reported meaningfully during finalization.
            pass


class Models:
    """Read-only view of the models the client knows about."""

    def __init__(self, client: DeepInfraClient):
        self.client = client

    def list(self) -> Dict[str, Any]:
        """Get available models, similar to OpenAI's /v1/models endpoint.

        The data is built from ``client.AVAILABLE_MODELS``; no request is made.
        """
        model_data = [
            {
                "id": model_id,
                "object": "model",
                "created": 1677610602,
                "owned_by": "deepinfra"
            }
            for model_id in self.client.AVAILABLE_MODELS
        ]
        return {
            "object": "list",
            "data": model_data
        }


def _describe_request_error(prefix: str, error: requests.RequestException) -> str:
    """Build a readable message for a failed request.

    Uses the ``error`` entry of the JSON error body when there is one, the raw
    response text when the body is not usable JSON, and ``prefix`` plus the
    exception text otherwise.
    """
    message = f"{prefix}: {error}"
    # Compare with None explicitly: a Response with an error status is falsy.
    response = getattr(error, "response", None)
    if response is None:
        return message

    try:
        error_data = response.json()
        if 'error' in error_data:
            message = f"API error: {error_data['error']}"
    except (ValueError, TypeError):
        # Body is not JSON, or not a container that can hold an 'error' key.
        message = f"API error: {response.text}"
    return message


class ChatCompletions:
    """Chat completion resource (``openai/chat/completions``)."""

    def __init__(self, client: DeepInfraClient):
        self.client = client

    def create(
        self,
        messages: List[Dict[str, str]],
        model: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: Optional[int] = None,
        stream: bool = False,
        **kwargs
    ) -> Union[Dict[str, Any], Generator[Dict[str, Any], None, None]]:
        """Create a chat completion, similar to OpenAI's chat/completions endpoint.

        Args:
            messages: Chat messages to send.
            model: Model identifier; falls back to the client's default.
            temperature: Sampling temperature.
            max_tokens: Token limit; falls back to the client's default.
            stream: When true, return a generator of parsed stream chunks.
            **kwargs: Extra payload fields; entries whose value is ``None``
                are dropped.

        Returns:
            The decoded JSON response, or a generator of decoded chunks when
            ``stream`` is true.
        """
        model = model or self.client.model
        max_tokens = max_tokens or self.client.max_tokens

        url = "openai/chat/completions"

        # Prepare the payload for the API request
        payload = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
            "max_tokens": max_tokens,
            "stream": stream
        }

        # Add any additional parameters
        payload.update({k: v for k, v in kwargs.items() if v is not None})

        if stream:
            return self._handle_stream(url, payload)
        return self._handle_request(url, payload)

    def _handle_request(self, url: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Handle non-streaming requests.

        Raises:
            Exception: With a readable message when the request fails; the
                underlying ``requests`` error is attached as ``__cause__``.
        """
        try:
            response = self.client._request(
                "POST",
                url,
                json=payload,
                timeout=self.client.timeout
            )
            return response.json()
        except requests.RequestException as e:
            raise Exception(_describe_request_error("Request failed", e)) from e

    def _handle_stream(self, url: str, payload: Dict[str, Any]) -> Generator[Dict[str, Any], None, None]:
        """Handle streaming requests.

        Yields the JSON payload of every ``data:`` line until ``[DONE]`` is
        seen; lines whose payload is not valid JSON are skipped.

        Raises:
            Exception: With a readable message when the request fails; the
                underlying ``requests`` error is attached as ``__cause__``.
        """
        try:
            response = self.client._request(
                "POST",
                url,
                json=payload,
                stream=True,
                timeout=self.client.timeout
            )

            try:
                # Iterate raw bytes and decode as UTF-8 ourselves instead of
                # relying on the charset requests derives from the headers.
                for raw_line in response.iter_lines():
                    if not raw_line:
                        continue
                    if isinstance(raw_line, bytes):
                        raw_line = raw_line.decode("utf-8", errors="replace")
                    line = raw_line.strip()
                    if not line.startswith("data:"):
                        continue

                    json_str = line[len("data:"):].strip()
                    if json_str == "[DONE]":
                        break

                    try:
                        json_data = json.loads(json_str)
                    except json.JSONDecodeError:
                        continue
                    yield json_data
            finally:
                # Also runs when the consumer stops iterating early.
                response.close()
        except requests.RequestException as e:
            raise Exception(_describe_request_error("Stream request failed", e)) from e


class Completions:
    """Text completion resource implemented on top of chat completions."""

    def __init__(self, client: DeepInfraClient):
        self.client = client

    def create(
        self,
        prompt: str,
        model: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: Optional[int] = None,
        stream: bool = False,
        **kwargs
    ) -> Union[Dict[str, Any], Generator[Dict[str, Any], None, None]]:
        """Create a completion, similar to OpenAI's completions endpoint.

        The prompt is wrapped in a single user message and delegated to
        ``client.chat.create`` with all other arguments passed through.
        """
        # Convert prompt to messages format for chat models
        messages = [{"role": "user", "content": prompt}]

        return self.client.chat.create(
            messages=messages,
            model=model,
            temperature=temperature,
            max_tokens=max_tokens,
            stream=stream,
            **kwargs
        )


if __name__ == "__main__":
    import time
    from rich import print

    # Example with random user agent, proxy rotation and IP rotation
    client = DeepInfraClient(
        use_random_user_agent=True,
        use_ip_rotation=True,
        use_proxy_rotation=True,
        proxy_types=['http', 'socks5']
    )

    try:
        # Get available models
        models_response = client.models.list()
        print("Available models:")
        for model in models_response["data"][:5]:  # Print first 5 models
            print(f"- {model['id']}")
        print("...")

        # Non-streaming chat completion
        chat_response = client.chat.create(
            messages=[
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": "Write a short poem about AI"}
            ]
        )
        print("\nNon-streaming response:")
        print(chat_response["choices"][0]["message"]["content"])

        # Refresh proxies and try again with another request
        print("\nRefreshing proxies and making another request...")
        client.refresh_proxies()
        client.refresh_session()

        streaming_response = client.chat.create(
            messages=[
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": "Tell me about the future of AI in 3 sentences."}
            ],
            stream=True
        )

        print("\nStreaming response:")
        response_parts = []
        for chunk in streaming_response:
            if 'choices' in chunk and len(chunk['choices']) > 0:
                delta = chunk['choices'][0].get('delta', {})
                if 'content' in delta:
                    content = delta['content']
                    print(content, end='', flush=True)
                    response_parts.append(content)
        full_response = "".join(response_parts)
        print("\n")
    finally:
        # Release gateways and the session even if a request failed.
        client.close()