MotiMeter / session_analysis.py
Jiaaaaaaax's picture
Update session_analysis.py
0c62c39 verified
import streamlit as st
import io
from io import BytesIO
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import google.generativeai as genai
from datetime import datetime
import json
import numpy as np
from docx import Document
import re
from prompts import SESSION_EVALUATION_PROMPT, MI_SYSTEM_PROMPT
def show_session_analysis():
    """Render the MI session analysis page.

    Seeds the session-state slots used by this page, then lays out an
    upload column next to a (wider) results column. The results column is
    only populated once an analysis result is present in session state.
    """
    st.title("MI Session Analysis Dashboard")

    # Make sure both slots exist before any widget code reads them.
    for slot in ('analysis_results', 'current_transcript'):
        if slot not in st.session_state:
            st.session_state[slot] = None

    upload_column, results_column = st.columns([1, 2])

    with upload_column:
        show_upload_section()

    with results_column:
        if st.session_state.analysis_results:
            show_analysis_results()
def show_upload_section():
    """Show the input-method picker and route to the matching handler.

    NOTE(review): another ``show_upload_section`` is defined further down
    in this file; when the module is loaded, that later definition
    replaces this one.
    """
    st.header("Session Data Upload")

    input_methods = [
        "Audio Recording",
        "Video Recording",
        "Text Transcript",
        "Session Notes",
        "Previous Session Data",
    ]
    upload_type = st.radio("Select Input Method:", input_methods)

    if upload_type in ["Audio Recording", "Video Recording"]:
        # Accepted extensions depend on which kind of media was chosen.
        if upload_type == "Audio Recording":
            accepted = ["wav", "mp3", "mp4"]
        else:
            accepted = ["mp4", "avi", "mov"]
        uploaded = st.file_uploader(f"Upload {upload_type}", type=accepted)
        if uploaded:
            process_media_file(uploaded, upload_type)
        return

    if upload_type == "Text Transcript":
        uploaded = st.file_uploader(
            "Upload Transcript", type=["txt", "doc", "docx", "json"]
        )
        if uploaded:
            process_text_file(uploaded)
        return

    if upload_type == "Session Notes":
        show_manual_input_form()
        return

    # Remaining option: "Previous Session Data".
    show_previous_sessions_selector()
def process_video_file(video_file):
    """Preview an uploaded video and analyze a manually entered transcript.

    The upload is written to a temporary ``.mp4`` file in the working
    directory so it can be handed to ``st.video``. The user then types or
    pastes the transcript, which is stored in session state and passed to
    ``analyze_session_content`` when "Analyze Transcript" is pressed.

    Args:
        video_file: The uploaded file object; must provide ``getbuffer()``.

    Any exception is reported through ``st.error`` rather than raised.
    """
    # Local import: `os` is not among this module's top-level imports, so the
    # original cleanup call failed with NameError (hidden by a bare except)
    # and the temporary file was never deleted.
    import os

    temp_path = None
    try:
        # Create a timestamped temporary file name
        temp_path = f"temp_video_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp4"

        # Save video temporarily
        with open(temp_path, "wb") as f:
            f.write(video_file.getbuffer())

        # Display video
        st.video(temp_path)

        # Add transcript input
        transcript = st.text_area(
            "Enter the session transcript:",
            height=300,
            help="Paste or type the transcript of the session here."
        )

        # Add analyze button
        if st.button("Analyze Transcript"):
            if transcript.strip():
                with st.spinner('Analyzing transcript...'):
                    # Keep the transcript in session state, as the audio path does.
                    st.session_state.current_transcript = transcript
                    analyze_session_content(transcript)
            else:
                st.warning("Please enter a transcript before analyzing.")
    except Exception as e:
        st.error(f"Error processing video: {str(e)}")
    finally:
        # Best-effort cleanup that also runs when an error occurred above.
        if temp_path is not None:
            try:
                os.remove(temp_path)
            except OSError:
                # File was never created or is already gone; nothing to do.
                pass
def process_audio_file(audio_file):
    """Play back an uploaded audio file and analyze a manually entered transcript.

    The upload is written to a temporary ``.mp3`` file in the working
    directory so it can be handed to ``st.audio``. The user then enters
    the transcript, which is stored in session state and passed to
    ``analyze_session_content`` when "Analyze Transcript" is pressed.

    Args:
        audio_file: The uploaded file object; must provide ``getbuffer()``.

    Any exception is reported through ``st.error`` rather than raised.
    """
    # Local import: `os` is not among this module's top-level imports.
    import os

    temp_path = None
    try:
        # Save audio temporarily
        temp_path = f"temp_audio_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp3"
        with open(temp_path, "wb") as f:
            f.write(audio_file.getbuffer())

        st.audio(temp_path)
        st.info("Audio uploaded successfully. Please provide transcript.")

        # Add manual transcript input
        transcript = st.text_area("Enter the session transcript:", height=300)

        # Add analyze button
        if st.button("Analyze Transcript"):
            if transcript:
                with st.spinner('Analyzing transcript...'):
                    st.session_state.current_transcript = transcript
                    analyze_session_content(transcript)
            else:
                st.warning("Please enter a transcript before analyzing.")
    except Exception as e:
        st.error(f"Error processing audio: {str(e)}")
    finally:
        # The original never removed the temporary file, leaving a new
        # temp_audio_*.mp3 behind on every rerun. Clean up best-effort, the
        # same way the video handler intends to.
        if temp_path is not None:
            try:
                os.remove(temp_path)
            except OSError:
                # File was never created or is already gone; nothing to do.
                pass
