Spaces:
Runtime error
Runtime error
File size: 5,373 Bytes
4f8ad24 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
import datetime
import os
from enum import Enum
from typing import Optional, Iterator, List, Tuple, Union
from hbutils.system import urlsplit
from .web import NoURL, WebDataSource
from ..utils import get_requests_session, srequest
class Rating(str, Enum):
    """
    Content rating of a post.

    Each member's value is the one-letter code that is placed after
    ``rating:`` when the rating is turned into a search tag.
    """

    SAFE = "s"
    QUESTIONABLE = "q"
    EXPLICIT = "e"
class PostOrder(Enum):
    """
    Sort order for post queries.

    Each member's value is the keyword that is placed after ``order:``
    when the ordering is turned into a search tag.
    """

    POPULARITY = "popularity"
    DATE = "date"
    QUALITY = "quality"
    RANDOM = "random"
    RECENTLY_FAVORITED = "recently_favorited"
    RECENTLY_VOTED = "recently_voted"
class FileType(Enum):
    """
    Kind of media file attached to a post.

    Each member's value is the keyword that is placed after ``file_type:``
    when the file type is turned into a search tag.
    """

    # jpeg, png, webp formats
    IMAGE = "image"
    # gif format
    GIF = "animated_gif"
    # mp4, webm formats
    VIDEO = "video"
def _tags_by_kwargs(**kwargs):
    """
    Translate search options given as keyword arguments into query tags.

    Options whose value is ``None`` are skipped, and option names that are
    not listed below are silently ignored.

    :param kwargs: Supported options:

        * ``order``, ``rating``, ``file_type`` -- enum members, rendered as
          ``<option>:<member value>``. ``FileType.IMAGE`` produces no tag.
        * ``threshold``, ``recommended_for``, ``voted`` -- rendered as
          ``<option>:<value>``.
        * ``date`` -- iterable of datetime objects, each formatted as
          ``%Y-%m-%dT%H:%M`` and joined with ``..`` into one ``date:`` tag.
        * ``added_by`` -- a user name, or an iterable of user names; one
          ``user:<name>`` tag is produced per name.
    :return: List of tag strings, in the order the options were passed.
    """
    tags = []
    for key, value in kwargs.items():
        if value is None:
            continue

        if key in {"order", "rating", "file_type"}:
            # FileType.IMAGE is deliberately not emitted as a tag.
            if value is not FileType.IMAGE:  # noqa
                tags.append(f"{key}:{value.value}")
        elif key in {"threshold", "recommended_for", "voted"}:
            tags.append(f"{key}:{value}")
        elif key == "date":
            # This is a plain function: the value comes from the keyword
            # argument itself, there is no ``self`` to read it from.
            date = "..".join(d.strftime("%Y-%m-%dT%H:%M") for d in value)
            tags.append(f"date:{date}")
        elif key == "added_by":
            # A bare string is one user name, not a sequence of characters.
            users = [value] if isinstance(value, str) else value
            tags.extend(f"user:{user}" for user in users)

    return tags
