Merge pull request GH-9 from Anon0x23/Anon0x23-patch-1

refactor, asyncio, colorful console output, BCC mode fallback, interactive menu
This commit is contained in:
LilRan
2025-04-10 15:59:43 +08:00
committed by GitHub

View File

@@ -1,16 +1,23 @@
import argparse
from Crypto.Cipher import AES
import logging
import multiprocessing
import os
import subprocess
from multiprocessing import Pool
from typing import Dict, List, Tuple
import sys
import time
import asyncio
import traceback
import platform
import glob
from typing import Dict, List, Tuple, Optional
from colorama import init, Fore, Style
from Crypto.Cipher import AES
from detect import detect_process
from runtime import RuntimeInfo
SUBPROCESS_TIMEOUT = 30
# Initialize colorama
init(autoreset=True)
def general_aes_ctr_decrypt(data: bytes, key: bytes, nonce: bytes) -> bytes:
@@ -18,57 +25,68 @@ def general_aes_ctr_decrypt(data: bytes, key: bytes, nonce: bytes) -> bytes:
return cipher.decrypt(data)
def decrypt_single_file(args_tuple):
path, data, runtime, args, output_dir = args_tuple
def bcc_fallback_method(seq_file_path: str, output_path: str = None):
# Fallback method for BCC mode deobfuscation
logger = logging.getLogger('shot')
logger.info(f'{Fore.YELLOW}Attempting BCC fallback method for: {seq_file_path}{Style.RESET_ALL}')
if output_path is None:
output_path = seq_file_path + '.back.1shot.seq'
try:
with open(seq_file_path, 'rb') as f:
origin = f.read()
one_shot_header = origin[:32] # Header format
aes_key = one_shot_header[1:17]
bcc_part_length = int.from_bytes(origin[0x58:0x5C], 'little') # If it is 0, it is not BCC part but bytecode part
bytecode_part = origin[32+bcc_part_length:]
aes_nonce = bytecode_part[36:40] + bytecode_part[44:52] # The same position as non-BCC file
with open(output_path, 'wb') as f:
f.write(one_shot_header)
f.write(bytecode_part[:64])
f.write(AES.new(aes_key, AES.MODE_CTR, nonce=aes_nonce, initial_value=2).decrypt(bytecode_part[64:]))
logger.info(f'{Fore.GREEN}Successfully created BCC fallback file: {output_path}{Style.RESET_ALL}')
return output_path
except Exception as e:
error_details = traceback.format_exc()
logger.error(f'{Fore.RED}BCC fallback method failed: {e}{Style.RESET_ALL}')
logger.error(f'{Fore.RED}Error details: {error_details}{Style.RESET_ALL}')
return None
async def run_subprocess_async(cmd, cwd=None):
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=cwd
)
stdout, stderr = await process.communicate()
return process.returncode, stdout, stderr
async def decrypt_file_async(exe_path, seq_file_path, path, args):
logger = logging.getLogger('shot')
try:
serial_number = data[2:8].decode('utf-8')
logger.info(f'Decrypting: {serial_number} ({path})')
# Run without timeout
returncode, stdout, stderr = await run_subprocess_async([exe_path, seq_file_path])
dest_path = os.path.join(output_dir, path) if output_dir else path
dest_dir = os.path.dirname(dest_path)
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
stdout_lines = stdout.decode('latin-1').splitlines()
stderr_lines = stderr.decode('latin-1').splitlines()
if args.export_raw_data:
with open(dest_path + '.1shot.raw', 'wb') as f:
f.write(data)
cipher_text_offset = int.from_bytes(data[28:32], 'little')
cipher_text_length = int.from_bytes(data[32:36], 'little')
nonce = data[36:40] + data[44:52]
with open(dest_path + '.1shot.seq', 'wb') as f:
f.write(b'\xa1' + runtime.runtime_aes_key)
f.write(b'\xa2' + runtime.mix_str_aes_nonce())
f.write(b'\xf0\xff')
f.write(data[:cipher_text_offset])
f.write(general_aes_ctr_decrypt(data[cipher_text_offset : cipher_text_offset + cipher_text_length], runtime.runtime_aes_key, nonce))
f.write(data[cipher_text_offset + cipher_text_length :])
exe_name = 'pyarmor-1shot.exe' if os.name == 'nt' else 'pyarmor-1shot'
exe_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), exe_name)
sp = subprocess.run(
[
exe_path,
dest_path + '.1shot.seq',
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=SUBPROCESS_TIMEOUT,
)
stdout = sp.stdout.decode('latin-1').splitlines()
stderr = sp.stderr.decode('latin-1').splitlines()
for line in stdout:
for line in stdout_lines:
logger.warning(f'PYCDC: {line} ({path})')
for line in stderr:
if line.startswith(
(
'Warning: Stack history is empty',
'Warning: Stack history is not empty',
'Warning: block stack is not empty',
)
):
for line in stderr_lines:
if line.startswith((
'Warning: Stack history is empty',
'Warning: Stack history is not empty',
'Warning: block stack is not empty',
)):
if args.show_warn_stack or args.show_all:
logger.warning(f'PYCDC: {line} ({path})')
elif line.startswith('Unsupported opcode:'):
@@ -79,33 +97,192 @@ def decrypt_single_file(args_tuple):
logger.error(f'PYCDC: {line} ({path})')
else:
logger.error(f'PYCDC: {line} ({path})')
if sp.returncode != 0:
logger.warning(f'PYCDC returned {sp.returncode} ({path})')
return False
return True
return returncode, stdout_lines, stderr_lines
except Exception as e:
logger.error(f'Decrypt failed: {e} ({path})')
return False
error_details = traceback.format_exc()
logger.error(f'{Fore.RED}Error during async deobfuscation: {e}{Style.RESET_ALL}')
logger.error(f'{Fore.RED}Error details: {error_details}{Style.RESET_ALL}')
return -1, [], []
async def decrypt_process_async(runtimes: Dict[str, RuntimeInfo], sequences: List[Tuple[str, bytes]], args):
logger = logging.getLogger('shot')
output_dir: str = args.output_dir or args.directory
# Create a semaphore to limit concurrent processes
semaphore = asyncio.Semaphore(args.concurrent) # Use the concurrent argument
async def process_file(path, data):
async with semaphore:
try:
serial_number = data[2:8].decode('utf-8')
runtime = runtimes[serial_number]
logger.info(f'{Fore.CYAN}Decrypting: {serial_number} ({path}){Style.RESET_ALL}')
dest_path = os.path.join(output_dir, path) if output_dir else path
dest_dir = os.path.dirname(dest_path)
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
if args.export_raw_data:
with open(dest_path + '.1shot.raw', 'wb') as f:
f.write(data)
cipher_text_offset = int.from_bytes(data[28:32], 'little')
cipher_text_length = int.from_bytes(data[32:36], 'little')
nonce = data[36:40] + data[44:52]
seq_file_path = dest_path + '.1shot.seq'
with open(seq_file_path, 'wb') as f:
f.write(b'\xa1' + runtime.runtime_aes_key)
f.write(b'\xa2' + runtime.mix_str_aes_nonce())
f.write(b'\xf0\xff')
f.write(data[:cipher_text_offset])
f.write(general_aes_ctr_decrypt(
data[cipher_text_offset:cipher_text_offset+cipher_text_length], runtime.runtime_aes_key, nonce))
f.write(data[cipher_text_offset+cipher_text_length:])
# Get the appropriate executable for the current platform
exe_path = get_platform_executable(args)
# Run without timeout
returncode, stdout_lines, stderr_lines = await decrypt_file_async(exe_path, seq_file_path, path, args)
# Check for specific errors that indicate BCC mode
should_try_bcc = False
error_message = ""
if returncode != 0:
error_message = f"PYCDC returned {returncode}"
# Check for specific error patterns that suggest BCC mode
for line in stderr_lines:
if ("Unsupported opcode" in line or
"Something TERRIBLE happened" in line or
"Unknown opcode 0" in line or
"Got unsupported type" in line):
should_try_bcc = True
error_message += f" - {line}"
break
if should_try_bcc:
logger.warning(f'{Fore.YELLOW}{error_message} ({path}) - Attempting BCC fallback{Style.RESET_ALL}')
# Try BCC fallback method
bcc_file_path = bcc_fallback_method(seq_file_path)
if bcc_file_path:
logger.info(f'{Fore.GREEN}Running deobfuscator on BCC fallback file{Style.RESET_ALL}')
try:
# Run without timeout
returncode, stdout_lines, stderr_lines = await decrypt_file_async(exe_path, bcc_file_path, path, args)
if returncode == 0:
logger.info(f'{Fore.GREEN}Successfully deobfuscated using BCC fallback method{Style.RESET_ALL}')
print(f"{Fore.GREEN} BCC Decrypted: {path}{Style.RESET_ALL}")
else:
logger.error(f'{Fore.RED}BCC fallback deobfuscation failed with return code {returncode}{Style.RESET_ALL}')
for line in stderr_lines:
logger.error(f'{Fore.RED}BCC Error: {line}{Style.RESET_ALL}')
except Exception as e:
error_details = traceback.format_exc()
logger.error(f'{Fore.RED}BCC fallback deobfuscation failed with error: {e}{Style.RESET_ALL}')
logger.error(f'{Fore.RED}Error details: {error_details}{Style.RESET_ALL}')
elif returncode == 0:
# Successfully decrypted
logger.info(f'{Fore.GREEN}Successfully decrypted: {path}{Style.RESET_ALL}')
print(f"{Fore.GREEN} Decrypted: {path}{Style.RESET_ALL}")
else:
logger.warning(f'{Fore.YELLOW}{error_message} ({path}){Style.RESET_ALL}')
except Exception as e:
error_details = traceback.format_exc()
logger.error(f'{Fore.RED}Decrypt failed: {e} ({path}){Style.RESET_ALL}')
logger.error(f'{Fore.RED}Error details: {error_details}{Style.RESET_ALL}')
# Create tasks for all files
tasks = [process_file(path, data) for path, data in sequences]
# Run all tasks concurrently
await asyncio.gather(*tasks)
def decrypt_process(runtimes: Dict[str, RuntimeInfo], sequences: List[Tuple[str, bytes]], args):
logger = logging.getLogger('shot')
output_dir: str = args.output_dir or args.directory
process_args = []
for path, data in sequences:
try:
serial_number = data[2:8].decode('utf-8')
runtime = runtimes[serial_number]
process_args.append((path, data, runtime, args, output_dir))
except Exception as e:
logger.error(f'Failed to prepare file for decryption: {e} ({path})')
num_processes = min(args.processes if hasattr(args, 'processes') else multiprocessing.cpu_count(), len(process_args))
logger.info(f'Starting decryption with {num_processes} processes for {len(process_args)} files')
asyncio.run(decrypt_process_async(runtimes, sequences, args))
with Pool(processes=num_processes) as pool:
results = pool.map(decrypt_single_file, process_args)
success_count = sum(1 for result in results if result)
logger.info(f'Decryption completed: {success_count} succeeded, {len(process_args) - success_count} failed')
def get_platform_executable(args) -> str:
"""
Get the appropriate executable for the current platform
"""
logger = logging.getLogger('shot')
# If a specific executable is provided, use it
if args.executable:
if os.path.exists(args.executable):
logger.info(f'{Fore.GREEN}Using specified executable: {args.executable}{Style.RESET_ALL}')
return args.executable
else:
logger.warning(f'{Fore.YELLOW}Specified executable not found: {args.executable}{Style.RESET_ALL}')
# Determine the current platform
system = platform.system().lower()
machine = platform.machine().lower()
# Map platform to executable name
platform_map = {
'windows': 'pyarmor-1shot.exe',
'linux': 'pyarmor-1shot',
'darwin': 'pyarmor-1shot' # macOS
}
# Get the base executable name for the current platform
base_exe_name = platform_map.get(system, 'pyarmor-1shot')
# Check for architecture-specific executables
arch_specific_exe = f'pyarmor-1shot-{system}-{machine}'
if system == 'windows':
arch_specific_exe += '.exe'
# Look for executables in the helpers directory
helpers_dir = os.path.dirname(os.path.abspath(__file__))
# First check for architecture-specific executable
arch_exe_path = os.path.join(helpers_dir, arch_specific_exe)
if os.path.exists(arch_exe_path):
logger.info(f'{Fore.GREEN}Using architecture-specific executable: {arch_specific_exe}{Style.RESET_ALL}')
return arch_exe_path
# Then check for platform-specific executable
platform_exe_path = os.path.join(helpers_dir, base_exe_name)
if os.path.exists(platform_exe_path):
logger.info(f'{Fore.GREEN}Using platform-specific executable: {base_exe_name}{Style.RESET_ALL}')
return platform_exe_path
# Finally, check for generic executable
generic_exe_path = os.path.join(helpers_dir, 'pyarmor-1shot')
if os.path.exists(generic_exe_path):
logger.info(f'{Fore.GREEN}Using generic executable: pyarmor-1shot{Style.RESET_ALL}')
return generic_exe_path
# If no executable is found, use the default name
logger.warning(f'{Fore.YELLOW}No specific executable found for {system}-{machine}, using default: {base_exe_name}{Style.RESET_ALL}')
return os.path.join(helpers_dir, base_exe_name)
def find_runtime_files(directory: str) -> List[str]:
"""
Find all pyarmor_runtime files in the given directory and its subdirectories
"""
runtime_files = []
# Common runtime file patterns
patterns = [
'pyarmor_runtime*.pyd', # Windows
'pyarmor_runtime*.so', # Linux
'pyarmor_runtime*.dylib' # macOS
]
for pattern in patterns:
for file_path in glob.glob(os.path.join(directory, '**', pattern), recursive=True):
runtime_files.append(file_path)
return runtime_files
def parse_args():
@@ -115,6 +292,7 @@ def parse_args():
'directory',
help='the "root" directory of obfuscated scripts',
type=str,
nargs='?',
)
parser.add_argument(
'-r',
@@ -148,9 +326,66 @@ def parse_args():
help='show pycdc stack related warnings',
action='store_true',
)
parser.add_argument(
'--menu',
help='show interactive menu to select folder to unpack',
action='store_true',
)
parser.add_argument(
'--concurrent',
help='number of concurrent deobfuscation processes (default: 4)',
type=int,
default=4,
)
parser.add_argument(
'-e',
'--executable',
help='path to the pyarmor-1shot executable to use',
type=str,
)
parser.add_argument(
'--auto-detect-runtime',
help='automatically detect and use all pyarmor_runtime files in the directory',
action='store_true',
)
return parser.parse_args()
def display_menu():
to_unpack_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'to_unpack')
if not os.path.exists(to_unpack_dir):
os.makedirs(to_unpack_dir)
folders = [d for d in os.listdir(to_unpack_dir)
if os.path.isdir(os.path.join(to_unpack_dir, d))]
if not folders:
print(f"{Fore.YELLOW}No folders found in {to_unpack_dir}{Style.RESET_ALL}")
return None
print(f"\n{Fore.CYAN}=== Available Folders to Unpack ==={Style.RESET_ALL}")
for i, folder in enumerate(folders, 1):
print(f"{Fore.GREEN}[{i}]{Style.RESET_ALL} {folder}")
while True:
try:
choice = input(f"\n{Fore.YELLOW}Enter the number of the folder to unpack (or 'q' to quit): {Style.RESET_ALL}")
if choice.lower() == 'q':
return None
choice_idx = int(choice) - 1
if 0 <= choice_idx < len(folders):
selected_folder = folders[choice_idx]
full_path = os.path.join(to_unpack_dir, selected_folder)
print(f"{Fore.GREEN}Selected: {selected_folder}{Style.RESET_ALL}")
return full_path
else:
print(f"{Fore.RED}Invalid choice. Please enter a number between 1 and {len(folders)}{Style.RESET_ALL}")
except ValueError:
print(f"{Fore.RED}Please enter a valid number{Style.RESET_ALL}")
def main():
args = parse_args()
logging.basicConfig(
@@ -159,8 +394,8 @@ def main():
)
logger = logging.getLogger('shot')
print(r'''
____ ____
print(f'''
{Fore.CYAN} ____ ____
( __ ) ( __ )
| |~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~| |
| | ____ _ ___ _ _ | |
@@ -174,8 +409,18 @@ def main():
For technology exchange only. Use at your own risk.
GitHub: https://github.com/Lil-House/Pyarmor-Static-Unpack-1shot
{Style.RESET_ALL}
''')
# If menu option is selected or no directory is provided, show the menu
if args.menu or not args.directory:
selected_dir = display_menu()
if selected_dir:
args.directory = selected_dir
else:
print(f"{Fore.YELLOW}No directory selected. Exiting.{Style.RESET_ALL}")
return
if args.runtime:
specified_runtime = RuntimeInfo(args.runtime)
print(specified_runtime)
@@ -191,23 +436,40 @@ def main():
if os.path.isfile(args.directory):
if specified_runtime is None:
logger.error('Please specify `pyarmor_runtime` file by `-r` if input is a file')
logger.error(f'{Fore.RED}Please specify `pyarmor_runtime` file by `-r` if input is a file{Style.RESET_ALL}')
return
logger.info('Single file mode')
logger.info(f'{Fore.CYAN}Single file mode{Style.RESET_ALL}')
result = detect_process(args.directory, args.directory)
if result is None:
logger.error('No armored data found')
logger.error(f'{Fore.RED}No armored data found{Style.RESET_ALL}')
return
sequences.extend(result)
decrypt_process(runtimes, sequences, args)
return # single file mode ends here
# Auto-detect runtime files if requested
if args.auto_detect_runtime:
logger.info(f'{Fore.CYAN}Auto-detecting runtime files in {args.directory}...{Style.RESET_ALL}')
runtime_files = find_runtime_files(args.directory)
if runtime_files:
logger.info(f'{Fore.GREEN}Found {len(runtime_files)} runtime files{Style.RESET_ALL}')
for runtime_file in runtime_files:
try:
new_runtime = RuntimeInfo(runtime_file)
runtimes[new_runtime.serial_number] = new_runtime
logger.info(f'{Fore.GREEN}Found runtime: {new_runtime.serial_number} ({runtime_file}){Style.RESET_ALL}')
print(new_runtime)
except Exception as e:
logger.error(f'{Fore.RED}Failed to load runtime {runtime_file}: {e}{Style.RESET_ALL}')
else:
logger.warning(f'{Fore.YELLOW}No runtime files found in {args.directory}{Style.RESET_ALL}')
dir_path: str
dirs: List[str]
files: List[str]
for dir_path, dirs, files in os.walk(args.directory, followlinks=False):
if '.no1shot' in files:
logger.info(f'Skipping {dir_path} because of `.no1shot`')
logger.info(f'{Fore.YELLOW}Skipping {dir_path} because of `.no1shot`{Style.RESET_ALL}')
dirs.clear()
files.clear()
continue
@@ -228,7 +490,7 @@ def main():
and (not os.path.exists(file_path + '_extracted')
or len(os.listdir(file_path + '_extracted')) == 0):
logger.error(
f'A PYZ file containing armored data is detected, but the PYZ file has not been extracted by other tools. This error is not a problem with this tool. If the folder is extracted by Pyinstxtractor, please read the output information of Pyinstxtractor carefully. ({relative_path})')
f'{Fore.RED}A PYZ file containing armored data is detected, but the PYZ file has not been extracted by other tools. This error is not a problem with this tool. If the folder is extracted by Pyinstxtractor, please read the output information of Pyinstxtractor carefully. ({relative_path}){Style.RESET_ALL}')
continue
# is pyarmor_runtime?
@@ -239,7 +501,7 @@ def main():
new_runtime = RuntimeInfo(file_path)
runtimes[new_runtime.serial_number] = new_runtime
logger.info(
f'Found new runtime: {new_runtime.serial_number} ({file_path})')
f'{Fore.GREEN}Found new runtime: {new_runtime.serial_number} ({file_path}){Style.RESET_ALL}')
print(new_runtime)
continue
except:
@@ -250,10 +512,10 @@ def main():
sequences.extend(result)
if not runtimes:
logger.error('No runtime found')
logger.error(f'{Fore.RED}No runtime found{Style.RESET_ALL}')
return
if not sequences:
logger.error('No armored data found')
logger.error(f'{Fore.RED}No armored data found{Style.RESET_ALL}')
return
decrypt_process(runtimes, sequences, args)