chore: directory structure
0    oneshot/__init__.py    Normal file
160  oneshot/detect.py      Normal file
@@ -0,0 +1,160 @@
import logging
import os
from typing import List, Tuple, Union


def ascii_ratio(data: bytes) -> float:
    return sum(32 <= c < 127 for c in data) / len(data)


def source_as_file(file_path: str) -> Union[List[bytes], None]:
    try:
        with open(file_path, 'r') as f:
            co = compile(f.read(), '<str>', 'exec')
        data = [i for i in co.co_consts if type(i) is bytes
                and i.startswith(b'PY00') and len(i) > 64]
        return data
    except:
        return None


def source_as_lines(file_path: str) -> Union[List[bytes], None]:
    data = []
    try:
        with open(file_path, 'r') as f:
            for line in f:
                try:
                    co = compile(line, '<str>', 'exec')
                    data.extend([i for i in co.co_consts if type(i) is bytes
                                 and i.startswith(b'PY00') and len(i) > 64])
                except:
                    # ignore lines that do not compile on their own
                    pass
    except:
        return None
    return data


def find_data_from_bytes(data: bytes, max_count=-1) -> List[bytes]:
    result = []
    while len(result) != max_count:
        idx = data.find(b'PY00')
        if idx == -1:
            break
        data = data[idx:]
        if len(data) < 64:
            break
        header_len = int.from_bytes(data[28:32], 'little')
        body_len = int.from_bytes(data[32:36], 'little')
        if header_len > 256 or body_len > 0xFFFFF or header_len + body_len > len(data):
            # compressed or a coincidental match, skip
            data = data[5:]
            continue

        complete_object_length = header_len + body_len

        # may be followed by data for other Python versions or another part of BCC
        next_segment_offset = int.from_bytes(data[56:60], 'little')
        data_next = data[next_segment_offset:]
        while next_segment_offset != 0 and data_next.startswith(b'PY00') and len(data_next) >= 64:
            header_len = int.from_bytes(data_next[28:32], 'little')
            body_len = int.from_bytes(data_next[32:36], 'little')
            complete_object_length = next_segment_offset + header_len + body_len

            if int.from_bytes(data_next[56:60], 'little') == 0:
                break
            next_segment_offset += int.from_bytes(data_next[56:60], 'little')
            data_next = data[next_segment_offset:]

        result.append(data[:complete_object_length])
        data = data[complete_object_length:]
    return result


def nuitka_package(head: bytes, relative_path: str) -> Union[List[Tuple[str, bytes]], None]:
    first_occurrence = head.find(b'PY00')
    if first_occurrence == -1:
        return None
    last_dot_bytecode = head.rfind(b'.bytecode\x00', 0, first_occurrence)
    if last_dot_bytecode == -1:
        return None
    length = int.from_bytes(
        head[last_dot_bytecode-4:last_dot_bytecode], 'little')
    end = last_dot_bytecode + length
    cur = last_dot_bytecode
    result = []
    while cur < end:
        module_name_len = head.find(b'\x00', cur, end) - cur
        module_name = head[cur:cur + module_name_len].decode('utf-8')
        cur += module_name_len + 1
        module_len = int.from_bytes(head[cur:cur + 4], 'little')
        cur += 4
        module_data = find_data_from_bytes(head[cur:cur + module_len], 1)
        if module_data:
            result.append((os.path.join(relative_path.rstrip(
                '/\\') + '.1shot.ext', module_name), module_data[0]))
        cur += module_len
    if result:
        logger = logging.getLogger('detect')
        logger.info(f'Found data in Nuitka package: {relative_path}')
        return result
    return None


def detect_process(file_path: str, relative_path: str) -> Union[List[Tuple[str, bytes]], None]:
    '''
    Returns a list of (relative_path, bytes_raw) tuples, or None.
    Does not raise exceptions.
    '''
    logger = logging.getLogger('detect')

    try:
        with open(file_path, 'rb') as f:
            head = f.read(16 * 1024 * 1024)
    except:
        logger.error(f'Failed to read file: {relative_path}')
        return None

    if b'__pyarmor__' not in head:
        # no need to dig deeper
        return None

    if ascii_ratio(head[:2048]) >= 0.9:
        # the whole file may not compile, but we can still try individual lines;
        # None means failure (so we try the next strategy),
        # an empty list means success but no data found (so we skip this file)
        result = source_as_file(file_path)
        if result is None:
            result = source_as_lines(file_path)
        if result is None:
            return None

        result_len = len(result)
        if result_len == 0:
            return None
        elif result_len == 1:
            logger.info(f'Found data in source: {relative_path}')
            return [(relative_path, result[0])]
        else:
            logger.info(f'Found data in source: {relative_path}')
            return [(f'{relative_path}__{i}', result[i]) for i in range(len(result))]

    # binary file
    # ignore data after 16 MB until we have a reason to read more

    if b'Error, corrupted constants object' in head:
        # an interesting special case: the packer put armored data in a Nuitka package,
        # so we can recover the exact module names instead of appending __0, __1, ...
        return nuitka_package(head, relative_path)

    result = find_data_from_bytes(head)
    result_len = len(result)
    if result_len == 0:
        return None
    elif result_len == 1:
        logger.info(f'Found data in binary: {relative_path}')
        return [(relative_path, result[0])]
    else:
        logger.info(f'Found data in binary: {relative_path}')
        return [(f'{relative_path}__{i}', result[i]) for i in range(len(result))]
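For orientation, the fixed offsets that find_data_from_bytes reads can be summarised as a tiny parser. A minimal sketch (offsets are taken straight from the code above; the field names are descriptive labels of mine, not part of this commit):

# Sketch only: the PY00 header fields find_data_from_bytes relies on.
def parse_py00_header(blob: bytes) -> dict:
    assert blob.startswith(b'PY00') and len(blob) >= 64
    return {
        'header_len': int.from_bytes(blob[28:32], 'little'),           # header size in bytes
        'body_len': int.from_bytes(blob[32:36], 'little'),             # body size in bytes
        'next_segment_offset': int.from_bytes(blob[56:60], 'little'),  # 0 if no further segment
    }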
96   oneshot/runtime.py     Normal file
@@ -0,0 +1,96 @@
import hashlib


GLOBAL_CERT = bytes.fromhex('''
30 82 01 0a 02 82 01 01 00 bf 65 30 f3 bd 67 e7
a6 9d f8 db 18 b2 b9 c1 c0 5f fe fb e5 4b 91 df
6f 38 da 51 cc ea c4 d3 04 bd 95 27 86 c1 13 ca
73 15 44 4d 97 f5 10 b9 52 21 72 16 c8 b2 84 5f
45 56 32 e7 c2 6b ad 2b d9 df 52 d6 e9 d1 2a ba
35 e4 43 ab 54 e7 91 c5 ce d1 f1 ba a5 9f f4 ca
db 89 04 3d f8 9f 6a 8b 8a 29 39 f8 4c 0d b8 a0
6d 51 c4 74 24 64 fe 1a 23 97 f3 61 ea de c8 97
dc 57 60 34 be 2c 18 50 3b d1 76 3b 49 2a 39 9a
37 18 53 8f 1d 4c 82 b1 a0 33 43 57 19 ad 67 e7
af 09 fb 04 54 a9 ea c0 c1 e9 32 6c 77 92 7f 9f
7c 08 7c e8 a1 5d a4 fc 40 e6 6e 18 db bf 45 53
4b 5c a7 9d f2 8f 7e 6c 04 b0 4d ee 99 25 9a 87
84 6e 9e fe 3c 72 ec b0 64 dd 2e db ad 32 fa 1d
4b 2c 1a 78 85 7c bc 2c d0 d7 83 77 5f 92 d5 db
59 10 96 53 2e 5d c7 42 12 b8 61 cb 2c 5f 46 14
9e 93 b0 53 21 a2 74 34 2d 02 03 01 00 01
''')


class RuntimeInfo:
    def __init__(self, file_path: str) -> None:
        self.file_path = file_path
        if file_path.endswith('.pyd'):
            self.extract_info_win64()
        else:
            # TODO: implement for other platforms
            self.extract_info_win64()

        self.serial_number = self.part_1[12:18].decode()
        self.runtime_aes_key = self.calc_aes_key()

    def __str__(self) -> str:
        trial = self.serial_number == '000000'
        product = ''
        for c in self.part_3[2:]:
            if 32 <= c <= 126:
                product += chr(c)
            else:
                break
        return f'''\
========================
Pyarmor Runtime ({'Trial' if trial else self.serial_number}) Information:
Product: {product}
AES key: {self.runtime_aes_key.hex()}
Mix string AES nonce: {self.mix_str_aes_nonce().hex()}
========================'''

    def __repr__(self) -> str:
        return f'RuntimeInfo(part_1={self.part_1}, part_2={self.part_2}, part_3={self.part_3})'

    def extract_info_win64(self) -> None:
        '''
        Try to find useful information in a `pyarmor_runtime.pyd` file,
        and store all three parts in the object.
        '''
        with open(self.file_path, 'rb') as f:
            data = f.read(16 * 1024 * 1024)
        cur = data.index(b'pyarmor-vax')

        if data[cur+11:cur+18] == b'\x00' * 7:
            raise ValueError(f'{self.file_path} is a runtime template')

        self.part_1 = data[cur:cur+20]

        cur += 36
        part_2_offset = int.from_bytes(data[cur:cur+4], 'little')
        part_2_len = int.from_bytes(data[cur+4:cur+8], 'little')
        part_3_offset = int.from_bytes(data[cur+8:cur+12], 'little')
        cur += 16
        self.part_2 = data[cur+part_2_offset:cur+part_2_offset+part_2_len]

        cur += part_3_offset
        part_3_len = int.from_bytes(data[cur+4:cur+8], 'little')
        cur += 32
        self.part_3 = data[cur:cur+part_3_len]

    def calc_aes_key(self) -> bytes:
        return hashlib.md5(self.part_1 + self.part_2 + self.part_3 + GLOBAL_CERT).digest()

    def mix_str_aes_nonce(self) -> bytes:
        return self.part_3[:12]


if __name__ == '__main__':
    import sys
    if len(sys.argv) < 2:
        print('Usage: python runtime.py path/to/pyarmor_runtime[.pyd|.so|.dylib]')
        exit(1)
    for i in sys.argv[1:]:
        runtime = RuntimeInfo(i)
        print(runtime)
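Used standalone, the class above behaves the way the `__main__` block shows. A minimal usage sketch (the path is hypothetical, and the flat import mirrors how shot.py imports the module):

from runtime import RuntimeInfo

info = RuntimeInfo('dist/pyarmor_runtime_000000/pyarmor_runtime.pyd')  # hypothetical path
print(info.serial_number)              # six-character serial, '000000' for trial builds
print(info.runtime_aes_key.hex())      # MD5(part_1 + part_2 + part_3 + GLOBAL_CERT)
print(info.mix_str_aes_nonce().hex())  # first 12 bytes of part_3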
382  oneshot/shot.py        Normal file
@@ -0,0 +1,382 @@
import argparse
from Crypto.Cipher import AES
import logging
import os
import asyncio
import traceback
import platform
from typing import Dict, List, Tuple

try:
    from colorama import init, Fore, Style
except ImportError:
    def init(**kwargs): pass
    class Fore: CYAN = RED = YELLOW = GREEN = ''
    class Style: RESET_ALL = ''

from detect import detect_process
from runtime import RuntimeInfo


# Initialize colorama
init(autoreset=True)


def general_aes_ctr_decrypt(data: bytes, key: bytes, nonce: bytes) -> bytes:
    cipher = AES.new(key, AES.MODE_CTR, nonce=nonce, initial_value=2)
    return cipher.decrypt(data)


async def decrypt_file_async(exe_path, seq_file_path, path, args):
    logger = logging.getLogger('shot')
    try:
        # Run without timeout
        process = await asyncio.create_subprocess_exec(
            exe_path,
            seq_file_path,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await process.communicate()

        stdout_lines = stdout.decode('latin-1').splitlines()
        stderr_lines = stderr.decode('latin-1').splitlines()

        for line in stdout_lines:
            logger.warning(f'PYCDC: {line} ({path})')

        for line in stderr_lines:
            if line.startswith((
                'Warning: Stack history is empty',
                'Warning: Stack history is not empty',
                'Warning: block stack is not empty',
            )):
                if args.show_warn_stack or args.show_all:
                    logger.warning(f'PYCDC: {line} ({path})')
            elif line.startswith('Unsupported opcode:'):
                if args.show_err_opcode or args.show_all:
                    logger.error(f'PYCDC: {line} ({path})')
            elif line.startswith((
                'Something TERRIBLE happened',
                'Unsupported argument',
                'Unsupported Node type',
                'Unsupported node type',
            )):  # annoying won't-fix errors
                if args.show_all:
                    logger.error(f'PYCDC: {line} ({path})')
            else:
                logger.error(f'PYCDC: {line} ({path})')

        if process.returncode != 0:
            logger.warning(f'{Fore.YELLOW}PYCDC returned 0x{process.returncode:x} ({path}){Style.RESET_ALL}')

    except Exception as e:
        error_details = traceback.format_exc()
        logger.error(f'{Fore.RED}Exception: {e} ({path}){Style.RESET_ALL}')
        logger.error(f'{Fore.RED}Error details: {error_details}{Style.RESET_ALL}')


async def decrypt_process_async(runtimes: Dict[str, RuntimeInfo], sequences: List[Tuple[str, bytes]], args):
    logger = logging.getLogger('shot')
    output_dir: str = args.output_dir or args.directory

    # Create a semaphore to limit concurrent processes
    semaphore = asyncio.Semaphore(args.concurrent)  # Use the concurrent argument

    # Get the appropriate executable for the current platform
    exe_path = get_platform_executable(args)

    async def process_file(path, data):
        async with semaphore:
            try:
                serial_number = data[2:8].decode('utf-8')
                runtime = runtimes[serial_number]
                logger.info(f'{Fore.CYAN}Decrypting: {serial_number} ({path}){Style.RESET_ALL}')

                dest_path = os.path.join(output_dir, path) if output_dir else path
                dest_dir = os.path.dirname(dest_path)
                if not os.path.exists(dest_dir):
                    os.makedirs(dest_dir)

                if args.export_raw_data:
                    with open(dest_path + '.1shot.raw', 'wb') as f:
                        f.write(data)

                # Check BCC: a value of 9 at offset 20 marks a BCC mode object,
                # whose native code parts are extracted first
                if int.from_bytes(data[20:24], 'little') == 9:
                    cipher_text_offset = int.from_bytes(data[28:32], 'little')
                    cipher_text_length = int.from_bytes(data[32:36], 'little')
                    nonce = data[36:40] + data[44:52]
                    bcc_aes_decrypted = general_aes_ctr_decrypt(
                        data[cipher_text_offset:cipher_text_offset+cipher_text_length], runtime.runtime_aes_key, nonce)
                    data = data[int.from_bytes(data[56:60], 'little'):]
                    bcc_architecture_mapping = {
                        0x2001: 'dll',  # Windows x86-64
                        0x2003: 'so',   # Linux x86-64
                    }
                    while True:
                        if len(bcc_aes_decrypted) < 16:
                            break
                        bcc_segment_offset = int.from_bytes(bcc_aes_decrypted[0:4], 'little')
                        bcc_segment_length = int.from_bytes(bcc_aes_decrypted[4:8], 'little')
                        bcc_architecture_id = int.from_bytes(bcc_aes_decrypted[8:12], 'little')
                        bcc_next_segment_offset = int.from_bytes(bcc_aes_decrypted[12:16], 'little')
                        if bcc_architecture_id in bcc_architecture_mapping:
                            bcc_file_path = f'{dest_path}.1shot.bcc.{bcc_architecture_mapping[bcc_architecture_id]}'
                        else:
                            bcc_file_path = f'{dest_path}.1shot.bcc.0x{bcc_architecture_id:x}'
                        with open(bcc_file_path, 'wb') as f:
                            f.write(bcc_aes_decrypted[bcc_segment_offset:bcc_segment_offset+bcc_segment_length])
                        logger.info(f'{Fore.GREEN}Extracted BCC mode native part: {bcc_file_path}{Style.RESET_ALL}')
                        if bcc_next_segment_offset == 0:
                            break
                        bcc_aes_decrypted = bcc_aes_decrypted[bcc_next_segment_offset:]

                cipher_text_offset = int.from_bytes(data[28:32], 'little')
                cipher_text_length = int.from_bytes(data[32:36], 'little')
                nonce = data[36:40] + data[44:52]
                # pack the key, the nonce, and the AES-CTR-decrypted object into a
                # .1shot.seq file for the pyarmor-1shot helper executable
                seq_file_path = dest_path + '.1shot.seq'
                with open(seq_file_path, 'wb') as f:
                    f.write(b'\xa1' + runtime.runtime_aes_key)
                    f.write(b'\xa2' + runtime.mix_str_aes_nonce())
                    f.write(b'\xf0\xff')
                    f.write(data[:cipher_text_offset])
                    f.write(general_aes_ctr_decrypt(
                        data[cipher_text_offset:cipher_text_offset+cipher_text_length], runtime.runtime_aes_key, nonce))
                    f.write(data[cipher_text_offset+cipher_text_length:])

                # Run without timeout
                await decrypt_file_async(exe_path, seq_file_path, path, args)

            except Exception as e:
                error_details = traceback.format_exc()
                logger.error(f'{Fore.RED}Decrypt failed: {e} ({path}){Style.RESET_ALL}')
                logger.error(f'{Fore.RED}Error details: {error_details}{Style.RESET_ALL}')

    # Create tasks for all files
    tasks = [process_file(path, data) for path, data in sequences]

    # Run all tasks concurrently
    await asyncio.gather(*tasks)


def decrypt_process(runtimes: Dict[str, RuntimeInfo], sequences: List[Tuple[str, bytes]], args):
    asyncio.run(decrypt_process_async(runtimes, sequences, args))
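For reference, the `.1shot.seq` file handed to the helper executable is laid out exactly as `process_file` writes it. A minimal sketch of reading one back (this assumes only the records emitted above, not any spec from the native tool):

def split_seq_file(path: str):
    # Sketch only: undo the writes in process_file above.
    with open(path, 'rb') as f:
        blob = f.read()
    assert blob[0:1] == b'\xa1'
    key = blob[1:17]              # 16-byte MD5-derived runtime AES key
    assert blob[17:18] == b'\xa2'
    nonce = blob[18:30]           # 12-byte mix-string AES nonce
    assert blob[30:32] == b'\xf0\xff'
    return key, nonce, blob[32:]  # PY00 header + decrypted body + trailer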
def get_platform_executable(args) -> str:
    """
    Get the appropriate executable for the current platform
    """
    logger = logging.getLogger('shot')

    # If a specific executable is provided, use it
    if args.executable:
        if os.path.exists(args.executable):
            logger.info(f'{Fore.GREEN}Using specified executable: {args.executable}{Style.RESET_ALL}')
            return args.executable
        else:
            logger.warning(f'{Fore.YELLOW}Specified executable not found: {args.executable}{Style.RESET_ALL}')

    oneshot_dir = os.path.dirname(os.path.abspath(__file__))

    system = platform.system().lower()
    machine = platform.machine().lower()

    # Check for architecture-specific executables
    arch_specific_exe = f'pyarmor-1shot-{system}-{machine}'
    if system == 'windows':
        arch_specific_exe += '.exe'

    arch_exe_path = os.path.join(oneshot_dir, arch_specific_exe)
    if os.path.exists(arch_exe_path):
        logger.info(f'{Fore.GREEN}Using architecture-specific executable: {arch_specific_exe}{Style.RESET_ALL}')
        return arch_exe_path

    platform_map = {
        'windows': 'pyarmor-1shot.exe',
        'linux': 'pyarmor-1shot',
        'darwin': 'pyarmor-1shot',
    }
    base_exe_name = platform_map.get(system, 'pyarmor-1shot')

    # Then check for the platform-specific executable
    platform_exe_path = os.path.join(oneshot_dir, base_exe_name)
    if os.path.exists(platform_exe_path):
        logger.info(f'{Fore.GREEN}Using executable: {base_exe_name}{Style.RESET_ALL}')
        return platform_exe_path

    # Finally, check for the generic executable
    generic_exe_path = os.path.join(oneshot_dir, 'pyarmor-1shot')
    if os.path.exists(generic_exe_path):
        logger.info(f'{Fore.GREEN}Using executable: pyarmor-1shot{Style.RESET_ALL}')
        return generic_exe_path

    logger.critical(f'{Fore.RED}Executable {base_exe_name} not found, please build it first or download it from https://github.com/Lil-House/Pyarmor-Static-Unpack-1shot/releases {Style.RESET_ALL}')
    exit(1)


def parse_args():
    parser = argparse.ArgumentParser(
        description='Pyarmor Static Unpack 1 Shot Entry')
    parser.add_argument(
        'directory',
        help='the "root" directory of obfuscated scripts',
        type=str,
    )
    parser.add_argument(
        '-r',
        '--runtime',
        help='path to pyarmor_runtime[.pyd|.so|.dylib]',
        type=str,  # argparse.FileType('rb'),
    )
    parser.add_argument(
        '-o',
        '--output-dir',
        help='save output files in another directory instead of in place, with the folder structure unchanged',
        type=str,
    )
    parser.add_argument(
        '--export-raw-data',
        help='save data found in source files as-is',
        action='store_true',
    )
    parser.add_argument(
        '--show-all',
        help='show all pycdc errors and warnings',
        action='store_true',
    )
    parser.add_argument(
        '--show-err-opcode',
        help='show pycdc unsupported opcode errors',
        action='store_true',
    )
    parser.add_argument(
        '--show-warn-stack',
        help='show pycdc stack related warnings',
        action='store_true',
    )
    parser.add_argument(
        '--concurrent',
        help='number of concurrent deobfuscation processes (default: 4)',
        type=int,
        default=4,
    )
    parser.add_argument(
        '-e',
        '--executable',
        help='path to the pyarmor-1shot executable to use',
        type=str,
    )
    return parser.parse_args()


def main():
    args = parse_args()
    logging.basicConfig(
        level=logging.INFO,
        format='%(levelname)-8s %(asctime)-28s %(message)s',
    )
    logger = logging.getLogger('shot')

    print(Fore.CYAN + r'''
 ____ ____
( __ ) ( __ )
| |~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~| |
| | ____ _ ___ _ _ | |
| | | _ \ _ _ __ _ _ __ _ _ __ ___ _ _ / / __|| |_ ___ | |_ | |
| | | |_) | || |/ _` | '__| ' ` \ / _ \| '_| | \__ \| ' \ / _ \| __| | |
| | | __/| || | (_| | | | || || | (_) | | | |__) | || | (_) | |_ | |
| | |_| \_, |\__,_|_| |_||_||_|\___/|_| |_|___/|_||_|\___/ \__| | |
| | |__/ | |
|__|~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~|__|
(____) (____)

For technology exchange only. Use at your own risk.
GitHub: https://github.com/Lil-House/Pyarmor-Static-Unpack-1shot
''' + Style.RESET_ALL)

    if args.runtime:
        specified_runtime = RuntimeInfo(args.runtime)
        print(specified_runtime)
        runtimes = {specified_runtime.serial_number: specified_runtime}
    else:
        specified_runtime = None
        runtimes = {}

    sequences: List[Tuple[str, bytes]] = []

    if args.output_dir and not os.path.exists(args.output_dir):
        os.makedirs(args.output_dir)

    if os.path.isfile(args.directory):
        if specified_runtime is None:
            logger.error(f'{Fore.RED}Please specify the `pyarmor_runtime` file with `-r` if the input is a file{Style.RESET_ALL}')
            return
        logger.info(f'{Fore.CYAN}Single file mode{Style.RESET_ALL}')
        result = detect_process(args.directory, args.directory)
        if result is None:
            logger.error(f'{Fore.RED}No armored data found{Style.RESET_ALL}')
            return
        sequences.extend(result)
        decrypt_process(runtimes, sequences, args)
        return  # single file mode ends here

    dir_path: str
    dirs: List[str]
    files: List[str]
    for dir_path, dirs, files in os.walk(args.directory, followlinks=False):
        if '.no1shot' in files:
            logger.info(f'{Fore.YELLOW}Skipping {dir_path} because of `.no1shot`{Style.RESET_ALL}')
            dirs.clear()
            files.clear()
            continue
        for d in ['__pycache__', 'site-packages']:
            if d in dirs:
                dirs.remove(d)
        for file_name in files:
            if '.1shot.' in file_name:
                continue

            file_path = os.path.join(dir_path, file_name)
            relative_path = os.path.relpath(file_path, args.directory)

            if file_name.endswith('.pyz'):
                with open(file_path, 'rb') as f:
                    head = f.read(16 * 1024 * 1024)
                if b'PY00' in head \
                        and (not os.path.exists(file_path + '_extracted')
                             or len(os.listdir(file_path + '_extracted')) == 0):
                    logger.error(
                        f'{Fore.RED}A PYZ file containing armored data was detected, but it has not been extracted by other tools. This is not a problem with this tool. If the folder was extracted by Pyinstxtractor, please read the Pyinstxtractor output carefully. ({relative_path}){Style.RESET_ALL}')
                continue

            # is pyarmor_runtime?
            if specified_runtime is None \
                    and file_name.startswith('pyarmor_runtime') \
                    and file_name.endswith(('.pyd', '.so', '.dylib')):
                try:
                    new_runtime = RuntimeInfo(file_path)
                    runtimes[new_runtime.serial_number] = new_runtime
                    logger.info(
                        f'{Fore.GREEN}Found new runtime: {new_runtime.serial_number} ({file_path}){Style.RESET_ALL}')
                    print(new_runtime)
                    continue
                except:
                    pass

            result = detect_process(file_path, relative_path)
            if result is not None:
                sequences.extend(result)

    if not runtimes:
        logger.error(f'{Fore.RED}No runtime found{Style.RESET_ALL}')
        return
    if not sequences:
        logger.error(f'{Fore.RED}No armored data found{Style.RESET_ALL}')
        return
    decrypt_process(runtimes, sequences, args)


if __name__ == '__main__':
    main()
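Finally, the modules can also be driven without the CLI. A rough sketch mirroring the single-file branch of `main()` (all paths and the Namespace values are illustrative assumptions, not part of this commit):

import argparse
from detect import detect_process
from runtime import RuntimeInfo
from shot import decrypt_process

runtime = RuntimeInfo('obf/pyarmor_runtime.pyd')  # hypothetical runtime binary
args = argparse.Namespace(
    directory='obf/foo.py', runtime='obf/pyarmor_runtime.pyd', output_dir='out',
    export_raw_data=False, show_all=False, show_err_opcode=False,
    show_warn_stack=False, concurrent=4, executable=None,
)
sequences = detect_process(args.directory, args.directory) or []
if sequences:
    decrypt_process({runtime.serial_number: runtime}, sequences, args)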