Spaces:
Sleeping
Sleeping
| import requests | |
| import json | |
| import datetime | |
| import random | |
# Address of the locally running backend; the wearable-sync route below is
# the endpoint that receives the mock metrics.
_API_BASE = "http://127.0.0.1:5000"
API_URL = f"{_API_BASE}/api/wearable-sync"

# Account the mock metrics are posted for (noted by the author as a valid
# user that already exists in the DB).
USER_ID = "user_-8176948"
def sync_data(*, timeout: float = 10.0) -> None:
    """Post a mock "poor sleep" wearable scenario to the sync endpoint.

    Builds three metric records (sleep minutes, steps, heart rate), wraps
    them in a payload keyed by ``USER_ID`` and POSTs it as JSON to
    ``API_URL``. The outcome is reported on stdout only: transport errors,
    non-200 responses and undecodable response bodies are printed, never
    raised.

    Args:
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely when the backend is
            unresponsive.
    """
    now = datetime.datetime.now()
    yesterday = now - datetime.timedelta(days=1)

    # Scenario: poor sleep. The Sleep Agent should VETO the Fitness plan.
    # NOTE(review): the original scenario title also says "High Activity",
    # but the step count below is only 2500 — confirm which is intended.
    #
    # Only hour/minute are overridden in the timestamps below; seconds and
    # microseconds are carried over from the moment the script runs.
    metrics = [
        # Sleep data (use different times for "segments" or one big chunk).
        {
            "type": "SLEEP_MINUTES",
            "value": 320,  # 5 hours 20 mins (low sleep!)
            "unit": "minutes",
            "timestamp": yesterday.replace(hour=6, minute=30).isoformat(),
            "source": "google_fit_mock",
        },
        # Step data.
        {
            "type": "STEPS",
            "value": 2500,
            "unit": "count",
            "timestamp": now.replace(hour=14, minute=0).isoformat(),
            "source": "google_fit_mock",
        },
        # Heart rate (slightly elevated due to stress/lack of sleep?).
        {
            "type": "HEART_RATE",
            "value": 85,
            "unit": "bpm",
            "timestamp": now.replace(hour=10, minute=0).isoformat(),
            "source": "apple_health_mock",
        },
    ]
    payload = {
        "userId": USER_ID,
        "metrics": metrics,
    }

    # NOTE(review): the leading glyphs in the status messages below look like
    # mis-encoded emoji. They are runtime output, so they are left untouched
    # here — confirm the intended characters against the original file.
    print(f"π‘ Syncing data to {API_URL}...")

    # Keep the try body to the single call that can fail at transport level,
    # so that only genuine connection problems are reported as such.
    try:
        response = requests.post(API_URL, json=payload, timeout=timeout)
    except requests.RequestException as e:
        print(f"β Connection Failed: {e}")
        return

    if response.status_code != 200:
        print(f"β Error {response.status_code}: {response.text}")
        return

    print("β Sync Success!")
    try:
        body = response.json()
    except ValueError:
        # A 200 with a non-JSON body is not a connection failure; show the
        # raw text instead of mislabelling the error.
        print(response.text)
    else:
        print(json.dumps(body, indent=2))
    print("\nπ Now go to the 'Strategy Dashboard' in Streamlit.")
    print("π The Sleep Agent should see this 5.3h sleep and trigger a 'Recovery' protocol.")
# Script entry point: push the mock scenario only when this file is executed
# directly, never as a side effect of importing it.
if __name__ == "__main__":
    sync_data()