def process_media_file(file, type):
    """Ask Gemini for a transcript of an uploaded recording, then analyze it.

    Args:
        file: Uploaded file object; its content is read with ``read()``.
        type: The selected input method label; ``"Audio Recording"`` selects
            the audio prompt, anything else selects the video prompt.

    A status line and progress bar are shown while working and are always
    cleared afterwards. Errors are reported through ``st.error``.
    """
    st.write(f"Processing {type}...")

    # Placeholders for progress feedback; both are emptied in `finally`.
    status = st.empty()
    progress_bar = st.progress(0)

    audio_prompt = """
This is an audio recording of a therapy session.
Please transcribe the conversation and include speaker labels where possible.
Focus on capturing:
1. The therapist's questions and reflections
2. The client's responses and statements
3. Any significant pauses or non-verbal sounds
"""
    video_prompt = """
This is a video recording of a therapy session.
Please transcribe the conversation and include:
1. Speaker labels
2. Verbal communication
3. Relevant non-verbal cues and body language
4. Significant pauses or interactions
"""

    try:
        # NOTE(review): the bytes read here are never included in the request
        # below -- only the text prompt is sent to the model. Confirm whether
        # the media was meant to be attached.
        media_bytes = file.read()

        status.text("Generating transcript...")
        progress_bar.progress(50)

        model = genai.GenerativeModel('gemini-pro')
        prompt = audio_prompt if type == "Audio Recording" else video_prompt

        transcript = model.generate_content(prompt).text

        if transcript:
            st.session_state.current_transcript = transcript
            status.text("Analyzing content...")
            progress_bar.progress(80)
            analyze_session_content(transcript)
            progress_bar.progress(100)
            status.text("Processing complete!")
    except Exception as err:
        st.error(f"Error processing file: {str(err)}")
    finally:
        status.empty()
        progress_bar.empty()
def get_processing_step_name(step):
    """Return the display name of the media-processing step at index ``step``.

    Indexing follows normal sequence rules: an out-of-range index raises
    ``IndexError`` and negative indexes count from the end.
    """
    step_names = (
        "Loading media file",
        "Converting to audio",
        "Performing speech recognition",
        "Generating transcript",
        "Preparing analysis",
    )
    return step_names[step]
def process_text_file(file):
    """Load a UTF-8 text upload, let the user edit it, and analyze on request.

    NOTE(review): another ``process_text_file`` is defined further down in
    this file; when the module is loaded, that later definition replaces
    this one.

    Args:
        file: Uploaded file object providing ``getvalue()``.
    """
    try:
        raw_text = file.getvalue().decode("utf-8")
        st.session_state.current_transcript = raw_text

        # Editable copy of the transcript, pre-filled with the file content.
        edited_transcript = st.text_area(
            "Review and edit transcript if needed:",
            value=raw_text,
            height=300,
        )

        analyze_clicked = st.button("Analyze Transcript")
        if analyze_clicked:
            with st.spinner('Analyzing transcript...'):
                # The edited text, not the raw upload, is what gets analyzed.
                st.session_state.current_transcript = edited_transcript
                analyze_session_content(edited_transcript)
    except Exception as err:
        st.error(f"Error processing file: {str(err)}")
def parse_analysis_results(raw_results):
    """Turn raw analysis text into a dictionary of structured fields.

    A ``dict`` input is returned unchanged. Otherwise the text is scanned
    with regular expressions for a "Score:" value, "Key Themes:",
    "Strengths:", "Areas for Improvement:" and "Summary:" sections, and for
    any "<one or two words>: <integer>" pairs, which are collected as
    technique counts.

    Returns:
        The parsed dictionary, or ``None`` (after ``st.error``) if parsing
        raised an exception.
    """
    if isinstance(raw_results, dict):
        return raw_results

    def _as_items(section_body):
        # One entry per non-blank line, with bullet dashes/spaces trimmed.
        rows = section_body.strip().split('\n')
        return [row.strip('- ') for row in rows if row.strip()]

    try:
        parsed = {
            'mi_adherence_score': 0,
            'key_themes': [],
            'technique_usage': {},
            'strengths': [],
            'areas_for_improvement': [],
            'session_summary': ''
        }

        # Score is expected as "Score: XX".
        score = re.search(r'Score:\s*(\d+)', raw_results)
        if score:
            parsed['mi_adherence_score'] = int(score.group(1))

        # Themes run from "Key Themes:" up to the next blank line.
        themes = re.search(r'Key Themes:(.*?)(?=\n\n|\Z)', raw_results, re.DOTALL)
        if themes:
            parsed['key_themes'] = _as_items(themes.group(1))

        # Any "label: number" pair in the text counts as a technique tally.
        pairs = re.findall(r'(\w+\s*\w*)\s*:\s*(\d+)', raw_results)
        if pairs:
            parsed['technique_usage'] = {label: int(count) for label, count in pairs}

        # Strengths run up to the "Areas for Improvement" heading.
        strengths = re.search(
            r'Strengths:(.*?)(?=Areas for Improvement|\Z)', raw_results, re.DOTALL
        )
        if strengths:
            parsed['strengths'] = _as_items(strengths.group(1))

        improvements = re.search(
            r'Areas for Improvement:(.*?)(?=\n\n|\Z)', raw_results, re.DOTALL
        )
        if improvements:
            parsed['areas_for_improvement'] = _as_items(improvements.group(1))

        summary = re.search(r'Summary:(.*?)(?=\n\n|\Z)', raw_results, re.DOTALL)
        if summary:
            parsed['session_summary'] = summary.group(1).strip()

        return parsed
    except Exception as err:
        st.error(f"Error parsing analysis results: {str(err)}")
        return None
