Image / app.py
Dmtlant's picture
Update app.py
4615036 verified
raw
history blame
3.16 kB
import asyncio
import websockets
import json
import matplotlib.pyplot as plt
import pandas as pd
from datetime import datetime
import streamlit as st
# Список криптовалют для отслеживания
cryptos = ['BTCUSDT', 'ETHUSDT', 'LTCUSDT']
# Словарь для хранения данных
data = {crypto: {'prices': [], 'times': []} for crypto in cryptos}
# Инициализация Streamlit
st.title('Crypto Advisor')
# Создание пустых мест для графиков и советов
charts = {crypto: st.empty() for crypto in cryptos}
advice_boxes = {crypto: st.empty() for crypto in cryptos}
# Функция для подключения к веб-сокету Binance и получения данных
async def get_crypto_data():
uri = 'wss://stream.binance.com:9443/stream'
stream = '/'.join([f'{crypto.lower()}@trade' for crypto in cryptos])
url = f'{uri}?streams={stream}'
async with websockets.connect(url) as websocket:
while True:
response = await websocket.recv()
data_dict = json.loads(response)['data']
symbol = data_dict['s']
price = float(data_dict['p'])
time = datetime.fromtimestamp(data_dict['T'] / 1000)
if symbol in data:
# Обновление данных
data[symbol]['prices'].append(price)
data[symbol]['times'].append(time)
# Ограничение длины списка до 100 последних значений
if len(data[symbol]['prices']) > 100:
data[symbol]['prices'].pop(0)
data[symbol]['times'].pop(0)
# Обновление графика и советов
update_chart(symbol)
give_advice(symbol, price)
# Функция для обновления графика
def update_chart(symbol):
fig, ax = plt.subplots()
ax.plot(data[symbol]['times'], data[symbol]['prices'], label=symbol)
ax.set_xlabel('Time')
ax.set_ylabel('Price (USDT)')
ax.set_title(f'{symbol} Price')
ax.legend()
charts[symbol].pyplot(fig)
# Функция для выдачи советов
def give_advice(symbol, price):
if len(data[symbol]['prices']) < 2:
advice = f'{symbol}: Not enough data yet.'
else:
last_price = data[symbol]['prices'][-2]
current_price = price
if current_price > last_price:
advice = f'{symbol}: Price is increasing. Consider buying.'
elif current_price < last_price:
advice = f'{symbol}: Price is decreasing. Consider selling.'
else:
advice = f'{symbol}: Price is stable. Hold for now.'
advice_boxes[symbol].text(advice)
# Функция для запуска асинхронного цикла
async def main():
st.write('Connecting to Binance...')
st.write('Please wait for the data to start streaming.')
task = asyncio.create_task(get_crypto_data())
await task
# Запуск приложения
if __name__ == '__main__':
asyncio.run(main())