"""Streamlit dashboard that analyses Motivational Interviewing (MI) sessions.

A transcript (uploaded, typed in, or taken from a stored session) is sent to a
Gemini model together with a fixed report template.  The raw text reply is kept
in ``st.session_state.analysis_results`` and rendered by regex-matching the
headings of that template.

Several functions used to be defined twice in this file.  Python binds the
last definition, so only those (already effective) versions are kept here.
"""
import io
import json
import re
from datetime import datetime
from io import BytesIO

import google.generativeai as genai
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import streamlit as st
from docx import Document

from prompts import SESSION_EVALUATION_PROMPT, MI_SYSTEM_PROMPT

# Gemini model used for every generation call in this module.
_GEMINI_MODEL_NAME = 'gemini-pro'

# Report template sent to the model.  The section headings and list markers
# here are exactly what the regexes in ``show_analysis_results`` look for, so
# the two must be changed together.
_ANALYSIS_PROMPT_TEMPLATE = """\
As an MI (Motivational Interviewing) expert, analyze this therapy session transcript and provide detailed feedback in the following format:

=== MI Adherence ===
Score: [Provide a score from 0-100]

Strengths:
- [List 3 specific strengths with examples]

Areas for Growth:
- [List 3 specific areas needing improvement with examples]

=== Technical Analysis ===
OARS Usage Count:
- Open Questions: [number]
- Affirmations: [number]
- Reflections: [number]
- Summaries: [number]

=== Client Language Analysis ===
Change Talk Examples:
- [List 3-4 specific quotes showing change talk]

Sustain Talk Examples:
- [List 2-3 specific quotes showing sustain talk]

Change Talk/Sustain Talk Ratio: [X:Y]

=== Session Flow ===
Key Moments:
1. [Describe key moment 1]
2. [Describe key moment 2]
3. [Describe key moment 3]

Therapeutic Process:
- [Describe how the session progressed]
- [Note any significant shifts]

=== Recommendations ===
Priority Actions:
1. [Specific recommendation 1]
2. [Specific recommendation 2]
3. [Specific recommendation 3]

Development Strategies:
- [Practical strategy 1]
- [Practical strategy 2]

Analyze this transcript:
{transcript}
"""

_AUDIO_TRANSCRIPTION_PROMPT = """\
This is an audio recording of a therapy session.
Please transcribe the conversation and include speaker labels where possible.
Focus on capturing:
1. The therapist's questions and reflections
2. The client's responses and statements
3. Any significant pauses or non-verbal sounds
"""

_VIDEO_TRANSCRIPTION_PROMPT = """\
This is a video recording of a therapy session.
Please transcribe the conversation and include:
1. Speaker labels
2. Verbal communication
3. Relevant non-verbal cues and body language
4. Significant pauses or interactions
"""

_OARS_PATTERN = re.compile(
    r'OARS Usage Count:\n- Open Questions: (\d+)\n- Affirmations: (\d+)'
    r'\n- Reflections: (\d+)\n- Summaries: (\d+)'
)

_OARS_BEST_PRACTICES = """
📌 **Ideal OARS Distribution:**
- Reflections should exceed questions (2:1 ratio)
- Regular use of affirmations (at least 1-2 per session)
- Strategic use of summaries at transition points
- Open questions > 70% of all questions
"""


def show_session_analysis():
    """Render the dashboard: upload controls on the left, results on the right."""
    st.title("MI Session Analysis Dashboard")

    # Make sure both session-state slots exist before any widget reads them.
    if 'analysis_results' not in st.session_state:
        st.session_state.analysis_results = None
    if 'current_transcript' not in st.session_state:
        st.session_state.current_transcript = None

    col1, col2 = st.columns([1, 2])
    with col1:
        show_upload_section()
    with col2:
        # The upload column may have just stored fresh results in this run.
        if st.session_state.analysis_results:
            show_analysis_results()


