From 10a3c24facd508a592a83d47e6b43efde8a2ea16 Mon Sep 17 00:00:00 2001 From: Lil-Ran Date: Wed, 29 Oct 2025 18:50:31 +0800 Subject: [PATCH] refactor: add path handling (gh-29), detect encodings, format py scripts --- oneshot/detect.py | 111 ++++++----- oneshot/runtime.py | 69 +++---- oneshot/shot.py | 453 +++++++++++++++++++++++++++++---------------- 3 files changed, 392 insertions(+), 241 deletions(-) diff --git a/oneshot/detect.py b/oneshot/detect.py index 9c61922..5c04b00 100644 --- a/oneshot/detect.py +++ b/oneshot/detect.py @@ -9,10 +9,13 @@ def ascii_ratio(data: bytes) -> float: def source_as_file(file_path: str) -> Union[List[bytes], None]: try: - with open(file_path, 'r') as f: - co = compile(f.read(), '', 'exec') - data = [i for i in co.co_consts if type(i) is bytes - and i.startswith(b'PY00') and len(i) > 64] + with open(file_path, "r") as f: + co = compile(f.read(), "", "exec") + data = [ + i + for i in co.co_consts + if type(i) is bytes and i.startswith(b"PY00") and len(i) > 64 + ] return data except: return None @@ -21,12 +24,19 @@ def source_as_file(file_path: str) -> Union[List[bytes], None]: def source_as_lines(file_path: str) -> Union[List[bytes], None]: data = [] try: - with open(file_path, 'r') as f: + with open(file_path, "r") as f: for line in f: try: - co = compile(line, '', 'exec') - data.extend([i for i in co.co_consts if type(i) is bytes - and i.startswith(b'PY00') and len(i) > 64]) + co = compile(line, "", "exec") + data.extend( + [ + i + for i in co.co_consts + if type(i) is bytes + and i.startswith(b"PY00") + and len(i) > 64 + ] + ) except: # ignore not compilable lines pass @@ -39,14 +49,14 @@ def find_data_from_bytes(data: bytes, max_count=-1) -> List[bytes]: result = [] idx = 0 while len(result) != max_count: - idx = data.find(b'PY00') + idx = data.find(b"PY00") if idx == -1: break data = data[idx:] if len(data) < 64: break - header_len = int.from_bytes(data[28:32], 'little') - body_len = int.from_bytes(data[32:36], 'little') + header_len = int.from_bytes(data[28:32], "little") + body_len = int.from_bytes(data[32:36], "little") if header_len > 256 or body_len > 0xFFFFF or header_len + body_len > len(data): # compressed or coincident, skip data = data[5:] @@ -55,16 +65,20 @@ def find_data_from_bytes(data: bytes, max_count=-1) -> List[bytes]: complete_object_length = header_len + body_len # maybe followed by data for other Python versions or another part of BCC - next_segment_offset = int.from_bytes(data[56:60], 'little') + next_segment_offset = int.from_bytes(data[56:60], "little") data_next = data[next_segment_offset:] - while next_segment_offset != 0 and data_next.startswith(b'PY00') and len(data_next) >= 64: - header_len = int.from_bytes(data_next[28:32], 'little') - body_len = int.from_bytes(data_next[32:36], 'little') + while ( + next_segment_offset != 0 + and data_next.startswith(b"PY00") + and len(data_next) >= 64 + ): + header_len = int.from_bytes(data_next[28:32], "little") + body_len = int.from_bytes(data_next[32:36], "little") complete_object_length = next_segment_offset + header_len + body_len - if int.from_bytes(data_next[56:60], 'little') == 0: + if int.from_bytes(data_next[56:60], "little") == 0: break - next_segment_offset += int.from_bytes(data_next[56:60], 'little') + next_segment_offset += int.from_bytes(data_next[56:60], "little") data_next = data[next_segment_offset:] result.append(data[:complete_object_length]) @@ -72,51 +86,60 @@ def find_data_from_bytes(data: bytes, max_count=-1) -> List[bytes]: return result -def nuitka_package(head: bytes, relative_path: str) -> Union[List[Tuple[str, bytes]], None]: - first_occurrence = head.find(b'PY00') +def nuitka_package( + head: bytes, relative_path: str +) -> Union[List[Tuple[str, bytes]], None]: + first_occurrence = head.find(b"PY00") if first_occurrence == -1: return None - last_dot_bytecode = head.rfind(b'.bytecode\x00', 0, first_occurrence) + last_dot_bytecode = head.rfind(b".bytecode\x00", 0, first_occurrence) if last_dot_bytecode == -1: return None - length = int.from_bytes( - head[last_dot_bytecode-4:last_dot_bytecode], 'little') + length = int.from_bytes(head[last_dot_bytecode - 4 : last_dot_bytecode], "little") end = last_dot_bytecode + length cur = last_dot_bytecode result = [] while cur < end: - module_name_len = head.find(b'\x00', cur, end) - cur - module_name = head[cur:cur + module_name_len].decode('utf-8') + module_name_len = head.find(b"\x00", cur, end) - cur + module_name = head[cur : cur + module_name_len].decode("utf-8") cur += module_name_len + 1 - module_len = int.from_bytes(head[cur:cur + 4], 'little') + module_len = int.from_bytes(head[cur : cur + 4], "little") cur += 4 - module_data = find_data_from_bytes(head[cur:cur + module_len], 1) + module_data = find_data_from_bytes(head[cur : cur + module_len], 1) if module_data: - result.append((os.path.join(relative_path.rstrip( - '/\\') + '.1shot.ext', module_name), module_data[0])) + result.append( + ( + os.path.join( + relative_path.rstrip("/\\") + ".1shot.ext", module_name + ), + module_data[0], + ) + ) cur += module_len if result: - logger = logging.getLogger('detect') - logger.info(f'Found data in Nuitka package: {relative_path}') + logger = logging.getLogger("detect") + logger.info(f"Found data in Nuitka package: {relative_path}") return result return None -def detect_process(file_path: str, relative_path: str) -> Union[List[Tuple[str, bytes]], None]: - ''' +def detect_process( + file_path: str, relative_path: str +) -> Union[List[Tuple[str, bytes]], None]: + """ Returns a list of (relative_path, bytes_raw) tuples, or None. Do not raise exceptions. - ''' - logger = logging.getLogger('detect') + """ + logger = logging.getLogger("detect") try: - with open(file_path, 'rb') as f: + with open(file_path, "rb") as f: head = f.read(16 * 1024 * 1024) except: - logger.error(f'Failed to read file: {relative_path}') + logger.error(f"Failed to read file: {relative_path}") return None - if b'__pyarmor__' not in head: + if b"__pyarmor__" not in head: # no need to dig deeper return None @@ -134,16 +157,16 @@ def detect_process(file_path: str, relative_path: str) -> Union[List[Tuple[str, if result_len == 0: return None elif result_len == 1: - logger.info(f'Found data in source: {relative_path}') + logger.info(f"Found data in source: {relative_path}") return [(relative_path, result[0])] else: - logger.info(f'Found data in source: {relative_path}') - return [(f'{relative_path}__{i}', result[i]) for i in range(len(result))] + logger.info(f"Found data in source: {relative_path}") + return [(f"{relative_path}__{i}", result[i]) for i in range(len(result))] # binary file # ignore data after 16MB, before we have a reason to read more - if b'Error, corrupted constants object' in head: + if b"Error, corrupted constants object" in head: # an interesting special case: packer put armored data in a Nuitka package # we can know the exact module names, instead of adding boring __0, __1, ... return nuitka_package(head, relative_path) @@ -153,8 +176,8 @@ def detect_process(file_path: str, relative_path: str) -> Union[List[Tuple[str, if result_len == 0: return None elif result_len == 1: - logger.info(f'Found data in binary: {relative_path}') + logger.info(f"Found data in binary: {relative_path}") return [(relative_path, result[0])] else: - logger.info(f'Found data in binary: {relative_path}') - return [(f'{relative_path}__{i}', result[i]) for i in range(len(result))] + logger.info(f"Found data in binary: {relative_path}") + return [(f"{relative_path}__{i}", result[i]) for i in range(len(result))] diff --git a/oneshot/runtime.py b/oneshot/runtime.py index dc829fd..36dd55d 100644 --- a/oneshot/runtime.py +++ b/oneshot/runtime.py @@ -1,7 +1,7 @@ import hashlib -GLOBAL_CERT = bytes.fromhex(''' +GLOBAL_CERT = bytes.fromhex(""" 30 82 01 0a 02 82 01 01 00 bf 65 30 f3 bd 67 e7 a6 9d f8 db 18 b2 b9 c1 c0 5f fe fb e5 4b 91 df 6f 38 da 51 cc ea c4 d3 04 bd 95 27 86 c1 13 ca @@ -19,13 +19,13 @@ af 09 fb 04 54 a9 ea c0 c1 e9 32 6c 77 92 7f 9f 4b 2c 1a 78 85 7c bc 2c d0 d7 83 77 5f 92 d5 db 59 10 96 53 2e 5d c7 42 12 b8 61 cb 2c 5f 46 14 9e 93 b0 53 21 a2 74 34 2d 02 03 01 00 01 -''') +""") class RuntimeInfo: def __init__(self, file_path: str) -> None: self.file_path = file_path - if file_path.endswith('.pyd'): + if file_path.endswith(".pyd"): self.extract_info_win64() else: # TODO: implement for other platforms @@ -35,62 +35,64 @@ class RuntimeInfo: self.runtime_aes_key = self.calc_aes_key() def __str__(self) -> str: - trial = self.serial_number == '000000' - product = '' + trial = self.serial_number == "000000" + product = "" for c in self.part_3[2:]: if 32 <= c <= 126: product += chr(c) else: break - return f'''\ + return f"""\ ======================== - Pyarmor Runtime ({'Trial' if trial else self.serial_number}) Information: + Pyarmor Runtime ({"Trial" if trial else self.serial_number}) Information: Product: {product} AES key: {self.runtime_aes_key.hex()} Mix string AES nonce: {self.mix_str_aes_nonce().hex()} - ========================''' + ========================""" def __repr__(self) -> str: - return f'RuntimeInfo(part_1={self.part_1}, part_2={self.part_2}, part_3={self.part_3})' + return f"RuntimeInfo(part_1={self.part_1}, part_2={self.part_2}, part_3={self.part_3})" def extract_info_win64(self) -> None: - ''' + """ Try to find useful information from `pyarmor_runtime.pyd` file, and store all three parts in the object. - ''' - with open(self.file_path, 'rb') as f: + """ + with open(self.file_path, "rb") as f: data = f.read(16 * 1024 * 1024) - cur = data.index(b'pyarmor-vax') + cur = data.index(b"pyarmor-vax") - if data[cur+11:cur+18] == b'\x00' * 7: - raise ValueError(f'{self.file_path} is a runtime template') + if data[cur + 11 : cur + 18] == b"\x00" * 7: + raise ValueError(f"{self.file_path} is a runtime template") - self.part_1 = data[cur:cur+20] + self.part_1 = data[cur : cur + 20] cur += 36 - part_2_offset = int.from_bytes(data[cur:cur+4], 'little') - part_2_len = int.from_bytes(data[cur+4:cur+8], 'little') - part_3_offset = int.from_bytes(data[cur+8:cur+12], 'little') + part_2_offset = int.from_bytes(data[cur : cur + 4], "little") + part_2_len = int.from_bytes(data[cur + 4 : cur + 8], "little") + part_3_offset = int.from_bytes(data[cur + 8 : cur + 12], "little") cur += 16 - self.part_2 = data[cur+part_2_offset:cur+part_2_offset+part_2_len] + self.part_2 = data[cur + part_2_offset : cur + part_2_offset + part_2_len] cur += part_3_offset - part_3_len = int.from_bytes(data[cur+4:cur+8], 'little') + part_3_len = int.from_bytes(data[cur + 4 : cur + 8], "little") cur += 32 - self.part_3 = data[cur:cur+part_3_len] + self.part_3 = data[cur : cur + part_3_len] def calc_aes_key(self) -> bytes: - return hashlib.md5(self.part_1 + self.part_2 + self.part_3 + GLOBAL_CERT).digest() + return hashlib.md5( + self.part_1 + self.part_2 + self.part_3 + GLOBAL_CERT + ).digest() def mix_str_aes_nonce(self) -> bytes: return self.part_3[:12] @classmethod - def default(cls) -> 'RuntimeInfo': + def default(cls) -> "RuntimeInfo": instance = cls.__new__(cls) - instance.file_path = '' - instance.part_1 = b'pyarmor-vax-000000\x00\x00' - instance.part_2 = bytes.fromhex(''' + instance.file_path = "" + instance.part_1 = b"pyarmor-vax-000000\x00\x00" + instance.part_2 = bytes.fromhex(""" 30 81 89 02 81 81 00 A8 ED 64 F4 83 49 13 FC 0F 86 6F 00 5A 8F E4 91 AA ED 1C EA D4 BB 4C 3F 7C 24 21 01 A8 D0 7D 93 F4 BF E7 FB 8C 06 57 88 6A @@ -100,23 +102,24 @@ class RuntimeInfo: BA 52 C5 B6 40 F6 AD AB BC D5 CF 5B 40 CB 8D 13 C4 28 B8 90 93 C4 76 01 09 8E 05 1E 61 FA 90 4C BF 67 D4 A7 D5 82 C1 02 03 01 00 01 - ''') - instance.part_3 = bytes.fromhex(''' + """) + instance.part_3 = bytes.fromhex(""" 69 2E 6E 6F 6E 2D 70 72 6F 66 69 74 73 E7 5A 41 9B DC 77 53 CA 1D E7 04 EB EF DA C9 A3 6C 0F 7B 00 00 00 01 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 - ''') - instance.serial_number = '000000' + """) + instance.serial_number = "000000" instance.runtime_aes_key = instance.calc_aes_key() return instance -if __name__ == '__main__': +if __name__ == "__main__": import sys + if len(sys.argv) < 2: - print('Usage: python runtime.py path/to/pyarmor_runtime[.pyd|.so|.dylib]') + print("Usage: python runtime.py path/to/pyarmor_runtime[.pyd|.so|.dylib]") exit(1) for i in sys.argv[1:]: runtime = RuntimeInfo(i) diff --git a/oneshot/shot.py b/oneshot/shot.py index 273d535..bf1c2ee 100644 --- a/oneshot/shot.py +++ b/oneshot/shot.py @@ -5,14 +5,22 @@ import os import asyncio import traceback import platform +import locale from typing import Dict, List, Tuple try: - from colorama import init, Fore, Style + from colorama import init, Fore, Style # type: ignore except ImportError: - def init(**kwargs): pass - class Fore: CYAN = RED = YELLOW = GREEN = '' - class Style: RESET_ALL = '' + + def init(**kwargs): + pass + + class Fore: + CYAN = RED = YELLOW = GREEN = "" + + class Style: + RESET_ALL = "" + from detect import detect_process from runtime import RuntimeInfo @@ -27,10 +35,53 @@ def general_aes_ctr_decrypt(data: bytes, key: bytes, nonce: bytes) -> bytes: return cipher.decrypt(data) -async def decrypt_file_async(exe_path, seq_file_path, path, args): - logger = logging.getLogger('shot') +def decode_output(data: bytes) -> str: + if not data: + return "" + + # 1) try chardet if available to guess encoding + try: + import chardet # type: ignore + + res = chardet.detect(data) + enc = res.get("encoding") + if enc: + return data.decode(enc, errors="replace") + except Exception: + pass + + # 2) try common encodings in a reasonable order + attempts = [ + "utf-8", + "utf-8-sig", + locale.getpreferredencoding(False) or None, + "cp936", + "latin-1", + ] + for enc in attempts: + if not enc: + continue + try: + return data.decode(enc) + except Exception: + continue + + try: + return data.decode("latin-1", errors="replace") + except Exception: + return "" + + +async def run_pycdc_async( + exe_path: str, + seq_file_path: str, + path_for_log: str, + show_all: bool = False, + show_err_opcode: bool = False, + show_warn_stack: bool = False, +): + logger = logging.getLogger("shot") try: - # Run without timeout process = await asyncio.create_subprocess_exec( exe_path, seq_file_path, @@ -38,143 +89,196 @@ async def decrypt_file_async(exe_path, seq_file_path, path, args): stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await process.communicate() - - stdout_lines = stdout.decode('latin-1').splitlines() - stderr_lines = stderr.decode('latin-1').splitlines() - + + stdout_lines = decode_output(stdout).splitlines() + stderr_lines = decode_output(stderr).splitlines() + for line in stdout_lines: - logger.warning(f'PYCDC: {line} ({path})') - + logger.warning(f"PYCDC: {line} ({path_for_log})") + for line in stderr_lines: - if line.startswith(( - 'Warning: Stack history is empty', - 'Warning: Stack history is not empty', - 'Warning: block stack is not empty', - )): - if args.show_warn_stack or args.show_all: - logger.warning(f'PYCDC: {line} ({path})') - elif line.startswith('Unsupported opcode:'): - if args.show_err_opcode or args.show_all: - logger.error(f'PYCDC: {line} ({path})') - elif line.startswith(( - 'Something TERRIBLE happened', - 'Unsupported argument', - 'Unsupported Node type', - 'Unsupported node type', - )): # annoying wont-fix errors - if args.show_all: - logger.error(f'PYCDC: {line} ({path})') + if line.startswith( + ( + "Warning: Stack history is empty", + "Warning: Stack history is not empty", + "Warning: block stack is not empty", + ) + ): + if show_warn_stack or show_all: + logger.warning(f"PYCDC: {line} ({path_for_log})") + elif line.startswith("Unsupported opcode:"): + if show_err_opcode or show_all: + logger.error(f"PYCDC: {line} ({path_for_log})") + elif line.startswith( + ( + "Something TERRIBLE happened", + "Unsupported argument", + "Unsupported Node type", + "Unsupported node type", + ) + ): # annoying wont-fix errors + if show_all: + logger.error(f"PYCDC: {line} ({path_for_log})") else: - logger.error(f'PYCDC: {line} ({path})') - + logger.error(f"PYCDC: {line} ({path_for_log})") + if process.returncode != 0: - logger.warning(f'{Fore.YELLOW}PYCDC returned 0x{process.returncode:x} ({path}){Style.RESET_ALL}') + logger.warning( + f"{Fore.YELLOW}PYCDC returned 0x{process.returncode:x} ({path_for_log}){Style.RESET_ALL}" + ) except Exception as e: error_details = traceback.format_exc() - logger.error(f'{Fore.RED}Exception: {e} ({path}){Style.RESET_ALL}') - logger.error(f'{Fore.RED}Error details: {error_details}{Style.RESET_ALL}') + logger.error(f"{Fore.RED}Exception: {e} ({path_for_log}){Style.RESET_ALL}") + logger.error(f"{Fore.RED}Error details: {error_details}{Style.RESET_ALL}") -async def decrypt_process_async(runtimes: Dict[str, RuntimeInfo], sequences: List[Tuple[str, bytes]], args): - logger = logging.getLogger('shot') +async def decrypt_process_async( + runtimes: Dict[str, RuntimeInfo], sequences: List[Tuple[str, bytes]], args +): + logger = logging.getLogger("shot") output_dir: str = args.output_dir or args.directory - - # Create a semaphore to limit concurrent processes - semaphore = asyncio.Semaphore(args.concurrent) # Use the concurrent argument - - # Get the appropriate executable for the current platform - exe_path = get_platform_executable(args) + exe_path = get_platform_executable(args.executable) + semaphore = asyncio.Semaphore(args.concurrent) - async def process_file(path, data): + async def process_file(relative_path, data): async with semaphore: try: - serial_number = data[2:8].decode('utf-8') + serial_number = data[2:8].decode("utf-8") runtime = runtimes[serial_number] - logger.info(f'{Fore.CYAN}Decrypting: {serial_number} ({path}){Style.RESET_ALL}') + logger.info( + f"{Fore.CYAN}Decrypting: {serial_number} ({relative_path}){Style.RESET_ALL}" + ) - dest_path = os.path.join(output_dir, path) if output_dir else path + dest_path = ( + os.path.join(output_dir, relative_path) + if output_dir + else os.path.abspath(relative_path) # resolve with working dir + ) # abs or rel, must has a dirname, must not ends with slash dest_dir = os.path.dirname(dest_path) if not os.path.exists(dest_dir): os.makedirs(dest_dir) if args.export_raw_data: - with open(dest_path + '.1shot.raw', 'wb') as f: + with open(dest_path + ".1shot.raw", "wb") as f: f.write(data) - # Check BCC - if int.from_bytes(data[20:24], 'little') == 9: - cipher_text_offset = int.from_bytes(data[28:32], 'little') - cipher_text_length = int.from_bytes(data[32:36], 'little') + # Check BCC; mutates "data" + if int.from_bytes(data[20:24], "little") == 9: + cipher_text_offset = int.from_bytes(data[28:32], "little") + cipher_text_length = int.from_bytes(data[32:36], "little") nonce = data[36:40] + data[44:52] bcc_aes_decrypted = general_aes_ctr_decrypt( - data[cipher_text_offset:cipher_text_offset+cipher_text_length], runtime.runtime_aes_key, nonce) - data = data[int.from_bytes(data[56:60], 'little'):] + data[ + cipher_text_offset : cipher_text_offset + cipher_text_length + ], + runtime.runtime_aes_key, + nonce, + ) + data = data[int.from_bytes(data[56:60], "little") :] bcc_architecture_mapping = { - 0x2001: 'win-x64', - 0x2003: 'linux-x64', + 0x2001: "win-x64", + 0x2003: "linux-x64", } while True: if len(bcc_aes_decrypted) < 16: break - bcc_segment_offset = int.from_bytes(bcc_aes_decrypted[0:4], 'little') - bcc_segment_length = int.from_bytes(bcc_aes_decrypted[4:8], 'little') - bcc_architecture_id = int.from_bytes(bcc_aes_decrypted[8:12], 'little') - bcc_next_segment_offset = int.from_bytes(bcc_aes_decrypted[12:16], 'little') - bcc_architecture = bcc_architecture_mapping.get(bcc_architecture_id, f'0x{bcc_architecture_id:x}') - bcc_file_path = f'{dest_path}.1shot.bcc.{bcc_architecture}.so' - with open(bcc_file_path, 'wb') as f: - f.write(bcc_aes_decrypted[bcc_segment_offset:bcc_segment_offset+bcc_segment_length]) - logger.info(f'{Fore.GREEN}Extracted BCC mode native part: {bcc_file_path}{Style.RESET_ALL}') + bcc_segment_offset = int.from_bytes( + bcc_aes_decrypted[0:4], "little" + ) + bcc_segment_length = int.from_bytes( + bcc_aes_decrypted[4:8], "little" + ) + bcc_architecture_id = int.from_bytes( + bcc_aes_decrypted[8:12], "little" + ) + bcc_next_segment_offset = int.from_bytes( + bcc_aes_decrypted[12:16], "little" + ) + bcc_architecture = bcc_architecture_mapping.get( + bcc_architecture_id, f"0x{bcc_architecture_id:x}" + ) + bcc_file_path = f"{dest_path}.1shot.bcc.{bcc_architecture}.so" + with open(bcc_file_path, "wb") as f: + f.write( + bcc_aes_decrypted[ + bcc_segment_offset : bcc_segment_offset + + bcc_segment_length + ] + ) + logger.info( + f"{Fore.GREEN}Extracted BCC mode native part: {bcc_file_path}{Style.RESET_ALL}" + ) if bcc_next_segment_offset == 0: break bcc_aes_decrypted = bcc_aes_decrypted[bcc_next_segment_offset:] - cipher_text_offset = int.from_bytes(data[28:32], 'little') - cipher_text_length = int.from_bytes(data[32:36], 'little') + cipher_text_offset = int.from_bytes(data[28:32], "little") + cipher_text_length = int.from_bytes(data[32:36], "little") nonce = data[36:40] + data[44:52] - seq_file_path = dest_path + '.1shot.seq' - with open(seq_file_path, 'wb') as f: - f.write(b'\xa1' + runtime.runtime_aes_key) - f.write(b'\xa2' + runtime.mix_str_aes_nonce()) - f.write(b'\xf0\xff') + seq_file_path = dest_path + ".1shot.seq" + with open(seq_file_path, "wb") as f: + f.write(b"\xa1" + runtime.runtime_aes_key) + f.write(b"\xa2" + runtime.mix_str_aes_nonce()) + f.write(b"\xf0\xff") f.write(data[:cipher_text_offset]) - f.write(general_aes_ctr_decrypt( - data[cipher_text_offset:cipher_text_offset+cipher_text_length], runtime.runtime_aes_key, nonce)) - f.write(data[cipher_text_offset+cipher_text_length:]) + f.write( + general_aes_ctr_decrypt( + data[ + cipher_text_offset : cipher_text_offset + + cipher_text_length + ], + runtime.runtime_aes_key, + nonce, + ) + ) + f.write(data[cipher_text_offset + cipher_text_length :]) - # Run without timeout - await decrypt_file_async(exe_path, seq_file_path, path, args) + await run_pycdc_async( + exe_path, + seq_file_path, + relative_path, + args.show_all, + args.show_err_opcode, + args.show_warn_stack, + ) except Exception as e: error_details = traceback.format_exc() - logger.error(f'{Fore.RED}Decrypt failed: {e} ({path}){Style.RESET_ALL}') - logger.error(f'{Fore.RED}Error details: {error_details}{Style.RESET_ALL}') - - # Create tasks for all files + logger.error( + f"{Fore.RED}Decrypt failed: {e} ({relative_path}){Style.RESET_ALL}" + ) + logger.error( + f"{Fore.RED}Error details: {error_details}{Style.RESET_ALL}" + ) + tasks = [process_file(path, data) for path, data in sequences] - - # Run all tasks concurrently await asyncio.gather(*tasks) -def decrypt_process(runtimes: Dict[str, RuntimeInfo], sequences: List[Tuple[str, bytes]], args): +def decrypt_process( + runtimes: Dict[str, RuntimeInfo], sequences: List[Tuple[str, bytes]], args +): asyncio.run(decrypt_process_async(runtimes, sequences, args)) -def get_platform_executable(args) -> str: +def get_platform_executable(specified: str) -> str: """ Get the appropriate executable for the current platform """ - logger = logging.getLogger('shot') + logger = logging.getLogger("shot") # If a specific executable is provided, use it - if args.executable: - if os.path.exists(args.executable): - logger.info(f'{Fore.GREEN}Using specified executable: {args.executable}{Style.RESET_ALL}') - return args.executable + if specified: + if os.path.exists(specified): + logger.info( + f"{Fore.GREEN}Using specified executable: {specified}{Style.RESET_ALL}" + ) + return specified else: - logger.warning(f'{Fore.YELLOW}Specified executable not found: {args.executable}{Style.RESET_ALL}') + logger.warning( + f"{Fore.YELLOW}Specified executable not found: {specified}{Style.RESET_ALL}" + ) oneshot_dir = os.path.dirname(os.path.abspath(__file__)) @@ -182,88 +286,91 @@ def get_platform_executable(args) -> str: machine = platform.machine().lower() # Check for architecture-specific executables - arch_specific_exe = f'pyarmor-1shot-{system}-{machine}' - if system == 'windows': - arch_specific_exe += '.exe' + arch_specific_exe = f"pyarmor-1shot-{system}-{machine}" + if system == "windows": + arch_specific_exe += ".exe" arch_exe_path = os.path.join(oneshot_dir, arch_specific_exe) if os.path.exists(arch_exe_path): - logger.info(f'{Fore.GREEN}Using architecture-specific executable: {arch_specific_exe}{Style.RESET_ALL}') + logger.info( + f"{Fore.GREEN}Using architecture-specific executable: {arch_specific_exe}{Style.RESET_ALL}" + ) return arch_exe_path platform_map = { - 'windows': 'pyarmor-1shot.exe', - 'linux': 'pyarmor-1shot', - 'darwin': 'pyarmor-1shot', + "windows": "pyarmor-1shot.exe", + "linux": "pyarmor-1shot", + "darwin": "pyarmor-1shot", } - base_exe_name = platform_map.get(system, 'pyarmor-1shot') + base_exe_name = platform_map.get(system, "pyarmor-1shot") # Then check for platform-specific executable platform_exe_path = os.path.join(oneshot_dir, base_exe_name) if os.path.exists(platform_exe_path): - logger.info(f'{Fore.GREEN}Using executable: {base_exe_name}{Style.RESET_ALL}') + logger.info(f"{Fore.GREEN}Using executable: {base_exe_name}{Style.RESET_ALL}") return platform_exe_path # Finally, check for generic executable - generic_exe_path = os.path.join(oneshot_dir, 'pyarmor-1shot') + generic_exe_path = os.path.join(oneshot_dir, "pyarmor-1shot") if os.path.exists(generic_exe_path): - logger.info(f'{Fore.GREEN}Using executable: pyarmor-1shot{Style.RESET_ALL}') + logger.info(f"{Fore.GREEN}Using executable: pyarmor-1shot{Style.RESET_ALL}") return generic_exe_path - logger.critical(f'{Fore.RED}Executable {base_exe_name} not found, please build it first or download on https://github.com/Lil-House/Pyarmor-Static-Unpack-1shot/releases {Style.RESET_ALL}') + logger.critical( + f"{Fore.RED}Executable {base_exe_name} not found, please build it first or download on https://github.com/Lil-House/Pyarmor-Static-Unpack-1shot/releases {Style.RESET_ALL}" + ) exit(1) def parse_args(): - parser = argparse.ArgumentParser( - description='Pyarmor Static Unpack 1 Shot Entry') + parser = argparse.ArgumentParser(description="Pyarmor Static Unpack 1 Shot Entry") parser.add_argument( - 'directory', + "directory", help='the "root" directory of obfuscated scripts', type=str, ) parser.add_argument( - '-r', - '--runtime', - help='path to pyarmor_runtime[.pyd|.so|.dylib]', + "-r", + "--runtime", + help="path to pyarmor_runtime[.pyd|.so|.dylib]", type=str, # argparse.FileType('rb'), ) parser.add_argument( - '-o', - '--output-dir', - help='save output files in another directory instead of in-place, with folder structure remain unchanged', + "-o", + "--output-dir", + help="save output files in another directory instead of in-place, with folder structure remain unchanged", type=str, ) parser.add_argument( - '--export-raw-data', - help='save data found in source files as-is', - action='store_true', + "--export-raw-data", + help="save data found in source files as-is", + action="store_true", ) parser.add_argument( - '--show-all', - help='show all pycdc errors and warnings', - action='store_true', + "--show-all", + help="show all pycdc errors and warnings", + action="store_true", ) parser.add_argument( - '--show-err-opcode', - help='show pycdc unsupported opcode errors', - action='store_true', + "--show-err-opcode", + help="show pycdc unsupported opcode errors", + action="store_true", ) parser.add_argument( - '--show-warn-stack', - help='show pycdc stack related warnings', - action='store_true', + "--show-warn-stack", + help="show pycdc stack related warnings", + action="store_true", ) parser.add_argument( - '--concurrent', - help='number of concurrent deobfuscation processes (default: 4)', + "--concurrent", + help="number of concurrent handling processes (default: 4)", type=int, default=4, ) parser.add_argument( - '-e', - '--executable', - help='path to the pyarmor-1shot executable to use', + "-e", + "--executable", + help="path to the pyarmor-1shot executable to use", type=str, ) return parser.parse_args() @@ -273,11 +380,11 @@ def main(): args = parse_args() logging.basicConfig( level=logging.INFO, - format='%(levelname)-8s %(asctime)-28s %(message)s', + format="%(levelname)-8s %(asctime)-28s %(message)s", ) - logger = logging.getLogger('shot') + logger = logging.getLogger("shot") - print(Fore.CYAN + r''' + print(rf"""{Fore.CYAN} ____ ____ ( __ ) ( __ ) | |~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~| | @@ -288,11 +395,11 @@ def main(): | | |_| \_, |\__,_|_| |_||_||_|\___/|_| |_|___/|_||_|\___/ \__| | | | | |__/ | | |__|~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~|__| -(____) v0.2.1 (____) +(____) v0.2.1+ (____) For technology exchange only. Use at your own risk. GitHub: https://github.com/Lil-House/Pyarmor-Static-Unpack-1shot -''' + Style.RESET_ALL) +{Style.RESET_ALL}""") if args.runtime: specified_runtime = RuntimeInfo(args.runtime) @@ -300,71 +407,89 @@ def main(): runtimes = {specified_runtime.serial_number: specified_runtime} else: specified_runtime = None - runtimes = {'000000': RuntimeInfo.default()} - - sequences: List[Tuple[str, bytes]] = [] + runtimes = {"000000": RuntimeInfo.default()} if args.output_dir and not os.path.exists(args.output_dir): os.makedirs(args.output_dir) if args.output_dir and not os.path.isdir(args.output_dir): - logger.error(f'{Fore.RED}Cannot use {repr(args.output_dir)} as output directory{Style.RESET_ALL}') + logger.error( + f"{Fore.RED}Cannot use {repr(args.output_dir)} as output directory{Style.RESET_ALL}" + ) return + # Note for path handling: + # args.output_dir: is either None or an existing directory path, can be absolute or relative + # args.directory: before calling `decrypt_process`, it is an existing directory path, can be absolute or relative + # paths in `sequences`: must be relative file paths, without trailing slashes, can exist or not + if os.path.isfile(args.directory): + single_file_path = os.path.abspath(args.directory) + args.directory = os.path.dirname(single_file_path) + relative_path = os.path.basename(single_file_path) if specified_runtime is None: - logger.error(f'{Fore.RED}Please specify `pyarmor_runtime` file by `-r` if input is a file{Style.RESET_ALL}') + logger.error( + f"{Fore.RED}Please specify `pyarmor_runtime` file by `-r` if input is a file{Style.RESET_ALL}" + ) return - logger.info(f'{Fore.CYAN}Single file mode{Style.RESET_ALL}') - result = detect_process(args.directory, args.directory) - if result is None: - logger.error(f'{Fore.RED}No armored data found{Style.RESET_ALL}') + logger.info(f"{Fore.CYAN}Single file mode{Style.RESET_ALL}") + single_file_sequences = detect_process(single_file_path, relative_path) + if single_file_sequences is None: + logger.error(f"{Fore.RED}No armored data found{Style.RESET_ALL}") return - sequences.extend(result) - decrypt_process(runtimes, sequences, args) + decrypt_process(runtimes, single_file_sequences, args) return # single file mode ends here + sequences: List[Tuple[str, bytes]] = [] + dir_path: str dirs: List[str] files: List[str] for dir_path, dirs, files in os.walk(args.directory, followlinks=False): - if '.no1shot' in files: - logger.info(f'{Fore.YELLOW}Skipping {dir_path} because of `.no1shot`{Style.RESET_ALL}') + if ".no1shot" in files: + logger.info( + f"{Fore.YELLOW}Skipping {dir_path} because of `.no1shot`{Style.RESET_ALL}" + ) dirs.clear() files.clear() continue - for d in ['__pycache__', 'site-packages']: + for d in ["__pycache__", "site-packages"]: if d in dirs: dirs.remove(d) for file_name in files: - if '.1shot.' in file_name: + if ".1shot." in file_name: continue file_path = os.path.join(dir_path, file_name) relative_path = os.path.relpath(file_path, args.directory) - if file_name.endswith('.pyz'): - with open(file_path, 'rb') as f: + if file_name.endswith(".pyz"): + with open(file_path, "rb") as f: head = f.read(16 * 1024 * 1024) - if b'PY00' in head \ - and (not os.path.exists(file_path + '_extracted') - or len(os.listdir(file_path + '_extracted')) == 0): + if b"PY00" in head and ( + not os.path.exists(file_path + "_extracted") + or len(os.listdir(file_path + "_extracted")) == 0 + ): logger.error( - f'{Fore.RED}A PYZ file containing armored data is detected, but the PYZ file has not been extracted by other tools. This error is not a problem with this tool. If the folder is extracted by Pyinstxtractor, please read the output information of Pyinstxtractor carefully. ({relative_path}){Style.RESET_ALL}') + f"{Fore.RED}A PYZ file containing armored data is detected, but the PYZ file has not been extracted by other tools. This error is not a problem with this tool. If the folder is extracted by Pyinstxtractor, please read the output information of Pyinstxtractor carefully. ({relative_path}){Style.RESET_ALL}" + ) continue # is pyarmor_runtime? - if specified_runtime is None \ - and file_name.startswith('pyarmor_runtime') \ - and file_name.endswith(('.pyd', '.so', '.dylib')): + if ( + specified_runtime is None + and file_name.startswith("pyarmor_runtime") + and file_name.endswith((".pyd", ".so", ".dylib")) + ): try: new_runtime = RuntimeInfo(file_path) runtimes[new_runtime.serial_number] = new_runtime logger.info( - f'{Fore.GREEN}Found new runtime: {new_runtime.serial_number} ({file_path}){Style.RESET_ALL}') + f"{Fore.GREEN}Found new runtime: {new_runtime.serial_number} ({file_path}){Style.RESET_ALL}" + ) print(new_runtime) continue - except: + except Exception: pass result = detect_process(file_path, relative_path) @@ -372,13 +497,13 @@ def main(): sequences.extend(result) if not runtimes: - logger.error(f'{Fore.RED}No runtime found{Style.RESET_ALL}') + logger.error(f"{Fore.RED}No runtime found{Style.RESET_ALL}") return if not sequences: - logger.error(f'{Fore.RED}No armored data found{Style.RESET_ALL}') + logger.error(f"{Fore.RED}No armored data found{Style.RESET_ALL}") return decrypt_process(runtimes, sequences, args) -if __name__ == '__main__': +if __name__ == "__main__": main()