Spaces:
Running
Running
from selenium import webdriver | |
from selenium.webdriver.common.by import By | |
from selenium.webdriver.support.ui import WebDriverWait | |
from selenium.webdriver.support import expected_conditions as EC | |
import csv | |
from selenium import webdriver | |
from selenium.webdriver.common.by import By | |
from selenium.webdriver.support.ui import WebDriverWait | |
from selenium.webdriver.support import expected_conditions as EC | |
import csv | |
from selenium.common.exceptions import ElementClickInterceptedException, TimeoutException | |
import pandas as pd | |
import requests | |
from bs4 import BeautifulSoup | |
import time | |
import unicodedata | |
import re | |
import ast | |
import torch | |
from selenium import webdriver | |
from selenium.webdriver.common.by import By | |
from selenium.webdriver.support.ui import WebDriverWait | |
from selenium.webdriver.support import expected_conditions as EC | |
from selenium.common.exceptions import ElementClickInterceptedException | |
def fetch_clinical_trials( | |
disease_name="", | |
freeword="", | |
include_not_yet_recruiting=False, | |
include_suspended=False, | |
specific_clinical_research=True, | |
corporate_clinical_trial=True, | |
physician_initiated_clinical_trial=True, | |
): | |
""" | |
指定された条件に基づいてjRCTから臨床試験情報を取得します。 | |
Args: | |
disease_name (str): 対象疾患名(例: "がん 神経膠腫 骨髄腫") | |
freeword (str): フリーワード検索(例: "免疫療法") | |
include_not_yet_recruiting (bool): 募集前の試験も含める場合はTrue。 | |
include_suspended (bool): 募集中断を含める場合はTrue。 | |
specific_clinical_research (bool): 特定臨床研究を含める場合はTrue。 | |
corporate_clinical_trial (bool): 企業治験を含める場合はTrue。 | |
physician_initiated_clinical_trial (bool): 医師主導治験を含める場合はTrue。 | |
Returns: | |
list: 検索結果のリスト([試験ID, タイトル, 対象疾患, 進捗状況, 日付, リンク]) | |
""" | |
# WebDriverを初期化 | |
driver = webdriver.Chrome() # 必要に応じてChromeDriverを設定 | |
all_results = [] | |
try: | |
# jRCTの検索ページにアクセス | |
driver.get("https://jrct.niph.go.jp/search") | |
# 対象疾患名を入力 | |
if disease_name: | |
disease_field = WebDriverWait(driver, 10).until( | |
EC.presence_of_element_located((By.ID, "reg-plobrem-1")) | |
) | |
disease_field.send_keys(disease_name) | |
# 対象疾患名の条件を「or」に設定 | |
condition_select = driver.find_element(By.ID, "reg-plobrem-type") | |
condition_select.find_element(By.CSS_SELECTOR, "option[value='1']").click() | |
# フリーワード検索を入力 | |
if freeword: | |
freeword_field = WebDriverWait(driver, 10).until( | |
EC.presence_of_element_located((By.ID, "demo-1")) | |
) | |
freeword_field.send_keys(freeword) | |
# フリーワード検索の条件を「or」に設定 | |
condition_select = driver.find_element(By.ID, "others") | |
condition_select.find_element(By.CSS_SELECTOR, "option[value='1']").click() | |
# 募集中を選択 | |
recruitment_checkbox = driver.find_element(By.ID, "reg-recruitment-2") | |
recruitment_checkbox.click() | |
# 募集前も含める場合 | |
if include_not_yet_recruiting: | |
not_yet_checkbox = driver.find_element(By.ID, "reg-recruitment-1") | |
not_yet_checkbox.click() | |
# 募集中断を選択 | |
if include_suspended: | |
suspended_checkbox = driver.find_element(By.ID, "reg-recruitment-3") | |
suspended_checkbox.click() | |
# 特定臨床研究を選択 | |
if specific_clinical_research: | |
specific_checkbox = driver.find_element(By.ID, "is-specific1") | |
specific_checkbox.click() | |
# 企業治験を選択 | |
if corporate_clinical_trial: | |
corporate_checkbox = driver.find_element(By.ID, "is-specific3") | |
corporate_checkbox.click() | |
# 医師主導治験を選択 | |
if physician_initiated_clinical_trial: | |
physician_checkbox = driver.find_element(By.ID, "is-specific7") | |
physician_checkbox.click() | |
# 検索ボタンをクリック | |
try: | |
search_button = driver.find_element(By.NAME, "button_type") | |
driver.execute_script("arguments[0].scrollIntoView();", search_button) # ボタンを画面内にスクロール | |
WebDriverWait(driver, 10).until(EC.element_to_be_clickable((By.NAME, "button_type"))).click() | |
except ElementClickInterceptedException: | |
print("検索ボタンがクリックできないため、JavaScriptでクリックします。") | |
driver.execute_script("arguments[0].click();", search_button) | |
# ページネーション対応ループ | |
while True: | |
# 現在のページの結果がロードされるのを待機 | |
WebDriverWait(driver, 10).until( | |
EC.presence_of_element_located((By.CSS_SELECTOR, "table tbody tr")) | |
) | |
# 現在のページの結果を取得 | |
rows = driver.find_elements(By.CSS_SELECTOR, "table tbody tr") | |
for row in rows: | |
columns = row.find_elements(By.TAG_NAME, "td") | |
if len(columns) > 4: | |
# 試験情報をリストに追加 | |
trial_id = columns[0].text | |
title = columns[1].text | |
condition = columns[2].text | |
status = columns[3].text | |
date = columns[4].text | |
# リンクを取得(エラー処理を追加) | |
try: | |
link = columns[1].find_element(By.TAG_NAME, "a").get_attribute("href") | |
except Exception: | |
link = "リンク取得エラー" | |
all_results.append([trial_id, title, condition, status, date, link]) | |
# ページネーションの確認 | |
try: | |
current_page = driver.find_element(By.CSS_SELECTOR, "ul.pagination li.active").text | |
print(f"{current_page} ページ目を処理しました。") | |
except Exception: | |
print("ページネーションが存在しません。全ての結果を取得しました。") | |
break | |
# 次ページボタンのリストを取得 | |
pagination_buttons = driver.find_elements(By.CSS_SELECTOR, "ul.pagination li a") | |
next_button = None | |
for button in pagination_buttons: | |
if button.text.isdigit() and int(button.text) > int(current_page): | |
next_button = button | |
break | |
if next_button: | |
try: | |
driver.execute_script("arguments[0].scrollIntoView();", next_button) # ボタンを画面内にスクロール | |
WebDriverWait(driver, 10).until(EC.element_to_be_clickable((By.LINK_TEXT, next_button.text))).click() | |
except ElementClickInterceptedException: | |
print("次ページボタンがクリックできないため、JavaScriptでクリックします。") | |
driver.execute_script("arguments[0].click();", next_button) | |
WebDriverWait(driver, 10).until(EC.staleness_of(rows[0])) # ページが変わるまで待機 | |
else: | |
print("次のページはありません。全ての結果を取得しました。") | |
break | |
finally: | |
# ブラウザを閉じる | |
driver.quit() | |
return all_results | |
def scrape_jrct_all_details(url): | |
""" | |
指定されたjRCT URLから必要なすべての情報を抽出します。 | |
""" | |
def normalize_text(text): | |
if not text: | |
return "" | |
# Unicode正規化 + 余分な空白除去 | |
text = unicodedata.normalize('NFKC', text) | |
return " ".join(text.split()) | |
# リクエストを送信 | |
headers = { | |
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" | |
} | |
try: | |
response = requests.get(url, headers=headers, timeout=10) | |
response.raise_for_status() | |
except requests.RequestException as e: | |
print(f"URLリクエストに失敗しました: {url} - エラー: {e}") | |
return {"URL": url, "エラー": "リクエスト失敗"} | |
soup = BeautifulSoup(response.text, 'html.parser') | |
data = {"URL": url} | |
def extract_label_data(label_text, label_en=None): | |
""" | |
特定のラベルに対応するデータを抽出するヘルパー関数 | |
複数の候補があった場合は、すべて取得してからフィルタする方式をとる。 | |
""" | |
results = [] | |
# 日本語ラベルと英語ラベルが両方指定されていれば、両方含む行を優先的に探す | |
combined_search = None | |
if label_en: | |
combined_search = f"{label_text} / {label_en}" | |
# ページ内のすべての<label>を探索 | |
for l in soup.find_all('label'): | |
lt = normalize_text(l.get_text()) | |
# combined_searchが利用可能ならまず完全な結合形でマッチを試みる | |
# なければ従来通りlabel_textをinでマッチ | |
if combined_search: | |
if combined_search in lt: | |
th = l.find_parent('th') | |
if not th: | |
continue | |
tr = th.find_parent('tr') | |
if not tr: | |
continue | |
tds = tr.find_all('td') | |
if len(tds) >= 1: | |
jp_data = normalize_text(tds[0].get_text()) if len(tds) > 0 else None | |
en_data = normalize_text(tds[1].get_text()) if label_en and len(tds) > 1 else None | |
results.append((jp_data, en_data)) | |
else: | |
# label_enが無い場合は、label_textだけで検索 | |
if label_text in lt: | |
th = l.find_parent('th') | |
if not th: | |
continue | |
tr = th.find_parent('tr') | |
if not tr: | |
continue | |
tds = tr.find_all('td') | |
if len(tds) >= 1: | |
jp_data = normalize_text(tds[0].get_text()) if len(tds) > 0 else None | |
en_data = normalize_text(tds[1].get_text()) if label_en and len(tds) > 1 else None | |
results.append((jp_data, en_data)) | |
# resultsに候補が格納されている | |
if not results: | |
return None, None | |
# 複数候補がある場合、特定キーワードによるフィルタリングが可能 | |
# ここでは特定キーワードがなければそのまま最初のを返す | |
# もし特定の疾患キーワードでフィルタリングしたい場合はここで処理を追加 | |
# ひとまず最初の候補を返す | |
return results[0] | |
# "研究・治験の目的" を抽出 | |
data["研究・治験の目的"], _ = extract_label_data("研究・治験の目的") | |
# 試験デザイン情報(日本語と英語)を抽出 | |
design_labels = [ | |
('試験等のフェーズ', 'Phase'), | |
('試験の種類', 'Study Type'), | |
('無作為化', 'allocation'), | |
('盲検化', 'masking'), | |
('対照', 'control'), | |
('割付け', 'assignment'), | |
('研究目的', 'purpose') | |
] | |
for label_jp, label_en in design_labels: | |
jp, en = extract_label_data(label_jp, label_en) | |
data[label_jp] = jp | |
data[label_en] = en | |
# その他の情報を抽出 | |
# 対象疾患名 / Health Condition(s) or Problem(s) Studiedを追加 | |
details_labels = [ | |
('主たる選択基準', 'Inclusion Criteria'), | |
('主たる除外基準', 'Exclusion Criteria'), | |
('年齢下限', 'Age Minimum'), | |
('年齢上限', 'Age Maximum'), | |
('性別', 'Gender'), | |
('中止基準', 'Discontinuation Criteria'), | |
('対象疾患名', 'Health Condition(s) or Problem(s) Studied'), # 追加 | |
('対象疾患キーワード', 'Keyword'), | |
('介入の内容', 'Intervention(s)') | |
] | |
for label_jp, label_en in details_labels: | |
jp, en = extract_label_data(label_jp, label_en) | |
data[label_jp] = jp | |
data[label_en] = en | |
# "他の臨床研究登録機関への登録" を探索 | |
other_registries_section = soup.find("div", id="area-toggle-07-02") | |
japic_no_list = [] | |
nct_no_list = [] | |
if other_registries_section: | |
rows = other_registries_section.find_all("tr") | |
for row in rows: | |
label = row.find("label") | |
if label and ("ID番号" in label.text or "研究番号" in label.text): | |
value_td = row.find("td") | |
if value_td: | |
id_number = value_td.text.strip() | |
if id_number.startswith("JapicCTI"): | |
japic_no_list.append(id_number) | |
elif id_number.startswith("NCT"): | |
nct_no_list.append(id_number) | |
# JapicCTI No と NCT No を格納(複数あればカンマ区切り) | |
data["JapicCTI No"] = ", ".join(japic_no_list) if japic_no_list else None | |
data["NCT No"] = ", ".join(nct_no_list) if nct_no_list else None | |
# サーバーへの負荷を避けるためのスリープ | |
time.sleep(1) # 必要に応じて調整 | |
return data | |
def create_dataframe_from_urls(urls, delay=5): | |
""" | |
URLのリストを受け取り、pandas DataFrameを作成します。 | |
リクエスト間に待機時間を設定して403エラーを防ぎます。 | |
Args: | |
urls (list): jRCTの詳細ページURLリスト。 | |
delay (int): 各リクエスト間の待機時間(秒単位、デフォルトは5秒)。 | |
Returns: | |
pd.DataFrame: 取得したデータのDataFrame。 | |
""" | |
all_data = [] | |
for url in urls: | |
print(f"Processing URL: {url}") | |
try: | |
# 各URLのデータを取得 | |
data = scrape_jrct_all_details(url) | |
all_data.append(data) | |
# 次のリクエストまで待機 | |
print(f"Waiting for {delay} seconds before the next request...") | |
time.sleep(delay) | |
except Exception as e: | |
print(f"Failed to process URL {url}: {e}") | |
# URLとエラー情報を記録しておく(必要ならログに保存など) | |
all_data.append({"URL": url, "Error": str(e)}) | |
# pandas DataFrameに変換 | |
return pd.DataFrame(all_data) | |
def extract_jrct_links(results): | |
""" | |
fetch_clinical_trialsの結果からjRCT-Noを抽出し、詳細リンクを作成する。 | |
Args: | |
results (list): fetch_clinical_trialsから得られる結果リスト | |
Returns: | |
list: jRCTの詳細ページリンクリスト | |
""" | |
base_url = "https://jrct.niph.go.jp/latest-detail/" | |
links = [] | |
for result in results: | |
if len(result) > 0: | |
jrct_no = result[0] # jRCT-Noは結果リストの最初の要素 | |
links.append(base_url + jrct_no) | |
return links | |
def reorder_columns(df): | |
""" | |
DataFrame の列を日本語の列を前半に、英語の列を後半に並び替える。 | |
""" | |
# 日本語と英語の列を分ける | |
jp_columns = [col for col in df.columns if all(ord(c) < 128 for c in col) is False] # 非 ASCII(日本語)文字列を含む列 | |
en_columns = [col for col in df.columns if col not in jp_columns] # 残りの列を英語と仮定 | |
# 日本語列 + 英語列の順序で整列 | |
ordered_columns = jp_columns + en_columns | |
# 列を並び替えた DataFrame を返す | |
return df[ordered_columns] | |
# Target列を分割する関数 | |
def split_target(target): | |
# 指定された区切り文字で分割 | |
split_words = re.split(r'[,\n、・及びおよび又はまたは]+', target) | |
# 空白文字を除外してリストとして返す | |
return [word.strip() for word in split_words if word.strip()] | |
# Target列を分割する関数(改良後) | |
def split_target_English(target): | |
# 区切り文字を (,) or (\n) or (、) or (・) または文字列"or" として扱う | |
# 正規表現では、パイプ(|)でor条件を定義し、"(?: ... )"はグルーピングのみ行う非捕捉グループ | |
# [,\n、・] はいずれかの1文字とマッチ | |
# or は文字列全体とマッチ | |
# 複数連続した区切り文字をまとめて1回の分割として扱うために+(1回以上)とする | |
split_words = re.split(r'(?:[,\n、・]|or| and)+', target) | |
# 空白文字を除外してリストとして返す | |
return [word.strip() for word in split_words if word.strip()] | |
# 処理プログラム | |
def split_triple_negative_words(target_words): | |
updated_words = [] | |
for word in target_words: | |
if 'triple negative' in word.lower(): | |
# 'triple negative' の部分を追加 | |
updated_words.append('Triple Negative') # 大文字で統一して追加 | |
# 'triple negative' を除いた残りの部分を追加 | |
remaining = word.lower().replace('triple negative', '').strip() | |
if remaining: # 残りの単語が存在する場合のみ追加 | |
updated_words.append(remaining.title().strip()) # 単語の先頭を大文字化 | |
else: | |
updated_words.append(word.strip().title()) # 単語の先頭を大文字化 | |
return updated_words | |
class WordProcessor: | |
def __init__(self, target_words): | |
self.target_words = target_words | |
def process(self, target_words): | |
""" | |
入力された単語のリストを処理して、ターゲット単語に基づき分割します。 | |
""" | |
updated_words = [] | |
for word in target_words: | |
word_lower = word.lower() | |
for target in self.target_words: | |
if target in word_lower: | |
# ターゲット単語を追加 | |
updated_words.append(target.title()) | |
# ターゲット単語を除いた残りを追加 | |
remaining = word_lower.replace(target, '').strip() | |
if remaining: | |
updated_words.append(remaining.title()) | |
break | |
else: | |
# ターゲット単語に該当しない場合 | |
updated_words.append(word.strip().title()) | |
return updated_words | |
def __call__(self, target_words): | |
""" | |
インスタンスを関数として呼び出すためのエントリポイント。 | |
""" | |
return self.process(target_words) | |
import pandas as pd | |
from sentence_transformers import util | |
import torch | |
def DfPostProcess(exclusive_words, model, csv_loc=None, dataframe=None): | |
""" | |
exclusive_words: 除外ワードリスト | |
model: SentenceTransformerなどのモデル | |
csv_loc: CSVファイルのパス(文字列)。dataframeが与えられない場合に使用。 | |
dataframe: 既存のpandas.DataFrame。csv_locが与えられない場合に使用。 | |
""" | |
# csv_locもdataframeも与えられなかった場合はエラー | |
if csv_loc is None and dataframe is None: | |
raise ValueError("Either csv_loc or dataframe must be provided.") | |
# 入力データフレームの決定 | |
if dataframe is not None: | |
basedf = dataframe.copy() | |
else: | |
basedf = pd.read_csv(csv_loc, index_col=0) | |
# '試験等のフェーズ'がNaNの行を削除 | |
basedf = basedf.dropna(subset=['試験等のフェーズ']) | |
# WordProcessorインスタンス作成 | |
processor = WordProcessor(exclusive_words) | |
# TargetEnglish列をsplit_target_Englishで処理しTargetWord列作成 | |
basedf['TargetWord'] = basedf['TargetEnglish'].apply(split_target_English) | |
# NaNやNoneではない場合にprocessor適用 | |
basedf['TargetWord'] = basedf['TargetWord'].apply(lambda x: processor(x) if isinstance(x, list) else x) | |
# TargetWord列をベクトル化し、リスト化して格納 | |
target_vecs_list = [] | |
for idx, target_words in enumerate(basedf['TargetWord']): | |
target_vecs = model.encode(target_words, convert_to_tensor=True).cpu() | |
# テンソルをリストに変換 | |
target_vecs_list.append(target_vecs.tolist()) | |
# TargetVec列にリストを格納 (dtype=objectのままでOK) | |
basedf['TargetVec'] = pd.Series(target_vecs_list, index=basedf.index, dtype=object) | |
return basedf | |
def get_matched_df(basedf, query, model, threshold=0.5): | |
# queryをベクトル化(テンソル化)しCPUへ移動 | |
query_vec = model.encode(query, convert_to_tensor=True).cpu() | |
matched_indices = [] | |
for idx, target_vec_str in enumerate(basedf['TargetVec']): | |
# CSVから読み込んだ時点でTargetVecはPythonリストを文字列化したものになっているため、 | |
# ここでliteral_evalでリストに戻します。 | |
if isinstance(target_vec_str, str): | |
# target_vec_strは"[[...], [...]]"のようなリスト形式 | |
target_list = ast.literal_eval(target_vec_str) # リストに変換 | |
target_vecs = torch.tensor(target_list) # リストからTensorへ | |
else: | |
# 万が一既にTensorの場合はそのまま使用 | |
target_vecs = target_vec_str | |
# 必要であればCPUへ移動(通常はすでにCPU上のはず) | |
"""if target_vecs[0].is_cuda: | |
target_vecs = target_vecs.cpu()""" | |
# コサイン類似度を計算 | |
cosine_scores = util.cos_sim(query_vec, target_vecs).squeeze() | |
# thresholdを超えるスコアが1つでもあればマッチと判断 | |
if (cosine_scores >= threshold).any(): | |
matched_indices.append(idx) | |
# 条件を満たした行を抽出 | |
matched_df = basedf.iloc[matched_indices] | |
return matched_df | |
def GetJRCTCriteria(dataframe, idx): | |
InC = dataframe.iloc[idx,:]['Inclusion Criteria'] | |
ExC = dataframe.iloc[idx,:]['Exclusion Criteria'] | |
return "Inclusion Criteria :" + InC + "\n" + "Exclusion Criteria :" + ExC |