# Hosting-page residue kept for provenance (not part of the program):
#   232131 / app.py
#   cryman38's picture
#   Upload app.py
#   2496778 verified
import ast
import logging
import math
import operator
import os

import FinanceDataReader as fdr
import requests
import telegram
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, ChatAction
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, CallbackQueryHandler
from transformers import pipeline
class StockCodeNotFoundError(Exception):
    """Raised when no price data can be looked up for a stock code."""
class CurrencyConversionError(Exception):
    """Raised when the exchange rate for a currency code cannot be fetched."""
# Load the Hugging Face model (model name specified explicitly)
# NOTE(review): nothing in the visible part of this file uses
# `sentiment_analysis`; confirm it is still needed before paying for the
# model load at import time.
sentiment_analysis = pipeline(
    'sentiment-analysis',
    model="distilbert-base-uncased-finetuned-sst-2-english",
)

# Logging configuration: INFO and above, written through a stream handler.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(
    level=logging.INFO,
    format=_LOG_FORMAT,
    handlers=[logging.StreamHandler()],
)
logger = logging.getLogger(__name__)

# Global state: exchange-rate cache keyed by upper-case currency code.
exchange_rates = {}
# Input parsing function
# Arithmetic operators accepted inside quantity / amount / weight expressions.
_BINARY_OPERATORS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.FloorDiv: operator.floordiv,
    ast.Mod: operator.mod,
}
_UNARY_OPERATORS = {
    ast.UAdd: operator.pos,
    ast.USub: operator.neg,
}


