tuneit_demo / modules /rebalancing.py
cryman38's picture
Upload rebalancing.py
7fd3b80 verified
import math
import FinanceDataReader as fdr
import yfinance as yf
from concurrent.futures import ThreadPoolExecutor
from modules.utils import load_css, get_currency_symbol, format_quantity, format_value, current_time
from collections import defaultdict
def parse_input(holdings, cash_amount):
    """Parse the comma-separated holdings text into per-stock tuples.

    Each comma-separated entry is expected to hold four whitespace-separated
    fields: ``<stock_code> <currency_code> <quantity_expr> <[target_ratio_expr]>``.
    Entries that do not split into exactly four fields (for example the blank
    left by a trailing comma) are skipped. When a stock code appears more than
    once, its quantities and target ratios are summed and the currency of the
    first occurrence is kept.

    Args:
        holdings: Raw holdings text, e.g. ``"AAPL USD 10 [50], MSFT USD 2*3 []"``.
        cash_amount: Passed through unchanged.

    Returns:
        A ``(stock_inputs, cash_amount)`` tuple where ``stock_inputs`` is a list
        of ``(currency_code, stock_code, quantity, target_ratio)`` tuples in
        first-seen order. ``currency_code`` is upper-cased; an empty ``[]``
        ratio is reported as ``0``.

    Raises:
        ValueError: If anything goes wrong while parsing; the original
            exception is attached as ``__cause__``.
    """
    try:
        # Per-stock accumulator keyed by stock code (dicts keep insertion order).
        stock_dict = {}
        for line in holdings.strip().split(','):
            parts = line.split()
            if len(parts) != 4:
                continue
            stock_code, currency_code, quantity_expr, target_ratio_expr = parts

            # SECURITY(review): eval() executes arbitrary Python taken from
            # user input. It is kept so arithmetic such as "2*3" keeps
            # working, but it should be replaced by a restricted arithmetic
            # parser before this is exposed to untrusted users.
            quantity = float(eval(quantity_expr))
            if target_ratio_expr == '[]':
                target_ratio = 0
            else:
                target_ratio = float(eval(target_ratio_expr.strip('[]')))

            # A repeated stock code adds to the quantity and ratio already seen.
            if stock_code in stock_dict:
                stock_dict[stock_code]['quantity'] += quantity
                stock_dict[stock_code]['target_ratio'] += target_ratio
            else:
                stock_dict[stock_code] = {
                    'currency_code': currency_code.upper(),
                    'quantity': quantity,
                    'target_ratio': target_ratio,
                }

        stock_inputs = [
            (details['currency_code'], stock_code, details['quantity'], details['target_ratio'])
            for stock_code, details in stock_dict.items()
        ]
        return stock_inputs, cash_amount
    except Exception as e:
        raise ValueError(f"Input parsing error: {e}") from e
# def get_portfolio_exchange_rate(currency_code, main_currency):
# try:
# if main_currency.lower() == 'krw':
# if currency_code.upper() != 'KRW':
# fx_ticker = f"FRED:DEXKOUS" if currency_code.upper() == 'USD' else None
# if not fx_ticker:
# raise ValueError(f"<p style='color: red;'>Unsupported currency pair for KRW: {currency_code}</p>")
# exchange_rate_data = fdr.DataReader(fx_ticker)
# if not exchange_rate_data.empty:
# return exchange_rate_data['DEXKOUS'].iloc[-1]
# else:
# raise ValueError(f"<p style='color: red;'>Failed to retrieve exchange rate data for {currency_code} to KRW</p>")
# else:
# return 1.0 # KRW to KRW: the exchange rate is fixed at 1
# else:
# if currency_code.lower() == main_currency.lower():
# return 1.0
# ticker = f"{currency_code.upper()}{main_currency.upper()}=X"
# data = yf.download(ticker, period='1d', progress=False)
# if not data.empty:
# return data['Close'].iloc[0]
# else:
# raise ValueError(f"<p style='color: red;'>Failed to retrieve exchange rate data for ticker: {ticker}</p>")
# except Exception as e:
# raise ValueError(f"<p style='color: red;'>Exchange rate retrieval error: {e}</p>")
import pandas_datareader.data as web
import datetime
def get_portfolio_exchange_rate(currency_code, main_currency):
    """Return the factor that converts ``currency_code`` into ``main_currency``.

    When the main currency is KRW, only USD is supported as a foreign
    currency; its rate is the latest available observation of the FRED series
    ``DEXKOUS`` over the past 30 days. For any other main currency the rate is
    read from the yfinance ticker ``<CURRENCY><MAIN>=X``. Identical currencies
    return ``1.0`` without any network access.

    Args:
        currency_code: Currency the asset is quoted in (case-insensitive).
        main_currency: Currency the portfolio is reported in (case-insensitive).

    Returns:
        The exchange rate as returned by the data source, or ``1.0`` when both
        currencies are the same.

    Raises:
        ValueError: For an unsupported pair, missing data, or any failure of
            the underlying data source; the original exception is attached as
            ``__cause__``.
    """
    try:
        if main_currency.lower() == 'krw':
            if currency_code.upper() == 'KRW':
                return 1.0  # KRW to KRW: the rate is fixed at 1
            if currency_code.upper() == 'USD':
                fx_ticker = 'DEXKOUS'
            else:
                raise ValueError(f"<p style='color: red;'>Unsupported currency pair for KRW: {currency_code}</p>")

            end = datetime.datetime.now()
            start = end - datetime.timedelta(days=30)
            exchange_rate_data = web.DataReader(fx_ticker, 'fred', start, end)
            if exchange_rate_data.empty:
                raise ValueError(f"<p style='color: red;'>No data available for {fx_ticker}</p>")

            # The series may contain missing observations; drop them so the
            # last row is an actual rate rather than NaN.
            rates = exchange_rate_data['DEXKOUS'].dropna()
            if rates.empty:
                raise ValueError(f"<p style='color: red;'>No data available for {fx_ticker}</p>")
            return rates.iloc[-1]

        if currency_code.lower() == main_currency.lower():
            return 1.0

        ticker = f"{currency_code.upper()}{main_currency.upper()}=X"
        data = yf.download(ticker, period='1d', progress=False)
        if data.empty:
            raise ValueError(f"<p style='color: red;'>No exchange rate data for ticker: {ticker}</p>")
        return data['Close'].iloc[0]
    except Exception as e:
        raise ValueError(f"<p style='color: red;'>Exchange rate retrieval error: {e}</p>") from e
