|
import logging |
|
from typing import Optional |
|
from flask import Flask, jsonify, request, abort, render_template |
|
import json |
|
import random |
|
import string |
|
import datetime |
|
from webscout import WEBS |
|
from functools import wraps |
|
import requests |
|
|
|
# WSGI application object; every route in this module registers on it.
app = Flask(__name__)

# Module-level settings.
# NOTE(review): presumably a timeout in seconds and an optional proxy for
# outbound HTTP calls -- confirm against the code that consumes them.
TIMEOUT = 10
PROXY = None
|
|
|
# Plans offered to users, keyed by the plan id submitted with the
# key-generation form. Each entry carries a display name, a display price and
# a ``rate_limit`` value (``None`` for the pro plan).
PRICING_PLANS = {
    'free': {
        'name': 'Free Plan',
        'price': '$0/month',
        'rate_limit': 1000,
    },
    'pro': {
        'name': 'Pro Plan',
        'price': 'Coming Soon',
        'rate_limit': None,
    },
}
|
|
|
|
|
def generate_api_key(username):
    """Generate a new API key for ``username``.

    The key is the literal prefix ``'HUAI'``, followed by the username, the
    current local date formatted as ``YYYYMMDD``, and five random characters
    drawn from uppercase ASCII letters and digits.

    Args:
        username: Name embedded verbatim in the key.

    Returns:
        The newly generated key as a string.
    """
    # Function-scope import so this block does not depend on a file-level
    # import being added.
    import secrets

    current_date = datetime.datetime.now().strftime("%Y%m%d")
    alphabet = string.ascii_uppercase + string.digits
    # An API key is a credential: draw the suffix from the CSPRNG-backed
    # ``secrets`` module rather than ``random``, whose output is predictable.
    suffix = ''.join(secrets.choice(alphabet) for _ in range(5))
    return 'HUAI' + username + current_date + suffix
|
|
|
|
|
def validate_api_key(api_key):
    """Check whether ``api_key`` is one of the keys stored in ``api_keys.json``.

    The key file is read on every call and is expected to hold a JSON object
    whose values are the issued keys.

    Args:
        api_key: Candidate key; may be ``None`` or empty when the client sent
            no key at all.

    Returns:
        ``True`` if ``api_key`` equals one of the stored values. ``False``
        otherwise, including when no key was supplied or when the key file is
        missing, is not valid JSON, or does not contain a JSON object.
    """
    if not api_key:
        return False
    try:
        with open('api_keys.json', 'r') as file:
            api_keys = json.load(file)
    except (FileNotFoundError, json.JSONDecodeError):
        # No usable key store means no key can be valid; report "invalid"
        # instead of letting the error surface as a server failure.
        return False
    if not isinstance(api_keys, dict):
        return False
    return api_key in api_keys.values()
|
|
|
|
|
def require_api_key(view_function):
    """Decorator that aborts with HTTP 401 unless the request has a valid key.

    The key is looked up, in order, in the ``HUSI`` request header, the
    ``HUAI`` query-string parameter and the ``HUAI`` request header. The first
    non-empty value found is checked with ``validate_api_key``.

    Args:
        view_function: The Flask view to protect.

    Returns:
        A wrapper that calls ``view_function`` with the original arguments
        when the key is valid and calls ``abort(401)`` otherwise.
    """
    @wraps(view_function)
    def decorated_function(*args, **kwargs):
        # NOTE(review): 'HUSI' looks like a typo for 'HUAI' (the key prefix and
        # the query-parameter name). It is kept first so existing clients
        # resolve the same key as before; the 'HUAI' header is accepted as an
        # additional, last-resort source.
        api_key = (
            request.headers.get('HUSI')
            or request.args.get('HUAI')
            or request.headers.get('HUAI')
        )
        # A request without any key is rejected without consulting the store.
        if not api_key or not validate_api_key(api_key):
            abort(401)
        return view_function(*args, **kwargs)

    return decorated_function
