Spaces:
No application file
No application file
# NOTE: the module binding from `import datetime` is shadowed by the class
# import below; in this file the bare name `datetime` is the class.
import datetime
import hashlib
import html
import json
import os
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
from urllib.parse import quote_plus

import gradio as gr
import requests
# Data classes for the API payload (every field is Optional with a None default)
@dataclass
class Author:
    """One author entry of a paper.

    ``parse_article`` builds instances with keyword arguments, so the class
    must be a dataclass (a plain class has no matching ``__init__``).
    Every field defaults to ``None`` so missing payload keys are tolerated.
    """

    _id: Optional[str] = None  # value of the "_id" key of the author entry
    name: Optional[str] = None  # value of the "name" key
    hidden: Optional[bool] = None  # value of the "hidden" key
@dataclass
class Paper:
    """Paper details nested under the "paper" key of an API item.

    Declared as a dataclass because ``parse_article`` constructs it with
    keyword arguments. Every field defaults to ``None``.
    """

    id: Optional[str] = None
    # The default is None (not a list), so the annotation is Optional.
    authors: Optional[List["Author"]] = None
    publishedAt: Optional[datetime] = None
    title: Optional[str] = None
    summary: Optional[str] = None
    upvotes: Optional[int] = None
    discussionId: Optional[str] = None
@dataclass
class SubmittedBy:
    """User record found under the "submittedBy" key of an API item.

    Declared as a dataclass because ``parse_article`` constructs it with
    keyword arguments. Every field defaults to ``None``.
    """

    _id: Optional[str] = None
    avatarUrl: Optional[str] = None
    fullname: Optional[str] = None
    name: Optional[str] = None
    type: Optional[str] = None
    isPro: Optional[bool] = None
    isHf: Optional[bool] = None
    isMod: Optional[bool] = None
    followerCount: Optional[int] = None
@dataclass
class Article:
    """Top-level daily-papers item: the paper plus submission metadata.

    Declared as a dataclass because ``parse_article`` constructs it with
    keyword arguments. Every field defaults to ``None``; in particular
    ``paper`` is ``None`` when the payload has no usable "paper" entry.
    """

    paper: Optional["Paper"] = None
    publishedAt: Optional[datetime] = None
    title: Optional[str] = None
    thumbnail: Optional[str] = None
    numComments: Optional[int] = None
    submittedBy: Optional["SubmittedBy"] = None
    isAuthorParticipating: Optional[bool] = None
def safe_get(data: Dict, *keys: str) -> Any:
    """Walk nested dictionaries along *keys* without raising.

    A missing key yields an empty dict so the walk can continue; a
    non-dict value met while keys remain yields ``None``. An empty dict as
    the final result is reported as ``None``.
    """
    node = data
    for name in keys:
        if isinstance(node, dict):
            node = node.get(name, {})
        else:
            node = None
    if node != {}:
        return node
    return None
def parse_article(data: Dict[str, Any]) -> Article:
    """Convert one raw API item into an ``Article``, tolerating missing keys.

    Absent or empty values become ``None``. ``paper`` and ``submittedBy``
    are ``None`` when their source entries are missing or empty.
    """

    def parse_datetime(dt_str: Optional[str]) -> Optional[datetime]:
        """Parse an ISO-8601 string; return None for empty or invalid input."""
        if not dt_str:
            return None
        # Rewrite a trailing "Z" as an explicit UTC offset before parsing.
        iso_text = dt_str[:-1] + '+00:00' if dt_str.endswith('Z') else dt_str
        try:
            return datetime.fromisoformat(iso_text)
        except ValueError:
            return None

    # Authors: one Author per entry of paper.authors (none when absent).
    authors = [
        Author(
            _id=entry.get("_id"),
            name=entry.get("name"),
            hidden=entry.get("hidden"),
        )
        for entry in safe_get(data, "paper", "authors") or []
    ]

    # Paper: only built when the "paper" entry is present and non-empty.
    if safe_get(data, "paper"):
        paper = Paper(
            id=safe_get(data, "paper", "id"),
            authors=authors,
            publishedAt=parse_datetime(safe_get(data, "paper", "publishedAt")),
            title=safe_get(data, "paper", "title"),
            summary=safe_get(data, "paper", "summary"),
            upvotes=safe_get(data, "paper", "upvotes"),
            discussionId=safe_get(data, "paper", "discussionId"),
        )
    else:
        paper = None

    # Submitter: copy the known keys straight from the "submittedBy" entry.
    submitter_data = safe_get(data, "submittedBy")
    if submitter_data:
        submitter_fields = (
            "_id", "avatarUrl", "fullname", "name", "type",
            "isPro", "isHf", "isMod", "followerCount",
        )
        submitted_by = SubmittedBy(
            **{field_name: submitter_data.get(field_name) for field_name in submitter_fields}
        )
    else:
        submitted_by = None

    # Assemble the final object from the top-level keys.
    return Article(
        paper=paper,
        publishedAt=parse_datetime(data.get("publishedAt")),
        title=data.get("title"),
        thumbnail=data.get("thumbnail"),
        numComments=data.get("numComments"),
        submittedBy=submitted_by,
        isAuthorParticipating=data.get("isAuthorParticipating"),
    )
