refactor: add path handling (gh-29), detect encodings, format py scripts

This commit is contained in:
2025-10-29 18:50:31 +08:00
parent 8c46b62352
commit 10a3c24fac
3 changed files with 392 additions and 241 deletions

View File

@@ -9,10 +9,13 @@ def ascii_ratio(data: bytes) -> float:
def source_as_file(file_path: str) -> Union[List[bytes], None]:
try:
with open(file_path, 'r') as f:
co = compile(f.read(), '<str>', 'exec')
data = [i for i in co.co_consts if type(i) is bytes
and i.startswith(b'PY00') and len(i) > 64]
with open(file_path, "r") as f:
co = compile(f.read(), "<str>", "exec")
data = [
i
for i in co.co_consts
if type(i) is bytes and i.startswith(b"PY00") and len(i) > 64
]
return data
except:
return None
@@ -21,12 +24,19 @@ def source_as_file(file_path: str) -> Union[List[bytes], None]:
def source_as_lines(file_path: str) -> Union[List[bytes], None]:
data = []
try:
with open(file_path, 'r') as f:
with open(file_path, "r") as f:
for line in f:
try:
co = compile(line, '<str>', 'exec')
data.extend([i for i in co.co_consts if type(i) is bytes
and i.startswith(b'PY00') and len(i) > 64])
co = compile(line, "<str>", "exec")
data.extend(
[
i
for i in co.co_consts
if type(i) is bytes
and i.startswith(b"PY00")
and len(i) > 64
]
)
except:
# ignore not compilable lines
pass
@@ -39,14 +49,14 @@ def find_data_from_bytes(data: bytes, max_count=-1) -> List[bytes]:
result = []
idx = 0
while len(result) != max_count:
idx = data.find(b'PY00')
idx = data.find(b"PY00")
if idx == -1:
break
data = data[idx:]
if len(data) < 64:
break
header_len = int.from_bytes(data[28:32], 'little')
body_len = int.from_bytes(data[32:36], 'little')
header_len = int.from_bytes(data[28:32], "little")
body_len = int.from_bytes(data[32:36], "little")
if header_len > 256 or body_len > 0xFFFFF or header_len + body_len > len(data):
# compressed or coincident, skip
data = data[5:]
@@ -55,16 +65,20 @@ def find_data_from_bytes(data: bytes, max_count=-1) -> List[bytes]:
complete_object_length = header_len + body_len
# maybe followed by data for other Python versions or another part of BCC
next_segment_offset = int.from_bytes(data[56:60], 'little')
next_segment_offset = int.from_bytes(data[56:60], "little")
data_next = data[next_segment_offset:]
while next_segment_offset != 0 and data_next.startswith(b'PY00') and len(data_next) >= 64:
header_len = int.from_bytes(data_next[28:32], 'little')
body_len = int.from_bytes(data_next[32:36], 'little')
while (
next_segment_offset != 0
and data_next.startswith(b"PY00")
and len(data_next) >= 64
):
header_len = int.from_bytes(data_next[28:32], "little")
body_len = int.from_bytes(data_next[32:36], "little")
complete_object_length = next_segment_offset + header_len + body_len
if int.from_bytes(data_next[56:60], 'little') == 0:
if int.from_bytes(data_next[56:60], "little") == 0:
break
next_segment_offset += int.from_bytes(data_next[56:60], 'little')
next_segment_offset += int.from_bytes(data_next[56:60], "little")
data_next = data[next_segment_offset:]
result.append(data[:complete_object_length])
@@ -72,51 +86,60 @@ def find_data_from_bytes(data: bytes, max_count=-1) -> List[bytes]:
return result
def nuitka_package(head: bytes, relative_path: str) -> Union[List[Tuple[str, bytes]], None]:
first_occurrence = head.find(b'PY00')
def nuitka_package(
head: bytes, relative_path: str
) -> Union[List[Tuple[str, bytes]], None]:
first_occurrence = head.find(b"PY00")
if first_occurrence == -1:
return None
last_dot_bytecode = head.rfind(b'.bytecode\x00', 0, first_occurrence)
last_dot_bytecode = head.rfind(b".bytecode\x00", 0, first_occurrence)
if last_dot_bytecode == -1:
return None
length = int.from_bytes(
head[last_dot_bytecode-4:last_dot_bytecode], 'little')
length = int.from_bytes(head[last_dot_bytecode - 4 : last_dot_bytecode], "little")
end = last_dot_bytecode + length
cur = last_dot_bytecode
result = []
while cur < end:
module_name_len = head.find(b'\x00', cur, end) - cur
module_name = head[cur:cur + module_name_len].decode('utf-8')
module_name_len = head.find(b"\x00", cur, end) - cur
module_name = head[cur : cur + module_name_len].decode("utf-8")
cur += module_name_len + 1
module_len = int.from_bytes(head[cur:cur + 4], 'little')
module_len = int.from_bytes(head[cur : cur + 4], "little")
cur += 4
module_data = find_data_from_bytes(head[cur:cur + module_len], 1)
module_data = find_data_from_bytes(head[cur : cur + module_len], 1)
if module_data:
result.append((os.path.join(relative_path.rstrip(
'/\\') + '.1shot.ext', module_name), module_data[0]))
result.append(
(
os.path.join(
relative_path.rstrip("/\\") + ".1shot.ext", module_name
),
module_data[0],
)
)
cur += module_len
if result:
logger = logging.getLogger('detect')
logger.info(f'Found data in Nuitka package: {relative_path}')
logger = logging.getLogger("detect")
logger.info(f"Found data in Nuitka package: {relative_path}")
return result
return None
def detect_process(file_path: str, relative_path: str) -> Union[List[Tuple[str, bytes]], None]:
'''
def detect_process(
file_path: str, relative_path: str
) -> Union[List[Tuple[str, bytes]], None]:
"""
Returns a list of (relative_path, bytes_raw) tuples, or None.
Do not raise exceptions.
'''
logger = logging.getLogger('detect')
"""
logger = logging.getLogger("detect")
try:
with open(file_path, 'rb') as f:
with open(file_path, "rb") as f:
head = f.read(16 * 1024 * 1024)
except:
logger.error(f'Failed to read file: {relative_path}')
logger.error(f"Failed to read file: {relative_path}")
return None
if b'__pyarmor__' not in head:
if b"__pyarmor__" not in head:
# no need to dig deeper
return None
@@ -134,16 +157,16 @@ def detect_process(file_path: str, relative_path: str) -> Union[List[Tuple[str,
if result_len == 0:
return None
elif result_len == 1:
logger.info(f'Found data in source: {relative_path}')
logger.info(f"Found data in source: {relative_path}")
return [(relative_path, result[0])]
else:
logger.info(f'Found data in source: {relative_path}')
return [(f'{relative_path}__{i}', result[i]) for i in range(len(result))]
logger.info(f"Found data in source: {relative_path}")
return [(f"{relative_path}__{i}", result[i]) for i in range(len(result))]
# binary file
# ignore data after 16MB, before we have a reason to read more
if b'Error, corrupted constants object' in head:
if b"Error, corrupted constants object" in head:
# an interesting special case: packer put armored data in a Nuitka package
# we can know the exact module names, instead of adding boring __0, __1, ...
return nuitka_package(head, relative_path)
@@ -153,8 +176,8 @@ def detect_process(file_path: str, relative_path: str) -> Union[List[Tuple[str,
if result_len == 0:
return None
elif result_len == 1:
logger.info(f'Found data in binary: {relative_path}')
logger.info(f"Found data in binary: {relative_path}")
return [(relative_path, result[0])]
else:
logger.info(f'Found data in binary: {relative_path}')
return [(f'{relative_path}__{i}', result[i]) for i in range(len(result))]
logger.info(f"Found data in binary: {relative_path}")
return [(f"{relative_path}__{i}", result[i]) for i in range(len(result))]

View File

@@ -1,7 +1,7 @@
import hashlib
GLOBAL_CERT = bytes.fromhex('''
GLOBAL_CERT = bytes.fromhex("""
30 82 01 0a 02 82 01 01 00 bf 65 30 f3 bd 67 e7
a6 9d f8 db 18 b2 b9 c1 c0 5f fe fb e5 4b 91 df
6f 38 da 51 cc ea c4 d3 04 bd 95 27 86 c1 13 ca
@@ -19,13 +19,13 @@ af 09 fb 04 54 a9 ea c0 c1 e9 32 6c 77 92 7f 9f
4b 2c 1a 78 85 7c bc 2c d0 d7 83 77 5f 92 d5 db
59 10 96 53 2e 5d c7 42 12 b8 61 cb 2c 5f 46 14
9e 93 b0 53 21 a2 74 34 2d 02 03 01 00 01
''')
""")
class RuntimeInfo:
def __init__(self, file_path: str) -> None:
self.file_path = file_path
if file_path.endswith('.pyd'):
if file_path.endswith(".pyd"):
self.extract_info_win64()
else:
# TODO: implement for other platforms
@@ -35,62 +35,64 @@ class RuntimeInfo:
self.runtime_aes_key = self.calc_aes_key()
def __str__(self) -> str:
trial = self.serial_number == '000000'
product = ''
trial = self.serial_number == "000000"
product = ""
for c in self.part_3[2:]:
if 32 <= c <= 126:
product += chr(c)
else:
break
return f'''\
return f"""\
========================
Pyarmor Runtime ({'Trial' if trial else self.serial_number}) Information:
Pyarmor Runtime ({"Trial" if trial else self.serial_number}) Information:
Product: {product}
AES key: {self.runtime_aes_key.hex()}
Mix string AES nonce: {self.mix_str_aes_nonce().hex()}
========================'''
========================"""
def __repr__(self) -> str:
return f'RuntimeInfo(part_1={self.part_1}, part_2={self.part_2}, part_3={self.part_3})'
return f"RuntimeInfo(part_1={self.part_1}, part_2={self.part_2}, part_3={self.part_3})"
def extract_info_win64(self) -> None:
'''
"""
Try to find useful information from `pyarmor_runtime.pyd` file,
and store all three parts in the object.
'''
with open(self.file_path, 'rb') as f:
"""
with open(self.file_path, "rb") as f:
data = f.read(16 * 1024 * 1024)
cur = data.index(b'pyarmor-vax')
cur = data.index(b"pyarmor-vax")
if data[cur+11:cur+18] == b'\x00' * 7:
raise ValueError(f'{self.file_path} is a runtime template')
if data[cur + 11 : cur + 18] == b"\x00" * 7:
raise ValueError(f"{self.file_path} is a runtime template")
self.part_1 = data[cur:cur+20]
self.part_1 = data[cur : cur + 20]
cur += 36
part_2_offset = int.from_bytes(data[cur:cur+4], 'little')
part_2_len = int.from_bytes(data[cur+4:cur+8], 'little')
part_3_offset = int.from_bytes(data[cur+8:cur+12], 'little')
part_2_offset = int.from_bytes(data[cur : cur + 4], "little")
part_2_len = int.from_bytes(data[cur + 4 : cur + 8], "little")
part_3_offset = int.from_bytes(data[cur + 8 : cur + 12], "little")
cur += 16
self.part_2 = data[cur+part_2_offset:cur+part_2_offset+part_2_len]
self.part_2 = data[cur + part_2_offset : cur + part_2_offset + part_2_len]
cur += part_3_offset
part_3_len = int.from_bytes(data[cur+4:cur+8], 'little')
part_3_len = int.from_bytes(data[cur + 4 : cur + 8], "little")
cur += 32
self.part_3 = data[cur:cur+part_3_len]
self.part_3 = data[cur : cur + part_3_len]
def calc_aes_key(self) -> bytes:
return hashlib.md5(self.part_1 + self.part_2 + self.part_3 + GLOBAL_CERT).digest()
return hashlib.md5(
self.part_1 + self.part_2 + self.part_3 + GLOBAL_CERT
).digest()
def mix_str_aes_nonce(self) -> bytes:
return self.part_3[:12]
@classmethod
def default(cls) -> 'RuntimeInfo':
def default(cls) -> "RuntimeInfo":
instance = cls.__new__(cls)
instance.file_path = '<default>'
instance.part_1 = b'pyarmor-vax-000000\x00\x00'
instance.part_2 = bytes.fromhex('''
instance.file_path = "<default>"
instance.part_1 = b"pyarmor-vax-000000\x00\x00"
instance.part_2 = bytes.fromhex("""
30 81 89 02 81 81 00 A8 ED 64 F4 83 49 13 FC 0F
86 6F 00 5A 8F E4 91 AA ED 1C EA D4 BB 4C 3F 7C
24 21 01 A8 D0 7D 93 F4 BF E7 FB 8C 06 57 88 6A
@@ -100,23 +102,24 @@ class RuntimeInfo:
BA 52 C5 B6 40 F6 AD AB BC D5 CF 5B 40 CB 8D 13
C4 28 B8 90 93 C4 76 01 09 8E 05 1E 61 FA 90 4C
BF 67 D4 A7 D5 82 C1 02 03 01 00 01
''')
instance.part_3 = bytes.fromhex('''
""")
instance.part_3 = bytes.fromhex("""
69 2E 6E 6F 6E 2D 70 72 6F 66 69 74 73 E7 5A 41
9B DC 77 53 CA 1D E7 04 EB EF DA C9 A3 6C 0F 7B
00 00 00 01 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00
''')
instance.serial_number = '000000'
""")
instance.serial_number = "000000"
instance.runtime_aes_key = instance.calc_aes_key()
return instance
if __name__ == '__main__':
if __name__ == "__main__":
import sys
if len(sys.argv) < 2:
print('Usage: python runtime.py path/to/pyarmor_runtime[.pyd|.so|.dylib]')
print("Usage: python runtime.py path/to/pyarmor_runtime[.pyd|.so|.dylib]")
exit(1)
for i in sys.argv[1:]:
runtime = RuntimeInfo(i)

View File

@@ -5,14 +5,22 @@ import os
import asyncio
import traceback
import platform
import locale
from typing import Dict, List, Tuple
try:
from colorama import init, Fore, Style
from colorama import init, Fore, Style # type: ignore
except ImportError:
def init(**kwargs): pass
class Fore: CYAN = RED = YELLOW = GREEN = ''
class Style: RESET_ALL = ''
def init(**kwargs):
pass
class Fore:
CYAN = RED = YELLOW = GREEN = ""
class Style:
RESET_ALL = ""
from detect import detect_process
from runtime import RuntimeInfo
@@ -27,10 +35,53 @@ def general_aes_ctr_decrypt(data: bytes, key: bytes, nonce: bytes) -> bytes:
return cipher.decrypt(data)
async def decrypt_file_async(exe_path, seq_file_path, path, args):
logger = logging.getLogger('shot')
def decode_output(data: bytes) -> str:
if not data:
return ""
# 1) try chardet if available to guess encoding
try:
import chardet # type: ignore
res = chardet.detect(data)
enc = res.get("encoding")
if enc:
return data.decode(enc, errors="replace")
except Exception:
pass
# 2) try common encodings in a reasonable order
attempts = [
"utf-8",
"utf-8-sig",
locale.getpreferredencoding(False) or None,
"cp936",
"latin-1",
]
for enc in attempts:
if not enc:
continue
try:
return data.decode(enc)
except Exception:
continue
try:
return data.decode("latin-1", errors="replace")
except Exception:
return ""
async def run_pycdc_async(
exe_path: str,
seq_file_path: str,
path_for_log: str,
show_all: bool = False,
show_err_opcode: bool = False,
show_warn_stack: bool = False,
):
logger = logging.getLogger("shot")
try:
# Run without timeout
process = await asyncio.create_subprocess_exec(
exe_path,
seq_file_path,
@@ -38,143 +89,196 @@ async def decrypt_file_async(exe_path, seq_file_path, path, args):
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
stdout_lines = stdout.decode('latin-1').splitlines()
stderr_lines = stderr.decode('latin-1').splitlines()
stdout_lines = decode_output(stdout).splitlines()
stderr_lines = decode_output(stderr).splitlines()
for line in stdout_lines:
logger.warning(f'PYCDC: {line} ({path})')
logger.warning(f"PYCDC: {line} ({path_for_log})")
for line in stderr_lines:
if line.startswith((
'Warning: Stack history is empty',
'Warning: Stack history is not empty',
'Warning: block stack is not empty',
)):
if args.show_warn_stack or args.show_all:
logger.warning(f'PYCDC: {line} ({path})')
elif line.startswith('Unsupported opcode:'):
if args.show_err_opcode or args.show_all:
logger.error(f'PYCDC: {line} ({path})')
elif line.startswith((
'Something TERRIBLE happened',
'Unsupported argument',
'Unsupported Node type',
'Unsupported node type',
)): # annoying wont-fix errors
if args.show_all:
logger.error(f'PYCDC: {line} ({path})')
if line.startswith(
(
"Warning: Stack history is empty",
"Warning: Stack history is not empty",
"Warning: block stack is not empty",
)
):
if show_warn_stack or show_all:
logger.warning(f"PYCDC: {line} ({path_for_log})")
elif line.startswith("Unsupported opcode:"):
if show_err_opcode or show_all:
logger.error(f"PYCDC: {line} ({path_for_log})")
elif line.startswith(
(
"Something TERRIBLE happened",
"Unsupported argument",
"Unsupported Node type",
"Unsupported node type",
)
): # annoying wont-fix errors
if show_all:
logger.error(f"PYCDC: {line} ({path_for_log})")
else:
logger.error(f'PYCDC: {line} ({path})')
logger.error(f"PYCDC: {line} ({path_for_log})")
if process.returncode != 0:
logger.warning(f'{Fore.YELLOW}PYCDC returned 0x{process.returncode:x} ({path}){Style.RESET_ALL}')
logger.warning(
f"{Fore.YELLOW}PYCDC returned 0x{process.returncode:x} ({path_for_log}){Style.RESET_ALL}"
)
except Exception as e:
error_details = traceback.format_exc()
logger.error(f'{Fore.RED}Exception: {e} ({path}){Style.RESET_ALL}')
logger.error(f'{Fore.RED}Error details: {error_details}{Style.RESET_ALL}')
logger.error(f"{Fore.RED}Exception: {e} ({path_for_log}){Style.RESET_ALL}")
logger.error(f"{Fore.RED}Error details: {error_details}{Style.RESET_ALL}")
async def decrypt_process_async(runtimes: Dict[str, RuntimeInfo], sequences: List[Tuple[str, bytes]], args):
logger = logging.getLogger('shot')
async def decrypt_process_async(
runtimes: Dict[str, RuntimeInfo], sequences: List[Tuple[str, bytes]], args
):
logger = logging.getLogger("shot")
output_dir: str = args.output_dir or args.directory
# Create a semaphore to limit concurrent processes
semaphore = asyncio.Semaphore(args.concurrent) # Use the concurrent argument
# Get the appropriate executable for the current platform
exe_path = get_platform_executable(args)
exe_path = get_platform_executable(args.executable)
semaphore = asyncio.Semaphore(args.concurrent)
async def process_file(path, data):
async def process_file(relative_path, data):
async with semaphore:
try:
serial_number = data[2:8].decode('utf-8')
serial_number = data[2:8].decode("utf-8")
runtime = runtimes[serial_number]
logger.info(f'{Fore.CYAN}Decrypting: {serial_number} ({path}){Style.RESET_ALL}')
logger.info(
f"{Fore.CYAN}Decrypting: {serial_number} ({relative_path}){Style.RESET_ALL}"
)
dest_path = os.path.join(output_dir, path) if output_dir else path
dest_path = (
os.path.join(output_dir, relative_path)
if output_dir
else os.path.abspath(relative_path) # resolve with working dir
) # abs or rel, must has a dirname, must not ends with slash
dest_dir = os.path.dirname(dest_path)
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
if args.export_raw_data:
with open(dest_path + '.1shot.raw', 'wb') as f:
with open(dest_path + ".1shot.raw", "wb") as f:
f.write(data)
# Check BCC
if int.from_bytes(data[20:24], 'little') == 9:
cipher_text_offset = int.from_bytes(data[28:32], 'little')
cipher_text_length = int.from_bytes(data[32:36], 'little')
# Check BCC; mutates "data"
if int.from_bytes(data[20:24], "little") == 9:
cipher_text_offset = int.from_bytes(data[28:32], "little")
cipher_text_length = int.from_bytes(data[32:36], "little")
nonce = data[36:40] + data[44:52]
bcc_aes_decrypted = general_aes_ctr_decrypt(
data[cipher_text_offset:cipher_text_offset+cipher_text_length], runtime.runtime_aes_key, nonce)
data = data[int.from_bytes(data[56:60], 'little'):]
data[
cipher_text_offset : cipher_text_offset + cipher_text_length
],
runtime.runtime_aes_key,
nonce,
)
data = data[int.from_bytes(data[56:60], "little") :]
bcc_architecture_mapping = {
0x2001: 'win-x64',
0x2003: 'linux-x64',
0x2001: "win-x64",
0x2003: "linux-x64",
}
while True:
if len(bcc_aes_decrypted) < 16:
break
bcc_segment_offset = int.from_bytes(bcc_aes_decrypted[0:4], 'little')
bcc_segment_length = int.from_bytes(bcc_aes_decrypted[4:8], 'little')
bcc_architecture_id = int.from_bytes(bcc_aes_decrypted[8:12], 'little')
bcc_next_segment_offset = int.from_bytes(bcc_aes_decrypted[12:16], 'little')
bcc_architecture = bcc_architecture_mapping.get(bcc_architecture_id, f'0x{bcc_architecture_id:x}')
bcc_file_path = f'{dest_path}.1shot.bcc.{bcc_architecture}.so'
with open(bcc_file_path, 'wb') as f:
f.write(bcc_aes_decrypted[bcc_segment_offset:bcc_segment_offset+bcc_segment_length])
logger.info(f'{Fore.GREEN}Extracted BCC mode native part: {bcc_file_path}{Style.RESET_ALL}')
bcc_segment_offset = int.from_bytes(
bcc_aes_decrypted[0:4], "little"
)
bcc_segment_length = int.from_bytes(
bcc_aes_decrypted[4:8], "little"
)
bcc_architecture_id = int.from_bytes(
bcc_aes_decrypted[8:12], "little"
)
bcc_next_segment_offset = int.from_bytes(
bcc_aes_decrypted[12:16], "little"
)
bcc_architecture = bcc_architecture_mapping.get(
bcc_architecture_id, f"0x{bcc_architecture_id:x}"
)
bcc_file_path = f"{dest_path}.1shot.bcc.{bcc_architecture}.so"
with open(bcc_file_path, "wb") as f:
f.write(
bcc_aes_decrypted[
bcc_segment_offset : bcc_segment_offset
+ bcc_segment_length
]
)
logger.info(
f"{Fore.GREEN}Extracted BCC mode native part: {bcc_file_path}{Style.RESET_ALL}"
)
if bcc_next_segment_offset == 0:
break
bcc_aes_decrypted = bcc_aes_decrypted[bcc_next_segment_offset:]
cipher_text_offset = int.from_bytes(data[28:32], 'little')
cipher_text_length = int.from_bytes(data[32:36], 'little')
cipher_text_offset = int.from_bytes(data[28:32], "little")
cipher_text_length = int.from_bytes(data[32:36], "little")
nonce = data[36:40] + data[44:52]
seq_file_path = dest_path + '.1shot.seq'
with open(seq_file_path, 'wb') as f:
f.write(b'\xa1' + runtime.runtime_aes_key)
f.write(b'\xa2' + runtime.mix_str_aes_nonce())
f.write(b'\xf0\xff')
seq_file_path = dest_path + ".1shot.seq"
with open(seq_file_path, "wb") as f:
f.write(b"\xa1" + runtime.runtime_aes_key)
f.write(b"\xa2" + runtime.mix_str_aes_nonce())
f.write(b"\xf0\xff")
f.write(data[:cipher_text_offset])
f.write(general_aes_ctr_decrypt(
data[cipher_text_offset:cipher_text_offset+cipher_text_length], runtime.runtime_aes_key, nonce))
f.write(data[cipher_text_offset+cipher_text_length:])
f.write(
general_aes_ctr_decrypt(
data[
cipher_text_offset : cipher_text_offset
+ cipher_text_length
],
runtime.runtime_aes_key,
nonce,
)
)
f.write(data[cipher_text_offset + cipher_text_length :])
# Run without timeout
await decrypt_file_async(exe_path, seq_file_path, path, args)
await run_pycdc_async(
exe_path,
seq_file_path,
relative_path,
args.show_all,
args.show_err_opcode,
args.show_warn_stack,
)
except Exception as e:
error_details = traceback.format_exc()
logger.error(f'{Fore.RED}Decrypt failed: {e} ({path}){Style.RESET_ALL}')
logger.error(f'{Fore.RED}Error details: {error_details}{Style.RESET_ALL}')
# Create tasks for all files
logger.error(
f"{Fore.RED}Decrypt failed: {e} ({relative_path}){Style.RESET_ALL}"
)
logger.error(
f"{Fore.RED}Error details: {error_details}{Style.RESET_ALL}"
)
tasks = [process_file(path, data) for path, data in sequences]
# Run all tasks concurrently
await asyncio.gather(*tasks)
def decrypt_process(runtimes: Dict[str, RuntimeInfo], sequences: List[Tuple[str, bytes]], args):
def decrypt_process(
runtimes: Dict[str, RuntimeInfo], sequences: List[Tuple[str, bytes]], args
):
asyncio.run(decrypt_process_async(runtimes, sequences, args))
def get_platform_executable(args) -> str:
def get_platform_executable(specified: str) -> str:
"""
Get the appropriate executable for the current platform
"""
logger = logging.getLogger('shot')
logger = logging.getLogger("shot")
# If a specific executable is provided, use it
if args.executable:
if os.path.exists(args.executable):
logger.info(f'{Fore.GREEN}Using specified executable: {args.executable}{Style.RESET_ALL}')
return args.executable
if specified:
if os.path.exists(specified):
logger.info(
f"{Fore.GREEN}Using specified executable: {specified}{Style.RESET_ALL}"
)
return specified
else:
logger.warning(f'{Fore.YELLOW}Specified executable not found: {args.executable}{Style.RESET_ALL}')
logger.warning(
f"{Fore.YELLOW}Specified executable not found: {specified}{Style.RESET_ALL}"
)
oneshot_dir = os.path.dirname(os.path.abspath(__file__))
@@ -182,88 +286,91 @@ def get_platform_executable(args) -> str:
machine = platform.machine().lower()
# Check for architecture-specific executables
arch_specific_exe = f'pyarmor-1shot-{system}-{machine}'
if system == 'windows':
arch_specific_exe += '.exe'
arch_specific_exe = f"pyarmor-1shot-{system}-{machine}"
if system == "windows":
arch_specific_exe += ".exe"
arch_exe_path = os.path.join(oneshot_dir, arch_specific_exe)
if os.path.exists(arch_exe_path):
logger.info(f'{Fore.GREEN}Using architecture-specific executable: {arch_specific_exe}{Style.RESET_ALL}')
logger.info(
f"{Fore.GREEN}Using architecture-specific executable: {arch_specific_exe}{Style.RESET_ALL}"
)
return arch_exe_path
platform_map = {
'windows': 'pyarmor-1shot.exe',
'linux': 'pyarmor-1shot',
'darwin': 'pyarmor-1shot',
"windows": "pyarmor-1shot.exe",
"linux": "pyarmor-1shot",
"darwin": "pyarmor-1shot",
}
base_exe_name = platform_map.get(system, 'pyarmor-1shot')
base_exe_name = platform_map.get(system, "pyarmor-1shot")
# Then check for platform-specific executable
platform_exe_path = os.path.join(oneshot_dir, base_exe_name)
if os.path.exists(platform_exe_path):
logger.info(f'{Fore.GREEN}Using executable: {base_exe_name}{Style.RESET_ALL}')
logger.info(f"{Fore.GREEN}Using executable: {base_exe_name}{Style.RESET_ALL}")
return platform_exe_path
# Finally, check for generic executable
generic_exe_path = os.path.join(oneshot_dir, 'pyarmor-1shot')
generic_exe_path = os.path.join(oneshot_dir, "pyarmor-1shot")
if os.path.exists(generic_exe_path):
logger.info(f'{Fore.GREEN}Using executable: pyarmor-1shot{Style.RESET_ALL}')
logger.info(f"{Fore.GREEN}Using executable: pyarmor-1shot{Style.RESET_ALL}")
return generic_exe_path
logger.critical(f'{Fore.RED}Executable {base_exe_name} not found, please build it first or download on https://github.com/Lil-House/Pyarmor-Static-Unpack-1shot/releases {Style.RESET_ALL}')
logger.critical(
f"{Fore.RED}Executable {base_exe_name} not found, please build it first or download on https://github.com/Lil-House/Pyarmor-Static-Unpack-1shot/releases {Style.RESET_ALL}"
)
exit(1)
def parse_args():
parser = argparse.ArgumentParser(
description='Pyarmor Static Unpack 1 Shot Entry')
parser = argparse.ArgumentParser(description="Pyarmor Static Unpack 1 Shot Entry")
parser.add_argument(
'directory',
"directory",
help='the "root" directory of obfuscated scripts',
type=str,
)
parser.add_argument(
'-r',
'--runtime',
help='path to pyarmor_runtime[.pyd|.so|.dylib]',
"-r",
"--runtime",
help="path to pyarmor_runtime[.pyd|.so|.dylib]",
type=str, # argparse.FileType('rb'),
)
parser.add_argument(
'-o',
'--output-dir',
help='save output files in another directory instead of in-place, with folder structure remain unchanged',
"-o",
"--output-dir",
help="save output files in another directory instead of in-place, with folder structure remain unchanged",
type=str,
)
parser.add_argument(
'--export-raw-data',
help='save data found in source files as-is',
action='store_true',
"--export-raw-data",
help="save data found in source files as-is",
action="store_true",
)
parser.add_argument(
'--show-all',
help='show all pycdc errors and warnings',
action='store_true',
"--show-all",
help="show all pycdc errors and warnings",
action="store_true",
)
parser.add_argument(
'--show-err-opcode',
help='show pycdc unsupported opcode errors',
action='store_true',
"--show-err-opcode",
help="show pycdc unsupported opcode errors",
action="store_true",
)
parser.add_argument(
'--show-warn-stack',
help='show pycdc stack related warnings',
action='store_true',
"--show-warn-stack",
help="show pycdc stack related warnings",
action="store_true",
)
parser.add_argument(
'--concurrent',
help='number of concurrent deobfuscation processes (default: 4)',
"--concurrent",
help="number of concurrent handling processes (default: 4)",
type=int,
default=4,
)
parser.add_argument(
'-e',
'--executable',
help='path to the pyarmor-1shot executable to use',
"-e",
"--executable",
help="path to the pyarmor-1shot executable to use",
type=str,
)
return parser.parse_args()
@@ -273,11 +380,11 @@ def main():
args = parse_args()
logging.basicConfig(
level=logging.INFO,
format='%(levelname)-8s %(asctime)-28s %(message)s',
format="%(levelname)-8s %(asctime)-28s %(message)s",
)
logger = logging.getLogger('shot')
logger = logging.getLogger("shot")
print(Fore.CYAN + r'''
print(rf"""{Fore.CYAN}
____ ____
( __ ) ( __ )
| |~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~| |
@@ -288,11 +395,11 @@ def main():
| | |_| \_, |\__,_|_| |_||_||_|\___/|_| |_|___/|_||_|\___/ \__| | |
| | |__/ | |
|__|~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~|__|
(____) v0.2.1 (____)
(____) v0.2.1+ (____)
For technology exchange only. Use at your own risk.
GitHub: https://github.com/Lil-House/Pyarmor-Static-Unpack-1shot
''' + Style.RESET_ALL)
{Style.RESET_ALL}""")
if args.runtime:
specified_runtime = RuntimeInfo(args.runtime)
@@ -300,71 +407,89 @@ def main():
runtimes = {specified_runtime.serial_number: specified_runtime}
else:
specified_runtime = None
runtimes = {'000000': RuntimeInfo.default()}
sequences: List[Tuple[str, bytes]] = []
runtimes = {"000000": RuntimeInfo.default()}
if args.output_dir and not os.path.exists(args.output_dir):
os.makedirs(args.output_dir)
if args.output_dir and not os.path.isdir(args.output_dir):
logger.error(f'{Fore.RED}Cannot use {repr(args.output_dir)} as output directory{Style.RESET_ALL}')
logger.error(
f"{Fore.RED}Cannot use {repr(args.output_dir)} as output directory{Style.RESET_ALL}"
)
return
# Note for path handling:
# args.output_dir: is either None or an existing directory path, can be absolute or relative
# args.directory: before calling `decrypt_process`, it is an existing directory path, can be absolute or relative
# paths in `sequences`: must be relative file paths, without trailing slashes, can exist or not
if os.path.isfile(args.directory):
single_file_path = os.path.abspath(args.directory)
args.directory = os.path.dirname(single_file_path)
relative_path = os.path.basename(single_file_path)
if specified_runtime is None:
logger.error(f'{Fore.RED}Please specify `pyarmor_runtime` file by `-r` if input is a file{Style.RESET_ALL}')
logger.error(
f"{Fore.RED}Please specify `pyarmor_runtime` file by `-r` if input is a file{Style.RESET_ALL}"
)
return
logger.info(f'{Fore.CYAN}Single file mode{Style.RESET_ALL}')
result = detect_process(args.directory, args.directory)
if result is None:
logger.error(f'{Fore.RED}No armored data found{Style.RESET_ALL}')
logger.info(f"{Fore.CYAN}Single file mode{Style.RESET_ALL}")
single_file_sequences = detect_process(single_file_path, relative_path)
if single_file_sequences is None:
logger.error(f"{Fore.RED}No armored data found{Style.RESET_ALL}")
return
sequences.extend(result)
decrypt_process(runtimes, sequences, args)
decrypt_process(runtimes, single_file_sequences, args)
return # single file mode ends here
sequences: List[Tuple[str, bytes]] = []
dir_path: str
dirs: List[str]
files: List[str]
for dir_path, dirs, files in os.walk(args.directory, followlinks=False):
if '.no1shot' in files:
logger.info(f'{Fore.YELLOW}Skipping {dir_path} because of `.no1shot`{Style.RESET_ALL}')
if ".no1shot" in files:
logger.info(
f"{Fore.YELLOW}Skipping {dir_path} because of `.no1shot`{Style.RESET_ALL}"
)
dirs.clear()
files.clear()
continue
for d in ['__pycache__', 'site-packages']:
for d in ["__pycache__", "site-packages"]:
if d in dirs:
dirs.remove(d)
for file_name in files:
if '.1shot.' in file_name:
if ".1shot." in file_name:
continue
file_path = os.path.join(dir_path, file_name)
relative_path = os.path.relpath(file_path, args.directory)
if file_name.endswith('.pyz'):
with open(file_path, 'rb') as f:
if file_name.endswith(".pyz"):
with open(file_path, "rb") as f:
head = f.read(16 * 1024 * 1024)
if b'PY00' in head \
and (not os.path.exists(file_path + '_extracted')
or len(os.listdir(file_path + '_extracted')) == 0):
if b"PY00" in head and (
not os.path.exists(file_path + "_extracted")
or len(os.listdir(file_path + "_extracted")) == 0
):
logger.error(
f'{Fore.RED}A PYZ file containing armored data is detected, but the PYZ file has not been extracted by other tools. This error is not a problem with this tool. If the folder is extracted by Pyinstxtractor, please read the output information of Pyinstxtractor carefully. ({relative_path}){Style.RESET_ALL}')
f"{Fore.RED}A PYZ file containing armored data is detected, but the PYZ file has not been extracted by other tools. This error is not a problem with this tool. If the folder is extracted by Pyinstxtractor, please read the output information of Pyinstxtractor carefully. ({relative_path}){Style.RESET_ALL}"
)
continue
# is pyarmor_runtime?
if specified_runtime is None \
and file_name.startswith('pyarmor_runtime') \
and file_name.endswith(('.pyd', '.so', '.dylib')):
if (
specified_runtime is None
and file_name.startswith("pyarmor_runtime")
and file_name.endswith((".pyd", ".so", ".dylib"))
):
try:
new_runtime = RuntimeInfo(file_path)
runtimes[new_runtime.serial_number] = new_runtime
logger.info(
f'{Fore.GREEN}Found new runtime: {new_runtime.serial_number} ({file_path}){Style.RESET_ALL}')
f"{Fore.GREEN}Found new runtime: {new_runtime.serial_number} ({file_path}){Style.RESET_ALL}"
)
print(new_runtime)
continue
except:
except Exception:
pass
result = detect_process(file_path, relative_path)
@@ -372,13 +497,13 @@ def main():
sequences.extend(result)
if not runtimes:
logger.error(f'{Fore.RED}No runtime found{Style.RESET_ALL}')
logger.error(f"{Fore.RED}No runtime found{Style.RESET_ALL}")
return
if not sequences:
logger.error(f'{Fore.RED}No armored data found{Style.RESET_ALL}')
logger.error(f"{Fore.RED}No armored data found{Style.RESET_ALL}")
return
decrypt_process(runtimes, sequences, args)
if __name__ == '__main__':
if __name__ == "__main__":
main()