import base64
import json
import re

import requests

from mainLogic.big4.dl_obsolete import DL
from mainLogic.utils.glv import Global


class LicenseKeyFetcher:
    """Fetch the decryption key for a video from the penpencil get-otp endpoint.

    The flow implemented by :meth:`get_key` is:

    1. build the MPD URL for the video id (``DL.buildUrl``),
    2. read the ``default_KID`` attribute out of the MPD,
    3. XOR the KID with the bearer token, base64-encode it, hex-encode it
       and interleave ``00`` bytes,
    4. request the OTP for that encoded key and XOR-decode it with the
       same token.
    """

    # Seconds to wait for each HTTP request before giving up, so a stalled
    # server cannot block the caller indefinitely.
    REQUEST_TIMEOUT = 30

    # Matches the default_KID attribute in an MPD manifest.
    _KID_PATTERN = re.compile(r'default_KID="([0-9a-fA-F-]+)"')

    def __init__(self, token):
        """Store the bearer token used both for auth and as the XOR key."""
        self.token = token

    def build_license_url(self, encoded_otp_key):
        """Return the get-otp URL for an already-encoded OTP key."""
        return f"https://api.penpencil.co/v1/videos/get-otp?key={encoded_otp_key}&isEncoded=true"

    def get_headers(self):
        """Return the request headers, including the bearer authorization."""
        headers = {
            "accept": "*/*",
            "accept-language": "en-US,en;q=0.9,la;q=0.8",
            "authorization": f"Bearer {self.token}",
            "cache-control": "no-cache",
            "client-id": "5eb393ee95fab7468a79d189",
            "client-type": "WEB",
            "client-version": "200",
            "content-type": "application/json",
            "dnt": "1",
            "origin": "https://www.pw.live",
            "pragma": "no-cache",
            "priority": "u=1, i",
            "randomid": "180ff4c6-9ec3-4329-b1b5-1ad2f6746795",
            "referer": "https://www.pw.live/",
            "sec-ch-ua": "\"Google Chrome\";v=\"125\", \"Chromium\";v=\"125\", \"Not.A/Brand\";v=\"24\"",
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": "\"Windows\"",
            "sec-fetch-dest": "empty",
            "sec-fetch-mode": "cors",
            "sec-fetch-site": "cross-site",
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36",
        }
        return headers

    def key_char_at(self, key, i):
        """Return the code point of ``key`` at index ``i``, wrapping cyclically.

        Raises ``ZeroDivisionError`` if ``key`` is empty.
        """
        return ord(key[i % len(key)])

    def b64_encode(self, data):
        """Base64-encode an iterable of byte values and return it as ``str``.

        Falsy input (``None``, empty list, ...) is returned unchanged.
        """
        if not data:
            return data
        return base64.b64encode(bytes(data)).decode('utf-8')

    def get_key_final(self, otp):
        """Decode a base64 OTP and XOR each byte with the token (cyclically)."""
        decoded_bytes = base64.b64decode(otp)
        return "".join(
            chr(byte ^ self.key_char_at(self.token, i))
            for i, byte in enumerate(decoded_bytes)
        )

    def xor_encrypt(self, data):
        """XOR each character of ``data`` with the token; return a list of ints."""
        return [ord(c) ^ self.key_char_at(self.token, i) for i, c in enumerate(data)]

    def insert_zeros(self, hex_string):
        """Prefix every two-character group of ``hex_string`` with ``"00"``.

        An empty input yields ``"00"``; a trailing odd character is kept as
        its own (short) group.
        """
        pairs = (hex_string[i:i + 2] for i in range(0, len(hex_string), 2))
        return "00" + "00".join(pairs)

    def extract_kid_from_mpd(self, url):
        """Download the MPD at ``url`` and return its ``default_KID`` value.

        Returns ``None`` when the manifest has no ``default_KID`` attribute.
        HTTP error statuses are raised by ``response.raise_for_status()``.
        """
        response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
        response.raise_for_status()
        match = self._KID_PATTERN.search(response.text)
        return match.group(1) if match else None

    def get_key(self, id, verbose=True):
        """Fetch the ``(kid, key)`` pair for the video identified by ``id``.

        Args:
            id: Video identifier passed to ``DL.buildUrl``. (The name shadows
                the builtin but is kept so keyword callers keep working.)
            verbose: When true, progress is reported through ``Global``.

        Returns:
            A ``(kid, key)`` tuple on success, or ``None`` on any failure.
            Failures are reported through ``Global.errprint`` regardless of
            ``verbose``; no exception escapes this method.
        """

        def step(message):
            # Progress line, only shown in verbose mode.
            if verbose:
                Global.dprint(message)

        def show(message):
            # Result line, only shown in verbose mode.
            if verbose:
                Global.sprint(message)

        if verbose:
            Global.hr()
        step("Beginning to get the key for the video... & Audio :) ")
        step(f"ID: {id}")
        step("Building the URL to get the key...")

        try:
            url = DL.buildUrl(id)
            show(f"URL: {url}")

            step("Extracting the KID from the MPD file...")
            raw_kid = self.extract_kid_from_mpd(url)
            if raw_kid is None:
                # Without this guard the failure surfaced as an opaque
                # "'NoneType' object has no attribute 'replace'".
                Global.errprint("Could not find a default_KID in the MPD file. Exiting...")
                return None
            kid = raw_kid.replace("-", "")
            show(f"KID: {kid}")

            step("Encrypting the KID to get the key...")
            otp_key = self.b64_encode(self.xor_encrypt(kid))
            show(f"OTP Key: {otp_key}")

            step("Encoding the OTP key to hex...")
            encoded_otp_key = self.insert_zeros(otp_key.encode('utf-8').hex())
            show(f"Encoded OTP Key: {encoded_otp_key}")

            step("Building the license URL...")
            license_url = self.build_license_url(encoded_otp_key)
            show(f"License URL: {license_url}")

            step("Getting the headers...")
            headers = self.get_headers()
            if verbose:
                # NOTE: this prints the bearer token as part of the headers.
                Global.sprint(f"Headers: {json.dumps(headers, indent=4)}")

            step("Making a request to the server to get the license (key)...")
            response = requests.get(
                license_url, headers=headers, timeout=self.REQUEST_TIMEOUT
            )
            show(f"Response: {response}")

            if response.status_code != 200:
                Global.errprint(
                    f"Could not get the key from the server (HTTP {response.status_code}). Exiting..."
                )
                return None

            # Parse the body once and validate its shape before indexing.
            payload = response.json()
            data = payload.get('data') if isinstance(payload, dict) else None
            if not isinstance(data, dict) or 'otp' not in data:
                Global.errprint("Could not get the key from the server. Exiting...")
                return None

            show("Key received successfully!")
            key = self.get_key_final(data['otp'])
            show(f"Key: {key}")
            if verbose:
                Global.hr()
            return (kid, key)
        except Exception as e:
            # Boundary handler: callers rely on None instead of an exception.
            Global.errprint(f"An error occurred while getting the key: {e}")
            return None


# Example usage
# TOKEN = "your_token_here"
# fetcher = LicenseKeyFetcher(TOKEN)
# key = fetcher.get_key(video_id)