Spaces:
Running
Running
import math | |
import FinanceDataReader as fdr | |
import yfinance as yf | |
from concurrent.futures import ThreadPoolExecutor | |
from modules.utils import load_css, get_currency_symbol, format_quantity, format_value, current_time | |
from collections import defaultdict | |
def _evaluate_arithmetic(expression):
    """Evaluate a plain arithmetic expression without ``eval`` and return a float.

    Only numeric literals, parentheses, unary ``+``/``-`` and the binary
    operators ``+ - * / // %`` are accepted. Anything else (names, calls,
    attribute access, ``**``) raises ``ValueError``.

    SECURITY: the expression text comes straight from user input, so it must
    never be handed to ``eval``. ``**`` is deliberately left out because inputs
    such as ``9**9**9`` can stall the process.
    """
    # Function-scope imports keep this helper self-contained.
    import ast
    import operator

    binary_ops = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.FloorDiv: operator.floordiv,
        ast.Mod: operator.mod,
    }
    unary_ops = {ast.UAdd: operator.pos, ast.USub: operator.neg}

    def evaluate(node):
        if isinstance(node, ast.Constant):
            value = node.value
            # bool is a subclass of int; reject it so "True" is not read as 1.
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                raise ValueError("only numeric literals are allowed")
            return value
        if isinstance(node, ast.BinOp) and type(node.op) in binary_ops:
            return binary_ops[type(node.op)](evaluate(node.left), evaluate(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in unary_ops:
            return unary_ops[type(node.op)](evaluate(node.operand))
        # The offending text is not echoed: callers render error messages as HTML.
        raise ValueError("only +, -, *, /, //, % arithmetic on numbers is allowed")

    tree = ast.parse(expression, mode='eval')
    return float(evaluate(tree.body))


def parse_input(holdings, cash_amount):
    """Parse the comma-separated holdings text into per-stock tuples.

    Each entry is expected to have four whitespace-separated fields:
    ``<stock_code> <currency_code> <quantity_expr> [<target_ratio_expr>]``.
    Quantity and ratio may be simple arithmetic expressions (e.g. ``5+5``);
    a ratio written as ``[]`` means 0. Entries that do not have exactly four
    fields are skipped. Repeated stock codes are merged: their quantities and
    target ratios are summed and the currency of the first occurrence is kept.

    Args:
        holdings: The raw holdings string.
        cash_amount: Passed through unchanged.

    Returns:
        ``(stock_inputs, cash_amount)`` where ``stock_inputs`` is a list of
        ``(currency_code, stock_code, quantity, target_ratio)`` tuples in
        first-seen order, with ``currency_code`` upper-cased.

    Raises:
        ValueError: If any entry cannot be parsed.
    """
    try:
        # Stock code -> accumulated details.
        stock_dict = {}
        for line in holdings.strip().split(','):
            parts = line.split()
            if len(parts) != 4:
                continue
            stock_code, currency_code, quantity_expr, target_ratio_expr = parts
            quantity = _evaluate_arithmetic(quantity_expr)
            if target_ratio_expr == '[]':
                target_ratio = 0
            else:
                target_ratio = _evaluate_arithmetic(target_ratio_expr.strip('[]'))

            # If the stock code was already seen, add up quantity and target ratio.
            if stock_code in stock_dict:
                stock_dict[stock_code]['quantity'] += quantity
                stock_dict[stock_code]['target_ratio'] += target_ratio
            else:
                stock_dict[stock_code] = {
                    'currency_code': currency_code.upper(),
                    'quantity': quantity,
                    'target_ratio': target_ratio,
                }

        stock_inputs = [
            (details['currency_code'], stock_code, details['quantity'], details['target_ratio'])
            for stock_code, details in stock_dict.items()
        ]
        return stock_inputs, cash_amount
    except Exception as e:
        raise ValueError(f"Input parsing error: {e}") from e
# def get_portfolio_exchange_rate(currency_code, main_currency): | |
# try: | |
# if main_currency.lower() == 'krw': | |
# if currency_code.upper() != 'KRW': | |
# fx_ticker = f"FRED:DEXKOUS" if currency_code.upper() == 'USD' else None | |
# if not fx_ticker: | |
# raise ValueError(f"<p style='color: red;'>Unsupported currency pair for KRW: {currency_code}</p>") | |
# exchange_rate_data = fdr.DataReader(fx_ticker) | |
# if not exchange_rate_data.empty: | |
# return exchange_rate_data['DEXKOUS'].iloc[-1] | |
# else: | |
# raise ValueError(f"<p style='color: red;'>Failed to retrieve exchange rate data for {currency_code} to KRW</p>") | |
# else: | |
# return 1.0  # KRW to KRW: the exchange rate is fixed at 1
# else: | |
# if currency_code.lower() == main_currency.lower(): | |
# return 1.0 | |
# ticker = f"{currency_code.upper()}{main_currency.upper()}=X" | |
# data = yf.download(ticker, period='1d', progress=False) | |
# if not data.empty: | |
# return data['Close'].iloc[0] | |
# else: | |
# raise ValueError(f"<p style='color: red;'>Failed to retrieve exchange rate data for ticker: {ticker}</p>") | |
# except Exception as e: | |
# raise ValueError(f"<p style='color: red;'>Exchange rate retrieval error: {e}</p>") | |
import pandas_datareader.data as web | |
import datetime | |
def get_portfolio_exchange_rate(currency_code, main_currency):
    """Return the rate that converts ``currency_code`` into ``main_currency``.

    Identical currencies (compared case-insensitively) return ``1.0`` with no
    network access. When the main currency is KRW, only USD is supported and
    the rate is read from the FRED ``DEXKOUS`` series over the last 30 days.
    For any other main currency the ``<FROM><TO>=X`` ticker is downloaded
    through yfinance.

    Args:
        currency_code: Currency the holding is quoted in.
        main_currency: Currency the portfolio is reported in.

    Returns:
        The most recent available rate as a ``float``.

    Raises:
        ValueError: For an unsupported pair, when no usable rate is available,
            or when the lookup fails. Messages are HTML snippets.
    """
    try:
        if main_currency.lower() == 'krw':
            if currency_code.upper() == 'KRW':
                return 1.0  # KRW to KRW: the rate is fixed at 1
            if currency_code.upper() != 'USD':
                raise ValueError(f"<p style='color: red;'>Unsupported currency pair for KRW: {currency_code}</p>")
            fx_ticker = 'DEXKOUS'
            end = datetime.datetime.now()
            start = end - datetime.timedelta(days=30)
            exchange_rate_data = web.DataReader(fx_ticker, 'fred', start, end)
            if exchange_rate_data.empty:
                raise ValueError(f"<p style='color: red;'>No data available for {fx_ticker}</p>")
            # The newest rows can be missing (NaN); use the last real observation
            # so a NaN rate does not propagate into every portfolio value.
            rates = exchange_rate_data[fx_ticker].dropna()
            if rates.empty:
                raise ValueError(f"<p style='color: red;'>No data available for {fx_ticker}</p>")
            return float(rates.iloc[-1])

        if currency_code.lower() == main_currency.lower():
            return 1.0
        ticker = f"{currency_code.upper()}{main_currency.upper()}=X"
        data = yf.download(ticker, period='1d', progress=False)
        if data.empty:
            raise ValueError(f"<p style='color: red;'>No exchange rate data for ticker: {ticker}</p>")
        # Depending on the yfinance version, data['Close'] is a Series or a
        # one-column DataFrame; flattening yields a scalar in both cases.
        closes = data['Close'].dropna()
        if closes.empty:
            raise ValueError(f"<p style='color: red;'>No exchange rate data for ticker: {ticker}</p>")
        return float(closes.to_numpy().ravel()[-1])
    except Exception as e:
        raise ValueError(f"<p style='color: red;'>Exchange rate retrieval error: {e}</p>") from e
def get_portfolio_exchange_reflected_stock_price(stock_code, currency_code, main_currency):
    """Return the current price of ``stock_code`` expressed in ``main_currency``.

    The price from ``get_portfolio_current_stock_price`` is multiplied by the
    rate from ``get_portfolio_exchange_rate``. Any failure in either lookup,
    or in the multiplication, is re-raised as ``ValueError`` with an HTML
    error message.
    """
    try:
        # The price is looked up first, then the rate (left-to-right evaluation).
        return (
            get_portfolio_current_stock_price(stock_code)
            * get_portfolio_exchange_rate(currency_code, main_currency)
        )
    except Exception as error:
        raise ValueError(f"<p style='color: red;'>Exchange reflected stock price error: {error}</p>")
def get_portfolio_current_stock_price(stock_code):
    """Return the latest available closing price for ``stock_code``.

    All-digit codes are looked up with FinanceDataReader; every other code is
    looked up with yfinance (one-day history).

    Args:
        stock_code: Ticker string.

    Returns:
        The last non-missing value of the ``Close`` column.

    Raises:
        ValueError: If the lookup fails or yields no usable closing price.
            The message is an HTML snippet.
    """
    import html  # function-scope import: needed only for the error message

    try:
        # Codes made up only of digits are treated as Korean listings.
        if stock_code.isdigit():
            df = fdr.DataReader(stock_code)
        else:
            # Everything else is queried through yfinance.
            df = yf.Ticker(stock_code).history(period="1d")

        # An unknown ticker can come back as an empty frame; report that
        # plainly instead of failing with an out-of-bounds indexing error.
        closes = df['Close'].dropna() if 'Close' in df else None
        if closes is None or closes.empty:
            # Escaped because callers render this message as HTML.
            raise ValueError(f"No price data available for {html.escape(stock_code)}")
        return closes.iloc[-1]
    except Exception as e:
        raise ValueError(f"<p style='color: red;'>Current stock price retrieval error: {e}</p>") from e
def set_default_ratios_if_zero(target_ratios):
    """Fall back to equal weights when the target ratios sum to zero.

    Args:
        target_ratios: Mapping of stock code to target ratio.

    Returns:
        A new mapping giving every stock ``1 / len(target_ratios)`` when the
        ratios sum to zero; otherwise the input mapping itself. An empty
        mapping is returned unchanged (there is nothing to distribute, and
        dividing by its length would raise ``ZeroDivisionError``).
    """
    if not target_ratios:
        return target_ratios
    if sum(target_ratios.values()) == 0:
        default_ratio = 1 / len(target_ratios)
        return {stock_code: default_ratio for stock_code in target_ratios}
    return target_ratios
def build_portfolio(stock_inputs, main_currency):
    """Price every holding concurrently and assemble the portfolio.

    Args:
        stock_inputs: Iterable of ``(currency_code, stock_code, quantity,
            target_ratio)`` entries.
        main_currency: Currency the prices are converted into.

    Returns:
        ``(portfolio, target_ratios, total_value)``: ``portfolio`` maps each
        stock code to its ``quantity``, converted ``price`` and ``currency``;
        ``target_ratios`` maps stock code to ratio after
        ``set_default_ratios_if_zero``; ``total_value`` is the sum of
        ``price * quantity``.
    """
    def price_entry(entry):
        # Entries are indexed as (currency, code, quantity, ratio).
        code = entry[1]
        converted_price = get_portfolio_exchange_reflected_stock_price(code, entry[0], main_currency)
        return code, converted_price, entry[2], entry[3], entry[0]

    # Price lookups run in worker threads; map() keeps the input order.
    with ThreadPoolExecutor() as pool:
        priced_entries = list(pool.map(price_entry, stock_inputs))

    portfolio = {}
    target_ratios = {}
    total_value = 0
    for code, price, held_quantity, ratio, currency in priced_entries:
        portfolio[code] = {'quantity': held_quantity, 'price': price, 'currency': currency}
        target_ratios[code] = ratio
        total_value += price * held_quantity

    return portfolio, set_default_ratios_if_zero(target_ratios), total_value
def _share_of(part, whole):
    """Return ``part / whole``, or 0.0 when ``whole`` is zero (e.g. a cash-only start)."""
    return part / whole if whole else 0.0


def generate_rebalancing_analysis(portfolio, target_ratios, total_value, main_currency, cash_amount, allow_selling):
    """Build the HTML table describing the trades that rebalance the portfolio.

    Args:
        portfolio: Mapping of stock code to a dict with ``price``, ``quantity``
            and ``currency``.
        target_ratios: Mapping of stock code to target ratio; normalised here
            by the sum of all ratios.
        total_value: Current value of all holdings.
        main_currency: Reporting currency, shown in the column headers.
        cash_amount: Cash added to the portfolio.
        allow_selling: When true, each holding is traded by its full gap to
            target (negative means sell). When false, nothing is sold and
            ``cash_amount`` is split across under-weight holdings in
            proportion to their shortfall.

    Returns:
        The CSS from ``load_css()`` followed by the HTML table markup.
    """
    import html  # function-scope import: used to escape user-supplied text

    css = load_css()
    currency_symbol = get_currency_symbol(main_currency)
    currency_label = html.escape(str(main_currency))
    new_total_value = cash_amount + total_value

    # Apply the equal-weight fallback BEFORE summing, so all-zero ratios do not
    # leave a zero denominator for the weight calculation below.
    target_ratios = set_default_ratios_if_zero(target_ratios)
    total_target_ratio = sum(target_ratios.values())

    # First pass: current value, target weight and gap to target per holding.
    positions = []
    for stock_code, stock_data in portfolio.items():
        current_value = stock_data['price'] * stock_data['quantity']
        target_weight = target_ratios.get(stock_code, 0) / total_target_ratio
        difference = new_total_value * target_weight - current_value
        positions.append((stock_code, stock_data, current_value, target_weight, difference))

    # Combined shortfall of all under-weight holdings; computed once because
    # it is the same for every holding.
    total_positive_difference = sum(max(position[4], 0) for position in positions)

    # Second pass: decide the trade for each holding.
    adjustments = []
    for stock_code, stock_data, current_value, target_weight, difference in positions:
        # trade_value depends on whether selling is allowed.
        if allow_selling:
            trade_value = difference
        elif total_positive_difference > 0:
            # Buy-only: hand out the cash in proportion to each shortfall.
            trade_value = max(difference, 0) * cash_amount / total_positive_difference
        else:
            trade_value = 0

        # Decide whether this is a buy or a sell.
        trade_quantity = trade_value / stock_data['price']
        if trade_quantity > 0:
            buy_or_sell = "<span class='buy-sell buy'>Buy</span>"
        elif trade_quantity < 0:
            buy_or_sell = "<span class='buy-sell sell'>Sell</span>"
        else:
            buy_or_sell = "<span></span>"

        new_value = current_value + trade_value
        adjustments.append({
            # Escaped: the stock code is user input written into HTML.
            'stock_code': html.escape(stock_code.upper()),
            'current_value': current_value,
            'current_value_pct': _share_of(current_value, total_value),
            'target_weight': target_weight,
            'Buy_or_Sell': buy_or_sell,
            'trade_value': trade_value,
            'price': stock_data['price'],
            'new_value': new_value,
            'new_value_pct': _share_of(new_value, new_total_value),
            # Trade cells stay blank when there is nothing to trade.
            'trade_value_display': f"{format_value(trade_value)}" if trade_value != 0 else "",
            'trade_quantity_display': f"{format_quantity(trade_quantity)}" if trade_quantity != 0 else "",
        })

    current_sum = sum(adj['current_value'] for adj in adjustments)
    trade_sum = sum(adj['trade_value'] for adj in adjustments)
    new_sum = sum(adj['new_value'] for adj in adjustments)

    body_rows = ''.join(
        f"<tr>"
        f"<td style='text-align: left !important;'>{adj['stock_code']}</td>"
        f"<td>{format_value(adj['current_value'])}</td>"
        f"<td>{adj['current_value_pct'] * 100:.1f}%</td>"
        f"<td class='table-data-bg-edit' style='text-align: center !important;'>{adj['target_weight'] * 100:.0f}%</td>"
        f"<td style='text-align: center !important;'>{adj['Buy_or_Sell']}</td>"
        f"<td class='table-data-bg-sky'>{adj['trade_value_display']}</td>"
        f"<td>{adj['price']:,.2f}</td>"
        f"<td class='table-data-bg-sky'>{adj['trade_quantity_display']}</td>"
        f"<td>{format_value(adj['new_value'])}</td>"
        f"<td>{adj['new_value_pct'] * 100:.1f}%</td>"
        f"</tr>"
        for adj in adjustments
    )

    # Generate the HTML.
    rebalancing_analysis = css + f"""
<div>
<div class='table-container'>
<table>
<thead>
<tr>
<th colspan="1"></th>
<th colspan="2" class="table-header-bg-before">Your Current Portfolio (Before Trades)</th>
<th colspan="1"></th>
<th colspan="4" style='text-align: center'>Trades to Re-Balance Your Portfolio</th>
<th colspan="2" class="table-header-bg-after">Your Adjusted Portfolio (After Trades)</th>
</tr>
<tr>
<th>Stock Code</th>
<th class="table-header-bg-before">Total Value - {currency_label} {currency_symbol}</th>
<th class="table-header-bg-before">% Asset Allocation</th>
<th style='text-align: center !important;'>Your Target Asset Allocation %</th>
<th>Buy or Sell?</th>
<th>Trade Amount - {currency_label} {currency_symbol}</th>
<th>Current Price per Share - {currency_label} {currency_symbol}</th>
<th>Estimated # of Shares to Buy or Sell</th>
<th class="table-header-bg-after">Total Value - {currency_label} {currency_symbol}</th>
<th class="table-header-bg-after">% Asset Allocation</th>
</tr>
<tr style="font-weight: bold;">
<td style='text-align: left !important;'>Total</td>
<td>{format_value(current_sum)}</td>
<td>{_share_of(current_sum, total_value) * 100:.1f}%</td>
<td></td>
<td></td>
<td>{format_value(trade_sum)}</td>
<td></td>
<td></td>
<td>{format_value(new_sum)}</td>
<td>{_share_of(new_sum, new_total_value) * 100:.1f}%</td>
</tr>
</thead>
<tbody>
{body_rows}
</tbody>
</table>
</div>
<br>
</div>
"""
    return rebalancing_analysis
def rebalancing_tool(main_currency, holdings, cash_amount, allow_selling):
    """Run the full rebalancing pipeline and return its HTML output.

    The holdings text is parsed, the portfolio is priced, and the analysis
    table is rendered. Any exception raised along the way is caught and
    returned as a red HTML error paragraph instead of propagating.
    """
    try:
        parsed_inputs, available_cash = parse_input(holdings, cash_amount)
        holdings_by_code, ratios, holdings_value = build_portfolio(parsed_inputs, main_currency)
        return generate_rebalancing_analysis(
            holdings_by_code,
            ratios,
            holdings_value,
            main_currency,
            available_cash,
            allow_selling,
        )
    except Exception as error:
        return f"<p style='color: red;'>An error occurred: {error}</p>"