# Endpoint queried by the fetch_* helpers.
API_URL = "https://huggingface.co/api/daily_papers"

# In-process response cache used by make_request, keyed by the MD5 hex
# digest of the requested URL.
cache = dict()
def make_request(url: str, timeout: float = 10.0, max_attempts: int = 3):
    """GET *url* and return its decoded JSON body, with caching and retries.

    Successful responses are stored in the module-level ``cache`` keyed by
    the MD5 hex digest of the URL, so repeated calls do not hit the network.
    Proxies are taken from the ``HF_HTTP_PROXY`` / ``HF_HTTPS_PROXY``
    environment variables when either is set.

    Args:
        url: Address to fetch.
        timeout: Seconds to wait for the server. Without a timeout a
            stalled connection would block the caller indefinitely.
        max_attempts: Number of tries before giving up.

    Returns:
        The decoded JSON payload, or an empty list when every attempt
        failed. Failures are not cached.
    """
    # The digest is only a compact cache key, not a security measure.
    url_hash = hashlib.md5(url.encode()).hexdigest()
    if url_hash in cache:
        print(f"Cache hit for URL: {url}")
        return cache[url_hash]

    http_proxy = os.getenv("HF_HTTP_PROXY")
    https_proxy = os.getenv("HF_HTTPS_PROXY")
    proxies = None
    if http_proxy or https_proxy:
        proxies = {"http": http_proxy, "https": https_proxy}

    for attempt in range(1, max_attempts + 1):
        try:
            response = requests.get(url, proxies=proxies, timeout=timeout)
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers an invalid JSON body on requests versions
            # whose decode error is not a RequestException.
            print(f"Attempt {attempt} failed: {e}")
            continue
        cache[url_hash] = data
        return data
    return []
def fetch_papers():
    """Fetch the default listing from API_URL and parse every item into an Article."""
    payload = make_request(API_URL)
    return list(map(parse_article, payload))
def fetch_papers_with_date(date: datetime):
    """Fetch and parse the listing for one day.

    The day is sent as a ``date=YYYY-MM-DD`` query parameter on API_URL.
    """
    day = date.strftime("%Y-%m-%d")
    payload = make_request(f"{API_URL}?date={day}")
    articles = []
    for item in payload:
        articles.append(parse_article(item))
    return articles
def fetch_papers_with_daterange(start_date: datetime, end_date: datetime):
    """Collect the articles of every day in ``[start_date, end_date]``.

    Each day is fetched independently via ``fetch_papers_with_date`` and the
    results are de-duplicated by ``article.paper.id``, keeping the first
    occurrence. Articles without a ``paper`` are dropped: they have no id to
    de-duplicate on. An empty list is returned when ``start_date`` is after
    ``end_date``.
    """
    articles = []
    current_date = start_date
    # `datetime` is the class in this module, so `datetime.timedelta` does
    # not exist; use the directly imported `timedelta`.
    one_day = timedelta(days=1)
    while current_date <= end_date:
        print(current_date)
        articles.extend(fetch_papers_with_date(current_date))
        print(f"Total articles: {len(articles)}")
        current_date += one_day

    # De-duplicate on paper id; dict insertion order keeps the first hit.
    unique_articles = {}
    for article in articles:
        paper = article.paper
        if paper is None:
            continue
        unique_articles.setdefault(paper.id, article)
    return list(unique_articles.values())
