import os,sys import traceback # 상단에 추가 # install required packages os.system('pip install plotly') # plotly 설치 os.system('pip install matplotlib') # matplotlib 설치 os.system('pip install dgl==1.0.2+cu116 -f https://data.dgl.ai/wheels/cu116/repo.html') os.environ["DGLBACKEND"] = "pytorch" print('Modules installed') # 기본 args 설정 if not os.path.exists('./tmp'): os.makedirs('./tmp') if not os.path.exists('./tmp/args.json'): default_args = { 'checkpoint': None, 'dump_trb': False, 'dump_args': True, 'save_best_plddt': True, 'T': 25, 'strand_bias': 0.0, 'loop_bias': 0.0, 'helix_bias': 0.0, 'd_t1d': 24, 'potentials': None, 'potential_scale': None, 'aa_composition': None } with open('./tmp/args.json', 'w') as f: json.dump(default_args, f) # 체크포인트 파일 다운로드 if not os.path.exists('./SEQDIFF_230205_dssp_hotspots_25mask_EQtasks_mod30.pt'): print('Downloading model weights 1') os.system('wget http://files.ipd.uw.edu/pub/sequence_diffusion/checkpoints/SEQDIFF_230205_dssp_hotspots_25mask_EQtasks_mod30.pt') print('Successfully Downloaded') if not os.path.exists('./SEQDIFF_221219_equalTASKS_nostrSELFCOND_mod30.pt'): print('Downloading model weights 2') os.system('wget http://files.ipd.uw.edu/pub/sequence_diffusion/checkpoints/SEQDIFF_221219_equalTASKS_nostrSELFCOND_mod30.pt') print('Successfully Downloaded') from openai import OpenAI import gradio as gr import json # json 모듈 추가 from datasets import load_dataset import plotly.graph_objects as go import numpy as np import py3Dmol from io import StringIO import json import secrets import copy import matplotlib.pyplot as plt from utils.sampler import HuggingFace_sampler from utils.parsers_inference import parse_pdb from model.util import writepdb from utils.inpainting_util import * import os # args 로드 with open('./tmp/args.json', 'r') as f: args = json.load(f) plt.rcParams.update({'font.size': 13}) # manually set checkpoint to load args['checkpoint'] = None args['dump_trb'] = False args['dump_args'] = True args['save_best_plddt'] = True args['T'] = 25 args['strand_bias'] = 0.0 args['loop_bias'] = 0.0 args['helix_bias'] = 0.0 # Hugging Face 토큰 설정 ACCESS_TOKEN = os.getenv("HF_TOKEN") if not ACCESS_TOKEN: raise ValueError("HF_TOKEN not found in environment variables") # OpenAI 클라이언트 설정 (Hugging Face 엔드포인트 사용) client = OpenAI( base_url="https://api-inference.huggingface.co/v1/", api_key=ACCESS_TOKEN, ) # 데이터셋 로드 및 구조 확인 try: ds = load_dataset("lamm-mit/protein_secondary_structure_from_PDB", token=ACCESS_TOKEN) print("Dataset structure:", ds) print("First entry example:", next(iter(ds['train']))) except Exception as e: print(f"Dataset loading error: {str(e)}") raise def respond( message, history, system_message, max_tokens, temperature, top_p, ): messages = [{"role": "system", "content": system_message}] for msg in history: messages.append({"role": "user", "content": msg[0]}) if msg[1]: messages.append({"role": "assistant", "content": msg[1]}) messages.append({"role": "user", "content": message}) try: response = "" for chunk in client.chat.completions.create( model="CohereForAI/c4ai-command-r-plus-08-2024", max_tokens=max_tokens, stream=True, temperature=temperature, top_p=top_p, messages=messages, ): if hasattr(chunk.choices[0].delta, 'content'): token = chunk.choices[0].delta.content if token is not None: response += token yield [{"role": "user", "content": message}, {"role": "assistant", "content": response}] return [{"role": "user", "content": message}, {"role": "assistant", "content": response}] except Exception as e: print(f"Error in respond: {str(e)}") return [{"role": "user", "content": message}, {"role": "assistant", "content": f"오류가 발생했습니다: {str(e)}"}] def analyze_prompt(message): """LLM을 사용하여 프롬프트 분석""" try: analysis_prompt = f""" 다음 요청을 분석하여 단백질 설계에 필요한 주요 특성을 추출하세요: 요청: {message} 다음 항목들을 분석해주세요: 1. 주요 기능 (예: 치료, 결합, 촉매 등) 2. 목표 환경 (예: 세포막, 수용성, 등) 3. 필요한 구조적 특징 4. 크기 및 복잡도 요구사항 """ response = client.chat.completions.create( model="CohereForAI/c4ai-command-r-plus-08-2024", messages=[{"role": "user", "content": analysis_prompt}], temperature=0.7 ) return response.choices[0].message.content except Exception as e: print(f"프롬프트 분석 중 오류: {str(e)}") return None def search_protein_data(analysis, dataset): """분석 결과를 바탕으로 데이터셋에서 유사한 구조 검색""" try: # 키워드 추출 keywords = extract_keywords(analysis) print("Extracted keywords:", keywords) # 데이터셋 구조 확인 if not dataset or 'train' not in dataset: print("Invalid dataset structure") return [] # 유사도 점수 계산 scored_entries = [] for entry in dataset['train']: try: score = calculate_similarity(keywords, entry) scored_entries.append((score, entry)) except Exception as e: print(f"Error processing entry: {str(e)}") continue # 결과 정렬 및 반환 scored_entries.sort(reverse=True) return scored_entries[:3] except Exception as e: print(f"데이터 검색 중 오류: {str(e)}") return [] def extract_parameters(analysis, similar_structures): """분석 결과와 유사 구조를 바탕으로 생성 파라미터 결정""" try: # 기본 파라미터 템플릿 params = { 'sequence_length': 100, 'helix_bias': 0.02, 'strand_bias': 0.02, 'loop_bias': 0.1, 'hydrophobic_target_score': 0 } # 분석 결과에서 구조적 요구사항 파악 if "막 투과" in analysis or "소수성" in analysis: params['hydrophobic_target_score'] = -2 params['helix_bias'] = 0.03 elif "수용성" in analysis or "가용성" in analysis: params['hydrophobic_target_score'] = 2 params['loop_bias'] = 0.15 # 유사 구조들의 특성 반영 if similar_structures: avg_length = sum(len(s[1]['sequence']) for s in similar_structures) / len(similar_structures) params['sequence_length'] = int(avg_length) # 구조적 특성 분석 및 반영 for _, structure in similar_structures: if 'secondary_structure' in structure: helix_ratio = structure['secondary_structure'].count('H') / len(structure['secondary_structure']) sheet_ratio = structure['secondary_structure'].count('E') / len(structure['secondary_structure']) params['helix_bias'] = max(0.01, min(0.05, helix_ratio)) params['strand_bias'] = max(0.01, min(0.05, sheet_ratio)) return params except Exception as e: print(f"파라미터 추출 중 오류: {str(e)}") return None def process_chat(message, history): try: if any(keyword in message.lower() for keyword in ['protein', 'generate', '단백질', '생성', '치료']): # 1. LLM을 사용한 프롬프트 분석 analysis = analyze_prompt(message) if not analysis: return history + [ {"role": "user", "content": message}, {"role": "assistant", "content": "요청 분석에 실패했습니다."} ] # 2. 유사 구조 검색 similar_structures = search_protein_data(analysis, ds) if not similar_structures: return history + [ {"role": "user", "content": message}, {"role": "assistant", "content": "적합한 참조 구조를 찾지 못했습니다."} ] # 3. 생성 파라미터 결정 params = extract_parameters(analysis, similar_structures) if not params: return history + [ {"role": "user", "content": message}, {"role": "assistant", "content": "파라미터 설정에 실패했습니다."} ] # 4. 단백질 생성 try: protein_result = protein_diffusion_model( sequence=None, seq_len=params['sequence_length'], helix_bias=params['helix_bias'], strand_bias=params['strand_bias'], loop_bias=params['loop_bias'], secondary_structure=None, aa_bias=None, aa_bias_potential=None, num_steps="25", noise="normal", hydrophobic_target_score=str(params['hydrophobic_target_score']), hydrophobic_potential="2", contigs=None, pssm=None, seq_mask=None, str_mask=None, rewrite_pdb=None ) output_seq, output_pdb, structure_view, plddt_plot = next(protein_result) # 5. 결과 설명 생성 explanation = f""" 요청하신 기능에 맞는 단백질을 생성했습니다: 분석된 요구사항: {analysis} 설계된 구조적 특징: - 길이: {params['sequence_length']} 아미노산 - 알파 헬릭스 비율: {params['helix_bias']*100:.1f}% - 베타 시트 비율: {params['strand_bias']*100:.1f}% - 루프 구조 비율: {params['loop_bias']*100:.1f}% - 소수성 점수: {params['hydrophobic_target_score']} 참조된 유사 구조: {len(similar_structures)}개 생성된 단백질의 3D 구조와 시퀀스를 확인하실 수 있습니다. """ # 6. 결과 저장 global current_protein_result current_protein_result = { 'sequence': output_seq, 'pdb': output_pdb, 'structure_view': structure_view, 'plddt_plot': plddt_plot, 'params': params } return history + [ {"role": "user", "content": message}, {"role": "assistant", "content": explanation} ] except Exception as e: return history + [ {"role": "user", "content": message}, {"role": "assistant", "content": f"단백질 생성 중 오류가 발생했습니다: {str(e)}"} ] else: return history + [ {"role": "user", "content": message}, {"role": "assistant", "content": "단백질 생성 관련 키워드를 포함해주세요."} ] except Exception as e: return history + [ {"role": "user", "content": message}, {"role": "assistant", "content": f"처리 중 오류가 발생했습니다: {str(e)}"} ] def generate_protein(params): # 기존 protein_diffusion_model 함수 호출 result = protein_diffusion_model( sequence=None, seq_len=params['sequence_length'], helix_bias=params['helix_bias'], strand_bias=params['strand_bias'], loop_bias=params['loop_bias'], secondary_structure=None, aa_bias=None, aa_bias_potential=None, num_steps="25", noise="normal", hydrophobic_target_score=str(params['hydrophobic_target_score']), hydrophobic_potential="2", contigs=None, pssm=None, seq_mask=None, str_mask=None, rewrite_pdb=None ) return result def generate_explanation(result, params): explanation = f""" 생성된 단백질 분석: - 길이: {params['sequence_length']} 아미노산 - 구조적 특징: * 알파 나선 비율: {params['helix_bias']*100}% * 베타 시트 비율: {params['strand_bias']*100}% * 루프 구조 비율: {params['loop_bias']*100}% - 특수 기능: {result.get('special_features', '없음')} """ return explanation # 체크포인트 파일 경로를 절대 경로로 수정 def protein_diffusion_model(sequence, seq_len, helix_bias, strand_bias, loop_bias, secondary_structure, aa_bias, aa_bias_potential, num_steps, noise, hydrophobic_target_score, hydrophobic_potential, contigs, pssm, seq_mask, str_mask, rewrite_pdb): dssp_checkpoint = './SEQDIFF_230205_dssp_hotspots_25mask_EQtasks_mod30.pt' og_checkpoint = './SEQDIFF_221219_equalTASKS_nostrSELFCOND_mod30.pt' # 체크포인트 파일 존재 확인 if not os.path.exists(dssp_checkpoint): raise FileNotFoundError(f"DSSP checkpoint file not found at: {dssp_checkpoint}") if not os.path.exists(og_checkpoint): raise FileNotFoundError(f"OG checkpoint file not found at: {og_checkpoint}") model_args = copy.deepcopy(args) # make sampler S = HuggingFace_sampler(args=model_args) # get random prefix S.out_prefix = './tmp/'+secrets.token_hex(nbytes=10).upper() # set args S.args['checkpoint'] = None S.args['dump_trb'] = False S.args['dump_args'] = True S.args['save_best_plddt'] = True S.args['T'] = 25 S.args['strand_bias'] = 0.0 S.args['loop_bias'] = 0.0 S.args['helix_bias'] = 0.0 S.args['potentials'] = None S.args['potential_scale'] = None S.args['aa_composition'] = None # get sequence if entered and make sure all chars are valid alt_aa_dict = {'B':['D','N'],'J':['I','L'],'U':['C'],'Z':['E','Q'],'O':['K']} if sequence not in ['',None]: L = len(sequence) aa_seq = [] for aa in sequence.upper(): if aa in alt_aa_dict.keys(): aa_seq.append(np.random.choice(alt_aa_dict[aa])) else: aa_seq.append(aa) S.args['sequence'] = aa_seq elif contigs not in ['',None]: S.args['contigs'] = [contigs] else: S.args['contigs'] = [f'{seq_len}'] L = int(seq_len) print('DEBUG: ',rewrite_pdb) if rewrite_pdb not in ['',None]: S.args['pdb'] = rewrite_pdb.name if seq_mask not in ['',None]: S.args['inpaint_seq'] = [seq_mask] if str_mask not in ['',None]: S.args['inpaint_str'] = [str_mask] if secondary_structure in ['',None]: secondary_structure = None else: secondary_structure = ''.join(['E' if x == 'S' else x for x in secondary_structure]) if L < len(secondary_structure): secondary_structure = secondary_structure[:len(sequence)] elif L == len(secondary_structure): pass else: dseq = L - len(secondary_structure) secondary_structure += secondary_structure[-1]*dseq # potentials potential_list = [] potential_bias_list = [] if aa_bias not in ['',None]: potential_list.append('aa_bias') S.args['aa_composition'] = aa_bias if aa_bias_potential in ['',None]: aa_bias_potential = 3 potential_bias_list.append(str(aa_bias_potential)) ''' if target_charge not in ['',None]: potential_list.append('charge') if charge_potential in ['',None]: charge_potential = 1 potential_bias_list.append(str(charge_potential)) S.args['target_charge'] = float(target_charge) if target_ph in ['',None]: target_ph = 7.4 S.args['target_pH'] = float(target_ph) ''' if hydrophobic_target_score not in ['',None]: potential_list.append('hydrophobic') S.args['hydrophobic_score'] = float(hydrophobic_target_score) if hydrophobic_potential in ['',None]: hydrophobic_potential = 3 potential_bias_list.append(str(hydrophobic_potential)) if pssm not in ['',None]: potential_list.append('PSSM') potential_bias_list.append('5') S.args['PSSM'] = pssm.name if len(potential_list) > 0: S.args['potentials'] = ','.join(potential_list) S.args['potential_scale'] = ','.join(potential_bias_list) # normalise secondary_structure bias from range 0-0.3 S.args['secondary_structure'] = secondary_structure S.args['helix_bias'] = helix_bias S.args['strand_bias'] = strand_bias S.args['loop_bias'] = loop_bias # set T if num_steps in ['',None]: S.args['T'] = 20 else: S.args['T'] = int(num_steps) # noise if 'normal' in noise: S.args['sample_distribution'] = noise S.args['sample_distribution_gmm_means'] = [0] S.args['sample_distribution_gmm_variances'] = [1] elif 'gmm2' in noise: S.args['sample_distribution'] = noise S.args['sample_distribution_gmm_means'] = [-1,1] S.args['sample_distribution_gmm_variances'] = [1,1] elif 'gmm3' in noise: S.args['sample_distribution'] = noise S.args['sample_distribution_gmm_means'] = [-1,0,1] S.args['sample_distribution_gmm_variances'] = [1,1,1] if secondary_structure not in ['',None] or helix_bias+strand_bias+loop_bias > 0: S.args['checkpoint'] = dssp_checkpoint S.args['d_t1d'] = 29 print('using dssp checkpoint') else: S.args['checkpoint'] = og_checkpoint S.args['d_t1d'] = 24 print('using og checkpoint') for k,v in S.args.items(): print(f"{k} --> {v}") # init S S.model_init() S.diffuser_init() S.setup() # sampling loop plddt_data = [] for j in range(S.max_t): print(f'on step {j}') output_seq, output_pdb, plddt = S.take_step_get_outputs(j) plddt_data.append(plddt) yield output_seq, output_pdb, display_pdb(output_pdb), get_plddt_plot(plddt_data, S.max_t) output_seq, output_pdb, plddt = S.get_outputs() return output_seq, output_pdb, display_pdb(output_pdb), get_plddt_plot(plddt_data, S.max_t) def get_plddt_plot(plddt_data, max_t): fig, ax = plt.subplots(figsize=(15,6)) x = list(range(1, len(plddt_data) + 1)) ax.plot(x, plddt_data, color='#661dbf', linewidth=3, marker='o') ax.set_xticks(range(1, max_t + 1)) ax.set_yticks([i/10 for i in range(11)]) # 0부터 1까지 ax.set_ylim([0, 1]) ax.set_ylabel('model confidence (plddt)') ax.set_xlabel('diffusion steps (t)') plt.close() # 메모리 관리를 위해 닫기 return fig def display_pdb(path_to_pdb): ''' #function to display pdb in py3dmol ''' pdb = open(path_to_pdb, "r").read() view = py3Dmol.view(width=500, height=500) view.addModel(pdb, "pdb") view.setStyle({'model': -1}, {"cartoon": {'colorscheme':{'prop':'b','gradient':'roygb','min':0,'max':1}}})#'linear', 'min': 0, 'max': 1, 'colors': ["#ff9ef0","#a903fc",]}}}) view.zoomTo() output = view._make_html().replace("'", '"') print(view._make_html()) x = f""" {output} """ # do not use ' in this input return f"""""" ''' return f"""""" ''' def get_motif_preview(pdb_id, contigs): try: input_pdb = fetch_pdb(pdb_id=pdb_id.lower() if pdb_id else None) if input_pdb is None: return gr.HTML("PDB ID를 입력해주세요"), None parse = parse_pdb(input_pdb) output_name = input_pdb pdb = open(output_name, "r").read() view = py3Dmol.view(width=500, height=500) view.addModel(pdb, "pdb") if contigs in ['',0]: contigs = ['0'] else: contigs = [contigs] print('DEBUG: ',contigs) pdb_map = get_mappings(ContigMap(parse,contigs)) print('DEBUG: ',pdb_map) print('DEBUG: ',pdb_map['con_ref_idx0']) roi = [x[1]-1 for x in pdb_map['con_ref_pdb_idx']] colormap = {0:'#D3D3D3', 1:'#F74CFF'} colors = {i+1: colormap[1] if i in roi else colormap[0] for i in range(parse['xyz'].shape[0])} view.setStyle({"cartoon": {"colorscheme": {"prop": "resi", "map": colors}}}) view.zoomTo() output = view._make_html().replace("'", '"') print(view._make_html()) x = f""" {output} """ # do not use ' in this input return f"""""", output_name except Exception as e: return gr.HTML(f"오류가 발생했습니다: {str(e)}"), None def fetch_pdb(pdb_id=None): if pdb_id is None or pdb_id == "": return None else: os.system(f"wget -qnc https://files.rcsb.org/view/{pdb_id}.pdb") return f"{pdb_id}.pdb" # MSA AND PSSM GUIDANCE def save_pssm(file_upload): filename = file_upload.name orig_name = file_upload.orig_name if filename.split('.')[-1] in ['fasta', 'a3m']: return msa_to_pssm(file_upload) return filename def msa_to_pssm(msa_file): # Define the lookup table for converting amino acids to indices aa_to_index = {'A': 0, 'R': 1, 'N': 2, 'D': 3, 'C': 4, 'Q': 5, 'E': 6, 'G': 7, 'H': 8, 'I': 9, 'L': 10, 'K': 11, 'M': 12, 'F': 13, 'P': 14, 'S': 15, 'T': 16, 'W': 17, 'Y': 18, 'V': 19, 'X': 20, '-': 21} # Open the FASTA file and read the sequences records = list(SeqIO.parse(msa_file.name, "fasta")) assert len(records) >= 1, "MSA must contain more than one protein sequecne." first_seq = str(records[0].seq) aligned_seqs = [first_seq] # print(aligned_seqs) # Perform sequence alignment using the Needleman-Wunsch algorithm aligner = Align.PairwiseAligner() aligner.open_gap_score = -0.7 aligner.extend_gap_score = -0.3 for record in records[1:]: alignment = aligner.align(first_seq, str(record.seq))[0] alignment = alignment.format().split("\n") al1 = alignment[0] al2 = alignment[2] al1_fin = "" al2_fin = "" percent_gap = al2.count('-')/ len(al2) if percent_gap > 0.4: continue for i in range(len(al1)): if al1[i] != '-': al1_fin += al1[i] al2_fin += al2[i] aligned_seqs.append(str(al2_fin)) # Get the length of the aligned sequences aligned_seq_length = len(first_seq) # Initialize the position scoring matrix matrix = np.zeros((22, aligned_seq_length)) # Iterate through the aligned sequences and count the amino acids at each position for seq in aligned_seqs: #print(seq) for i in range(aligned_seq_length): if i == len(seq): break amino_acid = seq[i] if amino_acid.upper() not in aa_to_index.keys(): continue else: aa_index = aa_to_index[amino_acid.upper()] matrix[aa_index, i] += 1 # Normalize the counts to get the frequency of each amino acid at each position matrix /= len(aligned_seqs) print(len(aligned_seqs)) matrix[20:,]=0 outdir = ".".join(msa_file.name.split('.')[:-1]) + ".csv" np.savetxt(outdir, matrix[:21,:].T, delimiter=",") return outdir def get_pssm(fasta_msa, input_pssm): try: if input_pssm is not None: outdir = input_pssm.name elif fasta_msa is not None: outdir = save_pssm(fasta_msa) else: return gr.Plot(label="파일을 업로드해주세요"), None pssm = np.loadtxt(outdir, delimiter=",", dtype=float) fig, ax = plt.subplots(figsize=(15,6)) plt.imshow(torch.permute(torch.tensor(pssm),(1,0))) return fig, outdir except Exception as e: return gr.Plot(label=f"오류가 발생했습니다: {str(e)}"), None # 히어로 능력치 계산 함수 추가 def calculate_hero_stats(helix_bias, strand_bias, loop_bias, hydrophobic_score): stats = { 'strength': strand_bias * 20, # 베타시트 구조 기반 'flexibility': helix_bias * 20, # 알파헬릭스 구조 기반 'speed': loop_bias * 5, # 루프 구조 기반 'defense': abs(hydrophobic_score) if hydrophobic_score else 0 } return stats def toggle_seq_input(choice): if choice == "자동 설계": return gr.update(visible=True), gr.update(visible=False) else: # "직접 입력" return gr.update(visible=False), gr.update(visible=True) def toggle_secondary_structure(choice): if choice == "슬라이더로 설정": return ( gr.update(visible=True), # helix_bias gr.update(visible=True), # strand_bias gr.update(visible=True), # loop_bias gr.update(visible=False) # secondary_structure ) else: # "직접 입력" return ( gr.update(visible=False), # helix_bias gr.update(visible=False), # strand_bias gr.update(visible=False), # loop_bias gr.update(visible=True) # secondary_structure ) def create_radar_chart(stats): # 레이더 차트 생성 로직 categories = list(stats.keys()) values = list(stats.values()) fig = go.Figure(data=go.Scatterpolar( r=values, theta=categories, fill='toself' )) fig.update_layout( polar=dict( radialaxis=dict( visible=True, range=[0, 1] )), showlegend=False ) return fig def generate_hero_description(name, stats, abilities): # 히어로 설명 생성 로직 description = f""" 히어로 이름: {name} 주요 능력: - 근력: {'★' * int(stats['strength'] * 5)} - 유연성: {'★' * int(stats['flexibility'] * 5)} - 스피드: {'★' * int(stats['speed'] * 5)} - 방어력: {'★' * int(stats['defense'] * 5)} 특수 능력: {', '.join(abilities)} """ return description def combined_generation(name, strength, flexibility, speed, defense, size, abilities, sequence, seq_len, helix_bias, strand_bias, loop_bias, secondary_structure, aa_bias, aa_bias_potential, num_steps, noise, hydrophobic_target_score, hydrophobic_potential, contigs, pssm, seq_mask, str_mask, rewrite_pdb): try: # protein_diffusion_model 실행 generator = protein_diffusion_model( sequence=None, seq_len=size, # 히어로 크기를 seq_len으로 사용 helix_bias=flexibility, # 히어로 유연성을 helix_bias로 사용 strand_bias=strength, # 히어로 강도를 strand_bias로 사용 loop_bias=speed, # 히어로 스피드를 loop_bias로 사용 secondary_structure=None, aa_bias=None, aa_bias_potential=None, num_steps="25", noise="normal", hydrophobic_target_score=str(-defense), # 히어로 방어력을 hydrophobic score로 사용 hydrophobic_potential="2", contigs=None, pssm=None, seq_mask=None, str_mask=None, rewrite_pdb=None ) # 마지막 결과 가져오기 final_result = None for result in generator: final_result = result if final_result is None: raise Exception("생성 결과가 없습니다") output_seq, output_pdb, structure_view, plddt_plot = final_result # 히어로 능력치 계산 stats = calculate_hero_stats(flexibility, strength, speed, defense) # 모든 결과 반환 return ( create_radar_chart(stats), # 능력치 차트 generate_hero_description(name, stats, abilities), # 히어로 설명 output_seq, # 단백질 서열 output_pdb, # PDB 파일 structure_view, # 3D 구조 plddt_plot # 신뢰도 차트 ) except Exception as e: print(f"Error in combined_generation: {str(e)}") return ( None, f"에러: {str(e)}", None, None, gr.HTML("에러가 발생했습니다"), None ) def extract_parameters_from_chat(chat_response): """챗봇 응답에서 파라미터 추출""" try: params = { 'sequence_length': 100, 'helix_bias': 0.02, 'strand_bias': 0.02, 'loop_bias': 0.1, 'hydrophobic_target_score': 0 } # 응답 텍스트에서 값 추출 if "길이:" in chat_response: length_match = re.search(r'길이: (\d+)', chat_response) if length_match: params['sequence_length'] = int(length_match.group(1)) if "알파 헬릭스 비율:" in chat_response: helix_match = re.search(r'알파 헬릭스 비율: ([\d.]+)', chat_response) if helix_match: params['helix_bias'] = float(helix_match.group(1)) / 100 if "베타 시트 비율:" in chat_response: strand_match = re.search(r'베타 시트 비율: ([\d.]+)', chat_response) if strand_match: params['strand_bias'] = float(strand_match.group(1)) / 100 if "루프 구조 비율:" in chat_response: loop_match = re.search(r'루프 구조 비율: ([\d.]+)', chat_response) if loop_match: params['loop_bias'] = float(loop_match.group(1)) / 100 if "소수성 점수:" in chat_response: hydro_match = re.search(r'소수성 점수: ([-\d.]+)', chat_response) if hydro_match: params['hydrophobic_target_score'] = float(hydro_match.group(1)) return params except Exception as e: print(f"파라미터 추출 중 오류: {str(e)}") return None def update_protein_display(chat_response): if "생성된 단백질 분석" in chat_response: params = extract_parameters_from_chat(chat_response) if params: result = generate_protein(params) stats = calculate_hero_stats( helix_bias=params['helix_bias'], strand_bias=params['strand_bias'], loop_bias=params['loop_bias'], hydrophobic_score=params['hydrophobic_target_score'] ) return { hero_stats: create_radar_chart(stats), hero_description: chat_response, output_seq: result[0], output_pdb: result[1], output_viewer: display_pdb(result[1]), plddt_plot: result[3] } return None def analyze_active_sites(sequence): """활성 부위 분석""" return "분석 중..." # 임시 구현 def predict_interactions(params): """상호작용 예측""" return "예측 중..." # 임시 구현 def evaluate_stability(plddt_data): """안정성 평가""" if not plddt_data: return "평가 불가" avg_score = np.mean(plddt_data) if avg_score > 0.8: return "매우 안정적" elif avg_score > 0.6: return "안정적" else: return "보통" def process_chat_and_generate(message, history): try: # 1. 초기 응답 생성 (이전 대화 기록 유지) current_history = history + [ {"role": "user", "content": message}, {"role": "assistant", "content": "단백질 설계를 시작합니다. 잠시만 기다려주세요..."} ] yield (current_history, None, None, None, None, None, None) # 2. 프롬프트 분석 analysis = analyze_prompt(message) similar_structures = search_protein_data(analysis, ds) params = extract_parameters(analysis, similar_structures) # 3. 분석 결과 추가 (이전 메시지 유지) current_history = current_history[:-1] + [ {"role": "assistant", "content": f""" 분석 결과: {analysis} 단백질 구조 생성을 시작합니다... """} ] yield (current_history, None, None, None, None, None, None) # 4. 단백질 생성 generator = protein_diffusion_model(...) # 기존 파라미터 유지 # 5. 생성 과정 추적 (이전 메시지들 유지) step = 0 for result in generator: step += 1 progress_msg = f"단백질 생성 중... {step}/25 단계 완료" current_history = current_history[:-1] + [ {"role": "assistant", "content": progress_msg} ] yield ( current_history, create_radar_chart(calculate_hero_stats( params['helix_bias'], params['strand_bias'], params['loop_bias'], float(params['hydrophobic_target_score']) )), progress_msg, result[0], result[1], result[2], result[3] ) # 6. 최종 결과 및 설명 추가 (모든 이전 메시지 유지) final_explanation = f""" 단백질 설계가 완료되었습니다. [분석 결과] {analysis} [구조적 특징] - 길이: {params['sequence_length']} 아미노산 - 알파 헬릭스 비율: {params['helix_bias']*100:.1f}% - 베타 시트 비율: {params['strand_bias']*100:.1f}% - 루프 구조 비율: {params['loop_bias']*100:.1f}% - 소수성 점수: {params['hydrophobic_target_score']} [생성 과정] - 총 {step}단계의 최적화 완료 - 최종 안정성 점수: {np.mean(plddt_data) if plddt_data else 0:.2f} - 참조된 유사 구조: {len(similar_structures)}개 3D 구조와 상세 분석 결과를 확인하실 수 있습니다. """ final_history = current_history + [ {"role": "assistant", "content": final_explanation} ] return ( final_history, # 모든 대화 기록 유지 create_radar_chart(stats), final_explanation, output_seq, output_pdb, structure_view, plddt_plot ) except Exception as e: print(f"Error in process_chat_and_generate: {str(e)}") traceback.print_exc() error_msg = f"오류가 발생했습니다: {str(e)}" return ( history + [ {"role": "user", "content": message}, {"role": "assistant", "content": error_msg} ], None, None, None, None, None, None ) def extract_keywords(analysis): """분석 텍스트에서 키워드 추출""" try: # 기본 키워드 추출 keywords = [] # 주요 기능 키워드 if "치료" in analysis: keywords.extend(["therapeutic", "binding"]) if "결합" in analysis: keywords.extend(["binding", "interaction"]) if "촉매" in analysis: keywords.extend(["enzyme", "catalytic"]) # 환경 키워드 if "막" in analysis: keywords.extend(["membrane", "transmembrane"]) if "수용성" in analysis: keywords.extend(["soluble", "hydrophilic"]) if "소수성" in analysis: keywords.extend(["hydrophobic"]) # 구조 키워드 if "알파" in analysis or "나선" in analysis: keywords.append("helix") if "베타" in analysis or "시트" in analysis: keywords.append("sheet") if "루프" in analysis: keywords.append("loop") return list(set(keywords)) # 중복 제거 except Exception as e: print(f"키워드 추출 중 오류: {str(e)}") return [] def calculate_similarity(keywords, entry): """키워드와 데이터셋 항목 간의 유사도 계산""" try: score = 0 # 데이터셋 구조 확인 및 안전한 접근 sequence = entry.get('sequence', '').lower() if isinstance(entry, dict) else str(entry).lower() # 데이터셋 구조 디버깅 print("Entry structure:", type(entry)) print("Entry content:", entry) for keyword in keywords: # 안전한 접근을 위한 수정 description = entry.get('description', '') if isinstance(entry, dict) else '' if keyword in description.lower(): score += 2 if keyword in sequence: score += 1 if isinstance(entry, dict) and 'secondary_structure' in entry: sec_structure = entry['secondary_structure'] if keyword in ['helix'] and 'H' in sec_structure: score += 1 if keyword in ['sheet'] and 'E' in sec_structure: score += 1 if keyword in ['loop'] and 'L' in sec_structure: score += 1 return score except Exception as e: print(f"유사도 계산 중 상세 오류: {str(e)}") print("Entry:", entry) return 0 def download_checkpoint_files(): """필요한 체크포인트 파일 다운로드""" try: import requests # 체크포인트 파일 URL (실제 URL로 교체 필요) dssp_url = "YOUR_DSSP_CHECKPOINT_URL" og_url = "YOUR_OG_CHECKPOINT_URL" # DSSP 체크포인트 다운로드 if not os.path.exists(dssp_checkpoint): print("Downloading DSSP checkpoint...") response = requests.get(dssp_url) with open(dssp_checkpoint, 'wb') as f: f.write(response.content) # OG 체크포인트 다운로드 if not os.path.exists(og_checkpoint): print("Downloading OG checkpoint...") response = requests.get(og_url) with open(og_checkpoint, 'wb') as f: f.write(response.content) print("Checkpoint files downloaded successfully") except Exception as e: print(f"Error downloading checkpoint files: {str(e)}") raise # 시작 시 체크포인트 파일 확인 및 다운로드 try: download_checkpoint_files() except Exception as e: print(f"Warning: Could not download checkpoint files: {str(e)}") with gr.Blocks(theme='ParityError/Interstellar') as demo: with gr.Row(): with gr.Column(scale=1): # 챗봇 인터페이스 gr.Markdown("# 🤖 AI 단백질 설계 도우미") # 여기를 수정 chatbot = gr.Chatbot( height=600, type='messages' # 메시지 형식 지정 ) with gr.Row(): msg = gr.Textbox( label="메시지를 입력하세요", placeholder="예: 알츠하이머 치료용 단백질을 생성해주세요", lines=2, scale=4 ) submit_btn = gr.Button("전송", variant="primary", scale=1) clear = gr.Button("대화 내용 지우기") with gr.Accordion("채팅 설정", open=False): system_message = gr.Textbox( value="당신은 단백질 설계를 도와주는 전문가입니다.", label="시스템 메시지" ) max_tokens = gr.Slider( minimum=1, maximum=3800, value=3800, step=1, label="최대 토큰 수" ) temperature = gr.Slider( minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature" ) top_p = gr.Slider( minimum=0.1, maximum=1.0, value=0.95, step=0.05, label="Top-P" ) # 탭 인터페이스 with gr.Tabs(): with gr.TabItem("🦸‍♂️ 커스텀 디자인"): gr.Markdown(""" ### ✨ 당신만의 특별한 커스텀을 만들어보세요! 각 능력치를 조절하면 커스텀된 단백질이 자동으로 설계됩니다. """) # 히어로 기본 정보 hero_name = gr.Textbox( label="커스텀 이름", placeholder="당신의 커스텀 단백질 이름을 지어주세요!", info="당신만의 정체성을 나타내는 이름을 입력하세요" ) # 능력치 설정 gr.Markdown("### 💪 커스텀 능력치 설정") with gr.Row(): strength = gr.Slider( minimum=0.0, maximum=0.05, label="💪 초강력(근력)", value=0.02, info="단단한 베타시트 구조로 강력한 힘을 생성합니다" ) flexibility = gr.Slider( minimum=0.0, maximum=0.05, label="🤸‍♂️ 유연성", value=0.02, info="나선형 알파헬릭스 구조로 유연한 움직임을 가능하게 합니다" ) with gr.Row(): speed = gr.Slider( minimum=0.0, maximum=0.20, label="⚡ 스피드", value=0.1, info="루프 구조로 빠른 움직임을 구현합니다" ) defense = gr.Slider( minimum=-10, maximum=10, label="🛡️ 방어력", value=0, info="음수: 수중 활동에 특화, 양수: 지상 활동에 특화" ) # 히어로 크기 설정 hero_size = gr.Slider( minimum=50, maximum=200, label="📏 커스텀 단백질 크기", value=100, info="전체적인 크기를 결정합니다" ) # 특수 능력 설정 with gr.Accordion("🌟 특수 능력", open=False): gr.Markdown(""" 특수 능력을 선택하면 커스텀 단백질질에 특별한 구조가 추가됩니다. - 자가 회복: 단백질 구조 복구 능력 강화 - 원거리 공격: 특수한 구조적 돌출부 형성 - 방어막 생성: 안정적인 보호층 구조 생성 """) special_ability = gr.CheckboxGroup( choices=["자가 회복", "원거리 공격", "방어막 생성"], label="특수 능력 선택" ) # 생성 버튼 create_btn = gr.Button("🧬 커스텀 단백질 생성!", variant="primary", scale=2) with gr.TabItem("🧬 커스텀 단백질 설계"): gr.Markdown(""" ### 🧪 커스텀 단백질 고급 설정 유전자 구조를 더 세밀하게 조정할 수 있습니다. """) seq_opt = gr.Radio( ["자동 설계", "직접 입력"], label="DNA 설계 방식", value="자동 설계" ) sequence = gr.Textbox( label="단백질 시퀀스", lines=1, placeholder='사용 가능한 아미노산: A,C,D,E,F,G,H,I,K,L,M,N,P,Q,R,S,T,V,W,Y (X는 무작위)', visible=False ) seq_len = gr.Slider( minimum=5.0, maximum=250.0, label="DNA 길이", value=100, visible=True ) with gr.Accordion(label='🦴 골격 구조 설정', open=True): gr.Markdown(""" 커스텀 단백질 기본 골격 구조를 설정합니다. - 나선형 구조: 유연하고 탄력있는 움직임 - 병풍형 구조: 단단하고 강력한 힘 - 고리형 구조: 빠르고 민첩한 움직임 """) sec_str_opt = gr.Radio( ["슬라이더로 설정", "직접 입력"], label="골격 구조 설정 방식", value="슬라이더로 설정" ) secondary_structure = gr.Textbox( label="골격 구조", lines=1, placeholder='H:나선형, S:병풍형, L:고리형, X:자동설정', visible=False ) with gr.Column(): helix_bias = gr.Slider( minimum=0.0, maximum=0.05, label="나선형 구조 비율", visible=True ) strand_bias = gr.Slider( minimum=0.0, maximum=0.05, label="병풍형 구조 비율", visible=True ) loop_bias = gr.Slider( minimum=0.0, maximum=0.20, label="고리형 구조 비율", visible=True ) with gr.Accordion(label='🧬 단백질 구성 설정', open=False): gr.Markdown(""" 특정 아미노산의 비율을 조절하여 특성을 강화할 수 있습니다. 예시: W0.2,E0.1 (트립토판 20%, 글루탐산 10%) """) with gr.Row(): aa_bias = gr.Textbox( label="아미노산 비율", lines=1, placeholder='예시: W0.2,E0.1' ) aa_bias_potential = gr.Textbox( label="강화 정도", lines=1, placeholder='1.0-5.0 사이 값 입력' ) with gr.Accordion(label='🌍 환경 적응력 설정', open=False): gr.Markdown(""" 환경 적응력을 조절합니다. 음수: 수중 활동에 특화, 양수: 지상 활동에 특화 """) with gr.Row(): hydrophobic_target_score = gr.Textbox( label="환경 적응 점수", lines=1, placeholder='예시: -5 (수중 활동에 특화)' ) hydrophobic_potential = gr.Textbox( label="적응력 강화 정도", lines=1, placeholder='1.0-2.0 사이 값 입력' ) with gr.Accordion(label='⚙️ 고급 설정', open=False): gr.Markdown(""" DNA 생성 과정의 세부 매개변수를 조정합니다. """) with gr.Row(): num_steps = gr.Textbox( label="생성 단계", lines=1, placeholder='25 이하 권장' ) noise = gr.Dropdown( ['normal','gmm2 [-1,1]','gmm3 [-1,0,1]'], label='노이즈 타입', value='normal' ) design_btn = gr.Button("🧬 단백질 설계 생성!", variant="primary", scale=2) with gr.TabItem("🧪 커스텀 단백질 강화"): gr.Markdown(""" ### ⚡ 기존 커스텀 단백질 활용 기존 단백질 일부를 새로운 커스텀에게 이식합니다. """) gr.Markdown("공개된 커스텀 단백질 데이터베이스에서 코드를 찾을 수 있습니다") pdb_id_code = gr.Textbox( label="커스텀 단백질 코드", lines=1, placeholder='기존 커스텀 단백질 코드를 입력하세요 (예: 1DPX)' ) gr.Markdown("이식하고 싶은 단백질 영역을 선택하고 새로운 단백질을 추가할 수 있습니다") contigs = gr.Textbox( label="이식할 단백질 영역", lines=1, placeholder='예시: 15,A3-10,20-30' ) with gr.Row(): seq_mask = gr.Textbox( label='능력 재설계', lines=1, placeholder='선택한 영역의 능력을 새롭게 디자인' ) str_mask = gr.Textbox( label='구조 재설계', lines=1, placeholder='선택한 영역의 구조를 새롭게 디자인' ) preview_viewer = gr.HTML() rewrite_pdb = gr.File(label='커스텀 단백질 파일') preview_btn = gr.Button("🔍 미리보기", variant="secondary") enhance_btn = gr.Button("⚡ 강화된 커스텀 단백질 생성!", variant="primary", scale=2) with gr.TabItem("👑 커스텀 단백질 족보"): gr.Markdown(""" ### 🏰 위대한 커스텀 단백질 가문의 유산 강력한 특성을 계승하여 새로운 커스텀 단백질을 만듭니다. """) with gr.Row(): with gr.Column(): gr.Markdown("커스텀 단백질 정보가 담긴 파일을 업로드하세요") fasta_msa = gr.File(label='가문 DNA 데이터') with gr.Column(): gr.Markdown("이미 분석된 가문 특성 데이터가 있다면 업로드하세요") input_pssm = gr.File(label='가문 특성 데이터') pssm = gr.File(label='분석된 가문 특성') pssm_view = gr.Plot(label='가문 특성 분석 결과') pssm_gen_btn = gr.Button("✨ 가문 특성 분석", variant="secondary") inherit_btn = gr.Button("👑 가문의 힘 계승!", variant="primary", scale=2) # 오른쪽 열: 결과 표시 with gr.Column(scale=1): gr.Markdown("## 🦸‍♂️ 커스텀 단백질 프로필") hero_stats = gr.Plot(label="능력치 분석") hero_description = gr.Textbox(label="커스텀 단백질 특성", lines=3) gr.Markdown("## 🧬 커스텀 단백질 분석 결과") gr.Markdown("#### ⚡ 커스텀 단백질 안정성 점수") plddt_plot = gr.Plot(label='안정성 분석') gr.Markdown("#### 📝 커스텀 단백질 시퀀스") output_seq = gr.Textbox(label="커스텀 단백질 서열") gr.Markdown("#### 💾 커스텀 단백질 데이터") output_pdb = gr.File(label="커스텀 단백질 파일") gr.Markdown("#### 🔬 커스텀 단백질 구조") output_viewer = gr.HTML() # 이벤트 연결 # 챗봇 이벤트 msg.submit(process_chat, [msg, chatbot], [chatbot]) clear.click(lambda: None, None, chatbot, queue=False) seq_opt.change( fn=toggle_seq_input, inputs=[seq_opt], outputs=[seq_len, sequence], queue=False ) sec_str_opt.change( fn=toggle_secondary_structure, inputs=[sec_str_opt], outputs=[helix_bias, strand_bias, loop_bias, secondary_structure], queue=False ) preview_btn.click( get_motif_preview, inputs=[pdb_id_code, contigs], outputs=[preview_viewer, rewrite_pdb] ) pssm_gen_btn.click( get_pssm, inputs=[fasta_msa, input_pssm], outputs=[pssm_view, pssm] ) # 챗봇 기반 단백질 생성 결과 업데이트 def update_protein_display(chat_response): if "생성된 단백질 분석" in chat_response: params = extract_parameters_from_chat(chat_response) result = generate_protein(params) return { hero_stats: create_radar_chart(calculate_hero_stats(params)), hero_description: chat_response, output_seq: result[0], output_pdb: result[1], output_viewer: display_pdb(result[1]), plddt_plot: result[3] } return None # 각 생성 버튼 이벤트 연결 for btn in [create_btn, design_btn, enhance_btn, inherit_btn]: btn.click( combined_generation, inputs=[ hero_name, strength, flexibility, speed, defense, hero_size, special_ability, sequence, seq_len, helix_bias, strand_bias, loop_bias, secondary_structure, aa_bias, aa_bias_potential, num_steps, noise, hydrophobic_target_score, hydrophobic_potential, contigs, pssm, seq_mask, str_mask, rewrite_pdb ], outputs=[ hero_stats, hero_description, output_seq, output_pdb, output_viewer, plddt_plot ] ) # 이벤트 핸들러 연결 msg.submit( fn=process_chat_and_generate, inputs=[msg, chatbot], outputs=[ chatbot, hero_stats, hero_description, output_seq, output_pdb, output_viewer, plddt_plot ] ) submit_btn.click( fn=process_chat_and_generate, inputs=[msg, chatbot], outputs=[ chatbot, hero_stats, hero_description, output_seq, output_pdb, output_viewer, plddt_plot ] ) # 채팅 내용 지우기 버튼 clear.click( lambda: (None, None, None, None, None, None, None), None, [chatbot, hero_stats, hero_description, output_seq, output_pdb, output_viewer, plddt_plot], queue=False ) # 챗봇 응답에 따른 결과 업데이트 msg.submit( update_protein_display, inputs=[chatbot], outputs=[hero_stats, hero_description, output_seq, output_pdb, output_viewer, plddt_plot] ) submit_btn.click(respond, [msg, chatbot, system_message, max_tokens, temperature, top_p], [chatbot]) msg.submit(respond, [msg, chatbot, system_message, max_tokens, temperature, top_p], [chatbot]) clear.click(lambda: None, None, chatbot, queue=False) # 이벤트 핸들러 연결 msg.submit( fn=process_chat_and_generate, inputs=[msg, chatbot], outputs=[ chatbot, hero_stats, hero_description, output_seq, output_pdb, output_viewer, plddt_plot ], show_progress=True ) submit_btn.click( fn=process_chat_and_generate, inputs=[msg, chatbot], outputs=[ chatbot, hero_stats, hero_description, output_seq, output_pdb, output_viewer, plddt_plot ], show_progress=True ) # 실행 demo.queue() demo.launch(debug=True)