"""Gradio app that computes whole-share rebalancing trades for a portfolio.

Every holding is valued in KRW (foreign prices are converted with a fetched
exchange rate), target weights are normalized against the requested cash
ratio, and a plain-text report lists the signed number of shares to trade
for each holding.
"""

import ast
import logging
import math
import operator
import ssl
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple, Union

import FinanceDataReader as fdr
import gradio as gr
import requests

# Current date formatted like "Jun-20-2024". Evaluated once at import time and
# embedded in the interface title below.
current_date = datetime.now().strftime("%b-%d-%Y")

# Disable SSL certificate verification for the process-wide default HTTPS
# context.
# NOTE(review): this removes protection against man-in-the-middle attacks. It
# is preserved because the original code set it deliberately -- confirm it is
# still required in the deployment environment.
ssl._create_default_https_context = ssl._create_unverified_context

# Logging configuration.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Exchange rates keyed by upper-case currency code. `rebalance_portfolio`
# clears this at the start of every request, so it only de-duplicates lookups
# within one request instead of serving stale rates forever.
exchange_rates = {}

# Upper bound (seconds) for the exchange-rate HTTP request so that a stalled
# connection cannot block a request indefinitely.
REQUEST_TIMEOUT_SECONDS = 10

INVALID_INPUT_MESSAGE = (
    "Invalid input format. Example: usd schd 21 8, krw 368590 530 2"
)

Number = Union[int, float]
# (currency code, stock code, quantity held, target weight)
StockInput = Tuple[str, str, int, float]

# Operators accepted in quantity / weight expressions.
_BINARY_OPERATORS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.FloorDiv: operator.floordiv,
    ast.Mod: operator.mod,
}
_UNARY_OPERATORS = {
    ast.UAdd: operator.pos,
    ast.USub: operator.neg,
}


class StockCodeNotFoundError(Exception):
    """Raised when no price can be obtained for a stock code."""


class CurrencyConversionError(Exception):
    """Raised when no exchange rate can be obtained for a currency code."""


def _eval_node(node: ast.AST, expression: str) -> Number:
    """Recursively evaluate one node of a parsed arithmetic expression.

    Only numeric literals and the operators listed in ``_BINARY_OPERATORS``
    and ``_UNARY_OPERATORS`` are accepted; any other node raises ``ValueError``.
    """
    if isinstance(node, ast.Expression):
        return _eval_node(node.body, expression)
    # `type(...) in` rather than isinstance so that booleans are rejected.
    if isinstance(node, ast.Constant) and type(node.value) in (int, float):
        return node.value
    if isinstance(node, ast.BinOp) and type(node.op) in _BINARY_OPERATORS:
        apply = _BINARY_OPERATORS[type(node.op)]
        return apply(
            _eval_node(node.left, expression),
            _eval_node(node.right, expression),
        )
    if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPERATORS:
        apply = _UNARY_OPERATORS[type(node.op)]
        return apply(_eval_node(node.operand, expression))
    raise ValueError(f"Unsupported expression: {expression!r}")


def _evaluate_arithmetic(expression: str) -> Number:
    """Evaluate a basic arithmetic expression without using ``eval``.

    SECURITY: the expression comes straight from a web form, so ``eval`` would
    allow arbitrary code execution. Only numeric literals combined with
    ``+ - * / // %`` and unary signs are evaluated here.

    Args:
        expression: Text such as ``"530+50"`` or ``"8/2"``.

    Returns:
        The numeric value of the expression.

    Raises:
        ValueError: If the text is not a supported arithmetic expression or
            the arithmetic itself fails (e.g. division by zero).
    """
    try:
        tree = ast.parse(expression.strip(), mode="eval")
    except SyntaxError as exc:
        raise ValueError(f"Invalid expression: {expression!r}") from exc
    try:
        return _eval_node(tree, expression)
    except ArithmeticError as exc:
        raise ValueError(f"Cannot evaluate expression: {expression!r}") from exc