def show_manual_input_form():
    """Show the manual session-notes form and analyze it on submit.

    When the form is submitted with non-empty notes, the entries are
    formatted with ``format_session_data``, stored as the current
    transcript in session state, and passed to ``analyze_session_content``.
    """
    st.subheader("Session Details")

    technique_choices = [
        "Open Questions", "Affirmations", "Reflections", "Summaries",
        "Change Talk", "Commitment Language", "Planning",
    ]

    with st.form("session_notes_form"):
        # Basic session information
        date_value = st.date_input("Session Date", datetime.now())
        duration_value = st.number_input(
            "Duration (minutes)", min_value=15, max_value=120, value=50
        )

        # Free-text content
        notes_value = st.text_area(
            "Session Notes",
            height=300,
            placeholder="Enter detailed session notes here...",
        )
        themes_value = st.text_area(
            "Key Themes",
            height=100,
            placeholder="Enter key themes identified during the session...",
        )

        # MI specific elements
        techniques_value = st.multiselect("MI Techniques Used", technique_choices)

        submitted = st.form_submit_button("Analyze Session")

        # Nothing happens unless the form was submitted with some notes.
        if submitted and notes_value:
            formatted = format_session_data(dict(
                date=date_value,
                duration=duration_value,
                notes=notes_value,
                themes=themes_value,
                techniques=techniques_value,
            ))
            st.session_state.current_transcript = formatted
            analyze_session_content(st.session_state.current_transcript)
def analyze_session_content(transcript):
    """Send the transcript to Gemini with a structured MI-feedback prompt.

    The model's text reply is stored in ``st.session_state.analysis_results``.

    Args:
        transcript: Session transcript text to analyze.

    Returns:
        ``True`` when a reply was stored, ``False`` when an exception was
        reported via ``st.error``, and ``None`` when ``transcript`` is empty
        (a warning is shown instead).
    """
    try:
        if not transcript:
            st.warning("Please provide a transcript for analysis.")
            return

        gemini = genai.GenerativeModel('gemini-pro')

        # The section headings below are what the results view later searches
        # for, so the wording of this template matters.
        analysis_request = f"""
As an MI (Motivational Interviewing) expert, analyze this therapy session transcript and provide detailed feedback in the following format:
=== MI Adherence ===
Score: [Provide a score from 0-100]
Strengths:
- [List 3 specific strengths with examples]
Areas for Growth:
- [List 3 specific areas needing improvement with examples]
=== Technical Analysis ===
OARS Usage Count:
- Open Questions: [number]
- Affirmations: [number]
- Reflections: [number]
- Summaries: [number]
=== Client Language Analysis ===
Change Talk Examples:
- [List 3-4 specific quotes showing change talk]
Sustain Talk Examples:
- [List 2-3 specific quotes showing sustain talk]
Change Talk/Sustain Talk Ratio: [X:Y]
=== Session Flow ===
Key Moments:
1. [Describe key moment 1]
2. [Describe key moment 2]
3. [Describe key moment 3]
Therapeutic Process:
- [Describe how the session progressed]
- [Note any significant shifts]
=== Recommendations ===
Priority Actions:
1. [Specific recommendation 1]
2. [Specific recommendation 2]
3. [Specific recommendation 3]
Development Strategies:
- [Practical strategy 1]
- [Practical strategy 2]
Analyze this transcript:
{transcript}
"""
        reply = gemini.generate_content(analysis_request)
        st.session_state.analysis_results = reply.text
        return True
    except Exception as err:
        st.error(f"Error in analysis: {str(err)}")
        return False
def generate_transcript(audio_content):
    """Transcribe audio bytes with the Google Cloud Speech-to-Text client.

    NOTE(review): ``speech_v1`` is not among the imports visible at the top
    of this file; unless it is provided elsewhere, every call ends up in
    the ``except`` branch below.

    Args:
        audio_content: Audio payload passed as ``RecognitionAudio.content``.
            The recognition config below requests LINEAR16 at 16 kHz, en-US.

    Returns:
        The first alternative of each result joined by spaces, or ``None``
        (after ``st.error``) on failure.
    """
    try:
        client = speech_v1.SpeechClient()

        recognition_audio = speech_v1.RecognitionAudio(content=audio_content)
        recognition_config = speech_v1.RecognitionConfig(
            encoding=speech_v1.RecognitionConfig.AudioEncoding.LINEAR16,
            sample_rate_hertz=16000,
            language_code="en-US",
            enable_automatic_punctuation=True,
        )

        response = client.recognize(config=recognition_config, audio=recognition_audio)

        # Take the top alternative of every result and stitch them together.
        pieces = [result.alternatives[0].transcript for result in response.results]
        return " ".join(pieces).strip()
    except Exception as err:
        st.error(f"Error in transcript generation: {str(err)}")
        return None
def convert_video_to_audio(video_file):
    """Placeholder for video-to-audio conversion.

    No conversion is performed: a warning is shown and ``None`` is
    returned. A real implementation could build on a library such as
    moviepy or ffmpeg-python.

    Args:
        video_file: The video to convert (currently unused).
    """
    not_implemented_message = "Video to audio conversion not implemented yet"
    st.warning(not_implemented_message)
    return None
def process_analysis_results(raw_analysis):
    """Bundle raw analysis text with its derived sections and metrics.

    NOTE(review): ``extract_analysis_sections`` and ``calculate_mi_metrics``
    are not defined in the visible part of this file -- confirm where they
    come from.

    Returns:
        A dict with keys ``raw_analysis``, ``structured_sections``,
        ``metrics`` and an ISO-format ``timestamp`` of the current time.
    """
    processed = {"raw_analysis": raw_analysis}
    processed["structured_sections"] = extract_analysis_sections(raw_analysis)
    processed["metrics"] = calculate_mi_metrics(raw_analysis)
    processed["timestamp"] = datetime.now().isoformat()
    return processed
def show_mi_metrics_dashboard(metrics):
    """Show four headline MI metrics as cards in a single row.

    Args:
        metrics: Mapping that may contain ``mi_spirit_score``,
            ``change_talk_ratio``, ``reflection_ratio`` and
            ``overall_adherence``; missing entries are shown as 0.
    """
    st.subheader("MI Performance Dashboard")

    # (card title, key in `metrics`, subtitle) in left-to-right order.
    card_specs = (
        ("MI Spirit Score", 'mi_spirit_score', "0-5 scale"),
        ("Change Talk Ratio", 'change_talk_ratio', "Change vs Sustain"),
        ("Reflection Ratio", 'reflection_ratio', "Reflections/Questions"),
        ("Overall Adherence", 'overall_adherence', "Percentage"),
    )

    for column, (title, metric_key, subtitle) in zip(st.columns(4), card_specs):
        with column:
            show_metric_card(title, metrics.get(metric_key, 0), subtitle)