def show_upload_section():
    """Display the input-method selector and dispatch to the matching handler."""
    st.subheader("Upload Session")
    upload_type = st.radio(
        "Choose input method:",
        ["Text Transcript", "Video Recording", "Audio Recording",
         "Session Notes", "Previous Sessions"]
    )

    if upload_type == "Text Transcript":
        # 'json' is offered because process_text_file already has a .json branch.
        file = st.file_uploader(
            "Upload transcript file", type=['txt', 'doc', 'docx', 'json']
        )
        if file:
            process_text_file(file)
    elif upload_type == "Video Recording":
        video_file = st.file_uploader("Upload video file", type=['mp4', 'mov', 'avi'])
        if video_file:
            process_video_file(video_file)
    elif upload_type == "Audio Recording":
        audio_file = st.file_uploader("Upload audio file", type=['mp3', 'wav', 'm4a'])
        if audio_file:
            process_audio_file(audio_file)
    elif upload_type == "Session Notes":
        show_manual_input_form()
    else:
        show_previous_sessions_selector()


def _analyze_typed_transcript(transcript):
    """Analyze a manually entered transcript, warning when it is blank."""
    if not transcript.strip():
        st.warning("Please enter a transcript before analyzing.")
        return
    with st.spinner('Analyzing transcript...'):
        st.session_state.current_transcript = transcript
        analyze_session_content(transcript)


def process_video_file(video_file):
    """Show the uploaded video and collect a manually entered transcript.

    The bytes are handed to ``st.video`` directly; no temporary file is
    written, so nothing has to be cleaned up afterwards.
    """
    try:
        st.video(video_file.getvalue())

        transcript = st.text_area(
            "Enter the session transcript:",
            height=300,
            help="Paste or type the transcript of the session here."
        )

        if st.button("Analyze Transcript"):
            _analyze_typed_transcript(transcript)
    except Exception as e:
        st.error(f"Error processing video: {str(e)}")


def process_audio_file(audio_file):
    """Play the uploaded audio and collect a manually entered transcript.

    The bytes are handed to ``st.audio`` directly; no temporary file is
    written.
    """
    try:
        st.audio(audio_file.getvalue())
        st.info("Audio uploaded successfully. Please provide transcript.")

        transcript = st.text_area("Enter the session transcript:", height=300)

        if st.button("Analyze Transcript"):
            _analyze_typed_transcript(transcript)
    except Exception as e:
        st.error(f"Error processing audio: {str(e)}")


def process_media_file(file, type):
    """Ask Gemini for a transcript of a recording, then analyze that text.

    Args:
        file: the uploaded media file (currently not forwarded, see note).
        type: "Audio Recording" selects the audio prompt; any other value
            selects the video prompt.

    NOTE(review): only a text prompt is passed to ``generate_content``; the
    media itself never reaches the model, so the returned "transcript" cannot
    reflect the recording.  This needs a real speech-to-text step before it
    is wired into the UI.
    """
    st.write(f"Processing {type}...")

    status = st.empty()
    progress_bar = st.progress(0)

    try:
        status.text("Generating transcript...")
        progress_bar.progress(50)

        model = genai.GenerativeModel(_GEMINI_MODEL_NAME)
        if type == "Audio Recording":
            prompt = _AUDIO_TRANSCRIPTION_PROMPT
        else:  # Video Recording
            prompt = _VIDEO_TRANSCRIPTION_PROMPT

        response = model.generate_content(prompt)
        transcript = response.text

        if transcript:
            st.session_state.current_transcript = transcript
            status.text("Analyzing content...")
            progress_bar.progress(80)
            analyze_session_content(transcript)
            progress_bar.progress(100)
            status.text("Processing complete!")
    except Exception as e:
        st.error(f"Error processing file: {str(e)}")
    finally:
        # Always remove the transient status widgets, success or failure.
        status.empty()
        progress_bar.empty()


def get_processing_step_name(step):
    """Return the label of processing step number ``step`` (0-based index).

    Raises:
        IndexError: if ``step`` is outside the list of known steps.
    """
    steps = [
        "Loading media file",
        "Converting to audio",
        "Performing speech recognition",
        "Generating transcript",
        "Preparing analysis"
    ]
    return steps[step]