def get_portfolio_exchange_reflected_stock_price(stock_code, currency_code, main_currency):
    """Return the current price of ``stock_code`` expressed in ``main_currency``.

    The price comes from ``get_portfolio_current_stock_price`` and is
    multiplied by the rate from ``get_portfolio_exchange_rate``.

    Args:
        stock_code: Ticker / stock code to price.
        currency_code: Currency the stock is quoted in.
        main_currency: Currency the result should be expressed in.

    Returns:
        Price multiplied by the exchange rate.

    Raises:
        ValueError: If either lookup fails; the original exception is attached
            as ``__cause__`` so the root failure stays visible in tracebacks.
    """
    try:
        new_price = get_portfolio_current_stock_price(stock_code)
        exchange_rate = get_portfolio_exchange_rate(currency_code, main_currency)
        return new_price * exchange_rate
    except Exception as e:
        raise ValueError(f"<p style='color: red;'>Exchange reflected stock price error: {e}</p>") from e
def get_portfolio_current_stock_price(stock_code):
    """Return the most recent closing price available for ``stock_code``.

    Codes made only of digits are looked up through FinanceDataReader (Korean
    stock codes are usually numeric); every other code is looked up through
    yfinance with a one-day history.

    Args:
        stock_code: Ticker / stock code as a string.

    Returns:
        The last non-missing value of the ``Close`` column.

    Raises:
        ValueError: If the lookup fails or returns no usable closing price;
            the original exception is attached as ``__cause__``.
    """
    try:
        if stock_code.isdigit():
            df = fdr.DataReader(stock_code)
        else:
            # Non-Korean stocks are looked up through yfinance.
            df = yf.Ticker(stock_code).history(period="1d")

        # Guard against an empty frame or a trailing missing close, which
        # would otherwise surface as an opaque indexer error or a NaN price.
        closes = df['Close'].dropna()
        if closes.empty:
            raise ValueError(f"No closing price data returned for {stock_code}")
        return closes.iloc[-1]
    except Exception as e:
        raise ValueError(f"<p style='color: red;'>Current stock price retrieval error: {e}</p>") from e
def set_default_ratios_if_zero(target_ratios):
    """Fall back to equal weights when the target ratios sum to zero.

    Args:
        target_ratios: Mapping of stock code to target ratio.

    Returns:
        A new mapping giving every stock the ratio ``1 / len(target_ratios)``
        when the supplied ratios sum to zero; otherwise ``target_ratios``
        itself, unchanged. An empty mapping is returned as-is.
    """
    if not target_ratios:
        # Nothing to weight; also avoids dividing by zero below.
        return target_ratios
    if sum(target_ratios.values()) == 0:
        default_ratio = 1 / len(target_ratios)
        return {stock_code: default_ratio for stock_code in target_ratios}
    return target_ratios
