import re
import requests
import pandas as pd
import yfinance as yf
import concurrent.futures
from time import time
from datetime import datetime
from typing import List
from bs4 import BeautifulSoup
from logger import logger
from openai import AzureOpenAI
from langchain.tools import tool
from langchain_core.utils.function_calling import convert_to_openai_tool, convert_to_openai_function
from config import config
from azure.core.credentials import AzureKeyCredential
from azure.search.documents import SearchClient
from azure.search.documents.models import VectorizedQuery

az_creds = AzureKeyCredential(config.az_search_api_key)
az_search_client = SearchClient(config.az_search_endpoint, config.az_search_idx_name, az_creds)

openai_client = AzureOpenAI(
    azure_endpoint=config.azure_openai_endpoint,
    api_key=config.azure_openai_api_key,
    api_version="2024-02-01"
)

def get_analysis(query: str) -> List[dict]:
    """
    Full text search through your database of company and crypto analysis, retrieves the top
    pieces of analysis relevant to your query. You MUST ALWAYS use this function
    to help form your opinions and predictions.

    Args:
        query (str): The search query.

    Returns:
        list: A list of dictionaries containing the pieces of analysis.
    """
    start_time = time()
    embed_model = "default-large-embeddings"
    vec = openai_client.embeddings.create(input=[query], model=embed_model).data[0].embedding
    vector_query = VectorizedQuery(vector=vec, k_nearest_neighbors=config.az_search_top_k * 2, fields="vector")
    results = az_search_client.search(
        search_text="*",
        vector_queries=[vector_query],
        select=["date", "popularity", "sequence", "context", "securities"],
        order_by=["securities desc", "date desc", "popularity desc"],
        top=config.az_search_top_k,
    )
    output = []
    for x in results:
        output.append({
            "securities": x["securities"],
            "date written": x["date"].split("T")[0],
            "summary": x["context"],
            "content": x["sequence"],
        })
    print(f"Search took {time() - start_time:.2f} seconds\n---")
    return output
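
# Example usage (sketch): assumes the Azure AI Search index and the
# "default-large-embeddings" deployment wired up above are reachable; the query
# string is illustrative.
#
#     hits = get_analysis("NVDA data center revenue outlook")
#     for hit in hits:
#         print(hit["securities"], hit["date written"], hit["summary"][:80])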

def google_search_and_scrape(query: str) -> List[dict]:
    """
    Performs a Google search for the given query, retrieves the top search result URLs,
    and scrapes the text content and table data from those pages in parallel.

    Args:
        query (str): The search query.

    Returns:
        list: A list of dictionaries containing the URL, text content, and table data for each scraped page.
    """
    num_results = 2
    url = 'https://www.google.com/search'
    params = {'q': query, 'num': num_results}
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.61 Safari/537.3'}
    logger.info(f"Performing google search with query: {query}\nplease wait...")
    response = requests.get(url, params=params, headers=headers)
    soup = BeautifulSoup(response.text, 'html.parser')
    urls = [result.find('a')['href'] for result in soup.find_all('div', class_='tF2Cxc')]
    logger.info("Scraping text from urls, please wait...")
    for u in urls:
        logger.info(u)
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = [
            executor.submit(lambda u: (u, requests.get(u, headers=headers).text), u)
            for u in urls[:num_results] if isinstance(u, str)
        ]
        results = []
        for future in concurrent.futures.as_completed(futures):
            url, html = future.result()
            soup = BeautifulSoup(html, 'html.parser')
            paragraphs = [p.text.strip() for p in soup.find_all('p') if p.text.strip()]
            text_content = ' '.join(paragraphs)
            text_content = re.sub(r'\s+', ' ', text_content)
            table_data = [
                [cell.get_text(strip=True) for cell in row.find_all('td')]
                for table in soup.find_all('table')
                for row in table.find_all('tr')
            ]
            if text_content or table_data:
                results.append({'url': url, 'content': text_content, 'tables': table_data})
    return results
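
# Example usage (sketch): Google's result markup (the "tF2Cxc" class above) changes
# often, so the scrape may come back empty if the page layout has moved on.
#
#     pages = google_search_and_scrape("NVIDIA Q2 earnings summary")
#     for page in pages:
#         print(page["url"], len(page["content"]), len(page["tables"]))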

def get_current_stock_price(symbol: str) -> float:
    """
    Get the current stock price for a given symbol.

    Args:
        symbol (str): The stock symbol.

    Returns:
        float: The current stock price, or None if an error occurs.
    """
    try:
        # config.status.update(label=":chart_with_upwards_trend: Getting price")
        stock = yf.Ticker(symbol)
        # Use "regularMarketPrice" for regular market hours, or "currentPrice" for pre/post market
        current_price = stock.info.get("regularMarketPrice", stock.info.get("currentPrice"))
        return current_price if current_price else None
    except Exception as e:
        print(f"Error fetching current price for {symbol}: {e}")
        return None

def get_stock_fundamentals(symbol: str) -> dict:
    """
    Get fundamental data for a given stock symbol using yfinance API.

    Args:
        symbol (str): The stock symbol.

    Returns:
        dict: A dictionary containing fundamental data.
        Keys:
            - 'symbol': The stock symbol.
            - 'company_name': The long name of the company.
            - 'sector': The sector to which the company belongs.
            - 'industry': The industry to which the company belongs.
            - 'market_cap': The market capitalization of the company.
            - 'pe_ratio': The forward price-to-earnings ratio.
            - 'pb_ratio': The price-to-book ratio.
            - 'dividend_yield': The dividend yield.
            - 'eps': The trailing earnings per share.
            - 'beta': The beta value of the stock.
            - '52_week_high': The 52-week high price of the stock.
            - '52_week_low': The 52-week low price of the stock.
    """
    try:
        stock = yf.Ticker(symbol)
        info = stock.info
        fundamentals = {
            'symbol': symbol,
            'company_name': info.get('longName', ''),
            'sector': info.get('sector', ''),
            'industry': info.get('industry', ''),
            'market_cap': info.get('marketCap', None),
            'pe_ratio': info.get('forwardPE', None),
            'pb_ratio': info.get('priceToBook', None),
            'dividend_yield': info.get('dividendYield', None),
            'eps': info.get('trailingEps', None),
            'beta': info.get('beta', None),
            '52_week_high': info.get('fiftyTwoWeekHigh', None),
            '52_week_low': info.get('fiftyTwoWeekLow', None)
        }
        return fundamentals
    except Exception as e:
        print(f"Error getting fundamentals for {symbol}: {e}")
        return {}
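
# Example usage (sketch) of the yfinance helpers, assuming the ticker symbol is
# valid and Yahoo Finance is reachable:
#
#     price = get_current_stock_price("AAPL")
#     fundamentals = get_stock_fundamentals("AAPL")
#     print(price, fundamentals.get("pe_ratio"), fundamentals.get("market_cap"))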

def get_financial_statements(symbol: str) -> pd.DataFrame:
    """
    Get financial statements for a given stock symbol.

    Args:
        symbol (str): The stock symbol.

    Returns:
        pd.DataFrame: DataFrame containing the annual income statement (yfinance `financials`).
    """
    try:
        stock = yf.Ticker(symbol)
        financials = stock.financials
        return financials
    except Exception as e:
        print(f"Error fetching financial statements for {symbol}: {e}")
        return pd.DataFrame()

def get_key_financial_ratios(symbol: str) -> dict:
    """
    Get key financial ratios for a given stock symbol.

    Args:
        symbol (str): The stock symbol.

    Returns:
        dict: Dictionary containing key financial ratios.
    """
    try:
        # config.status.update(label=":chart_with_upwards_trend: Gathering financial data")
        stock = yf.Ticker(symbol)
        key_ratios = stock.info
        return key_ratios
    except Exception as e:
        print(f"Error fetching key financial ratios for {symbol}: {e}")
        return {}

def get_dividend_data(symbol: str) -> pd.Series:
    """
    Get dividend data for a given stock symbol.

    Args:
        symbol (str): The stock symbol.

    Returns:
        pd.Series: Series of dividend payments indexed by date.
    """
    try:
        stock = yf.Ticker(symbol)
        dividends = stock.dividends
        return dividends
    except Exception as e:
        print(f"Error fetching dividend data for {symbol}: {e}")
        return pd.Series(dtype="float64")

def get_company_news(symbol: str) -> list:
    """
    Get company news and press releases for a given stock symbol.
    If you use this, you must use the articles to back up your analysis.

    Args:
        symbol (str): The stock symbol.

    Returns:
        list: List of dictionaries containing company news and press releases.
    """
    # config.status.update(label=":newspaper: Getting news")
    try:
        news = yf.Ticker(symbol).news
        return news
    except Exception as e:
        print(f"Error fetching company news for {symbol}: {e}")
        return []

def get_technical_indicators(symbol: str) -> pd.DataFrame:
    """
    Get historical OHLCV price data for a given stock symbol, from which
    technical indicators can be computed.

    Args:
        symbol (str): The stock symbol.

    Returns:
        pd.DataFrame: DataFrame containing the full price history (open, high, low, close, volume).
    """
    try:
        indicators = yf.Ticker(symbol).history(period="max")
        return indicators
    except Exception as e:
        print(f"Error fetching technical indicators for {symbol}: {e}")
        return pd.DataFrame()
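
# Sketch (hypothetical helper, not part of the tools list below): one way to derive
# a basic indicator from the raw history returned above, assuming the standard
# yfinance "Close" column is present. Window sizes are illustrative.
def compute_simple_moving_averages(symbol: str, windows=(20, 50, 200)) -> pd.DataFrame:
    history = get_technical_indicators(symbol)
    if history.empty:
        return pd.DataFrame()
    sma = pd.DataFrame(index=history.index)
    for window in windows:
        # Rolling mean of the closing price over each window length.
        sma[f"sma_{window}"] = history["Close"].rolling(window=window).mean()
    return sma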

def get_company_profile(symbol: str) -> dict:
    """
    Get company profile and overview for a given stock symbol.

    Args:
        symbol (str): The stock symbol.

    Returns:
        dict: Dictionary containing company profile and overview.
    """
    try:
        profile = yf.Ticker(symbol).info
        return profile
    except Exception as e:
        print(f"Error fetching company profile for {symbol}: {e}")
        return {}

tools = [
    get_analysis,
    # google_search_and_scrape,
    get_current_stock_price,
    # get_company_news,
    # get_company_profile,
    # get_stock_fundamentals,
    # get_financial_statements,
    get_key_financial_ratios,
    # get_analyst_recommendations,
    # get_dividend_data,
    # get_technical_indicators
]

def get_openai_tools() -> List[dict]:
    tools_ = [convert_to_openai_tool(f) for f in tools]
    return tools_


def get_openai_functions() -> List[dict]:
    functions = [convert_to_openai_function(f) for f in tools]
    return functions
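
# Sketch of how the generated schemas might be passed to the Azure OpenAI chat API.
# The deployment name below is a placeholder (not defined in config); replace it
# with your own chat deployment.
if __name__ == "__main__":
    response = openai_client.chat.completions.create(
        model="gpt-4o",  # placeholder Azure deployment name
        messages=[{"role": "user", "content": "What is NVDA trading at right now?"}],
        tools=get_openai_tools(),
        tool_choice="auto",
    )
    # The model is expected to answer with a tool call to get_current_stock_price.
    print(response.choices[0].message.tool_calls)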