File size: 1,890 Bytes
fe5c39d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import json
from typing import Callable

from aiohttp.client import ClientSession

origin_request = ClientSession.request


class MockAioResponse:
    check_funcs: dict[tuple[str, str], Callable[[dict], str]] = {}
    rsp_cache: dict[str, str] = {}
    name = "aiohttp"
    status = 200

    def __init__(self, session, method, url, **kwargs) -> None:
        fn = self.check_funcs.get((method, url))
        _kwargs = {k: v for k, v in kwargs.items() if k != "proxy"}
        self.key = f"{self.name}-{method}-{url}-{fn(kwargs) if fn else json.dumps(_kwargs, sort_keys=True)}"
        self.mng = self.response = None
        if self.key not in self.rsp_cache:
            self.mng = origin_request(session, method, url, **kwargs)

    async def __aenter__(self):
        if self.response:
            await self.response.__aenter__()
            self.status = self.response.status
        elif self.mng:
            self.response = await self.mng.__aenter__()
        return self

    async def __aexit__(self, *args, **kwargs):
        if self.response:
            await self.response.__aexit__(*args, **kwargs)
            self.response = None
        elif self.mng:
            await self.mng.__aexit__(*args, **kwargs)
            self.mng = None

    async def json(self, *args, **kwargs):
        if self.key in self.rsp_cache:
            return self.rsp_cache[self.key]
        data = await self.response.json(*args, **kwargs)
        self.rsp_cache[self.key] = data
        return data

    @property
    def content(self):
        return self

    async def read(self):
        if self.key in self.rsp_cache:
            return eval(self.rsp_cache[self.key])
        data = await self.response.content.read()
        self.rsp_cache[self.key] = str(data)
        return data

    def raise_for_status(self):
        if self.response:
            self.response.raise_for_status()