This commit is contained in:
zhangquan 2026-03-31 09:33:49 +08:00
parent 8f869f6b3f
commit b08f29e8bf
10 changed files with 0 additions and 1577 deletions

View File

@@ -1,55 +0,0 @@
# ============================================
# Middle School Physics Homework Correction Workflow - example environment variables
# ============================================
# Copy this file to .env and fill in real values:
# cp .env.example .env
# ============================================
# Required - LLM API
# ============================================
# LLM API key (obtain from Volcengine or OpenAI)
LLM_API_KEY=your-api-key-here
# LLM API base URL
# Volcengine: https://ark.cn-beijing.volces.com/api/v3
# OpenAI: https://api.openai.com/v1
LLM_BASE_URL=https://ark.cn-beijing.volces.com/api/v3
# Model name
# Volcengine recommended: doubao-seed-2-0-pro-260215
# OpenAI recommended: gpt-4o
LLM_MODEL_NAME=doubao-seed-2-0-pro-260215
# Note: no object storage (S3/TOS/OSS, etc.) is required.
# Images are used via their original URLs and are never uploaded.
# ============================================
# Optional - Logging and cache
# ============================================
# Log level: DEBUG, INFO, WARNING, ERROR
LOG_LEVEL=INFO
# Cache directory (default: /tmp/cache)
CACHE_DIR=/tmp/cache
# Per-image processing timeout in seconds (default: 120)
SINGLE_IMAGE_TIMEOUT=120
# ============================================
# Optional - Concurrency control
# ============================================
# Maximum concurrency (default: 10)
MAX_CONCURRENT=10
# ============================================
# Working directory (set automatically by the system; do not modify)
# ============================================
# COZE_WORKSPACE_PATH=/workspace/projects

View File

@@ -1,464 +0,0 @@
# Deployment Guide
This document walks you through exporting the Middle School Physics Homework Correction Workflow and running it on your own server.
## 📋 Contents
- [Prerequisites](#prerequisites)
- [Quick Deployment](#quick-deployment)
- [Detailed Configuration](#detailed-configuration)
- [Ways to Start the Service](#ways-to-start-the-service)
- [FAQ](#faq)
---
## Prerequisites
### 1. System requirements
- **Operating system**: Linux / macOS / Windows (Linux recommended)
- **Python**: 3.10 or later
- **Memory**: 4 GB or more recommended
- **Disk space**: 10 GB or more recommended
### 2. Required third-party services
The project depends on the following third-party service, which **must be set up in advance**.
#### Large language model API
- **Recommended**: Volcengine Doubao (this project uses `doubao-seed-2-0-pro-260215`)
- **Alternatives**:
  - OpenAI API
  - Any other OpenAI-compatible API (e.g. DeepSeek, Kimi)
- **Where to get a key**:
  - Volcengine: https://console.volcengine.com/ark
  - OpenAI: https://platform.openai.com/
**Note**:
- ✅ **No object storage configuration is needed** (S3/TOS/OSS, etc.)
- ✅ Images are used via their original URLs and are never uploaded
- ✅ The answer Word document is downloaded directly with requests; no object storage is involved
---
## Quick Deployment
### Step 1: Export the project code
**Option A: download from the Coze platform**
```bash
# Click "Export Project" in Coze Coding,
# then unpack the archive on your server
```
**Option B: clone with Git (if you have a repository URL)**
```bash
git clone <your-repo-url>
cd <project-directory>
```
### Step 2: Install dependencies
```bash
# Create a virtual environment (recommended)
python3 -m venv venv
source venv/bin/activate  # Linux/macOS
# or: venv\Scripts\activate  # Windows
# Install dependencies
pip install -r requirements.txt
```
### Step 3: Configure environment variables
Create a `.env` file (or set the variables in your server environment):
```bash
# Required variables (only the LLM API needs configuring)
export LLM_API_KEY="your-api-key-here"
export LLM_BASE_URL="https://ark.cn-beijing.volces.com/api/v3"
export LLM_MODEL_NAME="doubao-seed-2-0-pro-260215"
# Optional: log level
export LOG_LEVEL="INFO"
# Note: no object storage (S3/TOS, etc.) needs to be configured
```
### Step 4: Start the service
```bash
# Option 1: use the startup script (recommended)
bash scripts/http_run.sh -p 8000
# Option 2: run directly
python src/main.py -m http -p 8000
```
Once the service is up, visit:
- Health check: `http://localhost:8000/health`
- API docs: `http://localhost:8000/docs` (auto-generated by FastAPI)
---
## Detailed Configuration
### 1. LLM configuration
#### Option A: Volcengine Doubao (recommended)
```bash
# Environment variables
export LLM_API_KEY="your-ark-api-key"
export LLM_BASE_URL="https://ark.cn-beijing.volces.com/api/v3"
export LLM_MODEL_NAME="doubao-seed-2-0-pro-260215"
```
**How to get a key**:
1. Open the Volcengine console: https://console.volcengine.com/ark
2. Create an inference endpoint
3. Copy the API Key
#### Option B: OpenAI API
Edit the model configuration files (`config/*.json`) and change the `model` field to an OpenAI model:
```json
{
  "config": {
    "model": "gpt-4o",
    "temperature": 0.0
  }
}
```
Environment variables:
```bash
export LLM_API_KEY="your-openai-api-key"
export LLM_BASE_URL="https://api.openai.com/v1"
export LLM_MODEL_NAME="gpt-4o"
```
### 2. ~~Object storage configuration~~ (removed)
**Important update (2026-03-27)**:
- ❌ Object storage no longer needs to be configured
- ✅ Images are used via their original URLs and are not uploaded
- ✅ The answer Word document is downloaded directly, not stored
**Why this architecture change**:
1. The AI model is capable of reading the original image URLs directly
2. A relative coordinate system (0-1000) adapts automatically to any image size
3. Skipping uploads cuts storage cost and upload time, so processing is faster
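The relative-coordinate idea can be sketched in a few lines (a minimal illustration only; the `(x0, y0, x1, y1)` tuple layout and the helper name are assumptions, not the workflow's actual API):

```python
def to_pixels(rel_box, width, height):
    """Map a box given in 0-1000 relative units to pixel coordinates.

    rel_box: (x0, y0, x1, y1) with each value in [0, 1000] (assumed layout).
    """
    x0, y0, x1, y1 = rel_box
    return (round(x0 / 1000 * width), round(y0 / 1000 * height),
            round(x1 / 1000 * width), round(y1 / 1000 * height))

# The same relative box lands on the same region at any resolution:
print(to_pixels((0, 0, 500, 1000), 1920, 1080))  # left half of a 1920x1080 image
print(to_pixels((0, 0, 500, 1000), 800, 600))    # left half of an 800x600 image
```

Because the model reports boxes in the 0-1000 space, no image needs to be resized or re-uploaded before correction.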
### 3. Adapting the code to your environment
**⚠️ Important: the LLM call path must be replaced**
The project originally used `coze-coding-dev-sdk` (Coze-platform-only); it **must be replaced with the standard OpenAI SDK**.
**✅ A replacement is already provided**: `src/utils/llm_client.py` wraps the standard OpenAI SDK.
**Migration steps (already done)**:
1. **Custom LLM client created**: `src/utils/llm_client.py` ✅
   - Uses the standard OpenAI SDK
   - Keeps the original code's interface
   - Works with Volcengine / OpenAI / other compatible APIs
2. **Imports updated** (already done) in:
   - `src/graphs/nodes/recognize_and_correct_node.py`
   - `src/graphs/nodes/doc_extract_node.py`
```python
# Before (original code)
from coze_coding_dev_sdk import LLMClient
# After (new code)
from utils.llm_client import LLMClient
```
**No manual changes required**: the code is already updated; deploy as-is.
#### ~~Adapting the object storage logic~~ (not needed)
**Removed**: after the 2026-03-27 optimization, object storage is no longer used.
- Images are used via their original URLs
- The Word document is downloaded with requests
- No storage-related code needs changing
### 4. Cache configuration (optional)
The project caches parsing results on disk; the default cache directory is `/tmp/cache`.
To change it:
```bash
export CACHE_DIR="/your/custom/cache/dir"
```
---
## Ways to Start the Service
### 1. HTTP service mode (recommended for production)
```bash
# Use the startup script
bash scripts/http_run.sh -p 8000
# Or run directly
python src/main.py -m http -p 8000
```
**Features**:
- REST API
- Streaming responses (SSE)
- Timeout control
- Task cancellation
**API endpoints**:
- `POST /run` - run the workflow synchronously
- `POST /stream_run` - run the workflow with streaming output (SSE)
- `POST /cancel/{run_id}` - cancel a run
- `GET /health` - health check
- `GET /graph_parameter` - inspect the workflow parameters
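For `POST /stream_run`, the client has to split the SSE byte stream back into events. A minimal parser sketch follows standard SSE framing; the event names and JSON payloads shown are assumptions, not the service's documented output:

```python
def parse_sse(stream_text):
    """Split raw SSE text into (event, data) pairs.

    SSE framing: 'event:' names the event, 'data:' lines accumulate,
    and a blank line terminates the event.
    """
    events = []
    event, data_lines = "message", []
    for line in stream_text.splitlines():
        if line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data_lines.append(line[len("data:"):].strip())
        elif line == "" and data_lines:
            events.append((event, "\n".join(data_lines)))
            event, data_lines = "message", []
    return events

# Hypothetical stream fragment; the real payload shape depends on the service.
sample = 'event: progress\ndata: {"done": 1}\n\nevent: result\ndata: {"score": 90}\n\n'
print(parse_sse(sample))
```

In practice you would feed this the chunks read from the HTTP response body instead of a fixed string.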
### 2. Command-line mode (local testing)
```bash
# Run the whole workflow
python src/main.py -m flow -i '{"student_homework": [...], "answer_doc_url": "..."}'
# Run a single node
python src/main.py -m node -n doc_extract -i '{"answer_doc_url": "..."}'
```
### 3. Docker deployment (recommended)
Create a `Dockerfile`:
```dockerfile
FROM python:3.10-slim
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    && rm -rf /var/lib/apt/lists/*
# Copy the dependency manifest
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy the project files
COPY . .
# Expose the port
EXPOSE 8000
# Startup command
CMD ["python", "src/main.py", "-m", "http", "-p", "8000"]
```
Build and run:
```bash
# Build the image
docker build -t homework-correction:v1 .
# Run the container
docker run -d \
  --name homework-correction \
  -p 8000:8000 \
  -e LLM_API_KEY="your-api-key" \
  -e LLM_BASE_URL="https://ark.cn-beijing.volces.com/api/v3" \
  -e LLM_MODEL_NAME="doubao-seed-2-0-pro-260215" \
  homework-correction:v1
```
### 4. Docker Compose
Create `docker-compose.yml`:
```yaml
version: '3.8'
services:
  homework-correction:
    build: .
    ports:
      - "8000:8000"
    environment:
      - LLM_API_KEY=${LLM_API_KEY}
      - LLM_BASE_URL=${LLM_BASE_URL}
      - LLM_MODEL_NAME=${LLM_MODEL_NAME}
    restart: unless-stopped
    volumes:
      - ./cache:/tmp/cache  # persist the cache
```
Run:
```bash
docker-compose up -d
```
---
## FAQ
### Q1: How do I verify the environment variables are set correctly?
```bash
# Check the environment variables
echo $LLM_API_KEY
echo $LLM_BASE_URL
echo $LLM_MODEL_NAME
# Or print them from Python
python -c "import os; print(os.getenv('LLM_API_KEY'))"
```
### Q2: Startup fails with "ModuleNotFoundError: No module named 'xxx'"
**Fix**:
```bash
# Make sure the virtual environment is active
source venv/bin/activate
# Reinstall the dependencies
pip install -r requirements.txt
```
### Q3: LLM calls fail with "API key not found"
**Cause**: the environment variable is not set correctly.
**Fix**:
```bash
# Option 1: persist it in your shell profile
echo "export LLM_API_KEY='your-api-key'" >> ~/.bashrc
source ~/.bashrc
# Option 2: set it inline when starting the service
LLM_API_KEY="your-api-key" python src/main.py -m http -p 8000
```
### Q4: Error "S3 object not found" or the image URL returns 404
**Cause**: the image URL is not accessible.
**Checklist**:
1. ✅ Is the image URL valid? (open it in a browser to test)
2. ✅ Does the URL require authentication? (check your permissions)
3. ✅ Has the URL expired? (some temporary URLs are time-limited)
4. ✅ Is the URL well-formed? (it must start with http:// or https://)
**Fix**:
```bash
# Test whether the image URL is reachable
curl -I "https://your-image-url.com/image.jpg"
# A 404 means the URL is invalid or expired;
# re-upload the image to get a fresh URL
```
**Supported image sources**:
- ✅ Public HTTP/HTTPS URLs (recommended)
- ❌ URLs that require authentication (download to public storage first)
- ❌ Local file paths (upload to network storage first)
### Q5: How do I test that the workflow works?
Send a test request with curl:
```bash
curl -X POST http://localhost:8000/run \
  -H "Content-Type: application/json" \
  -d '{
    "student_homework": [
      {
        "student_id": 0,
        "student_name": "Test Student",
        "homework_images": ["https://example.com/homework.jpg"]
      }
    ],
    "answer_doc_url": "https://example.com/answer.docx"
  }'
```
### Q6: How do I view the logs?
```bash
# Tail the log file
tail -f /app/work/logs/bypass/app.log
# Or use Docker logs
docker logs -f homework-correction
```
### Q7: Performance tuning tips
1. **Concurrency**: tune the `max_concurrent` parameter (default: 10)
2. **Timeouts**: adjust the `SINGLE_IMAGE_TIMEOUT` setting (default: 120 seconds)
3. **Cache**: periodically clean the `/tmp/cache` directory
4. **Monitoring**: watch resource usage with `htop` or `docker stats`
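The tuning knobs above can be combined in a small shell snippet; the values here are placeholders for illustration, not recommendations:

```shell
# Hypothetical tuning for a small machine; adjust to your hardware
export MAX_CONCURRENT=5            # fewer parallel image jobs
export SINGLE_IMAGE_TIMEOUT=180    # allow slower per-image processing
# Drop cache entries older than 7 days (CACHE_DIR defaults to /tmp/cache)
find "${CACHE_DIR:-/tmp/cache}" -type f -mtime +7 -delete 2>/dev/null || true
echo "MAX_CONCURRENT=$MAX_CONCURRENT SINGLE_IMAGE_TIMEOUT=$SINGLE_IMAGE_TIMEOUT"
```

Set the variables before starting the service so the worker processes inherit them.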
### Q8: How do I switch to another LLM?
1. Change the environment variables:
```bash
export LLM_API_KEY="your-other-llm-api-key"
export LLM_BASE_URL="https://api.other-llm.com/v1"
export LLM_MODEL_NAME="other-model-id"
```
2. Change the `model` field in the configuration files (`config/*.json`)
3. Verify that calls still work
---
## Project Layout
```
├── src/
│   ├── main.py                  # Main entry point
│   ├── graphs/
│   │   ├── graph.py             # Main workflow orchestration
│   │   ├── loop_graph.py        # Subgraph definition
│   │   ├── state.py             # State definitions
│   │   └── nodes/               # Node implementations
│   │       ├── doc_extract_node.py
│   │       ├── process_images_node.py
│   │       ├── recognize_and_correct_node.py
│   │       └── ...
│   └── utils/                   # Utilities
│       ├── file/file.py         # File handling
│       └── cache_manager.py     # Cache management
├── config/                      # LLM configuration files
│   ├── doc_extract_llm_cfg.json
│   ├── homework_correction_cfg.json
│   └── ...
├── scripts/                     # Startup scripts
│   ├── http_run.sh
│   └── local_run.sh
├── requirements.txt             # Python dependencies
└── README.md                    # Project overview
```
---
## Support
If you run into problems, check:
1. ✅ Environment variables are set correctly
2. ✅ Dependencies are fully installed
3. ✅ Third-party services (the LLM API) are reachable
4. ✅ Error messages in the log files
---
## Changelog
- 2026-03-28: Added per-subject cache isolation; fixed the grading logic
- 2026-03-27: Removed image uploads; original URLs are used directly
- 2026-03-26: Improved coordinate localization; fixed recognition issues
- 2026-03-25: Support parallel processing of multiple students and images

View File

@@ -1,111 +0,0 @@
#!/bin/bash
# ============================================
# Middle School Physics Homework Correction Workflow - quick deployment script
# ============================================
set -e
echo "======================================"
echo " Homework Correction Workflow - Setup Wizard"
echo "======================================"
echo ""
# Detect the operating system
if [[ "$OSTYPE" == "linux-gnu"* ]]; then
OS="Linux"
elif [[ "$OSTYPE" == "darwin"* ]]; then
OS="macOS"
elif [[ "$OSTYPE" == "msys" ]] || [[ "$OSTYPE" == "cygwin" ]]; then
OS="Windows"
else
OS="Unknown"
fi
echo "Detected OS: $OS"
echo ""
# Step 1: check the Python version
echo "Step 1/5: Checking Python version..."
if command -v python3 &> /dev/null; then
PYTHON_VERSION=$(python3 --version 2>&1 | awk '{print $2}')
PYTHON_MAJOR=$(echo $PYTHON_VERSION | cut -d. -f1)
PYTHON_MINOR=$(echo $PYTHON_VERSION | cut -d. -f2)
if [ "$PYTHON_MAJOR" -gt 3 ] || { [ "$PYTHON_MAJOR" -eq 3 ] && [ "$PYTHON_MINOR" -ge 10 ]; }; then
echo "✅ Python version: $PYTHON_VERSION"
else
echo "❌ Python version too old: $PYTHON_VERSION (3.10+ required)"
exit 1
fi
else
echo "❌ Python 3 not found"
exit 1
fi
echo ""
# Step 2: create a virtual environment
echo "Step 2/5: Creating virtual environment..."
if [ ! -d "venv" ]; then
python3 -m venv venv
echo "✅ Virtual environment created"
else
echo "✅ Virtual environment already exists"
fi
echo ""
# Step 3: activate the virtual environment
echo "Step 3/5: Activating virtual environment..."
if [ "$OS" == "Windows" ]; then
source venv/Scripts/activate
else
source venv/bin/activate
fi
echo "✅ Virtual environment activated"
echo ""
# Step 4: install dependencies
echo "Step 4/5: Installing dependencies..."
if [ -f "requirements.txt" ]; then
pip install --upgrade pip
pip install -r requirements.txt
echo "✅ Dependencies installed"
else
echo "❌ requirements.txt not found"
exit 1
fi
echo ""
# Step 5: configure environment variables
echo "Step 5/5: Configuring environment variables..."
if [ ! -f ".env" ]; then
if [ -f ".env.example" ]; then
cp .env.example .env
echo "✅ .env file created"
echo ""
echo "⚠️ Please edit .env and fill in the required settings:"
echo " - LLM_API_KEY"
echo " - LLM_BASE_URL"
echo " - LLM_MODEL_NAME"
echo ""
echo "Note: no object storage is needed; images are used via their original URLs"
echo ""
echo "When done, start the service with:"
echo " set -a; source .env; set +a"
echo " bash scripts/http_run.sh -p 8000"
else
echo "❌ .env.example not found"
exit 1
fi
else
echo "✅ .env file already exists"
echo ""
echo "Start the service:"
echo " set -a; source .env; set +a"
echo " bash scripts/http_run.sh -p 8000"
fi
echo ""
echo "======================================"
echo " ✅ Setup complete!"
echo "======================================"

View File

@@ -1,424 +0,0 @@
import os
import re
from pathlib import Path
from typing import Optional, Any, Dict, List, TypedDict, Iterable
from uuid import uuid4
import boto3
from botocore.exceptions import ClientError
from boto3.s3.transfer import TransferConfig
import logging
logger = logging.getLogger(__name__)
# Allowed characters for user-supplied file names
FILE_NAME_ALLOWED_RE = re.compile(r"^[A-Za-z0-9._\-/]+$")
class ListFilesResult(TypedDict):
# Return type of list_files
keys: List[str]
is_truncated: bool
next_continuation_token: Optional[str]
class S3SyncStorage:
"""S3兼容存储实现"""
def __init__(self, *, endpoint_url: Optional[str] = None, access_key: str, secret_key: str, bucket_name: str, region: str = "cn-beijing"):
self.endpoint_url = os.environ.get("COZE_BUCKET_ENDPOINT_URL") or endpoint_url or ''
self.access_key = access_key
self.secret_key = secret_key
self.bucket_name = bucket_name
self.region = region
self._client = None
def _get_client(self):
if self._client is None:
endpoint = self.endpoint_url
if endpoint is None or endpoint == "":
try:
from coze_workload_identity import Client as CozeEnvClient
coze_env_client = CozeEnvClient()
env_vars = coze_env_client.get_project_env_vars()
coze_env_client.close()
for env_var in env_vars:
if env_var.key == "COZE_BUCKET_ENDPOINT_URL":
endpoint = env_var.value.replace("'", "'\\''")
self.endpoint_url = endpoint
break
except Exception as e:
logger.error(f"Error loading COZE_BUCKET_ENDPOINT_URL: {e}")
# Fall through to the validation below rather than aborting here
if endpoint is None or endpoint == "":
logger.error("未配置存储端点请设置endpoint_url")
raise ValueError("未配置存储端点请设置endpoint_url")
client = boto3.client(
"s3",
endpoint_url=endpoint,
aws_access_key_id=self.access_key,
aws_secret_access_key=self.secret_key,
region_name=self.region,
)
# Register a before-call hook that injects the x-storage-token header before each request
def _inject_header(**kwargs):
try:
from coze_workload_identity import Client as CozeClient
coze_client = CozeClient()
try:
token = coze_client.get_access_token()
except Exception as e:
logger.error("Error loading COZE_WORKLOAD_IDENTITY_TOKEN: %s", e)
token = None
raise e
finally:
coze_client.close()
params = kwargs.get("params", {})
headers = params.setdefault("headers", {})
headers["x-storage-token"] = token
except Exception as e:
logger.error("Error loading COZE_WORKLOAD_IDENTITY_TOKEN: %s", e)
pass
client.meta.events.register("before-call.s3", _inject_header)
self._client = client
return self._client
def _generate_object_key(self, *, original_name: str) -> str:
suffix = Path(original_name).suffix.lower()
stem = Path(original_name).stem
uniq = uuid4().hex[:8]
return f"{stem}_{uniq}{suffix}"
def _extract_logid(self, e: Exception) -> Optional[str]:
"""从 ClientError 中提取 x-tt-logid"""
if isinstance(e, ClientError):
headers = (e.response or {}).get("ResponseMetadata", {}).get("HTTPHeaders", {})
return headers.get("x-tt-logid")
return None
def _error_msg(self, msg: str, e: Exception) -> str:
"""构建带 logid 的错误信息"""
logid = self._extract_logid(e)
if logid:
return f"{msg}: {e} (x-tt-logid: {logid})"
return f"{msg}: {e}"
def _resolve_bucket(self, bucket: Optional[str]) -> str:
"""统一解析 bucket 来源,确保得到有效桶名。"""
target_bucket = bucket or os.environ.get("COZE_BUCKET_NAME") or self.bucket_name
if not target_bucket:
raise ValueError("未配置 bucket请传入 bucket 或设置 COZE_BUCKET_NAME或在实例化时提供 bucket_name")
return target_bucket
def _validate_file_name(self, name: str) -> None:
"""校验 S3 对象命名长度≤1024允许 [A-Za-z0-9._-/];不以 / 起止且不含 //。"""
msg = (
"file name invalid: 文件名需满足以下 S3 对象命名规范:"
"1) 长度 11024 字节;"
"2) 仅允许字母、数字、点(.)、下划线(_)、短横(-)、目录分隔符(/)"
"3) 不允许空格或以下特殊字符:? # & % { } ^ [ ] ` \\ < > ~ | \" ' + = : ;"
"4) 不以 / 开头或结尾,且不包含连续的 //"
"示例report_2025-12-11.pdf、images/photo-01.png。"
)
if not name or not name.strip():
raise ValueError(msg + "(原因:文件名为空)")
# S3 限制对象 key 最大 1024 字节,这里沿用到输入文件名
if len(name.encode("utf-8")) > 1024:
raise ValueError(msg + "(原因:长度超过 1024 字节)")
if name.startswith("/") or name.endswith("/"):
raise ValueError(msg + "(原因:以 / 开头或结尾)")
if "//" in name:
raise ValueError(msg + "(原因:包含连续的 //")
# 允许字符集校验
if not FILE_NAME_ALLOWED_RE.match(name):
bad = re.findall(r"[^A-Za-z0-9._\-/]", name)
example = bad[0] if bad else "非法字符"
raise ValueError(msg + f"(原因:包含非法字符,例如:{example}")
def upload_file(self, *, file_content: bytes, file_name: str, content_type: str = "application/octet-stream", bucket: Optional[str] = None) -> str:
# Validate the input file name first to avoid generating an invalid object key
self._validate_file_name(file_name)
try:
client = self._get_client()
object_key = self._generate_object_key(original_name=file_name)
target_bucket = self._resolve_bucket(bucket)
client.put_object(Bucket=target_bucket, Key=object_key, Body=file_content, ContentType=content_type)
return object_key
except Exception as e:
logger.error(self._error_msg("Error uploading file to S3", e))
raise e
def delete_file(self, *, file_key: str, bucket: Optional[str] = None) -> bool:
try:
client = self._get_client()
target_bucket = self._resolve_bucket(bucket)
client.delete_object(Bucket=target_bucket, Key=file_key)
return True
except Exception as e:
logger.error(self._error_msg("Error deleting file from S3", e))
raise e
def file_exists(self, *, file_key: str, bucket: Optional[str] = None) -> bool:
try:
client = self._get_client()
target_bucket = self._resolve_bucket(bucket)
client.head_object(Bucket=target_bucket, Key=file_key)
return True
except ClientError as e:
code = (e.response or {}).get("Error", {}).get("Code", "")
if code in {"404", "NoSuchKey", "NotFound"}:
return False
logger.error(self._error_msg("Error checking file existence in S3", e))
return False
except Exception as e:
logger.error(self._error_msg("Error checking file existence in S3", e))
return False
def read_file(self, *, file_key: str, bucket: Optional[str] = None) -> bytes:
try:
client = self._get_client()
target_bucket = self._resolve_bucket(bucket)
resp = client.get_object(Bucket=target_bucket, Key=file_key)
body = resp.get("Body")
if body is None:
raise RuntimeError("S3 get_object returned no Body")
try:
return body.read()
finally:
try:
body.close()
except Exception as ce:
# Failing to close the response body does not affect the result; log for diagnostics
logger.debug("Failed to close S3 response body: %s", ce)
except Exception as e:
logger.error(self._error_msg("Error reading file from S3", e))
raise e
def list_files(self, *, prefix: Optional[str] = None, bucket: Optional[str] = None, max_keys: int = 1000, continuation_token: Optional[str] = None) -> ListFilesResult:
"""列出对象,支持前缀过滤与分页;返回 keys/is_truncated/next_continuation_token。"""
try:
client = self._get_client()
target_bucket = self._resolve_bucket(bucket)
if max_keys <= 0 or max_keys > 1000:
raise ValueError("max_keys 必须在 1 到 1000 之间")
kwargs: Dict[str, Any] = {
"Bucket": target_bucket,
"MaxKeys": max_keys,
"Prefix": prefix,
"ContinuationToken": continuation_token,
}
kwargs = {k: v for k, v in kwargs.items() if v is not None}
resp = client.list_objects_v2(**kwargs)
contents = resp.get("Contents", []) or []
keys: List[str] = [item.get("Key") for item in contents if isinstance(item, dict) and item.get("Key")]
return {
"keys": keys,
"is_truncated": bool(resp.get("IsTruncated")),
"next_continuation_token": resp.get("NextContinuationToken"),
}
except ClientError as e:
code = (e.response or {}).get("Error", {}).get("Code", "")
logger.error(self._error_msg(f"Error listing files in S3 (code={code})", e))
raise e
except Exception as e:
logger.error(self._error_msg("Error listing files in S3", e))
raise e
def generate_presigned_url(self, *, key: str, bucket: Optional[str] = None, expire_time: int = 1800) -> str:
"""通过 S3 Proxy 生成签名 URL。"""
import json
import urllib.request as urllib_request
try:
from coze_workload_identity import Client as CozeClient
coze_client = CozeClient()
try:
token = coze_client.get_access_token()
finally:
try:
coze_client.close()
except Exception:
# Failure to release the resource does not affect the rest of the flow
pass
except Exception as e:
logger.error(f"Error loading x-storage-token: {e}")
raise RuntimeError(f"获取 x-storage-token 失败: {e}")
try:
sign_base = os.environ.get("COZE_BUCKET_ENDPOINT_URL") or self.endpoint_url
if not sign_base:
raise ValueError("未配置签名端点:请设置 COZE_BUCKET_ENDPOINT_URL 或传入 endpoint_url")
sign_url_endpoint = sign_base.rstrip("/") + "/sign-url"
headers = {
"Content-Type": "application/json",
"x-storage-token": token,
}
target_bucket = self._resolve_bucket(bucket)
payload = {"bucket_name": target_bucket, "path": key, "expire_time": expire_time}
data = json.dumps(payload).encode("utf-8")
request = urllib_request.Request(sign_url_endpoint, data=data, headers=headers, method="POST")
except Exception as e:
logger.error(f"Error creating request for sign-url: {e}")
raise RuntimeError(f"创建 sign-url 请求失败: {e}")
try:
with urllib_request.urlopen(request) as resp:
resp_bytes = resp.read()
content_type = resp.headers.get("Content-Type", "")
text = resp_bytes.decode("utf-8", errors="replace")
if "application/json" in content_type or text.strip().startswith("{"):
try:
obj = json.loads(text)
except Exception:
return text
data = obj.get("data")
if isinstance(data, dict) and "url" in data:
return data["url"]
url_value = obj.get("url") or obj.get("signed_url") or obj.get("presigned_url")
if url_value:
return url_value
raise ValueError("签名服务返回缺少 data.url/url 字段")
return text
except Exception as e:
raise RuntimeError(f"生成签名URL失败: {e}")
def stream_upload_file(
self,
*,
fileobj,
file_name: str,
content_type: str = "application/octet-stream",
bucket: Optional[str] = None,
multipart_chunksize: int = 5 * 1024 * 1024,
multipart_threshold: int = 5 * 1024 * 1024,
max_concurrency: int = 1,
use_threads: bool = False,
) -> str:
"""流式上传(文件对象)
- fileobj: 任何带有 read() 方法的文件对象 open(..., 'rb') 返回的对象io.BytesIO
- file_name: 原始文件名用于生成唯一 key
- content_type: MIME 类型
- bucket: 目标桶为空时取环境变量或实例默认值
- multipart_chunksize: 分片大小默认 5MB以适配代理层限制
- multipart_threshold: 触发分片上传的阈值默认 5MB
- max_concurrency: 并发分片上传的并发数默认 1避免代理层节流影响
- use_threads: 是否启用线程并发默认 False
返回最终写入的对象 key
"""
try:
client = self._get_client()
target_bucket = self._resolve_bucket(bucket)
key = self._generate_object_key(original_name=file_name)
extra_args = {"ContentType": content_type} if content_type else {}
# Use boto3's high-level API for multipart upload (TransferConfig controls the part size)
config = TransferConfig(
multipart_chunksize=multipart_chunksize,
multipart_threshold=multipart_threshold,
max_concurrency=max_concurrency,
use_threads=use_threads,
)
client.upload_fileobj(Fileobj=fileobj, Bucket=target_bucket, Key=key, ExtraArgs=extra_args, Config=config)
return key
except Exception as e:
logger.error(self._error_msg("Error streaming upload (fileobj) to S3", e))
raise e
def upload_from_url(
self,
*,
url: str,
bucket: Optional[str] = None,
timeout: int = 30,
) -> str:
"""从 URL 流式下载并上传到 S3
- url: 源文件 URL
- bucket: 目标桶为空时取环境变量或实例默认值
- timeout: HTTP 请求超时时间默认 30
返回最终写入的对象 key
"""
import urllib.request as urllib_request
from urllib.parse import urlparse, unquote
try:
request = urllib_request.Request(url)
with urllib_request.urlopen(request, timeout=timeout) as resp:
parsed = urlparse(url)
file_name = Path(unquote(parsed.path)).name or "file"
content_type = resp.headers.get("Content-Type", "application/octet-stream")
return self.stream_upload_file(
fileobj=resp,
file_name=file_name,
content_type=content_type,
bucket=bucket,
)
except Exception as e:
logger.error(self._error_msg("Error uploading from URL to S3", e))
raise e
def trunk_upload_file(self, *, chunk_iter: Iterable[bytes], file_name: str,
content_type: str = "application/octet-stream", bucket: Optional[str] = None,
part_size: int = 5 * 1024 * 1024) -> str:
"""流式上传(字节迭代器,显式分片 Multipart Upload
- chunk_iter: 可迭代对象逐块产生 bytes每块大小可变内部累积到 part_size 再上传最后一块可小于 5MB
- file_name: 原始文件名用于生成唯一 key
- content_type: MIME 类型
- bucket: 目标桶为空时取环境或实例默认值
- part_size: 每个 part 的最小大小除最后一个默认 5MB
返回最终写入的对象 key
"""
client = self._get_client()
target_bucket = self._resolve_bucket(bucket)
key = self._generate_object_key(original_name=file_name)
# Initialize the multipart upload
try:
init_resp = client.create_multipart_upload(Bucket=target_bucket, Key=key, ContentType=content_type)
upload_id = init_resp["UploadId"]
except Exception as e:
logger.error(self._error_msg("create_multipart_upload failed", e))
raise e
parts = []
part_number = 1
buffer = bytearray()
try:
for chunk in chunk_iter:
if not chunk:
continue
buffer.extend(chunk)
while len(buffer) >= part_size:
data = bytes(buffer[:part_size])
buffer = buffer[part_size:]
resp = client.upload_part(Bucket=target_bucket, Key=key, UploadId=upload_id, PartNumber=part_number,
Body=data)
parts.append({"PartNumber": part_number, "ETag": resp["ETag"]})
part_number += 1
# Upload the remainder smaller than part_size
if len(buffer) > 0:
resp = client.upload_part(Bucket=target_bucket, Key=key, UploadId=upload_id, PartNumber=part_number,
Body=bytes(buffer))
parts.append({"PartNumber": part_number, "ETag": resp["ETag"]})
# Complete the multipart upload
client.complete_multipart_upload(
Bucket=target_bucket,
Key=key,
UploadId=upload_id,
MultipartUpload={"Parts": parts},
)
return key
except Exception as e:
logger.error(self._error_msg("multipart upload failed", e))
try:
client.abort_multipart_upload(Bucket=target_bucket, Key=key, UploadId=upload_id)
except Exception as ae:
logger.error(self._error_msg("abort_multipart_upload failed", ae))
raise e

View File

@@ -1,135 +0,0 @@
"""LLM客户端封装 - 兼容OpenAI API"""
import os
import logging
from typing import List, Dict, Any, Optional, Union
from openai import OpenAI
logger = logging.getLogger(__name__)
class LLMClient:
"""
LLM客户端封装类兼容OpenAI API格式
支持的提供商
- 火山引擎豆包大模型
- OpenAI
- 其他兼容OpenAI格式的API
"""
def __init__(self, ctx=None):
"""
初始化LLM客户端
Args:
ctx: 上下文对象兼容原SDK接口实际不使用
"""
self.api_key = os.getenv("LLM_API_KEY")
self.base_url = os.getenv("LLM_BASE_URL")
self.model_name = os.getenv("LLM_MODEL_NAME", "doubao-seed-2-0-pro-260215")
if not self.api_key:
raise ValueError("LLM_API_KEY environment variable is not set")
self.client = OpenAI(
api_key=self.api_key,
base_url=self.base_url
)
logger.info(f"LLMClient initialized with base_url: {self.base_url}")
def invoke(
self,
messages: List[Dict[str, Any]],
model: Optional[str] = None,
temperature: float = 0.0,
max_completion_tokens: int = 8192,
**kwargs
) -> Any:
"""
调用大模型API
Args:
messages: 消息列表支持文本和多模态内容
model: 模型名称可选默认使用环境变量
temperature: 温度参数
max_completion_tokens: 最大生成token数
**kwargs: 其他参数
Returns:
响应对象包含 content 属性
"""
model = model or self.model_name
logger.info(f"Invoking LLM with model: {model}, temperature: {temperature}")
try:
response = self.client.chat.completions.create(
model=model,
messages=messages,
temperature=temperature,
max_tokens=max_completion_tokens,
**kwargs
)
# Return a response object compatible with the original SDK's format
class Response:
def __init__(self, content):
self.content = content
# Extract the response content
if response.choices and len(response.choices) > 0:
content = response.choices[0].message.content
return Response(content=content)
else:
logger.error("Empty response from LLM")
return Response(content="")
except Exception as e:
logger.error(f"LLM invocation failed: {e}")
raise
def stream(
self,
messages: List[Dict[str, Any]],
model: Optional[str] = None,
temperature: float = 0.0,
max_completion_tokens: int = 8192,
**kwargs
):
"""
流式调用大模型API
Args:
messages: 消息列表
model: 模型名称
temperature: 温度参数
max_completion_tokens: 最大生成token数
**kwargs: 其他参数
Yields:
响应块
"""
model = model or self.model_name
logger.info(f"Streaming LLM with model: {model}")
try:
stream = self.client.chat.completions.create(
model=model,
messages=messages,
temperature=temperature,
max_tokens=max_completion_tokens,
stream=True,
**kwargs
)
for chunk in stream:
if chunk.choices and len(chunk.choices) > 0:
delta = chunk.choices[0].delta
if hasattr(delta, 'content') and delta.content:
yield delta.content
except Exception as e:
logger.error(f"LLM streaming failed: {e}")
raise

View File

@@ -1,168 +0,0 @@
#!/bin/bash
# ============================================
# Middle School Physics Homework Correction Workflow - deployment verification script
# ============================================
# Note: deliberately no `set -e` here - failing checks are counted, not fatal.
# (With `set -e`, the first `((PASS++))` from zero would abort the script.)
echo "======================================"
echo " Deployment Verification"
echo "======================================"
echo ""
# Color definitions
GREEN='\033[0;32m'
RED='\033[0;31m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
# Test counters
PASS=0
FAIL=0
# Test helper
test_step() {
local name=$1
local command=$2
echo -n "Testing: $name ... "
if eval "$command" > /dev/null 2>&1; then
echo -e "${GREEN}✅ passed${NC}"
PASS=$((PASS+1))
return 0
else
echo -e "${RED}❌ failed${NC}"
FAIL=$((FAIL+1))
return 1
fi
}
# 1. Environment checks
echo "1. Environment checks"
echo "-----------------------------------"
test_step "Python version" "python3 --version"
test_step "Virtual environment" "test -d venv"
test_step "Dependency: fastapi" "python3 -c 'import fastapi'"
test_step "Dependency: langgraph" "python3 -c 'import langgraph'"
test_step "Dependency: openai" "python3 -c 'import openai'"
echo ""
# 2. Configuration checks
echo "2. Configuration checks"
echo "-----------------------------------"
if [ -f ".env" ]; then
echo -e "${GREEN}✅ .env file exists${NC}"
PASS=$((PASS+1))
# Check required environment variables
source .env
if [ -n "$LLM_API_KEY" ] && [ "$LLM_API_KEY" != "your-api-key-here" ]; then
echo -e "${GREEN}✅ LLM_API_KEY is set${NC}"
PASS=$((PASS+1))
else
echo -e "${RED}❌ LLM_API_KEY is not set${NC}"
FAIL=$((FAIL+1))
fi
if [ -n "$LLM_BASE_URL" ]; then
echo -e "${GREEN}✅ LLM_BASE_URL is set${NC}"
PASS=$((PASS+1))
else
echo -e "${RED}❌ LLM_BASE_URL is not set${NC}"
FAIL=$((FAIL+1))
fi
if [ -n "$LLM_MODEL_NAME" ]; then
echo -e "${GREEN}✅ LLM_MODEL_NAME is set${NC}"
PASS=$((PASS+1))
else
echo -e "${RED}❌ LLM_MODEL_NAME is not set${NC}"
FAIL=$((FAIL+1))
fi
echo -e "${GREEN}✅ No object storage required (optimized away)${NC}"
PASS=$((PASS+1))
else
echo -e "${RED}❌ .env file not found${NC}"
FAIL=$((FAIL+1))
fi
echo ""
# 3. File integrity checks
echo "3. File integrity checks"
echo "-----------------------------------"
test_step "Main entry point" "test -f src/main.py"
test_step "Main workflow" "test -f src/graphs/graph.py"
test_step "State definitions" "test -f src/graphs/state.py"
test_step "Config directory" "test -d config"
test_step "Startup script" "test -f scripts/http_run.sh"
echo ""
# 4. Module import checks
echo "4. Module import checks"
echo "-----------------------------------"
test_step "Import main module" "python3 -c 'from graphs.state import GlobalState'"
test_step "Import node module" "python3 -c 'from graphs.nodes.doc_extract_node import doc_extract_node'"
echo ""
# 5. Service check (optional)
echo "5. Service check"
echo "-----------------------------------"
if command -v curl &> /dev/null; then
# Is the service already running?
if curl -s http://localhost:8000/health > /dev/null 2>&1; then
echo -e "${GREEN}✅ Service is running at http://localhost:8000${NC}"
PASS=$((PASS+1))
# Probe the health endpoint
HEALTH=$(curl -s http://localhost:8000/health)
if echo "$HEALTH" | grep -q "ok"; then
echo -e "${GREEN}✅ Health check passed${NC}"
PASS=$((PASS+1))
else
echo -e "${RED}❌ Health check failed${NC}"
FAIL=$((FAIL+1))
fi
else
echo -e "${YELLOW}⚠️ Service not running; skipping service checks${NC}"
echo " Start it with: bash scripts/http_run.sh -p 8000"
fi
else
echo -e "${YELLOW}⚠️ curl not installed; skipping service checks${NC}"
fi
echo ""
# Summary
echo "======================================"
echo " Summary"
echo "======================================"
echo ""
echo -e "${GREEN}Passed: $PASS${NC}"
echo -e "${RED}Failed: $FAIL${NC}"
echo ""
if [ $FAIL -eq 0 ]; then
echo -e "${GREEN}✅ All checks passed - deployment looks good!${NC}"
echo ""
echo "Next steps:"
echo " 1. Start the service: bash scripts/http_run.sh -p 8000"
echo " 2. Open the API docs: http://localhost:8000/docs"
echo " 3. Send a test request: curl -X POST http://localhost:8000/run -H 'Content-Type: application/json' -d @test_payload.json"
exit 0
else
echo -e "${RED}❌ Some checks failed - please review your configuration${NC}"
echo ""
echo "Common issues:"
echo " - Environment variables not set: edit the .env file"
echo " - Dependencies missing: pip install -r requirements.txt"
echo " - Files missing: check that the project is complete"
exit 1
fi

View File

@@ -1,97 +0,0 @@
#!/bin/bash
# ============================================
# Image URL test script
# ============================================
echo "======================================"
echo " Image URL Test"
echo "======================================"
echo ""
if [ -z "$1" ]; then
echo "Usage: bash test_image_url.sh <image URL>"
echo ""
echo "Example:"
echo " bash test_image_url.sh https://example.com/image.jpg"
exit 1
fi
IMAGE_URL="$1"
echo "Testing URL: $IMAGE_URL"
echo ""
# Check the URL format
if [[ ! "$IMAGE_URL" =~ ^https?:// ]]; then
echo "❌ Error: invalid URL format (must start with http:// or https://)"
exit 1
fi
echo "✅ URL format OK"
echo ""
# Check that the URL is reachable
echo "Checking URL reachability..."
HTTP_CODE=$(curl -s -o /dev/null -w "%{http_code}" -I "$IMAGE_URL")
if [ "$HTTP_CODE" = "200" ]; then
echo "✅ URL reachable (HTTP $HTTP_CODE)"
elif [ "$HTTP_CODE" = "404" ]; then
echo "❌ URL not found (HTTP 404)"
echo ""
echo "Possible causes:"
echo " 1. The image was deleted"
echo " 2. The URL has expired"
echo " 3. The URL is wrong"
exit 1
elif [ "$HTTP_CODE" = "403" ]; then
echo "❌ Access denied (HTTP 403)"
echo ""
echo "Possible causes:"
echo " 1. Authentication is required"
echo " 2. The IP is restricted"
echo " 3. A specific Referer is required"
exit 1
else
echo "⚠️ Warning: HTTP status code $HTTP_CODE"
fi
echo ""
# Check the Content-Type
echo "Checking image type..."
CONTENT_TYPE=$(curl -s -I "$IMAGE_URL" | grep -i "Content-Type" | awk '{print $2}' | tr -d '\r')
if [[ "$CONTENT_TYPE" =~ image/ ]]; then
echo "✅ Image type: $CONTENT_TYPE"
else
echo "⚠️ Warning: Content-Type is not an image type: $CONTENT_TYPE"
fi
echo ""
# Check the file size
echo "Checking file size..."
CONTENT_LENGTH=$(curl -s -I "$IMAGE_URL" | grep -i "Content-Length" | awk '{print $2}' | tr -d '\r')
if [ -n "$CONTENT_LENGTH" ]; then
SIZE_KB=$((CONTENT_LENGTH / 1024))
echo "✅ File size: ${SIZE_KB}KB"
if [ $SIZE_KB -lt 10 ]; then
echo "⚠️ Warning: file is very small and may not be a valid image"
elif [ $SIZE_KB -gt 10240 ]; then
echo "⚠️ Warning: file is large (>10MB) and may slow down processing"
fi
else
echo "⚠️ Warning: could not determine the file size"
fi
echo ""
echo "======================================"
echo " ✅ Test complete"
echo "======================================"
echo ""
echo "This image URL can be used with the homework correction workflow"

View File

@@ -1,107 +0,0 @@
#!/bin/bash
# ============================================
# LLM connectivity test script
# ============================================
echo "======================================"
echo " LLM Connectivity Test"
echo "======================================"
echo ""
# Check environment variables
if [ -z "$LLM_API_KEY" ]; then
echo "❌ Error: LLM_API_KEY is not set"
echo ""
echo "Set the environment variables first:"
echo " export LLM_API_KEY='your-api-key'"
echo " export LLM_BASE_URL='https://ark.cn-beijing.volces.com/api/v3'"
echo " export LLM_MODEL_NAME='doubao-seed-2-0-pro-260215'"
exit 1
fi
if [ -z "$LLM_BASE_URL" ]; then
echo "⚠️ Warning: LLM_BASE_URL is not set; using the default"
export LLM_BASE_URL="https://ark.cn-beijing.volces.com/api/v3"
fi
if [ -z "$LLM_MODEL_NAME" ]; then
echo "⚠️ Warning: LLM_MODEL_NAME is not set; using the default"
export LLM_MODEL_NAME="doubao-seed-2-0-pro-260215"
fi
echo "✅ Environment variables set"
echo " - LLM_API_KEY: ${LLM_API_KEY:0:10}..."
echo " - LLM_BASE_URL: $LLM_BASE_URL"
echo " - LLM_MODEL_NAME: $LLM_MODEL_NAME"
echo ""
# Test the LLM connection
echo "Testing LLM connectivity..."
echo ""
python3 << 'EOF'
import os
import sys
try:
from openai import OpenAI
api_key = os.getenv("LLM_API_KEY")
base_url = os.getenv("LLM_BASE_URL")
model_name = os.getenv("LLM_MODEL_NAME")
print(f"Connecting to: {base_url}")
print(f"Using model: {model_name}")
print("")
client = OpenAI(
api_key=api_key,
base_url=base_url
)
print("Sending a test request...")
response = client.chat.completions.create(
model=model_name,
messages=[
{"role": "user", "content": "Hello, please reply 'test successful'"}
],
max_tokens=50
)
if response.choices and len(response.choices) > 0:
content = response.choices[0].message.content
print("")
print("✅ LLM connection successful!")
print(f" Response: {content}")
print("")
sys.exit(0)
else:
print("❌ Empty response from the LLM")
sys.exit(1)
except Exception as e:
print(f"❌ LLM connection failed: {e}")
print("")
print("Possible causes:")
print(" 1. Invalid API key")
print(" 2. Wrong base URL")
print(" 3. Wrong model name")
print(" 4. Network connectivity problems")
print(" 5. API quota exhausted")
sys.exit(1)
EOF
if [ $? -eq 0 ]; then
echo "======================================"
echo " ✅ Test complete"
echo "======================================"
echo ""
echo "Next step:"
echo " Start the service: bash scripts/http_run.sh -p 8000"
else
echo "======================================"
echo " ❌ Test failed"
echo "======================================"
exit 1
fi

View File

@@ -1,16 +0,0 @@
{
"student_homework": [
{
"student_id": 0,
"student_name": "测试学生",
"homework_images": [
"https://example.com/homework1.jpg",
"https://example.com/homework2.jpg"
]
}
],
"answer_doc_url": "https://example.com/answer.docx",
"subject": "physics",
"comment_max_length": 100,
"max_concurrent": 10
}