def parse_analysis_results(raw_results):
    """Parse raw analysis text into a dict of scores, themes and lists.

    Args:
        raw_results: a dict (returned unchanged) or the analysis text.

    Returns:
        A dict with keys ``mi_adherence_score``, ``key_themes``,
        ``technique_usage``, ``strengths``, ``areas_for_improvement`` and
        ``session_summary``; ``None`` (after showing an error) when parsing
        raises.
    """
    if isinstance(raw_results, dict):
        return raw_results  # Already parsed

    try:
        analysis = {
            'mi_adherence_score': 0,
            'key_themes': [],
            'technique_usage': {},
            'strengths': [],
            'areas_for_improvement': [],
            'session_summary': ''
        }

        # Score is expected as "Score: XX".
        score_match = re.search(r'Score:\s*(\d+)', raw_results)
        if score_match:
            analysis['mi_adherence_score'] = int(score_match.group(1))

        # Themes run from "Key Themes:" to the next blank line.
        themes_match = re.search(r'Key Themes:(.*?)(?=\n\n|\Z)', raw_results, re.DOTALL)
        if themes_match:
            themes = themes_match.group(1).strip().split('\n')
            analysis['key_themes'] = [t.strip('- ') for t in themes if t.strip()]

        # Every "<one or two words>: <integer>" pair in the text is recorded,
        # which also picks up pairs such as "Score: 85".
        techniques = re.findall(r'(\w+\s*\w*)\s*:\s*(\d+)', raw_results)
        if techniques:
            analysis['technique_usage'] = {name: int(count) for name, count in techniques}

        strengths_match = re.search(
            r'Strengths:(.*?)(?=Areas for Improvement|\Z)', raw_results, re.DOTALL
        )
        if strengths_match:
            strengths = strengths_match.group(1).strip().split('\n')
            analysis['strengths'] = [s.strip('- ') for s in strengths if s.strip()]

        improvements_match = re.search(
            r'Areas for Improvement:(.*?)(?=\n\n|\Z)', raw_results, re.DOTALL
        )
        if improvements_match:
            improvements = improvements_match.group(1).strip().split('\n')
            analysis['areas_for_improvement'] = [
                i.strip('- ') for i in improvements if i.strip()
            ]

        summary_match = re.search(r'Summary:(.*?)(?=\n\n|\Z)', raw_results, re.DOTALL)
        if summary_match:
            analysis['session_summary'] = summary_match.group(1).strip()

        return analysis
    except Exception as e:
        st.error(f"Error parsing analysis results: {str(e)}")
        return None


def show_manual_input_form():
    """Show the session-notes form and analyze its content on submit."""
    st.subheader("Session Details")

    with st.form("session_notes_form"):
        # Basic session information
        session_date = st.date_input("Session Date", datetime.now())
        session_duration = st.number_input(
            "Duration (minutes)", min_value=15, max_value=120, value=50
        )

        # Session content
        session_notes = st.text_area(
            "Session Notes",
            height=300,
            placeholder="Enter detailed session notes here..."
        )

        # Key themes and observations
        key_themes = st.text_area(
            "Key Themes",
            height=100,
            placeholder="Enter key themes identified during the session..."
        )

        # MI specific elements
        mi_techniques_used = st.multiselect(
            "MI Techniques Used",
            ["Open Questions", "Affirmations", "Reflections", "Summaries",
             "Change Talk", "Commitment Language", "Planning"]
        )

        submitted = st.form_submit_button("Analyze Session")

        # Nothing is analyzed unless notes were actually entered.
        if submitted and session_notes:
            session_data = {
                'date': session_date,
                'duration': session_duration,
                'notes': session_notes,
                'themes': key_themes,
                'techniques': mi_techniques_used
            }
            st.session_state.current_transcript = format_session_data(session_data)
            analyze_session_content(st.session_state.current_transcript)


def analyze_session_content(transcript):
    """Analyze a session transcript with Gemini and store the raw reply.

    On success the model's text is written to
    ``st.session_state.analysis_results``.

    Returns:
        True when results were stored; False when the transcript was empty
        (a warning is shown) or the request failed (an error is shown).
    """
    if not transcript:
        st.warning("Please provide a transcript for analysis.")
        return False

    try:
        model = genai.GenerativeModel(_GEMINI_MODEL_NAME)
        # str.format parses only the template, so braces inside the
        # transcript are inserted verbatim.
        prompt = _ANALYSIS_PROMPT_TEMPLATE.format(transcript=transcript)
        response = model.generate_content(prompt)
        st.session_state.analysis_results = response.text
        return True
    except Exception as e:
        st.error(f"Error in analysis: {str(e)}")
        return False


