28 Commits

Author SHA1 Message Date
71f9fea419 chore 2025-04-10 18:10:13 +08:00
LilRan
7ff305a0f8 Merge pull request GH-9 from Anon0x23/Anon0x23-patch-1
refactor, asyncio, colorful console output, BCC mode fallback, interactive menu
2025-04-10 15:59:43 +08:00
Anon0x23
665ec7efd6 Update shot.py
Add multi-platform support for pyarmor_runtime executables

1. Platform Detection and Executable Selection:
   - Automatically detects OS (Windows, Linux, macOS) and architecture
   - Intelligently selects the appropriate executable based on platform
   - Supports architecture-specific executables (e.g., pyarmor-1shot-windows-x86_64.exe)
   - Falls back to platform-specific and generic executables when needed

2. Runtime File Auto-Detection:
   - Added ability to automatically find all pyarmor_runtime files in a directory
   - Supports multiple runtime file formats (.pyd, .so, .dylib)
   - Recursively searches through all subdirectories

3. New Command-Line Options:
   - Added --executable option to specify a custom executable path
   - Added --auto-detect-runtime option to automatically detect runtime files
2025-04-10 00:02:21 +02:00
Anon0x23
b404b47d74 Update 0.1.1
## [0.1.1]

### Added
- Interactive menu system

- BCC decryption when standart decryption fails
- Colorful console output for better readability
- Multiprocessing with configurable thread count
- Improved logging with color-coded messages

### Fixed
- Improved error detection for BCC mode switching
- Fixed indentation issues in BCC fallback method
- Enhanced timeout handling for long-running deobfuscation tasks
2025-04-09 23:42:59 +02:00
本光
4363ff705c feat: multi process (GH-6) 2025-04-06 00:40:47 +08:00
352d14ec82 fix: type hints, match 2025-03-26 15:10:12 +08:00
e7397945d3 feat: file format detection, single file mode 2025-03-25 23:47:21 +08:00
48cab893c9 ci: build workflow 2025-03-15 23:26:59 +08:00
64df67ac8a fix: temp patch for robustness 2025-03-15 22:35:38 +08:00
d9e00c3762 docs: update readme 2025-03-07 11:08:08 +08:00
1d5b35a9d6 fix: py3.12 py3.13 limited support 2025-03-07 00:10:45 +08:00
d88944b832 fix: py3.8 limited support 2025-03-06 14:55:22 +08:00
dc5d2b95e0 fix: py3.11 limited support 2025-03-05 20:07:46 +08:00
c3f9db22c1 fix: py3.10 limited support 2025-03-05 18:08:18 +08:00
7d0e38e9cf chore: interact & quality 2025-03-05 10:41:09 +08:00
c66529938f fix: type hints (GH-2) 2025-03-04 21:22:56 +08:00
a3bf6aa443 chore: do not treat warnings as errors 2025-03-04 20:02:27 +08:00
Suyun114
3aaae62400 fix: use cast to avoid warnings 2025-03-04 19:34:16 +08:00
832f35413b feat: mix str for disasm 2025-03-02 23:48:24 +08:00
6465cf03fe wip: executable 2025-03-02 22:42:04 +08:00
87b83d754a wip: ast! :) 2025-03-02 21:39:14 +08:00
69bb088847 fix: wrong xor key offset, remove redeclaration 2025-03-01 17:27:06 +08:00
93ad403e99 wip: co_code 2025-03-01 15:25:03 +08:00
dacfd29c02 wip: co_flags 2025-02-28 16:56:15 +08:00
21785687b6 wip: fulfill co_code aes nonce xor key calculate 2025-02-28 14:38:42 +08:00
5f66aa8d25 wip: co_code aes nonce xor key calculate 2025-02-28 00:39:00 +08:00
2cab8c4c92 wip: header parsing 2025-02-27 16:10:30 +08:00
79e22e193a wip: extract data 2025-02-27 14:01:14 +08:00
24 changed files with 3098 additions and 187 deletions

55
.github/workflows/build.yml vendored Normal file
View File

@@ -0,0 +1,55 @@
name: Build
on:
push:
branches: [main]
pull_request:
jobs:
build:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, macos-latest]
steps:
- uses: actions/checkout@v4
- name: Build
run: |
mkdir build
cd build
cmake ..
cmake --build . --config Debug
cmake --install .
- name: Upload artifact
uses: actions/upload-artifact@v4
with:
name: pyarmor-1shot-build-${{ matrix.os }}
path: |
helpers
README.md
README-Decompyle++.markdown
LICENSE
windows-build:
runs-on: windows-latest
steps:
- uses: actions/checkout@v4
- name: Build
run: |
mkdir build
cd build
cmake -G "MinGW Makefiles" ..
cmake --build . --config Debug
cmake --install .
- name: Upload artifact
uses: actions/upload-artifact@v4
with:
name: pyarmor-1shot-build-windows
path: |
helpers
README.md
README-Decompyle++.markdown
LICENSE

View File

@@ -1,67 +0,0 @@
# For most projects, this workflow file will not need changing; you simply need
# to commit it to your repository.
#
# You may wish to alter this file to override the set of languages analyzed,
# or to provide custom queries or build logic.
name: "CodeQL"
on:
push:
branches: [master]
pull_request:
# The branches below must be a subset of the branches above
branches: [master]
schedule:
- cron: '0 1 * * 2'
jobs:
analyze:
name: Analyze
runs-on: ubuntu-latest
permissions:
actions: read
contents: read
security-events: write
strategy:
fail-fast: false
matrix:
language: ['cpp', 'python']
# CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python', 'ruby' ]
# Use only 'java' to analyze code written in Java, Kotlin or both
# Use only 'javascript' to analyze code written in JavaScript, TypeScript or both
# Learn more about CodeQL language support at https://aka.ms/codeql-docs/language-support
steps:
- name: Checkout repository
uses: actions/checkout@v3
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v2
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
# Details on CodeQL's query packs refer to : https://docs.github.com/en/code-security/code-scanning/automatically-scanning-your-code-for-vulnerabilities-and-errors/configuring-code-scanning#using-queries-in-ql-packs
# queries: security-extended,security-and-quality
# Autobuild attempts to build any compiled languages (C/C++, C#, Go, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- if: matrix.language == 'python'
name: Autobuild Python
uses: github/codeql-action/autobuild@v2
- if: matrix.language == 'cpp'
name: Build C++
run: |
mkdir build && cd build
cmake -DCMAKE_BUILD_TYPE=RelWithDebInfo ..
make
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v2
with:
category: "/language:${{matrix.language}}"

View File

@@ -1,29 +0,0 @@
name: Linux-CI
on:
push:
branches: [master]
pull_request:
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- name: Configure and Build
run: |
(
mkdir build-debug && cd build-debug
cmake -DCMAKE_BUILD_TYPE=Debug ..
make -j4
)
(
mkdir build-release && cd build-release
cmake -DCMAKE_BUILD_TYPE=Debug ..
make -j4
)
- name: Test
run: |
cmake --build build-debug --target check
cmake --build build-release --target check

View File

@@ -1,29 +0,0 @@
name: MSVC-CI
on:
push:
branches: [master]
pull_request:
jobs:
build:
runs-on: windows-latest
steps:
- uses: actions/checkout@v2
- name: Configure and Build
run: |
mkdir build
cd build
cmake -G "Visual Studio 17 2022" -A Win32 ..
cmake --build . --config Debug
cmake --build . --config Release
- name: Test
run: |
cmake --build build --config Debug --target check
cmake --build build --config Release --target check
- name: Upload artifact
uses: actions/upload-artifact@v3
with:
name: pycdc-release
path: build\Release\*.exe

5
.gitignore vendored
View File

@@ -6,3 +6,8 @@
/.kdev4
__pycache__
tests-out
.vscode/
build/
pyarmor-1shot
pyarmor-1shot.exe

View File