def sort_by_date(articles):
    """Return *articles* ordered newest first by ``publishedAt``.

    Articles without a ``publishedAt`` are placed last instead of raising
    ``TypeError`` when ``None`` is compared with a datetime.
    """

    def newest_first_key(article):
        published = article.publishedAt
        # The leading flag decides whenever one side has no date, so the
        # placeholder 0 is never compared against a datetime.
        return (published is not None, published if published is not None else 0)

    return sorted(articles, key=newest_first_key, reverse=True)
def sort_by_upvotes(articles):
    """Return *articles* ordered by ``paper.upvotes``, highest first.

    A missing paper or a missing upvote count is treated as 0 upvotes
    rather than raising during the sort.
    """

    def upvote_count(article):
        # getattr on a None paper falls back to the default.
        return getattr(article.paper, "upvotes", None) or 0

    return sorted(articles, key=upvote_count, reverse=True)
def sort_by_comments(articles):
    """Return *articles* ordered by ``numComments``, highest first.

    A missing comment count is treated as 0 rather than raising
    ``TypeError`` during the sort.
    """
    return sorted(articles, key=lambda article: article.numComments or 0, reverse=True)
def format_author(author):
    """Render one author as an HTML fragment.

    A named author becomes a link to a Google Scholar author search. The
    name is URL-encoded for the query string and HTML-escaped for the link
    text, so apostrophes, ampersands or angle brackets cannot break the
    single-quoted ``href`` or inject markup. Hidden authors get a
    "(隐藏)" suffix; authors without a name are shown as "匿名作者".

    Returns:
        The HTML fragment, or an empty string for a falsy *author*.
    """
    if not author:
        return ""
    hidden_status = "(隐藏)" if author.hidden else ""
    if not author.name:
        return f"匿名作者{hidden_status}"
    query = quote_plus(author.name)
    label = html.escape(author.name)
    return (
        "<a href='https://scholar.google.com/citations?view_op=search_authors"
        f"&mauthors={query}'>{label}</a>{hidden_status}"
    )
def format_paper_info(article):
    """Build the HTML detail view for one article.

    All text taken from the article (title, thumbnail URL, id, summary,
    submitter fields) is HTML-escaped before insertion, because it comes
    from the remote API. Every ``<p>`` that is opened is also closed, and a
    missing paper id is shown as "未知" instead of linking to
    ``/papers/None``.

    Returns:
        The HTML string, or "论文信息缺失" when the article has no paper.
    """
    paper = article.paper
    if not paper:
        return "论文信息缺失"

    esc = html.escape
    info = []

    # Title
    title = esc(article.title or "无标题论文")
    info.append(f"<h2>{title}</h2>")

    # Thumbnail
    if article.thumbnail:
        thumbnail = esc(article.thumbnail)
        info.append(f"<p><img src='{thumbnail}' style='max-width: 30em; width: 100%; margin: auto'/></p>")

    # Basic information
    paper_id = esc(str(paper.id)) if paper.id else ""
    if paper_id:
        info.append(f"<p><strong>论文 ID</strong>:<a href='https://huggingface.co/papers/{paper_id}'>{paper_id}</a></p>")
    else:
        info.append("<p><strong>论文 ID</strong>:未知</p>")
    published = paper.publishedAt.strftime("%Y-%m-%d %H:%M") if paper.publishedAt else "未知"
    info.append(f"<p><strong>发布时间</strong>:{published}</p>")

    # Authors (format_author returns ready-made HTML)
    if paper.authors:
        authors = "、".join(format_author(a) for a in paper.authors)
    else:
        authors = "作者信息暂缺"
    info.append(f"<p><strong>作者</strong>:{authors}</p>")

    # Abstract: collapse doubled braces and newlines, then escape.
    if paper.summary:
        summary = paper.summary.replace('{{', '{').replace('}}', '}').replace('\n', ' ')
        info.append(f"<h3>摘要</h3><p>{esc(summary)}</p>")

    # Discussion statistics
    upvotes = paper.upvotes or 0
    comments = article.numComments or 0
    info.append(
        f"<p><strong>点赞数</strong>:{upvotes}<span style='margin-left: .5rem'></span>"
        f"<strong>评论数</strong>:{comments}</p>"
    )
    if paper.discussionId and paper_id:
        discussion_id = esc(str(paper.discussionId))
        # The link gets its own paragraph so the markup stays balanced.
        info.append(f"<p><a href='https://huggingface.co/papers/{paper_id}/discussion/{discussion_id}'>进入讨论</a></p>")

    # Submitter
    submitter = article.submittedBy
    if submitter:
        avatar = esc(submitter.avatarUrl or "")
        fullname = esc(submitter.fullname or "")
        handle = esc(submitter.name or "")
        followers = submitter.followerCount or 0
        info.append("<hr><p><strong>提交者</strong>: ")
        info.append(
            f"<span><img src='{avatar}' class='author' /></span>{fullname}(<a href='https://huggingface.co/{handle}'>@{handle}</a>) ")
        info.append(f"粉丝数:{followers}</p>")

    return "".join(info)
