用Go实现一个跨语言的定期任务调度器(4)

完整目录

这一篇介绍后端的实现

这一段代码为创建任务

type PeriodicTaskFrontend struct {
	Name     string `json:"pTaskName"`
	Cron     string `json:"pTaskCron"`
	Git      string `json:"pTaskGit"`
	Lang     string `json:"pTaskLang"`
	Entry    string `json:"pTaskEntry"`
	Env      string `json:"pTaskEnv"`
	MailList string `json:"pTaskMailList"`
	Describe string `json:"pTaskDescribe"`
}

func (tr *Transition) CreatePeriodicTask(u *models.Users, data map[string]interface{}, suggestion string, t *models.Ticket) error {
	//这里的data就是用户在前端表单里提交的数据
	dataBytes, _ := json.Marshal(data)
	var err error
	taskInfo := PeriodicTaskFrontend{}

	//解析为结构体
	if err = json.Unmarshal(dataBytes, &taskInfo); err != nil {
		return err
	}
	encodeEnvBytes := util.AesEncrypt([]byte(taskInfo.Env))

	//创建任务结构体
	task := models.PeriodicTask{
		Name:     taskInfo.Name,
		Describe: taskInfo.Describe,
		Status:   global.PtaskRunning,
		Args:     encodeEnvBytes,
		Git:      taskInfo.Git,
		Entry:    taskInfo.Entry,
		Lang:     taskInfo.Lang,
		Cron:     taskInfo.Cron,
		MailList: taskInfo.MailList,
		UserID:   t.CreateUser,
	}

	//拿到数据库事务, 然后保存任务
	dbTR := service.CreateTransaction()
	if err = service.TransactionCreate(dbTR, &task); err != nil {
		global.Logger.Errorf("保存定期任务数据到数据库失败:%v", err)
		return err
	}

	//把日志放到redis里
	redisKey := code.RPeriodicTaskLog + strconv.Itoa(task.ID) + ":" + strconv.Itoa(int(time.Now().Unix()))
	if err = service.Set(redisKey, t.User.Name+"创建了任务", code.SixMonths); err != nil {
		global.Logger.Errorf("保存定期任务日志到redis失败:%v", err)
		return err
	}

	//创建任务执行日志, 这里记录是谁创建了任务
	taskHistory := models.PeriodicTaskHistory{
		PeriodicTask: task,
		Result:       global.PtaskRSuccess,
		Output:       redisKey,
		Executor:     t.User.Name,
		Time:         models.FormartTime(time.Now()),
	}
	
	//保存到数据库
	if err = service.TransactionCreate(dbTR, &taskHistory); err != nil {
		global.Logger.Errorf("保存定期任务历史到数据库失败:%v", err)
		return err
	}
	if err = dbTR.Commit().Error; err != nil {
		global.Logger.Errorf("保存定期任务事务到数据库失败:%v", err)
		return err
	}
	
	//然后开始运行容器
	go docker.RunContainer(task.ID, code.AUTO)
	return nil
}

然后是运行容器的代码

package docker

const (
	pythonImage    = "infra-auto-python:v0.0.1"
	goImage        = "infra-auto-go:v0.0.1"
	pTaskMailTitle = "%s定期任务%s执行%s!时间:%s"
)

var (
	dockerTimeMount = []string{"/etc/localtime:/etc/localtime:ro", "/etc/timezone:/etc/timezone:ro"}
)

// 给定一个定期任务id, 开始循环
func RunContainer(taskID int, executor string) {
	var expr *cronexpr.Expression
	var err error
	//死循环
	for {
		//从数据库查出定期任务数据
		preload := models.PeriodicTask{}
		if err = service.Query(&preload, "id", strconv.Itoa(taskID), nil, false, false); err != nil {
			global.Logger.Errorf("从数据库查询定期任务%s失败:%v", taskID, err)
			break
		}
		if preload.Status == global.PtaskDeleted {
			break
		}
		decodeBytes, err := util.AesDecrypt(preload.Args)
		preload.Args = string(decodeBytes)
		//解析任务的cron表达式
		if expr, err = cronexpr.Parse(preload.Cron); err != nil {
			global.Logger.Errorf("解析定期任务%s的cron表达式失败:%v", preload.Name, err)
			break
		}

		//分析出任务距离下一次执行还需要多久
		interval := time.Until(expr.Next(time.Now()))
		global.Logger.Infof("开始执行定时任务%s, 需要等待%.2f秒", preload.Name, interval.Seconds())

		//睡眠到下一次执行
		if executor == code.AUTO {
			t := time.NewTimer(interval)
			<-t.C
		}
		if preload.Status == global.PtaskStoped {
			continue
		}

		//把任务的args转化为字典
		env := make([]string, 0)
		envMap := map[string]string{}
		global.Logger.Infof("开始创建%s定期任务的容器", preload.Name)
		if err = json.Unmarshal([]byte(preload.Args), &envMap); err != nil {
			errMsg := fmt.Errorf("反序列化预载体参数失败:%v", err)
			global.Logger.Errorf(errMsg.Error())
			break
		}

		//把任务的环境变量转成数组
		for k, v := range envMap {
			env = append(env, fmt.Sprintf("%s=%s", k, v))
		}
		env = append(env, "HTTP_PROXY=http://proxy-internet-azure-cn.dktapp.cloud:3128")
		env = append(env, "HTTPS_PROXY=http://proxy-internet-azure-cn.dktapp.cloud:3128")

		start := time.Now()
		//创建容器
		var containerID string
		containerID, err = createAndStartContainer(env, preload)
		if err != nil {
			errMsg := fmt.Errorf("创建定期任务%s的容器并启动容器失败:%v", preload.Name, err)
			global.Logger.Errorf(errMsg.Error())
			break
		}
		global.Logger.Infof("创建%s定期任务的容器成功", preload.Name)

		//等待容器退出, 然后删除它
		waitForContainerExit(containerID, preload, start, executor)
		if executor != code.AUTO {
			break
		}
	}
}