|
|
|
|
|
|
|
|
|
@app.route('/api/search', methods=['GET'])
@require_api_key
def search_text():
    """Handle ``GET /api/search``: run a text search and return it as JSON.

    Query-string parameters (all optional):
        q: Search query, defaults to ``''``.
        max_results: Integer, defaults to ``10``.
        timelimit: Defaults to ``None``.
        safesearch: Defaults to ``'moderate'``.
        region: Defaults to ``'wt-wt'``.

    All values are forwarded unchanged to ``WEBS.text``.

    Returns:
        A JSON response ``{'results': [...]}`` where each entry is an
        ``[index, result]`` pair produced by ``enumerate``.
    """
    query = request.args.get('q', '')
    max_results = request.args.get('max_results', 10, type=int)
    timelimit = request.args.get('timelimit', None)
    safesearch = request.args.get('safesearch', 'moderate')
    region = request.args.get('region', 'wt-wt')
    # Use the context-managed client so it is closed on exit; the previous
    # code searched through a second, never-closed instance.
    with WEBS() as webs:
        hits = webs.text(
            query,
            max_results=max_results,
            timelimit=timelimit,
            safesearch=safesearch,
            region=region,
        )
        # Keep the (index, result) pairing clients already receive.
        results = list(enumerate(hits))
    return jsonify({'results': results})
|
|
|
@app.route('/api/images', methods=['GET'])
@require_api_key
def search_images():
    """Handle ``GET /api/images``: run an image search and return it as JSON.

    Query-string parameters (all optional):
        q: Search query, defaults to ``''``.
        max_results: Integer, defaults to ``10``.
        safesearch: Defaults to ``'moderate'``.
        region: Defaults to ``'wt-wt'``.

    All values are forwarded unchanged to ``WEBS.images``.

    Returns:
        A JSON response ``{'results': [...]}`` where each entry is an
        ``[index, result]`` pair produced by ``enumerate``.
    """
    query = request.args.get('q', '')
    max_results = request.args.get('max_results', 10, type=int)
    safesearch = request.args.get('safesearch', 'moderate')
    region = request.args.get('region', 'wt-wt')
    # Use the context-managed client so it is closed on exit; the previous
    # code searched through a second, never-closed instance.
    with WEBS() as webs:
        hits = webs.images(
            query,
            max_results=max_results,
            safesearch=safesearch,
            region=region,
        )
        # Keep the (index, result) pairing clients already receive.
        results = list(enumerate(hits))
    return jsonify({'results': results})
|
|
|
@app.route('/api/videos', methods=['GET'])
@require_api_key
def search_videos():
    """Handle ``GET /api/videos``: run a video search and return it as JSON.

    Query-string parameters (all optional):
        q: Search query, defaults to ``''``.
        max_results: Integer, defaults to ``10``.
        safesearch: Defaults to ``'moderate'``.
        region: Defaults to ``'wt-wt'``.
        timelimit, resolution, duration: Each defaults to ``None``.

    All values are forwarded unchanged to ``WEBS.videos``.

    Returns:
        A JSON response ``{'results': [...]}`` where each entry is an
        ``[index, result]`` pair produced by ``enumerate``.
    """
    query = request.args.get('q', '')
    max_results = request.args.get('max_results', 10, type=int)
    safesearch = request.args.get('safesearch', 'moderate')
    region = request.args.get('region', 'wt-wt')
    timelimit = request.args.get('timelimit', None)
    resolution = request.args.get('resolution', None)
    duration = request.args.get('duration', None)
    # Use the context-managed client so it is closed on exit; the previous
    # code searched through a second, never-closed instance.
    with WEBS() as webs:
        hits = webs.videos(
            query,
            max_results=max_results,
            safesearch=safesearch,
            region=region,
            timelimit=timelimit,
            resolution=resolution,
            duration=duration,
        )
        # Keep the (index, result) pairing clients already receive.
        results = list(enumerate(hits))
    return jsonify({'results': results})
