File size: 9,438 Bytes
f91846a
a7c9894
55b1be9
d8436e9
78d28b2
faf1533
c0ca9ac
217ceea
f91846a
d8436e9
 
 
 
 
 
 
 
6fa6ba2
071a386
d8436e9
 
f91846a
98dc21e
78d28b2
98dc21e
 
d8436e9
 
 
98dc21e
d8436e9
 
 
 
 
 
78d28b2
d8436e9
1ce0901
 
98dc21e
 
55b1be9
98dc21e
d8436e9
55b1be9
 
 
df5bf2a
55b1be9
 
 
 
 
6482098
a7c9894
2cf7795
6482098
d8436e9
0a52d39
3a2e91a
34287b9
 
 
3a2e91a
34287b9
0385c62
2cf7795
d8436e9
 
 
2dcd5ab
8ff7523
 
 
 
 
 
 
 
 
 
 
 
 
b7f637d
d8436e9
71de3d4
 
 
 
 
 
 
 
 
 
 
 
d8436e9
 
a7a62b4
4307798
 
4ca38d6
8f4bb7f
59aa321
8cc2c87
46bb01c
d8436e9
 
 
 
 
 
 
 
20870ce
c0ca9ac
5e2e6eb
7c73af9
 
8ff7523
d8436e9
883cea6
b92f1fc
8b31299
b1adecf
b92f1fc
 
8ff7523
b92f1fc
 
 
 
b7f637d
 
 
 
 
 
d8436e9
 
 
 
 
 
 
 
 
 
 
 
b92f1fc
 
 
 
 
 
 
ab50743
b92f1fc
 
 
 
 
c0ca9ac
7c5eecf
a7a62b4
735a8eb
d546491
d8436e9
c0ca9ac
b92f1fc
 
c0ca9ac
b92f1fc
 
 
 
 
c0ca9ac
b92f1fc
78d28b2
b92f1fc
c0ca9ac
b92f1fc
8b31299
b92f1fc
c0ca9ac
d8436e9
3bf6983
98dc21e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8aa26dc
98dc21e
 
 
 
 
 
 
 
 
 
 
8e50e71
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
import json, os
import gradio as gr
from fastapi.middleware.cors import CORSMiddleware
from fastapi import FastAPI, Request, Header, BackgroundTasks, HTTPException, status
import google.generativeai as genai
import base64
from collections import defaultdict
from linebot import LineBotApi, WebhookHandler
from linebot.exceptions import InvalidSignatureError
from linebot.models import (
    MessageEvent,
    TextMessage,
    TextSendMessage,
    ImageSendMessage,
    AudioMessage,
    ImageMessage,
)
import PIL.Image
import tempfile
import httpx
import uvicorn

# 設定 Google AI API 金鑰
genai.configure(api_key=os.environ["GOOGLE_API_KEY"])

# 設定生成文字的參數
generation_config = genai.types.GenerationConfig(
    max_output_tokens=1000, temperature=0.2, top_p=0.5, top_k=16
)

# 使用 Gemini 模型
model = genai.GenerativeModel(
    "gemini-2.0-flash-exp",
    system_instruction="主要用繁體中文回答,但如果用戶使用詢問英文問題,就用英文回應。你現在是個專業助理,職稱為OPEN小助理,個性活潑、樂觀,願意回答所有問題",
    generation_config=generation_config,
)

# 初始化 LINE Bot
line_bot_api = LineBotApi(os.environ["CHANNEL_ACCESS_TOKEN"])
line_handler = WebhookHandler(os.environ["CHANNEL_SECRET"])

# 建立 FastAPI 應用程式
app = FastAPI()

# 設定 CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.get("/")
def root():
    return {"title": "Line Bot"}

@app.post("/webhook")
async def webhook(request: Request, background_tasks: BackgroundTasks, x_line_signature=Header(None)):
    body = await request.body()
    try:
        background_tasks.add_task(
            line_handler.handle, body.decode("utf-8"), x_line_signature
        )
    except InvalidSignatureError:
        raise HTTPException(status_code=400, detail="Invalid signature")
    return "ok"

# Imgur 圖片上傳功能(接收 image_binary,不用暫存)
def upload_image_to_imgur_with_token(image_binary, access_token):
    try:
        headers = {"Authorization": f"Bearer {access_token}"}
        data = {
            "image": base64.b64encode(image_binary).decode("utf-8"),
            "type": "base64",
        }
        response = httpx.post("https://api.imgur.com/3/image", headers=headers, data=data)
        if response.status_code == 200:
            return response.json()["data"]["link"]
        else:
            print("Imgur 上傳失敗:", response.text)
            return None
    except Exception as e:
        print("圖片上傳例外:", e)
        return None

# 接收使用者圖片
def get_image_url(message_id):
    try:
        message_content = line_bot_api.get_message_content(message_id)
        file_path = f"/tmp/{message_id}.png"
        with open(file_path, "wb") as f:
            for chunk in message_content.iter_content():
                f.write(chunk)
        return file_path
    except Exception as e:
        print(f"Error getting image: {e}")
        return None

# 分析用戶圖片+問題
def analyze_with_gemini(image_path, user_text, chat):
    try:
        if not os.path.exists(image_path):
            raise FileNotFoundError(f"圖片路徑無效:{image_path}")
        organ = PIL.Image.open(image_path)
        response = chat.send_message([user_text, organ])
        return response.text
    except Exception as e:
        return f"發生錯誤: {e}"

# 儲存使用者訊息
user_message_history = defaultdict(list)
def store_user_message(user_id, message_type, message_content):
    user_message_history[user_id].append({
        "type": message_type,
        "content": message_content
    })