// 创建并启动容器
func createAndStartContainer(env []string, preload models.PeriodicTask) (string, error) {
	//分析用户选择的任务语言是什么, 决定使用什么镜像及容器
	var cmd []string
	var imageName string
	var containerHostConfig *container.HostConfig
	repoName := strings.Split(strings.Split(preload.Git, "/")[1], ".")[0]
	switch preload.Lang {
	case "Python":
		cmd = []string{"/bin/sh", "-c"}
		//pyCMD := fmt.Sprintf("git clone %s &> /dev/null && cd %s && pip install -r requirements &> /dev/null && python %s", preload.Git, repoName, preload.Entry)
		pyCMD := fmt.Sprintf("git clone %s  &> /dev/null && cd %s && pip install -r requirements --proxy=\"http://proxy-internet-azure-cn.dktapp.cloud:3128\" &> /dev/null && python %s", preload.Git, repoName, preload.Entry)
		cmd = append(cmd, pyCMD)
		imageName = pythonImage
		containerHostConfig = &container.HostConfig{Binds: dockerTimeMount}
	case "Golang":
		cmd = []string{"/bin/bash", "-c"}
		goCMD := fmt.Sprintf("git clone %s &> /dev/null && cd %s  && go mod tidy  &> /dev/null && go run main.go", preload.Git, repoName)
		cmd = append(cmd, goCMD)
		imageName = goImage
		containerHostConfig = &container.HostConfig{}
	default:
		return "", code.InvalidParams
	}

	//配置容器的配置
	containerConfig := &container.Config{
		Image: imageName,
		Env:   env,
		Cmd:   cmd,
	}

	//创建容器
	response, err := global.DockerCli.ContainerCreate(global.Ctx, containerConfig, containerHostConfig, nil, nil, "")
	if err != nil {
		return "", fmt.Errorf("创建容器失败:%v", err)
	}

	//启动
	if err := global.DockerCli.ContainerStart(global.Ctx, response.ID, types.ContainerStartOptions{}); err != nil {
		return "", fmt.Errorf("启动容器失败:%v", err)
	}

	return response.ID, nil
}

// 读取容器的输出
func printContainerLogs(containerID string) (string, error) {
	out, err := global.DockerCli.ContainerLogs(global.Ctx, containerID, types.ContainerLogsOptions{ShowStdout: true, ShowStderr: true})
	if err != nil {
		return "", fmt.Errorf("读取容器日志失败:%v", err)
	}
	defer func(out io.ReadCloser) {
		err := out.Close()
		if err != nil {
			global.Logger.Errorf("关闭容器%s的out失败", containerID)
		}
	}(out)
	var buf bytes.Buffer
	if _, err = stdcopy.StdCopy(&buf, &buf, out); err != nil {
		global.Logger.Errorf("读取容器的输出为utf8失败:%v", err)
		return "", err
	}
	logs := buf.String()
	return logs, nil
}