def generate_transcript(audio_content):
    """Generate a transcript from audio bytes with Google Speech-to-Text.

    Returns:
        The concatenated top alternative of every result, or ``None`` (after
        showing an error) when anything fails.

    NOTE(review): ``speech_v1`` is not imported anywhere in this file, so as
    written every call ends in the ``except`` branch with a NameError.  It
    presumably needs ``from google.cloud import speech_v1`` - confirm the
    dependency before adding it.
    """
    try:
        client = speech_v1.SpeechClient()

        audio = speech_v1.RecognitionAudio(content=audio_content)
        config = speech_v1.RecognitionConfig(
            encoding=speech_v1.RecognitionConfig.AudioEncoding.LINEAR16,
            sample_rate_hertz=16000,
            language_code="en-US",
            enable_automatic_punctuation=True,
        )

        response = client.recognize(config=config, audio=audio)

        # Keep only the first alternative of each result.
        pieces = (result.alternatives[0].transcript for result in response.results)
        return " ".join(pieces).strip()
    except Exception as e:
        st.error(f"Error in transcript generation: {str(e)}")
        return None


def convert_video_to_audio(video_file):
    """Placeholder for video-to-audio conversion; warns and returns ``None``.

    A real implementation might use a library such as moviepy or
    ffmpeg-python.
    """
    st.warning("Video to audio conversion not implemented yet")
    return None


def process_analysis_results(raw_analysis):
    """Bundle the raw analysis with extracted sections, metrics and a timestamp.

    NOTE(review): ``extract_analysis_sections`` and ``calculate_mi_metrics``
    are not defined in this file; unless they are provided elsewhere this
    function raises NameError when called.
    """
    sections = extract_analysis_sections(raw_analysis)
    metrics = calculate_mi_metrics(raw_analysis)

    return {
        "raw_analysis": raw_analysis,
        "structured_sections": sections,
        "metrics": metrics,
        "timestamp": datetime.now().isoformat()
    }


def show_mi_metrics_dashboard(metrics):
    """Show four metric cards read from ``metrics`` (missing keys count as 0)."""
    st.subheader("MI Performance Dashboard")

    cards = [
        ("MI Spirit Score", 'mi_spirit_score', "0-5 scale"),
        ("Change Talk Ratio", 'change_talk_ratio', "Change vs Sustain"),
        ("Reflection Ratio", 'reflection_ratio', "Reflections/Questions"),
        ("Overall Adherence", 'overall_adherence', "Percentage"),
    ]
    for column, (title, key, subtitle) in zip(st.columns(4), cards):
        with column:
            show_metric_card(title, metrics.get(key, 0), subtitle)


def show_metric_card(title, value, subtitle):
    """Render one metric: title, value with two decimals, then subtitle.

    ``value`` must be numeric, since it is formatted with ``:.2f``.

    NOTE(review): the string carries no HTML even though
    ``unsafe_allow_html=True`` is set - the card markup may have been lost;
    verify against the intended design.
    """
    st.markdown(
        f"\n\n{title}\n\n{value:.2f}\n\n{subtitle}\n\n",
        unsafe_allow_html=True
    )


# Utility functions for charts and visualizations
def show_oars_chart(oars_metrics):
    """Plot a radar chart of OARS counts (missing keys count as 0)."""
    categories = ['Open Questions', 'Affirmations', 'Reflections', 'Summaries']
    values = [
        oars_metrics.get('open_questions', 0),
        oars_metrics.get('affirmations', 0),
        oars_metrics.get('reflections', 0),
        oars_metrics.get('summaries', 0)
    ]

    fig = go.Figure(data=go.Scatterpolar(
        r=values,
        theta=categories,
        fill='toself'
    ))
    fig.update_layout(
        polar=dict(
            radialaxis=dict(
                visible=True,
                # One unit of headroom above the largest value.
                range=[0, max(values) + 1]
            )),
        showlegend=False
    )
    st.plotly_chart(fig)


def save_analysis_results():
    """Save the current analysis results to a timestamped JSON file.

    Does nothing when no results are stored in the session state.
    """
    if not st.session_state.analysis_results:
        return

    try:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"analysis_results_{timestamp}.json"
        with open(filename, "w", encoding="utf-8") as f:
            json.dump(st.session_state.analysis_results, f, indent=4)
        st.success(f"Analysis results saved to {filename}")
    except Exception as e:
        st.error(f"Error saving analysis results: {str(e)}")