def _evaluate_arithmetic(expression):
    """Evaluate a plain arithmetic expression without executing code.

    Only int/float literals, parentheses, unary ``+``/``-`` and the binary
    operators ``+ - * / // %`` are accepted.

    Raises:
        SyntaxError: if ``expression`` is not valid expression syntax.
        ValueError: if it contains anything else (names, calls, attribute
            access, strings, ``**``, ...).
    """
    def visit(node):
        if isinstance(node, ast.Expression):
            return visit(node.body)
        if isinstance(node, ast.Constant):
            value = node.value
            # bool is a subclass of int; reject it along with str/complex/etc.
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                raise ValueError(f"unsupported literal in {expression!r}")
            return value
        if isinstance(node, ast.BinOp) and type(node.op) in _BINARY_OPERATORS:
            return _BINARY_OPERATORS[type(node.op)](visit(node.left), visit(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPERATORS:
            return _UNARY_OPERATORS[type(node.op)](visit(node.operand))
        raise ValueError(f"unsupported expression: {expression!r}")

    return visit(ast.parse(expression, mode="eval"))


def parse_input(text):
    """Parse a comma-separated portfolio description.

    Each comma-separated entry is split on whitespace and interpreted by its
    number of fields:

    * 4 fields: ``country_code stock_code quantity target_weight`` (a stock)
    * 2 fields: ``cash_amount target_weight`` (KRW cash)
    * 1 field:  ``cash_amount`` (KRW cash with target weight 0)

    Quantity, amount and weight fields may be arithmetic expressions.
    Quantities and cash amounts are floored to integers. Stock weights are
    rescaled so that together they sum to ``1 - cash target weight``.

    SECURITY: the text comes straight from a chat message, so expressions
    are evaluated by a restricted arithmetic evaluator, never by ``eval``.

    Returns:
        ``(stock_inputs, krw_cash)`` where ``stock_inputs`` is a list of
        ``(country_code, stock_code, quantity, normalized_weight)`` tuples
        and ``krw_cash`` is ``{'amount': int, 'target_weight': number}``.

    Raises:
        ValueError: if the text does not match the expected format, an
            expression cannot be evaluated, or the stock weights sum to 0.
    """
    logger.info(f"์ž…๋ ฅ ํŒŒ์‹ฑ ์ค‘: {text}")
    try:
        stock_inputs = []
        total_target_weight = 0
        krw_cash = None
        for line in text.strip().split(','):
            parts = line.split()
            if len(parts) == 4:
                country_code, stock_code, quantity_expr, target_weight_expr = parts
                quantity = math.floor(_evaluate_arithmetic(quantity_expr))
                target_weight = _evaluate_arithmetic(target_weight_expr)
                stock_inputs.append((country_code, stock_code, quantity, target_weight))
                total_target_weight += target_weight
            elif len(parts) == 2:
                cash_amount_expr, target_weight_expr = parts
                krw_cash = {
                    'amount': math.floor(_evaluate_arithmetic(cash_amount_expr)),
                    'target_weight': _evaluate_arithmetic(target_weight_expr),
                }
            elif len(parts) == 1:
                krw_cash = {
                    'amount': math.floor(_evaluate_arithmetic(parts[0])),
                    'target_weight': 0,
                }
            else:
                raise ValueError("์ž˜๋ชป๋œ ์ž…๋ ฅ ํ˜•์‹์ž…๋‹ˆ๋‹ค.")
        if krw_cash is None:
            krw_cash = {'amount': 0, 'target_weight': 0}
        # Normalizing divides by the weight total; reject a zero total here
        # instead of letting a ZeroDivisionError escape.
        if stock_inputs and total_target_weight == 0:
            raise ValueError("stock target weights sum to zero")
        cash_ratio = krw_cash['target_weight']
        # Scale stock weights so that stocks plus cash add up to 1.
        stock_inputs = [
            (country_code, stock_code, quantity,
             (1 - cash_ratio) * weight / total_target_weight)
            for country_code, stock_code, quantity, weight in stock_inputs
        ]
        logger.info(f"์ฃผ์‹ ์ž…๋ ฅ: {stock_inputs}, ์›ํ™” ํ˜„๊ธˆ: {krw_cash}")
        return stock_inputs, krw_cash
    except (ValueError, SyntaxError, TypeError, ArithmeticError, RecursionError) as e:
        logger.error("์ž…๋ ฅ ํŒŒ์‹ฑ ์ค‘ ์˜ค๋ฅ˜ ๋ฐœ์ƒ", exc_info=e)
        raise ValueError("์ž…๋ ฅ ํ˜•์‹์ด ์ž˜๋ชป๋˜์—ˆ์Šต๋‹ˆ๋‹ค. ์˜ˆ์‹œ) krw 458730 530 8, krw 368590 79 2, 518192") from e
# The other functions below are unchanged.
# Exchange-rate fetch function
def get_exchange_rate(country_code):
    """Return the exchange rate used to convert ``country_code`` prices.

    Rates are cached in the module-level ``exchange_rates`` dict under the
    upper-cased code. ``KRW`` always yields 1 and is not cached. Other codes
    are looked up as ``FRX.KRW<CODE>`` on the Dunamu quotation API and the
    first entry's ``basePrice`` is returned.

    Raises:
        CurrencyConversionError: if the request fails or times out, or the
            response does not have the expected shape.
    """
    code = country_code.upper()
    if code in exchange_rates:
        return exchange_rates[code]
    logger.info(f"ํ™˜์œจ ๊ฐ€์ ธ์˜ค๋Š” ์ค‘ - {country_code}")
    if code == 'KRW':
        return 1
    url = f"https://quotation-api-cdn.dunamu.com/v1/forex/recent?codes=FRX.KRW{code}"
    try:
        # Without a timeout a stalled connection would block the handler forever.
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        exchange_rate = response.json()[0]['basePrice']
    except (requests.RequestException, IndexError, KeyError, TypeError, ValueError) as e:
        # KeyError/TypeError/ValueError cover a malformed or non-JSON payload;
        # letting KeyError escape would be reported upstream as a stock-code error.
        logger.error(f"ํ™˜์œจ ๊ฐ€์ ธ์˜ค๊ธฐ ์˜ค๋ฅ˜ - {country_code}", exc_info=e)
        raise CurrencyConversionError("ํ™˜์œจ์„ ๊ฐ€์ ธ์˜ค๋Š” ์ค‘์— ๋ฌธ์ œ๊ฐ€ ๋ฐœ์ƒํ–ˆ์Šต๋‹ˆ๋‹ค. ์˜ฌ๋ฐ”๋ฅธ ๊ตญ๊ฐ€ ์ฝ”๋“œ๋ฅผ ์ž…๋ ฅํ•˜์„ธ์š”.") from e
    exchange_rates[code] = exchange_rate
    logger.info(f"ํ™˜์œจ - {country_code}: {exchange_rate}")
    return exchange_rate
# Fetch a stock price with the exchange rate applied
def get_exchange_reflected_stock_price(stock_code, country_code):
    """Return the current price of ``stock_code`` times its exchange rate.

    The product of ``get_current_stock_price(stock_code)`` and
    ``get_exchange_rate(country_code)`` is floored to an integer.

    Raises:
        StockCodeNotFoundError, CurrencyConversionError: logged here and
            re-raised unchanged.
    """
    logger.info(f"ํ™˜์œจ ๋ฐ˜์˜ ์ฃผ๊ฐ€ ๊ฐ€์ ธ์˜ค๋Š” ์ค‘ - {stock_code} in {country_code}")
    try:
        converted_price = math.floor(
            get_current_stock_price(stock_code) * get_exchange_rate(country_code)
        )
    except (StockCodeNotFoundError, CurrencyConversionError) as err:
        logger.error(f"๋ฐ˜์˜๋œ ์ฃผ๊ฐ€ ๊ฐ€์ ธ์˜ค๊ธฐ ์˜ค๋ฅ˜ - {stock_code} in {country_code}", exc_info=err)
        raise err
    logger.info(f"๋ฐ˜์˜๋œ ์ฃผ๊ฐ€ - {stock_code}: {converted_price}")
    return converted_price
# Portfolio construction function
def build_portfolio(stock_inputs, krw_cash):
    """Build the priced portfolio and target-weight tables.

    Args:
        stock_inputs: iterable of
            ``(country_code, stock_code, quantity, target_weight)`` tuples.
        krw_cash: cash dict, or ``None`` to use zero cash with zero weight.

    Returns:
        ``(portfolio, target_weights, krw_cash)``. ``portfolio`` maps each
        stock code to ``{'quantity', 'price', 'country_code'}`` and
        ``target_weights`` maps each stock code to its weight. A stock code
        that appears twice keeps only its last entry.
    """
    logger.info(f"ํฌํŠธํด๋ฆฌ์˜ค ๊ตฌ์„ฑ ์ค‘ - ์ฃผ์‹ ์ž…๋ ฅ: {stock_inputs}, ์›ํ™” ํ˜„๊ธˆ: {krw_cash}")
    portfolio = {}
    target_weights = {}
    for country_code, stock_code, quantity, target_weight in stock_inputs:
        portfolio[stock_code] = {
            'quantity': quantity,
            'price': get_exchange_reflected_stock_price(stock_code, country_code),
            'country_code': country_code,
        }
        target_weights[stock_code] = target_weight
    if krw_cash is None:
        krw_cash = {'amount': 0, 'target_weight': 0}
    logger.info(f"ํฌํŠธํด๋ฆฌ์˜ค ๊ตฌ์„ฑ ์™„๋ฃŒ: {portfolio}, ๋ชฉํ‘œ ๋น„์ค‘: {target_weights}, ์›ํ™” ํ˜„๊ธˆ: {krw_cash}")
    return portfolio, target_weights, krw_cash
# Message handling function
def process_message(update, context):
    """Handle a text message: parse it, price the portfolio, reply with the report.

    The exchange-rate cache is cleared first so every message uses fresh
    rates. Earlier report messages for the chat are deleted, a temporary
    loading message is shown, and the parsed portfolio is stored in
    ``context.user_data[chat_id]`` for the refresh button. Input, currency
    and stock-code errors are reported back to the user; the loading
    message is always removed.
    """
    global exchange_rates
    exchange_rates = {}
    # effective_message also covers edited messages, where update.message is None.
    incoming = update.effective_message
    text = incoming.text
    chat_id = incoming.chat_id
    logger.info(f"๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ ์ค‘ - chat_id {chat_id}: {text}")
    if chat_id not in context.user_data:
        context.user_data[chat_id] = {'portfolio': {}, 'target_weights': {}, 'krw_cash': None}
    delete_previous_messages(context, chat_id)
    loading_message = context.bot.send_message(chat_id=chat_id, text="๋ถˆ๋Ÿฌ์˜ค๋Š” ์ค‘...")
    context.bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING)
    try:
        stock_inputs, krw_cash = parse_input(text)
        portfolio, target_weights, krw_cash = build_portfolio(stock_inputs, krw_cash)
        context.user_data[chat_id] = {'portfolio': portfolio, 'target_weights': target_weights, 'krw_cash': krw_cash}
        result_message = get_portfolio_rebalancing_info(portfolio, target_weights, krw_cash)
        keyboard = [[InlineKeyboardButton("์ƒˆ๋กœ๊ณ ์นจ", callback_data='portfolio_info')]]
        reply_markup = InlineKeyboardMarkup(keyboard)
        message = incoming.reply_text(result_message, reply_markup=reply_markup, disable_web_page_preview=True, parse_mode='Markdown')
        context.user_data[chat_id]['message_ids'] = [message.message_id]
        logger.info(f"๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ ์™„๋ฃŒ - chat_id {chat_id}")
    except (ValueError, CurrencyConversionError, StockCodeNotFoundError) as e:
        # StockCodeNotFoundError was previously unhandled here, so an unknown
        # stock code produced no reply at all.
        logger.error("%s in process_message", type(e).__name__, exc_info=e)
        incoming.reply_text(str(e))
    except KeyError as e:
        logger.error("KeyError in process_message", exc_info=e)
        incoming.reply_text("ํ•ด๋‹น ์ข…๋ชฉ์˜ ์ฃผ๊ฐ€ ๋ฐ์ดํ„ฐ๋ฅผ ์ฐพ์„ ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค. ์˜ฌ๋ฐ”๋ฅธ ์ข…๋ชฉ ์ฝ”๋“œ๋ฅผ ์ž…๋ ฅํ•˜์„ธ์š”.")
    finally:
        context.bot.delete_message(chat_id=chat_id, message_id=loading_message.message_id)
# Previous-message deletion function
def delete_previous_messages(context, chat_id):
    """Delete the messages recorded under ``message_ids`` for ``chat_id``.

    Deletion is best-effort: a failure for one message is logged and the
    remaining ids are still attempted.
    """
    logger.info(f"๊ธฐ์กด ๋ฉ”์‹œ์ง€ ์‚ญ์ œ ์ค‘ - chat_id {chat_id}")
    chat_state = context.user_data.get(chat_id, {})
    for message_id in chat_state.get('message_ids', []):
        try:
            context.bot.delete_message(chat_id=chat_id, message_id=message_id)
        except Exception as err:
            logger.error(f"๋ฉ”์‹œ์ง€ ์‚ญ์ œ ์ค‘ ์˜ค๋ฅ˜ ๋ฐœ์ƒ - id {message_id}", exc_info=err)
# Button click handling function
def button(update, context):
    """Handle inline-button presses; ``portfolio_info`` refreshes the report.

    The callback query is answered first so the client stops showing its
    progress indicator. For ``portfolio_info`` the exchange-rate cache is
    cleared, every stored position is re-priced and a fresh report is sent.
    The temporary loading message is removed on every exit path.
    """
    global exchange_rates
    query = update.callback_query
    try:
        query.answer()
    except telegram.error.TelegramError as e:
        # Answering can fail (e.g. for a stale query); that must not block the refresh.
        logger.warning("Could not answer callback query", exc_info=e)
    chat_id = query.message.chat_id
    logger.info(f"๋ฒ„ํŠผ ์ฝœ๋ฐฑ ๋ฐ์ดํ„ฐ: {query.data}")
    if chat_id not in context.user_data:
        context.user_data[chat_id] = {'portfolio': {}, 'target_weights': {}, 'krw_cash': None}
    delete_previous_messages(context, chat_id)
    loading_message = context.bot.send_message(chat_id=chat_id, text="๋ถˆ๋Ÿฌ์˜ค๋Š” ์ค‘...")
    try:
        if query.data == 'portfolio_info':
            exchange_rates = {}
            chat_state = context.user_data[chat_id]
            portfolio = chat_state['portfolio']
            krw_cash = chat_state['krw_cash']
            if krw_cash is None:
                # Only the placeholder state exists, so no input has been
                # processed for this chat and there is nothing to report.
                logger.warning("No stored portfolio for chat_id %s; nothing to refresh", chat_id)
                return
            context.bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING)
            for stock_code, data in portfolio.items():
                try:
                    reflected_price = get_exchange_reflected_stock_price(stock_code, data['country_code'])
                except (StockCodeNotFoundError, CurrencyConversionError) as e:
                    logger.error(f"๊ฐ€๊ฒฉ ์—…๋ฐ์ดํŠธ ์ค‘ ์˜ค๋ฅ˜ - {stock_code}", exc_info=e)
                    context.bot.send_message(chat_id=chat_id, text=str(e))
                    return
                data['price'] = reflected_price
                logger.info(f"๊ฐ€๊ฒฉ ์—…๋ฐ์ดํŠธ - {stock_code}: {reflected_price}")
            result_message = get_portfolio_rebalancing_info(portfolio, chat_state['target_weights'], krw_cash)
            keyboard = [[InlineKeyboardButton("์ƒˆ๋กœ๊ณ ์นจ", callback_data='portfolio_info')]]
            reply_markup = InlineKeyboardMarkup(keyboard)
            message = context.bot.send_message(chat_id=chat_id, text=result_message, reply_markup=reply_markup, disable_web_page_preview=True, parse_mode='Markdown')
            chat_state['message_ids'] = [message.message_id]
    finally:
        # Runs on the early returns above too, so the loading message never lingers.
        context.bot.delete_message(chat_id=chat_id, message_id=loading_message.message_id)