@@ -5,6 +5,7 @@
#include "FastStack.h"
#include "pyc_numeric.h"
#include "bytecode.h"
#include "plusaes.hpp"
// This must be a triple quote (''' or """), to handle interpolated string literals containing the opposite quote style.
// E.g. f'''{"interpolated "123' literal"}''' -> valid.
@@ -72,6 +73,134 @@ static void CheckIfExpr(FastStack& stack, PycRef<ASTBlock> curblock)
stack.push(new ASTTernary(std::move(if_block), std::move(if_expr), std::move(else_expr)));
}
PycRef<ASTNode> PyarmorMixStrDecrypt(const std::string &inputString, PycModule *mod)
{
std::string result(inputString.substr(1));
if (inputString[0] & 0x80)
{
unsigned char nonce[16] = {0};
memcpy(nonce, mod->pyarmor_mix_str_aes_nonce, 12);
nonce[15] = 2;
plusaes::crypt_ctr(
(unsigned char *)&result[0],
result.length(),
mod->pyarmor_aes_key,
16,
&nonce);
}
switch (inputString[0] & 0x7F)
{
case 1:
{
PycRef<PycString> new_str = new PycString(PycString::TYPE_UNICODE);
new_str->setValue(result);
return new ASTObject(new_str.cast<PycObject>());
}
case 2:
{
PycBuffer buf(result.data(), (int)result.length());
return new ASTObject(LoadObject(&buf, mod));
}
case 3:
{
PycRef<PycString> new_str = new PycString(PycString::TYPE_UNICODE);
new_str->setValue(result);
return new ASTImport(new ASTName(new_str), nullptr);
}
case 4:
default:
{
fprintf(stderr, "Unknown PyarmorAssert string first byte: %d\n", inputString[0] & 0x7F);
PycRef<PycString> new_str = new PycString(PycString::TYPE_UNICODE);
new_str->setValue((char)(inputString[0] & 0x7F) + result);
return new ASTObject(new_str.cast<PycObject>());
}
}
}
void CallOrPyarmorBuiltins(FastStack &stack, PycRef<ASTBlock> &curblock, PycModule *mod)
{
if (stack.empty() || stack.top()->type() != ASTNode::NODE_CALL)
return;
PycRef<ASTCall> call = stack.top().cast<ASTCall>();
if (call->func().type() != ASTNode::NODE_OBJECT)
return;
PycRef<PycString> func_name = call->func().cast<ASTObject>()->object().try_cast<PycString>();
if (func_name == nullptr || !func_name->startsWith("__pyarmor_"))
return;
const std::string& name = func_name->strValue();
if (name.find("__pyarmor_assert_") == std::string::npos)
{
PycRef<PycString> new_str = new PycString(PycString::TYPE_UNICODE);
new_str->setValue(name + "(...)");
stack.pop();
stack.push(new ASTObject(new_str.cast<PycObject>()));
// str '__pyarmor_enter_12345__(...)'
return;
}
if (call->pparams().size() != 1) // pyarmor_assert takes exactly one parameter
return;
const auto& param = call->pparams().front();
if (param.type() == ASTNode::NODE_OBJECT)
{
PycRef<PycString> obj = param.cast<ASTObject>()->object().try_cast<PycString>();
if (obj == nullptr)
return;
PycRef<ASTNode> new_node = PyarmorMixStrDecrypt(obj->strValue(), mod);
stack.pop();
stack.push(new_node);
// result of __pyarmor_assert__(b'something')
return;
}
if (param.type() == ASTNode::NODE_TUPLE)
{
const auto &tuple = param.cast<ASTTuple>();
if (tuple->values().size() <= 1 || tuple->values().size() > 3)
return;
if (tuple->values()[1].type() != ASTNode::NODE_OBJECT)
return;
auto enc_str = tuple->values()[1].cast<ASTObject>()->object().try_cast<PycString>();
if (enc_str == nullptr)
return;
PycRef<ASTNode> attr_name = PyarmorMixStrDecrypt(enc_str->strValue(), mod);
if (attr_name->type() != ASTNode::NODE_OBJECT)
return;
auto name_str = attr_name.cast<ASTObject>()->object().try_cast<PycString>();
if (name_str == nullptr)
return;
PycRef<ASTNode> attr_ref = new ASTBinary(tuple->values()[0], new ASTName(name_str), ASTBinary::BIN_ATTR);
if (tuple->values().size() == 2)
{
stack.pop();
stack.push(attr_ref);
// __pyarmor_assert__((from, b'enc_attr')) -> from.enc_attr
return;
}
else if (tuple->values().size() == 3)
{
stack.pop();
curblock->append(new ASTStore(tuple->values()[2], attr_ref));
// __pyarmor_assert__((from, b'enc_attr', value)) -> from.enc_attr = value
return;
}
}
if (param.type() == ASTNode::NODE_NAME)
{
stack.pop();
stack.push(param);
// __pyarmor_assert__(name) -> name
return;
}
}
PycRef<ASTNode> BuildFromCode(PycRef<PycCode> code, PycModule* mod)
{
PycBuffer source(code->code()->value(), code->code()->length());
@@ -423,6 +552,9 @@ PycRef<ASTNode> BuildFromCode(PycRef<PycCode> code, PycModule* mod)
}
break;
case Pyc::CALL_A:
// BEGIN ONESHOT TEMPORARY PATCH
case Pyc::CALL_KW_A:
// END ONESHOT PATCH
case Pyc::CALL_FUNCTION_A:
case Pyc::INSTRUMENTED_CALL_A:
{
@@ -454,6 +586,17 @@ PycRef<ASTNode> BuildFromCode(PycRef<PycCode> code, PycModule* mod)
stack.pop();
PycRef<ASTNode> loadbuild = stack.top();
stack.pop();
// BEGIN ONESHOT TEMPORARY PATCH
if (loadbuild == nullptr)
{
loadbuild = stack.top();
stack.pop();
}
if (stack.top() == nullptr)
{
stack.pop();
}
// END ONESHOT PATCH
int loadbuild_type = loadbuild.type();
if (loadbuild_type == ASTNode::NODE_LOADBUILDCLASS) {
PycRef<ASTNode> call = new ASTCall(function, pparamList, kwparamList);
@@ -467,13 +610,19 @@ PycRef<ASTNode> BuildFromCode(PycRef<PycCode> code, PycModule* mod)
stack_hist.pop();
}
/*
KW_NAMES(i)
Stores a reference to co_consts[consti] into an internal variable for use by CALL.
co_consts[consti] must be a tuple of strings.
New in version 3.11.
*/
if (mod->verCompare(3, 11) >= 0) {
// BEGIN ONESHOT TEMPORARY PATCH
if (mod->verCompare(3, 13) >= 0 && opcode == Pyc::CALL_KW_A) {
PycRef<PycTuple> kw_names = stack.top().cast<ASTObject>()->object().cast<PycTuple>();
stack.pop();
kwparams = kw_names->values().size();
pparams = operand - kwparams;
for (auto it = kw_names->values().rbegin(); it != kw_names->values().rend(); it++) {
kwparamList.push_front(std::make_pair(new ASTObject(*it), stack.top()));
stack.pop();
}
}
else if (mod->verCompare(3, 11) >= 0) {
// END ONESHOT PATCH
PycRef<ASTNode> object_or_map = stack.top();
if (object_or_map.type() == ASTNode::NODE_KW_NAMES_MAP) {
stack.pop();
@@ -513,14 +662,28 @@ PycRef<ASTNode> BuildFromCode(PycRef<PycCode> code, PycModule* mod)
pparamList.push_front(param);
}
}
// BEGIN ONESHOT TEMPORARY PATCH
// For Python 3.13 and later
if (mod->verCompare(3, 13) >= 0) {
PycRef<ASTNode> self_or_null = stack.top();
stack.pop();
if (self_or_null != nullptr) {
pparamList.push_front(self_or_null);
}
}
PycRef<ASTNode> func = stack.top();
stack.pop();
if ((opcode == Pyc::CALL_A || opcode == Pyc::INSTRUMENTED_CALL_A) &&
stack.top() == nullptr) {
mod->verCompare(3, 13) < 0 && stack.top() == nullptr) {
stack.pop();
}
// END ONESHOT PATCH
stack.push(new ASTCall(func, pparamList, kwparamList));
// BEGIN ONESHOT TEMPORARY PATCH
CallOrPyarmorBuiltins(stack, curblock, mod);
// END ONESHOT PATCH
}
break;
case Pyc::CALL_FUNCTION_VAR_A:
@@ -548,6 +711,10 @@ PycRef<ASTNode> BuildFromCode(PycRef<PycCode> code, PycModule* mod)
PycRef<ASTNode> call = new ASTCall(func, pparamList, kwparamList);
call.cast<ASTCall>()->setVar(var);
stack.push(call);
// BEGIN ONESHOT TEMPORARY PATCH
CallOrPyarmorBuiltins(stack, curblock, mod);
// END ONESHOT PATCH
}
break;
case Pyc::CALL_FUNCTION_KW_A:
@@ -575,6 +742,10 @@ PycRef<ASTNode> BuildFromCode(PycRef<PycCode> code, PycModule* mod)
PycRef<ASTNode> call = new ASTCall(func, pparamList, kwparamList);
call.cast<ASTCall>()->setKW(kw);
stack.push(call);
// BEGIN ONESHOT TEMPORARY PATCH
CallOrPyarmorBuiltins(stack, curblock, mod);
// END ONESHOT PATCH
}
break;
case Pyc::CALL_FUNCTION_VAR_KW_A:
@@ -605,8 +776,49 @@ PycRef<ASTNode> BuildFromCode(PycRef<PycCode> code, PycModule* mod)
call.cast<ASTCall>()->setKW(kw);
call.cast<ASTCall>()->setVar(var);
stack.push(call);
// BEGIN ONESHOT TEMPORARY PATCH
CallOrPyarmorBuiltins(stack, curblock, mod);
// END ONESHOT PATCH
}
break;
// BEGIN ONESHOT TEMPORARY PATCH
case Pyc::CALL_FUNCTION_EX_A:
{
PycRef<ASTNode> kw;
if (operand & 0x01)
{
kw = stack.top();
stack.pop();
}
PycRef<ASTNode> var = stack.top();
stack.pop();
ASTCall::pparam_t pparamList;
if (mod->verCompare(3, 13) >= 0)
{
PycRef<ASTNode> param = stack.top();
stack.pop();
if (param != nullptr)
pparamList.push_front(param);
}
PycRef<ASTNode> func = stack.top();
stack.pop();
if (mod->verCompare(3, 13) < 0 && stack.top() == nullptr)
{
stack.pop();
}
PycRef<ASTNode> call = new ASTCall(func, pparamList, ASTCall::kwparam_t());
if (operand & 0x01)
call.cast<ASTCall>()->setKW(kw);
call.cast<ASTCall>()->setVar(var);
stack.push(call);
CallOrPyarmorBuiltins(stack, curblock, mod);
}
break;
// END ONESHOT PATCH
case Pyc::CALL_METHOD_A:
{
ASTCall::pparam_t pparamList;
@@ -633,6 +845,10 @@ PycRef<ASTNode> BuildFromCode(PycRef<PycCode> code, PycModule* mod)
PycRef<ASTNode> func = stack.top();
stack.pop();
stack.push(new ASTCall(func, pparamList, ASTCall::kwparam_t()));
// BEGIN ONESHOT TEMPORARY PATCH
CallOrPyarmorBuiltins(stack, curblock, mod);
// END ONESHOT PATCH
}
break;
case Pyc::CONTINUE_LOOP_A:
@@ -1307,6 +1523,19 @@ PycRef<ASTNode> BuildFromCode(PycRef<PycCode> code, PycModule* mod)
bool push = true;
do {
// BEGIN ONESHOT TEMPORARY PATCH
// This implementation is probably wrong
// Pyarmor jumps forward on top level scope
auto &top = blocks.top();
if (top == defblock) {
pos += offs;
for (int i = 0; i < offs; i++) {
source.getByte();
}
break;
}
// END ONESHOT PATCH
blocks.pop();
if (!blocks.empty())
@@ -1374,7 +1603,8 @@ PycRef<ASTNode> BuildFromCode(PycRef<PycCode> code, PycModule* mod)
blocks.push(except);
}
} else {
fprintf(stderr, "Something TERRIBLE happened!!\n");
fprintf(stderr, "Something TERRIBLE happened!! at %s\n",
mod->verCompare(3, 11) >= 0 ? code->qualName()->value() : code->name()->value());
}
prev = nil;
} else {
@@ -1550,6 +1780,9 @@ PycRef<ASTNode> BuildFromCode(PycRef<PycCode> code, PycModule* mod)
break;
case Pyc::MAKE_CLOSURE_A:
case Pyc::MAKE_FUNCTION_A:
// BEGIN ONESHOT TEMPORARY PATCH
case Pyc::MAKE_FUNCTION:
// END ONESHOT PATCH
{
PycRef<ASTNode> fun_code = stack.top();
stack.pop();
@@ -1563,19 +1796,49 @@ PycRef<ASTNode> BuildFromCode(PycRef<PycCode> code, PycModule* mod)
}
ASTFunction::defarg_t defArgs, kwDefArgs;
const int defCount = operand & 0xFF;
const int kwDefCount = (operand >> 8) & 0xFF;
for (int i = 0; i < defCount; ++i) {
defArgs.push_front(stack.top());
stack.pop();
// BEGIN ONESHOT TEMPORARY PATCH
if (mod->verCompare(3, 6) < 0)
{
const int defCount = operand & 0xFF;
const int kwDefCount = (operand >> 8) & 0xFF;
for (int i = 0; i < defCount; ++i) {
defArgs.push_front(stack.top());
stack.pop();
}
for (int i = 0; i < kwDefCount; ++i) {
kwDefArgs.push_front(stack.top());
stack.pop();
}
if ((operand >> 16) & 0x7FFF) {
// a tuple listing the parameter names for the annotations (only if there are any annotation objects)
stack.pop();
}
}
for (int i = 0; i < kwDefCount; ++i) {
kwDefArgs.push_front(stack.top());
stack.pop();
else if (mod->verCompare(3, 13) < 0)
{
if (operand & 0x08)
stack.pop();
if (operand & 0x04)
stack.pop();
if (operand & 0x02)
stack.pop();
if (operand & 0x01)
stack.pop();
}
// END ONESHOT PATCH
stack.push(new ASTFunction(fun_code, defArgs, kwDefArgs));
}
break;
// BEGIN ONESHOT TEMPORARY PATCH
case Pyc::SET_FUNCTION_ATTRIBUTE_A:
{
PycRef<ASTNode> func = stack.top();
stack.pop();
stack.pop();
stack.push(func);
}
break;
// END ONESHOT PATCH
case Pyc::NOP:
break;
case Pyc::POP_BLOCK:
@@ -1606,9 +1869,17 @@ PycRef<ASTNode> BuildFromCode(PycRef<PycCode> code, PycModule* mod)
stack = stack_hist.top();
stack_hist.pop();
} else {
fprintf(stderr, "Warning: Stack history is empty, something wrong might have happened\n");
fprintf(stderr, "Warning: Stack history is empty, something wrong might have happened at %s\n",
mod->verCompare(3, 11) >= 0 ? code->qualName()->value() : code->name()->value());
}
}
// BEGIN ONESHOT TEMPORARY PATCH
// The problem is not here, but in the way the blocks are created
// Add this to avoid segfault
if (blocks.top() == defblock)
break;
// END ONESHOT PATCH
PycRef<ASTBlock> tmp = curblock;
blocks.pop();
@@ -1890,7 +2161,8 @@ PycRef<ASTNode> BuildFromCode(PycRef<PycCode> code, PycModule* mod)
stack.pop();
if (none != NULL) {
fprintf(stderr, "Something TERRIBLE happened!\n");
fprintf(stderr, "Something TERRIBLE happened! at %s\n",
mod->verCompare(3, 11) >= 0 ? code->qualName()->value() : code->name()->value());
break;
}
@@ -1998,7 +2270,8 @@ PycRef<ASTNode> BuildFromCode(PycRef<PycCode> code, PycModule* mod)
if (tup.type() == ASTNode::NODE_TUPLE)
tup.cast<ASTTuple>()->add(attr);
else
fputs("Something TERRIBLE happened!\n", stderr);
fprintf(stderr, "Something TERRIBLE happened! at %s\n",
mod->verCompare(3, 11) >= 0 ? code->qualName()->value() : code->name()->value());
if (--unpack <= 0) {
stack.pop();
@@ -2033,7 +2306,8 @@ PycRef<ASTNode> BuildFromCode(PycRef<PycCode> code, PycModule* mod)
if (tup.type() == ASTNode::NODE_TUPLE)
tup.cast<ASTTuple>()->add(name);
else
fputs("Something TERRIBLE happened!\n", stderr);
fprintf(stderr, "Something TERRIBLE happened! at %s\n",
mod->verCompare(3, 11) >= 0 ? code->qualName()->value() : code->name()->value());
if (--unpack <= 0) {
stack.pop();
@@ -2073,7 +2347,8 @@ PycRef<ASTNode> BuildFromCode(PycRef<PycCode> code, PycModule* mod)
if (tup.type() == ASTNode::NODE_TUPLE)
tup.cast<ASTTuple>()->add(name);
else
fputs("Something TERRIBLE happened!\n", stderr);
fprintf(stderr, "Something TERRIBLE happened! at %s\n",
mod->verCompare(3, 11) >= 0 ? code->qualName()->value() : code->name()->value());
if (--unpack <= 0) {
stack.pop();
@@ -2132,7 +2407,8 @@ PycRef<ASTNode> BuildFromCode(PycRef<PycCode> code, PycModule* mod)
if (tup.type() == ASTNode::NODE_TUPLE)
tup.cast<ASTTuple>()->add(name);
else
fputs("Something TERRIBLE happened!\n", stderr);
fprintf(stderr, "Something TERRIBLE happened! at %s\n",
mod->verCompare(3, 11) >= 0 ? code->qualName()->value() : code->name()->value());
if (--unpack <= 0) {
stack.pop();
@@ -2174,7 +2450,8 @@ PycRef<ASTNode> BuildFromCode(PycRef<PycCode> code, PycModule* mod)
if (tup.type() == ASTNode::NODE_TUPLE)
tup.cast<ASTTuple>()->add(name);
else
fputs("Something TERRIBLE happened!\n", stderr);
fprintf(stderr, "Something TERRIBLE happened! at %s\n",
mod->verCompare(3, 11) >= 0 ? code->qualName()->value() : code->name()->value());
if (--unpack <= 0) {
stack.pop();
@@ -2295,7 +2572,8 @@ PycRef<ASTNode> BuildFromCode(PycRef<PycCode> code, PycModule* mod)
if (tup.type() == ASTNode::NODE_TUPLE)
tup.cast<ASTTuple>()->add(save);
else
fputs("Something TERRIBLE happened!\n", stderr);
fprintf(stderr, "Something TERRIBLE happened! at %s\n",
mod->verCompare(3, 11) >= 0 ? code->qualName()->value() : code->name()->value());
if (--unpack <= 0) {
stack.pop();
@@ -2450,6 +2728,9 @@ PycRef<ASTNode> BuildFromCode(PycRef<PycCode> code, PycModule* mod)
as no-ops. */
break;
case Pyc::PUSH_NULL:
// BEGIN ONESHOT TEMPORARY PATCH
case Pyc::BEGIN_FINALLY:
// END ONESHOT PATCH
stack.push(nullptr);
break;
case Pyc::GEN_START_A:
@@ -2473,8 +2754,54 @@ PycRef<ASTNode> BuildFromCode(PycRef<PycCode> code, PycModule* mod)
stack.push(next_tup);
}
break;
// BEGIN ONESHOT TEMPORARY PATCH
// THESE OPCODES ARE NOT IMPLEMENTED HERE
case Pyc::COPY_A:
{
FastStack tmp_stack(20);
for (int i = 0; i < operand - 1; i++) {
tmp_stack.push(stack.top());
stack.pop();
}
auto value = stack.top();
for (int i = 0; i < operand - 1; i++) {
stack.push(tmp_stack.top());
tmp_stack.pop();
}
stack.push(value);
}
break;
case Pyc::PUSH_EXC_INFO:
{
stack.push(stack.top());
}
break;
case Pyc::JUMP_IF_NOT_EXC_MATCH_A:
{
PycRef<ASTNode> ex_type = stack.top();
stack.pop();
PycRef<ASTNode> cur_ex = stack.top();
stack.pop();
}
break;
case Pyc::RERAISE:
case Pyc::RERAISE_A:
case Pyc::MAKE_CELL_A:
case Pyc::COPY_FREE_VARS_A:
case Pyc::TO_BOOL:
case Pyc::CALL_INTRINSIC_1_A:
break;
case Pyc::CALL_INTRINSIC_2_A:
{
stack.pop();
}
break;
// END ONESHOT PATCH
default:
fprintf(stderr, "Unsupported opcode: %s (%d)\n", Pyc::OpcodeName(opcode), opcode);
fprintf(stderr, "Unsupported opcode: %s (%d) at %s\n",
Pyc::OpcodeName(opcode),
opcode,
mod->verCompare(3, 11) >= 0 ? code->qualName()->value() : code->name()->value());
cleanBuild = false;
return new ASTNodeList(defblock->nodes());
}
@@ -2486,7 +2813,8 @@ PycRef<ASTNode> BuildFromCode(PycRef<PycCode> code, PycModule* mod)
}
if (stack_hist.size()) {
fputs("Warning: Stack history is not empty!\n", stderr);
fprintf(stderr, "Warning: Stack history is not empty! at %s\n",
mod->verCompare(3, 11) >= 0 ? code->qualName()->value() : code->name()->value());
while (stack_hist.size()) {
stack_hist.pop();
@@ -2494,7 +2822,8 @@ PycRef<ASTNode> BuildFromCode(PycRef<PycCode> code, PycModule* mod)
}
if (blocks.size() > 1) {
fputs("Warning: block stack is not empty!\n", stderr);
fprintf(stderr, "Warning: block stack is not empty! at %s\n",
mod->verCompare(3, 11) >= 0 ? code->qualName()->value() : code->name()->value());
while (blocks.size() > 1) {
PycRef<ASTBlock> tmp = blocks.top();
@@ -2898,6 +3227,12 @@ void print_src(PycRef<ASTNode> node, PycModule* mod, std::ostream& pyc_output)
case ASTNode::NODE_BLOCK:
{
PycRef<ASTBlock> blk = node.cast<ASTBlock>();
// BEGIN ONESHOT TEMPORARY PATCH
if (blk->blktype() == ASTBlock::BLK_MAIN)
break;
// END ONESHOT PATCH
if (blk->blktype() == ASTBlock::BLK_ELSE && blk->size() == 0)
break;

View File

@@ -17,15 +17,19 @@ if (ENABLE_STACK_DEBUG)
endif()
if(CMAKE_COMPILER_IS_GNUCXX OR "${CMAKE_CXX_COMPILER_ID}" MATCHES "Clang")
set(CMAKE_CXX_FLAGS "-Wall -Wextra -Wno-error=shadow -Werror ${CMAKE_CXX_FLAGS}")
elseif(MSVC)
set(CMAKE_CXX_FLAGS "/WX ${CMAKE_CXX_FLAGS}")
add_definitions(-D_CRT_SECURE_NO_WARNINGS)
set(CMAKE_CXX_FLAGS "-Wall -Wextra -Wno-error=shadow ${CMAKE_CXX_FLAGS}")
endif()
if(CMAKE_COMPILER_IS_GNUCXX)
set(CMAKE_EXE_LINKER_FLAGS "-static -static-libgcc -static-libstdc++ ${CMAKE_EXE_LINKER_FLAGS}")
endif()
include_directories(${CMAKE_CURRENT_SOURCE_DIR})
add_library(pycxx STATIC
add_executable(pyarmor-1shot
pyarmor-1shot.cpp
ASTree.cpp
ASTNode.cpp
bytecode.cpp
data.cpp
pyc_code.cpp
@@ -64,22 +68,5 @@ add_library(pycxx STATIC
bytes/python_3_13.cpp
)
add_executable(pycdas pycdas.cpp)
target_link_libraries(pycdas pycxx)
install(TARGETS pycdas
RUNTIME DESTINATION bin)
add_executable(pycdc pycdc.cpp ASTree.cpp ASTNode.cpp)
target_link_libraries(pycdc pycxx)
install(TARGETS pycdc
RUNTIME DESTINATION bin)
find_package(Python3 3.6 COMPONENTS Interpreter)
if(Python3_FOUND)
add_custom_target(check
COMMAND "${Python3_EXECUTABLE}" "${CMAKE_CURRENT_SOURCE_DIR}/tests/run_tests.py"
WORKING_DIRECTORY "$<TARGET_FILE_DIR:pycdc>")
add_dependencies(check pycdc)
endif()
install(TARGETS pyarmor-1shot
RUNTIME DESTINATION ${CMAKE_CURRENT_SOURCE_DIR}/helpers)

70
README.md Normal file
View File

@@ -0,0 +1,70 @@
# Pyarmor Static Unpack One-Shot Tool
[Pyarmor](https://github.com/dashingsoft/pyarmor) is a popular tool to protect Python source code. It turns Python scripts into binary data, which can be regarded as an encrypted variant of pyc files. They can be decrypted by a shared library (pyarmor_runtime) and then executed by Python interpreter.
This project aims to convert armored data back to bytecode assembly and (experimentally) source code. We forked the awesome [Decompyle++](https://github.com/zrax/pycdc) (aka pycdc), and added some processes on it like modifying abstract syntax tree.
> [!WARNING]
>
> **Disassembly results are accurate, but decompiled code can be incomplete and incorrect.** [See issue #3](https://github.com/Lil-House/Pyarmor-Static-Unpack-1shot/issues/3)
## Features
### Static
You don't need to execute the encrypted script. We decrypt them using the same algorithm as pyarmor_runtime. This is useful when the scripts cannot be trusted.
### Universal
Currently we are trying to support Pyarmor 8.0 to 9.1.3 (latest), Python 3.7 - 3.13, on all operating systems, with obfuscating options as many as possible. (However, we only have limited tests.)
You can run this tool in any environment, no need to be the same with obfuscated scripts or runtime.
> [!NOTE]
>
> If the data starts with `PY` followed by six digits, it is supported. Otherwise, if it starts with `PYARMOR`, it is generated by Pyarmor 7 or earlier, and is not supported.
### Easy to use
The only thing you need to do is specifying where your obfuscated scripts are. The tool does everything like detecting armored data, parsing, disassembling, and decompiling. See "Usage" section below.
## Build
``` bash
mkdir build
cd build
cmake ..
cmake --build .
cmake --install .
```
You can also download prebuilt binary files on [releases page](https://github.com/Lil-House/Pyarmor-Static-Unpack-1shot/releases).
## Usage
``` bash
python /path/to/helpers/shot.py /path/to/scripts
```
Before running `shot.py`, make sure the executable `pyarmor-1shot` (`pyarmor-1shot.exe` on Windows) exists in `helpers` directory.
You only need to specify the directory that contains all armored data and `pyarmor_runtime`. The tool finds and handles them recursively as much as possible.
When necessary, specify a `pyarmor_runtime` executable with `-r path/to/pyarmor_runtime[.pyd|.so|.dylib]`.
All files generated from this tool have a `.1shot.` in file names. If you want to save them in another directory instead of in-place, use `-o another/path/`. Folder structure will remain unchanged.
Note:
- Subdirectories will not be touched if the folder name is exactly `__pycache__` or `site-packages` or it directly contains a file named `.no1shot`, and symbolic links will not be followed, to avoid repeat or forever loop and save time. If you really need them, run the script later in these directories and specify the runtime.
- Archives, executables generated by PyInstaller and so on, must be unpacked by other tools before decrypting, or you will encounter undefined behavior.
## Feedback
Feel free to open an issue if you have any questions, suggestions, or problems. Don't forget to attach the armored data and the `pyarmor_runtime` executable if possible.
## Todo (PR Welcome!)
- [ ] Multi-platform pyarmor_runtime executable
- [ ] Support more obfuscating options
- [ ] Regenerate pyc for other backends

View File

@@ -116,7 +116,7 @@ int Pyc::ByteToOpcode(int maj, int min, int opcode)
}
void print_const(std::ostream& pyc_output, PycRef<PycObject> obj, PycModule* mod,
const char* parent_f_string_quote)
const char* parent_f_string_quote, bool das_decrypt_print)
{
if (obj == NULL) {
pyc_output << "<NULL>";
@@ -131,7 +131,9 @@ void print_const(std::ostream& pyc_output, PycRef<PycObject> obj, PycModule* mod
case PycObject::TYPE_ASCII_INTERNED:
case PycObject::TYPE_SHORT_ASCII:
case PycObject::TYPE_SHORT_ASCII_INTERNED:
obj.cast<PycString>()->print(pyc_output, mod, false, parent_f_string_quote);
das_decrypt_print
? obj.cast<PycString>()->dasPrintAndDecrypt(pyc_output, mod, false, parent_f_string_quote)
: obj.cast<PycString>()->print(pyc_output, mod, false, parent_f_string_quote);
break;
case PycObject::TYPE_TUPLE:
case PycObject::TYPE_SMALL_TUPLE:
@@ -362,7 +364,7 @@ void bc_disasm(std::ostream& pyc_output, PycRef<PycCode> code, PycModule* mod,
try {
auto constParam = code->getConst(operand);
formatted_print(pyc_output, "%d: ", operand);
print_const(pyc_output, constParam, mod);
print_const(pyc_output, constParam, mod, nullptr, true);
} catch (const std::out_of_range &) {
formatted_print(pyc_output, "%d <INVALID>", operand);
}

View File

@@ -28,7 +28,7 @@ int ByteToOpcode(int maj, int min, int opcode);
}
void print_const(std::ostream& pyc_output, PycRef<PycObject> obj, PycModule* mod,
const char* parent_f_string_quote = nullptr);
const char* parent_f_string_quote = nullptr, bool das_decrypt_print = false);
void bc_next(PycBuffer& source, PycModule* mod, int& opcode, int& operand, int& pos);
void bc_disasm(std::ostream& pyc_output, PycRef<PycCode> code, PycModule* mod,
int indent, unsigned flags);

0
helpers/__init__.py Normal file
View File

157
helpers/detect.py Normal file
View File

@@ -0,0 +1,157 @@
import logging
import os
from typing import List, Tuple, Union
def ascii_ratio(data: bytes) -> float:
return sum(32 <= c < 127 for c in data) / len(data)
def source_as_file(file_path: str) -> Union[List[bytes], None]:
try:
with open(file_path, 'r') as f:
co = compile(f.read(), '<str>', 'exec')
data = [i for i in co.co_consts if type(i) is bytes
and i.startswith(b'PY00') and len(i) > 64]
return data
except:
return None
def source_as_lines(file_path: str) -> Union[List[bytes], None]:
data = []
try:
with open(file_path, 'r') as f:
for line in f:
try:
co = compile(line, '<str>', 'exec')
data.extend([i for i in co.co_consts if type(i) is bytes
and i.startswith(b'PY00') and len(i) > 64])
except:
# ignore not compilable lines
pass
except:
return None
return data
def find_data_from_bytes(data: bytes, max_count=-1) -> List[bytes]:
result = []
idx = 0
while len(result) != max_count:
idx = data.find(b'PY00')
if idx == -1:
break
data = data[idx:]
if len(data) < 64:
break
header_len = int.from_bytes(data[28:32], 'little')
body_len = int.from_bytes(data[32:36], 'little')
if header_len > 256 or body_len > 0xFFFFF or header_len + body_len > len(data):
# compressed or coincident, skip
data = data[5:]
continue
result.append(data[:header_len + body_len])
# maybe followed by data for other Python versions from the same file,
# we do not extract them
followed_by_another_equivalent = int.from_bytes(
data[56:60], 'little') != 0
data = data[header_len + body_len:]
while followed_by_another_equivalent \
and data.startswith(b'PY00') \
and len(data) >= 64:
header_len = int.from_bytes(data[28:32], 'little')
body_len = int.from_bytes(data[32:36], 'little')
followed_by_another_equivalent = int.from_bytes(
data[56:60], 'little') != 0
data = data[header_len + body_len:]
return result
def nuitka_package(head: bytes, relative_path: str) -> Union[List[Tuple[str, bytes]], None]:
first_occurrence = head.find(b'PY00')
if first_occurrence == -1:
return None
last_dot_bytecode = head.rfind(b'.bytecode\x00', 0, first_occurrence)
if last_dot_bytecode == -1:
return None
length = int.from_bytes(
head[last_dot_bytecode-4:last_dot_bytecode], 'little')
end = last_dot_bytecode + length
cur = last_dot_bytecode
result = []
while cur < end:
module_name_len = head.find(b'\x00', cur, end) - cur
module_name = head[cur:cur + module_name_len].decode('utf-8')
cur += module_name_len + 1
module_len = int.from_bytes(head[cur:cur + 4], 'little')
cur += 4
module_data = find_data_from_bytes(head[cur:cur + module_len], 1)
if module_data:
result.append((os.path.join(relative_path.rstrip(
'/\\') + '.1shot.ext', module_name), module_data[0]))
cur += module_len
if result:
logger = logging.getLogger('detect')
logger.info(f'Found data in Nuitka package: {relative_path}')
return result
return None
def detect_process(file_path: str, relative_path: str) -> Union[List[Tuple[str, bytes]], None]:
'''
Returns a list of (relative_path, bytes_raw) tuples, or None.
Do not raise exceptions.
'''
logger = logging.getLogger('detect')
try:
with open(file_path, 'rb') as f:
head = f.read(16 * 1024 * 1024)
except:
logger.error(f'Failed to read file: {relative_path}')
return None
if b'__pyarmor__' not in head:
# no need to dig deeper
return None
if ascii_ratio(head[:2048]) >= 0.9:
# the whole file may not be compiled, but we can still try some lines;
# None means failure (then we make another try),
# empty list means success but no data found (then we skip this file)
result = source_as_file(file_path)
if result is None:
result = source_as_lines(file_path)
if result is None:
return None
result_len = len(result)
if result_len == 0:
return None
elif result_len == 1:
logger.info(f'Found data in source: {relative_path}')
return [(relative_path, result[0])]
else:
logger.info(f'Found data in source: {relative_path}')
return [(f'{relative_path}__{i}', result[i]) for i in range(len(result))]
# binary file
# ignore data after 16MB, before we have a reason to read more
if b'Error, corrupted constants object' in head:
# an interesting special case: packer put armored data in a Nuitka package
# we can know the exact module names, instead of adding boring __0, __1, ...
return nuitka_package(head, relative_path)
result = find_data_from_bytes(head)
result_len = len(result)
if result_len == 0:
return None
elif result_len == 1:
logger.info(f'Found data in binary: {relative_path}')
return [(relative_path, result[0])]
else:
logger.info(f'Found data in binary: {relative_path}')
return [(f'{relative_path}__{i}', result[i]) for i in range(len(result))]

96
helpers/runtime.py Normal file
View File

@@ -0,0 +1,96 @@
import hashlib
GLOBAL_CERT = bytes.fromhex('''
30 82 01 0a 02 82 01 01 00 bf 65 30 f3 bd 67 e7
a6 9d f8 db 18 b2 b9 c1 c0 5f fe fb e5 4b 91 df
6f 38 da 51 cc ea c4 d3 04 bd 95 27 86 c1 13 ca
73 15 44 4d 97 f5 10 b9 52 21 72 16 c8 b2 84 5f
45 56 32 e7 c2 6b ad 2b d9 df 52 d6 e9 d1 2a ba
35 e4 43 ab 54 e7 91 c5 ce d1 f1 ba a5 9f f4 ca
db 89 04 3d f8 9f 6a 8b 8a 29 39 f8 4c 0d b8 a0
6d 51 c4 74 24 64 fe 1a 23 97 f3 61 ea de c8 97
dc 57 60 34 be 2c 18 50 3b d1 76 3b 49 2a 39 9a
37 18 53 8f 1d 4c 82 b1 a0 33 43 57 19 ad 67 e7
af 09 fb 04 54 a9 ea c0 c1 e9 32 6c 77 92 7f 9f
7c 08 7c e8 a1 5d a4 fc 40 e6 6e 18 db bf 45 53
4b 5c a7 9d f2 8f 7e 6c 04 b0 4d ee 99 25 9a 87
84 6e 9e fe 3c 72 ec b0 64 dd 2e db ad 32 fa 1d
4b 2c 1a 78 85 7c bc 2c d0 d7 83 77 5f 92 d5 db
59 10 96 53 2e 5d c7 42 12 b8 61 cb 2c 5f 46 14
9e 93 b0 53 21 a2 74 34 2d 02 03 01 00 01
''')
class RuntimeInfo:
def __init__(self, file_path: str) -> None:
self.file_path = file_path
if file_path.endswith('.pyd'):
self.extract_info_win64()
else:
# TODO: implement for other platforms
self.extract_info_win64()
self.serial_number = self.part_1[12:18].decode()
self.runtime_aes_key = self.calc_aes_key()
def __str__(self) -> str:
trial = self.serial_number == '000000'
product = ''
for c in self.part_3[2:]:
if 32 <= c <= 126:
product += chr(c)
else:
break
return f'''\
========================
Pyarmor Runtime ({'Trial' if trial else self.serial_number}) Information:
Product: {product}
AES key: {self.runtime_aes_key.hex()}
Mix string AES nonce: {self.mix_str_aes_nonce().hex()}
========================'''
def __repr__(self) -> str:
return f'RuntimeInfo(part_1={self.part_1}, part_2={self.part_2}, part_3={self.part_3})'
def extract_info_win64(self) -> None:
'''
Try to find useful information from `pyarmor_runtime.pyd` file,
and store all three parts in the object.
'''
with open(self.file_path, 'rb') as f:
data = f.read(16 * 1024 * 1024)
cur = data.index(b'pyarmor-vax')
if data[cur+11:cur+18] == b'\x00' * 7:
raise ValueError(f'{self.file_path} is a runtime template')
self.part_1 = data[cur:cur+20]
cur += 36
part_2_offset = int.from_bytes(data[cur:cur+4], 'little')
part_2_len = int.from_bytes(data[cur+4:cur+8], 'little')
part_3_offset = int.from_bytes(data[cur+8:cur+12], 'little')
cur += 16
self.part_2 = data[cur+part_2_offset:cur+part_2_offset+part_2_len]
cur += part_3_offset
part_3_len = int.from_bytes(data[cur+4:cur+8], 'little')
cur += 32
self.part_3 = data[cur:cur+part_3_len]
def calc_aes_key(self) -> bytes:
return hashlib.md5(self.part_1 + self.part_2 + self.part_3 + GLOBAL_CERT).digest()
def mix_str_aes_nonce(self) -> bytes:
return self.part_3[:12]
if __name__ == '__main__':
import sys
if len(sys.argv) < 2:
print('Usage: python runtime.py path/to/pyarmor_runtime[.pyd|.so|.dylib]')
exit(1)
for i in sys.argv[1:]:
runtime = RuntimeInfo(i)
print(runtime)

482
helpers/shot.py Normal file
View File

@@ -0,0 +1,482 @@
import argparse
from Crypto.Cipher import AES
import logging
import os
import asyncio
import traceback
import platform
from typing import Dict, List, Tuple
try:
from colorama import init, Fore, Style
except ImportError:
def init(**kwargs): pass
class Fore: CYAN = RED = YELLOW = GREEN = ''
class Style: RESET_ALL = ''
from detect import detect_process
from runtime import RuntimeInfo
# Initialize colorama
init(autoreset=True)
def general_aes_ctr_decrypt(data: bytes, key: bytes, nonce: bytes) -> bytes:
cipher = AES.new(key, AES.MODE_CTR, nonce=nonce, initial_value=2)
return cipher.decrypt(data)
def bcc_fallback_method(seq_file_path: str, output_path: str = None):
# Fallback method for BCC mode deobfuscation
logger = logging.getLogger('shot')
logger.info(f'{Fore.YELLOW}Attempting BCC fallback method for: {seq_file_path}{Style.RESET_ALL}')
if output_path is None:
output_path = seq_file_path + '.back.1shot.seq'
try:
with open(seq_file_path, 'rb') as f:
origin = f.read()
one_shot_header = origin[:32] # Header format
aes_key = one_shot_header[1:17]
bcc_part_length = int.from_bytes(origin[0x58:0x5C], 'little') # If it is 0, it is not BCC part but bytecode part
bytecode_part = origin[32+bcc_part_length:]
aes_nonce = bytecode_part[36:40] + bytecode_part[44:52] # The same position as non-BCC file
with open(output_path, 'wb') as f:
f.write(one_shot_header)
f.write(bytecode_part[:64])
f.write(general_aes_ctr_decrypt(bytecode_part[64:], aes_key, aes_nonce))
logger.info(f'{Fore.GREEN}Successfully created BCC fallback file: {output_path}{Style.RESET_ALL}')
return output_path
except Exception as e:
error_details = traceback.format_exc()
logger.error(f'{Fore.RED}BCC fallback method failed: {e}{Style.RESET_ALL}')
logger.error(f'{Fore.RED}Error details: {error_details}{Style.RESET_ALL}')
return None
async def run_subprocess_async(cmd, cwd=None):
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=cwd
)
stdout, stderr = await process.communicate()
return process.returncode, stdout, stderr
async def decrypt_file_async(exe_path, seq_file_path, path, args):
logger = logging.getLogger('shot')
try:
# Run without timeout
returncode, stdout, stderr = await run_subprocess_async([exe_path, seq_file_path])
stdout_lines = stdout.decode('latin-1').splitlines()
stderr_lines = stderr.decode('latin-1').splitlines()
for line in stdout_lines:
logger.warning(f'PYCDC: {line} ({path})')
for line in stderr_lines:
if line.startswith((
'Warning: Stack history is empty',
'Warning: Stack history is not empty',
'Warning: block stack is not empty',
)):
if args.show_warn_stack or args.show_all:
logger.warning(f'PYCDC: {line} ({path})')
elif line.startswith('Unsupported opcode:'):
if args.show_err_opcode or args.show_all:
logger.error(f'PYCDC: {line} ({path})')
elif line.startswith((
'Something TERRIBLE happened',
'Unsupported argument',
'Unsupported Node type',
'Unsupported node type',
)): # annoying wont-fix errors
if args.show_all:
logger.error(f'PYCDC: {line} ({path})')
else:
logger.error(f'PYCDC: {line} ({path})')
return returncode, stdout_lines, stderr_lines
except Exception as e:
error_details = traceback.format_exc()
logger.error(f'{Fore.RED}Error during async deobfuscation: {e}{Style.RESET_ALL}')
logger.error(f'{Fore.RED}Error details: {error_details}{Style.RESET_ALL}')
return -1, [], []
async def decrypt_process_async(runtimes: Dict[str, RuntimeInfo], sequences: List[Tuple[str, bytes]], args):
logger = logging.getLogger('shot')
output_dir: str = args.output_dir or args.directory
# Create a semaphore to limit concurrent processes
semaphore = asyncio.Semaphore(args.concurrent) # Use the concurrent argument
# Get the appropriate executable for the current platform
exe_path = get_platform_executable(args)
async def process_file(path, data):
async with semaphore:
try:
serial_number = data[2:8].decode('utf-8')
runtime = runtimes[serial_number]
logger.info(f'{Fore.CYAN}Decrypting: {serial_number} ({path}){Style.RESET_ALL}')
dest_path = os.path.join(output_dir, path) if output_dir else path
dest_dir = os.path.dirname(dest_path)
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
if args.export_raw_data:
with open(dest_path + '.1shot.raw', 'wb') as f:
f.write(data)
cipher_text_offset = int.from_bytes(data[28:32], 'little')
cipher_text_length = int.from_bytes(data[32:36], 'little')
nonce = data[36:40] + data[44:52]
seq_file_path = dest_path + '.1shot.seq'
with open(seq_file_path, 'wb') as f:
f.write(b'\xa1' + runtime.runtime_aes_key)
f.write(b'\xa2' + runtime.mix_str_aes_nonce())
f.write(b'\xf0\xff')
f.write(data[:cipher_text_offset])
f.write(general_aes_ctr_decrypt(
data[cipher_text_offset:cipher_text_offset+cipher_text_length], runtime.runtime_aes_key, nonce))
f.write(data[cipher_text_offset+cipher_text_length:])
# Run without timeout
returncode, stdout_lines, stderr_lines = await decrypt_file_async(exe_path, seq_file_path, path, args)
# Check for specific errors that indicate BCC mode
should_try_bcc = False
error_message = ""
# FIXME: Probably false positive
if returncode != 0:
error_message = f"PYCDC returned {returncode}"
# Check for specific error patterns that suggest BCC mode
for line in stderr_lines:
if ("Unsupported opcode" in line or
"Something TERRIBLE happened" in line or
"Unknown opcode 0" in line or
"Got unsupported type" in line):
should_try_bcc = True
error_message += f" - {line}"
break
if should_try_bcc:
logger.warning(f'{Fore.YELLOW}{error_message} ({path}) - Attempting BCC fallback{Style.RESET_ALL}')
# Try BCC fallback method
bcc_file_path = bcc_fallback_method(seq_file_path)
if bcc_file_path:
logger.info(f'{Fore.GREEN}Running deobfuscator on BCC fallback file{Style.RESET_ALL}')
try:
# Run without timeout
returncode, stdout_lines, stderr_lines = await decrypt_file_async(exe_path, bcc_file_path, path, args)
if returncode == 0:
logger.info(f'{Fore.GREEN}Successfully deobfuscated using BCC fallback method{Style.RESET_ALL}')
print(f"{Fore.GREEN} BCC Decrypted: {path}{Style.RESET_ALL}")
else:
logger.error(f'{Fore.RED}BCC fallback deobfuscation failed with return code {returncode}{Style.RESET_ALL}')
for line in stderr_lines:
logger.error(f'{Fore.RED}BCC Error: {line}{Style.RESET_ALL}')
except Exception as e:
error_details = traceback.format_exc()
logger.error(f'{Fore.RED}BCC fallback deobfuscation failed with error: {e}{Style.RESET_ALL}')
logger.error(f'{Fore.RED}Error details: {error_details}{Style.RESET_ALL}')
elif returncode == 0:
# Successfully decrypted
logger.info(f'{Fore.GREEN}Successfully decrypted: {path}{Style.RESET_ALL}')
print(f"{Fore.GREEN} Decrypted: {path}{Style.RESET_ALL}")
else:
logger.warning(f'{Fore.YELLOW}{error_message} ({path}){Style.RESET_ALL}')
except Exception as e:
error_details = traceback.format_exc()
logger.error(f'{Fore.RED}Decrypt failed: {e} ({path}){Style.RESET_ALL}')
logger.error(f'{Fore.RED}Error details: {error_details}{Style.RESET_ALL}')
# Create tasks for all files
tasks = [process_file(path, data) for path, data in sequences]
# Run all tasks concurrently
await asyncio.gather(*tasks)
def decrypt_process(runtimes: Dict[str, RuntimeInfo], sequences: List[Tuple[str, bytes]], args):
asyncio.run(decrypt_process_async(runtimes, sequences, args))
def get_platform_executable(args) -> str:
"""
Get the appropriate executable for the current platform
"""
logger = logging.getLogger('shot')
# If a specific executable is provided, use it
if args.executable:
if os.path.exists(args.executable):
logger.info(f'{Fore.GREEN}Using specified executable: {args.executable}{Style.RESET_ALL}')
return args.executable
else:
logger.warning(f'{Fore.YELLOW}Specified executable not found: {args.executable}{Style.RESET_ALL}')
helpers_dir = os.path.dirname(os.path.abspath(__file__))
system = platform.system().lower()
machine = platform.machine().lower()
# Check for architecture-specific executables
arch_specific_exe = f'pyarmor-1shot-{system}-{machine}'
if system == 'windows':
arch_specific_exe += '.exe'
arch_exe_path = os.path.join(helpers_dir, arch_specific_exe)
if os.path.exists(arch_exe_path):
logger.info(f'{Fore.GREEN}Using architecture-specific executable: {arch_specific_exe}{Style.RESET_ALL}')
return arch_exe_path
platform_map = {
'windows': 'pyarmor-1shot.exe',
'linux': 'pyarmor-1shot',
'darwin': 'pyarmor-1shot',
}
base_exe_name = platform_map.get(system, 'pyarmor-1shot')
# Then check for platform-specific executable
platform_exe_path = os.path.join(helpers_dir, base_exe_name)
if os.path.exists(platform_exe_path):
logger.info(f'{Fore.GREEN}Using platform-specific executable: {base_exe_name}{Style.RESET_ALL}')
return platform_exe_path
# Finally, check for generic executable
generic_exe_path = os.path.join(helpers_dir, 'pyarmor-1shot')
if os.path.exists(generic_exe_path):
logger.info(f'{Fore.GREEN}Using generic executable: pyarmor-1shot{Style.RESET_ALL}')
return generic_exe_path
logger.critical(f'{Fore.RED}Executable {base_exe_name} not found, please build it first or download on https://github.com/Lil-House/Pyarmor-Static-Unpack-1shot/releases {Style.RESET_ALL}')
exit(1)
def parse_args():
parser = argparse.ArgumentParser(
description='Pyarmor Static Unpack 1 Shot Entry')
parser.add_argument(
'directory',
help='the "root" directory of obfuscated scripts',
type=str,
nargs='?',
)
parser.add_argument(
'-r',
'--runtime',
help='path to pyarmor_runtime[.pyd|.so|.dylib]',
type=str, # argparse.FileType('rb'),
)
parser.add_argument(
'-o',
'--output-dir',
help='save output files in another directory instead of in-place, with folder structure remain unchanged',
type=str,
)
parser.add_argument(
'--export-raw-data',
help='save data found in source files as-is',
action='store_true',
)
parser.add_argument(
'--show-all',
help='show all pycdc errors and warnings',
action='store_true',
)
parser.add_argument(
'--show-err-opcode',
help='show pycdc unsupported opcode errors',
action='store_true',
)
parser.add_argument(
'--show-warn-stack',
help='show pycdc stack related warnings',
action='store_true',
)
parser.add_argument(
'--menu',
help='show interactive menu to select folder to unpack',
action='store_true',
)
parser.add_argument(
'--concurrent',
help='number of concurrent deobfuscation processes (default: 4)',
type=int,
default=4,
)
parser.add_argument(
'-e',
'--executable',
help='path to the pyarmor-1shot executable to use',
type=str,
)
return parser.parse_args()
def display_menu():
to_unpack_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'to_unpack')
if not os.path.exists(to_unpack_dir):
os.makedirs(to_unpack_dir)
folders = [d for d in os.listdir(to_unpack_dir)
if os.path.isdir(os.path.join(to_unpack_dir, d))]
if not folders:
print(f"{Fore.YELLOW}No folders found in {to_unpack_dir}{Style.RESET_ALL}")
return None
print(f"\n{Fore.CYAN}=== Available Folders to Unpack ==={Style.RESET_ALL}")
for i, folder in enumerate(folders, 1):
print(f"{Fore.GREEN}[{i}]{Style.RESET_ALL} {folder}")
while True:
try:
choice = input(f"\n{Fore.YELLOW}Enter the number of the folder to unpack (or 'q' to quit): {Style.RESET_ALL}")
if choice.lower() == 'q':
return None
choice_idx = int(choice) - 1
if 0 <= choice_idx < len(folders):
selected_folder = folders[choice_idx]
full_path = os.path.join(to_unpack_dir, selected_folder)
print(f"{Fore.GREEN}Selected: {selected_folder}{Style.RESET_ALL}")
return full_path
else:
print(f"{Fore.RED}Invalid choice. Please enter a number between 1 and {len(folders)}{Style.RESET_ALL}")
except ValueError:
print(f"{Fore.RED}Please enter a valid number{Style.RESET_ALL}")
def main():
args = parse_args()
logging.basicConfig(
level=logging.INFO,
format='%(levelname)-8s %(asctime)-28s %(message)s',
)
logger = logging.getLogger('shot')
print(Fore.CYAN + r'''
____ ____
( __ ) ( __ )
| |~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~| |
| | ____ _ ___ _ _ | |
| | | _ \ _ _ __ _ _ __ _ _ __ ___ _ _ / / __|| |_ ___ | |_ | |
| | | |_) | || |/ _` | '__| ' ` \ / _ \| '_| | \__ \| ' \ / _ \| __| | |
| | | __/| || | (_| | | | || || | (_) | | | |__) | || | (_) | |_ | |
| | |_| \_, |\__,_|_| |_||_||_|\___/|_| |_|___/|_||_|\___/ \__| | |
| | |__/ | |
|__|~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~|__|
(____) (____)
For technology exchange only. Use at your own risk.
GitHub: https://github.com/Lil-House/Pyarmor-Static-Unpack-1shot
''' + Style.RESET_ALL)
# If menu option is selected or no directory is provided, show the menu
if args.menu or not args.directory:
selected_dir = display_menu()
if selected_dir:
args.directory = selected_dir
else:
print(f"{Fore.YELLOW}No directory selected. Exiting.{Style.RESET_ALL}")
return
if args.runtime:
specified_runtime = RuntimeInfo(args.runtime)
print(specified_runtime)
runtimes = {specified_runtime.serial_number: specified_runtime}
else:
specified_runtime = None
runtimes = {}
sequences: List[Tuple[str, bytes]] = []
if args.output_dir and not os.path.exists(args.output_dir):
os.makedirs(args.output_dir)
if os.path.isfile(args.directory):
if specified_runtime is None:
logger.error(f'{Fore.RED}Please specify `pyarmor_runtime` file by `-r` if input is a file{Style.RESET_ALL}')
return
logger.info(f'{Fore.CYAN}Single file mode{Style.RESET_ALL}')
result = detect_process(args.directory, args.directory)
if result is None:
logger.error(f'{Fore.RED}No armored data found{Style.RESET_ALL}')
return
sequences.extend(result)
decrypt_process(runtimes, sequences, args)
return # single file mode ends here
dir_path: str
dirs: List[str]
files: List[str]
for dir_path, dirs, files in os.walk(args.directory, followlinks=False):
if '.no1shot' in files:
logger.info(f'{Fore.YELLOW}Skipping {dir_path} because of `.no1shot`{Style.RESET_ALL}')
dirs.clear()
files.clear()
continue
for d in ['__pycache__', 'site-packages']:
if d in dirs:
dirs.remove(d)
for file_name in files:
if '.1shot.' in file_name:
continue
file_path = os.path.join(dir_path, file_name)
relative_path = os.path.relpath(file_path, args.directory)
if file_name.endswith('.pyz'):
with open(file_path, 'rb') as f:
head = f.read(16 * 1024 * 1024)
if b'PY00' in head \
and (not os.path.exists(file_path + '_extracted')
or len(os.listdir(file_path + '_extracted')) == 0):
logger.error(
f'{Fore.RED}A PYZ file containing armored data is detected, but the PYZ file has not been extracted by other tools. This error is not a problem with this tool. If the folder is extracted by Pyinstxtractor, please read the output information of Pyinstxtractor carefully. ({relative_path}){Style.RESET_ALL}')
continue
# is pyarmor_runtime?
if specified_runtime is None \
and file_name.startswith('pyarmor_runtime') \
and file_name.endswith(('.pyd', '.so', '.dylib')):
try:
new_runtime = RuntimeInfo(file_path)
runtimes[new_runtime.serial_number] = new_runtime
logger.info(
f'{Fore.GREEN}Found new runtime: {new_runtime.serial_number} ({file_path}){Style.RESET_ALL}')
print(new_runtime)
continue
except:
pass
result = detect_process(file_path, relative_path)
if result is not None:
sequences.extend(result)
if not runtimes:
logger.error(f'{Fore.RED}No runtime found{Style.RESET_ALL}')
return
if not sequences:
logger.error(f'{Fore.RED}No armored data found{Style.RESET_ALL}')
return
decrypt_process(runtimes, sequences, args)
if __name__ == '__main__':
main()

1340
plusaes.hpp Normal file

File diff suppressed because it is too large Load Diff

107
pyarmor-1shot.cpp Normal file
View File

@@ -0,0 +1,107 @@
/** I want to use functions in pycdas.cpp directly, but not moving them to
* another file, to sync with upstream in the future easily.
*/
#define main pycdas_main
# include "pycdas.cpp"
#undef main
#include "ASTree.h"
int main(int argc, char* argv[])
{
const char* infile = nullptr;
unsigned disasm_flags = 0;
std::ofstream dc_out_file;
std::ofstream das_out_file;
for (int arg = 1; arg < argc; ++arg) {
if (strcmp(argv[arg], "--pycode-extra") == 0) {
disasm_flags |= Pyc::DISASM_PYCODE_VERBOSE;
} else if (strcmp(argv[arg], "--show-caches") == 0) {
disasm_flags |= Pyc::DISASM_SHOW_CACHES;
} else if (strcmp(argv[arg], "--help") == 0 || strcmp(argv[arg], "-h") == 0) {
fprintf(stderr, "Usage: %s [options] input.1shot.seq\n\n", argv[0]);
fputs("Options:\n", stderr);
fputs(" --pycode-extra Show extra fields in PyCode object dumps\n", stderr);
fputs(" --show-caches Don't suprress CACHE instructions in Python 3.11+ disassembly\n", stderr);
fputs(" --help Show this help text and then exit\n", stderr);
return 0;
} else if (argv[arg][0] == '-') {
fprintf(stderr, "Error: Unrecognized argument %s\n", argv[arg]);
return 1;
} else {
infile = argv[arg];
}
}
if (!infile) {
fputs("No input file specified\n", stderr);
return 1;
}
std::string prefix_name;
const char *prefix_name_pos = strstr(infile, ".1shot.seq");
if (prefix_name_pos == NULL) {
prefix_name = infile;
} else {
prefix_name = std::string(infile, prefix_name_pos - infile + 6);
}
dc_out_file.open(prefix_name + ".cdc.py", std::ios_base::out);
if (dc_out_file.fail()) {
fprintf(stderr, "Error opening file '%s' for writing\n", (prefix_name + ".cdc.py").c_str());
return 1;
}
das_out_file.open(prefix_name + ".das", std::ios_base::out);
if (das_out_file.fail()) {
fprintf(stderr, "Error opening file '%s' for writing\n", (prefix_name + ".das").c_str());
return 1;
}
PycModule mod;
try {
mod.loadFromOneshotSequenceFile(infile);
} catch (std::exception &ex) {
fprintf(stderr, "Error disassembling %s: %s\n", infile, ex.what());
return 1;
}
if (!mod.isValid()) {
fprintf(stderr, "Could not load file %s\n", infile);
return 1;
}
const char* dispname = strrchr(infile, PATHSEP);
dispname = (dispname == NULL) ? infile : dispname + 1;
formatted_print(das_out_file, "%s (Python %d.%d%s)\n", dispname,
mod.majorVer(), mod.minorVer(),
(mod.majorVer() < 3 && mod.isUnicode()) ? " -U" : "");
try {
output_object(mod.code().try_cast<PycObject>(), &mod, 0, disasm_flags,
das_out_file);
} catch (std::exception& ex) {
fprintf(stderr, "Error disassembling %s: %s\n", infile, ex.what());
return 1;
}
das_out_file.flush();
das_out_file.close();
dc_out_file << "# Source Generated with Decompyle++\n";
formatted_print(dc_out_file, "# File: %s (Python %d.%d%s)\n\n", dispname,
mod.majorVer(), mod.minorVer(),
(mod.majorVer() < 3 && mod.isUnicode()) ? " Unicode" : "");
try {
decompyle(mod.code(), &mod, dc_out_file);
} catch (std::exception& ex) {
fprintf(stderr, "Error decompyling %s: %s\n", infile, ex.what());
return 1;
}
dc_out_file.flush();
dc_out_file.close();
return 0;
}

View File

@@ -1,6 +1,7 @@
#include "pyc_code.h"
#include "pyc_module.h"
#include "data.h"
#include "plusaes.hpp"
/* == Marshal structure for Code object ==
1.0 1.3 1.5 2.1 2.3 3.0 3.8 3.11
@@ -64,11 +65,14 @@ void PycCode::load(PycData* stream, PycModule* mod)
else
m_flags = 0;
bool pyarmor_co_obfuscated_flag = m_flags & 0x20000000;
if (mod->verCompare(3, 8) < 0) {
// Remap flags to new values introduced in 3.8
if (m_flags & 0xF0000000)
throw std::runtime_error("Cannot remap unexpected flags");
m_flags = (m_flags & 0xFFFF) | ((m_flags & 0xFFF0000) << 4);
// Pyarmor CO_OBFUSCATED flag always locates at 0x20000000
if (m_flags & 0xD0000000)
fprintf(stderr, "Remapping flags (%08X) may not be correct\n", m_flags);
m_flags = (m_flags & 0x1FFF) | ((m_flags & 0xFFFE000) << 4) | (m_flags & 0x20000000);
}
m_code = LoadObject(stream, mod).cast<PycString>();
@@ -117,6 +121,113 @@ void PycCode::load(PycData* stream, PycModule* mod)
m_exceptTable = LoadObject(stream, mod).cast<PycString>();
else
m_exceptTable = new PycString;
// Pyarmor extra fields
if (!pyarmor_co_obfuscated_flag)
return;
unsigned char extra_data[256] = {0};
unsigned char extra_length = stream->getByte();
stream->getBuffer(extra_length, extra_data);
unsigned char pyarmor_fn_count = extra_data[0] & 3;
unsigned char pyarmor_co_descriptor_count = (extra_data[0] >> 2) & 3;
if (extra_data[0] & 0xF0)
{
fprintf(stderr, "Unsupported Pyarmor CO extra flag (%02X)\n", extra_data[0]);
fprintf(stderr, "Please open an issue at https://github.com/Lil-House/Pyarmor-Static-Unpack-1shot/issues to request support and help to make this tool better.\n");
}
if (pyarmor_co_descriptor_count > 1)
{
fprintf(stderr, "Do not support multiple Pyarmor CO descriptors (%d in total)\n", pyarmor_co_descriptor_count);
fprintf(stderr, "Please open an issue at https://github.com/Lil-House/Pyarmor-Static-Unpack-1shot/issues to request support and help to make this tool better.\n");
}
unsigned char *extra_ptr = extra_data + 4;
for (unsigned char i = 0; i < pyarmor_fn_count; i++)
{
unsigned char item_length = (*extra_ptr >> 6) + 2;
// Ignore the details
extra_ptr += item_length;
}
for (unsigned char i = 0; i < pyarmor_co_descriptor_count; i++)
{
unsigned char item_length = (*extra_ptr >> 6) + 2;
unsigned char *item_end = extra_ptr + item_length;
// Ignore low 6 bits
extra_ptr++;
unsigned long consts_index = 0;
while (extra_ptr < item_end)
{
consts_index = (consts_index << 8) | *extra_ptr;
extra_ptr++;
}
pyarmorDecryptCoCode(consts_index, mod);
}
}
void PycCode::pyarmorDecryptCoCode(unsigned long consts_index, PycModule *mod)
{
PycRef<PycString> descriptor = getConst(consts_index).cast<PycString>();
const std::string &descriptor_str = descriptor->strValue();
if (descriptor_str.length() < 20)
{
fprintf(stderr, "Pyarmor CO descriptor is too short\n");
return;
}
const PyarmorCoDescriptor *desc = (const PyarmorCoDescriptor *)(descriptor_str.data() + 8);
bool copy_prologue = desc->flags & 0x8;
bool xor_aes_nonce = desc->flags & 0x4;
bool short_code = desc->flags & 0x2;
unsigned int nonce_index = short_code
? desc->short_nonce_index
: desc->short_nonce_index + desc->decrypt_begin_index + desc->decrypt_length;
unsigned char nonce[16] = {0};
memcpy(nonce, m_code->value() + nonce_index, 12);
nonce[15] = 2;
if (xor_aes_nonce)
{
if (!mod->pyarmor_co_code_aes_nonce_xor_enabled)
{
fprintf(stderr, "FATAL: Pyarmor CO code AES nonce XOR is not enabled but used\n");
}
else
{
unsigned char *xor_key = mod->pyarmor_co_code_aes_nonce_xor_key;
for (int i = 0; i < 12; i++)
nonce[i] ^= xor_key[i];
}
}
std::string &code_bytes = (std::string &)m_code->strValue();
plusaes::crypt_ctr(
(unsigned char *)&code_bytes[desc->decrypt_begin_index],
desc->decrypt_length,
mod->pyarmor_aes_key,
16,
&nonce);
if (copy_prologue)
{
memcpy(
&code_bytes[0],
&code_bytes[desc->decrypt_length],
desc->decrypt_begin_index);
// Assume tail of code is not used there
memset(
&code_bytes[desc->decrypt_length],
9, // NOP
desc->decrypt_begin_index);
}
// When running, the first 8 bytes are set to &PyCodeObject
std::string new_str = "<COAddr>" + descriptor_str.substr(8);
descriptor->setValue(new_str);
}
PycRef<PycString> PycCode::getCellVar(PycModule* mod, int idx) const

View File

@@ -8,6 +8,16 @@
class PycData;
class PycModule;
struct PyarmorCoDescriptor
{
unsigned char flags;
unsigned char short_nonce_index;
unsigned char _;
unsigned char decrypt_begin_index;
unsigned int decrypt_length;
unsigned int _enter_count;
};
class PycCode : public PycObject {
public:
typedef std::vector<PycRef<PycString>> globals_t;
@@ -35,6 +45,8 @@ public:
CO_FUTURE_GENERATOR_STOP = 0x800000, // 3.5 ->
CO_FUTURE_ANNOTATIONS = 0x1000000, // 3.7 ->
CO_NO_MONITORING_EVENTS = 0x2000000, // 3.13 ->
CO_PYARMOR_OBFUSCATED = 0x20000000, // Pyarmor all
};
PycCode(int type = TYPE_CODE)
@@ -43,6 +55,8 @@ public:
void load(PycData* stream, PycModule* mod) override;
void pyarmorDecryptCoCode(unsigned long consts_index, PycModule *mod);
int argCount() const { return m_argCount; }
int posOnlyArgCount() const { return m_posOnlyArgCount; }
int kwOnlyArgCount() const { return m_kwOnlyArgCount; }

View File

@@ -1,6 +1,7 @@
#include "pyc_module.h"
#include "data.h"
#include <stdexcept>
#include <cstring>
void PycModule::setVersion(unsigned int magic)
{
@@ -251,6 +252,242 @@ void PycModule::loadFromMarshalledFile(const char* filename, int major, int mino
m_code = LoadObject(&in, this).cast<PycCode>();
}
void PycModule::loadFromOneshotSequenceFile(const char *filename)
{
PycFile in(filename);
if (!in.isOpen())
{
fprintf(stderr, "Error opening file %s\n", filename);
return;
}
bool oneshot_seq_header = true;
while (oneshot_seq_header)
{
int indicator = in.getByte();
switch (indicator)
{
case 0xA1:
in.getBuffer(16, this->pyarmor_aes_key);
break;
case 0xA2:
in.getBuffer(12, this->pyarmor_mix_str_aes_nonce);
break;
case 0xF0:
break;
case 0xFF:
oneshot_seq_header = false;
break;
default:
fprintf(stderr, "Unknown 1-shot sequence indicator %02X\n", indicator);
break;
}
}
// Write only. Some fields unknown to us or not needed for decryption are discarded.
char discard_buffer[64];
char pyarmor_header[64];
in.getBuffer(64, pyarmor_header);
this->m_maj = pyarmor_header[9];
this->m_min = pyarmor_header[10];
this->m_unicode = (m_maj >= 3);
unsigned int remain_header_length = *(unsigned int *)(pyarmor_header + 28) - 64;
while (remain_header_length)
{
unsigned int discard_length = (remain_header_length > 64) ? 64 : remain_header_length;
in.getBuffer(discard_length, discard_buffer);
remain_header_length -= discard_length;
}
// For 1-shot sequence, the following part has been decrypted once.
unsigned int code_object_offset = in.get32();
unsigned int xor_key_procedure_length = in.get32();
this->pyarmor_co_code_aes_nonce_xor_enabled = (xor_key_procedure_length > 0);
unsigned int remain_second_part_length = code_object_offset - 8;
while (remain_second_part_length)
{
unsigned int discard_length = (remain_second_part_length > 64) ? 64 : remain_second_part_length;
in.getBuffer(discard_length, discard_buffer);
remain_second_part_length -= discard_length;
}
if (this->pyarmor_co_code_aes_nonce_xor_enabled)
{
char *procedure_buffer = (char *)malloc(xor_key_procedure_length);
in.getBuffer(xor_key_procedure_length, procedure_buffer);
pyarmorCoCodeAesNonceXorKeyCalculate(
procedure_buffer,
xor_key_procedure_length,
this->pyarmor_co_code_aes_nonce_xor_key);
free(procedure_buffer);
}
m_code = LoadObject(&in, this).cast<PycCode>();
}
#define GET_REAL_OPERAND_2_AND_ADD_CURRENT_PTR(CUR, REF) \
do \
{ \
unsigned char _INSIDE_LOW_NIBBLE = (CUR)[1] & 0xF; \
if (valid_index[_INSIDE_LOW_NIBBLE] != -1) \
{ \
(REF) = registers[_INSIDE_LOW_NIBBLE]; \
(CUR) += 2; \
} \
else \
{ \
unsigned int _INSIDE_SIZE = (CUR)[1] & 0x7; \
if (_INSIDE_SIZE == 1) \
{ \
(REF) = *(char *)((CUR) + 2); \
(CUR) += 3; \
} \
else if (_INSIDE_SIZE == 2) \
{ \
(REF) = *(short *)((CUR) + 2); \
(CUR) += 4; \
} \
else \
{ \
(REF) = *(int *)((CUR) + 2); \
(CUR) += 6; \
} \
} \
} while (0)
void pyarmorCoCodeAesNonceXorKeyCalculate(const char *in_buffer, unsigned int in_buffer_length, unsigned char *out_buffer)
{
unsigned char *cur = (unsigned char *)in_buffer + 16;
unsigned char *end = (unsigned char *)in_buffer + in_buffer_length;
int registers[8] = {0};
const int valid_index[16] = {
0,
1,
2,
3,
4,
5,
-1,
7, /* origin is 15 */
-1,
-1,
-1,
-1,
-1,
-1,
-1,
-1,
};
while (cur < end)
{
int operand_2 = 0;
unsigned char high_nibble = 0;
unsigned char reg = 0;
switch (*cur)
{
case 1:
// terminator
cur++;
break;
case 2:
high_nibble = cur[1] >> 4;
GET_REAL_OPERAND_2_AND_ADD_CURRENT_PTR(cur, operand_2);
registers[high_nibble] += operand_2;
break;
case 3:
high_nibble = cur[1] >> 4;
GET_REAL_OPERAND_2_AND_ADD_CURRENT_PTR(cur, operand_2);
registers[high_nibble] -= operand_2;
break;
case 4:
high_nibble = cur[1] >> 4;
GET_REAL_OPERAND_2_AND_ADD_CURRENT_PTR(cur, operand_2);
registers[high_nibble] *= operand_2;
/** We found that in x86_64, machine code is
* imul reg64, reg/imm
* so we get the low bits of the result.
*/
break;
case 5:
high_nibble = cur[1] >> 4;
GET_REAL_OPERAND_2_AND_ADD_CURRENT_PTR(cur, operand_2);
registers[high_nibble] /= operand_2;
/** We found that in x86_64, machine code is
* mov r10d, imm32 ; when necessary
* mov rax, reg64
* cqo
* idiv r10/reg64 ; r10/reg64 is the operand_2
* mov reg64, rax
* so rax (0) is tampered.
*/
registers[0] = registers[high_nibble];
break;
case 6:
high_nibble = cur[1] >> 4;
GET_REAL_OPERAND_2_AND_ADD_CURRENT_PTR(cur, operand_2);
registers[high_nibble] ^= operand_2;
break;
case 7:
high_nibble = cur[1] >> 4;
GET_REAL_OPERAND_2_AND_ADD_CURRENT_PTR(cur, operand_2);
registers[high_nibble] = operand_2;
break;
case 8:
/** We found that in x86_64, machine code is
* mov reg1, ptr [reg2]
* This hardly happens.
*/
cur += 2;
break;
case 9:
reg = cur[1] & 0x7;
*(int *)out_buffer = registers[reg];
cur += 2;
break;
case 0xA:
/**
* This happens when 4 bytes of total 12 bytes nonce are calculated,
* and the result is to be stored in the memory. So the address from
* register 7 (15) is moved to one of the registers.
*
* We don't really care about the address and the register number.
* So we just skip 6 bytes (0A ... and 02 ...).
*
* For example:
*
* [0A [1F] 00] - [00][011][111] - mov rbx<3>, [rbp<7>-18h]
* [rbp-18h] is the address
* [02 [39] 0C] - [0011][1][001] - add rbx<3>, 0Ch
* 0Ch is a fixed offset
* [09 [98] ] - [10][011][000] - mov [rbx<3>], eax<0>
* eax<0> is the value to be stored
*
* Another example:
*
* [0A [07] 00] - [00][000][111] - mov rax<0>, [rbp<7>-18h]
* [02 [09] 0C] - [0000][1][001] - add rax<0>, 0Ch
* [0B [83] 04] - [10][000][011] - mov [rax<0>+4], ebx<3>
* 4 means [4..8] of 12 bytes nonce
*/
cur += 6;
break;
case 0xB:
reg = cur[1] & 0x7;
*(int *)(out_buffer + cur[2]) = registers[reg];
cur += 3;
break;
default:
fprintf(stderr, "FATAL: Unknown opcode %d at %lld\n", *cur, (long long)(cur - (unsigned char *)in_buffer));
memset(out_buffer, 0, 12);
cur = end;
break;
}
}
}
PycRef<PycString> PycModule::getIntern(int ref) const
{
if (ref < 0 || (size_t)ref >= m_interns.size())

View File

@@ -46,6 +46,7 @@ public:
void loadFromFile(const char* filename);
void loadFromMarshalledFile(const char *filename, int major, int minor);
void loadFromOneshotSequenceFile(const char* filename);
bool isValid() const { return (m_maj >= 0) && (m_min >= 0); }
int majorVer() const { return m_maj; }
@@ -80,6 +81,11 @@ public:
static bool isSupportedVersion(int major, int minor);
unsigned char pyarmor_aes_key[16];
unsigned char pyarmor_mix_str_aes_nonce[12];
bool pyarmor_co_code_aes_nonce_xor_enabled;
unsigned char pyarmor_co_code_aes_nonce_xor_key[12];
private:
void setVersion(unsigned int magic);
@@ -92,4 +98,6 @@ private:
std::vector<PycRef<PycObject>> m_refs;
};
void pyarmorCoCodeAesNonceXorKeyCalculate(const char *in_buffer, unsigned int in_buffer_length, unsigned char *out_buffer);
#endif

View File

@@ -2,6 +2,7 @@
#include "pyc_module.h"
#include "data.h"
#include <stdexcept>
#include "plusaes.hpp"
static bool check_ascii(const std::string& data)
{
@@ -154,3 +155,28 @@ void PycString::print(std::ostream &pyc_output, PycModule* mod, bool triple,
pyc_output << (useQuotes ? '"' : '\'');
}
}
void PycString::dasPrintAndDecrypt(std::ostream &stream, PycModule *mod, bool triple, const char *parent_f_string_quote)
{
if (m_value.empty() || !(m_value[0] & 0x80)
|| (m_value[0] & 0x7F) == 0 || (m_value[0] & 0x7F) > 4)
return print(stream, mod, triple, parent_f_string_quote);
std::string result(m_value.substr(1));
unsigned char nonce[16] = {0};
memcpy(nonce, mod->pyarmor_mix_str_aes_nonce, 12);
nonce[15] = 2;
plusaes::crypt_ctr(
(unsigned char *)&result[0],
result.length(),
mod->pyarmor_aes_key,
16,
&nonce);
PycString decrypted(TYPE_UNICODE);
decrypted.setValue(result);
decrypted.print(stream, mod, triple, parent_f_string_quote);
stream << " # ";
print(stream, mod, triple, parent_f_string_quote);
}

View File

@@ -30,6 +30,10 @@ public:
void print(std::ostream& stream, class PycModule* mod, bool triple = false,
const char* parent_f_string_quote = nullptr);
void dasPrintAndDecrypt(std::ostream& stream, class PycModule* mod,
bool triple = false,
const char* parent_f_string_quote = nullptr);
private:
std::string m_value;
};

View File

@@ -23,7 +23,7 @@ static const char* flag_names[] = {
"CO_FUTURE_PRINT_FUNCTION", "CO_FUTURE_UNICODE_LITERALS", "CO_FUTURE_BARRY_AS_BDFL",
"CO_FUTURE_GENERATOR_STOP",
"CO_FUTURE_ANNOTATIONS", "CO_NO_MONITORING_EVENTS", "<0x4000000>", "<0x8000000>",
"<0x10000000>", "<0x20000000>", "<0x40000000>", "<0x80000000>"
"<0x10000000>", "CO_PYARMOR_OBFUSCATED", "<0x40000000>", "<0x80000000>"
};
static void print_coflags(unsigned long flags, std::ostream& pyc_output)
@@ -104,7 +104,7 @@ void output_object(PycRef<PycObject> obj, PycModule* mod, int indent,
unsigned int orig_flags = codeObj->flags();
if (mod->verCompare(3, 8) < 0) {
// Remap flags back to the value stored in the PyCode object
orig_flags = (orig_flags & 0xFFFF) | ((orig_flags & 0xFFF00000) >> 4);
orig_flags = (orig_flags & 0x1FFF) | ((orig_flags & 0xDFFE0000) >> 4) | (orig_flags & 0x20000000);
}
iprintf(pyc_output, indent + 1, "Flags: 0x%08X", orig_flags);
print_coflags(codeObj->flags(), pyc_output);
@@ -165,7 +165,7 @@ void output_object(PycRef<PycObject> obj, PycModule* mod, int indent,
case PycObject::TYPE_SHORT_ASCII:
case PycObject::TYPE_SHORT_ASCII_INTERNED:
iputs(pyc_output, indent, "");
obj.cast<PycString>()->print(pyc_output, mod);
obj.cast<PycString>()->dasPrintAndDecrypt(pyc_output, mod);
pyc_output << "\n";
break;
case PycObject::TYPE_TUPLE: