# 필요한 라이브러리 설치를 위한 셀 # !pip install transformers torch pandas openpyxl from transformers import AutoTokenizer, AutoModelForSequenceClassification import torch import pandas as pd from datetime import datetime class SimplifiedMedicalChatbot: def __init__(self, data_path="/content/sample_data/sick.xlsx"): self.load_database(data_path) self.disclaimer = """ 주의: 이 챗봇은 참고용으로만 사용하시기 바랍니다. 정확한 진단을 위해서는 반드시 의료 전문가와 상담하세요. 증상이 심각하다고 판단되면 즉시 병원을 방문하시기 바랍니다. """ def load_database(self, data_path): try: self.df = pd.read_excel(data_path) self.df = self.df.fillna('') print(f"데이터베이스 로드 완료: {len(self.df)} 개의 레코드") except Exception as e: print(f"데이터베이스 로딩 실패: {str(e)}") self.df = pd.DataFrame() def calculate_age(self, born_year): """출생연도로 나이 계산""" current_year = datetime.now().year return current_year - int(born_year) def get_age_group(self, age): if age < 12: return "어린이" elif age >= 65: return "노인" else: return "성인" def simple_symptom_matching(self, pain, description): """증상 매칭 로직 - pain과 description 활용, 신뢰도 점수 계산""" if '증상' not in self.df.columns: print("증상 컬럼이 데이터베이스에 없습니다.") return [] matched_diseases = [] symptoms = f"{pain} {description}".strip().split() for _, row in self.df.iterrows(): db_symptoms = row['증상'].split() # 각 증상별 점수 계산 score = 0 # 주요 증상(pain) 체크 if pain.strip() and pain in row['증상']: score += 60 # 주요 증상 일치시 기본 60점 # 상세 증상 체크 desc_symptoms = description.strip().split() if description else [] for symptom in desc_symptoms: if symptom in row['증상']: score += 20 # 각 부가 증상 일치시 20점 추가 # 최소한의 관련성이 있는 경우 추가 (0점 이상) if score > 0 or pain in row['증상']: matched_diseases.append((row['질환명'], score)) # 결과가 3개 미만인 경우 관련 질병 추가 sorted_diseases = sorted(matched_diseases, key=lambda x: x[1], reverse=True) if len(sorted_diseases) < 3: # 점수와 관계없이 주요 증상이 포함된 다른 질병들도 추가 for _, row in self.df.iterrows(): disease_name = row['질환명'] if disease_name not in [d[0] for d in sorted_diseases]: if pain in row['증상']: sorted_diseases.append((disease_name, 10)) # 낮은 점수로 추가 if len(sorted_diseases) >= 3: break # 상위 3개 반환 (3개 미만인 경우 전체 반환) return sorted_diseases[:3] def get_medicine_info(self, disease, age_group, medi_tf=False): """질병과 나이대에 따른 약품 정보 반환""" try: disease_info = self.df[self.df['질환명'] == disease] if disease_info.empty: return { "ageWarning": "", "medicine": "", "ingredients": "", "medicalAttention": [], "error": "해당 질병에 대한 정보를 찾을 수 없습니다." } result = { "ageWarning": "", "medicine": "", "ingredients": "", "medicalAttention": [] } # 나이대별 주의사항 age_specific_info = disease_info['나이대 '].iloc[0] if age_specific_info: if age_group == "어린이": result["ageWarning"] = f"어린이(12세 미만) 주의사항: {age_specific_info}" elif age_group == "노인": result["ageWarning"] = f"노인(65세 이상) 주의사항: {age_specific_info}" # mediTF가 true일 때만 약품 정보 포함 if medi_tf: if disease_info['추천 의약품'].iloc[0]: result["medicine"] = disease_info['추천 의약품'].iloc[0] if disease_info['성분'].iloc[0]: result["ingredients"] = disease_info['성분'].iloc[0] # 의사 진료 필요 사항 medical_attention = disease_info['의사의 진료가 필요한 경우'].iloc[0] if medical_attention: attention_list = [item.strip() for item in medical_attention.split(' ')] result["medicalAttention"] = attention_list return result except Exception as e: print(f"의약품 정보 조회 중 오류: {str(e)}") return { "ageWarning": "", "medicine": "", "ingredients": "", "medicalAttention": [], "error": f"정보 조회 중 오류 발생: {str(e)}" } def format_chat_response(self, response_data): """JSON 응답을 사용자 친화적인 텍스트로 변환""" chat_response = [] # 사용자 정보 요약 chat_response.append(f"\n[{response_data['ageGroup']}({response_data['age']}세) / {response_data['sex']} 사용자 분석 결과]") # 분석 결과 for analysis in response_data['analysis']: if analysis['disease']: chat_response.append(f"\n## 추정 질환: {analysis['disease']}") if analysis['ageWarning']: chat_response.append(analysis['ageWarning']) if analysis['medicine']: chat_response.append(f"추천 의약품: {analysis['medicine']}") if analysis['ingredients']: chat_response.append(f"주요 성분: {analysis['ingredients']}") if analysis['medicalAttention']: chat_response.append("의사의 진료가 필요한 경우:") chat_response.extend(analysis['medicalAttention']) else: chat_response.append("\n일치하는 질병을 찾을 수 없습니다.") chat_response.append("-" * 40) # 면책 조항 chat_response.append(f"\n{response_data['disclaimer']}") return "\n".join(chat_response) def process_request(self, request_data): """백엔드 요청 처리""" try: # 사용자 정보 추출 user_id = request_data.get("userId", "") born_year = request_data.get("bornYear", "") sex = request_data.get("sex", "") # 증상 정보 추출 pain = request_data.get("pain", "") pain_description = request_data.get("description", "") medi_tf = request_data.get("mediTF", False) # 나이 계산 age = self.calculate_age(born_year) age_group = self.get_age_group(age) # 증상 매칭 matched_diseases = self.simple_symptom_matching(pain, pain_description) # 응답 생성 response = { "userId": user_id, "age": age, "ageGroup": age_group, "sex": sex, "analysis": [] } if matched_diseases: response["analysis"] = [] for disease, _ in matched_diseases: medical_info = self.get_medicine_info(disease, age_group, medi_tf) disease_info = { "disease": disease, "ageWarning": medical_info["ageWarning"], "medicine": medical_info["medicine"], "ingredients": medical_info["ingredients"], "medicalAttention": medical_info["medicalAttention"] } response["analysis"].append(disease_info) else: response["analysis"].append({ "disease": None, "confidence": 0.0, # 매칭 실패시 0% 신뢰도 "ageWarning": "", "medicine": "", "ingredients": "", "medicalAttention": [], "error": "일치하는 질병을 찾을 수 없습니다." }) response["disclaimer"] = self.disclaimer return { "json_response": response, "chat_message": self.format_chat_response(response) } except Exception as e: error_response = { "error": f"요청 처리 중 오류 발생: {str(e)}", "disclaimer": self.disclaimer } return { "json_response": error_response, "chat_message": f"죄송합니다. 오류가 발생했습니다: {str(e)}\n\n{self.disclaimer}" } def create_sample_data(): """테스트용 샘플 데이터 생성""" data = { '질환명': ['편두통', '군발성 두통', '긴장성 두통', '뇌수막염', '감기', '위염'], '증상': [ '두통 한쪽 머리 통증 구토 어지러움 메스꺼움', '한쪽 머리 통증 눈통증 콧물 어지러움', '두통 목통증 어깨통증 메스꺼움 스트레스', '두통 구토 발열 목이 뻣뻣함 메스꺼움', '두통 발열 기침 콧물', '복통 메스꺼움 구토 소화불량' ], '추천 의약품': [ '게보린', '타이레놀', '이지엔6', '항생제 처방 필요', '타이레놀', '개비스콘' ], '성분': [ '이부프로펜', '아세트아미노펜', '나프록센', '처방약만 가능', '아세트아미노펜', '알긴산나트륨' ], '의사의 진료가 필요한 경우': [ '두통이 24시간 이상 지속될 때\n시야가 흐려질 때', '통증이 매우 심할 때\n하루 여러번 발생할 때', '두통이 만성적일 때\n일상생활이 어려울 때', '즉시 병원 방문 필요\n응급상황일 수 있음', '고열이 3일 이상 지속될 때\n호흡곤란이 있을 때', '복통이 심하고 지속될 때\n위장 출혈 증상이 있을 때' ], '나이대 ': [ '진통제 복용량 조절 필요', '노인 투약 주의', '청소년 복용량 조절', '연령별 항생제 처방 필요', '해열제 복용 시 주의', '위산 분비 조절제 주의' ] } return pd.DataFrame(data) def run_test(): """테스트 실행""" print("=== 의료 상담 챗봇 테스트 시작 ===") # 샘플 데이터 생성 및 임시 파일 저장 sample_df = create_sample_data() temp_file = "/content/sample_data/sick.xlsx" sample_df.to_excel(temp_file, index=False) print("샘플 데이터 생성 완료") # 챗봇 인스턴스 생성 chatbot = SimplifiedMedicalChatbot(temp_file) # 테스트 케이스들 test_cases = [ { "userId": "user1", "bornYear": "1990", "sex": "male", "pain": "두통", "description": "한쪽 머리가 아프고 메스꺼워요", "mediTF": True }, { "userId": "user2", "bornYear": "2015", "sex": "female", "pain": "복통", "description": "배가 아프고 구토를 해요", "mediTF": True } ] # 각 테스트 케이스 실행 for i, test_case in enumerate(test_cases, 1): print(f"\n=== 테스트 케이스 {i} ===") print("입력:", test_case) # 요청 처리 result = chatbot.process_request(test_case) # 결과 출력 print("\n[챗봇 응답]") print(result["chat_message"]) print("\n[JSON 응답]") print(result["json_response"]) print("\n" + "="*50) if __name__ == "__main__": run_test()