# Current stock price fetch function
def get_current_stock_price(stock_code):
    """Return the last ``Close`` value FinanceDataReader reports for ``stock_code``.

    Raises:
        StockCodeNotFoundError: if the reader raises ``ValueError`` or the
            returned frame has no rows.
    """
    logger.info(f"ํ˜„์žฌ ์ฃผ๊ฐ€ ๊ฐ€์ ธ์˜ค๋Š” ์ค‘ - {stock_code}")
    try:
        df = fdr.DataReader(stock_code)
        # .iloc[-1] raises IndexError on an empty frame.
        current_price = df['Close'].iloc[-1]
    except (ValueError, IndexError) as e:
        logger.error(f"์ฃผ๊ฐ€ ๊ฐ€์ ธ์˜ค๊ธฐ ์˜ค๋ฅ˜ - {stock_code}", exc_info=e)
        raise StockCodeNotFoundError("ํ•ด๋‹น ์ข…๋ชฉ ์ฝ”๋“œ๋ฅผ ์ฐพ์„ ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค. ์˜ฌ๋ฐ”๋ฅธ ์ข…๋ชฉ ์ฝ”๋“œ๋ฅผ ์ž…๋ ฅํ•˜์„ธ์š”.") from e
    logger.info(f"ํ˜„์žฌ ์ฃผ๊ฐ€ - {stock_code}: {current_price}")
    return current_price