def get_previous_message(user_id):
    if user_id in user_message_history and len(user_message_history[user_id]) > 0:
        return user_message_history[user_id][-1]
    return None

# 主訊息處理邏輯
chat_sessions = {}
@line_handler.add(MessageEvent, message=(ImageMessage, TextMessage))
def handle_image_message(event):
    user_id = event.source.user_id
    chat = chat_sessions.get(user_id) or model.start_chat(history=[])
    chat_sessions[user_id] = chat

    user_text = event.message.text if event.message.type == "text" else None

    if user_text and user_text.startswith("請幫我生成圖片"):
        prompt = user_text.replace("請幫我生成圖片", "").strip()
        image_model = genai.GenerativeModel("gemini-2.0-flash-exp-image-generation")
        response = image_model.generate_content(prompt)

        if response.parts and hasattr(response.parts[0], "inline_data"):
            image_data = response.parts[0].inline_data.data
            access_token = os.environ.get("IMGUR_ACCESS_TOKEN")
            image_url = upload_image_to_imgur_with_token(image_data, access_token)

            if image_url:
                line_bot_api.reply_message(
                    event.reply_token,
                    [
                        TextSendMessage(text="這是我為你生成的圖片喔~ ✨"),
                        ImageSendMessage(original_content_url=image_url, preview_image_url=image_url)
                    ]
                )
            else:
                line_bot_api.reply_message(event.reply_token, TextSendMessage(text="圖片上傳失敗,請稍後再試~"))
        else:
            line_bot_api.reply_message(event.reply_token, TextSendMessage(text="圖片生成失敗,請稍後再試~"))
        return

    if event.message.type == "image":
        image_path = get_image_url(event.message.id)
        if image_path:
            store_user_message(user_id, "image", image_path)
            line_bot_api.reply_message(event.reply_token, TextSendMessage(text="圖片已接收成功囉,幫我輸入你想詢問的問題喔~"))
        else:
            line_bot_api.reply_message(event.reply_token, TextSendMessage(text="沒有接收到圖片~"))
        return

    previous_message = get_previous_message(user_id)
    if previous_message and previous_message["type"] == "image" and event.message.type == "text":
        image_path = previous_message["content"]
        user_text = event.message.text
        store_user_message(user_id, "text", user_text)
        out = analyze_with_gemini(image_path, user_text, chat)
    else:
        if event.message.type != "text":
            line_bot_api.reply_message(event.reply_token, TextSendMessage(text="請輸入文字或圖片~"))
            return
        if event.message.text == "再見":
            line_bot_api.reply_message(event.reply_token, TextSendMessage(text="Bye!"))
            return
        if working_status:
            try:
                prompt = event.message.text
                store_user_message(user_id, "text", prompt)
                completion = chat.send_message(prompt)
                out = completion.text if completion.text else "我不太懂什麼意思也~"
            except:
                out = "執行出錯!請換個說法!"

    line_bot_api.reply_message(event.reply_token, TextSendMessage(text=out))

# 啟動應用
if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", port=7860, reload=True)

# 註解說明:
# import 導入必要的套件
# genai.configure 設定 Google AI API 金鑰
# generation_config 設定文字生成參數
# model 設定使用的 Gemini 模型
# line_bot_api 和 line_handler 設定 Line Bot API 和 webhook 處理器
# working_status 設定是否正在與使用者交談
# app 建立 FastAPI 應用程式
# app.add_middleware 設定 CORS
# @app.get("/") 處理根路徑請求
# @app.post("/webhook") 處理 Line Webhook 請求
# @line_handler.add(MessageEvent, message=TextMessage) 處理文字訊息事件
# if __name__ == "__main__": 啟動 FastAPI 應用程式
# 程式碼功能說明:
# 程式碼首先會導入必要的套件,並設定 Google AI API 金鑰、文字生成參數、Gemini 模型以及 Line Bot API。
# 接著會建立 FastAPI 應用程式,並設定 CORS。
# 程式碼會定義兩個函數:
# root() 處理根路徑請求,返回一個簡單的 JSON 訊息。
# webhook() 處理 Line Webhook 請求,將處理 Line 事件的任務加入背景工作,並處理無效的簽章錯誤。
# 程式碼還定義一個函數 handle_message() 來處理文字訊息事件,它會檢查事件類型和訊息類型,並根據使用者輸入執行不同的動作:
# 如果使用者輸入 "再見",回覆 "Bye!"。
# 如果正在與使用者交談,則會使用 Gemini 模型生成文字,並將結果回覆給使用者。
# 最後,程式碼會啟動 FastAPI 應用程式,開始監聽 HTTP 請求。
# 程式碼運行方式:
# 將程式碼存為 main.py 文件。
# 在環境變數中設定 GOOGLE_API_KEY、CHANNEL_ACCESS_TOKEN 和 CHANNEL_SECRET。
# 執行 uvicorn main:app --host 0.0.0.0 --port 7860 --reload 命令啟動 FastAPI 應用程式。
# 使用 Line 帳戶與 Line Bot 進行對話。
# 注意:
# 程式碼中使用 os.environ["GOOGLE_API_KEY"]、os.environ["CHANNEL_ACCESS_TOKEN"] 和 os.environ["CHANNEL_SECRET"] 來存取環境變數,需要先在環境變數中設定這些值。
# 程式碼中使用 uvicorn 執行 FastAPI 應用程式,需要先安裝 uvicorn 套件。
# 程式碼中使用 google.generativeai 套件,需要先安裝 google-generativeai 套件。
# 程式碼中使用 linebot 套件,需要先安裝 linebot 套件。