File size: 3,627 Bytes
d93884d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
347c128
 
d93884d
347c128
d93884d
347c128
d93884d
 
 
 
 
 
 
 
 
 
347c128
d93884d
 
347c128
 
 
d93884d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import datetime
from acme import errors, challenges

def challens(order, directory) -> dict:
    challs = {}
    for auth in list(order.authorizations):
        for challenge in auth.body.challenges:
            if isinstance(challenge.chall, challenges.DNS01):
                domain = auth.body.identifier.value
                challs[domain] = challs[domain] if domain in challs else []
                challs[domain].append(challenge)
    if not challs:
        msg = f"ACME server at '{directory}' does not support DNS-01 challenge."
        raise errors.ChallengeUnavailable(msg.format(directory=str(directory)))
    return challs

def get_tokens(client, csr, directory):
    verification_tokens = {}
    responses = {}
    order = client.new_order(csr)
    challs = challens(order, directory)
    for domain, challenge_items in challs.items():
        domain = f"_acme-challenge.{domain}"
        for challenge in challenge_items:
            verification_tokens[domain] = verification_tokens[domain] if domain in verification_tokens else []
            response, validation = challenge.response_and_validation(client.net.key)
            verification_tokens[domain].append(validation)
            responses[challenge.chall.token] = response
    _verification_tokens = verification_tokens
    return verification_tokens, challs, order

def process_challenge(client, challenge, domain):
    try:
        response, _validation = challenge.response_and_validation(client.net.key)
        print(f"Challenge verified for domain: {domain}")
        token = challenge.chall.token
        if isinstance(token, bytes):
            token = token.decode('utf-8', errors='replace')
        return response, token
    except Exception as e:
        print(f"Error processing challenge for domain {domain}: {e}")
        return None, None

def answer_challenge(client, challenge, response, domain):
    try:
        answer = client.answer_challenge(challenge, response)
        print(f"Challenge answered for domain: {domain}")
        return answer
    except Exception as e:
        print(f"Error answering challenge for domain {domain}: {e}")
        return None

def finalize_order(client, order, deadline):
    try:
        data = client.poll_and_finalize(order, deadline=deadline)
        return data
    except Exception as e:
        data = client.poll_and_finalize(order, deadline=deadline)
        print(f"Error finalizing order: {e}")
        print(data)
        return None

def retrieve_certificate(final_order):
    try:
        return final_order.fullchain_pem.encode()
    except Exception as e:
        print(f"Error retrieving certificate: {e}")
        return None
    
def verify_tokens(client, challs, order):
    deadline = datetime.datetime.now() + datetime.timedelta(seconds=180)
    answers = []
    responses = {}
    print(client)
    print(challs)
    print(order)
    for domain, challenge_list in challs.items():
        print(f"Fetching challenges for domain: {domain}")
        for challenge in challenge_list:
            response, token = process_challenge(client, challenge, domain)
            if response is None:
                continue
            
            responses[token] = response
            answer = answer_challenge(client, challenge, response, domain)
            if answer is not None:
                answers.append(answer)
    final_order = finalize_order(client, order, deadline)
    if final_order is None:
        return None
    cert = retrieve_certificate(final_order)
    return cert