File size: 2,048 Bytes
4f8ad24
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import os
import warnings
from typing import Iterator, Tuple, Union

import requests
from PIL import UnidentifiedImageError, Image
from PIL.Image import DecompressionBombError
from hbutils.system import urlsplit, TemporaryDirectory

from .base import RootDataSource
from ..model import ImageItem
from ..utils import get_requests_session, download_file


class NoURL(Exception):
    """
    Exception type declared by this module.

    It adds nothing on top of :class:`Exception`. Nothing in this part of the
    file raises it; presumably it signals a resource that has no usable
    download URL -- confirm against the code that raises it.
    """


class WebDataSource(RootDataSource):
    """
    Base class for data sources whose images are fetched from URLs.

    Subclasses provide :meth:`_iter_data`, which produces ``(id, url, meta)``
    triples. :meth:`_iter` downloads each URL into a temporary directory,
    opens the file with PIL and yields the result as an :class:`ImageItem`.
    """

    def __init__(self, group_name: str, session: requests.Session = None, download_silent: bool = True):
        """
        :param group_name: Prefix of the downloaded file names; its capitalized
            form is also used in the warning messages.
        :param session: Session handed to ``download_file``. When it is falsy
            (e.g. ``None``), one is obtained from ``get_requests_session()``.
        :param download_silent: Forwarded to ``download_file`` as ``silent``.
        """
        self.group_name = group_name
        self.session = session if session else get_requests_session()
        self.download_silent = download_silent

    def _iter_data(self) -> Iterator[Tuple[Union[str, int], str, dict]]:
        """
        Produce ``(id, url, meta)`` triples for the resources to download.

        Must be overridden by subclasses.

        :raises NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError  # pragma: no cover

    def _iter(self) -> Iterator[ImageItem]:
        """
        Download every resource from :meth:`_iter_data` and yield it as an image.

        Resources are skipped with a warning (not an exception) when:

        * the file name in the URL has a ``.gif`` extension (case-insensitive),
        * PIL raises ``UnidentifiedImageError`` for the downloaded file,
        * downloading or loading raises ``IOError`` or ``DecompressionBombError``.

        :return: Iterator of :class:`ImageItem`, each carrying the loaded image
            and a copy of the resource's meta dict with ``'url'`` set.
        """
        for item_id, url, meta in self._iter_data():
            # Only the extension of the URL's file name is needed.
            suffix = os.path.splitext(urlsplit(url).filename)[1]
            if suffix.lower() == '.gif':
                warnings.warn(f'{self.group_name.capitalize()} resource {item_id} is a GIF image, skipped.')
                continue

            filename = f'{self.group_name}_{item_id}{suffix}'
            with TemporaryDirectory(ignore_cleanup_errors=True) as workdir:
                local_path = os.path.join(workdir, filename)
                try:
                    download_file(
                        url, local_path, desc=filename,
                        session=self.session, silent=self.download_silent,
                    )
                    image = Image.open(local_path)
                    # Image.open is lazy; load() reads the pixel data now, so
                    # read/decode failures are raised inside this try block.
                    image.load()
                except UnidentifiedImageError:
                    warnings.warn(f'{self.group_name.capitalize()} resource {item_id} unidentified as image, skipped.')
                except (IOError, DecompressionBombError) as err:
                    warnings.warn(f'Skipped due to error: {err!r}')
                else:
                    # Yield while the temporary directory still exists; the
                    # caller's meta dict is copied rather than mutated.
                    yield ImageItem(image, {**meta, 'url': url})