diff --git a/models/bots/run_job_list.go b/models/bots/run_job_list.go index 3685248982..c9841377ab 100644 --- a/models/bots/run_job_list.go +++ b/models/bots/run_job_list.go @@ -16,12 +16,16 @@ type RunJobList []*RunJob type FindRunJobOptions struct { db.ListOptions + RunID int64 Statuses []Status StartedBefore timeutil.TimeStamp } func (opts FindRunJobOptions) toConds() builder.Cond { cond := builder.NewCond() + if opts.RunID > 0 { + cond = cond.And(builder.Eq{"run_id": opts.RunID}) + } if len(opts.Statuses) > 0 { cond = cond.And(builder.In("status", opts.Statuses)) } diff --git a/models/bots/status.go b/models/bots/status.go index bddbc17449..5f8e66179c 100644 --- a/models/bots/status.go +++ b/models/bots/status.go @@ -52,6 +52,15 @@ func (s Status) IsRunning() bool { return s == StatusRunning } +func (s Status) In(statuses ...Status) bool { + for _, v := range statuses { + if s == v { + return true + } + } + return false +} + var statusNames = map[Status]string{ StatusUnknown: "unknown", StatusWaiting: "waiting", @@ -60,4 +69,5 @@ var statusNames = map[Status]string{ StatusFailure: "failure", StatusCancelled: "cancelled", StatusSkipped: "skipped", + StatusBlocked: "blocked", } diff --git a/models/bots/task.go b/models/bots/task.go index edb4fa9d9c..e1853a7449 100644 --- a/models/bots/task.go +++ b/models/bots/task.go @@ -16,7 +16,6 @@ import ( auth_model "code.gitea.io/gitea/models/auth" "code.gitea.io/gitea/models/db" "code.gitea.io/gitea/modules/base" - "code.gitea.io/gitea/modules/bots" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/util" @@ -292,7 +291,7 @@ func CreateTaskForRunner(ctx context.Context, runner *Runner) (*Task, bool, erro return nil, false, err } - task.LogFilename = bots.LogFileName(job.Run.Repo.FullName(), task.ID) + task.LogFilename = logFileName(job.Run.Repo.FullName(), task.ID) if _, err := e.ID(task.ID).Cols("log_filename").Update(task); err != nil { return nil, false, err } @@ -483,3 +482,7 @@ func convertTimestamp(timestamp *timestamppb.Timestamp) timeutil.TimeStamp { } return timeutil.TimeStamp(timestamp.AsTime().Unix()) } + +func logFileName(repoFullName string, taskID int64) string { + return fmt.Sprintf("%s/%02x/%d.log", repoFullName, taskID%256, taskID) +} diff --git a/modules/bots/job_emitter.go b/modules/bots/job_emitter.go new file mode 100644 index 0000000000..91c8c9f43a --- /dev/null +++ b/modules/bots/job_emitter.go @@ -0,0 +1,140 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package bots + +import ( + "context" + "fmt" + + bots_model "code.gitea.io/gitea/models/bots" + "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/queue" + "xorm.io/builder" +) + +var jobEmitterQueue queue.UniqueQueue + +type jobUpdate struct { + RunID int64 +} + +func InitJobEmitter() { + jobEmitterQueue = queue.CreateUniqueQueue("bots_ready_job", jobEmitterQueueHandle, new(jobUpdate)) + go graceful.GetManager().RunWithShutdownFns(jobEmitterQueue.Run) +} + +func EmitJobsIfReady(runID int64) error { + return jobEmitterQueue.Push(&jobUpdate{ + RunID: runID, + }) +} + +func jobEmitterQueueHandle(data ...queue.Data) []queue.Data { + ctx := graceful.GetManager().ShutdownContext() + var ret []queue.Data + for _, d := range data { + update := d.(*jobUpdate) + if err := checkJobsOfRun(ctx, update.RunID); err != nil { + ret = append(ret, d) + } + } + return ret +} + +func checkJobsOfRun(ctx context.Context, runID int64) error { + return db.WithTx(func(ctx context.Context) error { + jobs, _, err := bots_model.FindRunJobs(ctx, bots_model.FindRunJobOptions{RunID: runID}) + if err != nil { + return err + } + idToJobs := make(map[string][]*bots_model.RunJob, len(jobs)) + for _, job := range jobs { + idToJobs[job.JobID] = append(idToJobs[job.JobID], job) + } + + updates := newJobStatusResolver(jobs).Resolve() + for _, job := range jobs { + if status, ok := updates[job.ID]; ok { + job.Status = status + if n, err := bots_model.UpdateRunJob(ctx, job, builder.Eq{"status": bots_model.StatusBlocked}, "status"); err != nil { + return err + } else if n != 1 { + return fmt.Errorf("no affected for updating blocked job %v", job.ID) + } + } + } + return nil + }, ctx) +} + +type jobStatusResolver struct { + statuses map[int64]bots_model.Status + needs map[int64][]int64 +} + +func newJobStatusResolver(jobs bots_model.RunJobList) *jobStatusResolver { + idToJobs := make(map[string][]*bots_model.RunJob, len(jobs)) + for _, job := range jobs { + idToJobs[job.JobID] = append(idToJobs[job.JobID], job) + } + + statuses := make(map[int64]bots_model.Status, len(jobs)) + needs := make(map[int64][]int64, len(jobs)) + for _, job := range jobs { + statuses[job.ID] = job.Status + for _, need := range job.Needs { + for _, v := range idToJobs[need] { + needs[job.ID] = append(needs[job.ID], v.ID) + } + } + } + return &jobStatusResolver{ + statuses: statuses, + needs: needs, + } +} + +func (r *jobStatusResolver) Resolve() map[int64]bots_model.Status { + ret := map[int64]bots_model.Status{} + for i := 0; i < len(r.statuses); i++ { + updated := r.resolve() + if len(updated) == 0 { + return ret + } + for k, v := range updated { + ret[k] = v + r.statuses[k] = v + } + } + return ret +} + +func (r *jobStatusResolver) resolve() map[int64]bots_model.Status { + ret := map[int64]bots_model.Status{} + for id, status := range r.statuses { + if status != bots_model.StatusBlocked { + continue + } + allDone, allSucceed := true, true + for _, need := range r.needs[id] { + needStatus := r.statuses[need] + if !needStatus.IsDone() { + allDone = false + } + if needStatus.In(bots_model.StatusFailure, bots_model.StatusCancelled, bots_model.StatusSkipped) { + allSucceed = false + } + } + if allDone { + if allSucceed { + ret[id] = bots_model.StatusWaiting + } else { + ret[id] = bots_model.StatusSkipped + } + } + } + return ret +} diff --git a/modules/bots/job_emitter_test.go b/modules/bots/job_emitter_test.go new file mode 100644 index 0000000000..52d7d5d6df --- /dev/null +++ b/modules/bots/job_emitter_test.go @@ -0,0 +1,81 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package bots + +import ( + "testing" + + bots_model "code.gitea.io/gitea/models/bots" + + "github.com/stretchr/testify/assert" +) + +func Test_jobStatusResolver_Resolve(t *testing.T) { + tests := []struct { + name string + jobs bots_model.RunJobList + want map[int64]bots_model.Status + }{ + { + name: "no blocked", + jobs: bots_model.RunJobList{ + {ID: 1, JobID: "1", Status: bots_model.StatusWaiting, Needs: []string{}}, + {ID: 2, JobID: "2", Status: bots_model.StatusWaiting, Needs: []string{}}, + {ID: 3, JobID: "3", Status: bots_model.StatusWaiting, Needs: []string{}}, + }, + want: map[int64]bots_model.Status{}, + }, + { + name: "single blocked", + jobs: bots_model.RunJobList{ + {ID: 1, JobID: "1", Status: bots_model.StatusSuccess, Needs: []string{}}, + {ID: 2, JobID: "2", Status: bots_model.StatusBlocked, Needs: []string{"1"}}, + {ID: 3, JobID: "3", Status: bots_model.StatusWaiting, Needs: []string{}}, + }, + want: map[int64]bots_model.Status{ + 2: bots_model.StatusWaiting, + }, + }, + { + name: "multiple blocked", + jobs: bots_model.RunJobList{ + {ID: 1, JobID: "1", Status: bots_model.StatusSuccess, Needs: []string{}}, + {ID: 2, JobID: "2", Status: bots_model.StatusBlocked, Needs: []string{"1"}}, + {ID: 3, JobID: "3", Status: bots_model.StatusBlocked, Needs: []string{"1"}}, + }, + want: map[int64]bots_model.Status{ + 2: bots_model.StatusWaiting, + 3: bots_model.StatusWaiting, + }, + }, + { + name: "chain blocked", + jobs: bots_model.RunJobList{ + {ID: 1, JobID: "1", Status: bots_model.StatusFailure, Needs: []string{}}, + {ID: 2, JobID: "2", Status: bots_model.StatusBlocked, Needs: []string{"1"}}, + {ID: 3, JobID: "3", Status: bots_model.StatusBlocked, Needs: []string{"2"}}, + }, + want: map[int64]bots_model.Status{ + 2: bots_model.StatusSkipped, + 3: bots_model.StatusSkipped, + }, + }, + { + name: "loop need", + jobs: bots_model.RunJobList{ + {ID: 1, JobID: "1", Status: bots_model.StatusBlocked, Needs: []string{"3"}}, + {ID: 2, JobID: "2", Status: bots_model.StatusBlocked, Needs: []string{"1"}}, + {ID: 3, JobID: "3", Status: bots_model.StatusBlocked, Needs: []string{"2"}}, + }, + want: map[int64]bots_model.Status{}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := newJobStatusResolver(tt.jobs) + assert.Equal(t, tt.want, r.Resolve()) + }) + } +} diff --git a/modules/bots/log.go b/modules/bots/log.go index cb6a392dd2..3a7497e379 100644 --- a/modules/bots/log.go +++ b/modules/bots/log.go @@ -148,7 +148,3 @@ func ParseLog(in string) (timestamp time.Time, content string, err error) { content = in[index+1:] return } - -func LogFileName(repoFullName string, taskID int64) string { - return fmt.Sprintf("%s/%02x/%d.log", repoFullName, taskID%256, taskID) -} diff --git a/routers/api/bots/runner/runner.go b/routers/api/bots/runner/runner.go index 6e2e8fae7d..ca32d582ac 100644 --- a/routers/api/bots/runner/runner.go +++ b/routers/api/bots/runner/runner.go @@ -151,7 +151,7 @@ func toCommitStatus(status bots_model.Status) api.CommitStatusState { return api.CommitStatusSuccess case bots_model.StatusFailure, bots_model.StatusCancelled, bots_model.StatusSkipped: return api.CommitStatusFailure - case bots_model.StatusWaiting: + case bots_model.StatusWaiting, bots_model.StatusBlocked: return api.CommitStatusPending case bots_model.StatusRunning: return api.CommitStatusRunning @@ -200,13 +200,19 @@ func (s *Service) UpdateTask( Description: "", Context: task.Job.Name, CreatorID: payload.Pusher.ID, - State: toCommitStatus(bots_model.Status(req.Msg.State.Result)), + State: toCommitStatus(task.Job.Status), }, }); err != nil { log.Error("Update commit status failed: %v", err) } } + if req.Msg.State.Result != runnerv1.Result_RESULT_UNSPECIFIED { + if err := bots.EmitJobsIfReady(task.Job.RunID); err != nil { + log.Error("Emit ready jobs of run %d: %v", task.Job.RunID, err) + } + } + return res, nil } diff --git a/routers/init.go b/routers/init.go index 65bb0fc6c3..03e151f727 100644 --- a/routers/init.go +++ b/routers/init.go @@ -11,6 +11,7 @@ import ( "code.gitea.io/gitea/models" asymkey_model "code.gitea.io/gitea/models/asymkey" + "code.gitea.io/gitea/modules/bots" "code.gitea.io/gitea/modules/cache" "code.gitea.io/gitea/modules/eventsource" "code.gitea.io/gitea/modules/git" @@ -174,6 +175,8 @@ func GlobalInitInstalled(ctx context.Context) { auth.Init() svg.Init() + bots.InitJobEmitter() + // Finally start up the cron cron.NewContext(ctx) }