def generate_table_html(papers):
    """Build the results table; each title opens its detail view when clicked.

    Titles are HTML-escaped. The paper id is passed to the page's
    ``showDetail`` function as a JSON string literal, itself HTML-escaped
    for the attribute, so quotes in an id cannot break out of the inline
    handler. Articles without a paper are skipped because they have no
    upvotes, date or id to show.
    """
    rows = ['<table class="paper-table"><tr><th>标题</th><th>👍点赞</th><th>💬评论</th><th>📅日期</th></tr>']
    for article in papers:
        paper = article.paper
        if paper is None:
            continue
        title = html.escape(article.title or "无标题")
        upvotes = paper.upvotes or 0
        comments = article.numComments or 0
        date = paper.publishedAt.strftime("%Y-%m-%d") if paper.publishedAt else "未知"
        # JSON-encode for JavaScript first, then escape for the HTML attribute.
        js_id = html.escape(json.dumps(str(paper.id or "")))
        rows.append(
            "<tr>"
            f'<td><a class="paper-title" href="javascript:void(0)" onclick="showDetail({js_id})">{title}</a></td>'
            f"<td>{upvotes}</td><td>{comments}</td><td>{date}</td>"
            "</tr>"
        )
    rows.append("</table>")
    return "".join(rows)
def build_html(papers):
    """Render one hidden ``<div>`` per paper holding its detail view.

    Each div's id is ``smartflow-paper-<paper id with dots replaced by
    dashes>``, which is what the page's ``showDetail`` script looks up.
    Articles without a paper or without a paper id are skipped, since no
    id can be derived for them.
    """
    sections = []
    for article in papers:
        paper = article.paper
        if paper is None or not paper.id:
            continue
        dom_id = html.escape(str(paper.id).replace('.', '-'))
        article_html = format_paper_info(article)
        sections.append(f"<div id='smartflow-paper-{dom_id}' style='display: none'>{article_html}</div>")
    return "".join(sections)
def query_papers(start_date_str, end_date_str):
    """Run a date-range query and return ``(table_html, detail_html)``.

    Both dates are parsed as ``YYYY-MM-DD``. The papers are fetched for the
    range, sorted by upvotes and rendered. Any exception is printed and the
    same warning paragraph is returned for both outputs.
    """
    date_format = "%Y-%m-%d"
    try:
        first_day = datetime.strptime(start_date_str, date_format)
        last_day = datetime.strptime(end_date_str, date_format)
        ranked = sort_by_upvotes(fetch_papers_with_daterange(first_day, last_day))
        table_markup = generate_table_html(ranked)
        detail_markup = build_html(ranked)
    except Exception as e:
        print(f"查询出错: {e}")
        failure = "<p>⚠️ 查询失败,请检查日期格式(YYYY-MM-DD)</p>"
        return failure, failure
    return table_markup, detail_markup