def parse_input(
    text: str, cash_amount: Number, cash_ratio: Number
) -> Tuple[List[StockInput], Dict[str, Number]]:
    """Parse the portfolio text into stock inputs and a cash entry.

    Holdings are separated by commas; each holding consists of four
    whitespace-separated fields: currency code, stock code, quantity and
    target weight. Quantity and weight may be arithmetic expressions. Each
    stock's weight is rescaled so that all stocks together receive
    ``1 - cash_ratio / 100`` of the portfolio.

    Args:
        text: Raw portfolio description, e.g. ``"usd schd 21 8, krw 368590 530 2"``.
        cash_amount: Cash amount stored unchanged in the returned cash entry.
        cash_ratio: Target cash share in percent.

    Returns:
        A tuple ``(stock_inputs, krw_cash)`` where ``stock_inputs`` is a list
        of ``(country_code, stock_code, quantity, target_weight)`` tuples and
        ``krw_cash`` is ``{'amount': ..., 'target_weight': ...}``.

    Raises:
        ValueError: If the text is malformed, a value is negative, or the
            target weights do not sum to a positive number.
    """
    logger.info(
        "Parsing input: %s with cash amount: %s and cash ratio: %s",
        text, cash_amount, cash_ratio,
    )
    try:
        # Blank segments (e.g. from a trailing comma) are ignored.
        segments = [seg for seg in text.strip().split(',') if seg.strip()]
        if not segments:
            raise ValueError("Invalid input format.")

        raw_inputs = []
        for segment in segments:
            parts = segment.split()
            if len(parts) != 4:
                raise ValueError("Invalid input format.")
            country_code, stock_code, quantity_expr, target_weight_expr = parts
            quantity = math.floor(_evaluate_arithmetic(quantity_expr))
            target_weight = _evaluate_arithmetic(target_weight_expr)
            if quantity < 0 or target_weight < 0:
                raise ValueError("Quantity and weight must not be negative.")
            raw_inputs.append((country_code, stock_code, quantity, target_weight))
    except (ValueError, OverflowError) as e:
        logger.error("Error parsing input", exc_info=e)
        raise ValueError(INVALID_INPUT_MESSAGE) from e

    total_target_weight = sum(weight for *_, weight in raw_inputs)
    if total_target_weight <= 0:
        # Guard the normalization below against division by zero.
        raise ValueError("Target weights must sum to a positive value.")

    krw_cash = {'amount': cash_amount, 'target_weight': cash_ratio / 100.0}
    stock_share = 1 - krw_cash['target_weight']
    stock_inputs = [
        (country, code, quantity, stock_share * weight / total_target_weight)
        for country, code, quantity, weight in raw_inputs
    ]
    logger.info("Parsed stock inputs: %s, KRW cash: %s", stock_inputs, krw_cash)
    return stock_inputs, krw_cash


def get_exchange_rate(country_code: str) -> Number:
    """Return the factor that converts a price in ``country_code`` into KRW.

    ``KRW`` yields ``1`` without a network call. Other codes are looked up in
    the ``exchange_rates`` cache first and fetched over HTTP on a miss.

    Args:
        country_code: Currency code such as ``"usd"`` (case-insensitive).

    Returns:
        The ``basePrice`` value of the first entry in the API response.

    Raises:
        CurrencyConversionError: If the request fails or the response does
            not contain the expected data.
    """
    code = country_code.upper()
    if code in exchange_rates:
        return exchange_rates[code]
    logger.info("Fetching exchange rate - %s", country_code)
    if code == 'KRW':
        return 1

    url = f"https://quotation-api-cdn.dunamu.com/v1/forex/recent?codes=FRX.KRW{code}"
    try:
        # NOTE(review): verify=False disables TLS certificate checks; kept from
        # the original -- confirm it is still needed.
        response = requests.get(url, verify=False, timeout=REQUEST_TIMEOUT_SECONDS)
        response.raise_for_status()
        exchange_rate = response.json()[0]['basePrice']
    except (requests.RequestException, IndexError, KeyError, TypeError, ValueError) as e:
        logger.error("Error fetching exchange rate - %s", country_code, exc_info=e)
        raise CurrencyConversionError(
            "Error fetching exchange rate. Please enter a valid country code."
        ) from e

    exchange_rates[code] = exchange_rate
    logger.info("Exchange rate - %s: %s", country_code, exchange_rate)
    return exchange_rate


def get_exchange_reflected_stock_price(stock_code: str, country_code: str) -> int:
    """Return the current stock price converted to KRW, rounded down.

    Args:
        stock_code: Ticker passed to ``get_current_stock_price``.
        country_code: Currency code passed to ``get_exchange_rate``.

    Returns:
        ``floor(current price * exchange rate)``.

    Raises:
        StockCodeNotFoundError: If the price lookup fails.
        CurrencyConversionError: If the exchange-rate lookup fails.
    """
    logger.info(
        "Fetching exchange reflected stock price - %s in %s",
        stock_code, country_code,
    )
    try:
        current_price = get_current_stock_price(stock_code)
        exchange_rate = get_exchange_rate(country_code)
    except (StockCodeNotFoundError, CurrencyConversionError) as e:
        logger.error(
            "Error fetching reflected stock price - %s in %s",
            stock_code, country_code, exc_info=e,
        )
        raise
    reflected_price = math.floor(current_price * exchange_rate)
    logger.info("Reflected stock price - %s: %s", stock_code, reflected_price)
    return reflected_price


