cryman38's picture
Upload 16 files
48b95e7 verified
import math
import FinanceDataReader as fdr
import yfinance as yf
from concurrent.futures import ThreadPoolExecutor
from modules.utils import load_css, get_currency_symbol, format_quantity, plot_donut_chart, format_value, current_time
from collections import defaultdict
def parse_input(holdings, cash_amount):
try:
lines = holdings.strip().split(',')
stock_inputs = []
total_target_ratio = 0
for line in lines:
parts = line.split()
if len(parts) == 4:
stock_code, currency_code, quantity_expr, target_ratio_expr = parts
quantity = float(eval(quantity_expr.replace(' ', '')))
if target_ratio_expr.strip() == '[]':
target_ratio = 0
else:
target_ratio = float(eval(target_ratio_expr.strip('[]').replace(' ', '')))
stock_inputs.append((currency_code.upper(), stock_code, quantity, target_ratio))
total_target_ratio += target_ratio
return stock_inputs, cash_amount
except Exception as e:
raise ValueError(f"Input parsing error: {e}")
def get_portfolio_exchange_rate(currency_code, main_currency):
try:
if currency_code.lower() == main_currency.lower():
return 1.0
ticker = f"{currency_code.upper()}{main_currency.upper()}=X"
data = yf.download(ticker, period='1d', progress=False)
if not data.empty:
return data['Close'].iloc[0]
else:
raise ValueError("<p style='color: red;'>Failed to retrieve exchange rate data.</p>")
except Exception as e:
raise ValueError(f"<p style='color: red;'>Exchange rate retrieval error: {e}</p>")
def get_portfolio_exchange_reflected_stock_price(stock_code, currency_code, main_currency):
try:
new_price = get_portfolio_current_stock_price(stock_code)
exchange_rate = get_portfolio_exchange_rate(currency_code, main_currency)
return new_price * exchange_rate
except Exception as e:
raise ValueError(f"<p style='color: red;'>Exchange reflected stock price error: {e}</p>")
def get_portfolio_current_stock_price(stock_code):
try:
df = fdr.DataReader(stock_code)
return df['Close'].iloc[-1]
except Exception as e:
raise ValueError(f"<p style='color: red;'>Current stock price retrieval error: {e}</p>")
def set_default_ratios_if_zero(target_ratios):
total_target_ratio = sum(target_ratios.values())
if total_target_ratio == 0:
num_stocks = len(target_ratios)
default_ratio = 1 / num_stocks
return {stock_code: default_ratio for stock_code in target_ratios}
return target_ratios
def build_portfolio(stock_inputs, main_currency):
portfolio = {}
target_ratios = {}
with ThreadPoolExecutor() as executor:
results = list(executor.map(lambda x: (x[1], get_portfolio_exchange_reflected_stock_price(x[1], x[0], main_currency), x[2], x[3], x[0]), stock_inputs))
total_value = 0
for stock_code, new_price, quantity, target_ratio, currency_code in results:
portfolio[stock_code] = {'quantity': quantity, 'price': new_price, 'currency': currency_code}
target_ratios[stock_code] = target_ratio
total_value += new_price * quantity
target_ratios = set_default_ratios_if_zero(target_ratios)
return portfolio, target_ratios, total_value
def generate_portfolio_info(portfolio, total_value, main_currency):
css = load_css()
currency_symbol = get_currency_symbol(main_currency)
# 보유 μ’…λͺ©λ³„ 총앑을 κ³„μ‚°ν•©λ‹ˆλ‹€.
holdings_totals = {
stock_code: {
'value': stock['price'] * stock['quantity'],
'weight': (stock['price'] * stock['quantity'] / total_value)
}
for stock_code, stock in portfolio.items()
}
# 톡화별 총앑을 κ³„μ‚°ν•©λ‹ˆλ‹€.
currency_totals = defaultdict(lambda: {'value': 0, 'weight': 0})
for stock in portfolio.values():
currency = stock['currency']
value = stock['price'] * stock['quantity']
currency_totals[currency]['value'] += value
currency_totals[currency]['weight'] += value / total_value
# ν˜„μž¬ 비쀑을 μ‚¬μš©ν•˜μ—¬ 포트폴리였 트리맡 차트λ₯Ό μƒμ„±ν•©λ‹ˆλ‹€.
currunt_weights = {stock_code: details['weight'] for stock_code, details in holdings_totals.items()}
currunt_weights_chart = plot_donut_chart(currunt_weights)
currency_weights = {currency: details['weight'] for currency, details in currency_totals.items()}
currency_weights_chart = plot_donut_chart(currency_weights)
# HTML 생성
portfolio_info = css + f"""
<div class="wrap-text">
<h3>Your Portfolio Holdings</h3>
{currunt_weights_chart}
<br>
<div class='table-container wrap-text'>
<table>
<thead>
<tr><th>Stock Code</th><th>Current Weight (%)</th><th>Current Value</th></tr>
</thead>
<tbody>
{''.join(
f"<tr><td>{stock_code.upper()}</td><td>{details['weight'] * 100:.1f}%</td><td>{currency_symbol}{format_value(details['value'])}</td></tr>"
for stock_code, details in holdings_totals.items()
)}
</tbody>
</table>
</div>
<br>
</div>
"""
# <br>
# <h3>Your Portfolio by Currency</h3>
# {currency_weights_chart}
# <br>
# <div class='table-container wrap-text'>
# <table>
# <thead>
# <tr><th>Currency</th><th>Total Weight (%)</th><th>Total Value</th></tr>
# </thead>
# <tbody>
# {''.join(
# f"<tr><td>{currency.upper()}</td><td>{details['weight']* 100:.1f}%</td><td>{currency_symbol}{format_value(details['value'])}</td></tr>"
# for currency, details in currency_totals.items()
# )}
# </tbody>
# </table>
# </div>
return portfolio_info
def generate_rebalancing_analysis(portfolio, target_ratios, total_value, main_currency, cash_amount, allow_selling):
css = load_css()
currency_symbol = get_currency_symbol(main_currency)
adjustments = []
new_total_value = cash_amount + total_value
total_target_ratio = sum(target_ratios.values())
target_ratios = set_default_ratios_if_zero(target_ratios)
for stock_code, stock_data in portfolio.items():
current_value = stock_data['price'] * stock_data['quantity']
target_ratio = target_ratios.get(stock_code, 0)
target_weight = target_ratio / total_target_ratio
target_value = new_total_value * target_weight
difference = target_value - current_value
# Allow selling이 false이고 ν˜„κΈˆμ΄ 음수인 경우 맀도λ₯Ό λ°©μ§€
if not allow_selling and (difference < 0 or cash_amount < 0):
trade_quantity = 0
else:
trade_quantity = difference / stock_data['price']
if trade_quantity > 0 and not allow_selling:
trade_quantity = min(trade_quantity, cash_amount / stock_data['price'])
trade_value = trade_quantity * stock_data['price']
new_quantity = trade_quantity + stock_data['quantity']
new_value = new_quantity * stock_data['price']
if trade_value > 0:
cash_amount -= trade_value
else:
cash_amount += abs(trade_value)
# 각 ν•­λͺ©λ³„ Buy_or_Sell 값을 루프 μ•ˆμ—μ„œ μ •μ˜
Buy_or_Sell = ""
if trade_quantity > 0:
Buy_or_Sell = f"<span class='buy-sell buy'>Buy</span>"
elif trade_quantity < 0:
Buy_or_Sell = f"<span class='buy-sell sell'>Sell</span>"
else:
Buy_or_Sell = f"<span></span>"
adjustments.append({
'difference': difference,
'current_value': current_value,
'target_ratio': target_ratio,
'current_value_pct': current_value / total_value,
'trade_quantity': trade_quantity,
'stock_code': stock_code,
'price': stock_data['price'],
'new_value': new_value,
"Buy_or_Sell": Buy_or_Sell,
'trade_value': trade_value,
'old_quantity': stock_data['quantity'],
'new_quantity': new_quantity,
'target_weight': target_weight,
'currency': stock_data['currency'],
'new_value_pct': new_value / new_total_value
})
# HTML 생성
rebalancing_analysis = css + f"""
<div class="wrap-text">
<div style="margin-bottom: 1.5rem;">
<div style="font-size: 1.5rem; margin-top: 1.5rem; margin-bottom: 1.5rem;">Re-Balancing Analysis | Your Portfolio Holdings as of {current_time}</div>
<span style='font-size: 1.5rem; font-weight: bold; color: #1678fb'>{currency_symbol}{format_value(sum(adj['new_value'] for adj in adjustments))} </span>
(After Trades)
</div>
<div class='table-container wrap-text'>
<table>
<thead>
<tr>
<th colspan="1"></th>
<th colspan="2" class="header-bg-before">Your Current Portfolio (Before Trades)</th>
<th colspan="1"></th>
<th colspan="5" style='text-align: center'>Trades to Re-Balance Your Portfolio</th>
<th colspan="2" class="header-bg-after">Your Adjusted Portfolio (After Trades)</th>
</tr>
<tr>
<th>Stock Code</th>
<th class="header-bg-before">Total Value - {main_currency} {currency_symbol}</th>
<th class="header-bg-before">% Asset Allocation</th>
<th>Your Target Asset Allocation %</th>
<th>Buy or Sell?</th>
<th>Trade Amount - {main_currency} {currency_symbol}</th>
<th>Current Price per Share - {main_currency} {currency_symbol}</th>
<th>Estimated # of Shares to Buy or Sell</th>
<th>Shares Before and After</th>
<th class="header-bg-after">Total Value - {main_currency} {currency_symbol}</th>
<th class="header-bg-after">% Asset Allocation</th>
</tr>
<tr style="font-weight: bold;">
<td>Total</td>
<td>{format_value(sum(adj['current_value'] for adj in adjustments))}</td>
<td>{sum(adj['current_value'] for adj in adjustments) / total_value * 100:.1f}%</td>
<td></td>
<td></td>
<td>{format_value(sum(adj['trade_value'] for adj in adjustments))}</td>
<td></td>
<td></td>
<td></td>
<td>{format_value(sum(adj['new_value'] for adj in adjustments))}</td>
<td>{sum(adj['new_value'] for adj in adjustments) / new_total_value * 100:.1f}%</td>
</tr>
</thead>
<tbody>
{''.join(
f"<tr>"
f"<td>{adj['stock_code'].upper()}</td>"
f"<td>{format_value(adj['current_value'])}</td>"
f"<td>{adj['current_value_pct'] * 100:.1f}%</td>"
f"<td><span class='highlight-edit'>{adj['target_weight'] * 100:.1f}%</span></td>"
f"<td>{adj['Buy_or_Sell']}</td>"
f"<td><span class='highlight-sky'>{format_value(adj['trade_value'])}</span></td>"
f"<td>{adj['price']:,.2f}</td>"
f"<td><span class='highlight-sky'>{format_quantity(adj['trade_quantity'])}</span></td>"
f"<td>{format_quantity(adj['old_quantity'])} β†’ {format_quantity(adj['new_quantity'])}</td>"
f"<td>{format_value(adj['new_value'])}</td>"
f"<td>{adj['new_value_pct'] * 100:.1f}%</td>"
f"</tr>"
for adj in adjustments
)}
</tbody>
</table>
</div>
<br>
</div>
"""
return rebalancing_analysis
def rebalancing_tool(main_currency, holdings, cash_amount, allow_selling):
try:
stock_inputs, cash_amount = parse_input(holdings, cash_amount)
portfolio, target_ratios, total_value = build_portfolio(stock_inputs, main_currency)
portfolio_info = generate_portfolio_info(portfolio, total_value, main_currency)
rebalancing_analysis = generate_rebalancing_analysis(portfolio, target_ratios, total_value, main_currency, cash_amount, allow_selling)
return portfolio_info + rebalancing_analysis
except Exception as e:
return f"<p style='color: red;'>An error occurred: {e}</p>"