import gradio as gr from mysite.utilities import chat_with_interpreter, completion, process_file from interpreter import interpreter import mysite.interpreter_config # インポートするだけで設定が適用されます import duckdb def format_response(chunk, full_response): # Message if chunk["type"] == "message": full_response += chunk.get("content", "") if chunk.get("end", False): full_response += "\n" # Code if chunk["type"] == "code": if chunk.get("start", False): full_response += "```python\n" full_response += chunk.get("content", "").replace("`", "") if chunk.get("end", False): full_response += "\n```\n" # Output if chunk["type"] == "confirmation": if chunk.get("start", False): full_response += "```python\n" full_response += chunk.get("content", {}).get("code", "") if chunk.get("end", False): full_response += "```\n" # Console if chunk["type"] == "console": if chunk.get("start", False): full_response += "```python\n" if chunk.get("format", "") == "active_line": console_content = chunk.get("content", "") if console_content is None: full_response += "No output available on console." if chunk.get("format", "") == "output": console_content = chunk.get("content", "") full_response += console_content if chunk.get("end", False): full_response += "\n```\n" # Image if chunk["type"] == "image": if chunk.get("start", False) or chunk.get("end", False): full_response += "\n" else: image_format = chunk.get("format", "") if image_format == "base64.png": image_content = chunk.get("content", "") if image_content: image = Image.open(BytesIO(base64.b64decode(image_content))) new_image = Image.new("RGB", image.size, "white") new_image.paste(image, mask=image.split()[3]) buffered = BytesIO() new_image.save(buffered, format="PNG") img_str = base64.b64encode(buffered.getvalue()).decode() full_response += f"\n" return full_response # Set the environment variable. def chat_with_interpreter( message, history, a=None, b=None, c=None, d=None ): # , openai_api_key): # Set the API key for the interpreter # interpreter.llm.api_key = openai_api_key if message == "reset": interpreter.reset() return "Interpreter reset", history full_response = "" # add_conversation(history,20) user_entry = {"role": "user", "type": "message", "content": message} #messages.append(user_entry) # Call interpreter.chat and capture the result # message = message + "\nシンタックスを確認してください。" # result = interpreter.chat(message) for chunk in interpreter.chat(message, display=False, stream=True): # print(chunk) # output = '\n'.join(item['content'] for item in result if 'content' in item) full_response = format_response(chunk, full_response) yield full_response # chunk.get("content", "") # Extract the 'content' field from all elements in the result """ if isinstance(result, list): for item in result: if 'content' in item: #yield item['content']#, history output = '\n'.join(item['content'] for item in result if 'content' in item) else: #yield str(result)#, history output = str(result) """ age = 28 con = duckdb.connect(database="./workspace/sample.duckdb") con.execute( """ CREATE SEQUENCE IF NOT EXISTS sample_id_seq START 1; CREATE TABLE IF NOT EXISTS samples ( id INTEGER DEFAULT nextval('sample_id_seq'), name VARCHAR, age INTEGER, PRIMARY KEY(id) ); """ ) cur = con.cursor() con.execute("INSERT INTO samples (name, age) VALUES (?, ?)", (full_response, age)) con.execute("INSERT INTO samples (name, age) VALUES (?, ?)", (message, age)) # データをCSVファイルにエクスポート con.execute("COPY samples TO 'sample.csv' (FORMAT CSV, HEADER)") # データをコミット con.commit() # データを選択 cur = con.execute("SELECT * FROM samples") # 結果をフェッチ res = cur.fetchall() rows = "" # 結果を表示 # 結果を文字列に整形 rows = "\n".join([f"name: {row[0]}, age: {row[1]}" for row in res]) # コネクションを閉じる con.close() # print(cur.fetchall()) yield full_response + rows # , history return full_response, history PLACEHOLDER = """
Ask me anything...