def build_portfolio(stock_inputs, main_currency):
    """Price every holding and assemble the portfolio structures.

    Args:
        stock_inputs: Iterable of ``(currency_code, stock_code, quantity,
            target_ratio)`` entries.
        main_currency: Currency every price is converted into.

    Returns:
        A ``(portfolio, target_ratios, total_value)`` tuple: ``portfolio`` maps
        stock code to ``{'quantity', 'price', 'currency'}``, ``target_ratios``
        maps stock code to its target ratio (after the equal-weight fallback
        of ``set_default_ratios_if_zero``), and ``total_value`` is the sum of
        ``price * quantity`` over all holdings.
    """
    def price_holding(entry):
        # One worker call per holding: look up its converted price.
        code = entry[1]
        converted_price = get_portfolio_exchange_reflected_stock_price(code, entry[0], main_currency)
        return (code, converted_price, entry[2], entry[3], entry[0])

    # Price lookups are dispatched through a thread pool.
    with ThreadPoolExecutor() as pool:
        priced = list(pool.map(price_holding, stock_inputs))

    portfolio = {
        code: {'quantity': qty, 'price': price, 'currency': currency}
        for code, price, qty, _ratio, currency in priced
    }
    target_ratios = {code: ratio for code, _price, _qty, ratio, _currency in priced}
    total_value = sum(price * qty for _code, price, qty, _ratio, _currency in priced)

    return portfolio, set_default_ratios_if_zero(target_ratios), total_value