def process_text_file(file):
    """Extract a transcript from an uploaded text file and analyze it.

    ``.json`` files are parsed and re-serialised, ``.docx`` files are read
    paragraph by paragraph, anything else is decoded as text.  Extensions are
    matched case-insensitively.

    NOTE(review): Streamlit re-executes the script on every interaction, so
    this presumably re-sends the transcript to the model on each rerun while
    a file stays selected - confirm whether that is intended.
    """
    try:
        name = file.name.lower()
        if name.endswith('.json'):
            # getvalue() returns the whole buffer regardless of the current
            # stream position.
            content = json.loads(file.getvalue().decode())
            transcript = extract_transcript_from_json(content)
        elif name.endswith('.docx'):
            doc = Document(file)
            transcript = '\n'.join(paragraph.text for paragraph in doc.paragraphs)
        else:
            transcript = file.getvalue().decode()

        if transcript:
            st.session_state.current_transcript = transcript
            analyze_session_content(transcript)
    except Exception as e:
        st.error(f"Error processing file: {str(e)}")


def show_export_options():
    """Show the export controls in the sidebar."""
    st.sidebar.subheader("Export Options")

    if st.sidebar.button("Export Analysis Report"):
        save_analysis_results()

    report_format = st.sidebar.selectbox(
        "Report Format",
        ["PDF", "DOCX", "JSON"]
    )

    if st.sidebar.button("Generate Report"):
        generate_report(report_format)


def generate_report(format):
    """Announce report generation in ``format``; the feature is not built yet."""
    st.info(f"Generating {format} report... (Feature coming soon)")


def show_previous_sessions_selector():
    """Let the user pick a stored session by date and analyze its transcript."""
    st.subheader("Previous Sessions")

    # Sessions are loaded once and then kept in the session state.
    if 'previous_sessions' not in st.session_state:
        st.session_state.previous_sessions = load_previous_sessions()

    sessions = st.session_state.previous_sessions
    if not sessions:
        st.info("No previous sessions found.")
        return

    selected_date = st.selectbox(
        "Select Session Date:",
        [session['date'] for session in sessions],
        format_func=lambda x: x.strftime("%Y-%m-%d %H:%M")
    )
    if not selected_date:
        return

    selected_session = next(
        (session for session in sessions if session['date'] == selected_date),
        None
    )
    if selected_session:
        st.session_state.current_transcript = selected_session['transcript']
        analyze_session_content(selected_session['transcript'])


def load_previous_sessions():
    """Return previous sessions; currently two hard-coded sample entries.

    Each entry is a dict with ``date``, ``transcript`` and ``analysis``.
    Real storage access is meant to replace the sample data; the ``try``
    block is kept as its error boundary.
    """
    try:
        return [
            {
                'date': datetime.now(),
                'transcript': "Sample transcript 1...",
                'analysis': "Sample analysis 1..."
            },
            {
                'date': datetime.now(),
                'transcript': "Sample transcript 2...",
                'analysis': "Sample analysis 2..."
            }
        ]
    except Exception as e:
        st.error(f"Error loading previous sessions: {str(e)}")
        return []


def format_session_data(session_data):
    """Format the session-notes form data as one transcript-like text block.

    Args:
        session_data: dict with ``date``, ``duration``, ``notes``, ``themes``
            and ``techniques`` (an iterable of strings).
    """
    lines = [
        "",
        f"Session Date: {session_data['date']}",
        f"Duration: {session_data['duration']} minutes",
        "",
        "SESSION NOTES:",
        f"{session_data['notes']}",
        "",
        "KEY THEMES:",
        f"{session_data['themes']}",
        "",
        "MI TECHNIQUES USED:",
        ', '.join(session_data['techniques']),
        "",
    ]
    return "\n".join(lines)


def _extract_list_lines(text, heading, marker):
    """Return the non-blank list lines that directly follow ``heading:``.

    ``marker`` is the regex for the list prefix (``-`` or ``\\d+\\.``).  A
    line may end with a newline or with the end of the text, so the last
    item is kept even when the reply has no trailing newline.
    """
    match = re.search(rf'{heading}:\n((?:{marker} .*(?:\n|\Z))*)', text)
    if match is None:
        return []
    return [line for line in match.group(1).strip().split('\n') if line.strip()]


def _extract_bullets(text, heading):
    """Return the '- ' bullet texts under ``heading:`` without the marker."""
    return [
        line[2:]
        for line in _extract_list_lines(text, heading, '-')
        if line.startswith('- ')
    ]