def show_detail(paper_id, papers):
    """Return the detail HTML for *papers*, or a prompt to query first.

    ``paper_id`` is accepted but not used here.
    """
    if papers:
        return build_html(papers)
    return "请先进行查询"
# Stylesheet handed to gr.Blocks(css=...); it could live in a separate file.
custom_css = """
.paper-table { width: 100%; border-collapse: collapse; }
.paper-table td { padding: 12px; border-bottom: 1px solid #ddd; }
.paper-table th { font-weight: bold; background: #f9f9f920; }
.paper-table tr:hover { background: #f9f9f920; }
.paper-title { color: #1a73e8; cursor: pointer; text-decoration: none !important; }
.paper-title:hover { text-decoration: underline !important; }
.paper-table td:nth-child(2), .paper-table td:nth-child(3), .paper-table td:nth-child(4) { text-align: center; }
.paper-table th:nth-child(2), .paper-table th:nth-child(3), .paper-table th:nth-child(4) { text-align: center; }
.detail-area { margin-top: 20px; padding: 20px; border: 1px solid #ddd; border-radius: 5px; }
"""
# Script injected into the page <head>. showDetail(paperId) reveals the
# detail <div> of one paper and hides all others.
# The element id is derived by replacing EVERY dot with a dash, matching the
# Python side (str.replace replaces all occurrences, whereas JavaScript's
# String.replace with a string pattern replaces only the first). Elements are
# matched by comparing ids rather than by splicing the id into a CSS selector.
custom_js = """
function showDetail(paperId) {
    var targetId = 'smartflow-paper-' + String(paperId).split('.').join('-');
    // 隐藏 smartflow-paper-paperId 的所有兄弟节点
    document.querySelectorAll("div[id^='smartflow-paper-']").forEach(function (node) {
        if (node.id !== targetId) {
            node.style.display = 'none';
        }
    });
    // 显示当前节点
    var paper = document.getElementById(targetId);
    if (paper) {
        paper.style.display = 'block';
    }
}
"""
def create_interface():
    """Assemble the Gradio Blocks app and return it.

    Layout: a heading, a row with the two date inputs and the query
    button, a results area, and a detail area. Clicking the button runs
    ``query_papers`` and fills both HTML areas.
    """
    # Both date boxes default to the date at the time the UI is built.
    today = datetime.now().strftime("%Y-%m-%d")
    page_head = f"<script>{custom_js}</script>"

    with gr.Blocks(title="Hugging Face Daily Paper", css=custom_css, head=page_head) as app:
        # Main heading
        gr.Markdown("# 📚 Hugging Face Daily Paper")

        # Query controls
        with gr.Row():
            start_date = gr.Textbox(label="起始日期", placeholder="YYYY-MM-DD", value=today)
            end_date = gr.Textbox(label="结束日期", placeholder="YYYY-MM-DD", value=today)
            query_btn = gr.Button("🔍 查询", variant="primary")

        # Results table
        with gr.Column(visible=True):
            results_html = gr.HTML(label="查询结果")

        # Paper details
        with gr.Column(visible=True, elem_classes="detail-area"):
            gr.Markdown("## 论文详情")
            detail_html = gr.HTML(elem_id="detail-html")

        # Event wiring
        query_btn.click(
            fn=query_papers,
            inputs=[start_date, end_date],
            outputs=[results_html, detail_html],
        )

    return app
if __name__ == "__main__":
    # NOTE(review): presumably shuts down Gradio servers left over from an
    # earlier run — confirm against the Gradio docs.
    gr.close_all()
    app = create_interface()
    # Launch with Gradio's defaults; add entries here to override them.
    launch_options = {
        # "server_name": "localhost",
        # "server_port": 7860,
        # "share": True,
    }
    app.launch(**launch_options)