|
from flask import Flask, request, jsonify, render_template, send_from_directory |
|
import os |
|
import subprocess |
|
import tempfile |
|
import shutil |
|
import sys |
|
import google.generativeai as genai |
|
import re |
|
|
|
app = Flask(__name__) |
|
|
|
|
|
# Scratch workspace created once at import time. `temp_dir` remembers the
# directory that was created; `current_dir` starts out pointing at the same
# place and is the default directory commands run in.
current_dir = temp_dir = tempfile.mkdtemp()
|
|
|
|
|
# The key is read eagerly from the environment, so a missing GEMINI_API_KEY
# surfaces as a KeyError when this module is imported.
genai.configure(
    api_key=os.environ["GEMINI_API_KEY"],
)
|
|
|
# Sampling and length settings passed to the Gemini model when it is created.
generation_config = dict(
    temperature=0.7,
    top_p=1,
    top_k=40,
    max_output_tokens=1024,
)
|
|
|
# Gemini model handle, built with the generation settings defined above in
# this module; the chat session is started from it.
model = genai.GenerativeModel(
    generation_config=generation_config,
    model_name="gemini-1.5-pro",
)
|
|
|
# Prompt preamble that constrains the model to reply with a single fenced
# python/bash code block. Built from adjacent literals, one per prompt line.
system_instruction = (
    "\n"
    "You are a code generation assistant. Follow these strict rules:\n"
    "\n"
    "1. Respond ONLY with code in ```python or ```bash blocks\n"
    "2. Never include explanations or comments\n"
    "3. For file operations, create complete executable scripts\n"
    "4. Handle errors within the code itself\n"
    "5. Format example:\n"
    "User: Create a file that prints hello\n"
    'Response: ```python\nprint("hello")\n```\n'
)
|
|
|
# A single chat session, opened with an empty history when the module loads.
chat = model.start_chat(history=list())
|
|
|
def execute_command(command, cwd=None, timeout=None):
    """Run ``command`` through the system shell and return its combined output.

    Args:
        command: Shell command line. It is handed to the shell verbatim
            (``shell=True``), so it must come from a trusted source.
        cwd: Directory to run in. Falls back to the module-level
            ``current_dir`` when omitted or empty.
        timeout: Optional limit in seconds. ``None`` (the default) waits for
            the command to finish, however long that takes.

    Returns:
        Captured stdout followed by captured stderr, as text. If ``timeout``
        expires, an error message is returned instead of the output.
    """
    try:
        completed = subprocess.run(
            command,
            shell=True,
            # Give the child an empty stdin instead of inheriting the server's:
            # a command that reads input then sees EOF rather than blocking.
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            # Binary or mis-encoded output must not abort the whole call.
            errors="replace",
            cwd=cwd or current_dir,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return f"Error: Command timed out after {timeout} seconds"
    return completed.stdout + completed.stderr
|
|
|
_FENCE_FLAGS = re.DOTALL | re.IGNORECASE
# Opening fence: ```python / ```bash (any case), optional trailing blanks, EOL.
_FENCE_OPEN = r"```(?:python|bash)[ \t]*\r?\n"
# Preferred form: the closing fence sits at the start of its own line.
_FENCED_BLOCK_RE = re.compile(_FENCE_OPEN + r"(.*?)\r?\n[ \t]*```", _FENCE_FLAGS)
# Fallback: the closing fence directly follows the code on the same line.
_FENCED_BLOCK_LOOSE_RE = re.compile(_FENCE_OPEN + r"(.*?)```", _FENCE_FLAGS)


def extract_code(response):
    """Return the code inside the first ```python / ```bash block of a reply.

    Args:
        response: Model reply text. ``None`` or an empty string is accepted.

    Returns:
        The block's contents with surrounding whitespace stripped, or ``None``
        when the reply is empty or contains no python/bash fenced block.
    """
    if not response:
        return None
    # Try the well-formed layout first so existing replies parse as before;
    # the loose pattern only rescues replies the strict one rejects.
    match = _FENCED_BLOCK_RE.search(response) or _FENCED_BLOCK_LOOSE_RE.search(response)
    if match is None:
        return None
    return match.group(1).strip()
|
|
|
@app.route("/")
def index():
    """Serve the landing page rendered from the ``index.html`` template."""
    page = render_template("index.html")
    return page
|
|
|
# Shell tools an AI-generated snippet is allowed to invoke. Matched as whole
# words so that, for example, "cd" is not found inside an unrelated token.
_AI_SHELL_COMMAND_RE = re.compile(r"\b(?:pip3?|git|cd|mkdir)\b")


def _handle_ai_request(prompt):
    """Ask the chat model for code, then save it as a script or run it as shell.

    Returns the dict that becomes the JSON response body.
    """
    if not prompt:
        return {"result": "Error: No AI request provided.", "type": "error"}

    response = chat.send_message(f"{system_instruction}\nUser request: {prompt}")
    code = extract_code(response.text)
    if not code:
        return {"result": "Error: No valid code generated", "type": "error"}

    # Heuristic: these markers identify the snippet as Python source. It is
    # written to the working directory and returned, not executed here.
    if "print(" in code or "def " in code or "import " in code:
        filename = "generated_script.py"
        filepath = os.path.join(current_dir, filename)
        # Explicit encoding: generated code may contain non-ASCII characters.
        with open(filepath, "w", encoding="utf-8") as f:
            f.write(code)
        return {
            "result": f"File created: {filename}",
            "type": "code",
            "file": filename,
            "content": code,
        }

    if _AI_SHELL_COMMAND_RE.search(code):
        result = execute_command(code)
        return {"result": f"Command executed:\n{result}", "type": "command"}

    return {"result": "Unsupported code type", "type": "error"}


def _change_directory(target):
    """Point the module-level ``current_dir`` at ``target`` if it is a directory.

    ``target`` is resolved relative to the current working directory.
    Returns the dict that becomes the JSON response body.
    """
    global current_dir
    new_dir = os.path.join(current_dir, target)
    if os.path.isdir(new_dir):
        current_dir = os.path.abspath(new_dir)
        return {"result": f"Changed directory to: {current_dir}", "type": "directory"}
    return {"result": f"Error: Directory not found: {new_dir}", "type": "error"}


@app.route("/execute", methods=["POST"])
def execute_code():
    """Dispatch one console command posted as JSON ``{"code": "<command>"}``.

    Recognised forms, checked in this order:
      * ``ai: <request>``   - generate code with the chat model
      * ``show files``      - list the working directory
      * ``cd <dir>``        - change the working directory
      * ``!<shell command>``- run an arbitrary shell command
      * ``pip install ...`` - run pip with the server's interpreter
      * ``<something>.py``  - run a script with the server's interpreter

    Returns:
        A JSON object with a ``result`` message and a ``type`` tag; failures
        are reported as ``type: "error"`` rather than raised.
    """
    # Tolerate a missing, non-JSON, or non-object body and a non-string "code"
    # value: all of them are answered as "no command", not as a server error.
    payload = request.get_json(silent=True)
    raw_command = payload.get("code") if isinstance(payload, dict) else None
    command = raw_command.strip() if isinstance(raw_command, str) else ""
    if not command:
        return jsonify({"result": "Error: No command provided.", "type": "error"})

    try:
        if command.lower().startswith("ai:"):
            return jsonify(_handle_ai_request(command[3:].strip()))

        if command == "show files":
            files = os.listdir(current_dir)
            return jsonify({"result": "Files:\n" + "\n".join(files), "type": "files"})

        if command.startswith("cd "):
            # Strip so that extra spaces after "cd" do not end up in the path.
            return jsonify(_change_directory(command[3:].strip()))

        if command.startswith("!"):
            result = execute_command(command[1:])
            return jsonify({"result": result, "type": "command"})

        if command.startswith("pip install"):
            # Go through `python -m pip` so packages land in this interpreter.
            result = execute_command(f"{sys.executable} -m {command}")
            return jsonify({"result": result, "type": "package"})

        if command.endswith(".py"):
            result = execute_command(f"{sys.executable} {command}")
            return jsonify({"result": result, "type": "script"})

        return jsonify({"result": "Unrecognized command", "type": "error"})

    except Exception as e:
        # Request boundary: report the failure to the client, but keep the
        # traceback in the server log instead of discarding it.
        app.logger.exception("Command failed: %r", command)
        return jsonify({"result": f"Error: {str(e)}", "type": "error"})
|
|
|
@app.route("/save_file", methods=["POST"])
def save_file():
    """Write posted text to a file inside the current working directory.

    Expects a JSON object with ``filename`` (path relative to the working
    directory) and ``content`` (the text to write).

    Returns:
        A JSON object with a ``result`` message and ``type`` of ``"file"`` on
        success or ``"error"`` on any failure.
    """
    try:
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            payload = {}
        filename = payload.get("filename")
        content = payload.get("content")

        # Validate before opening: mode "w" truncates, so a bad payload must
        # be rejected before it can empty an existing file.
        if not isinstance(filename, str) or not filename.strip():
            return jsonify({"result": "Save error: No filename provided", "type": "error"})
        if not isinstance(content, str):
            return jsonify({"result": "Save error: No content provided", "type": "error"})

        # Keep the write inside the working directory: reject absolute paths
        # and ".." components that would resolve to somewhere else.
        base_dir = os.path.realpath(current_dir)
        filepath = os.path.realpath(os.path.join(base_dir, filename))
        if os.path.commonpath([base_dir, filepath]) != base_dir:
            return jsonify({"result": "Save error: Invalid file path", "type": "error"})

        with open(filepath, "w", encoding="utf-8") as f:
            f.write(content)
        return jsonify({"result": f"File {filename} saved", "type": "file"})
    except Exception as e:
        return jsonify({"result": f"Save error: {str(e)}", "type": "error"})
|
|
|
@app.route("/cleanup", methods=["POST"])
def cleanup():
    """Throw away the scratch directory and continue in a newly created one.

    Returns:
        A JSON object with ``type`` ``"system"`` on success, or ``"error"``
        with the exception text if removal or creation fails.
    """
    global temp_dir, current_dir
    try:
        stale_dir = temp_dir
        if os.path.exists(stale_dir):
            shutil.rmtree(stale_dir)
        # Both names are only rebound once the new directory exists.
        current_dir = temp_dir = tempfile.mkdtemp()
        return jsonify({"result": "Environment reset", "type": "system"})
    except Exception as exc:
        return jsonify({"result": f"Cleanup error: {str(exc)}", "type": "error"})
|
|
|
@app.route("/list_files", methods=["GET"])
def list_files():
    """Report the entries of the current working directory as JSON.

    Returns:
        ``{"files": [...], "type": "files"}`` on success, otherwise a
        ``type: "error"`` object carrying the exception text.
    """
    try:
        entries = os.listdir(current_dir)
        return jsonify({"files": entries, "type": "files"})
    except Exception as exc:
        return jsonify({"result": f"List error: {str(exc)}", "type": "error"})
|
|
|
@app.route("/download/<path:filename>", methods=["GET"])
def download_file(filename):
    """Send ``filename`` from the current working directory as an attachment.

    Any exception raised while preparing the file is caught and reported as a
    ``type: "error"`` JSON object instead.
    """
    try:
        attachment = send_from_directory(current_dir, filename, as_attachment=True)
        return attachment
    except Exception as exc:
        return jsonify({"result": f"Download error: {str(exc)}", "type": "error"})
|
|
|
if __name__ == "__main__":
    # Started directly as a script: listen on every interface, port 7860.
    app.run(port=7860, host="0.0.0.0")