From 4363ff705c361f69b0f91aee11b804ad7aaf09e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=AC=E5=85=89?= <121670274+b3nguang@users.noreply.github.com> Date: Sun, 6 Apr 2025 00:40:47 +0800 Subject: [PATCH] feat: multi process (GH-6) --- helpers/shot.py | 146 +++++++++++++++++++++++++++--------------------- 1 file changed, 83 insertions(+), 63 deletions(-) diff --git a/helpers/shot.py b/helpers/shot.py index b5f8670..59cec1b 100644 --- a/helpers/shot.py +++ b/helpers/shot.py @@ -1,14 +1,15 @@ import argparse -from Crypto.Cipher import AES import logging +import multiprocessing import os import subprocess +from multiprocessing import Pool from typing import Dict, List, Tuple +from Crypto.Cipher import AES from detect import detect_process from runtime import RuntimeInfo - SUBPROCESS_TIMEOUT = 30 @@ -17,75 +18,94 @@ def general_aes_ctr_decrypt(data: bytes, key: bytes, nonce: bytes) -> bytes: return cipher.decrypt(data) +def decrypt_single_file(args_tuple): + path, data, runtime, args, output_dir = args_tuple + logger = logging.getLogger('shot') + try: + serial_number = data[2:8].decode('utf-8') + logger.info(f'Decrypting: {serial_number} ({path})') + + dest_path = os.path.join(output_dir, path) if output_dir else path + dest_dir = os.path.dirname(dest_path) + if not os.path.exists(dest_dir): + os.makedirs(dest_dir) + + if args.export_raw_data: + with open(dest_path + '.1shot.raw', 'wb') as f: + f.write(data) + + cipher_text_offset = int.from_bytes(data[28:32], 'little') + cipher_text_length = int.from_bytes(data[32:36], 'little') + nonce = data[36:40] + data[44:52] + with open(dest_path + '.1shot.seq', 'wb') as f: + f.write(b'\xa1' + runtime.runtime_aes_key) + f.write(b'\xa2' + runtime.mix_str_aes_nonce()) + f.write(b'\xf0\xff') + f.write(data[:cipher_text_offset]) + f.write(general_aes_ctr_decrypt(data[cipher_text_offset : cipher_text_offset + cipher_text_length], runtime.runtime_aes_key, nonce)) + f.write(data[cipher_text_offset + cipher_text_length :]) + + exe_name = 'pyarmor-1shot.exe' if os.name == 'nt' else 'pyarmor-1shot' + exe_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), exe_name) + + sp = subprocess.run( + [ + exe_path, + dest_path + '.1shot.seq', + ], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + timeout=SUBPROCESS_TIMEOUT, + ) + stdout = sp.stdout.decode('latin-1').splitlines() + stderr = sp.stderr.decode('latin-1').splitlines() + for line in stdout: + logger.warning(f'PYCDC: {line} ({path})') + for line in stderr: + if line.startswith( + ( + 'Warning: Stack history is empty', + 'Warning: Stack history is not empty', + 'Warning: block stack is not empty', + ) + ): + if args.show_warn_stack or args.show_all: + logger.warning(f'PYCDC: {line} ({path})') + elif line.startswith('Unsupported opcode:'): + if args.show_err_opcode or args.show_all: + logger.error(f'PYCDC: {line} ({path})') + elif line.startswith('Something TERRIBLE happened'): + if args.show_all: + logger.error(f'PYCDC: {line} ({path})') + else: + logger.error(f'PYCDC: {line} ({path})') + if sp.returncode != 0: + logger.warning(f'PYCDC returned {sp.returncode} ({path})') + return False + return True + except Exception as e: + logger.error(f'Decrypt failed: {e} ({path})') + return False + + def decrypt_process(runtimes: Dict[str, RuntimeInfo], sequences: List[Tuple[str, bytes]], args): logger = logging.getLogger('shot') output_dir: str = args.output_dir or args.directory + process_args = [] for path, data in sequences: try: serial_number = data[2:8].decode('utf-8') runtime = runtimes[serial_number] - logger.info(f'Decrypting: {serial_number} ({path})') - - dest_path = os.path.join(output_dir, path) if output_dir else path - dest_dir = os.path.dirname(dest_path) - if not os.path.exists(dest_dir): - os.makedirs(dest_dir) - - if args.export_raw_data: - with open(dest_path + '.1shot.raw', 'wb') as f: - f.write(data) - - cipher_text_offset = int.from_bytes(data[28:32], 'little') - cipher_text_length = int.from_bytes(data[32:36], 'little') - nonce = data[36:40] + data[44:52] - with open(dest_path + '.1shot.seq', 'wb') as f: - f.write(b'\xa1' + runtime.runtime_aes_key) - f.write(b'\xa2' + runtime.mix_str_aes_nonce()) - f.write(b'\xf0\xff') - f.write(data[:cipher_text_offset]) - f.write(general_aes_ctr_decrypt( - data[cipher_text_offset:cipher_text_offset+cipher_text_length], runtime.runtime_aes_key, nonce)) - f.write(data[cipher_text_offset+cipher_text_length:]) - - exe_name = 'pyarmor-1shot.exe' if os.name == 'nt' else 'pyarmor-1shot' - exe_path = os.path.join( - os.path.dirname(os.path.abspath(__file__)), exe_name) - # TODO: multi process - sp = subprocess.run( - [ - exe_path, - dest_path + '.1shot.seq', - ], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - timeout=SUBPROCESS_TIMEOUT, - ) - stdout = sp.stdout.decode('latin-1').splitlines() - stderr = sp.stderr.decode('latin-1').splitlines() - for line in stdout: - logger.warning(f'PYCDC: {line} ({path})') - for line in stderr: - if line.startswith(( - 'Warning: Stack history is empty', - 'Warning: Stack history is not empty', - 'Warning: block stack is not empty', - )): - if args.show_warn_stack or args.show_all: - logger.warning(f'PYCDC: {line} ({path})') - elif line.startswith('Unsupported opcode:'): - if args.show_err_opcode or args.show_all: - logger.error(f'PYCDC: {line} ({path})') - elif line.startswith('Something TERRIBLE happened'): - if args.show_all: - logger.error(f'PYCDC: {line} ({path})') - else: - logger.error(f'PYCDC: {line} ({path})') - if sp.returncode != 0: - logger.warning(f'PYCDC returned {sp.returncode} ({path})') - continue + process_args.append((path, data, runtime, args, output_dir)) except Exception as e: - logger.error(f'Decrypt failed: {e} ({path})') - continue + logger.error(f'Failed to prepare file for decryption: {e} ({path})') + num_processes = min(args.processes if hasattr(args, 'processes') else multiprocessing.cpu_count(), len(process_args)) + logger.info(f'Starting decryption with {num_processes} processes for {len(process_args)} files') + + with Pool(processes=num_processes) as pool: + results = pool.map(decrypt_single_file, process_args) + success_count = sum(1 for result in results if result) + logger.info(f'Decryption completed: {success_count} succeeded, {len(process_args) - success_count} failed') def parse_args():