def show_metric_card(title, value, subtitle):
    """Render one bordered metric card as raw HTML.

    Args:
        title: Heading shown at the top of the card.
        value: Number shown with two decimals (formatted with ``:.2f``, so
            it must support that format spec).
        subtitle: Small caption under the value.
    """
    card_html = f"""
<div style="border:1px solid #ccc; padding:10px; border-radius:5px; text-align:center;">
<h3>{title}</h3>
<h2>{value:.2f}</h2>
<p>{subtitle}</p>
</div>
"""
    # Raw HTML is required for the inline styling.
    st.markdown(card_html, unsafe_allow_html=True)
def show_mi_adherence_analysis(results):
    """Show OARS and MI-spirit charts followed by the adherence write-up.

    NOTE(review): this name is defined again near the end of the file; the
    later definition is the one in effect after import.

    Args:
        results: Mapping with ``'metrics'`` and ``'structured_sections'``.
    """
    st.subheader("MI Adherence Analysis")

    st.write("### OARS Implementation")
    oars_metrics = results['metrics'].get('oars_metrics', {})
    show_oars_chart(oars_metrics)

    st.write("### MI Spirit Components")
    spirit_metrics = results['metrics'].get('mi_spirit_metrics', {})
    show_mi_spirit_chart(spirit_metrics)

    st.write("### Detailed Analysis")
    adherence_text = results['structured_sections'].get('mi_adherence', '')
    st.markdown(adherence_text)
def show_technical_skills_analysis(results):
    """Show question-type and reflection-depth charts with the skills write-up.

    NOTE(review): this name is defined again near the end of the file; the
    later definition is the one in effect after import.

    Args:
        results: Mapping with ``'metrics'`` and ``'structured_sections'``.
    """
    st.subheader("Technical Skills Analysis")

    left, right = st.columns(2)
    with left:
        question_metrics = results['metrics'].get('question_metrics', {})
        show_question_type_chart(question_metrics)
    with right:
        reflection_metrics = results['metrics'].get('reflection_metrics', {})
        show_reflection_depth_chart(reflection_metrics)

    skills_text = results['structured_sections'].get('technical_skills', '')
    st.markdown(skills_text)
def show_client_language_analysis(results):
    """Show the change-talk timeline, language categories and write-up.

    NOTE(review): this name is defined again near the end of the file; the
    later definition is the one in effect after import.

    Args:
        results: Mapping with ``'metrics'`` and ``'structured_sections'``.
    """
    st.subheader("Client Language Analysis")

    timeline = results['metrics'].get('change_talk_timeline', [])
    show_change_talk_timeline(timeline)

    categories = results['metrics'].get('language_categories', {})
    show_language_categories_chart(categories)

    language_text = results['structured_sections'].get('client_language', '')
    st.markdown(language_text)
def show_session_flow_analysis(results):
    """Show the session-flow timeline, engagement metrics and write-up.

    NOTE(review): this name is defined again near the end of the file; the
    later definition is the one in effect after import.

    Args:
        results: Mapping with ``'metrics'`` and ``'structured_sections'``.
    """
    st.subheader("Session Flow Analysis")

    flow_points = results['metrics'].get('session_flow', [])
    show_session_flow_timeline(flow_points)

    engagement = results['metrics'].get('engagement_metrics', {})
    show_engagement_metrics(engagement)

    flow_text = results['structured_sections'].get('session_flow', '')
    st.markdown(flow_text)
def show_recommendations(results):
    """Show strengths, growth areas, interventions and next-session plan.

    NOTE(review): this name is defined again near the end of the file; the
    later definition is the one in effect after import.

    Args:
        results: Mapping whose ``'structured_sections'`` entry may hold
            ``strengths``, ``growth_areas``, ``suggested_interventions``
            and ``next_session_plan``.
    """
    st.subheader("Recommendations and Next Steps")

    strengths_column, growth_column = st.columns(2)

    with strengths_column:
        st.write("### Strengths")
        for entry in results['structured_sections'].get('strengths', []):
            st.markdown(f"✓ {entry}")

    with growth_column:
        st.write("### Growth Areas")
        for entry in results['structured_sections'].get('growth_areas', []):
            st.markdown(f"→ {entry}")

    st.write("### Suggested Interventions")
    interventions = results['structured_sections'].get('suggested_interventions', '')
    st.markdown(interventions)

    st.write("### Next Session Planning")
    next_plan = results['structured_sections'].get('next_session_plan', '')
    st.markdown(next_plan)
# Utility functions for charts and visualizations
def show_oars_chart(oars_metrics):
    """Plot the four OARS counts as a filled radar chart.

    Args:
        oars_metrics: Mapping that may contain ``open_questions``,
            ``affirmations``, ``reflections`` and ``summaries``; missing
            entries are plotted as 0.
    """
    # Axis label -> key in `oars_metrics`, in plotting order.
    axes = (
        ('Open Questions', 'open_questions'),
        ('Affirmations', 'affirmations'),
        ('Reflections', 'reflections'),
        ('Summaries', 'summaries'),
    )
    categories = [label for label, _ in axes]
    values = [oars_metrics.get(key, 0) for _, key in axes]

    radar = go.Scatterpolar(r=values, theta=categories, fill='toself')
    fig = go.Figure(data=radar)

    # Leave one unit of headroom above the largest value.
    radial_axis = dict(visible=True, range=[0, max(values) + 1])
    fig.update_layout(polar=dict(radialaxis=radial_axis), showlegend=False)

    st.plotly_chart(fig)