def _render_adherence_tab(results):
    """Render the score gauge plus strengths and growth areas."""
    st.subheader("MI Adherence Analysis")

    score_match = re.search(r'Score:\s*(\d+)', results)
    if score_match:
        fig = go.Figure(go.Indicator(
            mode="gauge+number",
            value=int(score_match.group(1)),
            domain={'x': [0, 1], 'y': [0, 1]},
            gauge={
                'axis': {'range': [0, 100]},
                'bar': {'color': "rgb(26, 118, 255)"},
                'steps': [
                    {'range': [0, 33], 'color': "lightgray"},
                    {'range': [33, 66], 'color': "gray"},
                    {'range': [66, 100], 'color': "darkgray"}
                ]
            }
        ))
        st.plotly_chart(fig)

    col1, col2 = st.columns(2)
    with col1:
        st.subheader("Strengths")
        for strength in _extract_bullets(results, 'Strengths'):
            st.markdown(f"✅ {strength}")
    with col2:
        st.subheader("Areas for Growth")
        for area in _extract_bullets(results, 'Areas for Growth'):
            st.markdown(f"🔄 {area}")


def _render_technical_tab(results):
    """Render OARS counts, their distribution and the reflection ratio."""
    st.subheader("OARS Technique Analysis")

    oars_match = _OARS_PATTERN.search(results)
    if not oars_match:
        st.warning("Technical skills analysis data not found in the results.")
        return

    open_q, affirm, reflect, summ = (int(group) for group in oars_match.groups())
    labels = ['Open Questions', 'Affirmations', 'Reflections', 'Summaries']
    counts = [open_q, affirm, reflect, summ]

    fig = go.Figure(data=[
        go.Bar(
            x=labels,
            y=counts,
            marker_color=['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728']
        )
    ])
    fig.update_layout(
        title="OARS Techniques Usage",
        xaxis_title="Technique Type",
        yaxis_title="Frequency",
        showlegend=False,
        height=400
    )
    st.plotly_chart(fig)

    total = sum(counts)
    col1, col2 = st.columns(2)
    with col1:
        st.markdown("### Technique Counts")
        for label, count in zip(labels, counts):
            st.markdown(f"🔹 **{label}:** {count}")
    with col2:
        st.markdown("### Technique Distribution")
        for label, count in zip(labels, counts):
            # All-zero counts would otherwise divide by zero.
            share = count / total * 100 if total else 0.0
            st.markdown(f"🔸 **{label}:** {share:.1f}%")

    st.markdown("### Key Metrics")
    if open_q > 0:
        st.metric(
            label="Reflection-to-Question Ratio",
            value=f"{reflect / open_q:.2f}",
            help="Target ratio is 2:1 or higher"
        )

    st.markdown("### MI Best Practices")
    st.info(_OARS_BEST_PRACTICES)


def _render_client_language_tab(results):
    """Render the change-talk and sustain-talk example lists."""
    st.subheader("Client Language Analysis")

    col1, col2 = st.columns(2)
    with col1:
        st.markdown("### Change Talk 🌱")
        for talk in _extract_bullets(results, 'Change Talk Examples'):
            st.markdown(f"- {talk}")
    with col2:
        st.markdown("### Sustain Talk 🔄")
        for talk in _extract_bullets(results, 'Sustain Talk Examples'):
            st.markdown(f"- {talk}")


def _render_session_flow_tab(results):
    """Render the key moments and the therapeutic-process notes."""
    st.subheader("Session Flow Analysis")

    st.markdown("### Key Moments")
    for moment in _extract_list_lines(results, 'Key Moments', r'\d+\.'):
        st.markdown(f"{moment}")

    st.markdown("### Therapeutic Process")
    for item in _extract_bullets(results, 'Therapeutic Process'):
        st.markdown(f"- {item}")


def _render_recommendations_tab(results):
    """Render the priority actions and development strategies."""
    st.subheader("Recommendations")

    st.markdown("### Priority Actions 🎯")
    for priority in _extract_list_lines(results, 'Priority Actions', r'\d+\.'):
        st.markdown(f"{priority}")

    st.markdown("### Development Strategies 📈")
    for strategy in _extract_bullets(results, 'Development Strategies'):
        st.markdown(f"- {strategy}")


