import logging
import os
from typing import List, Tuple, Union

from util import dword


def ascii_ratio(data: bytes) -> float:
    """Return the fraction of printable-ASCII bytes (0x20..0x7E) in *data*.

    Returns 0.0 for empty input instead of raising ZeroDivisionError.
    """
    if not data:
        return 0.0
    return sum(32 <= c < 127 for c in data) / len(data)


def valid_bytes(data: bytes) -> bool:
    """Heuristic: does *data* look like an armored blob?

    True when the blob is longer than 64 bytes, bytes 2..7 are ASCII
    digits, and byte 9 equals 3.  (Layout inferred from the extraction
    code below -- confirm against the actual on-disk format.)
    """
    return (
        len(data) > 64
        and all(0x30 <= b <= 0x39 for b in data[2:8])
        and data[9] == 3
    )


def source_as_file(file_path: str) -> Union[List[bytes], None]:
    """Compile the whole file as one Python source and collect the bytes
    constants that pass :func:`valid_bytes`.

    Returns None when the file cannot be read or compiled (the caller
    then falls back to line-by-line compilation).
    """
    try:
        with open(file_path, "r") as f:
            co = compile(f.read(), "", "exec")
        return [c for c in co.co_consts if type(c) is bytes and valid_bytes(c)]
    except Exception:
        # Best effort: any read/compile failure means "try something else".
        return None


def source_as_lines(file_path: str) -> Union[List[bytes], None]:
    """Compile the file line by line and collect matching bytes constants.

    Lines that do not compile on their own are skipped.  Returns None
    only when the file itself cannot be read.
    """
    data: List[bytes] = []
    try:
        with open(file_path, "r") as f:
            for line in f:
                try:
                    co = compile(line, "", "exec")
                except Exception:
                    # ignore not compilable lines
                    continue
                data.extend(
                    c for c in co.co_consts if type(c) is bytes and valid_bytes(c)
                )
    except Exception:
        return None
    return data


# XXX: use bytes view instead of copying slices
def find_data_from_bytes(data: bytes, max_count: int = -1) -> List[bytes]:
    """Scan *data* for armored objects and return up to *max_count* of
    them (-1 means "all").

    An object is anchored at a b"PY00" marker (XXX: not necessarily
    starts with b"PY"); offsets 28/32 hold header/body lengths and
    offset 56 an optional relative offset to a follow-up segment --
    layout inferred from this scanner, confirm against the real format.
    """
    result: List[bytes] = []
    while len(result) != max_count:
        idx = data.find(b"PY00")
        if idx == -1:
            break
        data = data[idx:]
        if len(data) < 64:
            # don't break if len > 64, maybe there is PY00blahPY000000
            break
        header_len = dword(data, 28)
        body_len = dword(data, 32)
        if header_len > 256 or body_len > 0xFFFFF or header_len + body_len > len(data):
            # compressed or coincident, skip
            data = data[4:]
            continue
        complete_object_length = header_len + body_len
        # maybe followed by data for other Python versions or another part of BCC
        next_segment_offset = dword(data, 56)
        data_next = data[next_segment_offset:]
        while next_segment_offset != 0 and valid_bytes(data_next):
            header_len = dword(data_next, 28)
            body_len = dword(data_next, 32)
            complete_object_length = next_segment_offset + header_len + body_len
            if dword(data_next, 56) == 0:
                break
            next_segment_offset += dword(data_next, 56)
            data_next = data[next_segment_offset:]
        result.append(data[:complete_object_length])
        data = data[complete_object_length:]
    return result


def nuitka_package(
    head: bytes, relative_path: str
) -> Union[List[Tuple[str, bytes]], None]:
    """Extract armored modules from what looks like a Nuitka module table.

    The table is located just before the first b"PY00" marker: a run of
    NUL-terminated module names, each followed by a 4-byte length and
    the module data (layout inferred from this parser -- verify).
    Returns (output_path, raw_bytes) pairs, or None when nothing usable
    is found.
    """
    first_occurrence = head.find(b"PY00")
    if first_occurrence == -1:
        return None
    last_dot_bytecode = head.rfind(b".bytecode\x00", 0, first_occurrence)
    if last_dot_bytecode < 4:
        # not found (-1), or too close to the start to hold its 4-byte
        # length prefix -- the original code would read a negative offset
        return None
    length = dword(head, last_dot_bytecode - 4)
    end = last_dot_bytecode + length
    cur = last_dot_bytecode
    result = []
    while cur < end:
        name_end = head.find(b"\x00", cur, end)
        if name_end == -1:
            # malformed table: unterminated module name; previously this
            # produced a negative name length and corrupted the cursor
            break
        module_name = head[cur:name_end].decode("utf-8", errors="replace")
        cur = name_end + 1
        module_len = dword(head, cur)
        cur += 4
        module_data = find_data_from_bytes(head[cur : cur + module_len], 1)
        if module_data:
            result.append(
                (
                    os.path.join(
                        relative_path.rstrip("/\\") + ".1shot.ext", module_name
                    ),
                    module_data[0],
                )
            )
        cur += module_len
    if result:
        logger = logging.getLogger("detect")
        logger.info(f"Found data in Nuitka package: {relative_path}")
        return result
    return None


def _labeled(relative_path: str, blobs: List[bytes]) -> List[Tuple[str, bytes]]:
    """Pair each blob with a path, adding __<index> suffixes when there
    is more than one (shared by the source and binary branches)."""
    if len(blobs) == 1:
        return [(relative_path, blobs[0])]
    return [(f"{relative_path}__{i}", blob) for i, blob in enumerate(blobs)]


def detect_process(
    file_path: str, relative_path: str
) -> Union[List[Tuple[str, bytes]], None]:
    """
    Returns a list of (relative_path, bytes_raw) tuples, or None.
    Do not raise exceptions.
    """
    logger = logging.getLogger("detect")
    try:
        with open(file_path, "rb") as f:
            head = f.read(16 * 1024 * 1024)
    except Exception:
        logger.error(f"Failed to read file: {relative_path}")
        return None
    if b"__pyarmor__" not in head:
        # no need to dig deeper
        return None
    if ascii_ratio(head[:2048]) >= 0.9:
        # the whole file may not be compiled, but we can still try some lines;
        # None means failure (then we make another try),
        # empty list means success but no data found (then we skip this file)
        result = source_as_file(file_path)
        if result is None:
            result = source_as_lines(file_path)
        if not result:
            # None: both attempts failed; []: nothing found -- skip either way
            return None
        logger.info(f"Found data in source: {relative_path}")
        return _labeled(relative_path, result)
    # binary file
    # ignore data after 16MB, before we have a reason to read more
    if b"Error, corrupted constants object" in head:
        # an interesting special case: packer put armored data in a Nuitka package
        # we can know the exact module names, instead of adding boring __0, __1, ...
        return nuitka_package(head, relative_path)
    result = find_data_from_bytes(head)
    if not result:
        return None
    logger.info(f"Found data in binary: {relative_path}")
    return _labeled(relative_path, result)