|
from mcp.server.fastmcp import FastMCP |
|
import random |
|
import time |
|
from litellm import completion |
|
import shlex |
|
from subprocess import Popen, PIPE |
|
from threading import Timer |
|
import os |
|
import glob |
|
import http.client |
|
import json |
|
import openpyxl |
|
import shutil |
|
from google import genai |
|
|
|
client = genai.Client(api_key="AIzaSyDtP05TyoIy9j0uPL7_wLEhgQEE75AZQSc") |
|
|
|
source_dir = "/app/uploads/temp" |
|
destination_dir = "/app/code_interpreter" |
|
files_list=[] |
|
downloaded_files=[] |
|
|
|
os.environ["GROQ_API_KEY"] ="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" |
|
os.environ["GEMINI_API_KEY"] ="AIzaSyAQgAtQPpY0bQaCqCISGxeyF6tpDePx-Jg" |
|
os.environ["OPENROUTER_API_KEY"] = "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" |
|
mcp = FastMCP("code_sandbox") |
|
data={} |
|
result="" |
|
stdout="" |
|
stderr="" |
|
import requests |
|
import os |
|
from bs4 import BeautifulSoup |
|
|
|
|
|
def download_all_files(base_url, files_endpoint, download_directory): |
|
"""Downloads all files listed on the server's /upload page.""" |
|
global downloaded_files |
|
|
|
|
|
if not os.path.exists(download_directory): |
|
os.makedirs(download_directory) |
|
|
|
try: |
|
|
|
files_url = f"{base_url}{files_endpoint}" |
|
response = requests.get(files_url) |
|
response.raise_for_status() |
|
|
|
|
|
soup = BeautifulSoup(response.content, "html.parser") |
|
|
|
|
|
|
|
file_links = soup.find_all("a") |
|
|
|
|
|
for link in file_links: |
|
try: |
|
file_url = link.get("href") |
|
if file_url: |
|
|
|
if not file_url.startswith("http"): |
|
file_url = f"{base_url}{file_url}" |
|
|
|
filename = os.path.basename(file_url) |
|
file_path = os.path.join(download_directory, filename) |
|
if filename in downloaded_files: |
|
pass |
|
else: |
|
downloaded_files.append(filename) |
|
print(f"Downloading: {filename} from {file_url}") |
|
|
|
|
|
file_response = requests.get(file_url, stream=True) |
|
file_response.raise_for_status() |
|
|
|
with open(file_path, "wb") as file: |
|
for chunk in file_response.iter_content(chunk_size=8192): |
|
if chunk: |
|
file.write(chunk) |
|
|
|
print(f"Downloaded: {filename} to {file_path}") |
|
|
|
except requests.exceptions.RequestException as e: |
|
print(f"Error downloading {link.get('href')}: {e}") |
|
except OSError as e: |
|
print(f"Error saving {filename}: {e}") |
|
|
|
except requests.exceptions.RequestException as e: |
|
print(f"Error getting file list from server: {e}") |
|
except Exception as e: |
|
print(f"An unexpected error occurred: {e}") |
|
|
|
def transfer_files(): |
|
for item in os.listdir(source_dir): |
|
item_path = os.path.join(source_dir, item) |
|
if os.path.isdir(item_path): |
|
for filename in os.listdir(item_path): |
|
source_file_path = os.path.join(item_path, filename) |
|
destination_file_path = os.path.join(destination_dir, filename) |
|
shutil.move(source_file_path, destination_file_path) |
|
|
|
def upload_file(file_path, upload_url): |
|
"""Uploads a file to the specified server endpoint.""" |
|
|
|
try: |
|
|
|
if not os.path.exists(file_path): |
|
raise FileNotFoundError(f"File not found: {file_path}") |
|
|
|
|
|
with open(file_path, "rb") as file: |
|
files = {"file": (os.path.basename(file_path), file)} |
|
|
|
|
|
response = requests.post(upload_url, files=files) |
|
|
|
|
|
response.raise_for_status() |
|
|
|
|
|
if response.status_code == 200: |
|
print(f"File uploaded successfully. Filename returned by server: {response.text}") |
|
return response.text |
|
else: |
|
print(f"Upload failed. Status code: {response.status_code}, Response: {response.text}") |
|
return None |
|
|
|
except FileNotFoundError as e: |
|
print(e) |
|
return None |
|
except requests.exceptions.RequestException as e: |
|
print(f"Upload failed. Network error: {e}") |
|
return None |
|
|
|
|
|
TOKEN = "5182224145:AAEjkSlPqV-Q3rH8A9X8HfCDYYEQ44v_qy0" |
|
chat_id = "5075390513" |
|
from requests_futures.sessions import FuturesSession |
|
session = FuturesSession() |
|
|
|
def run(cmd, timeout_sec): |
|
global stdout |
|
global stderr |
|
proc = Popen(shlex.split(cmd), stdout=PIPE, stderr=PIPE,cwd="/app/code_interpreter/") |
|
timer = Timer(timeout_sec, proc.kill) |
|
try: |
|
timer.start() |
|
stdout, stderr = proc.communicate() |
|
finally: |
|
timer.cancel() |
|
|
|
|
|
@mcp.tool() |
|
def analyse_audio(audiopath,query) -> dict: |
|
"""Ask another AI model about audios.The AI model can listen to the audio and give answers.Eg-query:Generate detailed minutes of meeting from the audio clip,audiopath='/app/code_interpreter/<audioname>'.Note:The audios are automatically present in the /app/code_interpreter directory.""" |
|
download_all_files("https://opengpt-4ik5.onrender.com", "/upload", "/app/code_interpreter") |
|
myfile = client.files.upload(file=audiopath) |
|
|
|
response = client.models.generate_content( |
|
model='gemini-2.0-flash', |
|
contents=[query, myfile] |
|
) |
|
return {"Output":str(response.text)} |
|
|
|
@mcp.tool() |
|
def analyse_video(videopath,query) -> dict: |
|
"""Ask another AI model about videos.The AI model can see the videos and give answers.Eg-query:Create a very detailed transcript and summary of the video,videopath='/app/code_interpreter/<videoname>'Note:The videos are automatically present in the /app/code_interpreter directory.""" |
|
download_all_files("https://opengpt-4ik5.onrender.com", "/upload", "/app/code_interpreter") |
|
video_file = client.files.upload(file=videopath) |
|
|
|
while video_file.state.name == "PROCESSING": |
|
print('.', end='') |
|
time.sleep(1) |
|
video_file = client.files.get(name=video_file.name) |
|
|
|
if video_file.state.name == "FAILED": |
|
raise ValueError(video_file.state.name) |
|
|
|
response = client.models.generate_content( |
|
model='gemini-2.0-flash', |
|
contents=[query, video_file] |
|
) |
|
return {"Output":str(response.text)} |
|
|
|
|
|
@mcp.tool() |
|
def analyse_images(imagepath,query) -> dict: |
|
"""Ask another AI model about images.The AI model can see the images and give answers.Eg-query:Who is the person in this image?,imagepath='/app/code_interpreter/<imagename>'.Note:The images are automatically present in the /app/code_interpreter directory.""" |
|
download_all_files("https://opengpt-4ik5.onrender.com", "/upload", "/app/code_interpreter") |
|
video_file = client.files.upload(file=imagepath) |
|
|
|
|
|
response = client.models.generate_content( |
|
model='gemini-2.0-flash', |
|
contents=[query, video_file] |
|
) |
|
return {"Output":str(response.text)} |
|
|
|
@mcp.tool() |
|
def create_code_files(filename: str, code: str) -> dict: |
|
global destination_dir |
|
download_all_files("https://opengpt-4ik5.onrender.com", "/upload", "/app/code_interpreter") |
|
"""Create code files by passing the the filename as well the entire code to write.The file is created by default in the /app/code_interpreter directory.Note:All user uploaded files that you might need to work upon are stored in the /app/code_interpreter directory.""" |
|
transfer_files() |
|
f = open(os.path.join(destination_dir, filename), "w") |
|
f.write(code) |
|
f.close() |
|
return {"info":"task completed. The referenced code files were created successfully. "} |
|
|
|
|
|
|
|
@mcp.tool() |
|
def run_code_files(start_cmd:str) -> dict: |
|
"""(start_cmd:Example- sudo python /app/code_interpreter/app.py or bash /app/code_interpreter/app.py).The files must be inside the /app/code_interpreter directory.""" |
|
global files_list |
|
global stdout |
|
global stderr |
|
run(start_cmd, 300) |
|
while stderr=="" and stdout=="": |
|
pass |
|
time.sleep(1.5) |
|
onlyfiles = glob.glob("/app/code_interpreter/*") |
|
onlyfiles=list(set(onlyfiles)-set(files_list)) |
|
uploaded_filenames=[] |
|
for files in onlyfiles: |
|
try: |
|
uploaded_filename = upload_file(files, "https://opengpt-4ik5.onrender.com/upload") |
|
uploaded_filenames.append(f"https://opengpt-4ik5.onrender.com/static/{uploaded_filename}") |
|
except: |
|
pass |
|
files_list=onlyfiles |
|
return {"stdout":stdout,"stderr":stderr,"Files_download_link":uploaded_filenames} |
|
|
|
|
|
@mcp.tool() |
|
def run_shell_command(cmd:str) -> dict: |
|
"""(cmd:Example- mkdir test.By default , the command is run inside the /app/code_interpreter/ directory.).Remember, the code_interpreter is running on **alpine linux** , so write commands accordingly.Eg-sudo does not work and is not required..""" |
|
global stdout |
|
global stderr |
|
|
|
run(cmd, 300) |
|
while stderr=="" and stdout=="": |
|
pass |
|
time.sleep(1.5) |
|
transfer_files() |
|
return {"stdout":stdout,"stderr":stderr} |
|
|
|
|
|
|
|
@mcp.tool() |
|
def install_python_packages(python_packages:str) -> dict: |
|
"""python_packages to install seperated by space.eg-(python packages:numpy matplotlib).The following python packages are preinstalled:gradio XlsxWriter openpyxl""" |
|
global sbx |
|
package_names = python_packages.strip() |
|
command="pip install" |
|
if not package_names: |
|
return |
|
|
|
run( |
|
f"{command} --break-system-packages {package_names}", timeout_sec=300 |
|
) |
|
while stderr=="" and stdout=="": |
|
pass |
|
time.sleep(2) |
|
return {"stdout":stdout,"stderr":stderr,"info":"Ran package installation command"} |
|
|
|
@mcp.tool() |
|
def get_youtube_transcript(videoid:str) -> dict: |
|
"""Get the transcript of a youtube video by passing the video id.First search the web using google / exa for the relevant videos.Eg videoid=ZacjOVVgoLY""" |
|
conn = http.client.HTTPSConnection("youtube-transcript3.p.rapidapi.com") |
|
headers = { |
|
'x-rapidapi-key': "2a155d4498mshd52b7d6b7a2ff86p10cdd0jsn6252e0f2f529", |
|
'x-rapidapi-host': "youtube-transcript3.p.rapidapi.com" |
|
} |
|
conn.request("GET",f"/api/transcript?videoId={videoid}", headers=headers) |
|
|
|
res = conn.getresponse() |
|
data = res.read() |
|
return json.loads(data) |
|
|
|
@mcp.tool() |
|
def read_excel_file(filename) -> dict: |
|
"""Reads the contents of an excel file.Returns a dict with key :value pair = cell location:cell content.Always run this command first , when working with excels.The excel file is automatically present in the /app/code_interpreter directory.Note:Always use openpyxl in python to work with excel files.""" |
|
global destination_dir |
|
download_all_files("https://opengpt-4ik5.onrender.com", "/upload", "/app/code_interpreter") |
|
|
|
workbook = openpyxl.load_workbook(os.path.join(destination_dir, filename)) |
|
|
|
|
|
excel_data_dict = {} |
|
|
|
|
|
for sheet_name in workbook.sheetnames: |
|
sheet = workbook[sheet_name] |
|
|
|
for row in sheet.iter_rows(): |
|
for cell in row: |
|
|
|
cell_coordinate = cell.coordinate |
|
cell_value = cell.value |
|
if cell_value is not None: |
|
excel_data_dict[cell_coordinate] = str(cell_value) |
|
return excel_data_dict |
|
@mcp.tool() |
|
def scrape_websites(url_list:list,query:str) -> list: |
|
"""Get the entire content of websites by passing in the url lists.query is the question you want to ask about the content of the website.e.g-query:Give .pptx links in the website.Note:Max urls in url_list is 3.""" |
|
|
|
conn = http.client.HTTPSConnection("scrapeninja.p.rapidapi.com") |
|
|
|
|
|
headers = { |
|
'x-rapidapi-key': "2a155d4498mshd52b7d6b7a2ff86p10cdd0jsn6252e0f2f529", |
|
'x-rapidapi-host': "scrapeninja.p.rapidapi.com", |
|
'Content-Type': "application/json" |
|
} |
|
Output=[] |
|
for urls in url_list: |
|
payload = {"url" :urls} |
|
payload=json.dumps(payload) |
|
conn.request("POST", "/scrape", payload, headers) |
|
res = conn.getresponse() |
|
data = res.read() |
|
content=str(data.decode("utf-8")) |
|
response = completion( |
|
model="gemini/gemini-2.0-flash-exp", |
|
messages=[ |
|
{"role": "user", "content": f"Output the following content in the human readable format.Try to conserve all the links and the text.Try to ouput the entire content.Remove the html codes so its human readable.Also answer this question about the content in a seperate paragraph:{query}.Here is the content:{content}"} |
|
], |
|
) |
|
Output.append(response.choices[0].message.content) |
|
|
|
return {"website_content":Output} |
|
|
|
|
|
@mcp.tool() |
|
def deepthinking1(query:str,info:str) -> dict: |
|
"""Ask another intelligent AI about the query.Ask the question defined by the query string and what you know about the question as well as provide your own knowledge and ideas about the question through the info string.""" |
|
response = completion( |
|
model="groq/deepseek-r1-distill-llama-70b", |
|
messages=[ |
|
{"role": "user", "content": f"{query}.Here is what i Know about the query:{info}"} |
|
], |
|
stream=False |
|
) |
|
|
|
|
|
return {"response":str(response.choices[0].message.content)} |
|
|
|
@mcp.tool() |
|
def deepthinking2(query:str,info:str) -> dict: |
|
"""Ask another intelligent AI about the query.Ask the question defined by the query string and what you know about the question as well as provide your own knowledge and ideas about the question through the info string.""" |
|
response = completion( |
|
model="openrouter/deepseek/deepseek-chat", |
|
messages=[ |
|
{"role": "user", "content": f"Hi!"}], |
|
provider={"order": ["Together"],"allow_fallbacks":False}, |
|
|
|
) |
|
|
|
|
|
return {"response":str(response.choices[0].message.content)} |
|
|
|
@mcp.tool() |
|
def deepthinking3(query:str,info:str) -> dict: |
|
"""Ask another intelligent AI about the query.Ask the question defined by the query string and what you know about the question as well as provide your own knowledge and ideas about the question through the info string.""" |
|
response = completion( |
|
model="gemini/gemini-2.0-flash-thinking-exp-01-21", |
|
messages=[ |
|
{"role": "user", "content": f"{query}.Here is what i Know about the query:{info}"} |
|
], |
|
) |
|
|
|
|
|
return {"response":str(response.choices[0].message.content)} |
|
|
|
if __name__ == "__main__": |
|
|
|
mcp.run(transport='stdio') |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|