diff --git a/helpers/shot.py b/helpers/shot.py index 59cec1b..6f1ae66 100644 --- a/helpers/shot.py +++ b/helpers/shot.py @@ -1,16 +1,21 @@ import argparse +from Crypto.Cipher import AES import logging -import multiprocessing import os import subprocess -from multiprocessing import Pool +import sys +import time +import asyncio +import traceback from typing import Dict, List, Tuple +from colorama import init, Fore, Style -from Crypto.Cipher import AES from detect import detect_process from runtime import RuntimeInfo -SUBPROCESS_TIMEOUT = 30 + +# Initialize colorama +init(autoreset=True) def general_aes_ctr_decrypt(data: bytes, key: bytes, nonce: bytes) -> bytes: @@ -18,57 +23,68 @@ def general_aes_ctr_decrypt(data: bytes, key: bytes, nonce: bytes) -> bytes: return cipher.decrypt(data) -def decrypt_single_file(args_tuple): - path, data, runtime, args, output_dir = args_tuple +def bcc_fallback_method(seq_file_path: str, output_path: str = None): + # Fallback method for BCC mode deobfuscation + logger = logging.getLogger('shot') + logger.info(f'{Fore.YELLOW}Attempting BCC fallback method for: {seq_file_path}{Style.RESET_ALL}') + + if output_path is None: + output_path = seq_file_path + '.back.1shot.seq' + + try: + with open(seq_file_path, 'rb') as f: + origin = f.read() + + one_shot_header = origin[:32] # Header format + aes_key = one_shot_header[1:17] + + bcc_part_length = int.from_bytes(origin[0x58:0x5C], 'little') # If it is 0, it is not BCC part but bytecode part + bytecode_part = origin[32+bcc_part_length:] + aes_nonce = bytecode_part[36:40] + bytecode_part[44:52] # The same position as non-BCC file + + with open(output_path, 'wb') as f: + f.write(one_shot_header) + f.write(bytecode_part[:64]) + f.write(AES.new(aes_key, AES.MODE_CTR, nonce=aes_nonce, initial_value=2).decrypt(bytecode_part[64:])) + + logger.info(f'{Fore.GREEN}Successfully created BCC fallback file: {output_path}{Style.RESET_ALL}') + return output_path + except Exception as e: + error_details = traceback.format_exc() + logger.error(f'{Fore.RED}BCC fallback method failed: {e}{Style.RESET_ALL}') + logger.error(f'{Fore.RED}Error details: {error_details}{Style.RESET_ALL}') + return None + + +async def run_subprocess_async(cmd, cwd=None): + process = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + cwd=cwd + ) + stdout, stderr = await process.communicate() + return process.returncode, stdout, stderr + + +async def decrypt_file_async(exe_path, seq_file_path, path, args): logger = logging.getLogger('shot') try: - serial_number = data[2:8].decode('utf-8') - logger.info(f'Decrypting: {serial_number} ({path})') - - dest_path = os.path.join(output_dir, path) if output_dir else path - dest_dir = os.path.dirname(dest_path) - if not os.path.exists(dest_dir): - os.makedirs(dest_dir) - - if args.export_raw_data: - with open(dest_path + '.1shot.raw', 'wb') as f: - f.write(data) - - cipher_text_offset = int.from_bytes(data[28:32], 'little') - cipher_text_length = int.from_bytes(data[32:36], 'little') - nonce = data[36:40] + data[44:52] - with open(dest_path + '.1shot.seq', 'wb') as f: - f.write(b'\xa1' + runtime.runtime_aes_key) - f.write(b'\xa2' + runtime.mix_str_aes_nonce()) - f.write(b'\xf0\xff') - f.write(data[:cipher_text_offset]) - f.write(general_aes_ctr_decrypt(data[cipher_text_offset : cipher_text_offset + cipher_text_length], runtime.runtime_aes_key, nonce)) - f.write(data[cipher_text_offset + cipher_text_length :]) - - exe_name = 'pyarmor-1shot.exe' if os.name == 'nt' else 'pyarmor-1shot' - exe_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), exe_name) - - sp = subprocess.run( - [ - exe_path, - dest_path + '.1shot.seq', - ], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - timeout=SUBPROCESS_TIMEOUT, - ) - stdout = sp.stdout.decode('latin-1').splitlines() - stderr = sp.stderr.decode('latin-1').splitlines() - for line in stdout: + # Run without timeout + returncode, stdout, stderr = await run_subprocess_async([exe_path, seq_file_path]) + + stdout_lines = stdout.decode('latin-1').splitlines() + stderr_lines = stderr.decode('latin-1').splitlines() + + for line in stdout_lines: logger.warning(f'PYCDC: {line} ({path})') - for line in stderr: - if line.startswith( - ( - 'Warning: Stack history is empty', - 'Warning: Stack history is not empty', - 'Warning: block stack is not empty', - ) - ): + + for line in stderr_lines: + if line.startswith(( + 'Warning: Stack history is empty', + 'Warning: Stack history is not empty', + 'Warning: block stack is not empty', + )): if args.show_warn_stack or args.show_all: logger.warning(f'PYCDC: {line} ({path})') elif line.startswith('Unsupported opcode:'): @@ -79,33 +95,114 @@ def decrypt_single_file(args_tuple): logger.error(f'PYCDC: {line} ({path})') else: logger.error(f'PYCDC: {line} ({path})') - if sp.returncode != 0: - logger.warning(f'PYCDC returned {sp.returncode} ({path})') - return False - return True + + return returncode, stdout_lines, stderr_lines except Exception as e: - logger.error(f'Decrypt failed: {e} ({path})') - return False + error_details = traceback.format_exc() + logger.error(f'{Fore.RED}Error during async deobfuscation: {e}{Style.RESET_ALL}') + logger.error(f'{Fore.RED}Error details: {error_details}{Style.RESET_ALL}') + return -1, [], [] + + +async def decrypt_process_async(runtimes: Dict[str, RuntimeInfo], sequences: List[Tuple[str, bytes]], args): + logger = logging.getLogger('shot') + output_dir: str = args.output_dir or args.directory + + # Create a semaphore to limit concurrent processes + semaphore = asyncio.Semaphore(args.concurrent) # Use the concurrent argument + + async def process_file(path, data): + async with semaphore: + try: + serial_number = data[2:8].decode('utf-8') + runtime = runtimes[serial_number] + logger.info(f'{Fore.CYAN}Decrypting: {serial_number} ({path}){Style.RESET_ALL}') + + dest_path = os.path.join(output_dir, path) if output_dir else path + dest_dir = os.path.dirname(dest_path) + if not os.path.exists(dest_dir): + os.makedirs(dest_dir) + + if args.export_raw_data: + with open(dest_path + '.1shot.raw', 'wb') as f: + f.write(data) + + cipher_text_offset = int.from_bytes(data[28:32], 'little') + cipher_text_length = int.from_bytes(data[32:36], 'little') + nonce = data[36:40] + data[44:52] + seq_file_path = dest_path + '.1shot.seq' + with open(seq_file_path, 'wb') as f: + f.write(b'\xa1' + runtime.runtime_aes_key) + f.write(b'\xa2' + runtime.mix_str_aes_nonce()) + f.write(b'\xf0\xff') + f.write(data[:cipher_text_offset]) + f.write(general_aes_ctr_decrypt( + data[cipher_text_offset:cipher_text_offset+cipher_text_length], runtime.runtime_aes_key, nonce)) + f.write(data[cipher_text_offset+cipher_text_length:]) + + exe_name = 'pyarmor-1shot.exe' if os.name == 'nt' else 'pyarmor-1shot' + exe_path = os.path.join( + os.path.dirname(os.path.abspath(__file__)), exe_name) + + # Run without timeout + returncode, stdout_lines, stderr_lines = await decrypt_file_async(exe_path, seq_file_path, path, args) + + # Check for specific errors that indicate BCC mode + should_try_bcc = False + error_message = "" + + if returncode != 0: + error_message = f"PYCDC returned {returncode}" + # Check for specific error patterns that suggest BCC mode + for line in stderr_lines: + if ("Unsupported opcode" in line or + "Something TERRIBLE happened" in line or + "Unknown opcode 0" in line or + "Got unsupported type" in line): + should_try_bcc = True + error_message += f" - {line}" + break + + if should_try_bcc: + logger.warning(f'{Fore.YELLOW}{error_message} ({path}) - Attempting BCC fallback{Style.RESET_ALL}') + # Try BCC fallback method + bcc_file_path = bcc_fallback_method(seq_file_path) + if bcc_file_path: + logger.info(f'{Fore.GREEN}Running deobfuscator on BCC fallback file{Style.RESET_ALL}') + try: + # Run without timeout + returncode, stdout_lines, stderr_lines = await decrypt_file_async(exe_path, bcc_file_path, path, args) + if returncode == 0: + logger.info(f'{Fore.GREEN}Successfully deobfuscated using BCC fallback method{Style.RESET_ALL}') + print(f"{Fore.GREEN} BCC Decrypted: {path}{Style.RESET_ALL}") + else: + logger.error(f'{Fore.RED}BCC fallback deobfuscation failed with return code {returncode}{Style.RESET_ALL}') + for line in stderr_lines: + logger.error(f'{Fore.RED}BCC Error: {line}{Style.RESET_ALL}') + except Exception as e: + error_details = traceback.format_exc() + logger.error(f'{Fore.RED}BCC fallback deobfuscation failed with error: {e}{Style.RESET_ALL}') + logger.error(f'{Fore.RED}Error details: {error_details}{Style.RESET_ALL}') + elif returncode == 0: + # Successfully decrypted + logger.info(f'{Fore.GREEN}Successfully decrypted: {path}{Style.RESET_ALL}') + print(f"{Fore.GREEN} Decrypted: {path}{Style.RESET_ALL}") + else: + logger.warning(f'{Fore.YELLOW}{error_message} ({path}){Style.RESET_ALL}') + except Exception as e: + error_details = traceback.format_exc() + logger.error(f'{Fore.RED}Decrypt failed: {e} ({path}){Style.RESET_ALL}') + logger.error(f'{Fore.RED}Error details: {error_details}{Style.RESET_ALL}') + + # Create tasks for all files + tasks = [process_file(path, data) for path, data in sequences] + + # Run all tasks concurrently + await asyncio.gather(*tasks) def decrypt_process(runtimes: Dict[str, RuntimeInfo], sequences: List[Tuple[str, bytes]], args): - logger = logging.getLogger('shot') - output_dir: str = args.output_dir or args.directory - process_args = [] - for path, data in sequences: - try: - serial_number = data[2:8].decode('utf-8') - runtime = runtimes[serial_number] - process_args.append((path, data, runtime, args, output_dir)) - except Exception as e: - logger.error(f'Failed to prepare file for decryption: {e} ({path})') - num_processes = min(args.processes if hasattr(args, 'processes') else multiprocessing.cpu_count(), len(process_args)) - logger.info(f'Starting decryption with {num_processes} processes for {len(process_args)} files') - - with Pool(processes=num_processes) as pool: - results = pool.map(decrypt_single_file, process_args) - success_count = sum(1 for result in results if result) - logger.info(f'Decryption completed: {success_count} succeeded, {len(process_args) - success_count} failed') + asyncio.run(decrypt_process_async(runtimes, sequences, args)) def parse_args(): @@ -115,6 +212,7 @@ def parse_args(): 'directory', help='the "root" directory of obfuscated scripts', type=str, + nargs='?', ) parser.add_argument( '-r', @@ -148,9 +246,55 @@ def parse_args(): help='show pycdc stack related warnings', action='store_true', ) + parser.add_argument( + '--menu', + help='show interactive menu to select folder to unpack', + action='store_true', + ) + parser.add_argument( + '--concurrent', + help='number of concurrent deobfuscation processes (default: 4)', + type=int, + default=4, + ) return parser.parse_args() +def display_menu(): + to_unpack_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'to_unpack') + + if not os.path.exists(to_unpack_dir): + os.makedirs(to_unpack_dir) + + folders = [d for d in os.listdir(to_unpack_dir) + if os.path.isdir(os.path.join(to_unpack_dir, d))] + + if not folders: + print(f"{Fore.YELLOW}No folders found in {to_unpack_dir}{Style.RESET_ALL}") + return None + + print(f"\n{Fore.CYAN}=== Available Folders to Unpack ==={Style.RESET_ALL}") + for i, folder in enumerate(folders, 1): + print(f"{Fore.GREEN}[{i}]{Style.RESET_ALL} {folder}") + + while True: + try: + choice = input(f"\n{Fore.YELLOW}Enter the number of the folder to unpack (or 'q' to quit): {Style.RESET_ALL}") + if choice.lower() == 'q': + return None + + choice_idx = int(choice) - 1 + if 0 <= choice_idx < len(folders): + selected_folder = folders[choice_idx] + full_path = os.path.join(to_unpack_dir, selected_folder) + print(f"{Fore.GREEN}Selected: {selected_folder}{Style.RESET_ALL}") + return full_path + else: + print(f"{Fore.RED}Invalid choice. Please enter a number between 1 and {len(folders)}{Style.RESET_ALL}") + except ValueError: + print(f"{Fore.RED}Please enter a valid number{Style.RESET_ALL}") + + def main(): args = parse_args() logging.basicConfig( @@ -159,8 +303,8 @@ def main(): ) logger = logging.getLogger('shot') - print(r''' - ____ ____ + print(f''' +{Fore.CYAN} ____ ____ ( __ ) ( __ ) | |~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~| | | | ____ _ ___ _ _ | | @@ -174,8 +318,18 @@ def main(): For technology exchange only. Use at your own risk. GitHub: https://github.com/Lil-House/Pyarmor-Static-Unpack-1shot +{Style.RESET_ALL} ''') + # If menu option is selected or no directory is provided, show the menu + if args.menu or not args.directory: + selected_dir = display_menu() + if selected_dir: + args.directory = selected_dir + else: + print(f"{Fore.YELLOW}No directory selected. Exiting.{Style.RESET_ALL}") + return + if args.runtime: specified_runtime = RuntimeInfo(args.runtime) print(specified_runtime) @@ -191,12 +345,12 @@ def main(): if os.path.isfile(args.directory): if specified_runtime is None: - logger.error('Please specify `pyarmor_runtime` file by `-r` if input is a file') + logger.error(f'{Fore.RED}Please specify `pyarmor_runtime` file by `-r` if input is a file{Style.RESET_ALL}') return - logger.info('Single file mode') + logger.info(f'{Fore.CYAN}Single file mode{Style.RESET_ALL}') result = detect_process(args.directory, args.directory) if result is None: - logger.error('No armored data found') + logger.error(f'{Fore.RED}No armored data found{Style.RESET_ALL}') return sequences.extend(result) decrypt_process(runtimes, sequences, args) @@ -207,7 +361,7 @@ def main(): files: List[str] for dir_path, dirs, files in os.walk(args.directory, followlinks=False): if '.no1shot' in files: - logger.info(f'Skipping {dir_path} because of `.no1shot`') + logger.info(f'{Fore.YELLOW}Skipping {dir_path} because of `.no1shot`{Style.RESET_ALL}') dirs.clear() files.clear() continue @@ -228,7 +382,7 @@ def main(): and (not os.path.exists(file_path + '_extracted') or len(os.listdir(file_path + '_extracted')) == 0): logger.error( - f'A PYZ file containing armored data is detected, but the PYZ file has not been extracted by other tools. This error is not a problem with this tool. If the folder is extracted by Pyinstxtractor, please read the output information of Pyinstxtractor carefully. ({relative_path})') + f'{Fore.RED}A PYZ file containing armored data is detected, but the PYZ file has not been extracted by other tools. This error is not a problem with this tool. If the folder is extracted by Pyinstxtractor, please read the output information of Pyinstxtractor carefully. ({relative_path}){Style.RESET_ALL}') continue # is pyarmor_runtime? @@ -239,7 +393,7 @@ def main(): new_runtime = RuntimeInfo(file_path) runtimes[new_runtime.serial_number] = new_runtime logger.info( - f'Found new runtime: {new_runtime.serial_number} ({file_path})') + f'{Fore.GREEN}Found new runtime: {new_runtime.serial_number} ({file_path}){Style.RESET_ALL}') print(new_runtime) continue except: @@ -250,10 +404,10 @@ def main(): sequences.extend(result) if not runtimes: - logger.error('No runtime found') + logger.error(f'{Fore.RED}No runtime found{Style.RESET_ALL}') return if not sequences: - logger.error('No armored data found') + logger.error(f'{Fore.RED}No armored data found{Style.RESET_ALL}') return decrypt_process(runtimes, sequences, args)