From ea0cf8515e25b5c5854c96fbb8f5bfef6a367be7 Mon Sep 17 00:00:00 2001 From: "Bo-Yi.Wu" Date: Sat, 3 Sep 2022 15:58:58 +0800 Subject: [PATCH] chore(gRPC): handle requesut for stage data Signed-off-by: Bo-Yi.Wu --- models/bots/build_stage.go | 36 ++++ models/bots/build_stage_list.go | 7 + routers/api/bots/core/scheduler.go | 24 +++ routers/api/bots/grpc/runner.go | 5 +- routers/api/bots/runner/runner.go | 22 ++- routers/api/bots/scheduler/queue/queue.go | 160 ++++++++++++++++++ routers/api/bots/scheduler/queue/scheduler.go | 37 ++++ 7 files changed, 289 insertions(+), 2 deletions(-) create mode 100644 models/bots/build_stage_list.go create mode 100644 routers/api/bots/core/scheduler.go create mode 100644 routers/api/bots/scheduler/queue/queue.go create mode 100644 routers/api/bots/scheduler/queue/scheduler.go diff --git a/models/bots/build_stage.go b/models/bots/build_stage.go index 52fc0f0492..1f2f15f9bb 100644 --- a/models/bots/build_stage.go +++ b/models/bots/build_stage.go @@ -5,8 +5,12 @@ package bots import ( + "context" + "code.gitea.io/gitea/models/db" "code.gitea.io/gitea/modules/timeutil" + "code.gitea.io/gitea/modules/util" + "xorm.io/builder" ) type BuildStage struct { @@ -16,6 +20,9 @@ type BuildStage struct { Name string Kind string Type string + Machine string + OS string + Arch string Filename string Status BuildStatus Started timeutil.TimeStamp @@ -32,6 +39,35 @@ func init() { db.RegisterModel(new(BuildStage)) } +type FindStageOptions struct { + db.ListOptions + BuildID int64 + IsClosed util.OptionalBool +} + +func (opts FindStageOptions) toConds() builder.Cond { + cond := builder.NewCond() + if opts.BuildID > 0 { + cond = cond.And(builder.Eq{"build_id": opts.BuildID}) + } + if opts.IsClosed.IsTrue() { + cond = cond.And(builder.Expr("status IN (?,?,?,?)", StatusError, StatusFailing, StatusPassing)) + } else if opts.IsClosed.IsFalse() { + cond = cond.And(builder.Expr("status IN (?,?,?)", StatusPending, StatusRunning)) + } + return cond +} + +func FindStages(ctx context.Context, opts FindStageOptions) (BuildStageList, error) { + sess := db.GetEngine(ctx).Where(opts.toConds()) + if opts.ListOptions.PageSize > 0 { + skip, take := opts.GetSkipTake() + sess.Limit(take, skip) + } + var rows []*BuildStage + return rows, sess.Find(&rows) +} + func GetBuildWorkflows(buildID int64) (map[string]map[string]*BuildStage, error) { jobs := make(map[string]map[string]*BuildStage) err := db.GetEngine(db.DefaultContext).Iterate(new(BuildStage), func(idx int, bean interface{}) error { diff --git a/models/bots/build_stage_list.go b/models/bots/build_stage_list.go new file mode 100644 index 0000000000..4137e63725 --- /dev/null +++ b/models/bots/build_stage_list.go @@ -0,0 +1,7 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package bots + +type BuildStageList []*BuildStage diff --git a/routers/api/bots/core/scheduler.go b/routers/api/bots/core/scheduler.go new file mode 100644 index 0000000000..167d2b3b62 --- /dev/null +++ b/routers/api/bots/core/scheduler.go @@ -0,0 +1,24 @@ +package core + +import ( + "context" + + runnerv1 "gitea.com/gitea/proto-go/runner/v1" +) + +type Filter struct { + Kind string + Type string + OS string + Arch string + Kernel string +} + +// Scheduler schedules Build stages for execution. +type Scheduler interface { + // Schedule schedules the stage for execution. + Schedule(context.Context, *runnerv1.Stage) error + + // Request requests the next stage scheduled for execution. + Request(context.Context, Filter) (*runnerv1.Stage, error) +} diff --git a/routers/api/bots/grpc/runner.go b/routers/api/bots/grpc/runner.go index 496820f91a..74c59872a7 100644 --- a/routers/api/bots/grpc/runner.go +++ b/routers/api/bots/grpc/runner.go @@ -8,11 +8,14 @@ import ( "net/http" "code.gitea.io/gitea/routers/api/bots/runner" + "code.gitea.io/gitea/routers/api/bots/scheduler/queue" "gitea.com/gitea/proto-go/runner/v1/runnerv1connect" ) func RunnerRoute() (string, http.Handler) { - runnerService := &runner.Service{} + runnerService := &runner.Service{ + Scheduler: queue.New(), + } return runnerv1connect.NewRunnerServiceHandler( runnerService, diff --git a/routers/api/bots/runner/runner.go b/routers/api/bots/runner/runner.go index 5801211d7f..18a61c22bd 100644 --- a/routers/api/bots/runner/runner.go +++ b/routers/api/bots/runner/runner.go @@ -10,6 +10,7 @@ import ( bots_model "code.gitea.io/gitea/models/bots" "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/routers/api/bots/core" runnerv1 "gitea.com/gitea/proto-go/runner/v1" "gitea.com/gitea/proto-go/runner/v1/runnerv1connect" @@ -17,6 +18,8 @@ import ( ) type Service struct { + Scheduler core.Scheduler + runnerv1connect.UnimplementedRunnerServiceHandler } @@ -64,8 +67,25 @@ func (s *Service) Request( ctx context.Context, req *connect.Request[runnerv1.RequestRequest], ) (*connect.Response[runnerv1.RequestResponse], error) { + log.Debug("manager: request queue item") + + stage, err := s.Scheduler.Request(ctx, core.Filter{ + Kind: req.Msg.Kind, + Type: req.Msg.Type, + OS: req.Msg.Os, + Arch: req.Msg.Arch, + }) + if err != nil && ctx.Err() != nil { + log.Debug("manager: context canceled") + return nil, err + } + if err != nil { + log.Warn("manager: request queue item error") + return nil, err + } + res := connect.NewResponse(&runnerv1.RequestResponse{ - Stage: &runnerv1.Stage{}, + Stage: stage, }) return res, nil } diff --git a/routers/api/bots/scheduler/queue/queue.go b/routers/api/bots/scheduler/queue/queue.go new file mode 100644 index 0000000000..d8ab0c15ae --- /dev/null +++ b/routers/api/bots/scheduler/queue/queue.go @@ -0,0 +1,160 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "context" + "sync" + "time" + + "code.gitea.io/gitea/models/bots" + "code.gitea.io/gitea/routers/api/bots/core" + runnerv1 "gitea.com/gitea/proto-go/runner/v1" +) + +type worker struct { + kind string + typ string + os string + arch string + channel chan *runnerv1.Stage +} + +type queue struct { + sync.Mutex + + ready chan struct{} + paused bool + interval time.Duration + workers map[*worker]struct{} + ctx context.Context +} + +func (q *queue) Schedule(ctx context.Context, stage *runnerv1.Stage) error { + select { + case q.ready <- struct{}{}: + default: + } + return nil +} + +func (q *queue) Request(ctx context.Context, params core.Filter) (*runnerv1.Stage, error) { + w := &worker{ + kind: params.Kind, + typ: params.Type, + os: params.OS, + arch: params.Arch, + channel: make(chan *runnerv1.Stage), + } + q.Lock() + q.workers[w] = struct{}{} + q.Unlock() + + select { + case q.ready <- struct{}{}: + default: + } + + select { + case <-ctx.Done(): + q.Lock() + delete(q.workers, w) + q.Unlock() + return nil, ctx.Err() + case b := <-w.channel: + return b, nil + } +} + +func (q *queue) start() error { + for { + select { + case <-q.ctx.Done(): + return q.ctx.Err() + case <-q.ready: + _ = q.signal(q.ctx) + case <-time.After(q.interval): + _ = q.signal(q.ctx) + } + } +} + +func (q *queue) signal(ctx context.Context) error { + q.Lock() + count := len(q.workers) + pause := q.paused + q.Unlock() + if pause { + return nil + } + if count == 0 { + return nil + } + items, err := bots.FindStages(ctx, bots.FindStageOptions{}) + if err != nil { + return err + } + + q.Lock() + defer q.Unlock() + for _, item := range items { + if item.Status == bots.StatusRunning { + continue + } + if item.Machine != "" { + continue + } + + loop: + for w := range q.workers { + // the worker must match the resource kind and type + if !matchResource(w.kind, w.typ, item.Kind, item.Type) { + continue + } + + if w.os != "" || w.arch != "" { + if w.os != item.OS { + continue + } + if w.arch != item.Arch { + continue + } + } + + stage := &runnerv1.Stage{ + Id: item.ID, + BuildId: item.BuildID, + Name: item.Name, + Kind: item.Name, + Type: item.Type, + Status: string(item.Status), + Started: int64(item.Started), + Stopped: int64(item.Stopped), + } + + w.channel <- stage + delete(q.workers, w) + break loop + } + } + return nil +} + +// matchResource is a helper function that returns +func matchResource(kinda, typea, kindb, typeb string) bool { + if kinda == "" { + kinda = "pipeline" + } + if kindb == "" { + kindb = "pipeline" + } + if typea == "" { + typea = "docker" + } + if typeb == "" { + typeb = "docker" + } + return kinda == kindb && typea == typeb +} diff --git a/routers/api/bots/scheduler/queue/scheduler.go b/routers/api/bots/scheduler/queue/scheduler.go new file mode 100644 index 0000000000..9c3100dca6 --- /dev/null +++ b/routers/api/bots/scheduler/queue/scheduler.go @@ -0,0 +1,37 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "context" + "time" + + "code.gitea.io/gitea/routers/api/bots/core" +) + +type scheduler struct { + *queue +} + +// New creates a new scheduler. +func New() core.Scheduler { + return scheduler{ + queue: newQueue(), + } +} + +// newQueue returns a new Queue backed by the build datastore. +func newQueue() *queue { + q := &queue{ + ready: make(chan struct{}, 1), + workers: map[*worker]struct{}{}, + interval: time.Minute, + ctx: context.Background(), + } + go func() { + _ = q.start() + }() + return q +}