183 lines
5.9 KiB
Python
183 lines
5.9 KiB
Python
import logging
|
|
import os
|
|
from typing import List, Tuple, Union
|
|
|
|
|
|
from util import dword
|
|
|
|
|
|
def ascii_ratio(data: bytes) -> float:
    """
    Return the fraction of bytes in *data* that are printable ASCII.

    A byte counts as printable when 32 <= byte < 127.
    Empty input yields 0.0 instead of dividing by zero.
    """
    if not data:
        return 0.0
    return sum(32 <= c < 127 for c in data) / len(data)
|
|
|
|
|
|
def valid_bytes(data: bytes) -> bool:
    """
    Check whether *data* has the expected leading layout.

    All three conditions must hold:
    - more than 64 bytes long,
    - bytes 2..7 are ASCII digits (0x30..0x39),
    - byte 9 equals 3.
    """
    if len(data) <= 64:
        return False
    for byte in data[2:8]:
        if byte < 0x30 or byte > 0x39:
            return False
    return data[9] == 3
|
|
|
|
|
|
def source_as_file(file_path: str) -> Union[List[bytes], None]:
    """
    Compile the whole file and collect matching bytes constants.

    The file is read as raw bytes so that ``compile`` applies Python's own
    source-encoding rules (UTF-8 by default, BOM and coding cookies honoured)
    rather than the platform's locale encoding.

    Returns the bytes constants of the module-level code object that pass
    ``valid_bytes`` (possibly an empty list), or None if the file cannot be
    read or compiled.
    """
    try:
        with open(file_path, "rb") as f:
            co = compile(f.read(), "<str>", "exec")
        return [c for c in co.co_consts if isinstance(c, bytes) and valid_bytes(c)]
    except Exception:
        # best effort: any read/compile failure is reported as None so the
        # caller can fall back to another strategy
        return None
|
|
|
|
|
|
def source_as_lines(file_path: str) -> Union[List[bytes], None]:
    """
    Compile the file line by line and collect matching bytes constants.

    Each line is compiled on its own; lines that do not compile (including
    lines that are not valid in the source encoding) are skipped. The file is
    read as raw bytes so one undecodable line cannot abort the whole scan.

    Returns the bytes constants that pass ``valid_bytes`` (possibly an empty
    list), or None if the file cannot be read.
    """
    try:
        with open(file_path, "rb") as f:
            # splitlines() on bytes splits on \n, \r and \r\n
            lines = f.read().splitlines()
    except Exception:
        return None

    data = []
    for line in lines:
        try:
            co = compile(line, "<str>", "exec")
            data.extend(
                [c for c in co.co_consts if isinstance(c, bytes) and valid_bytes(c)]
            )
        except Exception:
            # ignore not compilable lines
            pass
    return data
|
|
|
|
|
|
# NOTE: the scan walks `data` by offset; only the extracted objects (and a
# 65-byte probe handed to valid_bytes) are copied.
def find_data_from_bytes(data: bytes, max_count=-1) -> List[bytes]:
    """
    Extract objects that start with the marker b"PY0" from *data*.

    For each marker, the dwords at offsets 28 and 32 (relative to the marker)
    are read as header and body length, and the dword at offset 56 as the
    offset of a following segment. Following segments are chained for as long
    as they pass ``valid_bytes``; the extracted object spans from the marker
    to the end of the last chained segment.

    max_count: stop after this many objects; the default -1 never matches the
    result length, so the whole buffer is scanned.

    Assumes ``dword`` (from util) returns a non-negative integer -- TODO
    confirm; non-positive values are treated as "no further segment".
    """
    result = []
    total = len(data)
    pos = 0
    while len(result) != max_count:
        pos = data.find(b"PY0", pos)  # XXX: not necessarily starts with b"PY"
        if pos == -1:
            break
        remaining = total - pos
        if remaining < 64:
            # fewer than 64 bytes left: no later match can have more, so stop
            # (with >= 64 bytes we keep going, maybe there is PY0blahPY0)
            break

        header_len = dword(data, pos + 28)
        body_len = dword(data, pos + 32)
        complete_object_length = header_len + body_len
        if (
            header_len > 256
            or body_len > 0xFFFFF
            or complete_object_length > remaining
            or complete_object_length <= 0
        ):
            # compressed or coincident, skip; a zero-length "object" would
            # never advance the scan, so it is treated as coincident too
            pos += 4
            continue

        # maybe followed by data for other Python versions or another part of BCC
        next_segment_offset = dword(data, pos + 56)
        while next_segment_offset > 0:
            segment = pos + next_segment_offset
            # valid_bytes only inspects the first bytes and needs len > 64,
            # so a 65-byte probe gives the same answer as the full tail
            if not valid_bytes(data[segment : segment + 65]):
                break
            header_len = dword(data, segment + 28)
            body_len = dword(data, segment + 32)
            complete_object_length = next_segment_offset + header_len + body_len

            step = dword(data, segment + 56)
            if step <= 0:
                break
            next_segment_offset += step

        result.append(data[pos : pos + complete_object_length])
        pos += complete_object_length
    return result