def show_analysis_results():
    """Display the stored analysis text in five tabs.

    Reads ``st.session_state.analysis_results`` (the raw model reply) and
    shows an info message instead when nothing has been analyzed yet.
    """
    if 'analysis_results' not in st.session_state or not st.session_state.analysis_results:
        st.info("Please analyze a transcript first.")
        return

    results = st.session_state.analysis_results

    tabs = st.tabs([
        "MI Adherence",
        "Technical Skills",
        "Client Language",
        "Session Flow",
        "Recommendations"
    ])
    renderers = [
        _render_adherence_tab,
        _render_technical_tab,
        _render_client_language_tab,
        _render_session_flow_tab,
        _render_recommendations_tab,
    ]
    for tab, render in zip(tabs, renderers):
        with tab:
            render(results)


def get_technique_description(technique):
    """Return the description of an MI technique, or a fallback text."""
    descriptions = {
        "Open Questions": "Questions that allow for elaboration and cannot be answered with a simple yes/no.",
        "Reflections": "Statements that mirror, rephrase, or elaborate on the client's speech.",
        "Affirmations": "Statements that recognize client strengths and acknowledge behaviors that lead to positive change.",
        "Summaries": "Statements that collect, link, and transition between client statements.",
        "Information Giving": "Providing information with permission and in response to client needs.",
        # Add more techniques as needed
    }
    return descriptions.get(technique, "Description not available")


def create_session_timeline(timeline_data):
    """Show a session timeline figure, or an info message without data.

    The figure is still empty; the visualization code has yet to be added.
    """
    if not timeline_data:
        st.info("Detailed timeline not available")
        return

    fig = go.Figure()
    # Add timeline visualization code here
    st.plotly_chart(fig)


def create_action_items(analysis):
    """Show a fixed checklist of practice actions (``analysis`` is unused)."""
    st.write("Based on the analysis, consider focusing on these specific actions:")

    # Example action items
    action_items = [
        "Practice one new MI skill each session",
        "Record and review your sessions",
        "Focus on developing complex reflections",
        "Track change talk/sustain talk ratio"
    ]
    for item in action_items:
        st.checkbox(item)


def show_relevant_resources(analysis):
    """Show a fixed list of resource links (``analysis`` is unused)."""
    resources = [
        {"title": "MI Practice Exercises", "url": "#"},
        {"title": "Reflection Templates", "url": "#"},
        {"title": "Change Talk Recognition Guide", "url": "#"},
        {"title": "MI Community of Practice", "url": "#"}
    ]
    for resource in resources:
        st.markdown(f"[{resource['title']}]({resource['url']})")


def parse_analysis_response(response_text):
    """Parse the AI response into a structured analysis dict.

    Returns:
        A dict with keys ``mi_adherence_score``, ``key_themes``,
        ``technique_usage``, ``strengths``, ``areas_for_improvement``,
        ``recommendations``, ``change_talk_instances`` and
        ``session_summary``; ``None`` (after showing an error) when parsing
        raises.
    """
    try:
        analysis = {
            'mi_adherence_score': 0.0,
            'key_themes': [],
            'technique_usage': {},
            'strengths': [],
            'areas_for_improvement': [],
            'recommendations': [],
            'change_talk_instances': [],
            'session_summary': ""
        }

        score_match = re.search(r'MI Adherence Score:\s*(\d+\.?\d*)', response_text)
        if score_match:
            analysis['mi_adherence_score'] = float(score_match.group(1))

        themes_section = re.search(r'Key Themes:(.*?)(?=\n\n|\Z)', response_text, re.DOTALL)
        if themes_section:
            themes = themes_section.group(1).strip().split('\n')
            analysis['key_themes'] = [theme.strip('- ') for theme in themes if theme.strip()]

        technique_section = re.search(
            r'Technique Usage:(.*?)(?=\n\n|\Z)', response_text, re.DOTALL
        )
        if technique_section:
            for line in technique_section.group(1).strip().split('\n'):
                # Split on the last colon so a name containing ':' survives.
                name, separator, count = line.rpartition(':')
                if not separator:
                    continue
                try:
                    analysis['technique_usage'][name.strip()] = int(count.strip())
                except ValueError:
                    # A non-numeric count skips this line only, instead of
                    # discarding the whole parse.
                    continue

        strengths_section = re.search(r'Strengths:(.*?)(?=\n\n|\Z)', response_text, re.DOTALL)
        if strengths_section:
            strengths = strengths_section.group(1).strip().split('\n')
            analysis['strengths'] = [s.strip('- ') for s in strengths if s.strip()]

        improvements_section = re.search(
            r'Areas for Improvement:(.*?)(?=\n\n|\Z)', response_text, re.DOTALL
        )
        if improvements_section:
            improvements = improvements_section.group(1).strip().split('\n')
            analysis['areas_for_improvement'] = [
                i.strip('- ') for i in improvements if i.strip()
            ]

        summary_section = re.search(
            r'Session Summary:(.*?)(?=\n\n|\Z)', response_text, re.DOTALL
        )
        if summary_section:
            analysis['session_summary'] = summary_section.group(1).strip()

        return analysis
    except Exception as e:
        st.error(f"Error parsing analysis response: {str(e)}")
        return None