def save_analysis_results():
    """Write the current analysis results to a timestamped JSON file.

    Does nothing when no results are stored in session state. The file is
    created in the working directory as ``analysis_results_<timestamp>.json``;
    success and failure are both reported in the UI.
    """
    if not st.session_state.analysis_results:
        return

    try:
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"analysis_results_{stamp}.json"
        with open(filename, "w") as out_file:
            json.dump(st.session_state.analysis_results, out_file, indent=4)
        st.success(f"Analysis results saved to {filename}")
    except Exception as err:
        st.error(f"Error saving analysis results: {str(err)}")
def show_upload_section():
    """Show the input-method picker and hand off to the matching handler.

    Text, video and audio uploads go to their ``process_*_file`` handler
    once a file is present; "Session Notes" opens the manual form; the
    remaining option opens the previous-sessions selector.
    """
    st.subheader("Upload Session")

    input_methods = [
        "Text Transcript",
        "Video Recording",
        "Audio Recording",
        "Session Notes",
        "Previous Sessions",
    ]
    upload_type = st.radio("Choose input method:", input_methods)

    if upload_type == "Text Transcript":
        uploaded = st.file_uploader("Upload transcript file", type=['txt', 'doc', 'docx'])
        if uploaded:
            process_text_file(uploaded)
        return

    if upload_type == "Video Recording":
        uploaded = st.file_uploader("Upload video file", type=['mp4', 'mov', 'avi'])
        if uploaded:
            process_video_file(uploaded)
        return

    if upload_type == "Audio Recording":
        uploaded = st.file_uploader("Upload audio file", type=['mp3', 'wav', 'm4a'])
        if uploaded:
            process_audio_file(uploaded)
        return

    if upload_type == "Session Notes":
        show_manual_input_form()
        return

    # Only "Previous Sessions" is left.
    show_previous_sessions_selector()
def process_text_file(file):
    """Extract a transcript from an uploaded file and analyze it right away.

    ``.json`` files are parsed and passed through
    ``extract_transcript_from_json``; ``.docx`` files are read paragraph by
    paragraph; anything else is decoded as text. A non-empty transcript is
    stored in session state and sent to ``analyze_session_content``.

    Args:
        file: Uploaded file object with ``name`` and ``read()``.
    """
    try:
        filename = file.name

        if filename.endswith('.json'):
            parsed = json.loads(file.read().decode())
            transcript = extract_transcript_from_json(parsed)
        elif filename.endswith('.docx'):
            document = Document(file)
            transcript = '\n'.join(para.text for para in document.paragraphs)
        else:
            transcript = file.read().decode()

        if not transcript:
            return

        st.session_state.current_transcript = transcript
        analyze_session_content(transcript)
    except Exception as err:
        st.error(f"Error processing file: {str(err)}")
def show_export_options():
    """Add export controls to the sidebar.

    One button saves the current analysis results; a format selector plus
    a second button trigger ``generate_report`` with the chosen format.
    """
    sidebar = st.sidebar
    sidebar.subheader("Export Options")

    export_clicked = sidebar.button("Export Analysis Report")
    if export_clicked:
        save_analysis_results()

    available_formats = ["PDF", "DOCX", "JSON"]
    report_format = sidebar.selectbox("Report Format", available_formats)

    generate_clicked = sidebar.button("Generate Report")
    if generate_clicked:
        generate_report(report_format)
def generate_report(format):
    """Announce report generation for the requested format.

    No report is produced yet; only an informational message is shown.

    Args:
        format: Label of the requested report format, echoed in the message.
    """
    placeholder_message = f"Generating {format} report... (Feature coming soon)"
    st.info(placeholder_message)
def show_previous_sessions_selector():
    """Let the user pick a stored session by date and analyze its transcript.

    Sessions are loaded once into ``st.session_state.previous_sessions``.
    The first session whose date equals the selected one becomes the
    current transcript and is passed to ``analyze_session_content``.
    """
    st.subheader("Previous Sessions")

    # Load the sessions only the first time this view is shown.
    if 'previous_sessions' not in st.session_state:
        st.session_state.previous_sessions = load_previous_sessions()

    sessions = st.session_state.previous_sessions
    if not sessions:
        st.info("No previous sessions found.")
        return

    selected_date = st.selectbox(
        "Select Session Date:",
        [entry['date'] for entry in sessions],
        format_func=lambda when: when.strftime("%Y-%m-%d %H:%M"),
    )
    if not selected_date:
        return

    # Find the first session recorded under the chosen date.
    chosen = None
    for entry in sessions:
        if entry['date'] == selected_date:
            chosen = entry
            break

    if chosen:
        st.session_state.current_transcript = chosen['transcript']
        analyze_session_content(chosen['transcript'])
def load_previous_sessions():
    """Return the list of previous sessions.

    Currently this produces two hard-coded sample sessions stamped with the
    current time; a real implementation would read from a database or file
    storage.

    Returns:
        A list of dicts with ``date``, ``transcript`` and ``analysis``
        keys, or an empty list (after ``st.error``) on failure.
    """
    try:
        # Demonstration data only.
        return [
            {
                'date': datetime.now(),
                'transcript': f"Sample transcript {number}...",
                'analysis': f"Sample analysis {number}...",
            }
            for number in (1, 2)
        ]
    except Exception as err:
        st.error(f"Error loading previous sessions: {str(err)}")
        return []
def format_session_data(session_data):
    """Flatten manually entered session details into one transcript-like text.

    Args:
        session_data: Mapping with ``date``, ``duration``, ``notes``,
            ``themes`` and ``techniques`` (an iterable of strings).

    Returns:
        A multi-line string that starts and ends with a newline.
    """
    technique_list = ', '.join(session_data['techniques'])
    lines = [
        "",  # leading newline
        f"Session Date: {session_data['date']}",
        f"Duration: {session_data['duration']} minutes",
        "SESSION NOTES:",
        f"{session_data['notes']}",
        "KEY THEMES:",
        f"{session_data['themes']}",
        "MI TECHNIQUES USED:",
        technique_list,
        "",  # trailing newline
    ]
    return "\n".join(lines)