|
|
|
@app.route('/api/news', methods=['GET'])
@require_api_key
def search_news():
    """Handle ``GET /api/news``: run a news search and return it as JSON.

    Query-string parameters (all optional):
        q: Search query, defaults to ``''``.
        max_results: Integer, defaults to ``10``.
        safesearch: Defaults to ``'moderate'``.
        region: Defaults to ``'wt-wt'``.
        timelimit: Defaults to ``None``.

    All values are forwarded unchanged to ``WEBS.news``.

    Returns:
        A JSON response ``{'results': [...]}`` where each entry is an
        ``[index, result]`` pair produced by ``enumerate``.
    """
    query = request.args.get('q', '')
    max_results = request.args.get('max_results', 10, type=int)
    safesearch = request.args.get('safesearch', 'moderate')
    region = request.args.get('region', 'wt-wt')
    timelimit = request.args.get('timelimit', None)
    # Use the context-managed client so it is closed on exit; the previous
    # code searched through a second, never-closed instance.
    with WEBS() as webs:
        hits = webs.news(
            query,
            max_results=max_results,
            safesearch=safesearch,
            region=region,
            timelimit=timelimit,
        )
        # Keep the (index, result) pairing clients already receive.
        results = list(enumerate(hits))
    return jsonify({'results': results})
|
|
|
@app.route('/api/maps', methods=['GET'])
@require_api_key
def search_maps():
    """Handle ``GET /api/maps``: run a maps search and return it as JSON.

    Query-string parameters (all optional):
        q: Search query, defaults to ``''``.
        place: Defaults to ``None``.
        max_results: Integer, defaults to ``10``.

    All values are forwarded unchanged to ``WEBS.maps``.

    Returns:
        A JSON response ``{'results': [...]}`` where each entry is an
        ``[index, result]`` pair produced by ``enumerate``.
    """
    query = request.args.get('q', '')
    place = request.args.get('place', None)
    max_results = request.args.get('max_results', 10, type=int)
    # Use the context-managed client so it is closed on exit; the previous
    # code searched through a second, never-closed instance.
    with WEBS() as webs:
        hits = webs.maps(query, place=place, max_results=max_results)
        # Keep the (index, result) pairing clients already receive.
        results = list(enumerate(hits))
    return jsonify({'results': results})
|
|
|
@app.route('/api/translate', methods=['GET'])
@require_api_key
def translate_text():
    """Handle ``GET /api/translate``: translate text and return it as JSON.

    Query-string parameters (all optional):
        q: Text to translate, defaults to ``''``.
        to: Target language, defaults to ``'en'``.

    Both values are forwarded unchanged to ``WEBS.translate``.

    Returns:
        A JSON response ``{'translation': [...]}`` where each entry is an
        ``[index, item]`` pair, matching the pairing used by the search
        endpoints in this module.
    """
    query = request.args.get('q', '')
    to_lang = request.args.get('to', 'en')
    # Use the context-managed client so it is closed on exit; the previous
    # code translated through a second, never-closed instance.
    with WEBS() as webs:
        # A bare ``enumerate`` object is not JSON-serializable, so the
        # response could never be built; materialize it into a list first.
        translation = list(enumerate(webs.translate(query, to=to_lang)))
    return jsonify({'translation': translation})
|
|
|
@app.route('/api/suggestions', methods=['GET'])
@require_api_key
def search_suggestions():
    """Handle ``GET /api/suggestions``: return search suggestions as JSON.

    Query-string parameters:
        q: Query to get suggestions for; required.

    Returns:
        ``{'results': [...]}`` with the items yielded by ``WEBS.suggestions``
        on success; ``{'error': ...}`` with status 400 when ``q`` is missing
        or empty; ``{'error': ...}`` with status 500 when the lookup raises.
    """
    query = request.args.get('q', '')
    if not query:
        # A missing parameter is a client error; report it as 400 (as the
        # key-generation endpoint does for bad input) rather than 200.
        return jsonify({'error': 'Query parameter missing'}), 400
    try:
        with WEBS() as webs:
            results = list(webs.suggestions(query))
    except Exception as e:
        # Request boundary: turn any lookup failure into a JSON error body.
        return jsonify({'error': str(e)}), 500
    return jsonify({'results': results})
