refactor(scripts): replace int.from_bytes with dword
This commit is contained in:
@@ -3,6 +3,9 @@ import os
|
|||||||
from typing import List, Tuple, Union
|
from typing import List, Tuple, Union
|
||||||
|
|
||||||
|
|
||||||
|
from util import dword
|
||||||
|
|
||||||
|
|
||||||
def ascii_ratio(data: bytes) -> float:
|
def ascii_ratio(data: bytes) -> float:
|
||||||
return sum(32 <= c < 127 for c in data) / len(data)
|
return sum(32 <= c < 127 for c in data) / len(data)
|
||||||
|
|
||||||
@@ -17,7 +20,7 @@ def source_as_file(file_path: str) -> Union[List[bytes], None]:
|
|||||||
co = compile(f.read(), "<str>", "exec")
|
co = compile(f.read(), "<str>", "exec")
|
||||||
data = [i for i in co.co_consts if type(i) is bytes and valid_bytes(i)]
|
data = [i for i in co.co_consts if type(i) is bytes and valid_bytes(i)]
|
||||||
return data
|
return data
|
||||||
except:
|
except Exception:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
@@ -31,10 +34,10 @@ def source_as_lines(file_path: str) -> Union[List[bytes], None]:
|
|||||||
data.extend(
|
data.extend(
|
||||||
[i for i in co.co_consts if type(i) is bytes and valid_bytes(i)]
|
[i for i in co.co_consts if type(i) is bytes and valid_bytes(i)]
|
||||||
)
|
)
|
||||||
except:
|
except Exception:
|
||||||
# ignore not compilable lines
|
# ignore not compilable lines
|
||||||
pass
|
pass
|
||||||
except:
|
except Exception:
|
||||||
return None
|
return None
|
||||||
return data
|
return data
|
||||||
|
|
||||||
@@ -53,8 +56,8 @@ def find_data_from_bytes(data: bytes, max_count=-1) -> List[bytes]:
|
|||||||
if len(data) < 64:
|
if len(data) < 64:
|
||||||
# don't break if len > 64, maybe there is PY00blahPY000000
|
# don't break if len > 64, maybe there is PY00blahPY000000
|
||||||
break
|
break
|
||||||
header_len = int.from_bytes(data[28:32], "little")
|
header_len = dword(data, 28)
|
||||||
body_len = int.from_bytes(data[32:36], "little")
|
body_len = dword(data, 32)
|
||||||
if header_len > 256 or body_len > 0xFFFFF or header_len + body_len > len(data):
|
if header_len > 256 or body_len > 0xFFFFF or header_len + body_len > len(data):
|
||||||
# compressed or coincident, skip
|
# compressed or coincident, skip
|
||||||
data = data[4:]
|
data = data[4:]
|
||||||
@@ -63,16 +66,16 @@ def find_data_from_bytes(data: bytes, max_count=-1) -> List[bytes]:
|
|||||||
complete_object_length = header_len + body_len
|
complete_object_length = header_len + body_len
|
||||||
|
|
||||||
# maybe followed by data for other Python versions or another part of BCC
|
# maybe followed by data for other Python versions or another part of BCC
|
||||||
next_segment_offset = int.from_bytes(data[56:60], "little")
|
next_segment_offset = dword(data, 56)
|
||||||
data_next = data[next_segment_offset:]
|
data_next = data[next_segment_offset:]
|
||||||
while next_segment_offset != 0 and valid_bytes(data_next):
|
while next_segment_offset != 0 and valid_bytes(data_next):
|
||||||
header_len = int.from_bytes(data_next[28:32], "little")
|
header_len = dword(data_next, 28)
|
||||||
body_len = int.from_bytes(data_next[32:36], "little")
|
body_len = dword(data_next, 32)
|
||||||
complete_object_length = next_segment_offset + header_len + body_len
|
complete_object_length = next_segment_offset + header_len + body_len
|
||||||
|
|
||||||
if int.from_bytes(data_next[56:60], "little") == 0:
|
if dword(data_next, 56) == 0:
|
||||||
break
|
break
|
||||||
next_segment_offset += int.from_bytes(data_next[56:60], "little")
|
next_segment_offset += dword(data_next, 56)
|
||||||
data_next = data[next_segment_offset:]
|
data_next = data[next_segment_offset:]
|
||||||
|
|
||||||
result.append(data[:complete_object_length])
|
result.append(data[:complete_object_length])
|
||||||
@@ -89,15 +92,17 @@ def nuitka_package(
|
|||||||
last_dot_bytecode = head.rfind(b".bytecode\x00", 0, first_occurrence)
|
last_dot_bytecode = head.rfind(b".bytecode\x00", 0, first_occurrence)
|
||||||
if last_dot_bytecode == -1:
|
if last_dot_bytecode == -1:
|
||||||
return None
|
return None
|
||||||
length = int.from_bytes(head[last_dot_bytecode - 4 : last_dot_bytecode], "little")
|
length = dword(head, last_dot_bytecode - 4)
|
||||||
end = last_dot_bytecode + length
|
end = last_dot_bytecode + length
|
||||||
cur = last_dot_bytecode
|
cur = last_dot_bytecode
|
||||||
result = []
|
result = []
|
||||||
while cur < end:
|
while cur < end:
|
||||||
module_name_len = head.find(b"\x00", cur, end) - cur
|
module_name_len = head.find(b"\x00", cur, end) - cur
|
||||||
module_name = head[cur : cur + module_name_len].decode("utf-8")
|
module_name = head[cur : cur + module_name_len].decode(
|
||||||
|
"utf-8", errors="replace"
|
||||||
|
)
|
||||||
cur += module_name_len + 1
|
cur += module_name_len + 1
|
||||||
module_len = int.from_bytes(head[cur : cur + 4], "little")
|
module_len = dword(head, cur)
|
||||||
cur += 4
|
cur += 4
|
||||||
module_data = find_data_from_bytes(head[cur : cur + module_len], 1)
|
module_data = find_data_from_bytes(head[cur : cur + module_len], 1)
|
||||||
if module_data:
|
if module_data:
|
||||||
@@ -129,7 +134,7 @@ def detect_process(
|
|||||||
try:
|
try:
|
||||||
with open(file_path, "rb") as f:
|
with open(file_path, "rb") as f:
|
||||||
head = f.read(16 * 1024 * 1024)
|
head = f.read(16 * 1024 * 1024)
|
||||||
except:
|
except Exception:
|
||||||
logger.error(f"Failed to read file: {relative_path}")
|
logger.error(f"Failed to read file: {relative_path}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|||||||
@@ -37,7 +37,7 @@ class RuntimeInfo:
|
|||||||
# TODO: implement for other platforms
|
# TODO: implement for other platforms
|
||||||
self.extract_info_win64()
|
self.extract_info_win64()
|
||||||
|
|
||||||
self.serial_number = self.part_1[12:18].decode()
|
self.serial_number = self.part_1[12:18].decode("utf-8", errors="replace")
|
||||||
self.runtime_aes_key = self.calc_aes_key()
|
self.runtime_aes_key = self.calc_aes_key()
|
||||||
|
|
||||||
def __str__(self) -> str:
|
def __str__(self) -> str:
|
||||||
|
|||||||
@@ -24,6 +24,7 @@ except ImportError:
|
|||||||
|
|
||||||
from detect import detect_process
|
from detect import detect_process
|
||||||
from runtime import RuntimeInfo
|
from runtime import RuntimeInfo
|
||||||
|
from util import dword, bytes_sub
|
||||||
|
|
||||||
|
|
||||||
# Initialize colorama
|
# Initialize colorama
|
||||||
@@ -169,7 +170,7 @@ async def decrypt_process_async(
|
|||||||
async def process_file(relative_path, data):
|
async def process_file(relative_path, data):
|
||||||
async with semaphore:
|
async with semaphore:
|
||||||
try:
|
try:
|
||||||
serial_number = data[2:8].decode("utf-8")
|
serial_number = data[2:8].decode("utf-8", errors="replace")
|
||||||
runtime = runtimes[serial_number]
|
runtime = runtimes[serial_number]
|
||||||
logger.info(
|
logger.info(
|
||||||
f"{Fore.CYAN}Decrypting: {serial_number} ({relative_path}){Style.RESET_ALL}"
|
f"{Fore.CYAN}Decrypting: {serial_number} ({relative_path}){Style.RESET_ALL}"
|
||||||
@@ -189,18 +190,16 @@ async def decrypt_process_async(
|
|||||||
f.write(data)
|
f.write(data)
|
||||||
|
|
||||||
# Check BCC; mutates "data"
|
# Check BCC; mutates "data"
|
||||||
if int.from_bytes(data[20:24], "little") == 9:
|
if dword(data, 20) == 9:
|
||||||
cipher_text_offset = int.from_bytes(data[28:32], "little")
|
cipher_text_offset = dword(data, 28)
|
||||||
cipher_text_length = int.from_bytes(data[32:36], "little")
|
cipher_text_length = dword(data, 32)
|
||||||
nonce = data[36:40] + data[44:52]
|
nonce = data[36:40] + data[44:52]
|
||||||
bcc_aes_decrypted = general_aes_ctr_decrypt(
|
bcc_aes_decrypted = general_aes_ctr_decrypt(
|
||||||
data[
|
bytes_sub(data, cipher_text_offset, cipher_text_length),
|
||||||
cipher_text_offset : cipher_text_offset + cipher_text_length
|
|
||||||
],
|
|
||||||
runtime.runtime_aes_key,
|
runtime.runtime_aes_key,
|
||||||
nonce,
|
nonce,
|
||||||
)
|
)
|
||||||
data = data[int.from_bytes(data[56:60], "little") :]
|
data = data[dword(data, 56) :]
|
||||||
bcc_architecture_mapping = {
|
bcc_architecture_mapping = {
|
||||||
0x2001: "win-x64",
|
0x2001: "win-x64",
|
||||||
0x2003: "linux-x64",
|
0x2003: "linux-x64",
|
||||||
@@ -208,28 +207,21 @@ async def decrypt_process_async(
|
|||||||
while True:
|
while True:
|
||||||
if len(bcc_aes_decrypted) < 16:
|
if len(bcc_aes_decrypted) < 16:
|
||||||
break
|
break
|
||||||
bcc_segment_offset = int.from_bytes(
|
bcc_segment_offset = dword(bcc_aes_decrypted, 0)
|
||||||
bcc_aes_decrypted[0:4], "little"
|
bcc_segment_length = dword(bcc_aes_decrypted, 4)
|
||||||
)
|
bcc_architecture_id = dword(bcc_aes_decrypted, 8)
|
||||||
bcc_segment_length = int.from_bytes(
|
bcc_next_segment_offset = dword(bcc_aes_decrypted, 12)
|
||||||
bcc_aes_decrypted[4:8], "little"
|
|
||||||
)
|
|
||||||
bcc_architecture_id = int.from_bytes(
|
|
||||||
bcc_aes_decrypted[8:12], "little"
|
|
||||||
)
|
|
||||||
bcc_next_segment_offset = int.from_bytes(
|
|
||||||
bcc_aes_decrypted[12:16], "little"
|
|
||||||
)
|
|
||||||
bcc_architecture = bcc_architecture_mapping.get(
|
bcc_architecture = bcc_architecture_mapping.get(
|
||||||
bcc_architecture_id, f"0x{bcc_architecture_id:x}"
|
bcc_architecture_id, f"0x{bcc_architecture_id:x}"
|
||||||
)
|
)
|
||||||
bcc_file_path = f"{dest_path}.1shot.bcc.{bcc_architecture}.so"
|
bcc_file_path = f"{dest_path}.1shot.bcc.{bcc_architecture}.so"
|
||||||
with open(bcc_file_path, "wb") as f:
|
with open(bcc_file_path, "wb") as f:
|
||||||
f.write(
|
f.write(
|
||||||
bcc_aes_decrypted[
|
bytes_sub(
|
||||||
bcc_segment_offset : bcc_segment_offset
|
bcc_aes_decrypted,
|
||||||
+ bcc_segment_length
|
bcc_segment_offset,
|
||||||
]
|
bcc_segment_length,
|
||||||
|
)
|
||||||
)
|
)
|
||||||
logger.info(
|
logger.info(
|
||||||
f"{Fore.GREEN}Extracted BCC mode native part: {bcc_file_path}{Style.RESET_ALL}"
|
f"{Fore.GREEN}Extracted BCC mode native part: {bcc_file_path}{Style.RESET_ALL}"
|
||||||
@@ -238,8 +230,8 @@ async def decrypt_process_async(
|
|||||||
break
|
break
|
||||||
bcc_aes_decrypted = bcc_aes_decrypted[bcc_next_segment_offset:]
|
bcc_aes_decrypted = bcc_aes_decrypted[bcc_next_segment_offset:]
|
||||||
|
|
||||||
cipher_text_offset = int.from_bytes(data[28:32], "little")
|
cipher_text_offset = dword(data, 28)
|
||||||
cipher_text_length = int.from_bytes(data[32:36], "little")
|
cipher_text_length = dword(data, 32)
|
||||||
nonce = data[36:40] + data[44:52]
|
nonce = data[36:40] + data[44:52]
|
||||||
seq_file_path = dest_path + ".1shot.seq"
|
seq_file_path = dest_path + ".1shot.seq"
|
||||||
with open(seq_file_path, "wb") as f:
|
with open(seq_file_path, "wb") as f:
|
||||||
@@ -249,10 +241,7 @@ async def decrypt_process_async(
|
|||||||
f.write(data[:cipher_text_offset])
|
f.write(data[:cipher_text_offset])
|
||||||
f.write(
|
f.write(
|
||||||
general_aes_ctr_decrypt(
|
general_aes_ctr_decrypt(
|
||||||
data[
|
bytes_sub(data, cipher_text_offset, cipher_text_length),
|
||||||
cipher_text_offset : cipher_text_offset
|
|
||||||
+ cipher_text_length
|
|
||||||
],
|
|
||||||
runtime.runtime_aes_key,
|
runtime.runtime_aes_key,
|
||||||
nonce,
|
nonce,
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -2,5 +2,5 @@ def dword(buffer, idx: int) -> int:
|
|||||||
return int.from_bytes(buffer[idx : idx + 4], "little")
|
return int.from_bytes(buffer[idx : idx + 4], "little")
|
||||||
|
|
||||||
|
|
||||||
def bytes_sub(buffer, start: int, length: int) -> int:
|
def bytes_sub(buffer, start: int, length: int) -> bytes:
|
||||||
return buffer[start : start + length]
|
return buffer[start : start + length]
|
||||||
|
|||||||
Reference in New Issue
Block a user