def get_improvement_suggestion(area):
    """Return a suggestion for an improvement area.

    The first suggestion whose key occurs (case-insensitively) in ``area``
    is returned; a generic suggestion is returned when none matches.
    """
    suggestions = {
        "open questions": "Practice replacing closed questions with open-ended ones. For example:\n- Instead of: 'Did you exercise?'\n- Try: 'What kinds of physical activity have you been doing?'",
        "reflections": "Work on using more complex reflections by adding meaning or emotion to what the client has said. Try to make at least two complex reflections for every simple reflection.",
        "empathy": "Focus on seeing the situation from the client's perspective. Take time to verbalize your understanding of their emotions and experiences.",
        "summaries": "Use more collecting summaries to gather key points discussed and transition summaries to move between topics.",
        "affirmations": "Look for opportunities to genuinely affirm client strengths and efforts, not just outcomes."
    }

    area_lower = area.lower()
    for key, value in suggestions.items():
        if key in area_lower:
            return value

    return "Focus on incorporating this element more intentionally in your sessions. Consider recording your sessions and reviewing them with a supervisor or peer."


def create_gauge_chart(score):
    """Plot a 0-100 gauge for the MI adherence score with a marker at 90."""
    fig = go.Figure(go.Indicator(
        mode="gauge+number",
        value=score,
        domain={'x': [0, 1], 'y': [0, 1]},
        title={'text': "MI Adherence"},
        gauge={
            'axis': {'range': [0, 100]},
            'bar': {'color': "darkblue"},
            'steps': [
                {'range': [0, 40], 'color': "lightgray"},
                {'range': [40, 70], 'color': "gray"},
                {'range': [70, 100], 'color': "darkgray"}
            ],
            'threshold': {
                'line': {'color': "red", 'width': 4},
                'thickness': 0.75,
                'value': 90
            }
        }
    ))
    st.plotly_chart(fig)


def create_technique_usage_chart(technique_usage):
    """Plot a bar chart from a ``{technique: count}`` mapping."""
    df = pd.DataFrame(list(technique_usage.items()), columns=['Technique', 'Count'])

    fig = px.bar(
        df,
        x='Technique',
        y='Count',
        title='MI Technique Usage Frequency'
    )
    fig.update_layout(
        xaxis_title="Technique",
        yaxis_title="Frequency",
        showlegend=False
    )
    st.plotly_chart(fig)


def extract_transcript_from_json(content):
    """Return JSON content as text: dicts are pretty-printed, others str()'d."""
    if isinstance(content, dict):
        return json.dumps(content, indent=2)
    return str(content)


# Analysis display functions
def _show_raw_text_section(title, analysis, fallback):
    """Show ``analysis['raw_text']`` under ``title``, or ``fallback`` if absent."""
    st.subheader(title)
    st.write(analysis.get('raw_text', fallback))


def show_mi_adherence_analysis(analysis):
    """Show the raw analysis text under the MI adherence heading."""
    _show_raw_text_section("MI Adherence Analysis", analysis, 'No analysis available')


def show_technical_skills_analysis(analysis):
    """Show the raw analysis text under the technical skills heading."""
    _show_raw_text_section("Technical Skills Analysis", analysis, 'No analysis available')


def show_client_language_analysis(analysis):
    """Show the raw analysis text under the client language heading."""
    _show_raw_text_section("Client Language Analysis", analysis, 'No analysis available')


def show_session_flow_analysis(analysis):
    """Show the raw analysis text under the session flow heading."""
    _show_raw_text_section("Session Flow Analysis", analysis, 'No analysis available')


def show_recommendations(analysis):
    """Show the raw analysis text under the recommendations heading."""
    _show_raw_text_section("Recommendations", analysis, 'No recommendations available')


if __name__ == "__main__":
    show_session_analysis()