Spaces:
Sleeping
Sleeping
File size: 3,627 Bytes
d93884d 347c128 d93884d 347c128 d93884d 347c128 d93884d 347c128 d93884d 347c128 d93884d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
import datetime
from acme import errors, challenges
def challens(order, directory) -> dict:
    """Group the DNS-01 challenges of an ACME order by domain.

    Args:
        order: Order object whose ``authorizations`` are scanned. Each
            authorization must expose ``body.challenges`` and
            ``body.identifier.value``.
        directory: ACME directory the order was created against; used only
            to build the error message.

    Returns:
        A dict mapping each identifier value to the list of challenge
        bodies whose ``chall`` is a ``challenges.DNS01`` instance.

    Raises:
        errors.ChallengeUnavailable: If no authorization offers a DNS-01
            challenge.
    """
    challs = {}
    for auth in order.authorizations:
        for challenge in auth.body.challenges:
            if isinstance(challenge.chall, challenges.DNS01):
                domain = auth.body.identifier.value
                challs.setdefault(domain, []).append(challenge)
    if not challs:
        # The f-string is already fully interpolated. Running it through
        # str.format() again would raise on any '{' or '}' in the directory
        # value and mask the intended ChallengeUnavailable error.
        msg = f"ACME server at '{directory}' does not support DNS-01 challenge."
        raise errors.ChallengeUnavailable(msg)
    return challs
def get_tokens(client, csr, directory):
    """Create a new order and collect the DNS-01 validation values per record.

    Args:
        client: ACME client exposing ``new_order(csr)`` and ``net.key``.
        csr: Certificate signing request passed unchanged to
            ``client.new_order``.
        directory: ACME directory, forwarded to ``challens`` for its error
            message.

    Returns:
        A 3-tuple ``(verification_tokens, challs, order)``:
        ``verification_tokens`` maps ``"_acme-challenge.<domain>"`` to the
        list of validation values returned by each challenge's
        ``response_and_validation`` (presumably the TXT record contents to
        publish; confirm against the caller). ``challs`` is the
        domain-to-challenges mapping from ``challens``. ``order`` is the
        newly created order.

    Raises:
        errors.ChallengeUnavailable: Propagated from ``challens`` when the
            order has no DNS-01 challenge.
    """
    order = client.new_order(csr)
    challs = challens(order, directory)

    verification_tokens = {}
    for domain, challenge_items in challs.items():
        record_name = f"_acme-challenge.{domain}"
        for challenge in challenge_items:
            # Only the validation value is needed here. The response object
            # is recomputed later, when the challenge is actually answered.
            _response, validation = challenge.response_and_validation(client.net.key)
            verification_tokens.setdefault(record_name, []).append(validation)
    return verification_tokens, challs, order
def process_challenge(client, challenge, domain):
    """Compute the response for one challenge and return it with its token.

    Args:
        client: Object exposing ``net.key``, the key handed to
            ``challenge.response_and_validation``.
        challenge: Challenge body exposing ``response_and_validation`` and
            ``chall.token``.
        domain: Domain name, used only in the printed messages.

    Returns:
        ``(response, token)`` on success, where a ``bytes`` token is decoded
        as UTF-8 with undecodable bytes replaced. ``(None, None)`` if
        anything raises; the error is printed, not propagated.
    """
    try:
        account_key = client.net.key
        response, _unused_validation = challenge.response_and_validation(account_key)
        print(f"Challenge verified for domain: {domain}")
        raw_token = challenge.chall.token
        token = (
            raw_token.decode("utf-8", errors="replace")
            if isinstance(raw_token, bytes)
            else raw_token
        )
        return response, token
    except Exception as exc:
        # Best effort: report the failure and let the caller skip this challenge.
        print(f"Error processing challenge for domain {domain}: {exc}")
        return None, None
def answer_challenge(client, challenge, response, domain):
    """Submit a challenge response through the client.

    Args:
        client: Object exposing ``answer_challenge(challenge, response)``.
        challenge: Challenge body being answered.
        response: Response object to submit for that challenge.
        domain: Domain name, used only in the printed messages.

    Returns:
        Whatever ``client.answer_challenge`` returns, or ``None`` if it
        raises; the error is printed, not propagated.
    """
    result = None
    try:
        result = client.answer_challenge(challenge, response)
        print(f"Challenge answered for domain: {domain}")
    except Exception as exc:
        print(f"Error answering challenge for domain {domain}: {exc}")
        return None
    return result
def finalize_order(client, order, deadline):
    """Poll the order's authorizations and finalize it.

    Args:
        client: Object exposing ``poll_and_finalize(order, deadline=...)``.
        order: The order to finalize.
        deadline: Passed through as the ``deadline`` keyword argument.

    Returns:
        The value returned by ``client.poll_and_finalize``, or ``None`` if
        it raises; the error is printed, not propagated.
    """
    try:
        return client.poll_and_finalize(order, deadline=deadline)
    except Exception as e:
        # Do not call poll_and_finalize again from the handler. A second
        # failure would escape this function as an unhandled exception, and
        # a second success was previously printed and then thrown away.
        print(f"Error finalizing order: {e}")
        return None
def retrieve_certificate(final_order):
    """Return the finalized order's full certificate chain as bytes.

    Args:
        final_order: Object exposing ``fullchain_pem`` as a string.

    Returns:
        ``final_order.fullchain_pem`` encoded with ``str.encode()``, or
        ``None`` if reading or encoding it raises; the error is printed,
        not propagated.
    """
    try:
        pem_text = final_order.fullchain_pem
        return pem_text.encode()
    except Exception as exc:
        print(f"Error retrieving certificate: {exc}")
        return None
def verify_tokens(client, challs, order, timeout_seconds=180):
    """Answer every collected challenge, finalize the order, return the cert.

    Args:
        client: ACME client passed to ``process_challenge``,
            ``answer_challenge`` and ``finalize_order``.
        challs: Dict mapping each domain to its list of challenge bodies.
        order: The order to finalize once the challenges are answered.
        timeout_seconds: Seconds from now until the finalization deadline.
            Defaults to 180, the previously hard-coded value.

    Returns:
        The certificate chain bytes from ``retrieve_certificate``, or
        ``None`` if finalization failed or the certificate could not be read.
    """
    # NOTE(review): the deadline is deliberately a naive local datetime, as
    # in the original. The client presumably compares it against a naive
    # now(); confirm before switching to a timezone-aware value.
    deadline = datetime.datetime.now() + datetime.timedelta(seconds=timeout_seconds)

    print(client)
    print(challs)
    print(order)

    for domain, challenge_list in challs.items():
        print(f"Fetching challenges for domain: {domain}")
        for challenge in challenge_list:
            response, _token = process_challenge(client, challenge, domain)
            if response is None:
                # The response could not be computed; skip this challenge
                # and still try the remaining ones.
                continue
            # A failed answer is already reported inside answer_challenge.
            # Finalization is attempted regardless of individual failures.
            answer_challenge(client, challenge, response, domain)

    final_order = finalize_order(client, order, deadline)
    if final_order is None:
        return None
    return retrieve_certificate(final_order)