|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import os |
|
import sys |
|
import importlib.util |
|
import re |
|
from pathlib import Path |
|
import traceback |
|
|
|
|
|
def load_module_from_path(module_name, file_path):
    """Load a Python module from a file path and register it in ``sys.modules``.

    Args:
        module_name: Name under which the module is executed and registered.
        file_path: Path of the Python source file to load.

    Returns:
        The loaded module, or ``None`` when no usable import spec could be
        created or the module raised an exception while executing. Failures
        are reported with ``print`` and ``traceback.print_exc`` rather than
        being propagated to the caller.
    """
    try:
        spec = importlib.util.spec_from_file_location(module_name, file_path)
        # A spec without a loader cannot be executed, so treat it like a
        # missing spec instead of failing later with an AttributeError.
        if spec is None or spec.loader is None:
            print(f"错误: 无法从 {file_path} 创建模块规范")
            return None

        module = importlib.util.module_from_spec(spec)

        # The module must be registered before execution so that code inside
        # it can import itself by name. Remember what was registered before so
        # a failed load does not leave a half-initialised module behind.
        missing = object()
        previous = sys.modules.get(module_name, missing)
        sys.modules[module_name] = module
        try:
            spec.loader.exec_module(module)
        except BaseException:
            if previous is missing:
                sys.modules.pop(module_name, None)
            else:
                sys.modules[module_name] = previous
            raise
        return module
    except Exception as e:
        print(f"加载模块时出错: {e}")
        traceback.print_exc()
        return None
|
|
|
|
|
def _replace_question_assignments(content, question_literal):
    """Replace every ``question = <literal>`` in *content* with the new literal.

    Returns a ``(new_content, replacement_count)`` tuple.
    """
    pattern = re.compile(
        r"question\s*=\s*(?:"
        # Triple-quoted forms must be tried first; otherwise the single-quote
        # alternatives match the empty string formed by the first two quotes.
        r'""".*?"""'
        r"|'''.*?'''"
        # Single-line strings: the closing quote must be the same kind as the
        # opening one, and backslash-escaped characters are skipped.
        r'|"(?:\\.|[^"\\\n])*"'
        r"|'(?:\\.|[^'\\\n])*'"
        # Parenthesised value, up to the first closing parenthesis.
        r"|\(.*?\)"
        r")",
        re.DOTALL,
    )
    # A callable replacement keeps backslashes in the literal from being
    # interpreted as regex group references.
    return pattern.subn(lambda _match: f"question = {question_literal}", content)


def _build_modified_script(content, question, main_match):
    """Return *content* rewritten so that it uses the user's *question*.

    ``main_match`` is the regex match for ``def main():`` in *content*, or
    ``None`` when the script defines no such function.
    """
    # repr() always yields a valid Python string literal, whatever quotes,
    # backslashes or control characters the question contains.
    question_literal = repr(question)
    has_main = main_match is not None

    modified_content, replaced = _replace_question_assignments(
        content, question_literal
    )
    print(f"在脚本中找到 {replaced} 个question赋值")

    if replaced:
        print(f"已替换脚本中的所有question赋值为: {question}")
    elif has_main:
        # No assignment to rewrite: define the question just before main().
        insert_pos = main_match.start()
        modified_content = (
            content[:insert_pos]
            + f"\n# 用户输入的问题\nquestion = {question_literal}\n\n"
            + content[insert_pos:]
        )
        print(f"已在main函数前插入问题: {question}")
    else:
        modified_content = (
            f"# 用户输入的问题\nquestion = {question_literal}\n\n" + content
        )
        print(f"已在文件开头插入问题: {question}")

    # Appended code that wraps construct_society (if the script defines it) so
    # that it is always called with the user's question.
    modified_content += f"""
# 确保construct_society函数使用用户的问题
original_construct_society = globals().get('construct_society')
if original_construct_society:
    def patched_construct_society(*args, **kwargs):
        # 忽略传入的参数,始终使用用户的问题
        return original_construct_society({question_literal})

    # 替换原始函数
    globals()['construct_society'] = patched_construct_society
    print("已修补construct_society函数,确保使用用户问题")
"""

    if has_main and "__main__" not in content:
        modified_content += """

# 确保调用main函数
if __name__ == "__main__":
    main()
"""
        print("已添加main函数调用代码")

    if (
        "construct_society" in content
        and "run_society" in content
        and "Answer:" not in content
    ):
        modified_content += f"""

# 确保执行construct_society和run_society
if "construct_society" in globals() and "run_society" in globals():
    try:
        society = construct_society({question_literal})
        from utils import run_society
        answer, chat_history, token_count = run_society(society)
        print(f"Answer: {{answer}}")
    except Exception as e:
        print(f"运行时出错: {{e}}")
        import traceback
        traceback.print_exc()
"""
        print("已添加construct_society和run_society调用代码")

    return modified_content