func waitForContainerExit(containerID string, preload models.PeriodicTask, start time.Time, executor string) {
	// 等待容器退出
	var err error
	var subject, logs string
	defer func() {
		duration := time.Since(start).Seconds()
		var durationStr string
		if duration < 60 {
			durationStr = fmt.Sprintf("%.2f秒", duration)
		} else if duration > 60*60 {
			durationStr = fmt.Sprintf("%.2f小时", duration/float64(60*60))
		} else if duration > 60 {
			durationStr = fmt.Sprintf("%.2f分钟", duration/float64(60))
		}
		redisKey := code.RPeriodicTaskLog + strconv.Itoa(preload.ID) + ":" + strconv.Itoa(int(time.Now().Unix()))
		if _err := service.Set(redisKey, logs, code.SixMonths); _err != nil {
			global.Logger.Errorf("保存定期任务日志到redis失败:%v", err)
			return
		}
		//初始化任务历史
		taskHistory := models.PeriodicTaskHistory{
			PeriodicTask: preload,
			Result:       global.PtaskRSuccess,
			Output:       redisKey,
			Executor:     executor,
			Time:         models.FormartTime(start),
			Duration:     durationStr,
		}
		var taskStatus string
		//err不为nil 说明任务执行失败了
		if err != nil {
			taskHistory.Result = global.PtaskRFailed
			subject = fmt.Sprintf(pTaskMailTitle, code.InfraAutoMailPrefix, preload.Name, "失败", time.Now().Format(time.DateTime))
			global.Logger.Errorf("定期任务%s执行失败:%v", preload.Name, err)
			taskStatus = "失败"
		} else {
			//为nil说明任务执行成功
			subject = fmt.Sprintf(pTaskMailTitle, code.InfraAutoMailPrefix, preload.Name, "成功", time.Now().Format(time.DateTime))
			global.Logger.Infof("定期任务%s执行成功", preload.Name)
			taskStatus = "成功"
		}
		//创建任务历史
		if err = service.Create(&taskHistory); err != nil {
			global.Logger.Errorf("创建定期任务%s的任务历史失败:%v", preload.Name, err)
		}

		//如果任务的通知列表不为空, 发邮件通知
		if preload.MailList != "" {
			mailList := strings.Split(preload.MailList, ";")

			content := global.MailContent{To: mailList,
				Subject: subject,
				Body:    fmt.Sprintf(templateHtml, preload.Name, executor, start.Format(time.DateTime), time.Since(start).Seconds(), taskStatus, logs),
			}
			global.MailChannel <- content
		}

		//把容器给删了
		if err = DeleteContainer(containerID); err != nil {
			global.Logger.Errorf("删除定期任务%s的容器失败:%v", preload.Name, err)
		}
	}()

	//等待容器退出, 这里会返回2个管道
	statusCh, errCh := global.DockerCli.ContainerWait(global.Ctx, containerID, container.WaitConditionNotRunning)
	select {

	//如果退出失败, 会往这里送东西
	case err = <-errCh:
		if err != nil {
			global.Logger.Errorf("等待定期任务%s的容器退出时发生错误: %v", preload.Name, err)
		}

		//这里如果退出码不为0, 也视为执行失败
	case status := <-statusCh:
		if status.StatusCode != 0 {
			global.Logger.Errorf("定期任务%s的容器退出码不为0:%d", preload.Name, status.StatusCode)
			err = code.ContainerExitCodeNot0
		}
	}
	//读取容器的输出
	var _err error
	if logs, _err = printContainerLogs(containerID); _err != nil {
		global.Logger.Errorf("读取定期任务容器%s日志失败:%v", preload.Name, err)
	}

}

func DeleteContainer(containerID string) error {
	// 删除容器
	if err := global.DockerCli.ContainerRemove(global.Ctx, containerID, types.ContainerRemoveOptions{}); err != nil {
		return err
	}
	return nil
}

这里的实现思路是

  • 开一个死循环
  • 每一次使用任务id去数据库读取到任务信息
  • 解析cron表达式, 如果是自动执行, 那么睡眠到cron表达式执行的执行时间(这里的好处是, 当前端点击立即执行时, 我们可以重复使用这个函数, 如果不为自动执行, 执行完成后, 就会break跳出循环)
  • 把敏感数据解密出来
  • 根据不同的语言, 使用不同的镜像来创建容器, 并且容器的命令也不相同
  • 克隆用户的git仓库, 进入项目目录, 执行入口文件, 然后等待容器退出
  • 读取到容器的日志, 然后删除容器
  • 创建任务执行历史

这里有几个点可以讨论一下

  • 是否需要每次去数据库里查出任务?
    • 虽然会有资源上的浪费, 但是好处是, 当用户在前端更新环境变量的json时, 我们可以每次都使用最新的环境变量
    • 当用户更改任务状态时,我们也可以在任务下一次执行时直接拿到任务状态, 进行相应操作
  • 是否每次任务执行后需要删除容器?
    • 这里有缺点, 就是需要每一次都git clone用户的git仓库, 然后安装依赖, 再执行, 经过测试, python大概需要花费10秒来构建容器, go大概需要28秒左右来构建容器
    • 但是好处是, 当用户更新代码或依赖时, 我们不需要做任何额外处理
    • 所以当你的任务为小时级别的执行时, 我觉得30秒以内的开销是可以容忍的


评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注