Update 0.1.1

## [0.1.1]

### Added
- Interactive menu system

- BCC decryption when standart decryption fails
- Colorful console output for better readability
- Multiprocessing with configurable thread count
- Improved logging with color-coded messages

### Fixed
- Improved error detection for BCC mode switching
- Fixed indentation issues in BCC fallback method
- Enhanced timeout handling for long-running deobfuscation tasks
This commit is contained in:
Anon0x23
2025-04-09 23:42:59 +02:00
committed by GitHub
parent 4363ff705c
commit b404b47d74

View File

@@ -1,16 +1,21 @@
import argparse import argparse
from Crypto.Cipher import AES
import logging import logging
import multiprocessing
import os import os
import subprocess import subprocess
from multiprocessing import Pool import sys
import time
import asyncio
import traceback
from typing import Dict, List, Tuple from typing import Dict, List, Tuple
from colorama import init, Fore, Style
from Crypto.Cipher import AES
from detect import detect_process from detect import detect_process
from runtime import RuntimeInfo from runtime import RuntimeInfo
SUBPROCESS_TIMEOUT = 30
# Initialize colorama
init(autoreset=True)
def general_aes_ctr_decrypt(data: bytes, key: bytes, nonce: bytes) -> bytes: def general_aes_ctr_decrypt(data: bytes, key: bytes, nonce: bytes) -> bytes:
@@ -18,57 +23,68 @@ def general_aes_ctr_decrypt(data: bytes, key: bytes, nonce: bytes) -> bytes:
return cipher.decrypt(data) return cipher.decrypt(data)
def decrypt_single_file(args_tuple): def bcc_fallback_method(seq_file_path: str, output_path: str = None):
path, data, runtime, args, output_dir = args_tuple # Fallback method for BCC mode deobfuscation
logger = logging.getLogger('shot')
logger.info(f'{Fore.YELLOW}Attempting BCC fallback method for: {seq_file_path}{Style.RESET_ALL}')
if output_path is None:
output_path = seq_file_path + '.back.1shot.seq'
try:
with open(seq_file_path, 'rb') as f:
origin = f.read()
one_shot_header = origin[:32] # Header format
aes_key = one_shot_header[1:17]
bcc_part_length = int.from_bytes(origin[0x58:0x5C], 'little') # If it is 0, it is not BCC part but bytecode part
bytecode_part = origin[32+bcc_part_length:]
aes_nonce = bytecode_part[36:40] + bytecode_part[44:52] # The same position as non-BCC file
with open(output_path, 'wb') as f:
f.write(one_shot_header)
f.write(bytecode_part[:64])
f.write(AES.new(aes_key, AES.MODE_CTR, nonce=aes_nonce, initial_value=2).decrypt(bytecode_part[64:]))
logger.info(f'{Fore.GREEN}Successfully created BCC fallback file: {output_path}{Style.RESET_ALL}')
return output_path
except Exception as e:
error_details = traceback.format_exc()
logger.error(f'{Fore.RED}BCC fallback method failed: {e}{Style.RESET_ALL}')
logger.error(f'{Fore.RED}Error details: {error_details}{Style.RESET_ALL}')
return None
async def run_subprocess_async(cmd, cwd=None):
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=cwd
)
stdout, stderr = await process.communicate()
return process.returncode, stdout, stderr
async def decrypt_file_async(exe_path, seq_file_path, path, args):
logger = logging.getLogger('shot') logger = logging.getLogger('shot')
try: try:
serial_number = data[2:8].decode('utf-8') # Run without timeout
logger.info(f'Decrypting: {serial_number} ({path})') returncode, stdout, stderr = await run_subprocess_async([exe_path, seq_file_path])
dest_path = os.path.join(output_dir, path) if output_dir else path stdout_lines = stdout.decode('latin-1').splitlines()
dest_dir = os.path.dirname(dest_path) stderr_lines = stderr.decode('latin-1').splitlines()
if not os.path.exists(dest_dir):
os.makedirs(dest_dir) for line in stdout_lines:
if args.export_raw_data:
with open(dest_path + '.1shot.raw', 'wb') as f:
f.write(data)
cipher_text_offset = int.from_bytes(data[28:32], 'little')
cipher_text_length = int.from_bytes(data[32:36], 'little')
nonce = data[36:40] + data[44:52]
with open(dest_path + '.1shot.seq', 'wb') as f:
f.write(b'\xa1' + runtime.runtime_aes_key)
f.write(b'\xa2' + runtime.mix_str_aes_nonce())
f.write(b'\xf0\xff')
f.write(data[:cipher_text_offset])
f.write(general_aes_ctr_decrypt(data[cipher_text_offset : cipher_text_offset + cipher_text_length], runtime.runtime_aes_key, nonce))
f.write(data[cipher_text_offset + cipher_text_length :])
exe_name = 'pyarmor-1shot.exe' if os.name == 'nt' else 'pyarmor-1shot'
exe_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), exe_name)
sp = subprocess.run(
[
exe_path,
dest_path + '.1shot.seq',
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=SUBPROCESS_TIMEOUT,
)
stdout = sp.stdout.decode('latin-1').splitlines()
stderr = sp.stderr.decode('latin-1').splitlines()
for line in stdout:
logger.warning(f'PYCDC: {line} ({path})') logger.warning(f'PYCDC: {line} ({path})')
for line in stderr:
if line.startswith( for line in stderr_lines:
( if line.startswith((
'Warning: Stack history is empty', 'Warning: Stack history is empty',
'Warning: Stack history is not empty', 'Warning: Stack history is not empty',
'Warning: block stack is not empty', 'Warning: block stack is not empty',
) )):
):
if args.show_warn_stack or args.show_all: if args.show_warn_stack or args.show_all:
logger.warning(f'PYCDC: {line} ({path})') logger.warning(f'PYCDC: {line} ({path})')
elif line.startswith('Unsupported opcode:'): elif line.startswith('Unsupported opcode:'):
@@ -79,33 +95,114 @@ def decrypt_single_file(args_tuple):
logger.error(f'PYCDC: {line} ({path})') logger.error(f'PYCDC: {line} ({path})')
else: else:
logger.error(f'PYCDC: {line} ({path})') logger.error(f'PYCDC: {line} ({path})')
if sp.returncode != 0:
logger.warning(f'PYCDC returned {sp.returncode} ({path})') return returncode, stdout_lines, stderr_lines
return False
return True
except Exception as e: except Exception as e:
logger.error(f'Decrypt failed: {e} ({path})') error_details = traceback.format_exc()
return False logger.error(f'{Fore.RED}Error during async deobfuscation: {e}{Style.RESET_ALL}')
logger.error(f'{Fore.RED}Error details: {error_details}{Style.RESET_ALL}')
return -1, [], []
async def decrypt_process_async(runtimes: Dict[str, RuntimeInfo], sequences: List[Tuple[str, bytes]], args):
logger = logging.getLogger('shot')
output_dir: str = args.output_dir or args.directory
# Create a semaphore to limit concurrent processes
semaphore = asyncio.Semaphore(args.concurrent) # Use the concurrent argument
async def process_file(path, data):
async with semaphore:
try:
serial_number = data[2:8].decode('utf-8')
runtime = runtimes[serial_number]
logger.info(f'{Fore.CYAN}Decrypting: {serial_number} ({path}){Style.RESET_ALL}')
dest_path = os.path.join(output_dir, path) if output_dir else path
dest_dir = os.path.dirname(dest_path)
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
if args.export_raw_data:
with open(dest_path + '.1shot.raw', 'wb') as f:
f.write(data)
cipher_text_offset = int.from_bytes(data[28:32], 'little')
cipher_text_length = int.from_bytes(data[32:36], 'little')
nonce = data[36:40] + data[44:52]
seq_file_path = dest_path + '.1shot.seq'
with open(seq_file_path, 'wb') as f:
f.write(b'\xa1' + runtime.runtime_aes_key)
f.write(b'\xa2' + runtime.mix_str_aes_nonce())
f.write(b'\xf0\xff')
f.write(data[:cipher_text_offset])
f.write(general_aes_ctr_decrypt(
data[cipher_text_offset:cipher_text_offset+cipher_text_length], runtime.runtime_aes_key, nonce))
f.write(data[cipher_text_offset+cipher_text_length:])
exe_name = 'pyarmor-1shot.exe' if os.name == 'nt' else 'pyarmor-1shot'
exe_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)), exe_name)
# Run without timeout
returncode, stdout_lines, stderr_lines = await decrypt_file_async(exe_path, seq_file_path, path, args)
# Check for specific errors that indicate BCC mode
should_try_bcc = False
error_message = ""
if returncode != 0:
error_message = f"PYCDC returned {returncode}"
# Check for specific error patterns that suggest BCC mode
for line in stderr_lines:
if ("Unsupported opcode" in line or
"Something TERRIBLE happened" in line or
"Unknown opcode 0" in line or
"Got unsupported type" in line):
should_try_bcc = True
error_message += f" - {line}"
break
if should_try_bcc:
logger.warning(f'{Fore.YELLOW}{error_message} ({path}) - Attempting BCC fallback{Style.RESET_ALL}')
# Try BCC fallback method
bcc_file_path = bcc_fallback_method(seq_file_path)
if bcc_file_path:
logger.info(f'{Fore.GREEN}Running deobfuscator on BCC fallback file{Style.RESET_ALL}')
try:
# Run without timeout
returncode, stdout_lines, stderr_lines = await decrypt_file_async(exe_path, bcc_file_path, path, args)
if returncode == 0:
logger.info(f'{Fore.GREEN}Successfully deobfuscated using BCC fallback method{Style.RESET_ALL}')
print(f"{Fore.GREEN} BCC Decrypted: {path}{Style.RESET_ALL}")
else:
logger.error(f'{Fore.RED}BCC fallback deobfuscation failed with return code {returncode}{Style.RESET_ALL}')
for line in stderr_lines:
logger.error(f'{Fore.RED}BCC Error: {line}{Style.RESET_ALL}')
except Exception as e:
error_details = traceback.format_exc()
logger.error(f'{Fore.RED}BCC fallback deobfuscation failed with error: {e}{Style.RESET_ALL}')
logger.error(f'{Fore.RED}Error details: {error_details}{Style.RESET_ALL}')
elif returncode == 0:
# Successfully decrypted
logger.info(f'{Fore.GREEN}Successfully decrypted: {path}{Style.RESET_ALL}')
print(f"{Fore.GREEN} Decrypted: {path}{Style.RESET_ALL}")
else:
logger.warning(f'{Fore.YELLOW}{error_message} ({path}){Style.RESET_ALL}')
except Exception as e:
error_details = traceback.format_exc()
logger.error(f'{Fore.RED}Decrypt failed: {e} ({path}){Style.RESET_ALL}')
logger.error(f'{Fore.RED}Error details: {error_details}{Style.RESET_ALL}')
# Create tasks for all files
tasks = [process_file(path, data) for path, data in sequences]
# Run all tasks concurrently
await asyncio.gather(*tasks)
def decrypt_process(runtimes: Dict[str, RuntimeInfo], sequences: List[Tuple[str, bytes]], args): def decrypt_process(runtimes: Dict[str, RuntimeInfo], sequences: List[Tuple[str, bytes]], args):
logger = logging.getLogger('shot') asyncio.run(decrypt_process_async(runtimes, sequences, args))
output_dir: str = args.output_dir or args.directory
process_args = []
for path, data in sequences:
try:
serial_number = data[2:8].decode('utf-8')
runtime = runtimes[serial_number]
process_args.append((path, data, runtime, args, output_dir))
except Exception as e:
logger.error(f'Failed to prepare file for decryption: {e} ({path})')
num_processes = min(args.processes if hasattr(args, 'processes') else multiprocessing.cpu_count(), len(process_args))
logger.info(f'Starting decryption with {num_processes} processes for {len(process_args)} files')
with Pool(processes=num_processes) as pool:
results = pool.map(decrypt_single_file, process_args)
success_count = sum(1 for result in results if result)
logger.info(f'Decryption completed: {success_count} succeeded, {len(process_args) - success_count} failed')
def parse_args(): def parse_args():
@@ -115,6 +212,7 @@ def parse_args():
'directory', 'directory',
help='the "root" directory of obfuscated scripts', help='the "root" directory of obfuscated scripts',
type=str, type=str,
nargs='?',
) )
parser.add_argument( parser.add_argument(
'-r', '-r',
@@ -148,9 +246,55 @@ def parse_args():
help='show pycdc stack related warnings', help='show pycdc stack related warnings',
action='store_true', action='store_true',
) )
parser.add_argument(
'--menu',
help='show interactive menu to select folder to unpack',
action='store_true',
)
parser.add_argument(
'--concurrent',
help='number of concurrent deobfuscation processes (default: 4)',
type=int,
default=4,
)
return parser.parse_args() return parser.parse_args()
def display_menu():
to_unpack_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'to_unpack')
if not os.path.exists(to_unpack_dir):
os.makedirs(to_unpack_dir)
folders = [d for d in os.listdir(to_unpack_dir)
if os.path.isdir(os.path.join(to_unpack_dir, d))]
if not folders:
print(f"{Fore.YELLOW}No folders found in {to_unpack_dir}{Style.RESET_ALL}")
return None
print(f"\n{Fore.CYAN}=== Available Folders to Unpack ==={Style.RESET_ALL}")
for i, folder in enumerate(folders, 1):
print(f"{Fore.GREEN}[{i}]{Style.RESET_ALL} {folder}")
while True:
try:
choice = input(f"\n{Fore.YELLOW}Enter the number of the folder to unpack (or 'q' to quit): {Style.RESET_ALL}")
if choice.lower() == 'q':
return None
choice_idx = int(choice) - 1
if 0 <= choice_idx < len(folders):
selected_folder = folders[choice_idx]
full_path = os.path.join(to_unpack_dir, selected_folder)
print(f"{Fore.GREEN}Selected: {selected_folder}{Style.RESET_ALL}")
return full_path
else:
print(f"{Fore.RED}Invalid choice. Please enter a number between 1 and {len(folders)}{Style.RESET_ALL}")
except ValueError:
print(f"{Fore.RED}Please enter a valid number{Style.RESET_ALL}")
def main(): def main():
args = parse_args() args = parse_args()
logging.basicConfig( logging.basicConfig(
@@ -159,8 +303,8 @@ def main():
) )
logger = logging.getLogger('shot') logger = logging.getLogger('shot')
print(r''' print(f'''
____ ____ {Fore.CYAN} ____ ____
( __ ) ( __ ) ( __ ) ( __ )
| |~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~| | | |~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~| |
| | ____ _ ___ _ _ | | | | ____ _ ___ _ _ | |
@@ -174,8 +318,18 @@ def main():
For technology exchange only. Use at your own risk. For technology exchange only. Use at your own risk.
GitHub: https://github.com/Lil-House/Pyarmor-Static-Unpack-1shot GitHub: https://github.com/Lil-House/Pyarmor-Static-Unpack-1shot
{Style.RESET_ALL}
''') ''')
# If menu option is selected or no directory is provided, show the menu
if args.menu or not args.directory:
selected_dir = display_menu()
if selected_dir:
args.directory = selected_dir
else:
print(f"{Fore.YELLOW}No directory selected. Exiting.{Style.RESET_ALL}")
return
if args.runtime: if args.runtime:
specified_runtime = RuntimeInfo(args.runtime) specified_runtime = RuntimeInfo(args.runtime)
print(specified_runtime) print(specified_runtime)
@@ -191,12 +345,12 @@ def main():
if os.path.isfile(args.directory): if os.path.isfile(args.directory):
if specified_runtime is None: if specified_runtime is None:
logger.error('Please specify `pyarmor_runtime` file by `-r` if input is a file') logger.error(f'{Fore.RED}Please specify `pyarmor_runtime` file by `-r` if input is a file{Style.RESET_ALL}')
return return
logger.info('Single file mode') logger.info(f'{Fore.CYAN}Single file mode{Style.RESET_ALL}')
result = detect_process(args.directory, args.directory) result = detect_process(args.directory, args.directory)
if result is None: if result is None:
logger.error('No armored data found') logger.error(f'{Fore.RED}No armored data found{Style.RESET_ALL}')
return return
sequences.extend(result) sequences.extend(result)
decrypt_process(runtimes, sequences, args) decrypt_process(runtimes, sequences, args)
@@ -207,7 +361,7 @@ def main():
files: List[str] files: List[str]
for dir_path, dirs, files in os.walk(args.directory, followlinks=False): for dir_path, dirs, files in os.walk(args.directory, followlinks=False):
if '.no1shot' in files: if '.no1shot' in files:
logger.info(f'Skipping {dir_path} because of `.no1shot`') logger.info(f'{Fore.YELLOW}Skipping {dir_path} because of `.no1shot`{Style.RESET_ALL}')
dirs.clear() dirs.clear()
files.clear() files.clear()
continue continue
@@ -228,7 +382,7 @@ def main():
and (not os.path.exists(file_path + '_extracted') and (not os.path.exists(file_path + '_extracted')
or len(os.listdir(file_path + '_extracted')) == 0): or len(os.listdir(file_path + '_extracted')) == 0):
logger.error( logger.error(
f'A PYZ file containing armored data is detected, but the PYZ file has not been extracted by other tools. This error is not a problem with this tool. If the folder is extracted by Pyinstxtractor, please read the output information of Pyinstxtractor carefully. ({relative_path})') f'{Fore.RED}A PYZ file containing armored data is detected, but the PYZ file has not been extracted by other tools. This error is not a problem with this tool. If the folder is extracted by Pyinstxtractor, please read the output information of Pyinstxtractor carefully. ({relative_path}){Style.RESET_ALL}')
continue continue
# is pyarmor_runtime? # is pyarmor_runtime?
@@ -239,7 +393,7 @@ def main():
new_runtime = RuntimeInfo(file_path) new_runtime = RuntimeInfo(file_path)
runtimes[new_runtime.serial_number] = new_runtime runtimes[new_runtime.serial_number] = new_runtime
logger.info( logger.info(
f'Found new runtime: {new_runtime.serial_number} ({file_path})') f'{Fore.GREEN}Found new runtime: {new_runtime.serial_number} ({file_path}){Style.RESET_ALL}')
print(new_runtime) print(new_runtime)
continue continue
except: except:
@@ -250,10 +404,10 @@ def main():
sequences.extend(result) sequences.extend(result)
if not runtimes: if not runtimes:
logger.error('No runtime found') logger.error(f'{Fore.RED}No runtime found{Style.RESET_ALL}')
return return
if not sequences: if not sequences:
logger.error('No armored data found') logger.error(f'{Fore.RED}No armored data found{Style.RESET_ALL}')
return return
decrypt_process(runtimes, sequences, args) decrypt_process(runtimes, sequences, args)