# wellsync-api / simulate_mobile_sync.py
# Xombi17's picture
# Deploy: WellSync API with Wearable Integration
# 08a7e10
import requests
import json
import datetime
import random
# Endpoint that sync_data() POSTs the mock wearable payload to
# (loopback address, port 5000).
API_URL = "http://127.0.0.1:5000/api/wearable-sync"
# Sent as the "userId" field of the payload built in sync_data().
USER_ID = "user_-8176948" # Valid user from DB
def sync_data(*, timeout=10):
    """Post one batch of mock wearable metrics to ``API_URL`` and print the result.

    The batch contains three readings for ``USER_ID``: a sleep total stamped
    yesterday at 06:30, a step count stamped today at 14:00 and a heart-rate
    sample stamped today at 10:00. All timestamps are naive local times in
    ISO 8601 form.

    Args:
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block forever on an unresponsive host.

    Returns:
        None. The outcome (success, HTTP error, or connection failure) is
        reported on stdout; no exception is propagated for request failures.
    """
    now = datetime.datetime.now()
    yesterday = now - datetime.timedelta(days=1)

    # Scenario: Poor Sleep but High Activity.
    # The Sleep Agent should VETO the Fitness plan.
    # NOTE(review): 2500 steps looks low for a "high activity" scenario --
    # confirm the intended step count with the scenario author.
    metrics = [
        # Sleep data (use different times for "segments" or one big chunk).
        {
            "type": "SLEEP_MINUTES",
            "value": 320,  # 5 hours 20 mins (low sleep!)
            "unit": "minutes",
            # replace() only overrides hour/minute; seconds and microseconds
            # are inherited from the moment the script runs.
            "timestamp": yesterday.replace(hour=6, minute=30).isoformat(),
            "source": "google_fit_mock",
        },
        # Step data.
        # NOTE(review): when run before 14:00 this timestamp is in the future.
        {
            "type": "STEPS",
            "value": 2500,
            "unit": "count",
            "timestamp": now.replace(hour=14, minute=0).isoformat(),
            "source": "google_fit_mock",
        },
        # Heart rate (slightly elevated due to stress/lack of sleep?).
        # NOTE(review): when run before 10:00 this timestamp is in the future.
        {
            "type": "HEART_RATE",
            "value": 85,
            "unit": "bpm",
            "timestamp": now.replace(hour=10, minute=0).isoformat(),
            "source": "apple_health_mock",
        },
    ]

    payload = {
        "userId": USER_ID,
        "metrics": metrics,
    }

    print(f"📡 Syncing data to {API_URL}...")

    # Keep the try body to the network call only, so that problems while
    # rendering the response are not misreported as connection failures.
    try:
        response = requests.post(API_URL, json=payload, timeout=timeout)
    except requests.exceptions.RequestException as e:
        print(f"❌ Connection Failed: {e}")
        return

    if response.status_code != 200:
        print(f"❌ Error {response.status_code}: {response.text}")
        return

    print("✅ Sync Success!")
    try:
        body = response.json()
    except ValueError:
        # A 200 with a non-JSON body is still a successful sync; show it raw.
        print(response.text)
    else:
        print(json.dumps(body, indent=2))

    print("\n👉 Now go to the 'Strategy Dashboard' in Streamlit.")
    print("👉 The Sleep Agent should see this 5.3h sleep and trigger a 'Recovery' protocol.")
# Run a single simulated sync when the file is executed as a script;
# importing the module does not trigger any network traffic.
if __name__ == "__main__":
    sync_data()