import telegram from telegram import InlineKeyboardButton, InlineKeyboardMarkup, ChatAction from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, CallbackQueryHandler import FinanceDataReader as fdr import requests import logging import math from transformers import pipeline class StockCodeNotFoundError(Exception): pass class CurrencyConversionError(Exception): pass # Hugging Face 모델 로드 (명시적 지정) sentiment_analysis = pipeline('sentiment-analysis', model="distilbert-base-uncased-finetuned-sst-2-english") # 로깅 설정 logging.basicConfig( format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO, handlers=[logging.StreamHandler()] ) logger = logging.getLogger(__name__) # 전역 변수 exchange_rates = {} # 입력 파싱 함수 def parse_input(text): logger.info(f"입력 파싱 중: {text}") try: lines = text.strip().split(',') stock_inputs = [] total_target_weight = 0 krw_cash = None for line in lines: parts = line.split() if len(parts) == 4: country_code, stock_code, quantity_expr, target_weight_expr = parts quantity = math.floor(eval(quantity_expr.replace(' ', ''))) target_weight = eval(target_weight_expr.replace(' ', '')) stock_inputs.append((country_code, stock_code, quantity, target_weight)) total_target_weight += target_weight elif len(parts) == 2: cash_amount_expr, target_weight_expr = parts cash_amount = math.floor(eval(cash_amount_expr.replace(' ', ''))) krw_cash = {'amount': cash_amount, 'target_weight': eval(target_weight_expr.replace(' ', ''))} elif len(parts) == 1: cash_amount_expr = parts[0] cash_amount = math.floor(eval(cash_amount_expr.replace(' ', ''))) krw_cash = {'amount': cash_amount, 'target_weight': 0} else: raise ValueError("잘못된 입력 형식입니다.") if krw_cash is None: krw_cash = {'amount': 0, 'target_weight': 0} # 현금 비율 cash_ratio = krw_cash['target_weight'] # 총 목표 비중 합계 stock_total_weight = total_target_weight # 목표 비중을 (1 - 현금 비율)로 조정 for i in range(len(stock_inputs)): stock_inputs[i] = (stock_inputs[i][0], stock_inputs[i][1], stock_inputs[i][2], (1 - cash_ratio) * stock_inputs[i][3] / stock_total_weight) logger.info(f"주식 입력: {stock_inputs}, 원화 현금: {krw_cash}") return stock_inputs, krw_cash except (ValueError, SyntaxError) as e: logger.error("입력 파싱 중 오류 발생", exc_info=e) raise ValueError("입력 형식이 잘못되었습니다. 예시) krw 458730 530 8, krw 368590 79 2, 518192") # 이후 다른 함수들은 동일합니다. # 환율 가져오기 함수 def get_exchange_rate(country_code): if country_code.upper() in exchange_rates: return exchange_rates[country_code.upper()] logger.info(f"환율 가져오는 중 - {country_code}") if country_code.upper() == 'KRW': return 1 url = f"https://quotation-api-cdn.dunamu.com/v1/forex/recent?codes=FRX.KRW{country_code.upper()}" try: response = requests.get(url) response.raise_for_status() data = response.json() exchange_rate = data[0]['basePrice'] exchange_rates[country_code.upper()] = exchange_rate logger.info(f"환율 - {country_code}: {exchange_rate}") return exchange_rate except (requests.RequestException, IndexError) as e: logger.error(f"환율 가져오기 오류 - {country_code}", exc_info=e) raise CurrencyConversionError("환율을 가져오는 중에 문제가 발생했습니다. 올바른 국가 코드를 입력하세요.") # 환율 반영 주가 가져오기 함수 def get_exchange_reflected_stock_price(stock_code, country_code): logger.info(f"환율 반영 주가 가져오는 중 - {stock_code} in {country_code}") try: current_price = get_current_stock_price(stock_code) exchange_rate = get_exchange_rate(country_code) reflected_price = math.floor(current_price * exchange_rate) logger.info(f"반영된 주가 - {stock_code}: {reflected_price}") return reflected_price except (StockCodeNotFoundError, CurrencyConversionError) as e: logger.error(f"반영된 주가 가져오기 오류 - {stock_code} in {country_code}", exc_info=e) raise e # 포트폴리오 구성 함수 def build_portfolio(stock_inputs, krw_cash): portfolio = {} target_weights = {} logger.info(f"포트폴리오 구성 중 - 주식 입력: {stock_inputs}, 원화 현금: {krw_cash}") for stock_input in stock_inputs: country_code, stock_code, quantity, target_weight = stock_input current_price = get_exchange_reflected_stock_price(stock_code, country_code) portfolio[stock_code] = {'quantity': quantity, 'price': current_price, 'country_code': country_code} target_weights[stock_code] = target_weight if krw_cash is None: krw_cash = {'amount': 0, 'target_weight': 0} logger.info(f"포트폴리오 구성 완료: {portfolio}, 목표 비중: {target_weights}, 원화 현금: {krw_cash}") return portfolio, target_weights, krw_cash # 메시지 처리 함수 def process_message(update, context): global exchange_rates exchange_rates = {} text = update.message.text chat_id = update.message.chat_id logger.info(f"메시지 처리 중 - chat_id {chat_id}: {text}") if chat_id not in context.user_data: context.user_data[chat_id] = {'portfolio': {}, 'target_weights': {}, 'krw_cash': None} delete_previous_messages(context, chat_id) loading_message = context.bot.send_message(chat_id=chat_id, text="불러오는 중...") context.bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING) try: stock_inputs, krw_cash = parse_input(text) portfolio, target_weights, krw_cash = build_portfolio(stock_inputs, krw_cash) context.user_data[chat_id] = {'portfolio': portfolio, 'target_weights': target_weights, 'krw_cash': krw_cash} result_message = get_portfolio_rebalancing_info(portfolio, target_weights, krw_cash) keyboard = [[InlineKeyboardButton("새로고침", callback_data='portfolio_info')]] reply_markup = InlineKeyboardMarkup(keyboard) message = update.message.reply_text(result_message, reply_markup=reply_markup, disable_web_page_preview=True, parse_mode='Markdown') message_ids = [message.message_id] context.user_data[chat_id]['message_ids'] = message_ids logger.info(f"메시지 처리 완료 - chat_id {chat_id}") except ValueError as e: logger.error("ValueError in process_message", exc_info=e) update.message.reply_text(str(e)) except CurrencyConversionError as e: logger.error("CurrencyConversionError in process_message", exc_info=e) update.message.reply_text(str(e)) except KeyError as e: logger.error("KeyError in process_message", exc_info=e) update.message.reply_text("해당 종목의 주가 데이터를 찾을 수 없습니다. 올바른 종목 코드를 입력하세요.") finally: context.bot.delete_message(chat_id=chat_id, message_id=loading_message.message_id) # 이전 메시지 삭제 함수 def delete_previous_messages(context, chat_id): logger.info(f"기존 메시지 삭제 중 - chat_id {chat_id}") for message_id in context.user_data.get(chat_id, {}).get('message_ids', []): try: context.bot.delete_message(chat_id=chat_id, message_id=message_id) except Exception as e: logger.error(f"메시지 삭제 중 오류 발생 - id {message_id}", exc_info=e) # 버튼 클릭 처리 함수 def button(update, context): query = update.callback_query chat_id = query.message.chat_id logger.info(f"버튼 콜백 데이터: {query.data}") if chat_id not in context.user_data: context.user_data[chat_id] = {'portfolio': {}, 'target_weights': {}, 'krw_cash': None} delete_previous_messages(context, chat_id) loading_message = context.bot.send_message(chat_id=chat_id, text="불러오는 중...") if query.data == 'portfolio_info': global exchange_rates exchange_rates = {} portfolio = context.user_data[chat_id]['portfolio'] krw_cash = context.user_data[chat_id]['krw_cash'] context.bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING) for stock_code, data in portfolio.items(): country_code = data['country_code'] try: current_price = get_current_stock_price(stock_code) exchange_rate = get_exchange_rate(country_code) reflected_price = math.floor(current_price * exchange_rate) data['price'] = reflected_price logger.info(f"가격 업데이트 - {stock_code}: {reflected_price}") except (StockCodeNotFoundError, CurrencyConversionError) as e: logger.error(f"가격 업데이트 중 오류 - {stock_code}", exc_info=e) context.bot.send_message(chat_id=chat_id, text=str(e)) return result_message = get_portfolio_rebalancing_info(portfolio, context.user_data[chat_id]['target_weights'], krw_cash) keyboard = [[InlineKeyboardButton("새로고침", callback_data='portfolio_info')]] reply_markup = InlineKeyboardMarkup(keyboard) message = context.bot.send_message(chat_id=chat_id, text=result_message, reply_markup=reply_markup, disable_web_page_preview=True, parse_mode='Markdown') message_ids = [message.message_id] context.user_data[chat_id]['message_ids'] = message_ids context.bot.delete_message(chat_id=chat_id, message_id=loading_message.message_id) # 현재 주가 가져오기 함수 def get_current_stock_price(stock_code): logger.info(f"현재 주가 가져오는 중 - {stock_code}") try: df = fdr.DataReader(stock_code) current_price = df['Close'].iloc[-1] logger.info(f"현재 주가 - {stock_code}: {current_price}") return current_price except ValueError as e: logger.error(f"주가 가져오기 오류 - {stock_code}", exc_info=e) raise StockCodeNotFoundError(f"해당 종목 코드를 찾을 수 없습니다. 올바른 종목 코드를 입력하세요.") # 포트폴리오 리밸런싱 정보 가져오기 함수 def get_portfolio_rebalancing_info(portfolio, target_weights, krw_cash): logger.info("포트폴리오 리밸런싱 정보 계산 중") total_value = sum(stock['price'] * stock['quantity'] for stock in portfolio.values()) + krw_cash['amount'] total_new_stock_value = 0 total_trade_value = 0 adjustments = [] for stock_code, stock_data in portfolio.items(): current_value = stock_data['price'] * stock_data['quantity'] target_weight = target_weights.get(stock_code, 0)*100 target_value = total_value * target_weights.get(stock_code, 0) difference = target_value - current_value # trade_quantity = difference / stock_data['price'] if difference > 0: trade_quantity = math.floor(difference / stock_data['price']) else: trade_quantity = -math.ceil(-difference / stock_data['price']) new_quantity = trade_quantity + stock_data['quantity'] trade_value = trade_quantity * stock_data['price'] total_trade_value += abs(trade_value) new_value = trade_value + current_value total_new_stock_value += new_value current_value_pct = (current_value / total_value) * 100 country_code = stock_data['country_code'].lower() country_emoji = COUNTRY_EMOJI_MAP.get(country_code, '') adjustments.append((difference, current_value,target_weight, current_value_pct, trade_quantity, country_emoji, stock_code, stock_data['price'], new_value, trade_value, stock_data['quantity'], new_quantity)) result_message = "" for adjustment in adjustments: difference, current_value,target_weight, current_value_pct, trade_quantity, country_emoji, stock_code, price, new_value, trade_value, old_quantity, new_quantity = adjustment new_value_pct = (new_value / total_value) * 100 result_message += f"`{country_emoji} {stock_code.upper()} @{format_amount(price)}\n KRW {format_amount(current_value)} [{current_value_pct:.1f}%]\n [{target_weight:.0f}%] {format_quantity(trade_quantity)}`\n\n" if krw_cash: krw_new_amount = total_value - total_new_stock_value krw_target_weight = krw_cash['target_weight']*100 krw_difference = krw_new_amount - krw_cash['amount'] krw_new_pct = (krw_new_amount / total_value) * 100 krw_emoji = COUNTRY_EMOJI_MAP.get('krw', '') result_message += f"`{krw_emoji} 현금\n KRW {format_amount(krw_cash['amount'])} [{(krw_cash['amount'] / total_value) * 100:.1f}%]\n [{krw_target_weight:.0f}%] {format_quantity(krw_difference)}`\n\n" result_message += f"`😁 전체 평가금액\n KRW {format_amount(total_value)}`" logger.info("포트폴리오 리밸런싱 정보 계산 완료") return f"\n{result_message}\n\n" # 수량 포맷 함수 def format_quantity(quantity): if quantity < 0: return f"-{-quantity:,}" else: return f"+{quantity:,}" # 금액 포맷 함수 def format_amount(amount): if amount < 0: return f"({-amount:,})" else: return f"{amount:,}" # 필요한 다른 국가 코드 및 이모지 매핑 COUNTRY_EMOJI_MAP = { 'usd': '🇺🇸', 'krw': '🇰🇷', 'eur': '🇪🇺', 'jpy': '🇯🇵', 'gbp': '🇬🇧', 'cny': '🇨🇳', 'aud': '🇦🇺', 'cad': '🇨🇦', 'chf': '🇨🇭', 'hkd': '🇭🇰', } # 시작 명령어 함수 def start(update, context): logger.info("받은 /start 명령어") bot = context.bot bot_info = bot.get_me() bot_description = f"{bot_info.first_name}" message = f"`{bot_description}`" update.message.reply_text(message, parse_mode='Markdown') update.message.text = f"{bot_description}" process_message(update, context) # 메인 함수 def main(): updater = Updater("6445814849:AAE7rMShIpKZ5tH-JXfaB16XeRpyRLKA5Mg", use_context=True) dp = updater.dispatcher dp.add_handler(CommandHandler("start", start)) dp.add_handler(MessageHandler(Filters.text & ~Filters.command, process_message)) dp.add_handler(CallbackQueryHandler(button)) updater.bot.send_message(chat_id='6287128460', text="/start") updater.start_polling() updater.idle() logger.info("봇 시작") if __name__ == "__main__": main()