优化 拆分任务与工作流的执行进度

新增 PPT清洗流程
This commit is contained in:
小肥羊 2026-03-05 18:15:18 +08:00
parent a2200c0296
commit a8ac40d6fb
35 changed files with 1345 additions and 508 deletions

View File

@ -23,6 +23,8 @@ namespace Learn.VideoAnalysis.Expand
#endif
service.AddTransient<ClearAllCacheJob>();
service.AddTransient<NodePackageJob>();
// 注册心跳 Job
service.AddTransient<DeviceHeartbeatJob>();
}
public static void UseCoravelExpand(this IApplicationBuilder provider)
{
@ -30,6 +32,8 @@ namespace Learn.VideoAnalysis.Expand
{
//任务缓存清理
scheduler.Schedule<ClearAllCacheJob>().HourlyAt(10);
//在线心跳 30秒一次
scheduler.Schedule<DeviceHeartbeatJob>().EveryThirtySeconds();
//强制清理所有缓存内容
//scheduler.Schedule<ClearAllCacheJob>().Hourly();
//scheduler.Schedule<ClearAllCacheJob>().EverySeconds(40);

View File

@ -8,7 +8,7 @@ VITE_PUBLIC_PATH = /
VITE_ROUTER_HISTORY = "hash"
# 接口地址
VITE_API_BASEURL = "http://192.168.2.33:5238"
VITE_API_BASEURL = "http://192.168.2.33:7532"
# # 接口地址

View File

@ -28,11 +28,22 @@ export interface ShowTaskInfoRes {
videoKnows: VideoKnowRes[];
mediaUrl: string;
}
export interface VideoTaskWorkflow {
id: number;
videoTaskId: number;
workflowName: string;
currentStep: string;
currentStepValue: number;
message?: string;
updateTime: string;
}
export interface RowRloadResult {
progress: string;
lastEnum: string;
startTime: string;
errorMessage: string;
workflows: VideoTaskWorkflow[]; // 新增字段
}
/** 刷新任务实时数据 */
@ -42,21 +53,35 @@ export const RowRload = (id: any) => {
});
};
/** 重试任务 */
/** 重试任务 (VideoSliceWorkflow) */
export const ReStart = (id: any, selectEnum: number) => {
return http.request<any>("get", "/api/VideoTask/ReStart", {
params: { id, selectEnum }
});
};
/** 重试任务 (TidySlideWorkflow) */
export const ReStartTidySlide = (id: any, selectEnum: number) => {
return http.request<any>("get", "/api/VideoTask/ReStartTidySlide", {
params: { id, selectEnum }
});
};
/** 展示数据 */
/** 展示数据 (VideoSliceWorkflow) */
export const ShowTaskInfo = (id: any) => {
return http.request<ShowTaskInfoRes>("get", "/api/VideoTask/ShowTaskInfo", {
params: { id }
});
};
/** 展示数据 (TidySlideWorkflow) */
export const ShowTidySlideTaskInfo = (id: any) => {
return http.request<any>("get", "/api/VideoTask/ShowTidySlideTaskInfo", {
params: { id }
});
};
/** 展示数据 */
@ -66,6 +91,11 @@ export const RunningTaskList = (data: any) => {
});
};
/** 获取在线设备列表 */
export const GetOnlineDevices = () => {
return http.request<string[]>("get", "/api/VideoTask/OnlineDevices");
};
/** 展示数据 */
export const ErrorTaskList = (data: any) => {

View File

@ -6,6 +6,7 @@ import LayNavMix from "../lay-sidebar/NavMix.vue";
import LaySidebarFullScreen from "../lay-sidebar/components/SidebarFullScreen.vue";
import LaySidebarBreadCrumb from "../lay-sidebar/components/SidebarBreadCrumb.vue";
import LaySidebarTopCollapse from "../lay-sidebar/components/SidebarTopCollapse.vue";
import LinkIcon from "~icons/ri/links-fill"; //
import LogoutCircleRLine from "~icons/ri/logout-circle-r-line";
import Setting from "~icons/ri/settings-3-line";
@ -21,6 +22,11 @@ const {
avatarsStyle,
toggleSideBar,
} = useNav();
function openSwagger() {
const swaggerUrl = `${window.location.protocol}//${window.location.hostname}:7532/swagger/index.html`;
window.open(swaggerUrl, "_blank");
}
</script>
<template>
@ -40,6 +46,14 @@ const {
<LayNavMix v-if="layout === 'mix'" />
<div v-if="layout === 'vertical'" class="vertical-header-right">
<!-- Swagger 链接 -->
<span
class="set-icon navbar-bg-hover"
title="打开 Swagger 文档"
@click="openSwagger"
>
<IconifyIconOffline :icon="LinkIcon" />
</span>
<!-- 菜单搜索 -->
<LaySearch id="header-search" />
<!-- 全屏 -->

View File

@ -493,19 +493,11 @@ function openMenu(tag, e) {
function tagOnClick(item) {
const { name, path } = item;
if (name) {
if (item.query) {
router.push({
name,
query: item.query
});
} else if (item.params) {
router.push({
name,
params: item.params
});
} else {
router.push({ name });
}
router.push({
name,
query: item.query,
params: item.params
});
} else {
router.push({ path });
}

View File

@ -5,13 +5,23 @@ export default {
path: "/",
name: "Home",
component: Layout,
redirect: "/welcome",
redirect: "/welcome/monitor",
meta: {
icon: "ep/home-filled",
title: "首页",
rank: 0
},
children: [
{
path: "/welcome/monitor",
name: "monitor",
component: () => import("@/views/welcome/monitor.vue"),
meta: {
title: "设备监控",
showLink: true,
keepAlive: true,
}
},
{
path: "/welcome/runningTask",
name: "runningTask",
@ -32,7 +42,7 @@ export default {
}
},
{
path: "/welcome/showTask",
path: "/welcome/showTask_:id",
name: "showTask",
component: () => import("@/views/welcome/showTask.vue"),
meta: {
@ -41,6 +51,16 @@ export default {
keepAlive:true,
}
},
{
path: "/welcome/showTidySlideTask_:id",
name: "showTidySlideTask",
component: () => import("@/views/welcome/showTidySlideTask.vue"),
meta: {
title: "TidySlide任务预览",
showLink: false,
keepAlive: true,
}
},
{
path: "/welcome/errorTask",
name: "errorTask",

View File

@ -14,13 +14,19 @@ import { hTableAPI } from "@/api/hTable";
import { getenum } from "@/api/enum";
import { ruleRequired, ruleRequiredNumber } from "@/utils/rules";
import { ElMessage } from "element-plus";
import { ReStart, RowRload } from "@/api/videoTask";
import { ReStart, ReStartTidySlide, RowRload } from "@/api/videoTask";
import { Refresh } from "@element-plus/icons-vue";
import { message } from "@/utils/message";
import { json } from "stream/consumers";
import { useMultiTagsStoreHook } from "@/store/modules/multiTags";
import { useRouter } from "vue-router";
import {
workflowRegistry,
getWorkflowConfig,
StepData,
WorkflowConfig,
} from "./workflowConfig";
const ControllerName = "VideoTask";
@ -103,8 +109,9 @@ const dialogRef = ref({
visible: false,
value: null as any,
data: Object as any,
workflowName: "VideoSliceWorkflow", //
workflowOptions: [] as ComboModel[], //
});
let redisChannelEnum = ref<ComboModel[]>([]);
const showTable = ref(false);
onMounted(async () => {
//
@ -112,38 +119,71 @@ onMounted(async () => {
tableData.column.videoType.setting.datasource = await getenum("AttachmentsInfoType");
tableData.column.lastEnum.setting.datasource = await getenum("RedisChannelEnum");
tableData.column.subject.setting.datasource = await getenum("SubjectEnum");
redisChannelEnum.value = tableData.column.lastEnum.setting.datasource;
//
for (const key in workflowRegistry) {
const config = workflowRegistry[key];
config.enumOptions = await getenum(config.enumName);
}
showTable.value = true;
});
async function showDialog(row) {
// showDialog workflow
async function showDialog(row: any, workflowName: string = "VideoSliceWorkflow") {
dialogRef.value.data = row;
dialogRef.value.value = row.lastEnum;
dialogRef.value.workflowName = workflowName;
const config = getWorkflowConfig(workflowName);
dialogRef.value.workflowOptions = config.enumOptions || [];
//
const wf = row.TaskInfo?.workflows?.find((w) => w.workflowName === workflowName);
// 使 LastEnum
if (!wf && workflowName === "VideoSliceWorkflow") {
dialogRef.value.value = row.lastEnum;
} else {
dialogRef.value.value = wf ? wf.currentStep : null;
}
dialogRef.value.visible = true;
}
async function submitRowRload() {
await ReStart(dialogRef.value.data.id, dialogRef.value.value);
dialogRef.value.visible = false;
message("重试任务", { type: "success" });
const { id } = dialogRef.value.data;
const { value, workflowName } = dialogRef.value;
const config = getWorkflowConfig(workflowName);
if (config && config.retryApi) {
await config.retryApi(id, value);
dialogRef.value.visible = false;
message(`重试任务 (${workflowName})`, { type: "success" });
} else {
message(`未找到工作流配置 (${workflowName})`, { type: "error" });
}
}
async function expandChange(row: any, expandedRows: any[]) {
if (expandedRows.find((s) => s == row)) RloadTaskInfo(row);
}
function previewTask(row: any) {
let pageName = "showTask";
let queryData = { id: row.id.toString() };
function previewTask(row: any, workflowName: string) {
const idStr = row.id.toString();
let queryData = { id: idStr };
const config = getWorkflowConfig(workflowName);
const pageName = config.previewRouteName;
useMultiTagsStoreHook().handleTags("push", {
path: `/welcome/showTask_` + row.id,
path: `/welcome/${pageName}_` + idStr,
name: pageName,
query: queryData,
params: { id: idStr },
meta: {
title: `任务预览` + row.id.toString().slice(-4),
title: `${workflowName}预览` + idStr.slice(-4),
dynamicLevel: 3,
keepAlive: true,
},
});
//
route.push({ name: pageName, query: queryData });
route.push({ name: pageName, params: { id: idStr }, query: queryData });
}
function firstLetterToLower(str) {
if (typeof str !== "string" || !str) return str;
@ -153,26 +193,27 @@ async function RloadTaskInfo(row: any) {
let res = await RowRload(row.id);
row.TaskInfo = res;
row.TaskInfo.logs = row.TaskInfo.logs.reverse();
row.TaskInfo.stepData = JSON.parse(JSON.stringify(stepData.value));
row.TaskInfo.active = row.TaskInfo.stepData.findIndex(
(s) => s.title == row.TaskInfo.lastEnum
);
if (row.TaskInfo.startTime != null) {
for (const element of row.TaskInfo.stepData) {
element.time = formatDateToChinese(
row.TaskInfo.startTime[firstLetterToLower(element.title)]
);
let i = row.TaskInfo.stepData.indexOf(element);
if (i < row.TaskInfo.active) {
element.status = "success";
} else if (row.TaskInfo.active == 6 && element.value == 60) {
element.status = "success";
} else if (i == row.TaskInfo.active) {
element.status = "process";
} else {
element.status = "wait";
}
}
// workflows LastEnum VideoSliceWorkflow
if (!row.TaskInfo.workflows || row.TaskInfo.workflows.length === 0) {
row.TaskInfo.workflows = [
{
id: 0,
videoTaskId: row.id,
workflowName: "VideoSliceWorkflow",
currentStep: row.TaskInfo.lastEnum,
currentStepValue: 0, //
updateTime: new Date().toISOString(),
},
{
id: 0,
videoTaskId: row.id,
workflowName: "TidySlideWorkflow",
currentStep: row.TaskInfo.lastEnum,
currentStepValue: 0, //
updateTime: new Date().toISOString(),
},
];
}
}
function formatDateToChinese(dateString) {
@ -194,15 +235,47 @@ interface StepData {
title: string;
value: number;
}
const stepData = ref<StepData[]>([
{ status: "wait", time: null, title: "下载文件", value: 5 },
{ status: "wait", time: null, title: "分离音频", value: 10 },
{ status: "wait", time: null, title: "解析字幕", value: 20 },
{ status: "wait", time: null, title: "AI课程类型", value: 30 },
{ status: "wait", time: null, title: "AI模型分析", value: 40 },
{ status: "wait", time: null, title: "AI分析试题", value: 50 },
{ status: "wait", time: null, title: "结束任务", value: 60 },
]);
//
function getWorkflowSteps(workflowName: string) {
const config = getWorkflowConfig(workflowName);
return config.steps || [];
}
function getWorkflowStepActive(wf: any) {
const steps = getWorkflowSteps(wf.workflowName);
return steps.findIndex((s) => s.title === wf.currentStep);
}
function getWorkflowStepStatus(wf: any, step: StepData) {
const steps = getWorkflowSteps(wf.workflowName);
const activeIndex = steps.findIndex((s) => s.title === wf.currentStep);
const stepIndex = steps.findIndex((s) => s.title === step.title);
if (stepIndex < activeIndex) return "success";
if (stepIndex === activeIndex) return "process";
return "wait";
}
function getWorkflowLogs(logs: any[], workflowName: string) {
if (!logs) return [];
//
// 1. workflowName
// 2. workflowName VideoSliceWorkflow
return logs.filter((log) => {
if (log.workflowName) {
return log.workflowName === workflowName;
}
//
if (workflowName === "VideoSliceWorkflow") return true;
// : TidySlideWorkflow, UploadWorkflow
if (workflowName === "TidySlideWorkflow" && log.workflowName === "UploadWorkflow")
return true;
return false;
});
}
</script>
<template>
@ -231,58 +304,111 @@ const stepData = ref<StepData[]>([
</div>
</div>
<div v-else>
<span>进度</span>
<span>任务进度</span>
<div class="content">
{{ props.row.TaskInfo.lastEnum }} {{ props.row.TaskInfo.progress }}
{{ props.row.TaskInfo.lastEnum }}
<!-- 全局进度展示如果需要可以保留或完全移入 Tab -->
</div>
</div>
<div>
<span>操作</span>
<span>全局操作</span>
<div class="content">
<el-button
type="primary"
:icon="Refresh"
@click="RloadTaskInfo(props.row)"
circle
title="刷新任务详情"
/>
<el-button type="danger" @click="showDialog(props.row)">重试</el-button>
<el-button type="primary" @click="previewTask(props.row)">预览</el-button>
</div>
</div>
<div class="grid_item_full_width" v-if="props.row.TaskInfo != null">
<span>步骤</span>
<el-steps
class="content"
style="max-width: 100%"
:active="props.row.TaskInfo?.active"
align-center
>
<el-step
v-for="s in props.row.TaskInfo.stepData"
:title="s.title"
:description="s.time"
:status="s.status"
/>
</el-steps>
</div>
<div class="grid_item_full_width" v-if="props.row.TaskInfo != null">
<span>日志</span>
<div class="content">
<el-timeline
style="max-height: 200px; padding-top: 2px; overflow-y: scroll"
<el-tabs type="border-card">
<el-tab-pane
v-for="wf in props.row.TaskInfo.workflows"
:key="wf.id"
:label="wf.workflowName"
>
<el-timeline-item
v-for="(activity, index) in props.row.TaskInfo.logs"
:key="index"
:timestamp="activity.createTime"
<div class="flex justify-between items-center mb-4">
<div class="text-gray-600">
<div class="font-bold mb-1">
当前状态: {{ wf.currentStep }}
<span
v-if="
wf.workflowName === 'VideoSliceWorkflow' &&
props.row.TaskInfo.progress
"
class="ml-2 text-primary"
>
({{ props.row.TaskInfo.progress }})
</span>
<span
v-else-if="
wf.workflowName === 'TidySlideWorkflow' &&
props.row.TaskInfo.tidySlideProgress
"
class="ml-2 text-blue-600"
>
({{ props.row.TaskInfo.tidySlideProgress }})
</span>
</div>
<div class="text-sm">
更新时间: {{ formatDateToChinese(wf.updateTime) }}
</div>
</div>
<div>
<el-button
type="danger"
size="small"
@click="showDialog(props.row, wf.workflowName)"
>
重试此工作流
</el-button>
<el-button
type="primary"
size="small"
@click="previewTask(props.row, wf.workflowName)"
>
预览结果
</el-button>
</div>
</div>
<el-steps
class="content"
style="max-width: 100%"
:active="getWorkflowStepActive(wf)"
align-center
>
{{ activity.message }}
</el-timeline-item>
</el-timeline>
</div>
<el-step
v-for="s in getWorkflowSteps(wf.workflowName)"
:key="s.title"
:title="s.title"
:status="getWorkflowStepStatus(wf, s)"
/>
</el-steps>
<div class="mt-4">
<div class="font-bold mb-2">工作流日志</div>
<el-timeline
style="max-height: 300px; padding-top: 2px; overflow-y: scroll"
>
<el-timeline-item
v-for="(log, index) in getWorkflowLogs(
props.row.TaskInfo.logs,
wf.workflowName
)"
:key="index"
:timestamp="log.createTime"
:type="log.message?.includes('异常') ? 'danger' : 'primary'"
>
{{ log.message }}
</el-timeline-item>
</el-timeline>
</div>
</el-tab-pane>
</el-tabs>
</div>
</div>
</div>
@ -300,7 +426,7 @@ const stepData = ref<StepData[]>([
style="width: 240px"
>
<el-option
v-for="item in redisChannelEnum"
v-for="item in dialogRef.workflowOptions"
:key="item.value"
:label="item.text"
:value="item.value"

View File

@ -0,0 +1,96 @@
<script setup lang="ts">
import { onMounted, ref, watch } from "vue";
import { RunningTaskList, GetOnlineDevices } from "@/api/videoTask";
import videoTask from "./index.vue";
import { SearchConditions, TableConfig } from "@/components/hTable/hTable";
defineOptions({
name: `monitor`,
});
const devices = ref<string[]>([]);
const selectedDevice = ref<string>("all");
const videoTaskKey = ref(0);
const fetchDevices = async () => {
const res = await GetOnlineDevices();
devices.value = res;
};
async function searchCallback(s: SearchConditions, tv: TableConfig): Promise<boolean> {
// Pass deviceId to API
const params = { ...s, DeviceId: selectedDevice.value };
let res = await RunningTaskList(params);
tv.data = res.data.map((s: any, i: number) => {
return { ...s, customId: i };
});
tv.pageData = res;
return true;
}
const handleDeviceClick = (device: string) => {
if (selectedDevice.value === device) return;
selectedDevice.value = device;
videoTaskKey.value++; // Force re-render to trigger initial search with new device
};
onMounted(async () => {
await fetchDevices();
});
</script>
<template>
<div>
<el-card class="mb-4">
<template #header>
<div class="card-header flex justify-between items-center">
<span>在线设备 ({{ devices.length }})</span>
<el-button text @click="fetchDevices">刷新设备</el-button>
</div>
</template>
<div class="device-list">
<el-tag
class="mx-1 cursor-pointer"
:effect="selectedDevice === 'all' ? 'dark' : 'plain'"
@click="handleDeviceClick('all')"
>
全部
</el-tag>
<el-tag
v-for="device in devices"
:key="device"
class="mx-1 cursor-pointer"
:effect="selectedDevice === device ? 'dark' : 'plain'"
@click="handleDeviceClick(device)"
>
{{ device }}
</el-tag>
</div>
</el-card>
<videoTask :key="videoTaskKey" :searchCallback="searchCallback"></videoTask>
</div>
</template>
<style scoped>
.device-list {
display: flex;
flex-wrap: wrap;
gap: 10px;
}
.cursor-pointer {
cursor: pointer;
}
.mb-4 {
margin-bottom: 1rem;
}
.flex {
display: flex;
}
.justify-between {
justify-content: space-between;
}
.items-center {
align-items: center;
}
</style>

View File

@ -0,0 +1,117 @@
<template>
<div class="p-4 bg-white rounded-lg shadow-md min-h-[500px]">
<h2 class="text-2xl font-bold mb-6 text-gray-800 border-b pb-2">
TidySlide 任务结果预览 (ID: {{ taskId }})
</h2>
<div v-if="loading" class="flex justify-center items-center h-64">
<el-icon class="is-loading text-4xl text-blue-500"><Loading /></el-icon>
<span class="ml-2 text-gray-500">加载中...</span>
</div>
<div v-else-if="taskResult" class="space-y-6">
<!-- 基本信息卡片 -->
<div class="bg-gray-50 p-6 rounded-lg border border-gray-200">
<div class="grid grid-cols-1 md:grid-cols-2 gap-4">
<div class="flex items-center">
<span class="font-bold text-gray-600 w-24">任务ID:</span>
<span class="text-gray-800">{{ taskResult.videoTaskId }}</span>
</div>
<div class="flex items-center">
<span class="font-bold text-gray-600 w-24">VOD VideoId:</span>
<span class="font-mono text-blue-600 bg-blue-50 px-2 py-1 rounded">{{ taskResult.videoId }}</span>
</div>
<div class="flex items-center">
<span class="font-bold text-gray-600 w-24">创建时间:</span>
<span class="text-gray-800">{{ formatDate(taskResult.createTime) }}</span>
</div>
<div class="flex items-center">
<span class="font-bold text-gray-600 w-24">媒体地址:</span>
<span v-if="taskResult.mediaUrl" class="text-gray-800 truncate max-w-xs" :title="taskResult.mediaUrl">{{ taskResult.mediaUrl }}</span>
<span v-else class="text-gray-400 italic">暂无播放地址</span>
</div>
</div>
</div>
<!-- 播放器区域 (如果有播放地址) -->
<div v-if="taskResult.mediaUrl" class="mt-6">
<h3 class="text-lg font-semibold mb-3 text-gray-700">视频预览</h3>
<div class="aspect-w-16 aspect-h-9 bg-black rounded-lg overflow-hidden shadow-lg">
<video
controls
class="w-full h-full object-contain"
:src="taskResult.mediaUrl"
>
您的浏览器不支持 HTML5 视频
</video>
</div>
</div>
<div v-else class="mt-6 p-8 text-center bg-yellow-50 rounded-lg border border-yellow-100">
<el-icon class="text-yellow-500 text-3xl mb-2"><Warning /></el-icon>
<p class="text-yellow-700">
该视频已上传至 VOD但尚未生成播放地址<br/>
请前往阿里云 VOD 控制台查看 VideoId: <strong>{{ taskResult.videoId }}</strong>
</p>
</div>
</div>
<div v-else class="flex flex-col justify-center items-center h-64 text-gray-400">
<el-icon class="text-5xl mb-4"><DocumentDelete /></el-icon>
<span>未找到相关任务结果</span>
</div>
</div>
</template>
<script setup lang="ts">
import { ref, onMounted } from "vue";
import { useRoute } from "vue-router";
import { ShowTidySlideTaskInfo } from "@/api/videoTask";
import { message } from "@/utils/message";
import { isEmpty } from "@pureadmin/utils";
import { Loading, Warning, DocumentDelete } from "@element-plus/icons-vue";
defineOptions({
name: "showTidySlideTask",
});
const route = useRoute();
const loading = ref(false);
const taskId = ref("");
const taskResult = ref<any>(null);
function formatDate(dateString: string) {
if (!dateString) return "-";
return new Date(dateString).toLocaleString();
}
onMounted(async () => {
const data = isEmpty(route.params) ? route.query : route.params;
if (isEmpty(data.id) || data.id == null) {
message("无效的任务ID", { type: "warning" });
return;
}
taskId.value = data.id.toString();
loading.value = true;
try {
const res = await ShowTidySlideTaskInfo(taskId.value);
if (res) {
taskResult.value = res;
} else {
message("未找到任务结果数据", { type: "info" });
}
} catch (error) {
console.error(error);
message("获取任务结果失败", { type: "error" });
} finally {
loading.value = false;
}
});
</script>
<style scoped>
/* 使用 Tailwind CSS 类,无需额外样式 */
</style>

View File

@ -0,0 +1,61 @@
import { ReStart, ReStartTidySlide } from "@/api/videoTask";
import { ComboModel } from "@/components/hTable/hTable";
export interface StepData {
status: "" | "wait" | "process" | "finish" | "error" | "success";
time: string | null;
title: string;
value: number;
}
export interface WorkflowConfig {
name: string;
steps: StepData[];
enumName: string; // 后端枚举名称,用于获取下拉列表
previewRouteName: string; // 预览页面路由名称
retryApi: (id: any, selectEnum: number) => Promise<any>; // 重试 API 函数
enumOptions?: ComboModel[]; // 运行时加载的枚举选项
}
// 默认视频切片工作流步骤
const videoSliceSteps: StepData[] = [
{ status: "wait", time: null, title: "下载文件", value: 5 },
{ status: "wait", time: null, title: "分离音频", value: 10 },
{ status: "wait", time: null, title: "解析字幕", value: 20 },
{ status: "wait", time: null, title: "AI课程类型", value: 30 },
{ status: "wait", time: null, title: "AI模型分析", value: 40 },
{ status: "wait", time: null, title: "AI分析试题", value: 50 },
{ status: "wait", time: null, title: "结束任务", value: 60 },
];
// 视频合并工作流步骤
const tidySlideSteps: StepData[] = [
{ status: "wait", time: null, title: "下载文件", value: 10 },
{ status: "wait", time: null, title: "合并切片", value: 20 },
{ status: "wait", time: null, title: "上传视频", value: 30 },
{ status: "wait", time: null, title: "结束任务", value: 40 },
];
export const workflowRegistry: Record<string, WorkflowConfig> = {
VideoSliceWorkflow: {
name: "VideoSliceWorkflow",
steps: videoSliceSteps,
enumName: "RedisChannelEnum",
previewRouteName: "showTask",
retryApi: ReStart,
},
TidySlideWorkflow: {
name: "TidySlideWorkflow",
steps: tidySlideSteps,
enumName: "RedisTidySlideChannelEnum",
previewRouteName: "showTidySlideTask",
retryApi: ReStartTidySlide,
},
};
export function getWorkflowConfig(workflowName: string): WorkflowConfig {
// 默认返回 VideoSliceWorkflow 配置,兼容旧数据
return (
workflowRegistry[workflowName] || workflowRegistry["VideoSliceWorkflow"]
);
}

View File

@ -76,7 +76,7 @@
//"ConnectionString": "AllowLoadLocalInfile=true;Server=rm-2vc20nd3d11g0oh6g2o.rwlb.cn-chengdu.rds.aliyuncs.com;User ID=marking;Password=poiuytPOIUYT098765)(*&^%;Port=3306;Database=learn.videoanalysis;CharSet=utf8mb4;pooling=true;SslMode=None",
"SqlType": "MySql",
"UpdateTable": false
"UpdateTable": true
},
"AlibabaCloudVod": {
"AccessKeyId": "LTAI5tDC6p9h747B7FHbgwkH",

View File

@ -42,10 +42,10 @@ namespace VideoAnalysisCore.AICore.FFMPGE
? $"/usr/bin/ffmpeg"
: Path.Combine(AppCommon.AIModelFile, "ffmpeg.exe");
private Repository<VideoTask> videoTaskDB { get; set; }
private RedisManager redisManager { get; set; }
public FFMPGEHandle(RedisManager redisManager, Repository<VideoTask> videoTaskDB)
private VideoSliceWorkflowManager _workflowManager { get; set; }
public FFMPGEHandle(VideoSliceWorkflowManager workflowManager, Repository<VideoTask> videoTaskDB)
{
this.redisManager = redisManager;
_workflowManager = workflowManager;
this.videoTaskDB = videoTaskDB;
}
@ -69,16 +69,16 @@ namespace VideoAnalysisCore.AICore.FFMPGE
var filePath = Path.Combine(localPath, "ppt.mp4");
if (!File.Exists(filePath))
{
redisManager.AddTaskLog(task,"存在PPT Code但未能找到对应资源文件");
_workflowManager.AddTaskLog(task,"存在PPT Code但未能找到对应资源文件");
return;
}
var ffmpeg = new Engine(FFmpegPath);
var cToken = new CancellationToken();
redisManager.SetTaskProgress(task, "Frame=>10%");
_workflowManager.SetTaskProgress(task, "Frame=>10%");
foreach (string jpgFile in Directory.GetFiles(localPath, "*.jpg"))
File.Delete(jpgFile);
redisManager.SetTaskProgress(task, "Frame=>20%");
_workflowManager.SetTaskProgress(task, "Frame=>20%");
await ffmpeg.ExecuteAsync($"-i {filePath} -vf \"fps=1/{intervalSec},scale=960:540\" {localPath}/{ExpandFunction.FrameName}%03d.jpg", cToken);
@ -86,7 +86,7 @@ namespace VideoAnalysisCore.AICore.FFMPGE
var frameFiles = Directory.GetFiles(localPath, "*.jpg")
.OrderBy(f => f)
.ToList();
redisManager.SetTaskProgress(task, "Frame=>80%");
_workflowManager.SetTaskProgress(task, "Frame=>80%");
Image<Rgb24> prevFrame = null;
var keyFrames = new List<int>(10) { 5};
foreach (var frameFile in frameFiles)
@ -237,7 +237,7 @@ namespace VideoAnalysisCore.AICore.FFMPGE
// -shortest 以最短的流为准
var mergeArgs = $"-i \"{pptPath}\" -i \"{taskPath}\" -map 0:v -map 1:a -c:v copy -c:a aac -strict experimental -shortest \"{mergedPath}\" -y";
await redisManager.AddTaskLog(task, "开始合并视频与音频...");
await _workflowManager.AddTaskLog(task, "开始合并视频与音频...");
await ffmpeg.ExecuteAsync(mergeArgs, cToken);
if (!File.Exists(mergedPath)) throw new Exception("视频合并失败");
@ -250,13 +250,13 @@ namespace VideoAnalysisCore.AICore.FFMPGE
// -hls_segment_filename out%03d.ts 切片文件名
var sliceArgs = $"-i \"{mergedPath}\" -c copy -f hls -hls_time 10 -hls_list_size 0 -hls_segment_filename \"{Path.Combine(localPath, "out%03d.ts")}\" \"{m3u8Path}\" -y";
await redisManager.AddTaskLog(task, "开始视频切片...");
await _workflowManager.AddTaskLog(task, "开始视频切片...");
await ffmpeg.ExecuteAsync(sliceArgs, cToken);
if (!File.Exists(m3u8Path)) throw new Exception("视频切片失败");
// 更新任务状态或路径? 目前只需要生成文件
await redisManager.AddTaskLog(task, "视频处理完成");
await _workflowManager.AddTaskLog(task, "视频处理完成");
}
}
}

View File

@ -1,4 +1,4 @@
using VideoAnalysisCore.Common;
using VideoAnalysisCore.Common;
using System.Net.Http.Headers;
using System.Text;
using Microsoft.Extensions.Logging;
@ -26,7 +26,7 @@ namespace VideoAnalysisCore.AICore.GPT.ChatGPT
private readonly IHttpClientFactory _httpClientFactory;
private readonly RedisManager redisManager;
public BestAIClient(IHttpClientFactory httpClientFactory, RedisManager redisManager) : base(httpClientFactory, redisManager)
public BestAIClient(IHttpClientFactory httpClientFactory, RedisManager redisManager, VideoSliceWorkflowManager workflowManager) : base(httpClientFactory, redisManager, workflowManager)
{
_httpClientFactory = httpClientFactory;
this.redisManager = redisManager;

View File

@ -1,4 +1,4 @@
using VideoAnalysisCore.Common;
using VideoAnalysisCore.Common;
using System.Net.Http.Headers;
using System.Text;
using Microsoft.Extensions.Logging;
@ -21,8 +21,8 @@ namespace VideoAnalysisCore.AICore.GPT.DeepSeek
private readonly IHttpClientFactory _httpClientFactory;
private readonly RedisManager redisManager;
public DeepSeekGPTClient(IHttpClientFactory httpClientFactory, RedisManager redisManager)
: base(httpClientFactory, redisManager)
public DeepSeekGPTClient(IHttpClientFactory httpClientFactory, RedisManager redisManager, VideoSliceWorkflowManager workflowManager)
: base(httpClientFactory, redisManager, workflowManager)
{
_httpClientFactory = httpClientFactory;
this.redisManager = redisManager;

View File

@ -1,4 +1,4 @@
using VideoAnalysisCore.Common;
using VideoAnalysisCore.Common;
using System.Net.Http.Headers;
using System.Text;
using Microsoft.Extensions.Logging;
@ -24,12 +24,14 @@ namespace VideoAnalysisCore.AICore.GPT
private readonly IHttpClientFactory _httpClientFactory;
private readonly RedisManager redisManager;
private readonly VideoSliceWorkflowManager _workflowManager;
public GPTClient(IHttpClientFactory httpClientFactory, RedisManager redisManager)
public GPTClient(IHttpClientFactory httpClientFactory, RedisManager redisManager, VideoSliceWorkflowManager workflowManager)
{
_httpClientFactory = httpClientFactory;
this.redisManager = redisManager;
_workflowManager = workflowManager;
}
/// <summary>
@ -73,7 +75,7 @@ namespace VideoAnalysisCore.AICore.GPT
var chatResp = await PostJsonStreamAsync(Config.Host + Config.Path, chatReq, Config.ApiKey);
if (!chatResp.IsSuccessStatusCode)
{
await redisManager.AddTaskLog(chatReq.taskId, "==>请求GPT服务器异常 " + chatResp?.StatusCode + " " + await chatResp.Content.ReadAsStringAsync());
await _workflowManager.AddTaskLog(chatReq.taskId, "==>请求GPT服务器异常 " + chatResp?.StatusCode + " " + await chatResp.Content.ReadAsStringAsync());
if (--i < 0)
{
throw new Exception("请求GPT服务器失败次数过多");
@ -101,7 +103,7 @@ namespace VideoAnalysisCore.AICore.GPT
}
catch (OperationCanceledException)
{
await redisManager.AddTaskLog(chatReq.taskId, "==>流式响应超时(3分钟),尝试重新读取...");
await _workflowManager.AddTaskLog(chatReq.taskId, "==>流式响应超时(3分钟),尝试重新读取...");
maxLoop--;
continue;
}
@ -143,17 +145,17 @@ namespace VideoAnalysisCore.AICore.GPT
{
var steamCount = messageBuilder.Length + messageBuilder1.Length;
if (++threshold % 30 == 0)
redisManager.SetTaskProgress(chatReq.taskId, "steam=>" + steamCount);
_workflowManager.SetTaskProgress(chatReq.taskId, "steam=>" + steamCount);
}
}
catch (Exception e)
{
await redisManager.AddTaskLog(chatReq.taskId, "异常 ChatSSE=>" + line + "\r\n" + e.Message + "\r\n" + e.StackTrace);
await _workflowManager.AddTaskLog(chatReq.taskId, "异常 ChatSSE=>" + line + "\r\n" + e.Message + "\r\n" + e.StackTrace);
}
}
}
throw new Exception(DateTime.Now + "==>AI请求超时 " + chatReq.taskId);
//await redisManager.AddTaskLog(chatReq.taskId, DateTime.Now + "==>AI请求超时 " + chatReq.taskId);
//await _workflowManager.AddTaskLog(chatReq.taskId, DateTime.Now + "==>AI请求超时 " + chatReq.taskId);
//return null;
}
@ -224,10 +226,10 @@ namespace VideoAnalysisCore.AICore.GPT
}
catch (Exception ex)
{
await redisManager.AddTaskLog(chatRep.taskId, $"==>GPT结果解析错误 重试剩余{tryCount} {ex.Message} {ex.StackTrace}");
await _workflowManager.AddTaskLog(chatRep.taskId, $"==>GPT结果解析错误 重试剩余{tryCount} {ex.Message} {ex.StackTrace}");
}
}
await redisManager.AddTaskLog(chatRep.taskId, $"==>GPT请求失败次数过多!!!");
await _workflowManager.AddTaskLog(chatRep.taskId, $"==>GPT请求失败次数过多!!!");
throw new Exception(DateTime.Now + "==>GPT请求失败次数过多!!!");
}
@ -267,7 +269,7 @@ namespace VideoAnalysisCore.AICore.GPT
{e.StackTrace}
==============================================
""";
await redisManager.AddTaskLog(data.taskId, $"==>GPT Http请求失败 {msg} 1秒后重试");
await _workflowManager.AddTaskLog(data.taskId, $"==>GPT Http请求失败 {msg} 1秒后重试");
Thread.Sleep(1000);
}
}

View File

@ -1,4 +1,4 @@
using VideoAnalysisCore.Common;
using VideoAnalysisCore.Common;
using System.Text.Json;
using VideoAnalysisCore.Model;
using System.Text;
@ -40,6 +40,7 @@ namespace VideoAnalysisCore.AICore.GPT
private readonly BestAIClient chatGPTClient;
private readonly Repository<CourseGradingCriteria> criteriaDB;
private readonly RedisManager redisManager;
private readonly VideoSliceWorkflowManager _workflowManager;
private readonly Repository<VideoTask> videoTaskDB;
private readonly Repository<VideoKonwPoint> videoKonwPointDB;
private readonly Repository<VideoTaskStage> videoTaskStageDB;
@ -55,7 +56,7 @@ namespace VideoAnalysisCore.AICore.GPT
/// <param name="logger"></param>
public GTP_Analysis_1(DeepSeekGPTClient moonshotClient, Repository<CourseGradingCriteria> criteria, Repository<VideoTask> videoTaskDB,
Repository<KnowledgeInfo> knowledgeInfoDB, Repository<VideoKonwPoint> videoKonwPointDB, SimpLetexClient simpLetexClient,
Repository<VideoQuestion> videoQuestionDB, OssClient ossClient, Repository<VideoQuestionKonw> videoQuestionKonwDB, RedisManager redisManager, BestAIClient chatGPTClient, GeminiGPTClient geminiClient, Repository<VideoTaskStage> videoTaskStageDB)
Repository<VideoQuestion> videoQuestionDB, OssClient ossClient, Repository<VideoQuestionKonw> videoQuestionKonwDB, RedisManager redisManager, VideoSliceWorkflowManager workflowManager, BestAIClient chatGPTClient, GeminiGPTClient geminiClient, Repository<VideoTaskStage> videoTaskStageDB)
{
deepSeekClient = moonshotClient;
criteriaDB = criteria;
@ -67,6 +68,7 @@ namespace VideoAnalysisCore.AICore.GPT
this.ossClient = ossClient;
this.videoQuestionKonwDB = videoQuestionKonwDB;
this.redisManager = redisManager;
_workflowManager = workflowManager;
this.chatGPTClient = chatGPTClient;
this.geminiClient = geminiClient;
this.videoTaskStageDB = videoTaskStageDB;
@ -108,7 +110,7 @@ namespace VideoAnalysisCore.AICore.GPT
Id|Name{knows}
{checkResFormat1}
""";
await redisManager.AddTaskLog(taskInfo.Id, "==>2.开始分析视频内容知识点");
await _workflowManager.AddTaskLog(taskInfo.Id, "==>2.开始分析视频内容知识点");
List<VideoKnowRes> konwRes;
var knowOK = false;
var chatClentArr = new GPTClient[] { chatGPTClient, geminiClient, deepSeekClient };
@ -127,7 +129,7 @@ namespace VideoAnalysisCore.AICore.GPT
}
if (!knowOK)
{
await redisManager.AddTaskLog(taskInfo.Id, "GPT未能分析出有效的分段的知识点");
await _workflowManager.AddTaskLog(taskInfo.Id, "GPT未能分析出有效的分段的知识点");
throw new Exception("GPT未能分析出有效的分段的知识点");
}
@ -297,7 +299,7 @@ namespace VideoAnalysisCore.AICore.GPT
var newCaptionsList = new List<SenseVoiceRes>(captionsArr.Length);
var spanCount = 75;
var totalCount = captionsArr.Length / spanCount + 1;
await redisManager.AddTaskLog(taskInfo.Id, $"==>字幕优化");
await _workflowManager.AddTaskLog(taskInfo.Id, $"==>字幕优化");
Func<string, Task<List<SenseVoiceInput>>>[] chatClentArr =
[
@ -360,12 +362,12 @@ namespace VideoAnalysisCore.AICore.GPT
if (cArr.Count() - resData.Count() < 5)
break;
else
await redisManager.AddTaskLog(taskInfo.Id, $"==>字幕优化 分段{s} AI结果数量不匹配 重试{i} 剩余{captionsArr.Length - (decimal)newCaptionsList.Count}条字幕");
await _workflowManager.AddTaskLog(taskInfo.Id, $"==>字幕优化 分段{s} AI结果数量不匹配 重试{i} 剩余{captionsArr.Length - (decimal)newCaptionsList.Count}条字幕");
}
if (cArr.Count() - resData.Count() > 5)
{
resData = cStrArr.ToList();
await redisManager.AddTaskLog(taskInfo.Id, $"==>字幕优化 分段{s} AI结果数量不匹配 采用原始值");
await _workflowManager.AddTaskLog(taskInfo.Id, $"==>字幕优化 分段{s} AI结果数量不匹配 采用原始值");
}
newCaptionsList.AddRange(resData.Select((el, i) => new SenseVoiceRes()
{
@ -424,13 +426,13 @@ namespace VideoAnalysisCore.AICore.GPT
{captions.Captions} !
""";
await redisManager.AddTaskLog(taskInfo.Id, $"开始分析视频内容 {tryCount}");
await _workflowManager.AddTaskLog(taskInfo.Id, $"开始分析视频内容 {tryCount}");
var res = await geminiClient.ChatAsync<List<VideoKnowRes>>(taskInfo.Id.ToString(), postMessages, "分析字幕", ChatGPTType.Gemini_3_Chat);
return res;
}
catch (Exception ex)
{
await redisManager.AddTaskLog(taskInfo.Id, $"分析视频内容失败 {tryCount} \r\n{ex.Message}\r\n{ex.StackTrace}");
await _workflowManager.AddTaskLog(taskInfo.Id, $"分析视频内容失败 {tryCount} \r\n{ex.Message}\r\n{ex.StackTrace}");
}
}
return null;
@ -568,9 +570,9 @@ namespace VideoAnalysisCore.AICore.GPT
/// <returns></returns>
private async Task<SenseVoiceRes[]> AnalysisVideoQuestions(VideoTask taskInfo, List<KnowledgeInfo> knowledgeInfos)
{
await redisManager.AddTaskLog(taskInfo.Id, $"==>提取试题功能已禁用");
await _workflowManager.AddTaskLog(taskInfo.Id, $"==>提取试题功能已禁用");
return null;
await redisManager.AddTaskLog(taskInfo.Id, $"==>{taskInfo.Id} 提取试题");
await _workflowManager.AddTaskLog(taskInfo.Id, $"==>{taskInfo.Id} 提取试题");
if (taskInfo is null || string.IsNullOrEmpty(taskInfo.PPTKeyFrame))
return null;
var farmeArr = JsonSerializer.Deserialize<int[]>(taskInfo.PPTKeyFrame);
@ -611,7 +613,7 @@ namespace VideoAnalysisCore.AICore.GPT
break;
#if DEBUG
await redisManager.AddTaskLog(taskInfo.Id, $"==>{taskInfo.Id} 提取{knowInfoArr.StartTime}秒试题的试题内容");
await _workflowManager.AddTaskLog(taskInfo.Id, $"==>{taskInfo.Id} 提取{knowInfoArr.StartTime}秒试题的试题内容");
#endif
//var knowArr=JsonSerializer.Serialize(knowInfoArr.Select(s => new { s.KnowPointId, s.KnowPoint }));
@ -670,7 +672,7 @@ namespace VideoAnalysisCore.AICore.GPT
}
catch (Exception ex)
{
await redisManager.AddTaskLog(taskInfo.Id, $"==>{taskInfo.Id} 提取{knowInfoArr.StartTime}秒试题出现错误 {ex.Message}");
await _workflowManager.AddTaskLog(taskInfo.Id, $"==>{taskInfo.Id} 提取{knowInfoArr.StartTime}秒试题出现错误 {ex.Message}");
}
}
}
@ -748,7 +750,7 @@ namespace VideoAnalysisCore.AICore.GPT
var homework = await DetectHomeworkAssignment(taskInfo, captions, sections);
if (homework != null)
{
await redisManager.AddTaskLog(taskInfo.Id, $"==>识别到作业布置 {homework.Content}");
await _workflowManager.AddTaskLog(taskInfo.Id, $"==>识别到作业布置 {homework.Content}");
await redisManager.Redis.HMSetAsync(RedisExpandKey.Task(task), "Homework", homework);
}
var maxVideoTime = captions?.TimeBase?.LastOrDefault()?.End ?? 0;
@ -765,16 +767,16 @@ namespace VideoAnalysisCore.AICore.GPT
//校验结果质量
var checkRes = await VerifySpanQuality(questionRes, taskInfo, captions, sections, Course_Id);
await redisManager.AddTaskLog(taskInfo.Id, $"==>课堂内容AI分析结果 得分=>{checkRes.Score}");
await redisManager.AddTaskLog(taskInfo.Id, $"==>改进意见 {checkRes.Suggestion}");
await redisManager.AddTaskLog(taskInfo.Id, $"==>扣分原因 {checkRes.MinusScore}");
await _workflowManager.AddTaskLog(taskInfo.Id, $"==>课堂内容AI分析结果 得分=>{checkRes.Score}");
await _workflowManager.AddTaskLog(taskInfo.Id, $"==>改进意见 {checkRes.Suggestion}");
await _workflowManager.AddTaskLog(taskInfo.Id, $"==>扣分原因 {checkRes.MinusScore}");
// 质量复检
//if (checkRes != null)
//{
// var improved = await ImproveSpanBySuggestion(questionRes, taskInfo, captions, sections, "扣分原因 {checkRes.MinusScore} \n 改进意见 {checkRes.Suggestion}");
// var improvedCheck = await VerifySpanQuality(improved, taskInfo, captions, sections, Course_Id);
// await redisManager.AddTaskLog(taskInfo.Id, $"==>优化后复检得分=>{improvedCheck.Score}");
// await redisManager.AddTaskLog(taskInfo.Id, $"==>优化后扣分原因 {improvedCheck.MinusScore}");
// await _workflowManager.AddTaskLog(taskInfo.Id, $"==>优化后复检得分=>{improvedCheck.Score}");
// await _workflowManager.AddTaskLog(taskInfo.Id, $"==>优化后扣分原因 {improvedCheck.MinusScore}");
// if (improved != null)
// {
// if (improvedCheck != null && improvedCheck.Score >= 90 && improvedCheck.Score > checkRes.Score)
@ -783,7 +785,7 @@ namespace VideoAnalysisCore.AICore.GPT
// }
// else
// {
// await redisManager.AddTaskLog(taskInfo.Id, $"==>优化之后的得分降低/得分过低");
// await _workflowManager.AddTaskLog(taskInfo.Id, $"==>优化之后的得分降低/得分过低");
// continue;
// }
// }
@ -819,10 +821,10 @@ namespace VideoAnalysisCore.AICore.GPT
break;
}
else
await redisManager.AddTaskLog(taskInfo.Id, $"==>课堂内容AI分析结果不合格!即将重试 剩余次数{tryCount}");
await _workflowManager.AddTaskLog(taskInfo.Id, $"==>课堂内容AI分析结果不合格!即将重试 剩余次数{tryCount}");
if (questionRes.Any(s => s.KeepTime < 30))
{
await redisManager.AddTaskLog(taskInfo.Id, "==>视频分段过短!! 重新进行AI分析");
await _workflowManager.AddTaskLog(taskInfo.Id, "==>视频分段过短!! 重新进行AI分析");
continue;
}
}

View File

@ -1,4 +1,4 @@
using VideoAnalysisCore.Common;
using VideoAnalysisCore.Common;
using System.Net.Http.Headers;
using System.Text;
using Microsoft.Extensions.Logging;
@ -22,7 +22,7 @@ namespace VideoAnalysisCore.AICore.GPT.Gemini
private readonly IHttpClientFactory _httpClientFactory;
private readonly RedisManager redisManager;
public GeminiGPTClient(IHttpClientFactory httpClientFactory, RedisManager redisManager) : base(httpClientFactory, redisManager)
public GeminiGPTClient(IHttpClientFactory httpClientFactory, RedisManager redisManager, VideoSliceWorkflowManager workflowManager) : base(httpClientFactory, redisManager, workflowManager)
{
_httpClientFactory = httpClientFactory;
this.redisManager = redisManager;

View File

@ -51,16 +51,16 @@ namespace VideoAnalysisCore.AICore.SherpaOnnx
{
private VadModelConfig VADModelConfig;
private readonly RedisManager redisManager;
private readonly VideoSliceWorkflowManager _workflowManager;
private int WindowSize = 512;
private readonly IServiceProvider serviceProvider;
private readonly VoiceActivityDetector vad;
private Func<int, float[], OfflineStream> Callback;
public SherpaVad(RedisManager redisManager, IServiceProvider serviceProvider)
public SherpaVad(VideoSliceWorkflowManager workflowManager, IServiceProvider serviceProvider)
{
this.redisManager = redisManager;
_workflowManager = workflowManager;
this.serviceProvider = serviceProvider;
VADModelConfig = new VadModelConfig();
@ -171,7 +171,7 @@ namespace VideoAnalysisCore.AICore.SherpaOnnx
while (!vad.IsEmpty())
{
var p = ReadNext(vad,res, totalSecond);
if (p != null) redisManager.SetTaskProgress(task, p + "%");
if (p != null) _workflowManager.SetTaskProgress(task, p + "%");
}
}
}
@ -179,19 +179,19 @@ namespace VideoAnalysisCore.AICore.SherpaOnnx
while (!vad.IsEmpty())
{
var p = ReadNext(vad, res, totalSecond);
if(p!= null) redisManager.SetTaskProgress(task, p + "%");
if(p!= null) _workflowManager.SetTaskProgress(task, p + "%");
}
//如果携带任务ID
if (!string.IsNullOrEmpty(task))
{
_ = redisManager.AddTaskLog(task, "==>字幕数量" + res.Count);
_ = _workflowManager.AddTaskLog(task, "==>字幕数量" + res.Count);
var captionsStr = res.ToJson();
_ = serviceProvider.GetRequiredService<Repository<VideoTask>>()
.AsUpdateable()
.SetColumns(it => it.Captions == captionsStr)
.Where(it => it.Id == long.Parse(task))
.ExecuteCommandAsync();
_ = redisManager.Redis.HMSetAsync(RedisExpandKey.Task(task), "Captions", res);
_ = _workflowManager.Redis.HMSetAsync(RedisExpandKey.Task(task), "Captions", res);
//分析完成视频字幕后继续接收任务
//redisManager.NewTask();
}

View File

@ -1,4 +1,4 @@
using FFmpeg.NET.Services;
using FFmpeg.NET.Services;
using FreeRedis;
using Microsoft.Extensions.DependencyModel;
using Microsoft.IdentityModel.Tokens;
@ -113,7 +113,7 @@ namespace VideoAnalysisCore.Common
/// </summary>
/// <param name="taskId"></param>
/// <returns></returns>
public static async Task<bool> DeleteTaskFileAsync(long? taskId,RedisManager redisManager)
public static async Task<bool> DeleteTaskFileAsync(long? taskId, dynamic workflowManager)
{
if (taskId is null || taskId == 0) return false;
var path = taskId.ToString().LocalPath();
@ -129,16 +129,16 @@ namespace VideoAnalysisCore.Common
{
File.Delete(filePath);
await redisManager.AddTaskLog(taskId, $"已成功删除文件: {filePath}");
await workflowManager.AddTaskLog(taskId, $"已成功删除文件: {filePath}");
}
catch (Exception ex)
{
await redisManager.AddTaskLog(taskId, $"删除文件 {filePath} 时出错: {ex.Message}");
await workflowManager.AddTaskLog(taskId, $"删除文件 {filePath} 时出错: {ex.Message}");
}
}
//else
//{
// await redisManager.AddTaskLog(chatReq.taskId, $"文件不存在,跳过删除: {filePath}");
// await workflowManager.AddTaskLog(chatReq.taskId, $"文件不存在,跳过删除: {filePath}");
//}
}
return true;
@ -149,7 +149,7 @@ namespace VideoAnalysisCore.Common
/// </summary>
/// <param name="taskId"></param>
/// <returns></returns>
public static async Task<bool> DeleteTaskAllFileAsync(long? taskId, RedisManager redisManager)
public static async Task<bool> DeleteTaskAllFileAsync(long? taskId, dynamic workflowManager)
{
if (taskId is null || taskId == 0) return false;
var path = taskId.ToString().LocalPath();
@ -158,11 +158,11 @@ namespace VideoAnalysisCore.Common
try
{
Directory.Delete(path, true);
await redisManager.AddTaskLog(taskId, $"已清理所有缓存文件: {taskId}");
await workflowManager.AddTaskLog(taskId, $"已清理所有缓存文件: {taskId}");
}
catch (Exception ex)
{
await redisManager.AddTaskLog(taskId, $"删除缓存文件 {taskId} 时出错: {ex.Message}");
await workflowManager.AddTaskLog(taskId, $"删除缓存文件 {taskId} 时出错: {ex.Message}");
}
}
return true;

View File

@ -79,7 +79,7 @@ namespace VideoAnalysisCore.Common
public class WorkflowConfig
{
public WorkflowItemConfig Default { get; set; } = new WorkflowItemConfig();
public WorkflowItemConfig Upload { get; set; } = new WorkflowItemConfig { Enabled = true };
public WorkflowItemConfig TidySlide { get; set; } = new WorkflowItemConfig { Enabled = true };
}
public class WorkflowItemConfig
{

View File

@ -14,6 +14,7 @@ using VideoAnalysisCore.Model.Enum;
using AlibabaCloud.SDK.Vod20170321;
using UserCenter.Model.Enum;
using System.Security.Policy;
using AlibabaCloud.SDK.Vod20170321.Models;
namespace VideoAnalysisCore.Common
{
@ -97,14 +98,27 @@ namespace VideoAnalysisCore.Common
private readonly IServiceProvider serviceProvider;
readonly string taskVideoName = "task.mp4";
readonly string taskPPTVideoName = "ppt.mp4";
// 注入工作流管理器,用于更新进度
private readonly VideoSliceWorkflowManager _videoSliceWorkflowManager;
private readonly TidySlideWorkflowManager _tidySlideWorkflowManager;
public DownloadFile(Repository<VideoTask> videoTaskDB, Client vodClient, Repository<NodePackageInfo> nackageInfoTaskDB, RedisManager redisManager, IServiceProvider serviceProvider)
public DownloadFile(Repository<VideoTask> videoTaskDB, Client vodClient, Repository<NodePackageInfo> nackageInfoTaskDB, RedisManager redisManager, IServiceProvider serviceProvider, VideoSliceWorkflowManager videoSliceWorkflowManager, TidySlideWorkflowManager tidySlideWorkflowManager)
{
this.videoTaskDB = videoTaskDB;
this.vodClient = vodClient;
this.packageInfoTaskDB = nackageInfoTaskDB;
this.redisManager = redisManager;
this.serviceProvider = serviceProvider;
_videoSliceWorkflowManager = videoSliceWorkflowManager;
_tidySlideWorkflowManager = tidySlideWorkflowManager;
}
// 辅助方法:根据工作流名称获取对应的管理器实例
private dynamic GetWorkflowManager(string workflowName)
{
if (workflowName == "TidySlideWorkflow") return _tidySlideWorkflowManager;
return _videoSliceWorkflowManager;
}
// 根据 Content-Type 映射文件后缀
@ -131,7 +145,7 @@ namespace VideoAnalysisCore.Common
/// </summary>
/// <param name="task"></param>
/// <returns></returns>
public async Task RunTask(string task)
public async Task RunTask(string task, string workflowName = "VideoSliceWorkflow")
{
var taskId = long.Parse(task);
//获取资源文件 地址
@ -151,7 +165,7 @@ namespace VideoAnalysisCore.Common
}
if (string.IsNullOrEmpty(fileUrl))
{
var videoInfo = await vodClient.GetPlayInfoAsync(new AlibabaCloud.SDK.Vod20170321.Models.GetPlayInfoRequest()
var videoInfo = await vodClient.GetPlayInfoAsync(new GetPlayInfoRequest()
{
VideoId = taskInfo.TagId,
Formats = "mp4",
@ -200,45 +214,20 @@ namespace VideoAnalysisCore.Common
if (!string.IsNullOrEmpty(taskInfo.PPTVideoUrl))
{
await Download(taskInfo.PPTVideoUrl, localPath, taskPPTVideoName,
(s, e) => redisManager.SetTaskProgress(task, "PPT->" + Math.Round(e.ProgressPercentage, 1)
));
//try
//{
// var url = string.Empty;
// if (taskInfo.PPTVideoCode.Contains("http"))
// url = taskInfo.PPTVideoCode;
// else
// {
// var videoInfo = await vodClient.GetPlayInfoAsync(new AlibabaCloud.SDK.Vod20170321.Models.GetPlayInfoRequest()
// {
// VideoId = taskInfo.PPTVideoCode,
// Formats = "mp4",
// OutputType = "cdn",
// AuthTimeout = 3600 * 24 * 12,
// });
// if (videoInfo is null || videoInfo.StatusCode != 200 && !videoInfo.Body.PlayInfoList.PlayInfo.Any())
// throw new Exception($"{DateTime.Now} 视频订阅=>获取阿里云视频信息失败 VideoCode {taskInfo.TagId} StatusCode {videoInfo?.StatusCode}");
// url = videoInfo.Body.PlayInfoList.PlayInfo.First().PlayURL;
// }
//}
//catch
//{
// throw;
//}
(s, e) => GetWorkflowManager(workflowName).SetTaskProgress(task, "PPT->" + Math.Round(e.ProgressPercentage, 1))
);
}
try
{
//下载原视频
await Download(fileUrl, localPath, taskVideoName,
(s, e) => redisManager.SetTaskProgress(task, Math.Round(e.ProgressPercentage,1)
));
(s, e) => GetWorkflowManager(workflowName).SetTaskProgress(task, Math.Round(e.ProgressPercentage, 1))
);
}
catch
{
throw;
}
}

View File

@ -1,4 +1,4 @@
using Coravel;
using Coravel;
using Coravel.Scheduling.Schedule;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting.Server;
@ -15,6 +15,7 @@ using VideoAnalysisCore.Job;
using Microsoft.Extensions.Hosting;
using VideoAnalysisCore.Common;
using System.Diagnostics;
using FreeRedis;
namespace VideoAnalysisCore.Common.Expand
{
@ -31,6 +32,11 @@ namespace VideoAnalysisCore.Common.Expand
app.GetRequiredService<IHostApplicationLifetime>()
.ApplicationStarted.Register(() =>
{
// 立即上线一次,防止等待 Job 调度周期的空白
var redis = AppCommon.Services.GetService<RedisClient>();
// redis?.SAdd(RedisExpandKey.OnlineDevices, AppCommon.Config.ID); // 移除
redis?.Set(RedisExpandKey.DeviceHeartbeat(AppCommon.Config.ID.ToString()), DateTime.Now.ToString(), 60);
var server = AppCommon.Services.GetRequiredService<IServer>();
var addressFeature = server.Features.Get<IServerAddressesFeature>();
Console.WriteLine("===========================================");
@ -51,6 +57,17 @@ namespace VideoAnalysisCore.Common.Expand
if(action != null)
action();
});
// 注册程序停止时的回调
app.GetRequiredService<IHostApplicationLifetime>()
.ApplicationStopping.Register(() =>
{
// 注销设备下线
var redis = AppCommon.Services.GetService<RedisClient>();
// redis?.SRem(RedisExpandKey.OnlineDevices, AppCommon.Config.ID); // 移除
redis?.Del(RedisExpandKey.DeviceHeartbeat(AppCommon.Config.ID.ToString())); // 立即删除心跳Key
Console.WriteLine($"设备 {AppCommon.Config.ID} 已下线");
});
}
// 跨平台打开浏览器的方法

View File

@ -11,30 +11,37 @@ using VideoAnalysisCore.Model;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System.Collections.Generic;
using System.Security.Cryptography;
using Yitter.IdGenerator;
namespace VideoAnalysisCore.Common.Expand
{
public static class UploadExpand
public static class TidySlideExpand
{
public static void AddUploadExpand(this IServiceCollection services)
public static void AddTidySlideExpand(this IServiceCollection services)
{
services.AddSingleton<UploadHandle>();
services.AddSingleton<TidySlideHandle>();
services.AddTransient<Repository<TidySlideTaskResult>>();
}
}
public class UploadHandle
public class TidySlideHandle
{
private readonly Client _vodClient;
private readonly Repository<VideoTask> _videoTaskDB;
private readonly Repository<TidySlideTaskResult> _tidySlideTaskResultDB;
private readonly RedisManager _redisManager;
private readonly OssClient _ossClient; // 使用系统统一注入的 OSS Client
private readonly TidySlideWorkflowManager _workflowManager; // 注入工作流管理器
public UploadHandle(Client vodClient, Repository<VideoTask> videoTaskDB, RedisManager redisManager, OssClient ossClient)
public TidySlideHandle(Client vodClient, Repository<VideoTask> videoTaskDB, Repository<TidySlideTaskResult> tidySlideTaskResultDB, RedisManager redisManager, OssClient ossClient, TidySlideWorkflowManager workflowManager)
{
_vodClient = vodClient;
_videoTaskDB = videoTaskDB;
_tidySlideTaskResultDB = tidySlideTaskResultDB;
_redisManager = redisManager;
_ossClient = ossClient;
_workflowManager = workflowManager;
}
public async Task RunAsync(string task)
@ -45,7 +52,7 @@ namespace VideoAnalysisCore.Common.Expand
if (!File.Exists(m3u8Path))
{
await _redisManager.AddTaskLog(task, "未找到 m3u8 文件,无法进行切片上传");
await _workflowManager.AddTaskLog(task, "未找到 m3u8 文件,无法进行切片上传");
throw new FileNotFoundException("M3U8文件未找到", m3u8Path);
}
@ -53,13 +60,13 @@ namespace VideoAnalysisCore.Common.Expand
var tsFiles = Directory.GetFiles(localPath, "out*.ts");
if (tsFiles.Length == 0)
{
await _redisManager.AddTaskLog(task, "未找到 ts 切片文件");
await _workflowManager.AddTaskLog(task, "未找到 ts 切片文件");
throw new FileNotFoundException("TS切片文件未找到");
}
var title = $"Task_{taskId}_{DateTime.Now:yyyyMMddHHmmss}";
await _redisManager.AddTaskLog(task, "正在获取VOD上传凭证...");
await _workflowManager.AddTaskLog(task, "正在获取VOD上传凭证...");
// 1. 获取上传凭证和地址
// 注意VOD上传m3u8时FileName必须以 .m3u8 结尾
@ -67,9 +74,9 @@ namespace VideoAnalysisCore.Common.Expand
{
Title = title,
FileName = "out.m3u8", // 必须是 m3u8 文件名
Description = "Video Analysis HLS Upload",
// CoverURL = "...", // 可选:设置封面
// Tags = "...", // 可选:设置标签
Description = "视频分析_PPT清洗 ",
Tags = "PPT清洗", // 可选:设置标签
CateId = 1000709090,
};
var response = await _vodClient.CreateUploadVideoAsync(request);
@ -82,7 +89,7 @@ namespace VideoAnalysisCore.Common.Expand
var uploadAddressStr = response.Body.UploadAddress;
var uploadAuthStr = response.Body.UploadAuth;
await _redisManager.AddTaskLog(task, $"获取凭证成功VideoId: {videoId}");
await _workflowManager.AddTaskLog(task, $"获取凭证成功VideoId: {videoId}");
// 2. 解析凭证 (Base64 -> JSON)
var addressJson = JObject.Parse(Encoding.UTF8.GetString(Convert.FromBase64String(uploadAddressStr)));
@ -90,22 +97,18 @@ namespace VideoAnalysisCore.Common.Expand
var endpoint = addressJson["Endpoint"]?.ToString();
var bucket = addressJson["Bucket"]?.ToString();
var objectName = addressJson["FileName"]?.ToString(); // 这是 VOD 分配的 m3u8 存储路径,例如 "sv/243d.../out.m3u8"
// 这是 VOD 分配的 m3u8 存储路径,例如 "sv/243d.../out.m3u8"
var objectName = addressJson["FileName"]?.ToString();
var accessKeyId = authJson["AccessKeyId"]?.ToString();
var accessKeySecret = authJson["AccessKeySecret"]?.ToString();
var securityToken = authJson["SecurityToken"]?.ToString();
if (string.IsNullOrEmpty(endpoint) || string.IsNullOrEmpty(bucket) || string.IsNullOrEmpty(objectName))
{
throw new Exception("解析上传地址失败");
}
// 修正 Endpoint 格式 (如果缺少协议头)
if (!endpoint.StartsWith("http"))
{
endpoint = "https://" + endpoint;
}
// 3. 构造 OSS 客户端 (使用临时凭证)
var ossClient = new OssClient(endpoint, accessKeyId, accessKeySecret, securityToken);
@ -116,42 +119,54 @@ namespace VideoAnalysisCore.Common.Expand
// 则 prefix = "sv/5903240e-19544975a64/"
var ossPrefix = objectName.Substring(0, objectName.LastIndexOf('/') + 1);
await _redisManager.AddTaskLog(task, $"开始上传文件到 VOD OSS (Bucket: {bucket}, Prefix: {ossPrefix})...");
await _workflowManager.AddTaskLog(task, $"开始上传文件到 VOD OSS (Bucket: {bucket}, Prefix: {ossPrefix})...");
try
{
// A. 上传所有 TS 切片
await _redisManager.AddTaskLog(task, $"开始上传 TS 切片 (共 {tsFiles.Length} 个)...");
foreach (var tsFile in tsFiles)
await _workflowManager.AddTaskLog(task, $"开始上传 TS 切片 (共 {tsFiles.Length} 个)...");
for (int i = 0; i < tsFiles.Length; i++)
{
var tsFile = tsFiles[i];
var fileName = Path.GetFileName(tsFile);
var tsObjectKey = ossPrefix + fileName;
using var fs = File.OpenRead(tsFile);
ossClient.PutObject(bucket, tsObjectKey, fs);
// 更新上传进度
if (i % 5 == 0) // 每5个文件更新一次进度
{
var progress = Math.Round((double)i / tsFiles.Length * 100, 1);
_workflowManager.SetTaskProgress(taskId, $"Upload->{progress}%");
}
}
// B. 上传 m3u8 索引文件
// 必须使用 VOD 指定的 objectName
await _redisManager.AddTaskLog(task, "开始上传 m3u8 索引文件...");
await _workflowManager.AddTaskLog(task, "开始上传 m3u8 索引文件...");
using (var fs = File.OpenRead(m3u8Path))
{
ossClient.PutObject(bucket, objectName, fs);
}
await _redisManager.AddTaskLog(task, "上传成功");
await _workflowManager.AddTaskLog(task, "上传成功");
// 5. 更新数据库
// 对于 VOD 托管视频,我们主要存储 VideoId (TagId),播放地址通常由前端调用 VOD 接口获取
// 或者我们可以尝试获取播放地址存入 MediaUrl
await _videoTaskDB.CopyNew().AsUpdateable()
.SetColumns(it => it.TagId == videoId)
.Where(it => it.Id == taskId)
.ExecuteCommandAsync();
await _tidySlideTaskResultDB.InsertAsync(new TidySlideTaskResult()
{
Id = YitIdHelper.NextId(),
VideoTaskId = long.Parse(task),
VideoId = videoId,
});
}
catch (Exception ex)
{
await _redisManager.AddTaskLog(task, $"上传 VOD OSS 异常: {ex.Message}");
await _workflowManager.AddTaskLog(task, $"上传 VOD OSS 异常: {ex.Message}");
throw;
}
}

View File

@ -42,9 +42,9 @@ namespace VideoAnalysisCore.Common
/// </summary>
public const string ChannelKey = BaseKey + "TaskChannel";
/// <summary>
/// 上传工作流 Channel key
/// TidySlide 工作流 Channel key
/// </summary>
public const string UploadChannelKey = BaseKey + "UploadTaskChannel";
public const string TidySlideChannelKey = BaseKey + "TidySlideTaskChannel";
/// <summary>
/// 下载文件
/// </summary>
@ -81,6 +81,14 @@ namespace VideoAnalysisCore.Common
/// </summary>
public static string Task(object taskId) => BaseKey + "Info:" + taskId;
public static string IDTask => BaseKey + "Services:" + AppCommon.Config.ID;
/// <summary>
/// 在线设备Key集合 (已弃用,直接扫描 Heartbeat)
/// </summary>
// public static string OnlineDevices => BaseKey + "OnlineDevices";
/// <summary>
/// 设备心跳Key前缀 (VideoAnalysis:Heartbeat:{DeviceId})
/// </summary>
public static string DeviceHeartbeat(string deviceId) => BaseKey + "Heartbeat:" + deviceId;
public static string TaskGPT(object taskId) => BaseKey + "GPTCached:" + taskId;
/// <summary>
/// 初始化 redis
@ -107,7 +115,12 @@ namespace VideoAnalysisCore.Common
service.AddSingleton(redis);
service.AddSingleton<RedisManager>();
service.AddVideoSliceWorkflow();
service.AddUploadWorkflow();
service.AddTidySlideWorkflow();
// 注册心跳 Job
// service.AddTransient<DeviceHeartbeatJob>(); // 迁移到 CoravelExpand 中统一管理
// service.AddTransient<TaskFileClearJob>();
// service.AddTransient<ClearAllCacheJob>();
}
}
@ -117,7 +130,7 @@ namespace VideoAnalysisCore.Common
public RedisInit(IServiceProvider serviceProvider)
{
serviceProvider.GetService<VideoSliceWorkflowInit>();
serviceProvider.GetService<UploadWorkflowInit>();
serviceProvider.GetService<TidySlideWorkflowInit>();
// serviceProvider.GetService<RedisManager>().InitChannel(); // 已废弃,由各工作流自行初始化
}
}
@ -172,155 +185,22 @@ namespace VideoAnalysisCore.Common
tran.Exec();
}
}
/// <summary>
/// 添加日志
/// </summary>
/// <param name="taskId">任务id</param>
/// <param name="msg">内容</param>
public async Task AddTaskLog(object taskId, string msg)
{
#if DEBUG
Console.WriteLine($"{DateTime.Now.ToString("MM-dd HH:mm:ss")} => {taskId} \r\n{msg}\r\n");
#endif
await Redis.RPushAsync(RedisExpandKey.TaskLog,
new TaskLog()
{
VideoTaskId = long.Parse(taskId.ToString()),
CreateTime = DateTime.Now,
Message = msg,
DeviceId = AppCommon.Config.ID
});
var count = 50;
lock (RedisExpandKey.TaskLog)
{
var oldTaskCount = Redis.LLen(RedisExpandKey.TaskLog);
if (oldTaskCount > count)
{
try
{
var insertData = Redis.LRange<TaskLog>(RedisExpandKey.TaskLog, 0, count -1);
taskLogDB.CopyNew().AsInsertable(insertData).ExecuteCommand();
//同步删除redis
Redis.LTrim(RedisExpandKey.TaskLog, count, 1000);
}
catch (Exception ex)
{
Console.WriteLine("写入任务日志出错" + "\r\n" + ex.Message + "\r\n" + ex.StackTrace);
}
}
}
}
// AddTaskLog 已迁移至 WorkflowBase
/// <summary>
/// 获取任务进度
/// </summary>
/// <param name="taskId"></param>
public float GetTaskProgress(object taskId)
/// <param name="workflowName">工作流名称(可选)</param>
public string GetTaskProgress(object taskId, string workflowName = "VideoSliceWorkflow")
{
return Redis.HMGet<float>(RedisExpandKey.Task(taskId), "Progress")[0];
var fieldName = workflowName == "VideoSliceWorkflow" ? "Progress" : $"Progress:{workflowName}";
return Redis.HMGet<string>(RedisExpandKey.Task(taskId), fieldName)[0] ?? "";
}
/// <summary>
/// 设置任务进度
/// </summary>
/// <param name="p">进度百分比</param>
/// <param name="taskId"></param>
public void SetTaskProgress(object taskId, object p)
{
Redis.HMSet(RedisExpandKey.Task(taskId), "Progress", p.ToString());
}
public async Task TaskEnd(string task)
{
var tId = long.Parse(task);
//var gptRes = (await Redis
// .HMGetAsync<TaskRes>(RedisExpandKey.Task(task), "ChatAnalysis")).FirstOrDefault();
//if (gptRes is null)
// throw new Exception("未能读取到GPT处理结果");
//删除任务执行状态
await Redis.LRemAsync(RedisExpandKey.IDTask, 1, task);
var taskData = await videoTaskDB
.CopyNew()
.GetFirstAsync(s => s.Id == tId);
if (taskData.Captions == "[]")
taskData.Captions = (await Redis.HMGetAsync(RedisExpandKey.Task(task), "Captions")).First();
//if (taskData.Speaker == "[]")
// taskData.Speaker = (await Redis.HMGetAsync(RedisExpandKey.Task(task), "Speaker"))?.FirstOrDefault()??"[]";
//未使用结果暂时屏蔽
//taskData.ChatAnalysis = JsonSerializer.Serialize(gptRes);
taskData.ChatAnalysisScore = 0;
taskData.ErrorMessage = string.Empty;
taskData.LastEnum = RedisChannelEnum.;
taskData.EndTime = DateTime.Now;
await videoTaskDB.CopyNew().AsUpdateable(taskData)
.UpdateColumns(it => new
{
//it.ChatAnalysis,
it.Captions,
it.Speaker,
it.ChatAnalysisScore,
it.ErrorMessage,
it.TotalTokens,
it.LastEnum,
it.EndTime
}).ExecuteCommandAsync();
try
{
//await ExpandFunction.DeleteTaskFileAsync(tId, this);
await ExpandFunction.DeleteTaskAllFileAsync(tId, this);
}
catch (Exception)
{
throw;
}
//NewTask();
}
/// <summary>
/// 写入任务异常
/// </summary>
/// <param name="taskID"></param>
/// <param name="ex"></param>
/// <returns></returns>
public async Task<bool> SetTaskErrorMessage(long taskID, Exception? ex)
{
var error = string.Empty;
if (ex != null)
{
await Redis.LRemAsync(RedisExpandKey.IDTask, 1, taskID.ToString());
//执行任务时出现异常
error = ex.Message + ex.StackTrace;
await AddTaskLog(taskID, $""" 出现异常 {ex.Message} {ex.StackTrace} """);
//清除失败任务 重新接收任务
// NewTask(); // 已废弃,工作流会自动处理
}
return await SetTaskError(taskID, error);
}
/// <summary>
/// 清除 任务的错误信息
/// </summary>
/// <param name="taskID"></param>
/// <returns></returns>
public async Task<bool> ClearTaskError(long taskID) => await SetTaskError(taskID, string.Empty);
/// <summary>
/// 修改任务的错误信息
/// </summary>
/// <param name="taskID"></param>
/// <param name="error"></param>
/// <returns></returns>
public async Task<bool> SetTaskError(long taskID, string? error)
{
var vDB = AppCommon.Services.GetService<Repository<VideoTask>>();
Redis.HMSet(RedisExpandKey.Task(taskID), "ErrorMessage", error);
return await vDB.CopyNew().AsUpdateable()
.SetColumns(it => it.ErrorMessage == error)//SetColumns是可以叠加的 写2个就2个字段赋值
.Where(it => it.Id == taskID)
.ExecuteCommandAsync() == 1;
}
// 注意SetTaskProgress, TaskEnd, SetTaskErrorMessage, ClearTaskError, SetTaskError
// 已迁移至 WorkflowBase此处移除或标记为已废弃
}
}

View File

@ -0,0 +1,104 @@
using FreeRedis;
using Microsoft.Extensions.DependencyInjection;
using SqlSugar.IOC;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using VideoAnalysisCore.AICore.FFMPGE;
using VideoAnalysisCore.Common.Expand;
using VideoAnalysisCore.Model;
using VideoAnalysisCore.Model.Enum;
namespace VideoAnalysisCore.Common
{
public static class TidySlideWorkflowExpand
{
public static void AddTidySlideWorkflow(this IServiceCollection services)
{
// 只有在配置启用时才注册
if (AppCommon.Config.Workflow.TidySlide.Enabled)
{
Console.WriteLine($"{DateTime.Now}=>初始化 视频合并工作流(TidySlide)");
services.AddTidySlideExpand(); // Register TidySlideHandle
services.AddSingleton<TidySlideWorkflowManager>();
services.AddSingleton<TidySlideWorkflowInit>();
}
}
}
public class TidySlideWorkflowInit
{
private readonly TidySlideWorkflowManager _manager;
private readonly IServiceProvider _serviceProvider;
public TidySlideWorkflowInit(TidySlideWorkflowManager manager, IServiceProvider serviceProvider)
{
_manager = manager;
_serviceProvider = serviceProvider;
Init();
_manager.InitChannel();
}
public void Init()
{
var SubscribeList = _manager.SubscribeList;
SubscribeList.Add(RedisTidySlideChannelEnum., async (task) => await Task.CompletedTask);
SubscribeList.Add(RedisTidySlideChannelEnum., async (task) =>
{
using var scope = _serviceProvider.CreateScope();
var downloadService = scope.ServiceProvider.GetRequiredService<DownloadFile>();
await downloadService.RunTask(task, "TidySlideWorkflow");
});
SubscribeList.Add(RedisTidySlideChannelEnum., async (task) =>
{
using var scope = _serviceProvider.CreateScope();
var ffmpegService = scope.ServiceProvider.GetRequiredService<FFMPGEHandle>();
await ffmpegService.MergeAndSliceAsync(task);
});
SubscribeList.Add(RedisTidySlideChannelEnum., async (task) =>
{
using var scope = _serviceProvider.CreateScope();
var uploadService = scope.ServiceProvider.GetRequiredService<TidySlideHandle>();
await uploadService.RunAsync(task);
});
SubscribeList.Add(RedisTidySlideChannelEnum., _manager.TaskEnd);
}
}
public class TidySlideWorkflowManager : WorkflowBase<RedisTidySlideChannelEnum>
{
public TidySlideWorkflowManager(RedisClient redis, RedisManager redisManager) : base(redis, redisManager)
{
}
protected override string ChannelKey => RedisExpandKey.TidySlideChannelKey;
protected override int Concurrency => AppCommon.Config.Workflow.TidySlide.Concurrency;
protected override string WorkflowName => "TidySlideWorkflow"; // 显式指定
protected override async Task UpdateTaskStateAsync(string taskId, RedisTidySlideChannelEnum step)
{
// TidySlide 工作流只更新 VideoTaskWorkflow 表,不污染 VideoTask.LastEnum
// 调用基类实现即可
await base.UpdateTaskStateAsync(taskId, step);
}
public override async Task TaskEnd(string task)
{
var tId = long.Parse(task);
await base.TaskEnd(task);
// TidySlide 工作流结束时清理文件
try
{
await ExpandFunction.DeleteTaskAllFileAsync(tId, this);
}
catch (Exception)
{
throw;
}
}
}
}

View File

@ -1,80 +0,0 @@
using FreeRedis;
using Microsoft.Extensions.DependencyInjection;
using SqlSugar.IOC;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using VideoAnalysisCore.AICore.FFMPGE;
using VideoAnalysisCore.Common.Expand;
using VideoAnalysisCore.Model;
using VideoAnalysisCore.Model.Enum;
namespace VideoAnalysisCore.Common
{
public static class UploadWorkflowExpand
{
public static void AddUploadWorkflow(this IServiceCollection services)
{
// 只有在配置启用时才注册
if (AppCommon.Config.Workflow.Upload.Enabled)
{
Console.WriteLine($"{DateTime.Now}=>初始化 视频合并工作流");
services.AddUploadExpand(); // Register UploadHandle
services.AddSingleton<UploadWorkflowManager>();
services.AddSingleton<UploadWorkflowInit>();
}
}
}
public class UploadWorkflowInit
{
private readonly UploadWorkflowManager _manager;
private readonly IServiceProvider _serviceProvider;
public UploadWorkflowInit(UploadWorkflowManager manager, IServiceProvider serviceProvider)
{
_manager = manager;
_serviceProvider = serviceProvider;
Init();
_manager.InitChannel();
}
public void Init()
{
var SubscribeList = _manager.SubscribeList;
SubscribeList.Add(RedisUploadChannelEnum., async (task) => await Task.CompletedTask);
SubscribeList.Add(RedisUploadChannelEnum., async (task) =>
{
using var scope = _serviceProvider.CreateScope();
var downloadService = scope.ServiceProvider.GetRequiredService<DownloadFile>();
await downloadService.RunTask(task);
});
SubscribeList.Add(RedisUploadChannelEnum., async (task) =>
{
using var scope = _serviceProvider.CreateScope();
var ffmpegService = scope.ServiceProvider.GetRequiredService<FFMPGEHandle>();
await ffmpegService.MergeAndSliceAsync(task);
});
SubscribeList.Add(RedisUploadChannelEnum., async (task) =>
{
using var scope = _serviceProvider.CreateScope();
var uploadService = scope.ServiceProvider.GetRequiredService<UploadHandle>();
await uploadService.RunAsync(task);
});
SubscribeList.Add(RedisUploadChannelEnum., _manager.RedisManager.TaskEnd);
}
}
public class UploadWorkflowManager : WorkflowBase<RedisUploadChannelEnum>
{
public UploadWorkflowManager(RedisClient redis, RedisManager redisManager) : base(redis, redisManager)
{
}
protected override string ChannelKey => RedisExpandKey.UploadChannelKey;
protected override int Concurrency => AppCommon.Config.Workflow.Upload.Concurrency;
}
}

View File

@ -39,15 +39,13 @@ namespace VideoAnalysisCore.Common
private readonly IServiceProvider _serviceProvider;
private readonly FFMPGEHandle _ffmpeg;
private readonly SenseVoice _senseVoice;
private readonly RedisManager _redisManager;
public VideoSliceWorkflowInit(VideoSliceWorkflowManager manager, IServiceProvider serviceProvider, FFMPGEHandle ffmpeg, SenseVoice senseVoice, RedisManager redisManager)
public VideoSliceWorkflowInit(VideoSliceWorkflowManager manager, IServiceProvider serviceProvider, FFMPGEHandle ffmpeg, SenseVoice senseVoice)
{
_manager = manager;
_serviceProvider = serviceProvider;
_ffmpeg = ffmpeg;
_senseVoice = senseVoice;
_redisManager = redisManager;
Init();
_manager.InitChannel();
}
@ -61,11 +59,11 @@ namespace VideoAnalysisCore.Common
using var scope = _serviceProvider.CreateScope();
var downloadService = scope.ServiceProvider.GetService<DownloadFile>();
if (downloadService is null) throw new Exception("DownloadFile 未注入");
await downloadService.RunTask(task);
await downloadService.RunTask(task, "VideoSliceWorkflow");
});
SubscribeList.Add(RedisChannelEnum., _ffmpeg.RunAsync);
SubscribeList.Add(RedisChannelEnum., _senseVoice.RunTask);
SubscribeList.Add(RedisChannelEnum.AI课程类型, async (task) =>
{
using var scope = _serviceProvider.CreateScope();
@ -87,10 +85,10 @@ namespace VideoAnalysisCore.Common
if (service is null) throw new Exception("IBserGPT 未注入");
await service.GetVideoQuestion(task);
});
SubscribeList.Add(RedisChannelEnum., _redisManager.TaskEnd);
SubscribeList.Add(RedisChannelEnum., _manager.TaskEnd);
}
}
public class VideoSliceWorkflowManager : WorkflowBase<RedisChannelEnum>
{
public VideoSliceWorkflowManager(RedisClient redis, RedisManager redisManager) : base(redis, redisManager)
@ -99,6 +97,28 @@ namespace VideoAnalysisCore.Common
protected override string ChannelKey => RedisExpandKey.ChannelKey;
protected override int Concurrency => AppCommon.Config.Workflow.Default.Concurrency;
protected override string WorkflowName => "VideoSliceWorkflow"; // 显式指定,避免重构改名风险
/// <summary>
/// 重写状态更新逻辑:保持兼容性,继续更新 VideoTask.LastEnum
/// </summary>
protected override async Task UpdateTaskStateAsync(string taskId, RedisChannelEnum step)
{
var tID = long.Parse(taskId);
// 1. 调用基类方法,更新 VideoTaskWorkflow 表 (可选,如果想双写)
await base.UpdateTaskStateAsync(taskId, step);
// 2. 更新旧的 VideoTask 表,保持前端兼容
using var scope = AppCommon.Services.CreateScope();
var vDB = scope.ServiceProvider.GetService<Repository<VideoTask>>();
if (vDB != null)
{
await vDB.CopyNew().AsUpdateable()
.SetColumns(it => it.LastEnum == step)
.Where(it => it.Id == tID)
.ExecuteCommandAsync();
}
}
protected override async Task HandleSpecialFlowAsync(RedisChannelEnum currentStep, RedisChannelEnum nextStep, string taskId)
{
@ -110,5 +130,47 @@ namespace VideoAnalysisCore.Common
}
await Task.CompletedTask;
}
public override async Task TaskEnd(string task)
{
var tId = long.Parse(task);
await base.TaskEnd(task);
// 原 RedisManager.TaskEnd 逻辑迁移至此
using var scope = AppCommon.Services.CreateScope();
var videoTaskDB = scope.ServiceProvider.GetService<Repository<VideoTask>>();
if (videoTaskDB == null) return;
var taskData = await videoTaskDB.CopyNew().GetFirstAsync(s => s.Id == tId);
if (taskData.Captions == "[]")
taskData.Captions = (await Redis.HMGetAsync(RedisExpandKey.Task(task), "Captions")).First();
taskData.ChatAnalysisScore = 0;
taskData.ErrorMessage = string.Empty;
taskData.LastEnum = RedisChannelEnum.;
taskData.EndTime = DateTime.Now;
await videoTaskDB.CopyNew().AsUpdateable(taskData)
.UpdateColumns(it => new
{
it.Captions,
it.Speaker,
it.ChatAnalysisScore,
it.ErrorMessage,
it.TotalTokens,
it.LastEnum,
it.EndTime
}).ExecuteCommandAsync();
try
{
await ExpandFunction.DeleteTaskAllFileAsync(tId, this);
}
catch (Exception)
{
throw;
}
}
}
}
}

View File

@ -1,5 +1,6 @@
using FreeRedis;
using Microsoft.Extensions.DependencyInjection;
using SqlSugar;
using SqlSugar.IOC;
using System;
using System.Collections.Concurrent;
@ -33,6 +34,12 @@ namespace VideoAnalysisCore.Common
/// </summary>
protected abstract int Concurrency { get; }
/// <summary>
/// 工作流名称 (e.g. "VideoSliceWorkflow")
/// <para>默认通过类名去除 "Manager" 后缀生成,子类可重写</para>
/// </summary>
protected virtual string WorkflowName => this.GetType().Name.Replace("Manager", "");
public WorkflowBase(RedisClient redis, RedisManager redisManager)
{
Redis = redis;
@ -71,15 +78,15 @@ namespace VideoAnalysisCore.Common
{
try
{
await RedisManager.AddTaskLog(oldTask, DateTime.Now.ToString("HH:mm:ss") + $"-------------> 接收上次未完成任务 [{typeof(TEnum).Name}] " + oldTask);
await RedisManager.ClearTaskError(long.Parse(oldTask));
await AddTaskLog(oldTask, DateTime.Now.ToString("HH:mm:ss") + $"-------------> 接收上次未完成任务 [{typeof(TEnum).Name}] " + oldTask, WorkflowName);
await ClearTaskError(long.Parse(oldTask));
var lastEnum = (TEnum)result;
await InsertChannel(lastEnum, oldTask);
}
catch (Exception ex)
{
await RedisManager.SetTaskErrorMessage(long.Parse(oldTask), ex);
await SetTaskErrorMessage(long.Parse(oldTask), ex);
Console.WriteLine($"恢复任务 {oldTask} 失败: {ex.Message}");
}
finally
@ -110,7 +117,6 @@ namespace VideoAnalysisCore.Common
}
else
{
Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + "-------------> 接收新任务!");
ReceivingTaskAsync();
}
}
@ -139,8 +145,6 @@ namespace VideoAnalysisCore.Common
if (!string.IsNullOrEmpty(taskId))
{
Redis.LPush(RedisExpandKey.IDTask, taskId);
await RedisManager.AddTaskLog(taskId, $"==> 接收到任务 [{typeof(TEnum).Name}] ");
// 获取第一个枚举值作为起始步骤
var firstStep = Enum.GetValues(typeof(TEnum)).Cast<TEnum>().FirstOrDefault();
if (Convert.ToInt32(firstStep) == 0) // 跳过排队中
@ -148,6 +152,7 @@ namespace VideoAnalysisCore.Common
var next = firstStep.NextEnum();
if (next.HasValue) firstStep = next.Value;
}
await AddTaskLog(taskId, $"==> 接收到任务 [{typeof(TEnum).Name}] ", WorkflowName);
await InsertChannel(firstStep, taskId);
}
@ -165,7 +170,7 @@ namespace VideoAnalysisCore.Common
public async Task InsertChannel(TEnum @enum, string taskId)
{
await RedisManager.AddTaskLog(taskId, "==> 开始执行任务 ");
await AddTaskLog(taskId, "==> 开始执行任务 ", WorkflowName);
await ProcessTaskFlow(@enum, taskId, taskId);
}
/// <summary>
@ -198,10 +203,9 @@ namespace VideoAnalysisCore.Common
{
if (StopTask)
{
await RedisManager.AddTaskLog(tId, "==> 手动停止任务 ");
await AddTaskLog(tId, "==> 手动停止任务 ", WorkflowName);
return;
}
// 1. 记录步骤开始时间 (需要转换 RedisChannelEnum 才能调用 UpdateStepTimeAsync如果类型不匹配则需要适配)
// 这里简化,暂不记录非主流程的时间,或者需要在 RedisManager 增加泛型支持
// await RedisManager.UpdateStepTimeAsync(taskId, currentStep);
@ -230,7 +234,7 @@ namespace VideoAnalysisCore.Common
}
catch (Exception ex)
{
await RedisManager.SetTaskErrorMessage(long.Parse(tId), ex);
await SetTaskErrorMessage(long.Parse(tId), ex);
}
finally
{
@ -238,39 +242,202 @@ namespace VideoAnalysisCore.Common
}
}
/// <summary>
/// 更新任务状态
/// <para>默认实现:保存到 WorkflowState 表。子类可重写以保存到特定表(如 VideoTask</para>
/// </summary>
/// <param name="taskId">任务ID</param>
/// <param name="step">当前步骤枚举</param>
protected virtual async Task UpdateTaskStateAsync(string taskId, TEnum step)
{
var tID = long.Parse(taskId);
var stepName = step.ToString();
var stepValue = Convert.ToInt32(step);
using var scope = AppCommon.Services.CreateScope();
var db = scope.ServiceProvider.GetService<ISqlSugarClient>();
if (db == null) return;
// 尝试更新或插入 WorkflowState
// 注意:这里假设 VideoTaskWorkflow 表存在且已正确配置
// 如果不想引入新表依赖,也可以在这里留空,由子类实现
try
{
// 使用 upsert 逻辑
var existing = await db.Queryable<VideoTaskWorkflow>()
.FirstAsync(it => it.VideoTaskId == tID && it.WorkflowName == WorkflowName);
if (existing == null)
{
await db.Insertable(new VideoTaskWorkflow
{
Id = Yitter.IdGenerator.YitIdHelper.NextId(),
VideoTaskId = tID,
WorkflowName = WorkflowName,
CurrentStep = stepName,
CurrentStepValue = stepValue,
UpdateTime = DateTime.Now
}).ExecuteCommandAsync();
}
else
{
existing.CurrentStep = stepName;
existing.CurrentStepValue = stepValue;
existing.UpdateTime = DateTime.Now;
await db.Updateable(existing).ExecuteCommandAsync();
}
}
catch (Exception ex)
{
Console.WriteLine($"更新工作流状态失败: {ex.Message}");
}
}
public async Task TouchChannel(TEnum key, string taskId, Func<string, Task> action)
{
var tID = long.Parse(taskId);
await RedisManager.AddTaskLog(taskId, " 开始执行 " + key + " " + taskId);
await AddTaskLog(taskId, " 开始执行 " + key + " " + taskId, WorkflowName);
try
{
// 尝试将当前枚举转为 RedisChannelEnum 存储状态,如果无法转换则强转 int
RedisChannelEnum dbEnum;
if (key is RedisChannelEnum rc) dbEnum = rc;
else
{
// 使用 Convert.ToInt32 处理各种枚举基础类型 (int, byte, long 等)
var intValue = Convert.ToInt32(key);
dbEnum = (RedisChannelEnum)intValue;
}
// 更新 Redis 状态 (通用)
Redis.HMSet(RedisExpandKey.Task(taskId), "LastEnum", key.ToString());
Redis.HMSet(RedisExpandKey.Task(taskId), "Progress", 0);
// 使用新的进度设置方法
SetTaskProgress(taskId, 0);
var vDB = AppCommon.Services.GetService<Repository<VideoTask>>();
await vDB.CopyNew().AsUpdateable()
.SetColumns(it => it.LastEnum == dbEnum)
.Where(it => it.Id == tID)
.ExecuteCommandAsync();
// 调用状态更新逻辑 (由子类决定存储位置)
await UpdateTaskStateAsync(taskId, key);
await action(taskId);
}
catch (Exception ex)
{
await RedisManager.AddTaskLog(taskId, $""" 出现异常 {ex.Message} {ex.StackTrace} """);
await AddTaskLog(taskId, $""" 出现异常 {ex.Message} {ex.StackTrace} """);
throw;
}
}
/// <summary>
/// 设置任务进度
/// </summary>
/// <param name="taskId"></param>
/// <param name="p">进度百分比</param>
public void SetTaskProgress(object taskId, object p)
{
var fieldName = WorkflowName == "VideoSliceWorkflow" ? "Progress" : $"Progress:{WorkflowName}";
Redis.HMSet(RedisExpandKey.Task(taskId), fieldName, p.ToString());
}
/// <summary>
/// 任务结束处理
/// </summary>
/// <param name="taskId"></param>
public virtual async Task TaskEnd(string taskId)
{
var tId = long.Parse(taskId);
//删除任务执行状态
await Redis.LRemAsync(RedisExpandKey.IDTask, 1, taskId);
// 更新 VideoTaskWorkflow 表状态为结束
// 注意:这里假设结束状态对应的枚举值为 100或者子类重写此方法
// 由于泛型限制,无法直接获取 TEnum 的结束值,建议子类重写或在此处做通用处理
// 这里仅做基础清理,具体业务逻辑建议在子类重写
}
/// <summary>
/// 写入任务异常
/// </summary>
/// <param name="taskID"></param>
/// <param name="ex"></param>
/// <returns></returns>
public async Task<bool> SetTaskErrorMessage(long taskID, Exception? ex)
{
var error = string.Empty;
if (ex != null)
{
await Redis.LRemAsync(RedisExpandKey.IDTask, 1, taskID.ToString());
//执行任务时出现异常
error = ex.Message + ex.StackTrace;
await AddTaskLog(taskID, $""" 出现异常 {ex.Message} {ex.StackTrace} """);
}
return await SetTaskError(taskID, error);
}
/// <summary>
/// 清除 任务的错误信息
/// </summary>
/// <param name="taskID"></param>
/// <returns></returns>
public async Task<bool> ClearTaskError(long taskID) => await SetTaskError(taskID, string.Empty);
/// <summary>
/// 修改任务的错误信息
/// </summary>
/// <param name="taskID"></param>
/// <param name="error"></param>
/// <returns></returns>
public async Task<bool> SetTaskError(long taskID, string? error)
{
using var scope = AppCommon.Services.CreateScope();
var vDB = scope.ServiceProvider.GetService<Repository<VideoTask>>();
if (vDB == null) return false;
Redis.HMSet(RedisExpandKey.Task(taskID), "ErrorMessage", error);
// 同时更新 VideoTaskWorkflow 表的错误信息(如果需要)
return await vDB.CopyNew().AsUpdateable()
.SetColumns(it => it.ErrorMessage == error)
.Where(it => it.Id == taskID)
.ExecuteCommandAsync() == 1;
}
/// <summary>
/// 添加日志
/// </summary>
/// <param name="taskId">任务id</param>
/// <param name="msg">内容</param>
/// <param name="workflowName">工作流名称(可选,默认为当前工作流)</param>
public async Task AddTaskLog(object taskId, string msg, string? workflowName = null)
{
var wfName = workflowName ?? WorkflowName;
#if DEBUG
Console.WriteLine($"{DateTime.Now.ToString("MM-dd HH:mm:ss")} => {taskId} [{wfName}] \r\n{msg}\r\n");
#endif
await Redis.RPushAsync(RedisExpandKey.TaskLog,
new TaskLog()
{
VideoTaskId = long.Parse(taskId.ToString()),
CreateTime = DateTime.Now,
Message = msg,
DeviceId = AppCommon.Config.ID,
WorkflowName = wfName
});
var count = 50;
lock (RedisExpandKey.TaskLog)
{
var oldTaskCount = Redis.LLen(RedisExpandKey.TaskLog);
if (oldTaskCount > count)
{
try
{
using var scope = AppCommon.Services.CreateScope();
var taskLogDB = scope.ServiceProvider.GetService<Repository<TaskLog>>();
if (taskLogDB != null)
{
var insertData = Redis.LRange<TaskLog>(RedisExpandKey.TaskLog, 0, count - 1);
taskLogDB.CopyNew().AsInsertable(insertData).ExecuteCommand();
//同步删除redis
Redis.LTrim(RedisExpandKey.TaskLog, count, 1000);
}
}
catch (Exception ex)
{
Console.WriteLine("写入任务日志出错" + "\r\n" + ex.Message + "\r\n" + ex.StackTrace);
}
}
}
}
/// <summary>
/// 异步流程
/// </summary>

View File

@ -1,4 +1,4 @@

using FFmpeg.NET.Services;
using MapsterMapper;
using Microsoft.AspNetCore.Authorization;
@ -25,6 +25,14 @@ using Yitter.IdGenerator;
namespace VideoAnalysisCore.Controllers
{
public class RunningTaskListReq : QueryRequestBase
{
/// <summary>
/// 设备ID (可选,若为空则默认获取当前节点)
/// </summary>
public string? DeviceId { get; set; }
}
/// <summary>
/// 路由菜单
/// </summary>
@ -39,7 +47,7 @@ namespace VideoAnalysisCore.Controllers
readonly RedisManager redisManager;
readonly UploadWorkflowManager uploadWorkflowManager;
readonly TidySlideWorkflowManager tidySlideWorkflowManager;
readonly VideoSliceWorkflowManager videoSliceWorkflowManager;
public readonly SenseVoice senseVoice;
@ -48,7 +56,7 @@ namespace VideoAnalysisCore.Controllers
private readonly IMapper mp;
public VideoTaskController(Repository<VideoTask> baseService, RedisManager redisManager,
Repository<VideoQuestion> videoQuestionDB,
Repository<VideoQuestionKonw> videoQuestionKonwDB, Repository<VideoKonwPoint> videoKonwPointDB, SenseVoice senseVoice, IMapper mp, Repository<TaskLog> taskLogDB, FunASRNano funASRNano, Repository<VideoTaskStage> videoTaskStageDB, UploadWorkflowManager uploadWorkflowManager, VideoSliceWorkflowManager videoSliceWorkflowManager) : base(baseService)
Repository<VideoQuestionKonw> videoQuestionKonwDB, Repository<VideoKonwPoint> videoKonwPointDB, SenseVoice senseVoice, IMapper mp, Repository<TaskLog> taskLogDB, FunASRNano funASRNano, Repository<VideoTaskStage> videoTaskStageDB, TidySlideWorkflowManager tidySlideWorkflowManager, VideoSliceWorkflowManager videoSliceWorkflowManager) : base(baseService)
{
this.baseService = baseService;
this.redisManager = redisManager;
@ -60,7 +68,7 @@ namespace VideoAnalysisCore.Controllers
this.taskLogDB = taskLogDB;
this.funASRNano = funASRNano;
this.videoTaskStageDB = videoTaskStageDB;
this.uploadWorkflowManager = uploadWorkflowManager;
this.tidySlideWorkflowManager = tidySlideWorkflowManager;
this.videoSliceWorkflowManager = videoSliceWorkflowManager;
}
@ -88,6 +96,7 @@ namespace VideoAnalysisCore.Controllers
/// <param name="url">文件流</param>
/// <returns></returns>
[HttpGet(Name = "InitDbTable")]
[AllowAnonymous]
public IActionResult InitDbTable()
{
var b = AppCommon.Config.DB.UpdateTable;
@ -318,14 +327,30 @@ namespace VideoAnalysisCore.Controllers
[HttpGet]
public async Task ReStart(long id, RedisChannelEnum selectEnum)
{
await redisManager.AddTaskLog(id,"手动重试任务");
await redisManager.ClearTaskError(id);
await videoSliceWorkflowManager.AddTaskLog(id, "手动重试任务");
await videoSliceWorkflowManager.ClearTaskError(id);
_ = Task.Run(async () =>
await videoSliceWorkflowManager.InsertChannel(selectEnum, id.ToString())
);
}
/// <summary>
/// 重试任务 (TidySlide 工作流)
/// </summary>
/// <param name="id">任务id</param>
/// <param name="selectEnum">任务类型</param>
/// <returns></returns>
[HttpGet]
public async Task ReStartTidySlide(long id, RedisTidySlideChannelEnum selectEnum)
{
await tidySlideWorkflowManager.AddTaskLog(id, "手动重试 TidySlide 任务");
await tidySlideWorkflowManager.ClearTaskError(id);
_ = Task.Run(async () =>
await tidySlideWorkflowManager.InsertChannel(selectEnum, id.ToString())
);
}
/// <summary>
/// 刷新数据
/// </summary>
@ -337,7 +362,7 @@ namespace VideoAnalysisCore.Controllers
if (id == 0)
return BadRequest("无效id");
var d = await redisManager.Redis.HMGetAsync<string>(RedisExpandKey.Task(id),
"Progress", "LastEnum", "StartTime", "ErrorMessage");
"Progress", "LastEnum", "StartTime", "ErrorMessage", "Progress:TidySlideWorkflow"); // 获取所有可能的进度字段
var logArr = await taskLogDB.AsQueryable()
.Where(s => s.VideoTaskId == id)
@ -347,17 +372,23 @@ namespace VideoAnalysisCore.Controllers
.Where(s => s.VideoTaskId == id);
logArr = logArr.Concat(insertData).ToArray();
// 获取所有相关工作流的状态
var workflows = await baseService.Context.Queryable<VideoTaskWorkflow>()
.Where(w => w.VideoTaskId == id)
.ToListAsync();
return Ok(new
{
Progress = d[0],
TidySlideProgress = d[4], // 返回 TidySlideWorkflow 的进度
LastEnum = d[1]?.ToEnum<RedisChannelEnum>().ToString(),
StartTime = d[2] != null
? JsonSerializer.Deserialize<Dictionary<RedisChannelEnum, DateTime>>(d[2])
: null,
ErrorMessage = d[3],
Logs = logArr,
Workflows = workflows // 返回工作流列表
});
}
@ -424,15 +455,57 @@ namespace VideoAnalysisCore.Controllers
}
/// <summary>
/// 获取在线设备列表
/// </summary>
/// <returns></returns>
[HttpGet]
public IActionResult OnlineDevices()
{
// 扫描 Heartbeat Key
var keys = redisManager.Redis.Keys(RedisExpandKey.DeviceHeartbeat("*"));
var prefix = RedisExpandKey.DeviceHeartbeat("");
var devices = keys.Select(k => k.Replace(prefix, "")).ToList();
return Ok(devices);
}
/// <summary>
/// 执行中的任务
/// </summary>
/// <param name="model">查询模型</param>
/// <returns></returns>
[HttpPost]
public async Task<object> RunningTaskList([FromBody] QueryRequestBase model)
public async Task<object> RunningTaskList([FromBody] RunningTaskListReq model)
{
var oldTaskArr = redisManager.Redis.LRange<long>(RedisExpandKey.IDTask, 0, 999);
List<long> oldTaskArr;
if (string.IsNullOrEmpty(model.DeviceId))
{
// 默认获取当前节点
oldTaskArr = redisManager.Redis.LRange<long>(RedisExpandKey.IDTask, 0, 999).ToList();
}
else if (model.DeviceId == "all")
{
// 获取所有在线节点
oldTaskArr = new List<long>();
// 直接扫描 Heartbeat Key 获取在线设备
var keys = redisManager.Redis.Keys(RedisExpandKey.DeviceHeartbeat("*"));
var prefix = RedisExpandKey.DeviceHeartbeat("");
var onlineDevices = keys.Select(k => k.Replace(prefix, "")).ToList();
foreach (var deviceId in onlineDevices)
{
var key = RedisExpandKey.BaseKey + "Services:" + deviceId;
var tasks = redisManager.Redis.LRange<long>(key, 0, 999);
oldTaskArr.AddRange(tasks);
}
}
else
{
// 获取指定节点
var key = RedisExpandKey.BaseKey + "Services:" + model.DeviceId;
oldTaskArr = redisManager.Redis.LRange<long>(key, 0, 999).ToList();
}
var sqlquery = base.BaseQuery(model)
.Where(s => oldTaskArr.Contains(s.Id))
.Select(s => new VideoTask
@ -497,10 +570,25 @@ namespace VideoAnalysisCore.Controllers
.Where(s=>s.VideoTaskId == id);
return logArr.Concat(insertData);
}
/// <summary>
/// 预览 TidySlide 任务结果
/// </summary>
/// <param name="id">任务id</param>
/// <returns></returns>
[HttpGet]
public async Task<object> ShowTidySlideTaskInfo(long id)
{
var db = baseService.Context;
var result = await db.Queryable<TidySlideTaskResult>()
.Where(s => s.VideoTaskId == id)
.FirstAsync();
if (result == null)
return BadRequest("未找到 TidySlide 任务结果");
return Ok(result);
}
}
}

View File

@ -0,0 +1,37 @@
using Coravel.Invocable;
using FreeRedis;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Threading.Tasks;
using VideoAnalysisCore.Common;
namespace VideoAnalysisCore.Job
{
/// <summary>
/// 设备心跳上报任务
/// </summary>
public class DeviceHeartbeatJob : IInvocable
{
public Task Invoke()
{
try
{
var redis = AppCommon.Services.GetService<RedisClient>();
if (redis == null) return Task.CompletedTask;
var deviceId = AppCommon.Config.ID.ToString();
// 1. 发送心跳 (设置一个带过期时间的Key)
// 只有当程序正常运行时这个Key才会不断被续期
// 过期时间设为 60秒Job每30秒执行一次
redis.Set(RedisExpandKey.DeviceHeartbeat(deviceId), DateTime.Now.ToString(), 60);
}
catch (Exception ex)
{
Console.WriteLine($"心跳任务异常: {ex.Message}");
}
return Task.CompletedTask;
}
}
}

View File

@ -24,12 +24,12 @@ namespace VideoAnalysisCore.Job
public class TaskFileClearJob : IInvocable
{
private readonly Repository<VideoTask> videotaskDB;
private readonly RedisManager redisManager;
private readonly VideoSliceWorkflowManager _videoSliceWorkflowManager;
public TaskFileClearJob(Repository<VideoTask> videotaskDB, RedisManager redisManager)
public TaskFileClearJob(Repository<VideoTask> videotaskDB, VideoSliceWorkflowManager videoSliceWorkflowManager)
{
this.videotaskDB = videotaskDB;
this.redisManager = redisManager;
_videoSliceWorkflowManager = videoSliceWorkflowManager;
}
public async Task DeleteTaskAllCachesAsync()
{
@ -54,7 +54,7 @@ namespace VideoAnalysisCore.Job
// 遍历查询结果,删除缓存文件
foreach (var taskId in completedTasks)
await ExpandFunction.DeleteTaskAllFileAsync(taskId, redisManager);
await ExpandFunction.DeleteTaskAllFileAsync(taskId, _videoSliceWorkflowManager);
}
public async Task DeleteTaskVideoCachesAsync()
{
@ -80,7 +80,7 @@ namespace VideoAnalysisCore.Job
// 遍历查询结果,删除缓存文件
foreach (var taskId in completedTasks)
await ExpandFunction.DeleteTaskFileAsync(taskId, redisManager);
await ExpandFunction.DeleteTaskFileAsync(taskId, _videoSliceWorkflowManager);
}
catch (Exception ex)
{

View File

@ -7,9 +7,9 @@ using System.Threading.Tasks;
namespace VideoAnalysisCore.Model.Enum
{
/// <summary>
/// 上传工作流 Redis 频道枚举
/// TidySlide 工作流 Redis 频道枚举
/// </summary>
public enum RedisUploadChannelEnum
public enum RedisTidySlideChannelEnum
{
/// <summary>
/// 排队中

View File

@ -1,4 +1,4 @@
using SqlSugar;
using SqlSugar;
using System.ComponentModel.DataAnnotations;
using System.Net;
using System.Text.Json;
@ -40,5 +40,10 @@ namespace VideoAnalysisCore.Model
/// </summary>
[SugarColumn(IsNullable = true)]
public int? DeviceId { get; set; }
/// <summary>
/// 工作流名称
/// </summary>
[SugarColumn(Length = 50, IsNullable = true)]
public string? WorkflowName { get; set; }
}
}

View File

@ -0,0 +1,38 @@
using SqlSugar;
using System;
using VideoAnalysisCore.Model.Interface;
namespace VideoAnalysisCore.Model
{
/// <summary>
/// 整理PPT视频切片任务结果表
/// </summary>
[SugarTable("tidyslidetaskresult")]
public class TidySlideTaskResult : IDB
{
[SugarColumn(IsPrimaryKey = true)]
public long Id { get; set; }
/// <summary>
/// 关联的任务ID
/// </summary>
public long VideoTaskId { get; set; }
/// <summary>
/// 视频ID (VOD VideoId)
/// </summary>
[SugarColumn(Length = 100)]
public string VideoId { get; set; } = string.Empty;
/// <summary>
/// 媒体路径 (OSS 地址或播放地址)
/// </summary>
[SugarColumn(Length = 500, IsNullable = true)]
public string? MediaUrl { get; set; }
/// <summary>
/// 创建时间
/// </summary>
public DateTime CreateTime { get; set; } = DateTime.Now;
}
}

View File

@ -0,0 +1,51 @@
using SqlSugar;
using System;
using VideoAnalysisCore.Model.Interface;
namespace VideoAnalysisCore.Model
{
/// <summary>
/// 任务工作流状态表
/// <para>用于记录单个任务在不同工作流中的执行状态</para>
/// </summary>
[SugarTable("videotask_workflow")]
public class VideoTaskWorkflow : IDB
{
[SugarColumn(IsPrimaryKey = true)]
public long Id { get; set; }
/// <summary>
/// 关联的任务ID
/// </summary>
public long VideoTaskId { get; set; }
/// <summary>
/// 工作流名称 (e.g. "VideoSlice", "Upload")
/// </summary>
[SugarColumn(Length = 50)]
public string WorkflowName { get; set; } = string.Empty;
/// <summary>
/// 当前步骤 (枚举的字符串表示)
/// </summary>
[SugarColumn(Length = 50)]
public string CurrentStep { get; set; } = string.Empty;
/// <summary>
/// 当前步骤 (枚举的整数值)
/// </summary>
public int CurrentStepValue { get; set; }
/// <summary>
/// 状态信息/错误信息
/// </summary>
[SugarColumn(Length = 500, IsNullable = true)]
public string? Message { get; set; }
/// <summary>
/// 更新时间
/// </summary>
public DateTime UpdateTime { get; set; } = DateTime.Now;
}
}