def get_current_stock_price(stock_code: str) -> Any:
    """Return the last available ``Close`` value for ``stock_code``.

    Args:
        stock_code: Ticker passed to ``fdr.DataReader``.

    Returns:
        The last non-missing value of the ``Close`` column.

    Raises:
        StockCodeNotFoundError: If the lookup raises ``ValueError``, the
            ``Close`` column is missing, or it holds no usable value.
    """
    logger.info("Fetching current stock price - %s", stock_code)
    try:
        df = fdr.DataReader(stock_code)
        # Drop missing rows so a trailing NaN cannot reach math.floor later.
        # NOTE(review): assumes rows are ordered oldest-to-newest -- confirm
        # against the FinanceDataReader documentation.
        current_price = df['Close'].dropna().iloc[-1]
    except (ValueError, KeyError, IndexError) as e:
        logger.error("Error fetching stock price - %s", stock_code, exc_info=e)
        raise StockCodeNotFoundError(
            "Stock code not found. Please enter a valid stock code."
        ) from e
    logger.info("Current stock price - %s: %s", stock_code, current_price)
    return current_price


def build_portfolio(
    stock_inputs: List[StockInput], krw_cash: Optional[Dict[str, Number]]
) -> Tuple[Dict[str, Dict[str, Any]], Dict[str, float], Dict[str, Number]]:
    """Price every parsed holding and collect the target weights.

    Args:
        stock_inputs: Tuples as returned by ``parse_input``.
        krw_cash: Cash entry; ``None`` is replaced by a zero-amount,
            zero-weight entry.

    Returns:
        ``(portfolio, target_weights, krw_cash)``. ``portfolio`` maps each
        stock code to its quantity, KRW price and currency code;
        ``target_weights`` maps each stock code to its weight. A stock code
        that appears twice keeps only its last entry.

    Raises:
        StockCodeNotFoundError: If a price lookup fails.
        CurrencyConversionError: If an exchange-rate lookup fails.
    """
    logger.info(
        "Building portfolio - stock inputs: %s, KRW cash: %s",
        stock_inputs, krw_cash,
    )
    portfolio = {}
    target_weights = {}
    for country_code, stock_code, quantity, target_weight in stock_inputs:
        current_price = get_exchange_reflected_stock_price(stock_code, country_code)
        portfolio[stock_code] = {
            'quantity': quantity,
            'price': current_price,
            'country_code': country_code,
        }
        target_weights[stock_code] = target_weight

    if krw_cash is None:
        krw_cash = {'amount': 0, 'target_weight': 0}
    logger.info(
        "Portfolio built: %s, target weights: %s, KRW cash: %s",
        portfolio, target_weights, krw_cash,
    )
    return portfolio, target_weights, krw_cash


def get_portfolio_rebalancing_info(
    portfolio: Dict[str, Dict[str, Any]],
    target_weights: Dict[str, float],
    krw_cash: Optional[Dict[str, Number]],
) -> str:
    """Build the plain-text rebalancing report.

    For each holding the report shows the price, the current value with its
    share of the total, the target weight and the signed number of shares to
    trade. A cash section and the total portfolio value follow.

    Args:
        portfolio: Mapping of stock code to ``{'quantity', 'price', ...}``.
        target_weights: Mapping of stock code to target weight (fraction of
            the total); missing codes count as ``0``.
        krw_cash: ``{'amount', 'target_weight'}``; ``None`` is treated as a
            zero-amount, zero-weight entry.

    Returns:
        The formatted report.

    Raises:
        ValueError: If the total portfolio value or any price is not
            positive (either would cause a division by zero).
    """
    logger.info("Calculating portfolio rebalancing information")
    if not krw_cash:
        krw_cash = {'amount': 0, 'target_weight': 0}

    stock_value = sum(h['price'] * h['quantity'] for h in portfolio.values())
    total_value = stock_value + krw_cash['amount']
    if total_value <= 0:
        raise ValueError("Total portfolio value must be greater than zero.")

    sections = []
    total_new_stock_value = 0
    for stock_code, holding in portfolio.items():
        price = holding['price']
        if price <= 0:
            raise ValueError(f"No valid price available for {stock_code.upper()}.")
        current_value = price * holding['quantity']
        target_weight = target_weights.get(stock_code, 0)
        difference = total_value * target_weight - current_value
        # Flooring rounds buys down and sells up (away from zero), so the
        # post-trade value of a holding never exceeds its target value.
        trade_quantity = math.floor(difference / price)
        total_new_stock_value += current_value + trade_quantity * price
        current_value_pct = (current_value / total_value) * 100
        formatted_trade_quantity = f"{trade_quantity:+,}" if trade_quantity != 0 else "0"
        sections.append(
            f"{stock_code.upper()} @{price:,}\n"
            f" {current_value:,} [{current_value_pct:.1f}%]\n"
            f" [{target_weight * 100:.1f}%] {formatted_trade_quantity}\n\n"
        )

    # Cash absorbs whatever the whole-share trades leave over.
    krw_new_amount = total_value - total_new_stock_value
    krw_difference = krw_new_amount - krw_cash['amount']
    formatted_krw_difference = f"{krw_difference:+,}" if krw_difference != 0 else "0"
    sections.append(
        f"Cash\n"
        f" {krw_cash['amount']:,} [{(krw_cash['amount'] / total_value) * 100:.1f}%]\n"
        f" [{krw_cash['target_weight'] * 100:.1f}%] {formatted_krw_difference}\n\n"
    )
    sections.append(f"Total Portfolio Value: {total_value:,}")
    logger.info("Portfolio rebalancing information calculated")
    return "".join(sections)


