完整目录
这一篇介绍后端的实现
这一段代码为创建任务
type PeriodicTaskFrontend struct { Name string `json:"pTaskName"` Cron string `json:"pTaskCron"` Git string `json:"pTaskGit"` Lang string `json:"pTaskLang"` Entry string `json:"pTaskEntry"` Env string `json:"pTaskEnv"` MailList string `json:"pTaskMailList"` Describe string `json:"pTaskDescribe"` } func (tr *Transition) CreatePeriodicTask(u *models.Users, data map[string]interface{}, suggestion string, t *models.Ticket) error { //这里的data就是用户在前端表单里提交的数据 dataBytes, _ := json.Marshal(data) var err error taskInfo := PeriodicTaskFrontend{} //解析为结构体 if err = json.Unmarshal(dataBytes, &taskInfo); err != nil { return err } encodeEnvBytes := util.AesEncrypt([]byte(taskInfo.Env)) //创建任务结构体 task := models.PeriodicTask{ Name: taskInfo.Name, Describe: taskInfo.Describe, Status: global.PtaskRunning, Args: encodeEnvBytes, Git: taskInfo.Git, Entry: taskInfo.Entry, Lang: taskInfo.Lang, Cron: taskInfo.Cron, MailList: taskInfo.MailList, UserID: t.CreateUser, } //拿到数据库事务, 然后保存任务 dbTR := service.CreateTransaction() if err = service.TransactionCreate(dbTR, &task); err != nil { global.Logger.Errorf("保存定期任务数据到数据库失败:%v", err) return err } //把日志放到redis里 redisKey := code.RPeriodicTaskLog + strconv.Itoa(task.ID) + ":" + strconv.Itoa(int(time.Now().Unix())) if err = service.Set(redisKey, t.User.Name+"创建了任务", code.SixMonths); err != nil { global.Logger.Errorf("保存定期任务日志到redis失败:%v", err) return err } //创建任务执行日志, 这里记录是谁创建了任务 taskHistory := models.PeriodicTaskHistory{ PeriodicTask: task, Result: global.PtaskRSuccess, Output: redisKey, Executor: t.User.Name, Time: models.FormartTime(time.Now()), } //保存到数据库 if err = service.TransactionCreate(dbTR, &taskHistory); err != nil { global.Logger.Errorf("保存定期任务历史到数据库失败:%v", err) return err } if err = dbTR.Commit().Error; err != nil { global.Logger.Errorf("保存定期任务事务到数据库失败:%v", err) return err } //然后开始运行容器 go docker.RunContainer(task.ID, code.AUTO) return nil }
然后是运行容器的代码
package docker const ( pythonImage = "infra-auto-python:v0.0.1" goImage = "infra-auto-go:v0.0.1" pTaskMailTitle = "%s定期任务%s执行%s!时间:%s" ) var ( dockerTimeMount = []string{"/etc/localtime:/etc/localtime:ro", "/etc/timezone:/etc/timezone:ro"} ) // 给定一个定期任务id, 开始循环 func RunContainer(taskID int, executor string) { var expr *cronexpr.Expression var err error //死循环 for { //从数据库查出定期任务数据 preload := models.PeriodicTask{} if err = service.Query(&preload, "id", strconv.Itoa(taskID), nil, false, false); err != nil { global.Logger.Errorf("从数据库查询定期任务%s失败:%v", taskID, err) break } if preload.Status == global.PtaskDeleted { break } decodeBytes, err := util.AesDecrypt(preload.Args) preload.Args = string(decodeBytes) //解析任务的cron表达式 if expr, err = cronexpr.Parse(preload.Cron); err != nil { global.Logger.Errorf("解析定期任务%s的cron表达式失败:%v", preload.Name, err) break } //分析出任务距离下一次执行还需要多久 interval := time.Until(expr.Next(time.Now())) global.Logger.Infof("开始执行定时任务%s, 需要等待%.2f秒", preload.Name, interval.Seconds()) //睡眠到下一次执行 if executor == code.AUTO { t := time.NewTimer(interval) <-t.C } if preload.Status == global.PtaskStoped { continue } //把任务的args转化为字典 env := make([]string, 0) envMap := map[string]string{} global.Logger.Infof("开始创建%s定期任务的容器", preload.Name) if err = json.Unmarshal([]byte(preload.Args), &envMap); err != nil { errMsg := fmt.Errorf("反序列化预载体参数失败:%v", err) global.Logger.Errorf(errMsg.Error()) break } //把任务的环境变量转成数组 for k, v := range envMap { env = append(env, fmt.Sprintf("%s=%s", k, v)) } env = append(env, "HTTP_PROXY=http://proxy-internet-azure-cn.dktapp.cloud:3128") env = append(env, "HTTPS_PROXY=http://proxy-internet-azure-cn.dktapp.cloud:3128") start := time.Now() //创建容器 var containerID string containerID, err = createAndStartContainer(env, preload) if err != nil { errMsg := fmt.Errorf("创建定期任务%s的容器并启动容器失败:%v", preload.Name, err) global.Logger.Errorf(errMsg.Error()) break } global.Logger.Infof("创建%s定期任务的容器成功", preload.Name) //等待容器退出, 然后删除它 waitForContainerExit(containerID, preload, start, executor) if executor != code.AUTO { break } } } // 创建并启动容器 func createAndStartContainer(env []string, preload models.PeriodicTask) (string, error) { //分析用户选择的任务语言是什么, 决定使用什么镜像及容器 var cmd []string var imageName string var containerHostConfig *container.HostConfig repoName := strings.Split(strings.Split(preload.Git, "/")[1], ".")[0] switch preload.Lang { case "Python": cmd = []string{"/bin/sh", "-c"} //pyCMD := fmt.Sprintf("git clone %s &> /dev/null && cd %s && pip install -r requirements &> /dev/null && python %s", preload.Git, repoName, preload.Entry) pyCMD := fmt.Sprintf("git clone %s &> /dev/null && cd %s && pip install -r requirements --proxy=\"http://proxy-internet-azure-cn.dktapp.cloud:3128\" &> /dev/null && python %s", preload.Git, repoName, preload.Entry) cmd = append(cmd, pyCMD) imageName = pythonImage containerHostConfig = &container.HostConfig{Binds: dockerTimeMount} case "Golang": cmd = []string{"/bin/bash", "-c"} goCMD := fmt.Sprintf("git clone %s &> /dev/null && cd %s && go mod tidy &> /dev/null && go run main.go", preload.Git, repoName) cmd = append(cmd, goCMD) imageName = goImage containerHostConfig = &container.HostConfig{} default: return "", code.InvalidParams } //配置容器的配置 containerConfig := &container.Config{ Image: imageName, Env: env, Cmd: cmd, } //创建容器 response, err := global.DockerCli.ContainerCreate(global.Ctx, containerConfig, containerHostConfig, nil, nil, "") if err != nil { return "", fmt.Errorf("创建容器失败:%v", err) } //启动 if err := global.DockerCli.ContainerStart(global.Ctx, response.ID, types.ContainerStartOptions{}); err != nil { return "", fmt.Errorf("启动容器失败:%v", err) } return response.ID, nil } // 读取容器的输出 func printContainerLogs(containerID string) (string, error) { out, err := global.DockerCli.ContainerLogs(global.Ctx, containerID, types.ContainerLogsOptions{ShowStdout: true, ShowStderr: true}) if err != nil { return "", fmt.Errorf("读取容器日志失败:%v", err) } defer func(out io.ReadCloser) { err := out.Close() if err != nil { global.Logger.Errorf("关闭容器%s的out失败", containerID) } }(out) var buf bytes.Buffer if _, err = stdcopy.StdCopy(&buf, &buf, out); err != nil { global.Logger.Errorf("读取容器的输出为utf8失败:%v", err) return "", err } logs := buf.String() return logs, nil } func waitForContainerExit(containerID string, preload models.PeriodicTask, start time.Time, executor string) { // 等待容器退出 var err error var subject, logs string defer func() { duration := time.Since(start).Seconds() var durationStr string if duration < 60 { durationStr = fmt.Sprintf("%.2f秒", duration) } else if duration > 60*60 { durationStr = fmt.Sprintf("%.2f小时", duration/float64(60*60)) } else if duration > 60 { durationStr = fmt.Sprintf("%.2f分钟", duration/float64(60)) } redisKey := code.RPeriodicTaskLog + strconv.Itoa(preload.ID) + ":" + strconv.Itoa(int(time.Now().Unix())) if _err := service.Set(redisKey, logs, code.SixMonths); _err != nil { global.Logger.Errorf("保存定期任务日志到redis失败:%v", err) return } //初始化任务历史 taskHistory := models.PeriodicTaskHistory{ PeriodicTask: preload, Result: global.PtaskRSuccess, Output: redisKey, Executor: executor, Time: models.FormartTime(start), Duration: durationStr, } var taskStatus string //err不为nil 说明任务执行失败了 if err != nil { taskHistory.Result = global.PtaskRFailed subject = fmt.Sprintf(pTaskMailTitle, code.InfraAutoMailPrefix, preload.Name, "失败", time.Now().Format(time.DateTime)) global.Logger.Errorf("定期任务%s执行失败:%v", preload.Name, err) taskStatus = "失败" } else { //为nil说明任务执行成功 subject = fmt.Sprintf(pTaskMailTitle, code.InfraAutoMailPrefix, preload.Name, "成功", time.Now().Format(time.DateTime)) global.Logger.Infof("定期任务%s执行成功", preload.Name) taskStatus = "成功" } //创建任务历史 if err = service.Create(&taskHistory); err != nil { global.Logger.Errorf("创建定期任务%s的任务历史失败:%v", preload.Name, err) } //如果任务的通知列表不为空, 发邮件通知 if preload.MailList != "" { mailList := strings.Split(preload.MailList, ";") content := global.MailContent{To: mailList, Subject: subject, Body: fmt.Sprintf(templateHtml, preload.Name, executor, start.Format(time.DateTime), time.Since(start).Seconds(), taskStatus, logs), } global.MailChannel <- content } //把容器给删了 if err = DeleteContainer(containerID); err != nil { global.Logger.Errorf("删除定期任务%s的容器失败:%v", preload.Name, err) } }() //等待容器退出, 这里会返回2个管道 statusCh, errCh := global.DockerCli.ContainerWait(global.Ctx, containerID, container.WaitConditionNotRunning) select { //如果退出失败, 会往这里送东西 case err = <-errCh: if err != nil { global.Logger.Errorf("等待定期任务%s的容器退出时发生错误: %v", preload.Name, err) } //这里如果退出码不为0, 也视为执行失败 case status := <-statusCh: if status.StatusCode != 0 { global.Logger.Errorf("定期任务%s的容器退出码不为0:%d", preload.Name, status.StatusCode) err = code.ContainerExitCodeNot0 } } //读取容器的输出 var _err error if logs, _err = printContainerLogs(containerID); _err != nil { global.Logger.Errorf("读取定期任务容器%s日志失败:%v", preload.Name, err) } } func DeleteContainer(containerID string) error { // 删除容器 if err := global.DockerCli.ContainerRemove(global.Ctx, containerID, types.ContainerRemoveOptions{}); err != nil { return err } return nil }
这里的实现思路是
- 开一个死循环
- 每一次使用任务id去数据库读取到任务信息
- 解析cron表达式, 如果是自动执行, 那么睡眠到cron表达式执行的执行时间(这里的好处是, 当前端点击立即执行时, 我们可以重复使用这个函数, 如果不为自动执行, 执行完成后, 就会break跳出循环)
- 把敏感数据解密出来
- 根据不同的语言, 使用不同的镜像来创建容器, 并且容器的命令也不相同
- 克隆用户的git仓库, 进入项目目录, 执行入口文件, 然后等待容器退出
- 读取到容器的日志, 然后删除容器
- 创建任务执行历史
这里有几个点可以讨论一下
- 是否需要每次去数据库里查出任务?
- 虽然会有资源上的浪费, 但是好处是, 当用户在前端更新环境变量的json时, 我们可以每次都使用最新的环境变量
- 当用户更改任务状态时,我们也可以在任务下一次执行时直接拿到任务状态, 进行相应操作
- 是否每次任务执行后需要删除容器?
- 这里有缺点, 就是需要每一次都git clone用户的git仓库, 然后安装依赖, 再执行, 经过测试, python大概需要花费10秒来构建容器, go大概需要28秒左右来构建容器
- 但是好处是, 当用户更新代码或依赖时, 我们不需要做任何额外处理
- 所以当你的任务为小时级别的执行时, 我觉得30秒以内的开销是可以容忍的
发表回复