|
|
|
|
|
|
def nuitka_package(
    head: bytes, relative_path: str
) -> Union[List[Tuple[str, bytes]], None]:
    """
    Extract named modules from a Nuitka package image.

    Locates the last b".bytecode\\x00" before the first b"PY0" marker, reads
    the dword 4 bytes before it as the length of the region to walk, then
    iterates entries of the form: NUL-terminated name, dword length, payload.
    Each payload is searched with ``find_data_from_bytes`` (first hit only).

    Returns a list of (output_path, data) tuples, where output_path is
    ``<relative_path>.1shot.ext/<module name>``, or None if nothing is found
    or the expected markers are missing.
    """
    first_occurrence = head.find(b"PY0")
    if first_occurrence == -1:
        return None
    last_dot_bytecode = head.rfind(b".bytecode\x00", 0, first_occurrence)
    if last_dot_bytecode < 4:
        # not found (-1), or no room for the 4-byte length in front of it
        return None

    length = dword(head, last_dot_bytecode - 4)
    end = last_dot_bytecode + length
    cur = last_dot_bytecode
    output_dir = relative_path.rstrip("/\\") + ".1shot.ext"
    result = []
    while cur < end:
        name_end = head.find(b"\x00", cur, end)
        if name_end == -1:
            # unterminated name: the table is truncated or corrupt; stop
            # instead of computing a negative length and rewinding the cursor
            break
        module_name = head[cur:name_end].decode("utf-8", errors="replace")
        cur = name_end + 1
        if cur + 4 > len(head):
            # no room left for the module length field
            break
        module_len = dword(head, cur)
        cur += 4
        module_data = find_data_from_bytes(head[cur : cur + module_len], 1)
        if module_data:
            # NOTE(review): module_name comes from the scanned file; if it
            # contains path separators or is absolute, the joined path can
            # leave output_dir -- callers writing these paths should validate
            result.append((os.path.join(output_dir, module_name), module_data[0]))
        cur += module_len

    if result:
        logger = logging.getLogger("detect")
        logger.info(f"Found data in Nuitka package: {relative_path}")
        return result
    return None
|
|
|
|
|
|
def _label_results(
    relative_path: str, found: Union[List[bytes], None]
) -> Union[List[Tuple[str, bytes]], None]:
    """Pair each blob with an output name; None when nothing was found."""
    if not found:
        return None
    if len(found) == 1:
        return [(relative_path, found[0])]
    # several blobs in one file: disambiguate with __0, __1, ...
    return [(f"{relative_path}__{i}", blob) for i, blob in enumerate(found)]


def detect_process(
    file_path: str, relative_path: str
) -> Union[List[Tuple[str, bytes]], None]:
    """
    Returns a list of (relative_path, bytes_raw) tuples, or None.

    Only the first 16 MiB of the file are inspected. Files without the
    b"__pyarmor__" marker are skipped. Mostly-ASCII files are treated as
    source (compiled as a whole, then line by line as a fallback); other
    files are scanned as binaries, with a special case for Nuitka packages.

    Do not raise exceptions: read and scan failures are logged and reported
    as None.
    """
    logger = logging.getLogger("detect")

    try:
        with open(file_path, "rb") as f:
            head = f.read(16 * 1024 * 1024)
    except Exception:
        logger.error(f"Failed to read file: {relative_path}")
        return None

    if b"__pyarmor__" not in head:
        # no need to dig deeper
        return None

    try:
        if ascii_ratio(head[:2048]) >= 0.9:
            # the whole file may not be compiled, but we can still try some lines;
            # None means failure (then we make another try),
            # empty list means success but no data found (then we skip this file)
            kind = "source"
            result = source_as_file(file_path)
            if result is None:
                result = source_as_lines(file_path)
        elif b"Error, corrupted constants object" in head:
            # binary file
            # an interesting special case: packer put armored data in a Nuitka package
            # we can know the exact module names, instead of adding boring __0, __1, ...
            return nuitka_package(head, relative_path)
        else:
            # binary file
            # ignore data after 16MB, before we have a reason to read more
            kind = "binary"
            result = find_data_from_bytes(head)
    except Exception as exc:
        # keep the no-raise contract even if a scanner trips over odd input
        logger.error(f"Failed to scan file: {relative_path} ({exc})")
        return None

    labelled = _label_results(relative_path, result)
    if labelled is not None:
        logger.info(f"Found data in {kind}: {relative_path}")
    return labelled
|