背景
有一个需求是这样的,前端需要通过http请求的form-data上传图片文件,后端接收图片后调用AI接口执行命令,由于命令执行时间较长,需要持续返回当前任务在全局任务列表中的位置,以便前端即时更新排队信息。
思考
如果直接在gin的请求处理函数中开goroutine,并在goroutine中通过类似c.JSON的方法来返回响应,前端会无法收到响应。原因是当处理函数返回时,Gin 会自动关闭 HTTP 请求的响应,这意味着在处理函数返回后,后台的协程无法再向响应中写入数据,从而导致客户端收不到信息。(不用goroutine也不行,因为持续返回响应涉及到使用for循环,会阻塞通道,阻塞主协程,仍然无法返回响应),使用gin配合websocket解决。
于是解决方案就是,使用gin的处理函数接收图片后,把当前任务的id作为响应返回给前端。然后前端再用当前任务的id建立websocket请求,在websocket请求中,通常不会自动关闭,除非显式地由客户端或服务器关闭,所以就可以在ws的处理函数中开goroutine来持续返回响应。
func WsHandleConnect(s *melody.Session) {
TaskId, _ := s.Get("task_id")
// 将 TaskId 转换为字符串并解析为整数
taskIdStr, _ := TaskId.(string)
taskId, err := strconv.Atoi(taskIdStr)
if err != nil {
sendJSONResponse(s, Res{
Code: 4000,
Msg: "Invalid Task ID",
Data: nil,
})
s.Close()
return
}
var task *model.Task
// 遍历 TaskQueue 查找对应的任务
for _, t := range global.TaskQueue {
if t.ID == taskId {
task = t
break
}
}
if task == nil {
sendJSONResponse(s, Res{
Code: 4000,
Msg: "Task ID not provided",
Data: nil,
})
s.Close()
return
}
logger.Log.Error("task:", task.ID)
go func() {
for {
select {
case result := <-task.Response:
sendJSONResponse(s, Res{
Code: global.SuccessCode,
Msg: result,
Data: nil,
})
s.Close()
return
default:
time.Sleep(3 * time.Second)
if len(global.TaskQueue) == 0 {
sendJSONResponse(s, Res{
Code: global.SuccessCode,
Msg: "Task failed",
Data: nil,
})
s.Close()
return
}
position := service.FindTaskPosition(task.ID)
sendJSONResponse(s, Res{
Code: global.SuccessCode,
Data: position,
})
}
}
}()
}