1
0
mirror of https://github.com/go-gitea/gitea.git synced 2025-04-18 00:47:48 -04:00

chore(gRPC): handle requesut for stage data

Signed-off-by: Bo-Yi.Wu <appleboy.tw@gmail.com>
This commit is contained in:
Bo-Yi.Wu 2022-09-03 15:58:58 +08:00 committed by Jason Song
parent eac6425b2f
commit ea0cf8515e
7 changed files with 289 additions and 2 deletions

View File

@ -5,8 +5,12 @@
package bots
import (
"context"
"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/modules/util"
"xorm.io/builder"
)
type BuildStage struct {
@ -16,6 +20,9 @@ type BuildStage struct {
Name string
Kind string
Type string
Machine string
OS string
Arch string
Filename string
Status BuildStatus
Started timeutil.TimeStamp
@ -32,6 +39,35 @@ func init() {
db.RegisterModel(new(BuildStage))
}
type FindStageOptions struct {
db.ListOptions
BuildID int64
IsClosed util.OptionalBool
}
func (opts FindStageOptions) toConds() builder.Cond {
cond := builder.NewCond()
if opts.BuildID > 0 {
cond = cond.And(builder.Eq{"build_id": opts.BuildID})
}
if opts.IsClosed.IsTrue() {
cond = cond.And(builder.Expr("status IN (?,?,?,?)", StatusError, StatusFailing, StatusPassing))
} else if opts.IsClosed.IsFalse() {
cond = cond.And(builder.Expr("status IN (?,?,?)", StatusPending, StatusRunning))
}
return cond
}
func FindStages(ctx context.Context, opts FindStageOptions) (BuildStageList, error) {
sess := db.GetEngine(ctx).Where(opts.toConds())
if opts.ListOptions.PageSize > 0 {
skip, take := opts.GetSkipTake()
sess.Limit(take, skip)
}
var rows []*BuildStage
return rows, sess.Find(&rows)
}
func GetBuildWorkflows(buildID int64) (map[string]map[string]*BuildStage, error) {
jobs := make(map[string]map[string]*BuildStage)
err := db.GetEngine(db.DefaultContext).Iterate(new(BuildStage), func(idx int, bean interface{}) error {

View File

@ -0,0 +1,7 @@
// Copyright 2022 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package bots
type BuildStageList []*BuildStage

View File

@ -0,0 +1,24 @@
package core
import (
"context"
runnerv1 "gitea.com/gitea/proto-go/runner/v1"
)
type Filter struct {
Kind string
Type string
OS string
Arch string
Kernel string
}
// Scheduler schedules Build stages for execution.
type Scheduler interface {
// Schedule schedules the stage for execution.
Schedule(context.Context, *runnerv1.Stage) error
// Request requests the next stage scheduled for execution.
Request(context.Context, Filter) (*runnerv1.Stage, error)
}

View File

@ -8,11 +8,14 @@ import (
"net/http"
"code.gitea.io/gitea/routers/api/bots/runner"
"code.gitea.io/gitea/routers/api/bots/scheduler/queue"
"gitea.com/gitea/proto-go/runner/v1/runnerv1connect"
)
func RunnerRoute() (string, http.Handler) {
runnerService := &runner.Service{}
runnerService := &runner.Service{
Scheduler: queue.New(),
}
return runnerv1connect.NewRunnerServiceHandler(
runnerService,

View File

@ -10,6 +10,7 @@ import (
bots_model "code.gitea.io/gitea/models/bots"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/routers/api/bots/core"
runnerv1 "gitea.com/gitea/proto-go/runner/v1"
"gitea.com/gitea/proto-go/runner/v1/runnerv1connect"
@ -17,6 +18,8 @@ import (
)
type Service struct {
Scheduler core.Scheduler
runnerv1connect.UnimplementedRunnerServiceHandler
}
@ -64,8 +67,25 @@ func (s *Service) Request(
ctx context.Context,
req *connect.Request[runnerv1.RequestRequest],
) (*connect.Response[runnerv1.RequestResponse], error) {
log.Debug("manager: request queue item")
stage, err := s.Scheduler.Request(ctx, core.Filter{
Kind: req.Msg.Kind,
Type: req.Msg.Type,
OS: req.Msg.Os,
Arch: req.Msg.Arch,
})
if err != nil && ctx.Err() != nil {
log.Debug("manager: context canceled")
return nil, err
}
if err != nil {
log.Warn("manager: request queue item error")
return nil, err
}
res := connect.NewResponse(&runnerv1.RequestResponse{
Stage: &runnerv1.Stage{},
Stage: stage,
})
return res, nil
}

View File

@ -0,0 +1,160 @@
// Copyright 2022 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package queue
import (
"context"
"sync"
"time"
"code.gitea.io/gitea/models/bots"
"code.gitea.io/gitea/routers/api/bots/core"
runnerv1 "gitea.com/gitea/proto-go/runner/v1"
)
type worker struct {
kind string
typ string
os string
arch string
channel chan *runnerv1.Stage
}
type queue struct {
sync.Mutex
ready chan struct{}
paused bool
interval time.Duration
workers map[*worker]struct{}
ctx context.Context
}
func (q *queue) Schedule(ctx context.Context, stage *runnerv1.Stage) error {
select {
case q.ready <- struct{}{}:
default:
}
return nil
}
func (q *queue) Request(ctx context.Context, params core.Filter) (*runnerv1.Stage, error) {
w := &worker{
kind: params.Kind,
typ: params.Type,
os: params.OS,
arch: params.Arch,
channel: make(chan *runnerv1.Stage),
}
q.Lock()
q.workers[w] = struct{}{}
q.Unlock()
select {
case q.ready <- struct{}{}:
default:
}
select {
case <-ctx.Done():
q.Lock()
delete(q.workers, w)
q.Unlock()
return nil, ctx.Err()
case b := <-w.channel:
return b, nil
}
}
func (q *queue) start() error {
for {
select {
case <-q.ctx.Done():
return q.ctx.Err()
case <-q.ready:
_ = q.signal(q.ctx)
case <-time.After(q.interval):
_ = q.signal(q.ctx)
}
}
}
func (q *queue) signal(ctx context.Context) error {
q.Lock()
count := len(q.workers)
pause := q.paused
q.Unlock()
if pause {
return nil
}
if count == 0 {
return nil
}
items, err := bots.FindStages(ctx, bots.FindStageOptions{})
if err != nil {
return err
}
q.Lock()
defer q.Unlock()
for _, item := range items {
if item.Status == bots.StatusRunning {
continue
}
if item.Machine != "" {
continue
}
loop:
for w := range q.workers {
// the worker must match the resource kind and type
if !matchResource(w.kind, w.typ, item.Kind, item.Type) {
continue
}
if w.os != "" || w.arch != "" {
if w.os != item.OS {
continue
}
if w.arch != item.Arch {
continue
}
}
stage := &runnerv1.Stage{
Id: item.ID,
BuildId: item.BuildID,
Name: item.Name,
Kind: item.Name,
Type: item.Type,
Status: string(item.Status),
Started: int64(item.Started),
Stopped: int64(item.Stopped),
}
w.channel <- stage
delete(q.workers, w)
break loop
}
}
return nil
}
// matchResource is a helper function that returns
func matchResource(kinda, typea, kindb, typeb string) bool {
if kinda == "" {
kinda = "pipeline"
}
if kindb == "" {
kindb = "pipeline"
}
if typea == "" {
typea = "docker"
}
if typeb == "" {
typeb = "docker"
}
return kinda == kindb && typea == typeb
}

View File

@ -0,0 +1,37 @@
// Copyright 2022 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package queue
import (
"context"
"time"
"code.gitea.io/gitea/routers/api/bots/core"
)
type scheduler struct {
*queue
}
// New creates a new scheduler.
func New() core.Scheduler {
return scheduler{
queue: newQueue(),
}
}
// newQueue returns a new Queue backed by the build datastore.
func newQueue() *queue {
q := &queue{
ready: make(chan struct{}, 1),
workers: map[*worker]struct{}{},
interval: time.Minute,
ctx: context.Background(),
}
go func() {
_ = q.start()
}()
return q
}