def _extract_bullets(heading, text):
    """Return the text of the '- ' bullets listed directly under ``heading:``."""
    # `(?:\n|\Z)` lets the final bullet match even when the text does not end
    # with a newline (the original pattern silently dropped that bullet).
    pattern = rf'{re.escape(heading)}:\n((?:- .*(?:\n|\Z))*)'
    found = re.findall(pattern, text)
    if not found:
        return []
    return [line[2:] for line in found[0].strip().split('\n') if line.startswith('- ')]


def _extract_numbered(heading, text):
    """Return the 'N. ...' lines listed directly under ``heading:``, numbers kept."""
    pattern = rf'{re.escape(heading)}:\n((?:\d\. .*(?:\n|\Z))*)'
    found = re.findall(pattern, text)
    if not found:
        return []
    return [line for line in found[0].strip().split('\n') if line.strip()]


def _render_mi_adherence_tab(results):
    """Show the adherence score gauge plus strengths and growth areas."""
    st.subheader("MI Adherence Analysis")

    # Extract score
    score_match = re.search(r'Score:\s*(\d+)', results)
    if score_match:
        score = int(score_match.group(1))
        # Create score gauge
        fig = go.Figure(go.Indicator(
            mode="gauge+number",
            value=score,
            domain={'x': [0, 1], 'y': [0, 1]},
            gauge={
                'axis': {'range': [0, 100]},
                'bar': {'color': "rgb(26, 118, 255)"},
                'steps': [
                    {'range': [0, 33], 'color': "lightgray"},
                    {'range': [33, 66], 'color': "gray"},
                    {'range': [66, 100], 'color': "darkgray"}
                ]
            }
        ))
        st.plotly_chart(fig)

    # Display strengths and areas for growth
    col1, col2 = st.columns(2)
    with col1:
        st.subheader("Strengths")
        for strength in _extract_bullets('Strengths', results):
            st.markdown(f"✅ {strength}")
    with col2:
        st.subheader("Areas for Growth")
        for area in _extract_bullets('Areas for Growth', results):
            st.markdown(f"🔄 {area}")


def _render_technical_skills_tab(results):
    """Show OARS counts, their distribution and the reflection/question ratio."""
    st.subheader("OARS Technique Analysis")

    # Extract OARS counts
    oars_pattern = r'OARS Usage Count:\n- Open Questions: (\d+)\n- Affirmations: (\d+)\n- Reflections: (\d+)\n- Summaries: (\d+)'
    oars_match = re.search(oars_pattern, results)
    if not oars_match:
        st.warning("Technical skills analysis data not found in the results.")
        return

    labels = ['Open Questions', 'Affirmations', 'Reflections', 'Summaries']
    counts = [int(group) for group in oars_match.groups()]
    open_q, _, reflect, _ = counts

    # Create bar chart
    fig = go.Figure(data=[
        go.Bar(
            x=labels,
            y=counts,
            marker_color=['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728']
        )
    ])
    fig.update_layout(
        title="OARS Techniques Usage",
        xaxis_title="Technique Type",
        yaxis_title="Frequency",
        showlegend=False,
        height=400
    )
    st.plotly_chart(fig)

    # Display detailed breakdown
    col1, col2 = st.columns(2)
    with col1:
        st.markdown("### Technique Counts")
        for label, count in zip(labels, counts):
            st.markdown(f"🔹 **{label}:** {count}")
    with col2:
        # A session with no counted techniques gives total == 0; report 0.0%
        # for every technique instead of raising ZeroDivisionError.
        total = sum(counts)
        st.markdown("### Technique Distribution")
        for label, count in zip(labels, counts):
            share = (count / total * 100) if total else 0.0
            st.markdown(f"🔸 **{label}:** {share:.1f}%")

    # Add reflection-to-question ratio (undefined when no open questions)
    st.markdown("### Key Metrics")
    if open_q > 0:
        r_to_q = reflect / open_q
        st.metric(
            label="Reflection-to-Question Ratio",
            value=f"{r_to_q:.2f}",
            help="Target ratio is 2:1 or higher"
        )

    # Add MI best practice guidelines
    st.markdown("### MI Best Practices")
    st.info("""
📌 **Ideal OARS Distribution:**
- Reflections should exceed questions (2:1 ratio)
- Regular use of affirmations (at least 1-2 per session)
- Strategic use of summaries at transition points
- Open questions > 70% of all questions
""")


def _render_client_language_tab(results):
    """Show change-talk and sustain-talk examples side by side."""
    st.subheader("Client Language Analysis")
    col1, col2 = st.columns(2)
    with col1:
        st.markdown("### Change Talk 🌱")
        for talk in _extract_bullets('Change Talk Examples', results):
            st.markdown(f"- {talk}")
    with col2:
        st.markdown("### Sustain Talk 🔄")
        for talk in _extract_bullets('Sustain Talk Examples', results):
            st.markdown(f"- {talk}")


def _render_session_flow_tab(results):
    """Show the key moments and the therapeutic-process notes."""
    st.subheader("Session Flow Analysis")

    # Key Moments
    st.markdown("### Key Moments")
    for moment in _extract_numbered('Key Moments', results):
        st.markdown(f"{moment}")

    # Therapeutic Process
    st.markdown("### Therapeutic Process")
    for item in _extract_bullets('Therapeutic Process', results):
        st.markdown(f"- {item}")


def _render_recommendations_tab(results):
    """Show priority actions and development strategies."""
    st.subheader("Recommendations")

    # Priority Actions
    st.markdown("### Priority Actions 🎯")
    for priority in _extract_numbered('Priority Actions', results):
        st.markdown(f"{priority}")

    # Development Strategies
    st.markdown("### Development Strategies 📈")
    for strategy in _extract_bullets('Development Strategies', results):
        st.markdown(f"- {strategy}")


