From d9a5dee5aa4b53a7a8331d099f5ca286b23dffc2 Mon Sep 17 00:00:00 2001 From: Lil-Ran Date: Thu, 20 Nov 2025 16:06:53 +0800 Subject: [PATCH] refactor(scripts): replace int.from_bytes with dword --- oneshot/detect.py | 33 ++++++++++++++++++------------- oneshot/runtime.py | 2 +- oneshot/shot.py | 49 ++++++++++++++++++---------------------------- oneshot/util.py | 2 +- 4 files changed, 40 insertions(+), 46 deletions(-) diff --git a/oneshot/detect.py b/oneshot/detect.py index 65fc9cb..1125618 100644 --- a/oneshot/detect.py +++ b/oneshot/detect.py @@ -3,6 +3,9 @@ import os from typing import List, Tuple, Union +from util import dword + + def ascii_ratio(data: bytes) -> float: return sum(32 <= c < 127 for c in data) / len(data) @@ -17,7 +20,7 @@ def source_as_file(file_path: str) -> Union[List[bytes], None]: co = compile(f.read(), "", "exec") data = [i for i in co.co_consts if type(i) is bytes and valid_bytes(i)] return data - except: + except Exception: return None @@ -31,10 +34,10 @@ def source_as_lines(file_path: str) -> Union[List[bytes], None]: data.extend( [i for i in co.co_consts if type(i) is bytes and valid_bytes(i)] ) - except: + except Exception: # ignore not compilable lines pass - except: + except Exception: return None return data @@ -53,8 +56,8 @@ def find_data_from_bytes(data: bytes, max_count=-1) -> List[bytes]: if len(data) < 64: # don't break if len > 64, maybe there is PY00blahPY000000 break - header_len = int.from_bytes(data[28:32], "little") - body_len = int.from_bytes(data[32:36], "little") + header_len = dword(data, 28) + body_len = dword(data, 32) if header_len > 256 or body_len > 0xFFFFF or header_len + body_len > len(data): # compressed or coincident, skip data = data[4:] @@ -63,16 +66,16 @@ def find_data_from_bytes(data: bytes, max_count=-1) -> List[bytes]: complete_object_length = header_len + body_len # maybe followed by data for other Python versions or another part of BCC - next_segment_offset = int.from_bytes(data[56:60], "little") + next_segment_offset = dword(data, 56) data_next = data[next_segment_offset:] while next_segment_offset != 0 and valid_bytes(data_next): - header_len = int.from_bytes(data_next[28:32], "little") - body_len = int.from_bytes(data_next[32:36], "little") + header_len = dword(data_next, 28) + body_len = dword(data_next, 32) complete_object_length = next_segment_offset + header_len + body_len - if int.from_bytes(data_next[56:60], "little") == 0: + if dword(data_next, 56) == 0: break - next_segment_offset += int.from_bytes(data_next[56:60], "little") + next_segment_offset += dword(data_next, 56) data_next = data[next_segment_offset:] result.append(data[:complete_object_length]) @@ -89,15 +92,17 @@ def nuitka_package( last_dot_bytecode = head.rfind(b".bytecode\x00", 0, first_occurrence) if last_dot_bytecode == -1: return None - length = int.from_bytes(head[last_dot_bytecode - 4 : last_dot_bytecode], "little") + length = dword(head, last_dot_bytecode - 4) end = last_dot_bytecode + length cur = last_dot_bytecode result = [] while cur < end: module_name_len = head.find(b"\x00", cur, end) - cur - module_name = head[cur : cur + module_name_len].decode("utf-8") + module_name = head[cur : cur + module_name_len].decode( + "utf-8", errors="replace" + ) cur += module_name_len + 1 - module_len = int.from_bytes(head[cur : cur + 4], "little") + module_len = dword(head, cur) cur += 4 module_data = find_data_from_bytes(head[cur : cur + module_len], 1) if module_data: @@ -129,7 +134,7 @@ def detect_process( try: with open(file_path, "rb") as f: head = f.read(16 * 1024 * 1024) - except: + except Exception: logger.error(f"Failed to read file: {relative_path}") return None diff --git a/oneshot/runtime.py b/oneshot/runtime.py index 446b8b4..1b4d54a 100644 --- a/oneshot/runtime.py +++ b/oneshot/runtime.py @@ -37,7 +37,7 @@ class RuntimeInfo: # TODO: implement for other platforms self.extract_info_win64() - self.serial_number = self.part_1[12:18].decode() + self.serial_number = self.part_1[12:18].decode("utf-8", errors="replace") self.runtime_aes_key = self.calc_aes_key() def __str__(self) -> str: diff --git a/oneshot/shot.py b/oneshot/shot.py index 1241a4b..56f63a0 100644 --- a/oneshot/shot.py +++ b/oneshot/shot.py @@ -24,6 +24,7 @@ except ImportError: from detect import detect_process from runtime import RuntimeInfo +from util import dword, bytes_sub # Initialize colorama @@ -169,7 +170,7 @@ async def decrypt_process_async( async def process_file(relative_path, data): async with semaphore: try: - serial_number = data[2:8].decode("utf-8") + serial_number = data[2:8].decode("utf-8", errors="replace") runtime = runtimes[serial_number] logger.info( f"{Fore.CYAN}Decrypting: {serial_number} ({relative_path}){Style.RESET_ALL}" @@ -189,18 +190,16 @@ async def decrypt_process_async( f.write(data) # Check BCC; mutates "data" - if int.from_bytes(data[20:24], "little") == 9: - cipher_text_offset = int.from_bytes(data[28:32], "little") - cipher_text_length = int.from_bytes(data[32:36], "little") + if dword(data, 20) == 9: + cipher_text_offset = dword(data, 28) + cipher_text_length = dword(data, 32) nonce = data[36:40] + data[44:52] bcc_aes_decrypted = general_aes_ctr_decrypt( - data[ - cipher_text_offset : cipher_text_offset + cipher_text_length - ], + bytes_sub(data, cipher_text_offset, cipher_text_length), runtime.runtime_aes_key, nonce, ) - data = data[int.from_bytes(data[56:60], "little") :] + data = data[dword(data, 56) :] bcc_architecture_mapping = { 0x2001: "win-x64", 0x2003: "linux-x64", @@ -208,28 +207,21 @@ async def decrypt_process_async( while True: if len(bcc_aes_decrypted) < 16: break - bcc_segment_offset = int.from_bytes( - bcc_aes_decrypted[0:4], "little" - ) - bcc_segment_length = int.from_bytes( - bcc_aes_decrypted[4:8], "little" - ) - bcc_architecture_id = int.from_bytes( - bcc_aes_decrypted[8:12], "little" - ) - bcc_next_segment_offset = int.from_bytes( - bcc_aes_decrypted[12:16], "little" - ) + bcc_segment_offset = dword(bcc_aes_decrypted, 0) + bcc_segment_length = dword(bcc_aes_decrypted, 4) + bcc_architecture_id = dword(bcc_aes_decrypted, 8) + bcc_next_segment_offset = dword(bcc_aes_decrypted, 12) bcc_architecture = bcc_architecture_mapping.get( bcc_architecture_id, f"0x{bcc_architecture_id:x}" ) bcc_file_path = f"{dest_path}.1shot.bcc.{bcc_architecture}.so" with open(bcc_file_path, "wb") as f: f.write( - bcc_aes_decrypted[ - bcc_segment_offset : bcc_segment_offset - + bcc_segment_length - ] + bytes_sub( + bcc_aes_decrypted, + bcc_segment_offset, + bcc_segment_length, + ) ) logger.info( f"{Fore.GREEN}Extracted BCC mode native part: {bcc_file_path}{Style.RESET_ALL}" @@ -238,8 +230,8 @@ async def decrypt_process_async( break bcc_aes_decrypted = bcc_aes_decrypted[bcc_next_segment_offset:] - cipher_text_offset = int.from_bytes(data[28:32], "little") - cipher_text_length = int.from_bytes(data[32:36], "little") + cipher_text_offset = dword(data, 28) + cipher_text_length = dword(data, 32) nonce = data[36:40] + data[44:52] seq_file_path = dest_path + ".1shot.seq" with open(seq_file_path, "wb") as f: @@ -249,10 +241,7 @@ async def decrypt_process_async( f.write(data[:cipher_text_offset]) f.write( general_aes_ctr_decrypt( - data[ - cipher_text_offset : cipher_text_offset - + cipher_text_length - ], + bytes_sub(data, cipher_text_offset, cipher_text_length), runtime.runtime_aes_key, nonce, ) diff --git a/oneshot/util.py b/oneshot/util.py index 7d7fab9..75f4ec7 100644 --- a/oneshot/util.py +++ b/oneshot/util.py @@ -2,5 +2,5 @@ def dword(buffer, idx: int) -> int: return int.from_bytes(buffer[idx : idx + 4], "little") -def bytes_sub(buffer, start: int, length: int) -> int: +def bytes_sub(buffer, start: int, length: int) -> bytes: return buffer[start : start + length]