feat: multi process (GH-6)

This commit is contained in:
本光
2025-04-06 00:40:47 +08:00
committed by GitHub
parent 352d14ec82
commit 4363ff705c

View File

@@ -1,14 +1,15 @@
import argparse
from Crypto.Cipher import AES
import logging
import multiprocessing
import os
import subprocess
from multiprocessing import Pool
from typing import Dict, List, Tuple
from Crypto.Cipher import AES
from detect import detect_process
from runtime import RuntimeInfo
SUBPROCESS_TIMEOUT = 30
@@ -17,75 +18,94 @@ def general_aes_ctr_decrypt(data: bytes, key: bytes, nonce: bytes) -> bytes:
return cipher.decrypt(data)
def decrypt_single_file(args_tuple):
path, data, runtime, args, output_dir = args_tuple
logger = logging.getLogger('shot')
try:
serial_number = data[2:8].decode('utf-8')
logger.info(f'Decrypting: {serial_number} ({path})')
dest_path = os.path.join(output_dir, path) if output_dir else path
dest_dir = os.path.dirname(dest_path)
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
if args.export_raw_data:
with open(dest_path + '.1shot.raw', 'wb') as f:
f.write(data)
cipher_text_offset = int.from_bytes(data[28:32], 'little')
cipher_text_length = int.from_bytes(data[32:36], 'little')
nonce = data[36:40] + data[44:52]
with open(dest_path + '.1shot.seq', 'wb') as f:
f.write(b'\xa1' + runtime.runtime_aes_key)
f.write(b'\xa2' + runtime.mix_str_aes_nonce())
f.write(b'\xf0\xff')
f.write(data[:cipher_text_offset])
f.write(general_aes_ctr_decrypt(data[cipher_text_offset : cipher_text_offset + cipher_text_length], runtime.runtime_aes_key, nonce))
f.write(data[cipher_text_offset + cipher_text_length :])
exe_name = 'pyarmor-1shot.exe' if os.name == 'nt' else 'pyarmor-1shot'
exe_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), exe_name)
sp = subprocess.run(
[
exe_path,
dest_path + '.1shot.seq',
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=SUBPROCESS_TIMEOUT,
)
stdout = sp.stdout.decode('latin-1').splitlines()
stderr = sp.stderr.decode('latin-1').splitlines()
for line in stdout:
logger.warning(f'PYCDC: {line} ({path})')
for line in stderr:
if line.startswith(
(
'Warning: Stack history is empty',
'Warning: Stack history is not empty',
'Warning: block stack is not empty',
)
):
if args.show_warn_stack or args.show_all:
logger.warning(f'PYCDC: {line} ({path})')
elif line.startswith('Unsupported opcode:'):
if args.show_err_opcode or args.show_all:
logger.error(f'PYCDC: {line} ({path})')
elif line.startswith('Something TERRIBLE happened'):
if args.show_all:
logger.error(f'PYCDC: {line} ({path})')
else:
logger.error(f'PYCDC: {line} ({path})')
if sp.returncode != 0:
logger.warning(f'PYCDC returned {sp.returncode} ({path})')
return False
return True
except Exception as e:
logger.error(f'Decrypt failed: {e} ({path})')
return False
def decrypt_process(runtimes: Dict[str, RuntimeInfo], sequences: List[Tuple[str, bytes]], args):
logger = logging.getLogger('shot')
output_dir: str = args.output_dir or args.directory
process_args = []
for path, data in sequences:
try:
serial_number = data[2:8].decode('utf-8')
runtime = runtimes[serial_number]
logger.info(f'Decrypting: {serial_number} ({path})')
dest_path = os.path.join(output_dir, path) if output_dir else path
dest_dir = os.path.dirname(dest_path)
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
if args.export_raw_data:
with open(dest_path + '.1shot.raw', 'wb') as f:
f.write(data)
cipher_text_offset = int.from_bytes(data[28:32], 'little')
cipher_text_length = int.from_bytes(data[32:36], 'little')
nonce = data[36:40] + data[44:52]
with open(dest_path + '.1shot.seq', 'wb') as f:
f.write(b'\xa1' + runtime.runtime_aes_key)
f.write(b'\xa2' + runtime.mix_str_aes_nonce())
f.write(b'\xf0\xff')
f.write(data[:cipher_text_offset])
f.write(general_aes_ctr_decrypt(
data[cipher_text_offset:cipher_text_offset+cipher_text_length], runtime.runtime_aes_key, nonce))
f.write(data[cipher_text_offset+cipher_text_length:])
exe_name = 'pyarmor-1shot.exe' if os.name == 'nt' else 'pyarmor-1shot'
exe_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)), exe_name)
# TODO: multi process
sp = subprocess.run(
[
exe_path,
dest_path + '.1shot.seq',
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=SUBPROCESS_TIMEOUT,
)
stdout = sp.stdout.decode('latin-1').splitlines()
stderr = sp.stderr.decode('latin-1').splitlines()
for line in stdout:
logger.warning(f'PYCDC: {line} ({path})')
for line in stderr:
if line.startswith((
'Warning: Stack history is empty',
'Warning: Stack history is not empty',
'Warning: block stack is not empty',
)):
if args.show_warn_stack or args.show_all:
logger.warning(f'PYCDC: {line} ({path})')
elif line.startswith('Unsupported opcode:'):
if args.show_err_opcode or args.show_all:
logger.error(f'PYCDC: {line} ({path})')
elif line.startswith('Something TERRIBLE happened'):
if args.show_all:
logger.error(f'PYCDC: {line} ({path})')
else:
logger.error(f'PYCDC: {line} ({path})')
if sp.returncode != 0:
logger.warning(f'PYCDC returned {sp.returncode} ({path})')
continue
process_args.append((path, data, runtime, args, output_dir))
except Exception as e:
logger.error(f'Decrypt failed: {e} ({path})')
continue
logger.error(f'Failed to prepare file for decryption: {e} ({path})')
num_processes = min(args.processes if hasattr(args, 'processes') else multiprocessing.cpu_count(), len(process_args))
logger.info(f'Starting decryption with {num_processes} processes for {len(process_args)} files')
with Pool(processes=num_processes) as pool:
results = pool.map(decrypt_single_file, process_args)
success_count = sum(1 for result in results if result)
logger.info(f'Decryption completed: {success_count} succeeded, {len(process_args) - success_count} failed')
def parse_args():