# Portfolio rebalancing info function
def get_portfolio_rebalancing_info(portfolio, target_weights, krw_cash):
    """Build the Markdown rebalancing report.

    For each position the report shows price, current value with its share
    of the total, the target weight and the whole-share trade that moves the
    position toward that target. Buys are rounded down and sells are rounded
    up. A cash section (when ``krw_cash`` is given) and the total portfolio
    value follow.

    Args:
        portfolio: maps stock code to ``{'quantity', 'price', 'country_code'}``.
        target_weights: maps stock code to its target weight as a fraction;
            a missing code counts as 0.
        krw_cash: ``{'amount', 'target_weight'}``, or ``None`` for no cash.

    Returns:
        The report text wrapped in leading/trailing newlines.
    """
    logger.info("ํฌํŠธํด๋ฆฌ์˜ค ๋ฆฌ๋ฐธ๋Ÿฐ์‹ฑ ์ •๋ณด ๊ณ„์‚ฐ ์ค‘")
    cash_amount = krw_cash['amount'] if krw_cash else 0
    total_value = sum(stock['price'] * stock['quantity'] for stock in portfolio.values()) + cash_amount

    def percent_of_total(value):
        # An empty or worthless portfolio must not raise ZeroDivisionError.
        return (value / total_value) * 100 if total_value else 0.0

    sections = []
    total_new_stock_value = 0
    for stock_code, stock_data in portfolio.items():
        price = stock_data['price']
        current_value = price * stock_data['quantity']
        weight = target_weights.get(stock_code, 0)
        difference = total_value * weight - current_value
        if price == 0:
            # No meaningful share count exists at a zero price.
            trade_quantity = 0
        elif difference > 0:
            trade_quantity = math.floor(difference / price)
        else:
            trade_quantity = -math.ceil(-difference / price)
        total_new_stock_value += current_value + trade_quantity * price
        country_emoji = COUNTRY_EMOJI_MAP.get(stock_data['country_code'].lower(), '')
        sections.append(
            f"`{country_emoji} {stock_code.upper()} @{format_amount(price)}\n KRW {format_amount(current_value)} [{percent_of_total(current_value):.1f}%]\n [{weight * 100:.0f}%] {format_quantity(trade_quantity)}`\n\n"
        )
    if krw_cash:
        # Cash left once all stock trades are executed, relative to cash held now.
        krw_difference = (total_value - total_new_stock_value) - cash_amount
        krw_target_weight = krw_cash['target_weight'] * 100
        krw_emoji = COUNTRY_EMOJI_MAP.get('krw', '')
        sections.append(
            f"`{krw_emoji} ํ˜„๊ธˆ\n KRW {format_amount(cash_amount)} [{percent_of_total(cash_amount):.1f}%]\n [{krw_target_weight:.0f}%] {format_quantity(krw_difference)}`\n\n"
        )
    sections.append(f"`๐Ÿ˜ ์ „์ฒด ํ‰๊ฐ€๊ธˆ์•ก\n KRW {format_amount(total_value)}`")
    result_message = "".join(sections)
    logger.info("ํฌํŠธํด๋ฆฌ์˜ค ๋ฆฌ๋ฐธ๋Ÿฐ์‹ฑ ์ •๋ณด ๊ณ„์‚ฐ ์™„๋ฃŒ")
    return f"\n{result_message}\n\n"