def show_analysis_results():
    """Display the stored analysis text in five organized tabs.

    Reads the raw analysis text from ``st.session_state.analysis_results``
    and pulls each section out with regular expressions keyed on the
    headings requested in the analysis prompt. Shows an informational
    message and returns early when no analysis is available.
    """
    if 'analysis_results' not in st.session_state or not st.session_state.analysis_results:
        st.info("Please analyze a transcript first.")
        return

    results = st.session_state.analysis_results

    # Create tabs
    tabs = st.tabs([
        "MI Adherence",
        "Technical Skills",
        "Client Language",
        "Session Flow",
        "Recommendations"
    ])

    # One renderer per tab, in the same order as the tab labels above.
    renderers = (
        _render_mi_adherence_tab,
        _render_technical_skills_tab,
        _render_client_language_tab,
        _render_session_flow_tab,
        _render_recommendations_tab,
    )
    for tab, render in zip(tabs, renderers):
        with tab:
            render(results)
def get_technique_description(technique):
    """Look up the one-sentence description of an MI technique.

    Args:
        technique: Exact technique name, e.g. ``"Reflections"``.

    Returns:
        The description, or ``"Description not available"`` for names that
        are not in the table.
    """
    known_descriptions = {
        "Open Questions": "Questions that allow for elaboration and cannot be answered with a simple yes/no.",
        "Reflections": "Statements that mirror, rephrase, or elaborate on the client's speech.",
        "Affirmations": "Statements that recognize client strengths and acknowledge behaviors that lead to positive change.",
        "Summaries": "Statements that collect, link, and transition between client statements.",
        "Information Giving": "Providing information with permission and in response to client needs.",
    }
    if technique in known_descriptions:
        return known_descriptions[technique]
    return "Description not available"
def create_session_timeline(timeline_data):
    """Draw the session timeline chart (visualization still to be added).

    With empty ``timeline_data`` an informational message is shown instead.
    Otherwise an empty Plotly figure is rendered for now.
    """
    if timeline_data:
        timeline_figure = go.Figure()
        # Timeline traces are not implemented yet; the figure stays empty.
        st.plotly_chart(timeline_figure)
    else:
        st.info("Detailed timeline not available")
def get_improvement_suggestion(area):
    """Return a suggestion for the improvement area with exactly this name.

    NOTE(review): this name is defined again further down in the file; the
    later definition is the one in effect after import.

    Args:
        area: Area name; must match a table key exactly to get a specific
            suggestion.

    Returns:
        The matching suggestion, or a generic fallback sentence.
    """
    by_area = {
        "Open Questions": "Try replacing closed questions with open-ended ones. Instead of 'Did you exercise?', ask 'What kinds of physical activity have you been doing?'",
        "Reflections": "Practice using more complex reflections by adding meaning or emotion to what the client has said.",
        "Empathy": "Focus on seeing the situation from the client's perspective and verbalize your understanding.",
    }
    if area in by_area:
        return by_area[area]
    return "Work on incorporating this element more intentionally in your sessions."
def create_action_items(analysis):
    """Show a fixed checklist of suggested follow-up actions.

    Args:
        analysis: Accepted for interface compatibility; the checklist does
            not currently depend on it.
    """
    st.write("Based on the analysis, consider focusing on these specific actions:")

    # Example items; one checkbox is rendered per entry.
    suggested_actions = (
        "Practice one new MI skill each session",
        "Record and review your sessions",
        "Focus on developing complex reflections",
        "Track change talk/sustain talk ratio",
    )
    for action in suggested_actions:
        st.checkbox(action)
def show_relevant_resources(analysis):
    """Show a fixed list of resource links as markdown.

    Args:
        analysis: Accepted for interface compatibility; the list does not
            currently depend on it.
    """
    # (title, url) pairs; the URLs are placeholders for now.
    resource_links = (
        ("MI Practice Exercises", "#"),
        ("Reflection Templates", "#"),
        ("Change Talk Recognition Guide", "#"),
        ("MI Community of Practice", "#"),
    )
    for title, url in resource_links:
        st.markdown(f"[{title}]({url})")
def parse_analysis_response(response_text):
    """Parse the AI response text into structured analysis results.

    Recognized sections are "MI Adherence Score:", "Key Themes:",
    "Technique Usage:", "Strengths:", "Areas for Improvement:" and
    "Session Summary:"; each list section ends at the next blank line.

    Args:
        response_text: Raw response text from the model.

    Returns:
        A dict with keys ``mi_adherence_score`` (float), ``key_themes``,
        ``technique_usage`` (name -> int), ``strengths``,
        ``areas_for_improvement``, ``recommendations``,
        ``change_talk_instances`` and ``session_summary``. The
        ``recommendations`` and ``change_talk_instances`` entries are never
        filled by this parser. Returns ``None`` (after ``st.error``) if
        parsing raised.
    """

    def _section_body(label):
        # Text between "<label>:" and the next blank line, or None if absent.
        match = re.search(rf'{re.escape(label)}:(.*?)(?=\n\n|\Z)', response_text, re.DOTALL)
        return match.group(1).strip() if match else None

    def _section_items(label):
        # Non-blank lines of a section with bullet dashes/spaces trimmed.
        body = _section_body(label)
        if not body:
            return []
        return [line.strip('- ') for line in body.split('\n') if line.strip()]

    try:
        # Initialize default structure for analysis results
        analysis = {
            'mi_adherence_score': 0.0,
            'key_themes': [],
            'technique_usage': {},
            'strengths': [],
            'areas_for_improvement': [],
            'recommendations': [],
            'change_talk_instances': [],
            'session_summary': ""
        }

        # Extract MI adherence score
        score_match = re.search(r'MI Adherence Score:\s*(\d+\.?\d*)', response_text)
        if score_match:
            analysis['mi_adherence_score'] = float(score_match.group(1))

        analysis['key_themes'] = _section_items('Key Themes')

        # Extract technique usage. Each line is expected as "name: <integer>".
        # Lines that do not fit (extra colons, no leading number) are skipped;
        # previously a single such line raised and discarded the whole parse.
        # Text after the number is ignored and the bullet prefix is removed
        # from the name, as for the other list sections.
        technique_body = _section_body('Technique Usage')
        if technique_body:
            for line in technique_body.split('\n'):
                entry = re.match(r'([^:]+?)\s*:\s*(\d+)', line)
                if not entry:
                    continue
                name = entry.group(1).strip('- ')
                if name:
                    analysis['technique_usage'][name] = int(entry.group(2))

        analysis['strengths'] = _section_items('Strengths')
        analysis['areas_for_improvement'] = _section_items('Areas for Improvement')

        # Extract session summary
        summary = _section_body('Session Summary')
        if summary is not None:
            analysis['session_summary'] = summary

        return analysis
    except Exception as e:
        st.error(f"Error parsing analysis response: {str(e)}")
        return None
