完整目录
这一篇介绍后端的实现
这一段代码为创建任务
type PeriodicTaskFrontend struct {
Name string `json:"pTaskName"`
Cron string `json:"pTaskCron"`
Git string `json:"pTaskGit"`
Lang string `json:"pTaskLang"`
Entry string `json:"pTaskEntry"`
Env string `json:"pTaskEnv"`
MailList string `json:"pTaskMailList"`
Describe string `json:"pTaskDescribe"`
}
func (tr *Transition) CreatePeriodicTask(u *models.Users, data map[string]interface{}, suggestion string, t *models.Ticket) error {
//这里的data就是用户在前端表单里提交的数据
dataBytes, _ := json.Marshal(data)
var err error
taskInfo := PeriodicTaskFrontend{}
//解析为结构体
if err = json.Unmarshal(dataBytes, &taskInfo); err != nil {
return err
}
encodeEnvBytes := util.AesEncrypt([]byte(taskInfo.Env))
//创建任务结构体
task := models.PeriodicTask{
Name: taskInfo.Name,
Describe: taskInfo.Describe,
Status: global.PtaskRunning,
Args: encodeEnvBytes,
Git: taskInfo.Git,
Entry: taskInfo.Entry,
Lang: taskInfo.Lang,
Cron: taskInfo.Cron,
MailList: taskInfo.MailList,
UserID: t.CreateUser,
}
//拿到数据库事务, 然后保存任务
dbTR := service.CreateTransaction()
if err = service.TransactionCreate(dbTR, &task); err != nil {
global.Logger.Errorf("保存定期任务数据到数据库失败:%v", err)
return err
}
//把日志放到redis里
redisKey := code.RPeriodicTaskLog + strconv.Itoa(task.ID) + ":" + strconv.Itoa(int(time.Now().Unix()))
if err = service.Set(redisKey, t.User.Name+"创建了任务", code.SixMonths); err != nil {
global.Logger.Errorf("保存定期任务日志到redis失败:%v", err)
return err
}
//创建任务执行日志, 这里记录是谁创建了任务
taskHistory := models.PeriodicTaskHistory{
PeriodicTask: task,
Result: global.PtaskRSuccess,
Output: redisKey,
Executor: t.User.Name,
Time: models.FormartTime(time.Now()),
}
//保存到数据库
if err = service.TransactionCreate(dbTR, &taskHistory); err != nil {
global.Logger.Errorf("保存定期任务历史到数据库失败:%v", err)
return err
}
if err = dbTR.Commit().Error; err != nil {
global.Logger.Errorf("保存定期任务事务到数据库失败:%v", err)
return err
}
//然后开始运行容器
go docker.RunContainer(task.ID, code.AUTO)
return nil
}
然后是运行容器的代码
package docker
const (
pythonImage = "infra-auto-python:v0.0.1"
goImage = "infra-auto-go:v0.0.1"
pTaskMailTitle = "%s定期任务%s执行%s!时间:%s"
)
var (
dockerTimeMount = []string{"/etc/localtime:/etc/localtime:ro", "/etc/timezone:/etc/timezone:ro"}
)
// 给定一个定期任务id, 开始循环
func RunContainer(taskID int, executor string) {
var expr *cronexpr.Expression
var err error
//死循环
for {
//从数据库查出定期任务数据
preload := models.PeriodicTask{}
if err = service.Query(&preload, "id", strconv.Itoa(taskID), nil, false, false); err != nil {
global.Logger.Errorf("从数据库查询定期任务%s失败:%v", taskID, err)
break
}
if preload.Status == global.PtaskDeleted {
break
}
decodeBytes, err := util.AesDecrypt(preload.Args)
preload.Args = string(decodeBytes)
//解析任务的cron表达式
if expr, err = cronexpr.Parse(preload.Cron); err != nil {
global.Logger.Errorf("解析定期任务%s的cron表达式失败:%v", preload.Name, err)
break
}
//分析出任务距离下一次执行还需要多久
interval := time.Until(expr.Next(time.Now()))
global.Logger.Infof("开始执行定时任务%s, 需要等待%.2f秒", preload.Name, interval.Seconds())
//睡眠到下一次执行
if executor == code.AUTO {
t := time.NewTimer(interval)
<-t.C
}
if preload.Status == global.PtaskStoped {
continue
}
//把任务的args转化为字典
env := make([]string, 0)
envMap := map[string]string{}
global.Logger.Infof("开始创建%s定期任务的容器", preload.Name)
if err = json.Unmarshal([]byte(preload.Args), &envMap); err != nil {
errMsg := fmt.Errorf("反序列化预载体参数失败:%v", err)
global.Logger.Errorf(errMsg.Error())
break
}
//把任务的环境变量转成数组
for k, v := range envMap {
env = append(env, fmt.Sprintf("%s=%s", k, v))
}
env = append(env, "HTTP_PROXY=http://proxy-internet-azure-cn.dktapp.cloud:3128")
env = append(env, "HTTPS_PROXY=http://proxy-internet-azure-cn.dktapp.cloud:3128")
start := time.Now()
//创建容器
var containerID string
containerID, err = createAndStartContainer(env, preload)
if err != nil {
errMsg := fmt.Errorf("创建定期任务%s的容器并启动容器失败:%v", preload.Name, err)
global.Logger.Errorf(errMsg.Error())
break
}
global.Logger.Infof("创建%s定期任务的容器成功", preload.Name)
//等待容器退出, 然后删除它
waitForContainerExit(containerID, preload, start, executor)
if executor != code.AUTO {
break
}
}
}
// 创建并启动容器
func createAndStartContainer(env []string, preload models.PeriodicTask) (string, error) {
//分析用户选择的任务语言是什么, 决定使用什么镜像及容器
var cmd []string
var imageName string
var containerHostConfig *container.HostConfig
repoName := strings.Split(strings.Split(preload.Git, "/")[1], ".")[0]
switch preload.Lang {
case "Python":
cmd = []string{"/bin/sh", "-c"}
//pyCMD := fmt.Sprintf("git clone %s &> /dev/null && cd %s && pip install -r requirements &> /dev/null && python %s", preload.Git, repoName, preload.Entry)
pyCMD := fmt.Sprintf("git clone %s &> /dev/null && cd %s && pip install -r requirements --proxy=\"http://proxy-internet-azure-cn.dktapp.cloud:3128\" &> /dev/null && python %s", preload.Git, repoName, preload.Entry)
cmd = append(cmd, pyCMD)
imageName = pythonImage
containerHostConfig = &container.HostConfig{Binds: dockerTimeMount}
case "Golang":
cmd = []string{"/bin/bash", "-c"}
goCMD := fmt.Sprintf("git clone %s &> /dev/null && cd %s && go mod tidy &> /dev/null && go run main.go", preload.Git, repoName)
cmd = append(cmd, goCMD)
imageName = goImage
containerHostConfig = &container.HostConfig{}
default:
return "", code.InvalidParams
}
//配置容器的配置
containerConfig := &container.Config{
Image: imageName,
Env: env,
Cmd: cmd,
}
//创建容器
response, err := global.DockerCli.ContainerCreate(global.Ctx, containerConfig, containerHostConfig, nil, nil, "")
if err != nil {
return "", fmt.Errorf("创建容器失败:%v", err)
}
//启动
if err := global.DockerCli.ContainerStart(global.Ctx, response.ID, types.ContainerStartOptions{}); err != nil {
return "", fmt.Errorf("启动容器失败:%v", err)
}
return response.ID, nil
}
// 读取容器的输出
func printContainerLogs(containerID string) (string, error) {
out, err := global.DockerCli.ContainerLogs(global.Ctx, containerID, types.ContainerLogsOptions{ShowStdout: true, ShowStderr: true})
if err != nil {
return "", fmt.Errorf("读取容器日志失败:%v", err)
}
defer func(out io.ReadCloser) {
err := out.Close()
if err != nil {
global.Logger.Errorf("关闭容器%s的out失败", containerID)
}
}(out)
var buf bytes.Buffer
if _, err = stdcopy.StdCopy(&buf, &buf, out); err != nil {
global.Logger.Errorf("读取容器的输出为utf8失败:%v", err)
return "", err
}
logs := buf.String()
return logs, nil
}
func waitForContainerExit(containerID string, preload models.PeriodicTask, start time.Time, executor string) {
// 等待容器退出
var err error
var subject, logs string
defer func() {
duration := time.Since(start).Seconds()
var durationStr string
if duration < 60 {
durationStr = fmt.Sprintf("%.2f秒", duration)
} else if duration > 60*60 {
durationStr = fmt.Sprintf("%.2f小时", duration/float64(60*60))
} else if duration > 60 {
durationStr = fmt.Sprintf("%.2f分钟", duration/float64(60))
}
redisKey := code.RPeriodicTaskLog + strconv.Itoa(preload.ID) + ":" + strconv.Itoa(int(time.Now().Unix()))
if _err := service.Set(redisKey, logs, code.SixMonths); _err != nil {
global.Logger.Errorf("保存定期任务日志到redis失败:%v", err)
return
}
//初始化任务历史
taskHistory := models.PeriodicTaskHistory{
PeriodicTask: preload,
Result: global.PtaskRSuccess,
Output: redisKey,
Executor: executor,
Time: models.FormartTime(start),
Duration: durationStr,
}
var taskStatus string
//err不为nil 说明任务执行失败了
if err != nil {
taskHistory.Result = global.PtaskRFailed
subject = fmt.Sprintf(pTaskMailTitle, code.InfraAutoMailPrefix, preload.Name, "失败", time.Now().Format(time.DateTime))
global.Logger.Errorf("定期任务%s执行失败:%v", preload.Name, err)
taskStatus = "失败"
} else {
//为nil说明任务执行成功
subject = fmt.Sprintf(pTaskMailTitle, code.InfraAutoMailPrefix, preload.Name, "成功", time.Now().Format(time.DateTime))
global.Logger.Infof("定期任务%s执行成功", preload.Name)
taskStatus = "成功"
}
//创建任务历史
if err = service.Create(&taskHistory); err != nil {
global.Logger.Errorf("创建定期任务%s的任务历史失败:%v", preload.Name, err)
}
//如果任务的通知列表不为空, 发邮件通知
if preload.MailList != "" {
mailList := strings.Split(preload.MailList, ";")
content := global.MailContent{To: mailList,
Subject: subject,
Body: fmt.Sprintf(templateHtml, preload.Name, executor, start.Format(time.DateTime), time.Since(start).Seconds(), taskStatus, logs),
}
global.MailChannel <- content
}
//把容器给删了
if err = DeleteContainer(containerID); err != nil {
global.Logger.Errorf("删除定期任务%s的容器失败:%v", preload.Name, err)
}
}()
//等待容器退出, 这里会返回2个管道
statusCh, errCh := global.DockerCli.ContainerWait(global.Ctx, containerID, container.WaitConditionNotRunning)
select {
//如果退出失败, 会往这里送东西
case err = <-errCh:
if err != nil {
global.Logger.Errorf("等待定期任务%s的容器退出时发生错误: %v", preload.Name, err)
}
//这里如果退出码不为0, 也视为执行失败
case status := <-statusCh:
if status.StatusCode != 0 {
global.Logger.Errorf("定期任务%s的容器退出码不为0:%d", preload.Name, status.StatusCode)
err = code.ContainerExitCodeNot0
}
}
//读取容器的输出
var _err error
if logs, _err = printContainerLogs(containerID); _err != nil {
global.Logger.Errorf("读取定期任务容器%s日志失败:%v", preload.Name, err)
}
}
func DeleteContainer(containerID string) error {
// 删除容器
if err := global.DockerCli.ContainerRemove(global.Ctx, containerID, types.ContainerRemoveOptions{}); err != nil {
return err
}
return nil
}
这里的实现思路是
- 开一个死循环
- 每一次使用任务id去数据库读取到任务信息
- 解析cron表达式, 如果是自动执行, 那么睡眠到cron表达式执行的执行时间(这里的好处是, 当前端点击立即执行时, 我们可以重复使用这个函数, 如果不为自动执行, 执行完成后, 就会break跳出循环)
- 把敏感数据解密出来
- 根据不同的语言, 使用不同的镜像来创建容器, 并且容器的命令也不相同
- 克隆用户的git仓库, 进入项目目录, 执行入口文件, 然后等待容器退出
- 读取到容器的日志, 然后删除容器
- 创建任务执行历史
这里有几个点可以讨论一下
- 是否需要每次去数据库里查出任务?
- 虽然会有资源上的浪费, 但是好处是, 当用户在前端更新环境变量的json时, 我们可以每次都使用最新的环境变量
- 当用户更改任务状态时,我们也可以在任务下一次执行时直接拿到任务状态, 进行相应操作
- 是否每次任务执行后需要删除容器?
- 这里有缺点, 就是需要每一次都git clone用户的git仓库, 然后安装依赖, 再执行, 经过测试, python大概需要花费10秒来构建容器, go大概需要28秒左右来构建容器
- 但是好处是, 当用户更新代码或依赖时, 我们不需要做任何额外处理
- 所以当你的任务为小时级别的执行时, 我觉得30秒以内的开销是可以容忍的
发表回复