|
|
|
@app.route('/api/health', methods=['GET'])
@require_api_key
def health_check():
    """Handle ``GET /api/health``: report that the service is responding."""
    payload = {'status': 'working'}
    return jsonify(payload)
|
|
|
import json |
|
from flask import jsonify, request, abort |
|
|
|
@app.route('/pricing', methods=['GET'])
def pricing():
    """Handle ``GET /pricing``: render the pricing page with the plan table."""
    page = render_template('pricing.html', plans=PRICING_PLANS)
    return page
|
|
|
|
|
@app.route('/generate_key', methods=['GET', 'POST'])
def generate_key():
    """Handle ``/generate_key``: show the form (GET) or issue a key (POST).

    POST form fields:
        username: User the key is issued to.
        plan: Must be a key of ``PRICING_PLANS``.

    Returns:
        GET: the rendered ``index.html`` template.
        POST: ``{'api_key': ...}`` with the user's existing key if one is
        stored in ``api_keys.json``, otherwise a newly generated key that is
        also persisted there; ``{'error': ...}`` with status 400 for an
        unknown plan or an exhausted free plan; ``{'error': ...}`` with
        status 500 when the key file cannot be written.
    """
    if request.method != 'POST':
        return render_template('index.html', plans=PRICING_PLANS)

    username = request.form['username']
    plan = request.form['plan']
    if plan not in PRICING_PLANS:
        return jsonify({'error': 'Invalid plan'}), 400
    if plan == 'free' and PRICING_PLANS['free']['rate_limit'] == 0:
        return jsonify({'error': 'Free plan is out of limits'}), 400

    # Load the existing key store. A missing file (first run) is treated the
    # same way as an unparsable one: start from an empty store.
    try:
        with open('api_keys.json', 'r') as file:
            api_keys = json.load(file)
    except (FileNotFoundError, json.JSONDecodeError):
        api_keys = {}
    if not isinstance(api_keys, dict):
        api_keys = {}

    # One key per user: hand back the stored key rather than issuing another.
    if username in api_keys:
        return jsonify({'api_key': api_keys[username]})

    new_key = generate_api_key(username)
    api_keys[username] = new_key
    try:
        with open('api_keys.json', 'w') as file:
            json.dump(api_keys, file, indent=4)
    except OSError as e:
        return jsonify({'error': str(e)}), 500
    return jsonify({'api_key': new_key})
|
if __name__ == '__main__':

    def get_public_ip():
        """Return the public IP reported by api.ipify.org, or ``None`` on failure.

        Any error (network failure, non-2xx status, unexpected payload) is
        printed and results in ``None``.
        """
        try:
            # Bound the wait with the module-level TIMEOUT so startup cannot
            # hang on an unresponsive service.
            response = requests.get(
                'https://api.ipify.org/?format=json', timeout=TIMEOUT
            )
            response.raise_for_status()
            return response.json()['ip']
        except requests.exceptions.RequestException as e:
            print(f"An error occurred: {e}")
        except Exception as e:
            print(f"An unexpected error occurred: {e}")
        return None

    # Apply the fallback only after the check; with ``or 'Fallback_IP'`` on
    # the assignment the failure branch could never run.
    public_ip = get_public_ip()
    if public_ip:
        print(f"Public IP: {public_ip}")
    else:
        public_ip = 'Fallback_IP'
        print("Failed to retrieve public IP. Using fallback IP.")

    # NOTE(review): debug=True turns on Flask's interactive debugger and
    # reloader; keep it to local development and disable it for deployments.
    app.run(debug=True)
|
|