import os
from datetime import datetime

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
from PIL import Image
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline

# Precomputed embeddings of therapist–patient dialogue pairs: the retrieval
# corpus for matching a user's message to a stored therapist response.
dials_embeddings = pd.read_pickle('https://huggingface.co/datasets/vsrinivas/CBT_dialogue_embed_ds/resolve/main/kaggle_therapy_embeddings.pkl')

# Candidate emotion labels for zero-shot classification, one label per line.
with open('emotion_group_labels.txt') as file:
    emotion_group_labels = file.read().splitlines()

embed_model = SentenceTransformer('paraphrase-MiniLM-L6-v2')
classifier = pipeline("zero-shot-classification", model='facebook/bart-large-mnli')

huggingface_token = os.getenv('hf_token')
# Report only whether the token is set; never print the secret itself.
print("hf_token found:", huggingface_token is not None)


def detect_emotions(text):
    """Zero-shot classify `text` and return its top-5 emotions as percentages."""
    emotion = classifier(text, candidate_labels=emotion_group_labels, batch_size=16)
    # Renormalise the top-5 scores so they sum to 1 before formatting as percentages.
    top_5_scores = [i / sum(emotion['scores'][:5]) for i in emotion['scores'][:5]]
    top_5_emotions = emotion['labels'][:5]
    emotion_set = {l: "{:.2%}".format(s) for l, s in zip(top_5_emotions, top_5_scores)}
    return emotion_set
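
# Example call (hypothetical input and output; the actual labels come from
# emotion_group_labels.txt):
#   detect_emotions("I can't stop worrying about work")
#   -> {'anxiety': '41.20%', 'stress': '27.15%', ...}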


def cosine_similarity(vec1, vec2):
    """Return the cosine similarity of two 1-D vectors (higher = more similar)."""
    return np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))
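
# Sanity check: identical vectors score 1.0, orthogonal vectors 0.0, e.g.
#   cosine_similarity(np.array([1.0, 0.0]), np.array([1.0, 0.0]))  # -> 1.0
#   cosine_similarity(np.array([1.0, 0.0]), np.array([0.0, 1.0]))  # -> 0.0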


def generate_triggers_img(items):
    """Render the detected emotions as a horizontal bar chart and return it as a PIL image."""
    labels = list(items.keys())
    values = [float(v.strip('%')) for v in items.values()]

    # Sort ascending so the strongest trigger ends up at the top of the barh chart.
    new_items = dict(sorted(zip(labels, values), key=lambda item: item[1]))
    labels = list(new_items.keys())
    values = list(new_items.values())

    fig, ax = plt.subplots(figsize=(10, 6))
    colors = plt.cm.viridis(np.linspace(0, 1, len(labels)))
    bars = ax.barh(labels, values, color=colors)

    # Strip the frame, x-axis, and tick marks so only the labelled bars remain.
    for spine in ax.spines.values():
        spine.set_visible(False)
    ax.tick_params(axis='y', labelsize=18)
    ax.xaxis.set_visible(False)
    ax.yaxis.set_ticks_position('none')

    # Annotate each bar with its percentage.
    for bar in bars:
        width = bar.get_width()
        ax.text(width, bar.get_y() + bar.get_height() / 2, f'{width:.2f}%',
                ha='left', va='center', fontweight='bold', fontsize=18)

    plt.tight_layout()
    plt.savefig('triggers.png')
    plt.close(fig)
    triggers_img = Image.open('triggers.png')
    return triggers_img
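
# The chart round-trips through triggers.png so the function can hand a
# PIL.Image straight to a UI image component. Example (hypothetical input):
#   generate_triggers_img({'anxiety': '41.20%', 'stress': '27.15%'})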


def get_doc_response_emotions(user_message, therapy_session_conversation):
    """Detect the user's emotions, retrieve the best-matching therapist response, and update the chat history."""
    emotion_set = detect_emotions(user_message)
    print(emotion_set)

    emotions_msg = generate_triggers_img(emotion_set)
    user_embedding = embed_model.encode(user_message, device='cuda' if torch.cuda.is_available() else 'cpu')

    # Score the user's message against every stored dialogue embedding.
    similarities = []
    for v in dials_embeddings['embeddings']:
        similarities.append(cosine_similarity(user_embedding, v))

    # The therapist reply paired with the closest patient message becomes the response.
    top_match_index = similarities.index(max(similarities))
    doc_response = dials_embeddings.iloc[top_match_index]['Doctor']

    therapy_session_conversation.append(["User: " + user_message, "Therapist: " + doc_response])

    print(f"User's message: {user_message}")
    print(f"RAG matching message: {dials_embeddings.iloc[top_match_index]['Patient']}")
    print(f"Therapist's response: {dials_embeddings.iloc[top_match_index]['Doctor']}\n\n")
    print(f"therapy_session_conversation: {therapy_session_conversation}")
    # The empty string clears the input textbox in the UI.
    return '', therapy_session_conversation, emotions_msg
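
# The (textbox, chat history, image) return triple matches a Gradio-style event
# handler; the exact component wiring is an assumption, e.g.:
#   msg.submit(get_doc_response_emotions, [msg, chatbot], [msg, chatbot, emotion_img])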


# Falcon instruct model used for session summaries and recommendations.
# The tokenizer must come from the same checkpoint as the model weights.
falcon_model_id = "tiiuae/Falcon-H1-1.5B-Deep-Instruct"
tokenizer = AutoTokenizer.from_pretrained(falcon_model_id)
use_cuda = torch.cuda.is_available()
model = AutoModelForCausalLM.from_pretrained(
    falcon_model_id,
    torch_dtype=torch.float16 if use_cuda else torch.float32,
    device_map="auto" if use_cuda else None,
)
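
# Note: device_map="auto" requires the `accelerate` package to be installed.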


def generate_falcon_response(prompt, max_new_tokens=300):
    """Generate a completion for `prompt`, returning only the newly generated text."""
    inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=1800).to(model.device)
    outputs = model.generate(
        **inputs,
        max_new_tokens=max_new_tokens,
        do_sample=True,
        temperature=0.7,
        top_p=0.9,
    )
    # Decode only the tokens generated after the prompt; this is more robust
    # than slicing the decoded string by the prompt's character length.
    generated_tokens = outputs[0][inputs['input_ids'].shape[1]:]
    return tokenizer.decode(generated_tokens, skip_special_tokens=True).strip()
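
# Example call (hypothetical prompt):
#   generate_falcon_response("Summarise this session: ...", max_new_tokens=50)
# returns the model's continuation without echoing the prompt back.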


def summarize_and_recommend(therapy_session_conversation):
    """Summarise the session and produce an action plan, both via the Falcon model."""
    session_time = str(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    # Flatten the [user, therapist] message pairs into a single list of turns.
    session_conversation = [x for item in therapy_session_conversation for x in item if x is not None]
    session_conversation.insert(0, "Session_time: " + session_time)
    session_conversation_processed = '\n'.join(session_conversation)
    print("session_conversation_processed:", session_conversation_processed)

    summary_prompt = f"""You are an expert Cognitive Behavioural Therapist and précis writer.
Summarize STRICTLY the following session into concise, ethical, and clinically meaningful content.

Session:
{session_conversation_processed}

Format your response as:
Session Time:
Summary of the patient messages:
Summary of the therapist messages:
Summary of the whole session:
Ensure the entire summary is less than 300 tokens."""

    full_summary = generate_falcon_response(summary_prompt, max_new_tokens=300)
    print("\nFull summary:", full_summary)

    recommendation_prompt = f"""You are an expert Cognitive Behavioural Therapist.
Based STRICTLY on the following summary, provide a clinically valid action plan for the patient.

Summary:
{full_summary}

Use this format:
- The patient is referred to...
- The patient is advised to...
- The patient is refrained from...
- It is suggested that the patient...
- A follow-up session is scheduled with the patient...

Ensure the list contains NOT MORE THAN 7 points and is in passive voice with proper tense."""

    full_recommendations = generate_falcon_response(recommendation_prompt, max_new_tokens=400)
    print("\nFull recommendations:", full_recommendations)

    # Return an empty chat history so the UI conversation is cleared after summarising.
    chatbox = []
    return full_summary, full_recommendations, chatbox
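
# Example wiring (hypothetical Gradio components, assumed for illustration):
#   end_btn.click(summarize_and_recommend, [chatbot], [summary_box, plan_box, chatbot])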