6 Commits

Author SHA1 Message Date
71f9fea419 chore 2025-04-10 18:10:13 +08:00
LilRan
7ff305a0f8 Merge pull request GH-9 from Anon0x23/Anon0x23-patch-1
refactor, asyncio, colorful console output, BCC mode fallback, interactive menu
2025-04-10 15:59:43 +08:00
Anon0x23
665ec7efd6 Update shot.py
Add multi-platform support for pyarmor_runtime executables

1. Platform Detection and Executable Selection:
   - Automatically detects OS (Windows, Linux, macOS) and architecture
   - Intelligently selects the appropriate executable based on platform
   - Supports architecture-specific executables (e.g., pyarmor-1shot-windows-x86_64.exe)
   - Falls back to platform-specific and generic executables when needed

2. Runtime File Auto-Detection:
   - Added ability to automatically find all pyarmor_runtime files in a directory
   - Supports multiple runtime file formats (.pyd, .so, .dylib)
   - Recursively searches through all subdirectories

3. New Command-Line Options:
   - Added --executable option to specify a custom executable path
   - Added --auto-detect-runtime option to automatically detect runtime files
2025-04-10 00:02:21 +02:00
Anon0x23
b404b47d74 Update 0.1.1
## [0.1.1]

### Added
- Interactive menu system

- BCC decryption when standart decryption fails
- Colorful console output for better readability
- Multiprocessing with configurable thread count
- Improved logging with color-coded messages

### Fixed
- Improved error detection for BCC mode switching
- Fixed indentation issues in BCC fallback method
- Enhanced timeout handling for long-running deobfuscation tasks
2025-04-09 23:42:59 +02:00
本光
4363ff705c feat: multi process (GH-6) 2025-04-06 00:40:47 +08:00
352d14ec82 fix: type hints, match 2025-03-26 15:10:12 +08:00
4 changed files with 340 additions and 101 deletions

View File

@@ -1,7 +1,7 @@
name: Build
on:
push:
branches: [ci-build]
branches: [main]
pull_request:
jobs:

View File

@@ -16,7 +16,7 @@ You don't need to execute the encrypted script. We decrypt them using the same a
### Universal
Currently we are trying to support Pyarmor 8.0 to 9.1.2 (latest), Python 3.7 - 3.13, on all operating systems, with obfuscating options as many as possible. (However, we only have limited tests.)
Currently we are trying to support Pyarmor 8.0 to 9.1.3 (latest), Python 3.7 - 3.13, on all operating systems, with obfuscating options as many as possible. (However, we only have limited tests.)
You can run this tool in any environment, no need to be the same with obfuscated scripts or runtime.
@@ -67,5 +67,4 @@ Feel free to open an issue if you have any questions, suggestions, or problems.
- [ ] Multi-platform pyarmor_runtime executable
- [ ] Support more obfuscating options
- [ ] Use asyncio for concurrency
- [ ] Pyarmor 7 and before (Later or never.)
- [ ] Regenerate pyc for other backends

View File