def _parse_cash_amount(raw: Any) -> int:
    """Convert the cash field to ``int``.

    Numbers are truncated with ``int``. Text may contain thousands separators
    (commas) and surrounding whitespace; empty text and ``None`` count as 0.

    Raises:
        ValueError: If the text is not an integer.
    """
    if raw is None:
        return 0
    if isinstance(raw, (int, float)):
        return int(raw)
    cleaned = str(raw).replace(',', '').strip()
    if not cleaned:
        return 0
    try:
        return int(cleaned)
    except ValueError:
        raise ValueError(f"Invalid cash amount: {raw!r}") from None


def rebalance_portfolio(input_text, cash_amount, cash_ratio):
    """Gradio entry point: parse the inputs and return the report text.

    Args:
        input_text: Portfolio description (see ``parse_input``).
        cash_amount: Cash amount as text or number.
        cash_ratio: Target cash share in percent.

    Returns:
        The rebalancing report, or the error message if any step fails (the
        UI shows text either way, so exceptions are not propagated).
    """
    try:
        cash_amount = _parse_cash_amount(cash_amount)
        # Forget rates cached by earlier requests so a long-running server
        # does not keep using stale exchange rates.
        exchange_rates.clear()
        stock_inputs, krw_cash = parse_input(input_text, cash_amount, cash_ratio)
        portfolio, target_weights, krw_cash = build_portfolio(stock_inputs, krw_cash)
        return get_portfolio_rebalancing_info(portfolio, target_weights, krw_cash)
    except Exception as e:  # UI boundary: report the failure as text.
        logger.exception("Rebalancing failed")
        return str(e)


# Gradio interface configuration.
interface = gr.Interface(
    fn=rebalance_portfolio,
    inputs=[
        gr.Textbox(lines=2, value="usd schd 545 8, usd qqq 22 2", label="포트폴리오 데이터"),
        gr.Textbox(lines=1, value="36974411", label="추가 투자금"),
        gr.Slider(minimum=0, maximum=100, step=1, value=33, label="현금 비율 (%)")
    ],
    outputs="text",
    title=f"Re-Balancing Analysis | Your Portfolio Holdings as of {current_date}",
    # Disabled usage guide (kept as a comment, translated to English):
    # description=(
    #     "Base currency: KRW\n\n"
    #     "Rebalancing is calculated in whole shares.\n\n"
    #     "Enter the portfolio data in the following format:\n"
    #     " - For stocks: currency ticker quantity-held target-weight\n"
    #     " - Enter the additional investment amount\n"
    #     " - Use the slider to set the target cash ratio (%)\n\n"
    #     " - Multiple holdings can be entered separated by commas.\n"
    #     " - Basic arithmetic can be used for quantity and weight.\n\n"
    #     "Example:\n"
    #     " - usd schd 545 8, usd qqq 22 2\n\n"
    #     " - Holdings: SCHD 545 shares, QQQ 22 shares\n"
    #     " - Target weights: SCHD:QQQ = 8:2\n\n"
    #     "Arithmetic example:\n"
    #     " - krw 458730 530+50 8/2, krw 368590 79*2 2+1\n\n"
    #     "Output format:\n\n"
    #     " - stock code @current price\n\n"
    #     " - current value [current weight %]\n\n"
    #     " - [target weight %] signed trade quantity\n\n"
    #     " - total portfolio value"
    # )
)

if __name__ == "__main__":
    # NOTE(review): share=True requests a public share link, so all form
    # input must be treated as untrusted.
    interface.launch(share=True)