|
import math |
|
import FinanceDataReader as fdr |
|
import yfinance as yf |
|
from concurrent.futures import ThreadPoolExecutor |
|
from modules.utils import load_css, get_currency_symbol, format_quantity, plot_donut_chart, format_value, current_time |
|
from collections import defaultdict |
|
|
|
def _eval_arithmetic(expression):
    """Evaluate a plain arithmetic expression without calling ``eval``.

    Only numeric literals, parentheses, unary ``+``/``-`` and the binary
    operators ``+ - * / // % **`` are accepted. Anything else (names, calls,
    attribute access, strings, booleans, ...) raises ``ValueError``.
    """
    import ast
    import operator

    binary_ops = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.FloorDiv: operator.floordiv,
        ast.Mod: operator.mod,
    }
    unary_ops = {ast.UAdd: operator.pos, ast.USub: operator.neg}

    def evaluate(node):
        if isinstance(node, ast.Expression):
            return evaluate(node.body)
        if isinstance(node, ast.Constant):
            value = node.value
            # bool is a subclass of int, so it has to be rejected explicitly.
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                raise ValueError(f"unsupported literal in {expression!r}")
            return value
        if isinstance(node, ast.BinOp):
            if isinstance(node.op, ast.Pow):
                # Float exponentiation overflows quickly instead of building
                # an arbitrarily large integer from a short input string.
                return float(evaluate(node.left)) ** float(evaluate(node.right))
            if type(node.op) in binary_ops:
                return binary_ops[type(node.op)](evaluate(node.left), evaluate(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in unary_ops:
            return unary_ops[type(node.op)](evaluate(node.operand))
        raise ValueError(f"unsupported expression: {expression!r}")

    return evaluate(ast.parse(expression.strip(), mode="eval"))


def parse_input(holdings, cash_amount):
    """Parse the user's holdings text into structured stock inputs.

    ``holdings`` is a comma-separated string. Each entry consists of four
    whitespace-separated fields::

        STOCK_CODE CURRENCY_CODE QUANTITY [TARGET_RATIO]

    ``QUANTITY`` and ``TARGET_RATIO`` may be arithmetic expressions such as
    ``2*3``; they must not contain spaces because fields are split on
    whitespace. A target ratio written as ``[]`` means "not specified" and is
    recorded as ``0``. Entries that do not have exactly four fields are
    skipped.

    Args:
        holdings: The raw holdings string described above.
        cash_amount: Passed through unchanged in the return value.

    Returns:
        A tuple ``(stock_inputs, cash_amount)`` where ``stock_inputs`` is a
        list of ``(CURRENCY_CODE_UPPER, stock_code, quantity, target_ratio)``
        tuples.

    Raises:
        ValueError: If any entry cannot be parsed.
    """
    try:
        stock_inputs = []
        for entry in holdings.strip().split(','):
            parts = entry.split()
            if len(parts) != 4:
                continue

            stock_code, currency_code, quantity_expr, target_ratio_expr = parts
            # SECURITY: these expressions come straight from user input, so
            # they are evaluated with a restricted arithmetic parser rather
            # than eval().
            quantity = float(_eval_arithmetic(quantity_expr))
            if target_ratio_expr.strip() == '[]':
                target_ratio = 0
            else:
                target_ratio = float(_eval_arithmetic(target_ratio_expr.strip('[]')))

            stock_inputs.append((currency_code.upper(), stock_code, quantity, target_ratio))

        return stock_inputs, cash_amount
    except Exception as e:
        raise ValueError(f"Input parsing error: {e}") from e
|
|
|
def get_portfolio_exchange_rate(currency_code, main_currency):
    """Return the rate that converts ``currency_code`` into ``main_currency``.

    When both codes are the same (case-insensitively) the rate is ``1.0`` and
    no download takes place. Otherwise the ``<FROM><TO>=X`` ticker is
    downloaded through ``yf.download`` for a one-day period and the most
    recent ``Close`` value is returned.

    Args:
        currency_code: Currency the price is quoted in.
        main_currency: Currency the portfolio is reported in.

    Returns:
        The exchange rate as a plain ``float``.

    Raises:
        ValueError: If the download returns no rows or any other error
            occurs. The message is wrapped in red ``<p>`` markup.
    """
    try:
        if currency_code.lower() == main_currency.lower():
            return 1.0

        ticker = f"{currency_code.upper()}{main_currency.upper()}=X"
        data = yf.download(ticker, period='1d', progress=False)
        if data.empty:
            raise ValueError("<p style='color: red;'>Failed to retrieve exchange rate data.</p>")

        latest_close = data['Close'].iloc[-1]
        # With MultiIndex columns (one level per ticker) data['Close'] is a
        # DataFrame and a single row of it is a Series; unwrap it so callers
        # always receive a scalar.
        if hasattr(latest_close, 'iloc'):
            latest_close = latest_close.iloc[0]
        return float(latest_close)
    except Exception as e:
        raise ValueError(f"<p style='color: red;'>Exchange rate retrieval error: {e}</p>") from e
|
|
|
def get_portfolio_exchange_reflected_stock_price(stock_code, currency_code, main_currency):
    """Return the latest price of ``stock_code`` expressed in ``main_currency``.

    The price from ``get_portfolio_current_stock_price`` is multiplied by the
    rate from ``get_portfolio_exchange_rate(currency_code, main_currency)``.

    Raises:
        ValueError: If either lookup or the multiplication fails. The message
            is wrapped in red ``<p>`` markup.
    """
    try:
        local_price = get_portfolio_current_stock_price(stock_code)
        fx_rate = get_portfolio_exchange_rate(currency_code, main_currency)
        converted_price = local_price * fx_rate
    except Exception as e:
        raise ValueError(f"<p style='color: red;'>Exchange reflected stock price error: {e}</p>")
    return converted_price
|
|
|
def get_portfolio_current_stock_price(stock_code):
    """Return the last ``Close`` value reported by ``fdr.DataReader``.

    Raises:
        ValueError: If the data cannot be read or has no ``Close`` rows. The
            message is wrapped in red ``<p>`` markup.
    """
    try:
        closing_prices = fdr.DataReader(stock_code)['Close']
        latest_close = closing_prices.iloc[-1]
    except Exception as e:
        raise ValueError(f"<p style='color: red;'>Current stock price retrieval error: {e}</p>")
    return latest_close
|
|
|
def set_default_ratios_if_zero(target_ratios):
    """Fall back to an equal split when no target ratio was specified.

    Args:
        target_ratios: Mapping of stock code to target ratio.

    Returns:
        ``target_ratios`` itself when it is empty or its values do not sum to
        zero; otherwise a new dict assigning ``1 / len(target_ratios)`` to
        every stock code.
    """
    # An empty mapping has nothing to distribute; returning early also avoids
    # dividing by zero below.
    if not target_ratios:
        return target_ratios

    if sum(target_ratios.values()) != 0:
        return target_ratios

    equal_share = 1 / len(target_ratios)
    return {stock_code: equal_share for stock_code in target_ratios}
|
|
|
def build_portfolio(stock_inputs, main_currency):
    """Price every holding concurrently and assemble the portfolio.

    Args:
        stock_inputs: Sequence of
            ``(currency_code, stock_code, quantity, target_ratio)`` items.
        main_currency: Currency all prices are converted into.

    Returns:
        A tuple ``(portfolio, target_ratios, total_value)``. ``portfolio``
        maps stock code to a dict with ``'quantity'``, ``'price'`` and
        ``'currency'``. ``target_ratios`` maps stock code to its ratio after
        ``set_default_ratios_if_zero`` has been applied. ``total_value`` is
        the sum of ``price * quantity`` over all inputs.

    Note:
        Both mappings are keyed by stock code, so a repeated code keeps only
        its last entry there, while every input row still contributes to
        ``total_value``.
    """
    def price_holding(holding):
        # Look the price up first, then echo the remaining fields back in the
        # order the aggregation below expects.
        stock_code, currency_code = holding[1], holding[0]
        converted_price = get_portfolio_exchange_reflected_stock_price(stock_code, currency_code, main_currency)
        return stock_code, converted_price, holding[2], holding[3], currency_code

    with ThreadPoolExecutor() as executor:
        priced_holdings = list(executor.map(price_holding, stock_inputs))

    portfolio = {
        code: {'quantity': qty, 'price': price, 'currency': currency}
        for code, price, qty, _ratio, currency in priced_holdings
    }
    raw_ratios = {code: ratio for code, _price, _qty, ratio, _currency in priced_holdings}
    total_value = sum(price * qty for _code, price, qty, _ratio, _currency in priced_holdings)

    return portfolio, set_default_ratios_if_zero(raw_ratios), total_value
|
|
|
def generate_portfolio_info(portfolio, total_value, main_currency):
    """Render the "Your Portfolio Holdings" section as an HTML string.

    The section contains a donut chart of the current weights (produced by
    ``plot_donut_chart``) followed by a table with one row per holding showing
    its weight and current value.

    Args:
        portfolio: Mapping of stock code to a dict with at least ``'price'``
            and ``'quantity'``.
        total_value: Denominator used for the weights. When it is zero every
            weight is reported as ``0.0`` instead of raising.
        main_currency: Code passed to ``get_currency_symbol`` to obtain the
            symbol shown in front of each value.

    Returns:
        The CSS returned by ``load_css()`` concatenated with the section's
        HTML markup.
    """
    from html import escape

    css = load_css()
    currency_symbol = get_currency_symbol(main_currency)

    holdings_totals = {}
    for stock_code, stock in portfolio.items():
        value = stock['price'] * stock['quantity']
        holdings_totals[stock_code] = {
            'value': value,
            'weight': value / total_value if total_value else 0.0,
        }

    current_weights = {stock_code: details['weight'] for stock_code, details in holdings_totals.items()}
    current_weights_chart = plot_donut_chart(current_weights)

    # Stock codes originate from user input, so they are escaped before being
    # placed into the markup.
    table_rows = ''.join(
        f"<tr><td>{escape(stock_code.upper())}</td>"
        f"<td>{details['weight'] * 100:.1f}%</td>"
        f"<td>{currency_symbol}{format_value(details['value'])}</td></tr>"
        for stock_code, details in holdings_totals.items()
    )

    portfolio_info = css + f"""
    <div class="wrap-text">
        <h3>Your Portfolio Holdings</h3>
        {current_weights_chart}
        <br>
        <div class='table-container wrap-text'>
            <table>
                <thead>
                    <tr><th>Stock Code</th><th>Current Weight (%)</th><th>Current Value</th></tr>
                </thead>
                <tbody>
                    {table_rows}
                </tbody>
            </table>
        </div>

        <br>
    </div>
    """

    return portfolio_info
|
|
|
def generate_rebalancing_analysis(portfolio, target_ratios, total_value, main_currency, cash_amount, allow_selling):
    """Render the re-balancing analysis table as an HTML string.

    For each holding the target value is
    ``(cash_amount + total_value) * target_weight``, where ``target_weight``
    is the holding's target ratio divided by the sum of all target ratios.
    The trade is the difference between that target value and the current
    value, expressed in shares at the current price.

    Args:
        portfolio: Mapping of stock code to a dict with ``'price'``,
            ``'quantity'`` and ``'currency'``.
        target_ratios: Mapping of stock code to target ratio. If the ratios
            sum to zero they are replaced through
            ``set_default_ratios_if_zero`` before weights are derived.
        total_value: Current value of all holdings.
        main_currency: Code shown in the column headers and passed to
            ``get_currency_symbol``.
        cash_amount: Cash available for buying. It is reduced by every buy and
            increased by every sell while holdings are processed in
            ``portfolio`` iteration order.
        allow_selling: When false, holdings above target are left untouched
            and each buy is capped by the cash still available.

    Returns:
        The CSS returned by ``load_css()`` concatenated with the analysis
        markup.
    """
    from html import escape

    def share(part, whole):
        # Report 0 instead of raising when the denominator is zero (for
        # example an empty portfolio with no cash).
        return part / whole if whole else 0.0

    css = load_css()
    currency_symbol = get_currency_symbol(main_currency)
    new_total_value = cash_amount + total_value

    # Defaults must be applied BEFORE the ratios are summed; otherwise an
    # all-zero input leaves the sum at zero and every weight divides by zero.
    if target_ratios:
        target_ratios = set_default_ratios_if_zero(target_ratios)
    total_target_ratio = sum(target_ratios.values())

    adjustments = []
    for stock_code, stock_data in portfolio.items():
        price = stock_data['price']
        old_quantity = stock_data['quantity']
        current_value = price * old_quantity
        target_weight = share(target_ratios.get(stock_code, 0), total_target_ratio)
        difference = new_total_value * target_weight - current_value

        if not allow_selling and (difference < 0 or cash_amount < 0):
            trade_quantity = 0
        else:
            trade_quantity = difference / price
            if trade_quantity > 0 and not allow_selling:
                # Without selling, purchases can only be funded from cash.
                trade_quantity = min(trade_quantity, cash_amount / price)

        trade_value = trade_quantity * price
        new_quantity = trade_quantity + old_quantity
        new_value = new_quantity * price

        # Buys (positive) consume cash; sells (negative) release it.
        cash_amount -= trade_value

        if trade_quantity > 0:
            action_html = "<span class='buy-sell buy'>Buy</span>"
        elif trade_quantity < 0:
            action_html = "<span class='buy-sell sell'>Sell</span>"
        else:
            action_html = "<span></span>"

        adjustments.append({
            'stock_code': stock_code,
            'current_value': current_value,
            'current_value_pct': share(current_value, total_value),
            'target_weight': target_weight,
            'Buy_or_Sell': action_html,
            'trade_value': trade_value,
            'price': price,
            'trade_quantity': trade_quantity,
            'old_quantity': old_quantity,
            'new_quantity': new_quantity,
            'new_value': new_value,
            'new_value_pct': share(new_value, new_total_value),
        })

    total_current_value = sum(adj['current_value'] for adj in adjustments)
    total_trade_value = sum(adj['trade_value'] for adj in adjustments)
    total_new_value = sum(adj['new_value'] for adj in adjustments)

    # Stock codes originate from user input, so they are escaped. The arrow
    # is written as an HTML entity so it survives any file encoding.
    table_rows = ''.join(
        "<tr>"
        f"<td>{escape(adj['stock_code'].upper())}</td>"
        f"<td>{format_value(adj['current_value'])}</td>"
        f"<td>{adj['current_value_pct'] * 100:.1f}%</td>"
        f"<td><span class='highlight-edit'>{adj['target_weight'] * 100:.1f}%</span></td>"
        f"<td>{adj['Buy_or_Sell']}</td>"
        f"<td><span class='highlight-sky'>{format_value(adj['trade_value'])}</span></td>"
        f"<td>{adj['price']:,.2f}</td>"
        f"<td><span class='highlight-sky'>{format_quantity(adj['trade_quantity'])}</span></td>"
        f"<td>{format_quantity(adj['old_quantity'])} &rarr; {format_quantity(adj['new_quantity'])}</td>"
        f"<td>{format_value(adj['new_value'])}</td>"
        f"<td>{adj['new_value_pct'] * 100:.1f}%</td>"
        "</tr>"
        for adj in adjustments
    )

    # NOTE(review): current_time is interpolated exactly as imported from
    # modules.utils; if it is a callable rather than a value it needs to be
    # called here — confirm against that module.
    rebalancing_analysis = css + f"""
    <div class="wrap-text">
        <div style="margin-bottom: 1.5rem;">
            <div style="font-size: 1.5rem; margin-top: 1.5rem; margin-bottom: 1.5rem;">Re-Balancing Analysis | Your Portfolio Holdings as of {current_time}</div>
            <span style='font-size: 1.5rem; font-weight: bold; color: #1678fb'>{currency_symbol}{format_value(total_new_value)} </span>
            (After Trades)
        </div>
        <div class='table-container wrap-text'>
            <table>
                <thead>
                    <tr>
                        <th colspan="1"></th>
                        <th colspan="2" class="header-bg-before">Your Current Portfolio (Before Trades)</th>
                        <th colspan="1"></th>
                        <th colspan="5" style='text-align: center'>Trades to Re-Balance Your Portfolio</th>
                        <th colspan="2" class="header-bg-after">Your Adjusted Portfolio (After Trades)</th>
                    </tr>
                    <tr>
                        <th>Stock Code</th>
                        <th class="header-bg-before">Total Value - {main_currency} {currency_symbol}</th>
                        <th class="header-bg-before">% Asset Allocation</th>
                        <th>Your Target Asset Allocation %</th>
                        <th>Buy or Sell?</th>
                        <th>Trade Amount - {main_currency} {currency_symbol}</th>
                        <th>Current Price per Share - {main_currency} {currency_symbol}</th>
                        <th>Estimated # of Shares to Buy or Sell</th>
                        <th>Shares Before and After</th>
                        <th class="header-bg-after">Total Value - {main_currency} {currency_symbol}</th>
                        <th class="header-bg-after">% Asset Allocation</th>
                    </tr>
                    <tr style="font-weight: bold;">
                        <td>Total</td>
                        <td>{format_value(total_current_value)}</td>
                        <td>{share(total_current_value, total_value) * 100:.1f}%</td>
                        <td></td>
                        <td></td>
                        <td>{format_value(total_trade_value)}</td>
                        <td></td>
                        <td></td>
                        <td></td>
                        <td>{format_value(total_new_value)}</td>
                        <td>{share(total_new_value, new_total_value) * 100:.1f}%</td>
                    </tr>
                </thead>
                <tbody>
                    {table_rows}
                </tbody>
            </table>
        </div>
        <br>

    </div>
    """

    return rebalancing_analysis
|
|
|
def rebalancing_tool(main_currency, holdings, cash_amount, allow_selling):
    """Run the whole pipeline and return the combined HTML report.

    The holdings text is parsed, every holding is priced, and the holdings
    section and the re-balancing analysis are rendered and concatenated in
    that order.

    Returns:
        The report markup, or a red ``<p>`` element describing the error if
        any step raises.
    """
    try:
        parsed_holdings, available_cash = parse_input(holdings, cash_amount)
        portfolio, target_ratios, total_value = build_portfolio(parsed_holdings, main_currency)

        holdings_html = generate_portfolio_info(portfolio, total_value, main_currency)
        analysis_html = generate_rebalancing_analysis(
            portfolio,
            target_ratios,
            total_value,
            main_currency,
            available_cash,
            allow_selling,
        )
        return holdings_html + analysis_html
    except Exception as e:
        return f"<p style='color: red;'>An error occurred: {e}</p>"
|
|