ShelterSearch / app.py
KeshavRa's picture
Update app.py
cd86141 verified
import streamlit as st
import json
import pandas as pd
import requests
import os
import math
from openai import OpenAI
import folium
from streamlit_folium import folium_static
from twilio.rest import Client
from datetime import datetime
from datetime import time
from zoneinfo import ZoneInfo
timezone = ZoneInfo('America/Los_Angeles')
def get_time_score(current_datetime, shelter):
current_day = current_datetime.strftime("%A")
if current_day not in shelter['Days']:
return 1
weekday = current_datetime.weekday()
current_hour = current_datetime.strftime("%H")
current_minute = current_datetime.strftime("%M")
current_time = time(int(current_hour), int(current_minute))
hour_start = shelter['Hour Start'].split(',')
minute_start = shelter['Minute Start'].split(',')
shelter_start = time(int(hour_start[weekday]), int(minute_start[weekday]))
hour_end = shelter['Hour End'].split(',')
minute_end = shelter['Minute End'].split(',')
shelter_end = time(int(hour_end[weekday]), int(minute_end[weekday]))
if shelter_start < shelter_end:
if shelter_start <= current_time <= shelter_end: return 0
else: return 1
else:
if current_time >= shelter_start or current_time <= shelter_end: return 0
else: return 1
def geocode_address(address, api_key):
# URL encode the address
encoded_address = requests.utils.quote(address)
# Send a request to the Google Maps Geocoding API
geocode_url = f"https://maps.googleapis.com/maps/api/geocode/json?address={encoded_address}&key={api_key}"
response = requests.get(geocode_url)
data = response.json()
lat = data['results'][0]['geometry']['location']['lat']
lon = data['results'][0]['geometry']['location']['lng']
return round(lat, 6), round(lon, 6)
# Reference: https://github.com/sfc38/Google-Maps-API-Streamlit-App/blob/master/google_maps_app.py#L126-L135
def create_map():
# Create the map with Google Maps
map_obj = folium.Map(tiles=None)
folium.TileLayer("https://{s}.google.com/vt/lyrs=m&x={x}&y={y}&z={z}",
attr="google",
name="Google Maps",
overlay=True,
control=True,
subdomains=["mt0", "mt1", "mt2", "mt3"]).add_to(map_obj)
return map_obj
def call_gpt(user_needs, shelter_services, api_key):
client = OpenAI(api_key = api_key)
completion = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "Given two variables 'user needs' (the ideal qualities/services of a shelter) and 'shelter services' (the services offered by a shelter), return an integer 0-10 that scores how well the 'shelter services' match the 'user needs' where 0 is the best fit and 10 is the worst fit. IMPORTANT: NO MATTER WHAT, ONLY RETURN THE INTEGER (NO EXTRA WORDS, PUNCTUATION, ETC.)"},
{"role": "user", "content": f"user_needs: {user_needs}, shelter_services: {shelter_services}"}
]
)
score = completion.choices[0].message.content.strip()
return int(score)
def get_urgency_score(user, shelter):
if user == "Today":
if shelter == "Immidiate": return 0
if shelter == "High": return 0.75
if shelter == "Moderate": return 1
elif user == "In the next few days":
if shelter == "Immidiate": return 0.25
if shelter == "High": return 0
if shelter == "Moderate": return 0.75
elif user == "In a week or more":
if shelter == "Immidiate": return 0.75
if shelter == "High": return 0.25
if shelter == "Moderate": return 0
def get_duration_score(user, shelter):
if user == "Overnight":
if shelter == "Overnight": return 0
if shelter == "Temporary": return 0.5
if shelter == "Transitional": return 0.75
if shelter == "Long-Term": return 1
elif user == "A month or less":
if shelter == "Overnight": return 0.5
if shelter == "Temporary": return 0
if shelter == "Transitional": return 0.25
if shelter == "Long-Term": return 0.75
elif user == "A couple of months":
if shelter == "Overnight": return 0.75
if shelter == "Temporary": return 0.25
if shelter == "Transitional": return 0
if shelter == "Long-Term": return 0.5
elif user == "A year or more":
if shelter == "Overnight": return 1
if shelter == "Temporary": return 0.75
if shelter == "Transitional": return 0.5
if shelter == "Long-Term": return 0
# def get_coordinates(zipcode: str, api_key: str) -> list:
# """
# Get the coordinates (latitude and longitude) of an address using the OpenWeather Geocoding API.
# Parameters:
# zipcode (str): The zipcode to geocode.
# api_key (str): Your OpenWeather API key.
# Returns:
# list: A list containing the latitude and longitude of the address.
# """
# base_url = "http://api.openweathermap.org/geo/1.0/zip"
# params = {
# 'zip': str(zipcode) + ",US",
# 'appid': api_key
# }
# response = requests.get(base_url, params=params)
# data = response.json()
# print(data)
# return [data.get('lat'), data.get('lon')]
def haversine(lat1, lon1, lat2, lon2):
R = 6371 # Earth radius in kilometers. Use 3956 for miles.
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = math.sin(dlat / 2) ** 2 + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlon / 2) ** 2
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
distance = R * c
return distance
# Initialize session state
if 'form_submitted' not in st.session_state:
st.session_state.form_submitted = False
if 'shelter_index' not in st.session_state:
st.session_state.shelter_index = 0
if 'shelters_filtered' not in st.session_state:
st.session_state.shelters_filtered = False
# Page config
st.set_page_config(
page_title="ShelterSearch",
layout="wide",
)
st.title("ShelterSearch")
if not st.session_state.form_submitted:
st.write("Hello there! Fill out this quick form to receive recommendation for where you can go to receive help.")
st.markdown("Please give us feedback at this [link](https://forms.gle/oLMJ2qVc6HYgwfCw9)")
# should be updated manually annually - use zipcodebase API
zipcodes = {
'San Francisco': ['94101', '94102', '94103', '94104', '94105', '94107', '94108', '94109', '94110', '94111', '94112', '94114', '94115', '94116', '94117', '94118', '94119', '94120', '94121', '94122', '94123', '94124', '94125', '94126', '94127', '94128', '94129', '94130', '94131', '94132', '94133', '94134', '94140', '94141', '94142', '94146', '94147', '94157', '94159', '94164', '94165', '94166', '94167', '94168', '94169', '94170', '94172', '94188'],
'Oakland': ['94601', '94602', '94603', '94604', '94605', '94606', '94607', '94608', '94609', '94610', '94611', '94612', '94613', '94614', '94615', '94617', '94618', '94619', '94620', '94621', '94623', '94624', '94661', '94662'],
'Berkeley': ['94701', '94702', '94703', '94704', '94705', '94706', '94707', '94708', '94709', '94710', '94712']
}
city = st.selectbox("City", ['San Francisco', 'Oakland', 'Berkeley'])
zipcode = st.selectbox("Zipcode", ['Unsure'] + zipcodes[city])
sex = st.radio("Sex", ["Male", "Female", "Other"])
lgbtq = st.radio("Do you identify as LGBTQ+ (some shelters serve this community specifically)", ["No", "Yes"])
domestic_violence = st.radio("Have you experienced domestic violence (some shelters serve these individuals specifically", ["No", "Yes"])
urgency = st.radio("How quickly do you need help?", ("Today", "In the next few days", "In a week or more"))
duration = st.radio("How long do you need a place to stay?", ("Overnight", "A month or less", "A couple of months", "A year or more"))
needs = st.text_area("Optional - Needs (tell us what you need and how we can help)")
phone_number = st.text_input('Optional - Enter your phone number (to text shelter info to you)', '+1')
consent = st.checkbox('I consent to receiving a one-time message')
if st.button("Submit"):
data = {
"City": city,
"Zip Code": zipcode,
"Sex": sex,
"LGBTQ": lgbtq,
"Domestic Violence": domestic_violence,
"Urgency": urgency,
"Duration": duration,
"Needs": needs,
"Phone Number": phone_number,
"Consent": consent
}
with open('data.json', 'w') as f:
json.dump(data, f)
st.session_state.form_submitted = True
st.rerun()
else:
if not st.session_state.shelters_filtered:
with open('data.json', 'r') as f:
data = json.load(f)
shelters = pd.read_csv("database.csv")
# filter city
shelters = shelters[(shelters['City'] == data['City'])]
# filter sex
shelters = shelters[(shelters['Sex'] == data['Sex']) | (shelters['Sex'] == 'All')]
# filter lgbtq
if data['LGBTQ'] == 'No':
shelters = shelters[(shelters['LGBTQ'] == "No")]
# filter domestic violence
if data['Domestic Violence'] == "No":
shelters = shelters[(shelters['Domestic Violence'] == "No")]
# keep track of which scores are calculated
scores = []
# calculate distances between zipcodes
if data['Zip Code'] != "Unsure":
geocoding_api_key = os.environ['GoogleAPI']
shelters_coordinates = shelters.apply(lambda row: geocode_address(f"{row['Address']}, {row['City']}, CA {row['Zip Code']}", geocoding_api_key), axis=1).tolist()
user_coordinates = geocode_address(f"{data['City']}, CA {data['Zip Code']}", geocoding_api_key)
distances = []
for coordinates in shelters_coordinates:
distances.append(haversine(coordinates[0], coordinates[1], user_coordinates[0], user_coordinates[1]))
max_d = max(distances) if (max(distances) != 0) else 1
shelters['zipcode_score'] = [d / max_d for d in distances]
scores.append('zipcode_score')
print(shelters['zipcode_score'])
# get urgency scores
urgency_scores = shelters.apply(lambda row: get_urgency_score(data['Urgency'], row['Urgency']), axis=1).tolist()
shelters['urgency_score'] = urgency_scores
scores.append('urgency_score')
# get duration scores
duration_scores = shelters.apply(lambda row: get_duration_score(data['Duration'], row['Duration']), axis=1).tolist()
shelters['duration_score'] = duration_scores
scores.append('duration_score')
# get services scores
if data['Needs'] != "":
OpenAI_API_KEY = os.environ["OPENAI_API_KEY"]
services_scores = shelters.apply(lambda row: call_gpt(data['Needs'], row['Services'], OpenAI_API_KEY), axis=1).tolist()
services_scores = [s / 10 for s in services_scores]
shelters['services_score'] = services_scores
scores.append('services_score')
# get time-based scores
time_scores = shelters.apply(lambda row: get_time_score(datetime.now(timezone), row), axis=1).tolist()
if data['Urgency'] == "Today":
for i in range(len(scores)):
shelters[f'time_score_{i}'] = time_scores
scores.append(f'time_score_{i}')
elif data['Urgency'] == "In the next few days":
shelters['time_score'] = time_scores
scores.append('time_score')
elif data['Urgency'] == "In a week or more":
pass
# calcualte cumulative score
shelters['total_score'] = shelters[scores].sum(axis=1)
shelters['total_score'] = shelters['total_score'] / len(scores)
shelters = shelters.sort_values(by='total_score', ascending=True)
shelters = shelters.head(3)
# convert pandas df into list of dicts
shelters = shelters.to_dict(orient='records')
# text messaging
if len(data['Phone Number']) == 12 and data['Consent']:
try:
account_sid = os.environ["SID"]
auth_token = os.environ["auth_token"]
client = Client(account_sid, auth_token)
message_body = "Here's some key shelter information from using ShelterSearch today:\n\n"
for i in range(len(shelters)):
phone = str(shelters[i]['Phone'])
message_body += f"{shelters[i]['Organization Name']}: {shelters[i]['Program Name']}\n"
message_body += f"🕒 Open Hours: {shelters[i]['Open Hours']}\n"
message_body += f"📍 Address: {shelters[i]['Address']}\n"
message_body += f"📞 Phone Number: ({phone[1:4]}) {phone[4:7]}-{phone[7:]}\n\n"
message = client.messages.create(
body = message_body,
from_= "+15107212356",
to = data['Phone Number']
)
except: pass
st.session_state.shelters_filtered = True
st.session_state.shelters = shelters
# Display the current shelter information
shelter = st.session_state.shelters[st.session_state.shelter_index]
st.header(f"{shelter['Organization Name']}: {shelter['Program Name']}")
st.subheader(f"{shelter['Type']}")
st.divider()
st.subheader("Shelter Summary")
st.write(shelter['Summary'])
st.divider()
st.subheader("How to Receive Help")
st.write(shelter['Application Details'])
st.markdown(f"- **🕒\tOpen Hours**: {shelter['Open Hours']}")
st.markdown(f"- **📍\tAddress**: {shelter['Address']}")
phone_number = str(shelter['Phone'])
formatted_phone_number = f"({phone_number[1:4]}) {phone_number[4:7]}-{phone_number[7:]}"
phone_link = f"<a href='tel:{phone_number}'>{formatted_phone_number}</a>"
st.markdown(f"- **📞\tPhone Number**: {phone_link}", unsafe_allow_html=True)
st.divider()
with st.expander("More Information"):
tabs = st.tabs(["Full List of Services", "More About the Program", "More About the Organization", "Webpage Link"])
with tabs[0]:
st.write(shelter['Services'])
with tabs[1]:
st.write(shelter['Program About'])
with tabs[2]:
st.write(shelter['Organization About'])
with tabs[3]:
st.write(shelter['Webpage'])
st.divider()
# Create map for address
map = create_map()
key = os.environ['GoogleAPI']
address = f"{shelter['Address']}, {shelter['City']}, CA"
lat, long = geocode_address(address, key)
# Fit the map bounds to include all markers
south_west = [lat - 0.02, long - 0.02]
north_east = [lat + 0.02, long + 0.02]
map_bounds = [south_west, north_east]
map.fit_bounds(map_bounds)
folium.Marker([lat, long], popup=shelter['Address']).add_to(map)
folium_static(map)
st.markdown(f" ## [Get Directions](https://www.google.com/maps/dir/?api=1&origin=current+location&destination={lat},{long})")
st.divider()
# Create two columns
col1, col2, col3 = st.columns([1,1,1])
# Add buttons to each column
with col1:
if st.button("Previous"):
if st.session_state.shelter_index > 0:
st.session_state.shelter_index -= 1
st.rerun()
with col2:
if st.button("Next"):
if st.session_state.shelter_index < 2:
st.session_state.shelter_index += 1
st.rerun()
with col3:
if st.button("Reset"):
st.session_state.shelter_index = 0
st.session_state.form_submitted = False
st.session_state.shelters_filtered = False
st.rerun()