def _execute_temp_script(temp_script_path, script_path, question, has_main):
    """Run the rewritten script, via its ``main()`` when it defines one."""
    print("开始执行脚本...")

    if not has_main:
        print("直接执行脚本内容...")
        with open(temp_script_path, "r", encoding="utf-8") as f:
            script_code = f.read()

        script_globals = {
            "__file__": str(temp_script_path),
            "__name__": "__main__",
        }
        if "__builtins__" in globals():
            script_globals["__builtins__"] = globals()["__builtins__"]

        # NOTE: this executes the target script's code in-process by design;
        # only run scripts you trust. Compiling with the file name makes
        # tracebacks point at the temporary script instead of "<string>".
        exec(compile(script_code, str(temp_script_path), "exec"), script_globals)
        return

    module_name = f"temp_{script_path.stem}"
    module = load_module_from_path(module_name, temp_script_path)
    if module is None:
        print(f"错误: 无法加载模块 {module_name}")
        sys.exit(1)

    # Make sure the module-level name holds the user's question as well.
    setattr(module, "question", question)

    if hasattr(module, "construct_society"):
        original_func = module.construct_society

        def patched_func(*args, **kwargs):
            # Ignore whatever the script passes; always use the user's question.
            return original_func(question)

        module.construct_society = patched_func
        print("已在模块级别修补construct_society函数")

    if not hasattr(module, "main"):
        print(f"错误: 脚本 {script_path} 中没有main函数")
        sys.exit(1)

    print("调用main函数...")
    module.main()


def run_script_with_env_question(script_name):
    """Run a script with its question replaced by ``$OWL_QUESTION``.

    The script is read, rewritten into a sibling file named
    ``temp_<script file name>`` so that it uses the question from the
    ``OWL_QUESTION`` environment variable, executed, and the temporary file
    is removed afterwards.

    Args:
        script_name: Path of the Python script to run.

    Raises:
        SystemExit: With exit code 1 when ``OWL_QUESTION`` is unset or empty,
            the script does not exist, or reading, rewriting or executing the
            script fails.
    """
    question = os.environ.get("OWL_QUESTION")
    if not question:
        print("错误: 未设置OWL_QUESTION环境变量")
        sys.exit(1)

    script_path = Path(script_name).resolve()
    if not script_path.exists():
        print(f"错误: 脚本 {script_path} 不存在")
        sys.exit(1)

    temp_script_path = script_path.with_name(f"temp_{script_path.name}")

    try:
        try:
            with open(script_path, "r", encoding="utf-8") as f:
                content = f.read()
        except Exception as e:
            print(f"读取脚本文件时出错: {e}")
            sys.exit(1)

        main_match = re.search(r"def\s+main\s*\(\s*\)\s*:", content)
        modified_content = _build_modified_script(content, question, main_match)

        # Let the script import modules that live next to it.
        script_dir = str(script_path.parent)
        if script_dir not in sys.path:
            sys.path.insert(0, script_dir)

        try:
            with open(temp_script_path, "w", encoding="utf-8") as f:
                f.write(modified_content)
            print(f"已创建临时脚本文件: {temp_script_path}")
        except Exception as e:
            print(f"创建临时脚本文件时出错: {e}")
            sys.exit(1)

        try:
            _execute_temp_script(
                temp_script_path, script_path, question, main_match is not None
            )
        except Exception as e:
            print(f"执行脚本时出错: {e}")
            traceback.print_exc()
            sys.exit(1)

    except Exception as e:
        print(f"处理脚本时出错: {e}")
        traceback.print_exc()
        sys.exit(1)

    finally:
        # Runs on the SystemExit paths too, so the temporary file never
        # outlives this call unless deleting it fails.
        if temp_script_path.exists():
            try:
                temp_script_path.unlink()
                print(f"已删除临时脚本文件: {temp_script_path}")
            except Exception as e:
                print(f"删除临时脚本文件时出错: {e}")
|
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point: exactly one positional argument is required,
    # the path of the script to run with the OWL_QUESTION environment variable.
    cli_args = sys.argv[1:]
    if not cli_args:
        print("用法: python script_adapter.py <script_path>")
        sys.exit(1)

    run_script_with_env_question(cli_args[0])
|
|