# Quantity formatting function
def format_quantity(quantity):
    """Format a trade quantity with an explicit sign and thousands separators."""
    sign, magnitude = ("-", -quantity) if quantity < 0 else ("+", quantity)
    return f"{sign}{magnitude:,}"
# Amount formatting function
def format_amount(amount):
    """Format an amount with thousands separators; negatives go in parentheses."""
    return f"({-amount:,})" if amount < 0 else f"{amount:,}"
# Mapping of the other required country codes to emoji
# Lower-case currency code -> flag emoji shown in the report.
# The flags are written as escaped regional-indicator pairs so that the
# literals survive a file-encoding mix-up (the previous values were
# mis-decoded UTF-8 and rendered as garbage).
COUNTRY_EMOJI_MAP = {
    'usd': '\U0001F1FA\U0001F1F8',  # US
    'krw': '\U0001F1F0\U0001F1F7',  # KR
    'eur': '\U0001F1EA\U0001F1FA',  # EU
    'jpy': '\U0001F1EF\U0001F1F5',  # JP
    'gbp': '\U0001F1EC\U0001F1E7',  # GB
    'cny': '\U0001F1E8\U0001F1F3',  # CN
    'aud': '\U0001F1E6\U0001F1FA',  # AU
    'cad': '\U0001F1E8\U0001F1E6',  # CA
    'chf': '\U0001F1E8\U0001F1ED',  # CH
    'hkd': '\U0001F1ED\U0001F1F0',  # HK
}
# Start command function
def start(update, context):
    """Handle ``/start``: reply with the bot's first name, then run the message flow.

    The incoming message's text is overwritten with the bot name before it
    is handed to ``process_message``.
    NOTE(review): presumably this is meant to make ``process_message`` reply
    with its usage/format hint -- confirm the intent.
    """
    logger.info("๋ฐ›์€ /start ๋ช…๋ น์–ด")
    bot_name = f"{context.bot.get_me().first_name}"
    update.message.reply_text(f"`{bot_name}`", parse_mode='Markdown')
    update.message.text = f"{bot_name}"
    process_message(update, context)
# ๋ฉ”์ธ ํ•จ์ˆ˜
def main():
    """Register the handlers, notify the owner chat and poll until stopped.

    The bot token is read from the ``TELEGRAM_BOT_TOKEN`` environment
    variable, falling back to the value previously embedded here.
    """
    # SECURITY: the fallback token is committed in source and must be treated
    # as leaked. Revoke it, set TELEGRAM_BOT_TOKEN instead, then drop the fallback.
    token = os.environ.get(
        "TELEGRAM_BOT_TOKEN",
        "6445814849:AAE7rMShIpKZ5tH-JXfaB16XeRpyRLKA5Mg",
    )
    updater = Updater(token, use_context=True)
    dp = updater.dispatcher
    dp.add_handler(CommandHandler("start", start))
    dp.add_handler(MessageHandler(Filters.text & ~Filters.command, process_message))
    dp.add_handler(CallbackQueryHandler(button))
    try:
        updater.bot.send_message(chat_id='6287128460', text="/start")
    except telegram.error.TelegramError as e:
        # The startup notice is a courtesy; failing to send it must not stop the bot.
        logger.warning("Could not send startup notification", exc_info=e)
    updater.start_polling()
    # Log before idle(): idle() blocks until shutdown, so logging after it
    # reported "started" only when the bot was stopping.
    logger.info("๋ด‡ ์‹œ์ž‘")
    updater.idle()
# Start the bot only when this file is run as a script, not when it is imported.
if __name__ == "__main__":
    main()