def get_improvement_suggestion(area):
    """Return a suggestion for the first known keyword found in ``area``.

    Matching is case-insensitive substring matching, tried in the order
    the keywords are listed below.

    Args:
        area: Free-text description of the improvement area.

    Returns:
        The suggestion for the first matching keyword, or a generic
        fallback when none matches.
    """
    keyword_suggestions = {
        "open questions": "Practice replacing closed questions with open-ended ones. For example:\n- Instead of: 'Did you exercise?'\n- Try: 'What kinds of physical activity have you been doing?'",
        "reflections": "Work on using more complex reflections by adding meaning or emotion to what the client has said. Try to make at least two complex reflections for every simple reflection.",
        "empathy": "Focus on seeing the situation from the client's perspective. Take time to verbalize your understanding of their emotions and experiences.",
        "summaries": "Use more collecting summaries to gather key points discussed and transition summaries to move between topics.",
        "affirmations": "Look for opportunities to genuinely affirm client strengths and efforts, not just outcomes."
    }
    fallback = "Focus on incorporating this element more intentionally in your sessions. Consider recording your sessions and reviewing them with a supervisor or peer."

    # First keyword contained in the lower-cased area wins.
    return next(
        (text for keyword, text in keyword_suggestions.items() if keyword in area.lower()),
        fallback,
    )
def create_gauge_chart(score):
    """Render a 0-100 gauge for the MI adherence score.

    The gauge has three shaded bands (0-40, 40-70, 70-100) and a red
    threshold marker at 90.

    Args:
        score: Value shown on the gauge.
    """
    shaded_bands = [
        {'range': [0, 40], 'color': "lightgray"},
        {'range': [40, 70], 'color': "gray"},
        {'range': [70, 100], 'color': "darkgray"},
    ]
    threshold_marker = {
        'line': {'color': "red", 'width': 4},
        'thickness': 0.75,
        'value': 90,
    }
    gauge_config = {
        'axis': {'range': [0, 100]},
        'bar': {'color': "darkblue"},
        'steps': shaded_bands,
        'threshold': threshold_marker,
    }

    indicator = go.Indicator(
        mode="gauge+number",
        value=score,
        domain={'x': [0, 1], 'y': [0, 1]},
        title={'text': "MI Adherence"},
        gauge=gauge_config,
    )
    st.plotly_chart(go.Figure(indicator))
def create_technique_usage_chart(technique_usage):
    """Render a bar chart of how often each MI technique was used.

    Args:
        technique_usage: Mapping of technique name to count.
    """
    rows = list(technique_usage.items())
    usage_frame = pd.DataFrame(rows, columns=['Technique', 'Count'])

    chart = px.bar(
        usage_frame,
        x='Technique',
        y='Count',
        title='MI Technique Usage Frequency',
    )
    chart.update_layout(
        xaxis_title="Technique",
        yaxis_title="Frequency",
        showlegend=False,
    )
    st.plotly_chart(chart)
def extract_transcript_from_json(content):
    """Turn parsed JSON content into transcript text.

    A ``dict`` is pretty-printed as JSON with a two-space indent; any
    other value is converted with ``str()``.
    """
    return json.dumps(content, indent=2) if isinstance(content, dict) else str(content)
# Analysis display functions
def show_mi_adherence_analysis(analysis):
    """Show the MI adherence section using the analysis' ``raw_text``.

    Args:
        analysis: Mapping; when it has no ``raw_text`` entry a fallback
            message is written instead.
    """
    st.subheader("MI Adherence Analysis")
    body = analysis.get('raw_text', 'No analysis available')
    st.write(body)
def show_technical_skills_analysis(analysis):
    """Show the technical skills section using the analysis' ``raw_text``.

    Args:
        analysis: Mapping; when it has no ``raw_text`` entry a fallback
            message is written instead.
    """
    st.subheader("Technical Skills Analysis")
    body = analysis.get('raw_text', 'No analysis available')
    st.write(body)
def show_client_language_analysis(analysis):
    """Show the client language section using the analysis' ``raw_text``.

    Args:
        analysis: Mapping; when it has no ``raw_text`` entry a fallback
            message is written instead.
    """
    st.subheader("Client Language Analysis")
    body = analysis.get('raw_text', 'No analysis available')
    st.write(body)
def show_session_flow_analysis(analysis):
    """Show the session flow section using the analysis' ``raw_text``.

    Args:
        analysis: Mapping; when it has no ``raw_text`` entry a fallback
            message is written instead.
    """
    st.subheader("Session Flow Analysis")
    body = analysis.get('raw_text', 'No analysis available')
    st.write(body)
def show_recommendations(analysis):
    """Show the recommendations section using the analysis' ``raw_text``.

    Args:
        analysis: Mapping; when it has no ``raw_text`` entry a fallback
            message is written instead.
    """
    st.subheader("Recommendations")
    body = analysis.get('raw_text', 'No recommendations available')
    st.write(body)
# Render the dashboard when this module is executed as the entry script
# (not when it is imported by another module).
if __name__ == "__main__":
    show_session_analysis()