def generate_rebalancing_analysis(portfolio, target_ratios, total_value, main_currency, cash_amount, allow_selling):
    """Build the HTML report listing the trades that rebalance ``portfolio``.

    For each holding the target value is ``(total_value + cash_amount)`` times
    the holding's normalised target weight. With ``allow_selling`` the trade
    is the full difference between target and current value. Without it, only
    holdings below target are bought, and ``cash_amount`` is split among them
    in proportion to their shortfall.

    Args:
        portfolio: Mapping of stock code to a dict with ``'price'``,
            ``'quantity'`` and ``'currency'`` entries.
        target_ratios: Mapping of stock code to target ratio. Ratios are
            normalised by their sum; if they sum to zero the equal-weight
            fallback of ``set_default_ratios_if_zero`` is used.
        total_value: Value of the current holdings; used as the denominator
            of the "before trades" allocation percentages.
        main_currency: Currency code shown in the column headers.
        cash_amount: Cash added to the portfolio value and available to trade.
        allow_selling: Whether negative (sell) trades may be proposed.

    Returns:
        The string from ``load_css()`` followed by the HTML table.
    """
    import html  # function-scope import: only used to escape stock codes below

    def _share(part, whole):
        # A zero denominator (e.g. nothing held yet, or a zero price) is
        # reported as 0 instead of a division error or NaN.
        return part / whole if whole else 0.0

    css = load_css()
    currency_symbol = get_currency_symbol(main_currency)
    new_total_value = cash_amount + total_value

    # Apply the equal-weight fallback BEFORE summing; summing first leaves the
    # normalising divisor at 0 when every supplied ratio is zero.
    target_ratios = set_default_ratios_if_zero(target_ratios)
    total_target_ratio = sum(target_ratios.values())

    # Values that do not depend on the individual trade are computed once per
    # holding instead of once per holding inside every loop iteration.
    current_values = {}
    target_weights = {}
    differences = {}
    for stock_code, stock_data in portfolio.items():
        current_values[stock_code] = stock_data['price'] * stock_data['quantity']
        target_weights[stock_code] = _share(target_ratios.get(stock_code, 0), total_target_ratio)
        differences[stock_code] = new_total_value * target_weights[stock_code] - current_values[stock_code]
    total_positive_difference = sum(max(d, 0) for d in differences.values())

    adjustments = []
    for stock_code, stock_data in portfolio.items():
        current_value = current_values[stock_code]
        target_weight = target_weights[stock_code]
        difference = differences[stock_code]

        # Trade value depends on whether selling is allowed.
        if allow_selling:
            trade_value = difference
        elif total_positive_difference > 0:
            # Buy-only: distribute the cash across under-weight holdings in
            # proportion to how far each one is below its target.
            trade_value = max(difference, 0) * cash_amount / total_positive_difference
        else:
            trade_value = 0

        # Decide between buy and sell.
        trade_quantity = _share(trade_value, stock_data['price'])
        if trade_quantity > 0:
            Buy_or_Sell = f"<span class='buy-sell buy'>Buy</span>"
        elif trade_quantity < 0:
            Buy_or_Sell = f"<span class='buy-sell sell'>Sell</span>"
        else:
            Buy_or_Sell = f"<span></span>"

        # Cells stay blank when there is nothing to trade.
        trade_value_display = f"{format_value(trade_value)}" if trade_value != 0 else ""
        trade_quantity_display = f"{format_quantity(trade_quantity)}" if trade_quantity != 0 else ""

        new_value = current_value + trade_value
        new_quantity = stock_data['quantity'] + trade_quantity

        adjustments.append({
            'difference': difference,
            'current_value': current_value,
            'target_ratio': target_ratios.get(stock_code, 0),
            'current_value_pct': _share(current_value, total_value),
            'trade_quantity': trade_quantity,
            'stock_code': stock_code,
            'price': stock_data['price'],
            'new_value': new_value,
            "Buy_or_Sell": Buy_or_Sell,
            'trade_value': trade_value,
            'old_quantity': stock_data['quantity'],
            'new_quantity': new_quantity,
            'target_weight': target_weight,
            'currency': stock_data['currency'],
            'new_value_pct': _share(new_value, new_total_value),
            'trade_value_display': trade_value_display,
            'trade_quantity_display': trade_quantity_display
        })

    total_current_value = sum(adj['current_value'] for adj in adjustments)
    total_trade_value = sum(adj['trade_value'] for adj in adjustments)
    total_new_value = sum(adj['new_value'] for adj in adjustments)

    # Stock codes originate from user input, so they are HTML-escaped.
    rows_html = ''.join(
        f"<tr>"
        f"<td style='text-align: left !important;'>{html.escape(adj['stock_code'].upper())}</td>"
        f"<td>{format_value(adj['current_value'])}</td>"
        f"<td>{adj['current_value_pct'] * 100:.1f}%</td>"
        f"<td class='table-data-bg-edit' style='text-align: center !important;'>{adj['target_weight'] * 100:.0f}%</td>"
        f"<td style='text-align: center !important;'>{adj['Buy_or_Sell']}</td>"
        f"<td class='table-data-bg-sky'>{adj['trade_value_display']}</td>"
        f"<td>{adj['price']:,.2f}</td>"
        f"<td class='table-data-bg-sky'>{adj['trade_quantity_display']}</td>"
        f"<td>{format_value(adj['new_value'])}</td>"
        f"<td>{adj['new_value_pct'] * 100:.1f}%</td>"
        f"</tr>"
        for adj in adjustments
    )

    # Generate the HTML.
    rebalancing_analysis = css + f"""
<div>
<div class='table-container'>
<table>
<thead>
<tr>
<th colspan="1"></th>
<th colspan="2" class="table-header-bg-before">Your Current Portfolio (Before Trades)</th>
<th colspan="1"></th>
<th colspan="4" style='text-align: center'>Trades to Re-Balance Your Portfolio</th>
<th colspan="2" class="table-header-bg-after">Your Adjusted Portfolio (After Trades)</th>
</tr>
<tr>
<th>Stock Code</th>
<th class="table-header-bg-before">Total Value - {main_currency} {currency_symbol}</th>
<th class="table-header-bg-before">% Asset Allocation</th>
<th style='text-align: center !important;'>Your Target Asset Allocation %</th>
<th>Buy or Sell?</th>
<th>Trade Amount - {main_currency} {currency_symbol}</th>
<th>Current Price per Share - {main_currency} {currency_symbol}</th>
<th>Estimated # of Shares to Buy or Sell</th>
<th class="table-header-bg-after">Total Value - {main_currency} {currency_symbol}</th>
<th class="table-header-bg-after">% Asset Allocation</th>
</tr>
<tr style="font-weight: bold;">
<td style='text-align: left !important;'>Total</td>
<td>{format_value(total_current_value)}</td>
<td>{_share(total_current_value, total_value) * 100:.1f}%</td>
<td></td>
<td></td>
<td>{format_value(total_trade_value)}</td>
<td></td>
<td></td>
<td>{format_value(total_new_value)}</td>
<td>{_share(total_new_value, new_total_value) * 100:.1f}%</td>
</tr>
</thead>
<tbody>
{rows_html}
</tbody>
</table>
</div>
<br>
</div>
"""
    return rebalancing_analysis
def rebalancing_tool(main_currency, holdings, cash_amount, allow_selling):
    """Run the full rebalancing pipeline and return its HTML output.

    The holdings text is parsed, every holding is priced, and the rebalancing
    table is rendered. Any exception raised along the way is converted into a
    red HTML error paragraph instead of propagating.

    Args:
        main_currency: Currency the portfolio is reported in.
        holdings: Raw holdings text handed to ``parse_input``.
        cash_amount: Cash available, handed to ``parse_input``.
        allow_selling: Whether sell trades may be proposed.

    Returns:
        The rebalancing HTML on success, otherwise an HTML error message.
    """
    try:
        parsed_holdings, available_cash = parse_input(holdings, cash_amount)
        priced_portfolio, ratios, holdings_value = build_portfolio(parsed_holdings, main_currency)
        return generate_rebalancing_analysis(
            priced_portfolio,
            ratios,
            holdings_value,
            main_currency,
            available_cash,
            allow_selling,
        )
    except Exception as error:
        return f"<p style='color: red;'>An error occurred: {error}</p>"