@@ -1,13 +1,13 @@
import logging
import os
from typing import List, Tuple
from typing import List, Tuple, Union
def ascii_ratio(data: bytes) -> float:
return sum(32 <= c < 127 for c in data) / len(data)
def source_as_file(file_path: str) -> List[bytes] | None:
def source_as_file(file_path: str) -> Union[List[bytes], None]:
try:
with open(file_path, 'r') as f:
co = compile(f.read(), '<str>', 'exec')
@@ -18,7 +18,7 @@ def source_as_file(file_path: str) -> List[bytes] | None:
return None
def source_as_lines(file_path: str) -> List[bytes] | None:
def source_as_lines(file_path: str) -> Union[List[bytes], None]:
data = []
try:
with open(file_path, 'r') as f:
@@ -69,7 +69,7 @@ def find_data_from_bytes(data: bytes, max_count=-1) -> List[bytes]:
return result
def nuitka_package(head: bytes, relative_path: str) -> None | List[Tuple[str, bytes]]:
def nuitka_package(head: bytes, relative_path: str) -> Union[List[Tuple[str, bytes]], None]:
first_occurrence = head.find(b'PY00')
if first_occurrence == -1:
return None
@@ -99,7 +99,7 @@ def nuitka_package(head: bytes, relative_path: str) -> None | List[Tuple[str, by
return None
def detect_process(file_path: str, relative_path: str) -> None | List[Tuple[str, bytes]]:
def detect_process(file_path: str, relative_path: str) -> Union[List[Tuple[str, bytes]], None]:
'''
Returns a list of (relative_path, bytes_raw) tuples, or None.
Do not raise exceptions.
@@ -127,15 +127,15 @@ def detect_process(file_path: str, relative_path: str) -> None | List[Tuple[str,
if result is None:
return None
match len(result):
case 0:
return None
case 1:
logger.info(f'Found data in source: {relative_path}')
return [(relative_path, result[0])]
case _:
logger.info(f'Found data in source: {relative_path}')
return [(f'{relative_path}__{i}', result[i]) for i in range(len(result))]
result_len = len(result)
if result_len == 0:
return None
elif result_len == 1:
logger.info(f'Found data in source: {relative_path}')
return [(relative_path, result[0])]
else:
logger.info(f'Found data in source: {relative_path}')
return [(f'{relative_path}__{i}', result[i]) for i in range(len(result))]
# binary file
# ignore data after 16MB, before we have a reason to read more
@@ -146,12 +146,12 @@ def detect_process(file_path: str, relative_path: str) -> None | List[Tuple[str,
return nuitka_package(head, relative_path)
result = find_data_from_bytes(head)
match len(result):
case 0:
return None
case 1:
logger.info(f'Found data in binary: {relative_path}')
return [(relative_path, result[0])]
case _:
logger.info(f'Found data in binary: {relative_path}')
return [(f'{relative_path}__{i}', result[i]) for i in range(len(result))]
result_len = len(result)
if result_len == 0:
return None
elif result_len == 1:
logger.info(f'Found data in binary: {relative_path}')
return [(relative_path, result[0])]
else:
logger.info(f'Found data in binary: {relative_path}')
return [(f'{relative_path}__{i}', result[i]) for i in range(len(result))]

View File

@@ -2,14 +2,24 @@ import argparse
from Crypto.Cipher import AES
import logging
import os
import subprocess
import asyncio
import traceback
import platform
from typing import Dict, List, Tuple
try:
from colorama import init, Fore, Style
except ImportError:
def init(**kwargs): pass
class Fore: CYAN = RED = YELLOW = GREEN = ''
class Style: RESET_ALL = ''
from detect import detect_process
from runtime import RuntimeInfo
SUBPROCESS_TIMEOUT = 30
# Initialize colorama
init(autoreset=True)
def general_aes_ctr_decrypt(data: bytes, key: bytes, nonce: bytes) -> bytes:
@@ -17,75 +27,243 @@ def general_aes_ctr_decrypt(data: bytes, key: bytes, nonce: bytes) -> bytes:
return cipher.decrypt(data)
def decrypt_process(runtimes: Dict[str, RuntimeInfo], sequences: List[Tuple[str, bytes]], args):
def bcc_fallback_method(seq_file_path: str, output_path: str = None):
# Fallback method for BCC mode deobfuscation
logger = logging.getLogger('shot')
logger.info(f'{Fore.YELLOW}Attempting BCC fallback method for: {seq_file_path}{Style.RESET_ALL}')
if output_path is None:
output_path = seq_file_path + '.back.1shot.seq'
try:
with open(seq_file_path, 'rb') as f:
origin = f.read()
one_shot_header = origin[:32] # Header format
aes_key = one_shot_header[1:17]
bcc_part_length = int.from_bytes(origin[0x58:0x5C], 'little') # If it is 0, it is not BCC part but bytecode part
bytecode_part = origin[32+bcc_part_length:]
aes_nonce = bytecode_part[36:40] + bytecode_part[44:52] # The same position as non-BCC file
with open(output_path, 'wb') as f:
f.write(one_shot_header)
f.write(bytecode_part[:64])
f.write(general_aes_ctr_decrypt(bytecode_part[64:], aes_key, aes_nonce))
logger.info(f'{Fore.GREEN}Successfully created BCC fallback file: {output_path}{Style.RESET_ALL}')
return output_path
except Exception as e:
error_details = traceback.format_exc()
logger.error(f'{Fore.RED}BCC fallback method failed: {e}{Style.RESET_ALL}')
logger.error(f'{Fore.RED}Error details: {error_details}{Style.RESET_ALL}')
return None
async def run_subprocess_async(cmd, cwd=None):
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=cwd
)
stdout, stderr = await process.communicate()
return process.returncode, stdout, stderr
async def decrypt_file_async(exe_path, seq_file_path, path, args):
logger = logging.getLogger('shot')
try:
# Run without timeout
returncode, stdout, stderr = await run_subprocess_async([exe_path, seq_file_path])
stdout_lines = stdout.decode('latin-1').splitlines()
stderr_lines = stderr.decode('latin-1').splitlines()
for line in stdout_lines:
logger.warning(f'PYCDC: {line} ({path})')
for line in stderr_lines:
if line.startswith((
'Warning: Stack history is empty',
'Warning: Stack history is not empty',
'Warning: block stack is not empty',
)):
if args.show_warn_stack or args.show_all:
logger.warning(f'PYCDC: {line} ({path})')
elif line.startswith('Unsupported opcode:'):
if args.show_err_opcode or args.show_all:
logger.error(f'PYCDC: {line} ({path})')
elif line.startswith((
'Something TERRIBLE happened',
'Unsupported argument',
'Unsupported Node type',
'Unsupported node type',
)): # annoying wont-fix errors
if args.show_all:
logger.error(f'PYCDC: {line} ({path})')
else:
logger.error(f'PYCDC: {line} ({path})')
return returncode, stdout_lines, stderr_lines
except Exception as e:
error_details = traceback.format_exc()
logger.error(f'{Fore.RED}Error during async deobfuscation: {e}{Style.RESET_ALL}')
logger.error(f'{Fore.RED}Error details: {error_details}{Style.RESET_ALL}')
return -1, [], []
async def decrypt_process_async(runtimes: Dict[str, RuntimeInfo], sequences: List[Tuple[str, bytes]], args):
logger = logging.getLogger('shot')
output_dir: str = args.output_dir or args.directory
for path, data in sequences:
try:
serial_number = data[2:8].decode('utf-8')
runtime = runtimes[serial_number]
logger.info(f'Decrypting: {serial_number} ({path})')
# Create a semaphore to limit concurrent processes
semaphore = asyncio.Semaphore(args.concurrent) # Use the concurrent argument
# Get the appropriate executable for the current platform
exe_path = get_platform_executable(args)
dest_path = os.path.join(output_dir, path) if output_dir else path
dest_dir = os.path.dirname(dest_path)
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
async def process_file(path, data):
async with semaphore:
try:
serial_number = data[2:8].decode('utf-8')
runtime = runtimes[serial_number]
logger.info(f'{Fore.CYAN}Decrypting: {serial_number} ({path}){Style.RESET_ALL}')
if args.export_raw_data:
with open(dest_path + '.1shot.raw', 'wb') as f:
f.write(data)
dest_path = os.path.join(output_dir, path) if output_dir else path
dest_dir = os.path.dirname(dest_path)
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
cipher_text_offset = int.from_bytes(data[28:32], 'little')
cipher_text_length = int.from_bytes(data[32:36], 'little')
nonce = data[36:40] + data[44:52]
with open(dest_path + '.1shot.seq', 'wb') as f:
f.write(b'\xa1' + runtime.runtime_aes_key)
f.write(b'\xa2' + runtime.mix_str_aes_nonce())
f.write(b'\xf0\xff')
f.write(data[:cipher_text_offset])
f.write(general_aes_ctr_decrypt(
data[cipher_text_offset:cipher_text_offset+cipher_text_length], runtime.runtime_aes_key, nonce))
f.write(data[cipher_text_offset+cipher_text_length:])
if args.export_raw_data:
with open(dest_path + '.1shot.raw', 'wb') as f:
f.write(data)
exe_name = 'pyarmor-1shot.exe' if os.name == 'nt' else 'pyarmor-1shot'
exe_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)), exe_name)
# TODO: multi process
sp = subprocess.run(
[
exe_path,
dest_path + '.1shot.seq',
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=SUBPROCESS_TIMEOUT,
)
stdout = sp.stdout.decode('latin-1').splitlines()
stderr = sp.stderr.decode('latin-1').splitlines()
for line in stdout:
logger.warning(f'PYCDC: {line} ({path})')
for line in stderr:
if line.startswith((
'Warning: Stack history is empty',
'Warning: Stack history is not empty',
'Warning: block stack is not empty',
)):
if args.show_warn_stack or args.show_all:
logger.warning(f'PYCDC: {line} ({path})')
elif line.startswith('Unsupported opcode:'):
if args.show_err_opcode or args.show_all:
logger.error(f'PYCDC: {line} ({path})')
elif line.startswith('Something TERRIBLE happened'):
if args.show_all:
logger.error(f'PYCDC: {line} ({path})')
cipher_text_offset = int.from_bytes(data[28:32], 'little')
cipher_text_length = int.from_bytes(data[32:36], 'little')
nonce = data[36:40] + data[44:52]
seq_file_path = dest_path + '.1shot.seq'
with open(seq_file_path, 'wb') as f:
f.write(b'\xa1' + runtime.runtime_aes_key)
f.write(b'\xa2' + runtime.mix_str_aes_nonce())
f.write(b'\xf0\xff')
f.write(data[:cipher_text_offset])
f.write(general_aes_ctr_decrypt(
data[cipher_text_offset:cipher_text_offset+cipher_text_length], runtime.runtime_aes_key, nonce))
f.write(data[cipher_text_offset+cipher_text_length:])
# Run without timeout
returncode, stdout_lines, stderr_lines = await decrypt_file_async(exe_path, seq_file_path, path, args)
# Check for specific errors that indicate BCC mode
should_try_bcc = False
error_message = ""
# FIXME: Probably false positive
if returncode != 0:
error_message = f"PYCDC returned {returncode}"
# Check for specific error patterns that suggest BCC mode
for line in stderr_lines:
if ("Unsupported opcode" in line or
"Something TERRIBLE happened" in line or
"Unknown opcode 0" in line or
"Got unsupported type" in line):
should_try_bcc = True
error_message += f" - {line}"
break
if should_try_bcc:
logger.warning(f'{Fore.YELLOW}{error_message} ({path}) - Attempting BCC fallback{Style.RESET_ALL}')
# Try BCC fallback method
bcc_file_path = bcc_fallback_method(seq_file_path)
if bcc_file_path:
logger.info(f'{Fore.GREEN}Running deobfuscator on BCC fallback file{Style.RESET_ALL}')
try:
# Run without timeout
returncode, stdout_lines, stderr_lines = await decrypt_file_async(exe_path, bcc_file_path, path, args)
if returncode == 0:
logger.info(f'{Fore.GREEN}Successfully deobfuscated using BCC fallback method{Style.RESET_ALL}')
print(f"{Fore.GREEN} BCC Decrypted: {path}{Style.RESET_ALL}")
else:
logger.error(f'{Fore.RED}BCC fallback deobfuscation failed with return code {returncode}{Style.RESET_ALL}')
for line in stderr_lines:
logger.error(f'{Fore.RED}BCC Error: {line}{Style.RESET_ALL}')
except Exception as e:
error_details = traceback.format_exc()
logger.error(f'{Fore.RED}BCC fallback deobfuscation failed with error: {e}{Style.RESET_ALL}')
logger.error(f'{Fore.RED}Error details: {error_details}{Style.RESET_ALL}')
elif returncode == 0:
# Successfully decrypted
logger.info(f'{Fore.GREEN}Successfully decrypted: {path}{Style.RESET_ALL}')
print(f"{Fore.GREEN} Decrypted: {path}{Style.RESET_ALL}")
else:
logger.error(f'PYCDC: {line} ({path})')
if sp.returncode != 0:
logger.warning(f'PYCDC returned {sp.returncode} ({path})')
continue
except Exception as e:
logger.error(f'Decrypt failed: {e} ({path})')
continue
logger.warning(f'{Fore.YELLOW}{error_message} ({path}){Style.RESET_ALL}')
except Exception as e:
error_details = traceback.format_exc()
logger.error(f'{Fore.RED}Decrypt failed: {e} ({path}){Style.RESET_ALL}')
logger.error(f'{Fore.RED}Error details: {error_details}{Style.RESET_ALL}')
# Create tasks for all files
tasks = [process_file(path, data) for path, data in sequences]
# Run all tasks concurrently
await asyncio.gather(*tasks)
def decrypt_process(runtimes: Dict[str, RuntimeInfo], sequences: List[Tuple[str, bytes]], args):
asyncio.run(decrypt_process_async(runtimes, sequences, args))
def get_platform_executable(args) -> str:
"""
Get the appropriate executable for the current platform
"""
logger = logging.getLogger('shot')
# If a specific executable is provided, use it
if args.executable:
if os.path.exists(args.executable):
logger.info(f'{Fore.GREEN}Using specified executable: {args.executable}{Style.RESET_ALL}')
return args.executable
else:
logger.warning(f'{Fore.YELLOW}Specified executable not found: {args.executable}{Style.RESET_ALL}')
helpers_dir = os.path.dirname(os.path.abspath(__file__))
system = platform.system().lower()
machine = platform.machine().lower()
# Check for architecture-specific executables
arch_specific_exe = f'pyarmor-1shot-{system}-{machine}'
if system == 'windows':
arch_specific_exe += '.exe'
arch_exe_path = os.path.join(helpers_dir, arch_specific_exe)
if os.path.exists(arch_exe_path):
logger.info(f'{Fore.GREEN}Using architecture-specific executable: {arch_specific_exe}{Style.RESET_ALL}')
return arch_exe_path
platform_map = {
'windows': 'pyarmor-1shot.exe',
'linux': 'pyarmor-1shot',
'darwin': 'pyarmor-1shot',
}
base_exe_name = platform_map.get(system, 'pyarmor-1shot')
# Then check for platform-specific executable
platform_exe_path = os.path.join(helpers_dir, base_exe_name)
if os.path.exists(platform_exe_path):
logger.info(f'{Fore.GREEN}Using platform-specific executable: {base_exe_name}{Style.RESET_ALL}')
return platform_exe_path
# Finally, check for generic executable
generic_exe_path = os.path.join(helpers_dir, 'pyarmor-1shot')
if os.path.exists(generic_exe_path):
logger.info(f'{Fore.GREEN}Using generic executable: pyarmor-1shot{Style.RESET_ALL}')
return generic_exe_path
logger.critical(f'{Fore.RED}Executable {base_exe_name} not found, please build it first or download on https://github.com/Lil-House/Pyarmor-Static-Unpack-1shot/releases {Style.RESET_ALL}')
exit(1)
def parse_args():
@@ -95,6 +273,7 @@ def parse_args():
'directory',
help='the "root" directory of obfuscated scripts',
type=str,
nargs='?',
)
parser.add_argument(
'-r',
@@ -128,9 +307,61 @@ def parse_args():
help='show pycdc stack related warnings',
action='store_true',
)
parser.add_argument(
'--menu',
help='show interactive menu to select folder to unpack',
action='store_true',
)
parser.add_argument(
'--concurrent',
help='number of concurrent deobfuscation processes (default: 4)',
type=int,
default=4,
)
parser.add_argument(
'-e',
'--executable',
help='path to the pyarmor-1shot executable to use',
type=str,
)
return parser.parse_args()
def display_menu():
to_unpack_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'to_unpack')
if not os.path.exists(to_unpack_dir):
os.makedirs(to_unpack_dir)
folders = [d for d in os.listdir(to_unpack_dir)
if os.path.isdir(os.path.join(to_unpack_dir, d))]
if not folders:
print(f"{Fore.YELLOW}No folders found in {to_unpack_dir}{Style.RESET_ALL}")
return None
print(f"\n{Fore.CYAN}=== Available Folders to Unpack ==={Style.RESET_ALL}")
for i, folder in enumerate(folders, 1):
print(f"{Fore.GREEN}[{i}]{Style.RESET_ALL} {folder}")
while True:
try:
choice = input(f"\n{Fore.YELLOW}Enter the number of the folder to unpack (or 'q' to quit): {Style.RESET_ALL}")
if choice.lower() == 'q':
return None
choice_idx = int(choice) - 1
if 0 <= choice_idx < len(folders):
selected_folder = folders[choice_idx]
full_path = os.path.join(to_unpack_dir, selected_folder)
print(f"{Fore.GREEN}Selected: {selected_folder}{Style.RESET_ALL}")
return full_path
else:
print(f"{Fore.RED}Invalid choice. Please enter a number between 1 and {len(folders)}{Style.RESET_ALL}")
except ValueError:
print(f"{Fore.RED}Please enter a valid number{Style.RESET_ALL}")
def main():
args = parse_args()
logging.basicConfig(
@@ -139,7 +370,7 @@ def main():
)
logger = logging.getLogger('shot')
print(r'''
print(Fore.CYAN + r'''
____ ____
( __ ) ( __ )
| |~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~| |
@@ -154,7 +385,16 @@ def main():
For technology exchange only. Use at your own risk.
GitHub: https://github.com/Lil-House/Pyarmor-Static-Unpack-1shot
''')
''' + Style.RESET_ALL)
# If menu option is selected or no directory is provided, show the menu
if args.menu or not args.directory:
selected_dir = display_menu()
if selected_dir:
args.directory = selected_dir
else:
print(f"{Fore.YELLOW}No directory selected. Exiting.{Style.RESET_ALL}")
return
if args.runtime:
specified_runtime = RuntimeInfo(args.runtime)
@@ -171,12 +411,12 @@ def main():
if os.path.isfile(args.directory):
if specified_runtime is None:
logger.error('Please specify `pyarmor_runtime` file by `-r` if input is a file')
logger.error(f'{Fore.RED}Please specify `pyarmor_runtime` file by `-r` if input is a file{Style.RESET_ALL}')
return
logger.info('Single file mode')
logger.info(f'{Fore.CYAN}Single file mode{Style.RESET_ALL}')
result = detect_process(args.directory, args.directory)
if result is None:
logger.error('No armored data found')
logger.error(f'{Fore.RED}No armored data found{Style.RESET_ALL}')
return
sequences.extend(result)
decrypt_process(runtimes, sequences, args)
@@ -187,7 +427,7 @@ def main():
files: List[str]
for dir_path, dirs, files in os.walk(args.directory, followlinks=False):
if '.no1shot' in files:
logger.info(f'Skipping {dir_path} because of `.no1shot`')
logger.info(f'{Fore.YELLOW}Skipping {dir_path} because of `.no1shot`{Style.RESET_ALL}')
dirs.clear()
files.clear()
continue
@@ -208,7 +448,7 @@ def main():
and (not os.path.exists(file_path + '_extracted')
or len(os.listdir(file_path + '_extracted')) == 0):
logger.error(
f'A PYZ file containing armored data is detected, but the PYZ file has not been extracted by other tools. This error is not a problem with this tool. If the folder is extracted by Pyinstxtractor, please read the output information of Pyinstxtractor carefully. ({relative_path})')
f'{Fore.RED}A PYZ file containing armored data is detected, but the PYZ file has not been extracted by other tools. This error is not a problem with this tool. If the folder is extracted by Pyinstxtractor, please read the output information of Pyinstxtractor carefully. ({relative_path}){Style.RESET_ALL}')
continue
# is pyarmor_runtime?
@@ -219,7 +459,7 @@ def main():
new_runtime = RuntimeInfo(file_path)
runtimes[new_runtime.serial_number] = new_runtime
logger.info(
f'Found new runtime: {new_runtime.serial_number} ({file_path})')
f'{Fore.GREEN}Found new runtime: {new_runtime.serial_number} ({file_path}){Style.RESET_ALL}')
print(new_runtime)
continue
except:
@@ -230,10 +470,10 @@ def main():
sequences.extend(result)
if not runtimes:
logger.error('No runtime found')
logger.error(f'{Fore.RED}No runtime found{Style.RESET_ALL}')
return
if not sequences:
logger.error('No armored data found')
logger.error(f'{Fore.RED}No armored data found{Style.RESET_ALL}')
return
decrypt_process(runtimes, sequences, args)