class SankakuSource(WebDataSource):
    """
    Web data source that pages through the Sankaku Complex posts API
    (``https://capi-v2.sankakucomplex.com/posts``) and yields image posts.

    :param tags: Search tags for the query.
    :param order: Optional sort order, appended to the query as a tag.
    :param rating: Optional content rating, appended to the query as a tag.
    :param file_type: Optional file type, appended to the query as a tag.
    :param date: Optional pair of datetimes, appended to the query as a tag.
    :param username: Login name, used together with ``password`` when no
        ``access_token`` is given.
    :param password: Login password.
    :param access_token: Bearer token; takes precedence over
        ``username``/``password``.
    :param min_size: Minimum width and height a rendition must have to be
        chosen by :meth:`_select_url`. ``None`` always selects ``file_url``.
    :param download_silent: Forwarded to ``WebDataSource.__init__``.
    :param group_name: Forwarded to ``WebDataSource.__init__``; also used as
        the prefix of generated file names and group ids.
    :param kwargs: Extra search options, forwarded to ``_tags_by_kwargs``.
    """

    def __init__(self, tags: List[str], order: Optional[PostOrder] = None,
                 rating: Optional[Rating] = None, file_type: Optional[FileType] = None,
                 date: Optional[Tuple[datetime.datetime, datetime.datetime]] = None,
                 username: Optional[str] = None, password: Optional[str] = None,
                 access_token: Optional[str] = None,
                 min_size: Optional[int] = 800, download_silent: bool = True,
                 group_name: str = 'sankaku', **kwargs):
        WebDataSource.__init__(self, group_name, get_requests_session(), download_silent)
        self.tags = tags + _tags_by_kwargs(order=order, rating=rating, file_type=file_type, date=date, **kwargs)
        self.username, self.password = username, password
        self.access_token = access_token
        self.min_size = min_size
        # Separate session carrying the headers sent with login and API requests.
        self.auth_session = get_requests_session(headers={
            'Content-Type': 'application/json; charset=utf-8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Host': 'capi-v2.sankakucomplex.com',
            'X-Requested-With': 'com.android.browser',
        })

    # Field-name triples (url, width, height) of the renditions a post record may carry.
    _FILE_URLS = [
        ('sample_url', 'sample_width', 'sample_height'),
        ('preview_url', 'preview_width', 'preview_height'),
        ('file_url', 'width', 'height'),
    ]

    def _select_url(self, data):
        """
        Choose the download URL for one post record.

        When ``min_size`` is set, the narrowest rendition whose width and
        height both reach ``min_size`` is returned. Otherwise, or when no
        rendition qualifies, ``file_url`` is used.

        :param data: Post record (dict) as returned by the posts API.
        :return: The selected URL.
        :raises NoURL: If no rendition qualifies and ``file_url`` is missing or empty.
        """
        if self.min_size is not None:
            best_url, best_width = None, None
            for url_name, width_name, height_name in self._FILE_URLS:
                url = data.get(url_name)
                width, height = data.get(width_name), data.get(height_name)
                # Skip renditions with a missing/empty URL or unknown dimensions.
                if not (url and width and height):
                    continue
                if width >= self.min_size and height >= self.min_size:
                    if best_url is None or width < best_width:
                        best_url, best_width = url, width

            if best_url is not None:
                return best_url

        if data.get('file_url'):
            return data['file_url']
        raise NoURL

    def _login(self):
        """
        Put an ``Authorization`` header on the API session.

        Uses ``access_token`` as a bearer token when set; otherwise, when both
        ``username`` and ``password`` are set, requests a token from the login
        endpoint. Without credentials the session is left unauthenticated.
        """
        if self.access_token:
            self.auth_session.headers.update({
                "Authorization": f"Bearer {self.access_token}",
            })
        elif self.username and self.password:
            resp = srequest(self.auth_session, 'POST', 'https://login.sankakucomplex.com/auth/token',
                            json={"login": self.username, "password": self.password})
            resp.raise_for_status()
            login_data = resp.json()
            self.auth_session.headers.update({
                "Authorization": f"{login_data['token_type']} {login_data['access_token']}",
            })

    def _iter_data(self) -> Iterator[Tuple[Union[str, int], str, dict]]:
        """
        Iterate over all image posts matching the configured tags.

        Pages of up to 100 posts are requested until an empty page is
        returned. Posts whose ``file_type`` does not contain ``image``, or
        for which no URL can be selected, are skipped.

        :return: Iterator of ``(post id, url, meta)`` tuples, where ``meta``
            holds the raw post under ``sankaku`` plus ``group_id``,
            ``filename`` and a ``tags`` mapping of tag name to ``1.0``.
        """
        self._login()

        page = 1
        while True:
            resp = srequest(self.auth_session, 'GET', 'https://capi-v2.sankakucomplex.com/posts', params={
                'lang': 'en',
                'page': str(page),
                'limit': '100',
                'tags': ' '.join(self.tags),
            })
            resp.raise_for_status()
            # Decode the body once per page instead of once per use.
            posts = resp.json()
            if not posts:
                break

            for data in posts:
                # A missing or null file_type is treated as "not an image".
                if 'image' not in (data.get('file_type') or ''):
                    continue

                try:
                    url = self._select_url(data)
                except NoURL:
                    continue

                _, ext_name = os.path.splitext(urlsplit(url).filename)
                filename = f'{self.group_name}_{data["id"]}{ext_name}'
                meta = {
                    'sankaku': data,
                    'group_id': f'{self.group_name}_{data["id"]}',
                    'filename': filename,
                    'tags': {t_item['name']: 1.0 for t_item in data['tags']},
                }
                yield data["id"], url, meta

            page += 1
|