1
0
mirror of https://github.com/go-gitea/gitea.git synced 2025-05-18 00:49:09 -04:00
This commit is contained in:
Lunny Xiao 2022-04-29 20:23:48 +08:00 committed by Jason Song
parent b91167b772
commit 7732392a96
16 changed files with 1801 additions and 0 deletions

108
models/bots/runner.go Normal file
View File

@ -0,0 +1,108 @@
// Copyright 2021 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package bots
import (
"fmt"
"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/modules/timeutil"
"xorm.io/builder"
)
// ErrRunnerNotExist represents an error for bot runner not exist
type ErrRunnerNotExist struct {
UUID string
}
func (err ErrRunnerNotExist) Error() string {
return fmt.Sprintf("Bot runner [%s] is not exist", err.UUID)
}
// Runner represents runner machines
type Runner struct {
ID int64
UUID string `xorm:"CHAR(36) UNIQUE"`
Name string `xorm:"VARCHAR(32) UNIQUE"`
OS string `xorm:"VARCHAR(16) index"` // the runner running os
Arch string `xorm:"VARCHAR(16) index"` // the runner running architecture
Type string `xorm:"VARCHAR(16)"`
OwnerID int64 `xorm:"index"` // org level runner, 0 means system
RepoID int64 `xorm:"index"` // repo level runner, if orgid also is zero, then it's a global
Description string `xorm:"TEXT"`
Base int // 0 native 1 docker 2 virtual machine
RepoRange string // glob match which repositories could use this runner
Token string
LastOnline timeutil.TimeStamp `xorm:"index"`
Created timeutil.TimeStamp `xorm:"created"`
}
func (Runner) TableName() string {
return "actions_runner"
}
func init() {
db.RegisterModel(&Runner{})
}
type GetRunnerOptions struct {
RepoID int64
OwnerID int64
}
func (opts GetRunnerOptions) toCond() builder.Cond {
cond := builder.NewCond()
if opts.RepoID > 0 {
cond = cond.And(builder.Eq{"repo_id": opts.RepoID})
}
if opts.OwnerID > 0 {
cond = cond.And(builder.Eq{"owner_id": opts.OwnerID})
}
cond = cond.Or(builder.Eq{"repo_id": 0, "owner_id": 0})
return cond
}
// GetUsableRunner returns the usable runner
func GetUsableRunner(opts GetRunnerOptions) (*Runner, error) {
var runner Runner
has, err := db.GetEngine(db.DefaultContext).
Where(opts.toCond()).
Asc("last_online").
Get(&runner)
if err != nil {
return nil, err
}
if !has {
return nil, ErrRunnerNotExist{}
}
return &runner, nil
}
// GetRunnerByUUID returns a bot runner via uuid
func GetRunnerByUUID(uuid string) (*Runner, error) {
var runner Runner
has, err := db.GetEngine(db.DefaultContext).Where("uuid=?", uuid).Get(&runner)
if err != nil {
return nil, err
} else if !has {
return nil, ErrRunnerNotExist{
UUID: uuid,
}
}
return &runner, nil
}
// FindRunnersByRepoID returns all workers for the repository
func FindRunnersByRepoID(repoID int64) ([]*Runner, error) {
var runners []*Runner
err := db.GetEngine(db.DefaultContext).Where("repo_id=? OR repo_id=0", repoID).
Find(&runners)
if err != nil {
return nil, err
}
err = db.GetEngine(db.DefaultContext).Join("INNER", "repository", "repository.owner_id = bot_runner.owner_id").Find(&runners)
return runners, err
}

131
models/bots/task.go Normal file
View File

@ -0,0 +1,131 @@
// Copyright 2022 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package bots
import (
"errors"
"fmt"
"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/models/webhook"
"code.gitea.io/gitea/modules/timeutil"
"github.com/google/uuid"
)
// TaskStatus represents a task status
type TaskStatus int
// enumerate all the statuses of bot task
const (
TaskPending TaskStatus = iota // wait for assign
TaskAssigned // assigned to a runner
TaskRunning // running
TaskFailed
TaskFinished
TaskCanceled
TaskTimeout
)
// Task represnets bot tasks
type Task struct {
ID int64
UUID string `xorm:"CHAR(36)"`
RepoID int64 `xorm:"index"`
TriggerUserID int64
Ref string
CommitSHA string
Event webhook.HookEventType
Token string // token for this task
Grant string // permissions for this task
EventPayload string `xorm:"LONGTEXT"`
RunnerID int64 `xorm:"index"`
Status TaskStatus `xorm:"index"`
Created timeutil.TimeStamp `xorm:"created"`
StartTime timeutil.TimeStamp
EndTime timeutil.TimeStamp
Updated timeutil.TimeStamp `xorm:"updated"`
}
// TableName represents a bot task
func (Task) TableName() string {
return "actions_task"
}
// InsertTask inserts a bot task
func InsertTask(t *Task) error {
if t.UUID == "" {
t.UUID = uuid.New().String()
}
return db.Insert(db.DefaultContext, t)
}
// UpdateTask updates bot task
func UpdateTask(t *Task, cols ...string) error {
_, err := db.GetEngine(db.DefaultContext).ID(t.ID).Cols(cols...).Update(t)
return err
}
// ErrTaskNotExist represents an error for bot task not exist
type ErrTaskNotExist struct {
UUID string
}
func (err ErrTaskNotExist) Error() string {
return fmt.Sprintf("Bot task [%s] is not exist", err.UUID)
}
// GetTaskByUUID gets bot task by uuid
func GetTaskByUUID(taskUUID string) (*Task, error) {
var task Task
has, err := db.GetEngine(db.DefaultContext).Where("uuid=?", taskUUID).Get(&task)
if err != nil {
return nil, err
} else if !has {
return nil, ErrTaskNotExist{
UUID: taskUUID,
}
}
return &task, nil
}
// GetCurTask return the task for the bot
func GetCurTask(runnerID int64) (*Task, error) {
var tasks []Task
// FIXME: for test, just return all tasks
err := db.GetEngine(db.DefaultContext).Where("status=?", TaskPending).Find(&tasks)
// err := x.Where("runner_id = ?", botID).
// And("status=?", BotTaskPending).
// Find(&tasks)
if err != nil {
return nil, err
}
if len(tasks) == 0 {
return nil, nil
}
return &tasks[0], err
}
// AssignTaskToRunner assign a task to a runner
func AssignTaskToRunner(taskID int64, runnerID int64) error {
cnt, err := db.GetEngine(db.DefaultContext).
Where("runner_id=0").
And("id=?", taskID).
Cols("runner_id").
Update(&Task{
RunnerID: runnerID,
})
if err != nil {
return err
}
if cnt != 1 {
return errors.New("assign faild")
}
return nil
}
type TaskStage struct{}
type StageStep struct{}

52
models/migrations/v216.go Normal file
View File

@ -0,0 +1,52 @@
// Copyright 2022 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package migrations
import (
"code.gitea.io/gitea/modules/timeutil"
"xorm.io/xorm"
)
func addBotTables(x *xorm.Engine) error {
type BotRunner struct {
ID int64
UUID string `xorm:"CHAR(36) UNIQUE"`
Name string `xorm:"VARCHAR(32) UNIQUE"`
OS string `xorm:"VARCHAR(16) index"` // the runner running os
Arch string `xorm:"VARCHAR(16) index"` // the runner running architecture
Type string `xorm:"VARCHAR(16)"`
OwnerID int64 `xorm:"index"` // org level runner, 0 means system
RepoID int64 `xorm:"index"` // repo level runner, if orgid also is zero, then it's a global
Description string `xorm:"TEXT"`
Base int // 0 native 1 docker 2 virtual machine
RepoRange string // glob match which repositories could use this runner
Token string
LastOnline timeutil.TimeStamp
Created timeutil.TimeStamp `xorm:"created"`
}
type BotTask struct {
ID int64
UUID string `xorm:"CHAR(36)"`
RepoID int64 `xorm:"index"`
Type string `xorm:"VARCHAR(16)"` // 0 commit 1 pullrequest
Ref string
CommitSHA string
Event string
Token string // token for this task
Grant string // permissions for this task
EventPayload string `xorm:"LONGTEXT"`
RunnerID int64 `xorm:"index"`
Status int
Content string `xorm:"LONGTEXT"`
Created timeutil.TimeStamp `xorm:"created"`
StartTime timeutil.TimeStamp
EndTime timeutil.TimeStamp
Updated timeutil.TimeStamp `xorm:"updated"`
}
return x.Sync2(new(BotRunner), new(BotTask))
}

View File

@ -0,0 +1,79 @@
package gitea
import (
"fmt"
"io"
"strings"
"gopkg.in/yaml.v3"
)
// ActionRunsUsing is the type of runner for the action
type ActionRunsUsing string
func (a *ActionRunsUsing) UnmarshalYAML(unmarshal func(interface{}) error) error {
var using string
if err := unmarshal(&using); err != nil {
return err
}
// Force input to lowercase for case insensitive comparison
format := ActionRunsUsing(strings.ToLower(using))
switch format {
case ActionRunsUsingNode12, ActionRunsUsingDocker:
*a = format
default:
return fmt.Errorf(fmt.Sprintf("The runs.using key in action.yml must be one of: %v, got %s", []string{
ActionRunsUsingDocker,
ActionRunsUsingNode12,
}, format))
}
return nil
}
const (
// ActionRunsUsingNode12 for running with node12
ActionRunsUsingNode12 = "node12"
// ActionRunsUsingDocker for running with docker
ActionRunsUsingDocker = "docker"
)
// Action describes a metadata file for GitHub actions. The metadata filename must be either action.yml or action.yaml. The data in the metadata file defines the inputs, outputs and main entrypoint for your action.
type Action struct {
Name string `yaml:"name"`
Author string `yaml:"author"`
Description string `yaml:"description"`
Inputs map[string]Input `yaml:"inputs"`
Outputs map[string]Output `yaml:"outputs"`
Runs struct {
Using ActionRunsUsing `yaml:"using"`
Env map[string]string `yaml:"env"`
Main string `yaml:"main"`
Image string `yaml:"image"`
Entrypoint []string `yaml:"entrypoint"`
Args []string `yaml:"args"`
} `yaml:"runs"`
Branding struct {
Color string `yaml:"color"`
Icon string `yaml:"icon"`
} `yaml:"branding"`
}
// Input parameters allow you to specify data that the action expects to use during runtime. GitHub stores input parameters as environment variables. Input ids with uppercase letters are converted to lowercase during runtime. We recommended using lowercase input ids.
type Input struct {
Description string `yaml:"description"`
Required bool `yaml:"required"`
Default string `yaml:"default"`
}
// Output parameters allow you to declare data that an action sets. Actions that run later in a workflow can use the output data set in previously run actions. For example, if you had an action that performed the addition of two inputs (x + y = z), the action could output the sum (z) for other actions to use as an input.
type Output struct {
Description string `yaml:"description"`
}
// ReadAction reads an action from a reader
func ReadAction(in io.Reader) (*Action, error) {
a := new(Action)
err := yaml.NewDecoder(in).Decode(a)
return a, err
}

View File

@ -0,0 +1,60 @@
package gitea
import (
"code.gitea.io/gitea/models/webhook"
"code.gitea.io/gitea/modules/bot/runner"
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/util"
"code.gitea.io/gitea/modules/json"
)
func init() {
runner.RegisterRunnerType(new(GiteaRunner))
}
type GiteaRunner struct {
}
func (gw *GiteaRunner) Name() string {
return "gitea"
}
func (gw *GiteaRunner) Detect(commit *git.Commit, event webhook.HookEventType, ref string) (bool, string, error) {
tree, err := commit.SubTree(".gitea/workflow")
if err != nil {
return false, "", err
}
entries, err := tree.ListEntries()
if err != nil {
return false, "", err
}
var wfs []*Workflow
for _, entry := range entries {
blob := entry.Blob()
rd, err := blob.DataAsync()
if err != nil {
return false, "", err
}
defer rd.Close()
wf, err := ReadWorkflow(rd)
if err != nil {
log.Error("ReadWorkflow file %s failed: %v", entry.Name(), err)
continue
}
// FIXME: we have to convert the event type to github known name
if !util.IsStringInSlice(string(event), wf.On()) {
continue
}
wfs = append(wfs, wf)
}
wfBs, err := json.Marshal(wfs)
if err != nil {
return false, "", err
}
return true, string(wfBs), nil
}

View File

@ -0,0 +1,265 @@
package gitea
import (
"io"
"io/ioutil"
"math"
"os"
"path/filepath"
"sort"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
// WorkflowPlanner contains methods for creating plans
type WorkflowPlanner interface {
PlanEvent(eventName string) *Plan
PlanJob(jobName string) *Plan
GetEvents() []string
}
// Plan contains a list of stages to run in series
type Plan struct {
Stages []*Stage
}
// Stage contains a list of runs to execute in parallel
type Stage struct {
Runs []*Run
}
// Run represents a job from a workflow that needs to be run
type Run struct {
Workflow *Workflow
JobID string
}
func (r *Run) String() string {
jobName := r.Job().Name
if jobName == "" {
jobName = r.JobID
}
return jobName
}
// Job returns the job for this Run
func (r *Run) Job() *Job {
return r.Workflow.GetJob(r.JobID)
}
// NewWorkflowPlanner will load a specific workflow or all workflows from a directory
func NewWorkflowPlanner(path string) (WorkflowPlanner, error) {
fi, err := os.Stat(path)
if err != nil {
return nil, err
}
var files []os.FileInfo
var dirname string
if fi.IsDir() {
log.Debugf("Loading workflows from '%s'", path)
dirname = path
files, err = ioutil.ReadDir(path)
} else {
log.Debugf("Loading workflow '%s'", path)
dirname, err = filepath.Abs(filepath.Dir(path))
files = []os.FileInfo{fi}
}
if err != nil {
return nil, err
}
wp := new(workflowPlanner)
for _, file := range files {
ext := filepath.Ext(file.Name())
if ext == ".yml" || ext == ".yaml" {
f, err := os.Open(filepath.Join(dirname, file.Name()))
if err != nil {
return nil, err
}
log.Debugf("Reading workflow '%s'", f.Name())
workflow, err := ReadWorkflow(f)
if err != nil {
f.Close()
if err == io.EOF {
return nil, errors.WithMessagef(err, "unable to read workflow, %s file is empty", file.Name())
}
return nil, err
}
if workflow.Name == "" {
workflow.Name = file.Name()
}
wp.workflows = append(wp.workflows, workflow)
f.Close()
}
}
return wp, nil
}
type workflowPlanner struct {
workflows []*Workflow
}
// PlanEvent builds a new list of runs to execute in parallel for an event name
func (wp *workflowPlanner) PlanEvent(eventName string) *Plan {
plan := new(Plan)
if len(wp.workflows) == 0 {
log.Debugf("no events found for workflow: %s", eventName)
}
for _, w := range wp.workflows {
for _, e := range w.When().Events {
if e.Type == eventName {
plan.mergeStages(createStages(w, w.GetJobIDs()...))
}
}
}
return plan
}
// PlanJob builds a new run to execute in parallel for a job name
func (wp *workflowPlanner) PlanJob(jobName string) *Plan {
plan := new(Plan)
if len(wp.workflows) == 0 {
log.Debugf("no jobs found for workflow: %s", jobName)
}
for _, w := range wp.workflows {
plan.mergeStages(createStages(w, jobName))
}
return plan
}
// GetEvents gets all the events in the workflows file
func (wp *workflowPlanner) GetEvents() []string {
events := make([]string, 0)
for _, w := range wp.workflows {
found := false
for _, e := range events {
for _, we := range w.When().Events {
if e == we.Type {
found = true
break
}
}
if found {
break
}
}
if !found {
for _, evt := range w.When().Events {
events = append(events, evt.Type)
}
}
}
// sort the list based on depth of dependencies
sort.Slice(events, func(i, j int) bool {
return events[i] < events[j]
})
return events
}
// MaxRunNameLen determines the max name length of all jobs
func (p *Plan) MaxRunNameLen() int {
maxRunNameLen := 0
for _, stage := range p.Stages {
for _, run := range stage.Runs {
runNameLen := len(run.String())
if runNameLen > maxRunNameLen {
maxRunNameLen = runNameLen
}
}
}
return maxRunNameLen
}
// GetJobIDs will get all the job names in the stage
func (s *Stage) GetJobIDs() []string {
names := make([]string, 0)
for _, r := range s.Runs {
names = append(names, r.JobID)
}
return names
}
// Merge stages with existing stages in plan
func (p *Plan) mergeStages(stages []*Stage) {
newStages := make([]*Stage, int(math.Max(float64(len(p.Stages)), float64(len(stages)))))
for i := 0; i < len(newStages); i++ {
newStages[i] = new(Stage)
if i >= len(p.Stages) {
newStages[i].Runs = append(newStages[i].Runs, stages[i].Runs...)
} else if i >= len(stages) {
newStages[i].Runs = append(newStages[i].Runs, p.Stages[i].Runs...)
} else {
newStages[i].Runs = append(newStages[i].Runs, p.Stages[i].Runs...)
newStages[i].Runs = append(newStages[i].Runs, stages[i].Runs...)
}
}
p.Stages = newStages
}
func createStages(w *Workflow, jobIDs ...string) []*Stage {
// first, build a list of all the necessary jobs to run, and their dependencies
jobDependencies := make(map[string][]string)
for len(jobIDs) > 0 {
newJobIDs := make([]string, 0)
for _, jID := range jobIDs {
// make sure we haven't visited this job yet
if _, ok := jobDependencies[jID]; !ok {
if job := w.GetJob(jID); job != nil {
jobDependencies[jID] = job.Needs()
newJobIDs = append(newJobIDs, job.Needs()...)
}
}
}
jobIDs = newJobIDs
}
// next, build an execution graph
stages := make([]*Stage, 0)
for len(jobDependencies) > 0 {
stage := new(Stage)
for jID, jDeps := range jobDependencies {
// make sure all deps are in the graph already
if listInStages(jDeps, stages...) {
stage.Runs = append(stage.Runs, &Run{
Workflow: w,
JobID: jID,
})
delete(jobDependencies, jID)
}
}
if len(stage.Runs) == 0 {
log.Fatalf("Unable to build dependency graph!")
}
stages = append(stages, stage)
}
return stages
}
// return true iff all strings in srcList exist in at least one of the stages
func listInStages(srcList []string, stages ...*Stage) bool {
for _, src := range srcList {
found := false
for _, stage := range stages {
for _, search := range stage.GetJobIDs() {
if src == search {
found = true
}
}
}
if !found {
return false
}
}
return true
}

View File

@ -0,0 +1,377 @@
package gitea
import (
"fmt"
"io"
"reflect"
"regexp"
"strings"
log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v3"
)
// Workflow is the structure of the files in .github/workflows
type Workflow struct {
Name string `yaml:"name"`
RawWhen yaml.Node `yaml:"when"`
Env map[string]string `yaml:"env"`
Jobs map[string]*Job `yaml:"jobs"`
Defaults Defaults `yaml:"defaults"`
}
type Event struct {
Type string
Ref string
}
type When struct {
Events []Event
}
func (w *When) Match(tp string) bool {
for _, evt := range w.Events {
if strings.EqualFold(tp, evt.Type) {
return true
}
}
return false
}
// When events for the workflow
func (w *Workflow) When() *When {
switch w.RawWhen.Kind {
case yaml.ScalarNode:
var val string
err := w.RawWhen.Decode(&val)
if err != nil {
log.Fatal(err)
}
return &When{
Events: []Event{
{
Type: val,
},
},
}
case yaml.SequenceNode:
var vals []string
err := w.RawWhen.Decode(&vals)
if err != nil {
log.Fatal(err)
}
var when When
for _, val := range vals {
when.Events = append(when.Events, Event{
Type: val,
})
}
return &when
case yaml.MappingNode:
var val map[string]interface{}
err := w.RawWhen.Decode(&val)
if err != nil {
log.Fatal(err)
}
var keys []string
for k := range val {
keys = append(keys, k)
}
var when When
for _, val := range keys {
when.Events = append(when.Events, Event{
Type: val,
})
}
return &when
}
return nil
}
// Job is the structure of one job in a workflow
type Job struct {
Name string `yaml:"name"`
RawNeeds yaml.Node `yaml:"needs"`
RawRunsOn yaml.Node `yaml:"runs-on"`
Env map[string]string `yaml:"env"`
If string `yaml:"if"`
Steps []*Step `yaml:"steps"`
TimeoutMinutes int64 `yaml:"timeout-minutes"`
Services map[string]*ContainerSpec `yaml:"services"`
Strategy *Strategy `yaml:"strategy"`
RawContainer yaml.Node `yaml:"container"`
Defaults Defaults `yaml:"defaults"`
}
// Strategy for the job
type Strategy struct {
FailFast bool `yaml:"fail-fast"`
MaxParallel int `yaml:"max-parallel"`
Matrix map[string][]interface{} `yaml:"matrix"`
}
// Default settings that will apply to all steps in the job or workflow
type Defaults struct {
Run RunDefaults `yaml:"run"`
}
// Defaults for all run steps in the job or workflow
type RunDefaults struct {
Shell string `yaml:"shell"`
WorkingDirectory string `yaml:"working-directory"`
}
// Container details for the job
func (j *Job) Container() *ContainerSpec {
var val *ContainerSpec
switch j.RawContainer.Kind {
case yaml.ScalarNode:
val = new(ContainerSpec)
err := j.RawContainer.Decode(&val.Image)
if err != nil {
log.Fatal(err)
}
case yaml.MappingNode:
val = new(ContainerSpec)
err := j.RawContainer.Decode(val)
if err != nil {
log.Fatal(err)
}
}
return val
}
// Needs list for Job
func (j *Job) Needs() []string {
switch j.RawNeeds.Kind {
case yaml.ScalarNode:
var val string
err := j.RawNeeds.Decode(&val)
if err != nil {
log.Fatal(err)
}
return []string{val}
case yaml.SequenceNode:
var val []string
err := j.RawNeeds.Decode(&val)
if err != nil {
log.Fatal(err)
}
return val
}
return nil
}
// RunsOn list for Job
func (j *Job) RunsOn() []string {
switch j.RawRunsOn.Kind {
case yaml.ScalarNode:
var val string
err := j.RawRunsOn.Decode(&val)
if err != nil {
log.Fatal(err)
}
return []string{val}
case yaml.SequenceNode:
var val []string
err := j.RawRunsOn.Decode(&val)
if err != nil {
log.Fatal(err)
}
return val
}
return nil
}
// GetMatrixes returns the matrix cross product
func (j *Job) GetMatrixes() []map[string]interface{} {
matrixes := make([]map[string]interface{}, 0)
/*if j.Strategy != nil {
includes := make([]map[string]interface{}, 0)
for _, v := range j.Strategy.Matrix["include"] {
includes = append(includes, v.(map[string]interface{}))
}
delete(j.Strategy.Matrix, "include")
excludes := make([]map[string]interface{}, 0)
for _, v := range j.Strategy.Matrix["exclude"] {
excludes = append(excludes, v.(map[string]interface{}))
}
delete(j.Strategy.Matrix, "exclude")
matrixProduct := common.CartesianProduct(j.Strategy.Matrix)
MATRIX:
for _, matrix := range matrixProduct {
for _, exclude := range excludes {
if commonKeysMatch(matrix, exclude) {
log.Debugf("Skipping matrix '%v' due to exclude '%v'", matrix, exclude)
continue MATRIX
}
}
matrixes = append(matrixes, matrix)
}
for _, include := range includes {
log.Debugf("Adding include '%v'", include)
matrixes = append(matrixes, include)
}
} else {
matrixes = append(matrixes, make(map[string]interface{}))
}*/
return matrixes
}
func commonKeysMatch(a map[string]interface{}, b map[string]interface{}) bool {
for aKey, aVal := range a {
if bVal, ok := b[aKey]; ok && !reflect.DeepEqual(aVal, bVal) {
return false
}
}
return true
}
// ContainerSpec is the specification of the container to use for the job
type ContainerSpec struct {
Image string `yaml:"image"`
Env map[string]string `yaml:"env"`
Ports []string `yaml:"ports"`
Volumes []string `yaml:"volumes"`
Options string `yaml:"options"`
Entrypoint string
Args string
Name string
Reuse bool
}
// Step is the structure of one step in a job
type Step struct {
ID string `yaml:"id"`
If string `yaml:"if"`
Name string `yaml:"name"`
Uses string `yaml:"uses"`
Run string `yaml:"run"`
WorkingDirectory string `yaml:"working-directory"`
Shell string `yaml:"shell"`
Env map[string]string `yaml:"env"`
With map[string]string `yaml:"with"`
ContinueOnError bool `yaml:"continue-on-error"`
TimeoutMinutes int64 `yaml:"timeout-minutes"`
}
// String gets the name of step
func (s *Step) String() string {
if s.Name != "" {
return s.Name
} else if s.Uses != "" {
return s.Uses
} else if s.Run != "" {
return s.Run
}
return s.ID
}
// GetEnv gets the env for a step
func (s *Step) GetEnv() map[string]string {
rtnEnv := make(map[string]string)
for k, v := range s.Env {
rtnEnv[k] = v
}
for k, v := range s.With {
envKey := regexp.MustCompile("[^A-Z0-9-]").ReplaceAllString(strings.ToUpper(k), "_")
envKey = fmt.Sprintf("INPUT_%s", strings.ToUpper(envKey))
rtnEnv[envKey] = v
}
return rtnEnv
}
// ShellCommand returns the command for the shell
func (s *Step) ShellCommand() string {
shellCommand := ""
switch s.Shell {
case "", "bash":
shellCommand = "bash --noprofile --norc -eo pipefail {0}"
case "pwsh":
shellCommand = "pwsh -command \"& '{0}'\""
case "python":
shellCommand = "python {0}"
case "sh":
shellCommand = "sh -e -c {0}"
case "cmd":
shellCommand = "%ComSpec% /D /E:ON /V:OFF /S /C \"CALL \"{0}\"\""
case "powershell":
shellCommand = "powershell -command \"& '{0}'\""
default:
shellCommand = s.Shell
}
return shellCommand
}
// StepType describes what type of step we are about to run
type StepType int
const (
// StepTypeRun is all steps that have a `run` attribute
StepTypeRun StepType = iota
//StepTypeUsesDockerURL is all steps that have a `uses` that is of the form `docker://...`
StepTypeUsesDockerURL
//StepTypeUsesActionLocal is all steps that have a `uses` that is a local action in a subdirectory
StepTypeUsesActionLocal
//StepTypeUsesActionRemote is all steps that have a `uses` that is a reference to a github repo
StepTypeUsesActionRemote
)
// Type returns the type of the step
func (s *Step) Type() StepType {
if s.Run != "" {
return StepTypeRun
} else if strings.HasPrefix(s.Uses, "docker://") {
return StepTypeUsesDockerURL
} else if strings.HasPrefix(s.Uses, "./") {
return StepTypeUsesActionLocal
}
return StepTypeUsesActionRemote
}
// ReadWorkflow returns a list of jobs for a given workflow file reader
func ReadWorkflow(in io.Reader) (*Workflow, error) {
w := new(Workflow)
err := yaml.NewDecoder(in).Decode(w)
return w, err
}
// GetJob will get a job by name in the workflow
func (w *Workflow) GetJob(jobID string) *Job {
for id, j := range w.Jobs {
if jobID == id {
if j.Name == "" {
j.Name = id
}
return j
}
}
return nil
}
// GetJobIDs will get all the job names in the workflow
func (w *Workflow) GetJobIDs() []string {
ids := make([]string, 0)
for id := range w.Jobs {
ids = append(ids, id)
}
return ids
}
func (w *Workflow) On() []string {
var evts []string
for _, job := range w.Jobs {
evts = append(evts, job.RunsOn()...)
}
return evts
}

View File

@ -0,0 +1,100 @@
package gitea
import (
"strings"
"testing"
"github.com/stretchr/testify/assert"
)
func TestReadWorkflow_StringEvent(t *testing.T) {
yaml := `
name: local-action-docker-url
on: push
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: ./actions/docker-url
`
workflow, err := ReadWorkflow(strings.NewReader(yaml))
assert.NoError(t, err, "read workflow should succeed")
assert.Len(t, workflow.On(), 1)
assert.Contains(t, workflow.On(), "push")
}
func TestReadWorkflow_ListEvent(t *testing.T) {
yaml := `
name: local-action-docker-url
on: [push, pull_request]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: ./actions/docker-url
`
workflow, err := ReadWorkflow(strings.NewReader(yaml))
assert.NoError(t, err, "read workflow should succeed")
assert.Len(t, workflow.On(), 2)
assert.Contains(t, workflow.On(), "push")
assert.Contains(t, workflow.On(), "pull_request")
}
func TestReadWorkflow_MapEvent(t *testing.T) {
yaml := `
name: local-action-docker-url
on:
push:
branches:
- master
pull_request:
branches:
- master
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: ./actions/docker-url
`
workflow, err := ReadWorkflow(strings.NewReader(yaml))
assert.NoError(t, err, "read workflow should succeed")
assert.Len(t, workflow.On(), 2)
assert.Contains(t, workflow.On(), "push")
assert.Contains(t, workflow.On(), "pull_request")
}
func TestReadWorkflow_StringContainer(t *testing.T) {
yaml := `
name: local-action-docker-url
jobs:
test:
container: nginx:latest
runs-on: ubuntu-latest
steps:
- uses: ./actions/docker-url
test2:
container:
image: nginx:latest
env:
foo: bar
runs-on: ubuntu-latest
steps:
- uses: ./actions/docker-url
`
workflow, err := ReadWorkflow(strings.NewReader(yaml))
assert.NoError(t, err, "read workflow should succeed")
assert.Len(t, workflow.Jobs, 2)
assert.Contains(t, workflow.Jobs["test"].Container().Image, "nginx:latest")
assert.Contains(t, workflow.Jobs["test2"].Container().Image, "nginx:latest")
assert.Contains(t, workflow.Jobs["test2"].Container().Env["foo"], "bar")
}

View File

@ -0,0 +1,165 @@
package github
import (
"context"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
bot_model "code.gitea.io/gitea/models/bot"
repo_model "code.gitea.io/gitea/models/repo"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/models/webhook"
"code.gitea.io/gitea/modules/bot/runner"
"code.gitea.io/gitea/modules/git"
//"code.gitea.io/gitea/modules/log"
//"code.gitea.io/gitea/modules/util"
"github.com/nektos/act/pkg/model"
act_runner "github.com/nektos/act/pkg/runner"
)
func init() {
runner.RegisterRunnerType(new(GithubRunner))
}
type GithubRunner struct {
}
func (gw *GithubRunner) Name() string {
return "github"
}
func (gw *GithubRunner) Detect(commit *git.Commit, event webhook.HookEventType, ref string) (bool, string, error) {
tree, err := commit.SubTree(".github/workflow")
if err != nil {
return false, "", err
}
entries, err := tree.ListEntries()
if err != nil {
return false, "", err
}
var content = make(map[string]string)
for _, entry := range entries {
blob := entry.Blob()
rd, err := blob.DataAsync()
if err != nil {
return false, "", err
}
bs, err := io.ReadAll(rd)
rd.Close()
if err != nil {
return false, "", err
}
content[entry.Name()] = string(bs)
}
res, err := json.Marshal(content)
if err != nil {
return false, "", err
}
return true, string(res), nil
}
func (gw *GithubRunner) Run(task *bot_model.Task) error {
tmpDir, err := os.MkdirTemp("", fmt.Sprintf("%d", task.ID))
if err != nil {
return err
}
var files = make(map[string]string)
if err := json.Unmarshal([]byte(task.Content), &files); err != nil {
return err
}
for name, content := range files {
f, err := os.Create(filepath.Join(tmpDir, name))
if err != nil {
return err
}
if _, err := f.WriteString(content); err != nil {
f.Close()
return err
}
f.Close()
}
repo, err := repo_model.GetRepositoryByID(task.RepoID)
if err != nil {
return err
}
evtFilePath := filepath.Join(tmpDir, "event.json")
evtFile, err := os.Create(evtFilePath)
if err != nil {
return err
}
if _, err := evtFile.WriteString(task.EventPayload); err != nil {
evtFile.Close()
return err
}
evtFile.Close()
planner, err := model.NewWorkflowPlanner(tmpDir, false)
if err != nil {
return err
}
plan := planner.PlanEvent(task.Event)
actor, err := user_model.GetUserByID(task.TriggerUserID)
if err != nil {
return err
}
// run the plan
config := &act_runner.Config{
Actor: actor.LoginName,
EventName: task.Event,
EventPath: evtFilePath,
DefaultBranch: repo.DefaultBranch,
/*ForcePull: input.forcePull,
ForceRebuild: input.forceRebuild,
ReuseContainers: input.reuseContainers,
Workdir: input.Workdir(),
BindWorkdir: input.bindWorkdir,
LogOutput: !input.noOutput,*/
//Env: envs,
Secrets: map[string]string{
"token": "614e597274a527b6fcf6ddfe45def79430126f08",
},
//InsecureSecrets: input.insecureSecrets,*/
Platforms: map[string]string{
"ubuntu-latest": "node:12-buster-slim",
"ubuntu-20.04": "node:12-buster-slim",
"ubuntu-18.04": "node:12-buster-slim",
},
/*Privileged: input.privileged,
UsernsMode: input.usernsMode,
ContainerArchitecture: input.containerArchitecture,
ContainerDaemonSocket: input.containerDaemonSocket,
UseGitIgnore: input.useGitIgnore,*/
GitHubInstance: "gitea.com",
/*ContainerCapAdd: input.containerCapAdd,
ContainerCapDrop: input.containerCapDrop,
AutoRemove: input.autoRemove,
ArtifactServerPath: input.artifactServerPath,
ArtifactServerPort: input.artifactServerPort,*/
}
r, err := act_runner.New(config)
if err != nil {
return err
}
//ctx, cancel := context.WithTimeout(context.Background(), )
executor := r.NewPlanExecutor(plan).Finally(func(ctx context.Context) error {
//cancel()
return nil
})
return executor(context.Background())
}

View File

@ -0,0 +1,27 @@
package runner
import (
bots_model "code.gitea.io/gitea/models/bots"
"code.gitea.io/gitea/models/webhook"
"code.gitea.io/gitea/modules/git"
)
var runnerTypes = make(map[string]RunnerType)
type RunnerType interface {
Name() string
Detect(commit *git.Commit, event webhook.HookEventType, ref string) (bool, string, error)
Run(task *bots_model.Task) error
}
func RegisterRunnerType(runnerType RunnerType) {
runnerTypes[runnerType.Name()] = runnerType
}
func GetRunnerType(name string) RunnerType {
return runnerTypes[name]
}
func GetRunnerTypes() map[string]RunnerType {
return runnerTypes
}

View File

@ -5,6 +5,9 @@
package context
import (
"bufio"
"errors"
"net"
"net/http"
)
@ -84,6 +87,14 @@ func (r *Response) Before(f func(ResponseWriter)) {
r.befores = append(r.befores, f)
}
func (r *Response) Hijack() (net.Conn, *bufio.ReadWriter, error) {
if h, ok := r.ResponseWriter.(http.Hijacker); ok {
return h.Hijack()
}
return nil, nil, errors.New("unimplemented http.Hijacker ")
}
// NewResponse creates a response
func NewResponse(resp http.ResponseWriter) *Response {
if v, ok := resp.(*Response); ok {

View File

@ -0,0 +1,209 @@
// Copyright 2022 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package bots
import (
"context"
"encoding/json"
"fmt"
"code.gitea.io/gitea/models"
bots_model "code.gitea.io/gitea/models/bots"
"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/models/perm"
repo_model "code.gitea.io/gitea/models/repo"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/models/webhook"
"code.gitea.io/gitea/modules/convert"
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/notification/base"
"code.gitea.io/gitea/modules/process"
"code.gitea.io/gitea/modules/repository"
"code.gitea.io/gitea/modules/setting"
api "code.gitea.io/gitea/modules/structs"
bots_service "code.gitea.io/gitea/services/bots"
)
type botsNotifier struct {
base.NullNotifier
}
var _ base.Notifier = &botsNotifier{}
// NewNotifier create a new botsNotifier notifier
func NewNotifier() base.Notifier {
return &botsNotifier{}
}
func notifyIssue(issue *models.Issue, doer *user_model.User, evt webhook.HookEventType, payload string) {
err := issue.LoadRepo(db.DefaultContext)
if err != nil {
log.Error("issue.LoadRepo: %v", err)
return
}
if issue.Repo.IsEmpty || issue.Repo.IsArchived {
return
}
ref := issue.Ref
if ref == "" {
ref = issue.Repo.DefaultBranch
}
gitRepo, err := git.OpenRepository(context.Background(), issue.Repo.RepoPath())
if err != nil {
log.Error("issue.LoadRepo: %v", err)
return
}
defer gitRepo.Close()
// Get the commit object for the ref
commit, err := gitRepo.GetCommit(ref)
if err != nil {
log.Error("gitRepo.GetCommit: %v", err)
return
}
task := bots_model.Task{
RepoID: issue.RepoID,
TriggerUserID: doer.ID,
Event: evt,
EventPayload: payload,
Status: bots_model.TaskPending,
Ref: ref,
CommitSHA: commit.ID.String(),
}
if err := bots_model.InsertTask(&task); err != nil {
log.Error("InsertBotTask: %v", err)
} else {
bots_service.PushToQueue(&task)
}
}
// TODO: implement all events
func (a *botsNotifier) NotifyNewIssue(issue *models.Issue, mentions []*user_model.User) {
payload := map[string]interface{}{
"issue": map[string]interface{}{
"number": issue.Index,
},
}
bs, err := json.Marshal(payload)
if err != nil {
log.Error("NotifyNewIssue: %v", err)
return
}
notifyIssue(issue, issue.Poster, webhook.HookEventIssues, string(bs))
}
// NotifyIssueChangeStatus notifies close or reopen issue to notifiers
func (a *botsNotifier) NotifyIssueChangeStatus(doer *user_model.User, issue *models.Issue, actionComment *models.Comment, closeOrReopen bool) {
}
func (a *botsNotifier) NotifyIssueChangeLabels(doer *user_model.User, issue *models.Issue,
addedLabels []*models.Label, removedLabels []*models.Label,
) {
payload := map[string]interface{}{
"issue": map[string]interface{}{
"number": issue.Index,
},
}
bs, err := json.Marshal(payload)
if err != nil {
log.Error("NotifyNewIssue: %v", err)
return
}
notifyIssue(issue, doer, webhook.HookEventIssueLabel, string(bs))
}
// NotifyCreateIssueComment notifies comment on an issue to notifiers
func (a *botsNotifier) NotifyCreateIssueComment(doer *user_model.User, repo *repo_model.Repository,
issue *models.Issue, comment *models.Comment, mentions []*user_model.User) {
}
func (a *botsNotifier) NotifyNewPullRequest(pull *models.PullRequest, mentions []*user_model.User) {
}
func (a *botsNotifier) NotifyRenameRepository(doer *user_model.User, repo *repo_model.Repository, oldRepoName string) {
}
func (a *botsNotifier) NotifyTransferRepository(doer *user_model.User, repo *repo_model.Repository, oldOwnerName string) {
}
func (a *botsNotifier) NotifyCreateRepository(doer *user_model.User, u *user_model.User, repo *repo_model.Repository) {
}
func (a *botsNotifier) NotifyForkRepository(doer *user_model.User, oldRepo, repo *repo_model.Repository) {
}
func (a *botsNotifier) NotifyPullRequestReview(pr *models.PullRequest, review *models.Review, comment *models.Comment, mentions []*user_model.User) {
}
func (*botsNotifier) NotifyMergePullRequest(pr *models.PullRequest, doer *user_model.User) {
}
func (a *botsNotifier) NotifyPushCommits(pusher *user_model.User, repo *repo_model.Repository, opts *repository.PushUpdateOptions, commits *repository.PushCommits) {
ctx, _, finished := process.GetManager().AddContext(graceful.GetManager().HammerContext(), fmt.Sprintf("webhook.NotifyPushCommits User: %s[%d] in %s[%d]", pusher.Name, pusher.ID, repo.FullName(), repo.ID))
defer finished()
apiPusher := convert.ToUser(pusher, nil)
apiCommits, apiHeadCommit, err := commits.ToAPIPayloadCommits(ctx, repo.RepoPath(), repo.HTMLURL())
if err != nil {
log.Error("commits.ToAPIPayloadCommits failed: %v", err)
return
}
payload := &api.PushPayload{
Ref: opts.RefFullName,
Before: opts.OldCommitID,
After: opts.NewCommitID,
CompareURL: setting.AppURL + commits.CompareURL,
Commits: apiCommits,
HeadCommit: apiHeadCommit,
Repo: convert.ToRepo(repo, perm.AccessModeOwner),
Pusher: apiPusher,
Sender: apiPusher,
}
bs, err := json.Marshal(payload)
if err != nil {
log.Error("json.Marshal(payload) failed: %v", err)
return
}
task := bots_model.Task{
RepoID: repo.ID,
TriggerUserID: pusher.ID,
Event: webhook.HookEventPush,
EventPayload: string(bs),
Status: bots_model.TaskPending,
}
if err := bots_model.InsertTask(&task); err != nil {
log.Error("InsertBotTask: %v", err)
} else {
bots_service.PushToQueue(&task)
}
}
func (a *botsNotifier) NotifyCreateRef(doer *user_model.User, repo *repo_model.Repository, refType, refFullName, refID string) {
}
func (a *botsNotifier) NotifyDeleteRef(doer *user_model.User, repo *repo_model.Repository, refType, refFullName string) {
}
func (a *botsNotifier) NotifySyncPushCommits(pusher *user_model.User, repo *repo_model.Repository, opts *repository.PushUpdateOptions, commits *repository.PushCommits) {
}
func (a *botsNotifier) NotifySyncCreateRef(doer *user_model.User, repo *repo_model.Repository, refType, refFullName, refID string) {
}
func (a *botsNotifier) NotifySyncDeleteRef(doer *user_model.User, repo *repo_model.Repository, refType, refFullName string) {
}
func (a *botsNotifier) NotifyNewRelease(rel *models.Release) {
}

View File

@ -13,6 +13,7 @@ import (
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/notification/action"
"code.gitea.io/gitea/modules/notification/base"
"code.gitea.io/gitea/modules/notification/bots"
"code.gitea.io/gitea/modules/notification/indexer"
"code.gitea.io/gitea/modules/notification/mail"
"code.gitea.io/gitea/modules/notification/mirror"
@ -40,6 +41,7 @@ func NewContext() {
RegisterNotifier(webhook.NewNotifier())
RegisterNotifier(action.NewNotifier())
RegisterNotifier(mirror.NewNotifier())
RegisterNotifier(bots.NewNotifier())
}
// NotifyNewWikiPage notifies creating new wiki pages to notifiers

149
routers/api/bots/bots.go Normal file
View File

@ -0,0 +1,149 @@
// Copyright 2022 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package bots
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"time"
bots_model "code.gitea.io/gitea/models/bots"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/web"
"github.com/gorilla/websocket"
)
func Routes() *web.Route {
r := web.NewRoute()
r.Get("/", Serve)
return r
}
var upgrader = websocket.Upgrader{
ReadBufferSize: 4096,
WriteBufferSize: 4096,
EnableCompression: true,
CheckOrigin: func(r *http.Request) bool {
return true
},
}
var pongWait = 60 * time.Second
type Message struct {
Version int //
Type int // message type, 1 register 2 error
RunnerUUID string // runner uuid
ErrCode int // error code
ErrContent string // errors message
}
func Serve(w http.ResponseWriter, r *http.Request) {
log.Trace("websocket init request begin from %s", r.RemoteAddr)
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Error("websocket upgrade failed: %v", err)
return
}
defer c.Close()
log.Trace("websocket upgrade from %s successfully", r.RemoteAddr)
c.SetReadDeadline(time.Now().Add(pongWait))
c.SetPongHandler(func(string) error { c.SetReadDeadline(time.Now().Add(pongWait)); return nil })
MESSAGE_BUMP:
for {
// read log from client
mt, message, err := c.ReadMessage()
if err != nil {
if websocket.IsCloseError(err, websocket.CloseAbnormalClosure) ||
websocket.IsCloseError(err, websocket.CloseNormalClosure) {
c.Close()
break
}
if !strings.Contains(err.Error(), "i/o timeout") {
log.Error("websocket[%s] read failed: %#v", r.RemoteAddr, err)
}
break
} else {
log.Trace("websocket[%s] received message: %s", r.RemoteAddr, message)
}
// read message first
var msg Message
if err = json.Unmarshal(message, &msg); err != nil {
log.Error("websocket[%s] unmarshal failed: %#v", r.RemoteAddr, err)
break
}
switch msg.Version {
case 1:
switch msg.Type {
case 1:
log.Info("websocket[%s] registered", r.RemoteAddr)
runner, err := bots_model.GetRunnerByUUID(msg.RunnerUUID)
if err != nil {
if !errors.Is(err, bots_model.ErrRunnerNotExist{}) {
log.Error("websocket[%s] get runner [%s] failed: %v", r.RemoteAddr, msg.RunnerUUID, err)
break
}
err = c.WriteMessage(mt, message)
if err != nil {
log.Error("websocket[%s] sent message failed: %v", r.RemoteAddr, err)
break
}
} else {
fmt.Printf("-----%v\n", runner)
// TODO: handle read message
err = c.WriteMessage(mt, message)
if err != nil {
log.Error("websocket[%s] sent message failed: %v", r.RemoteAddr, err)
break
}
}
default:
returnMsg := Message{
Version: 1,
Type: 2,
ErrCode: 1,
ErrContent: "type is not supported",
}
bs, err := json.Marshal(&returnMsg)
if err != nil {
log.Error("websocket[%s] marshal message failed: %v", r.RemoteAddr, err)
break MESSAGE_BUMP
}
err = c.WriteMessage(mt, bs)
if err != nil {
log.Error("websocket[%s] sent message failed: %v", r.RemoteAddr, err)
}
break MESSAGE_BUMP
}
default:
returnMsg := Message{
Version: 1,
Type: 2,
ErrCode: 1,
ErrContent: "version is not supported",
}
bs, err := json.Marshal(&returnMsg)
if err != nil {
log.Error("websocket[%s] marshal message failed: %v", r.RemoteAddr, err)
break MESSAGE_BUMP
}
err = c.WriteMessage(mt, bs)
if err != nil {
log.Error("websocket[%s] sent message failed: %v", r.RemoteAddr, err)
}
break MESSAGE_BUMP
}
// TODO: find new task and send to client
}
}

View File

@ -31,6 +31,7 @@ import (
"code.gitea.io/gitea/modules/translation"
"code.gitea.io/gitea/modules/util"
"code.gitea.io/gitea/modules/web"
bots_router "code.gitea.io/gitea/routers/api/bots"
packages_router "code.gitea.io/gitea/routers/api/packages"
apiv1 "code.gitea.io/gitea/routers/api/v1"
"code.gitea.io/gitea/routers/common"
@ -39,6 +40,7 @@ import (
"code.gitea.io/gitea/services/auth"
"code.gitea.io/gitea/services/auth/source/oauth2"
"code.gitea.io/gitea/services/automerge"
bots_service "code.gitea.io/gitea/services/bots"
"code.gitea.io/gitea/services/cron"
"code.gitea.io/gitea/services/mailer"
markup_service "code.gitea.io/gitea/services/markup"
@ -160,6 +162,7 @@ func GlobalInitInstalled(ctx context.Context) {
mustInit(pull_service.Init)
mustInit(automerge.Init)
mustInit(task.Init)
mustInit(bots_service.Init)
mustInit(repo_migrations.Init)
eventsource.GetManager().Init()
@ -195,5 +198,6 @@ func NormalRoutes(ctx context.Context) *web.Route {
// This implements the OCI API (Note this is not preceded by /api but is instead /v2)
r.Mount("/v2", packages_router.ContainerRoutes(ctx))
}
r.Mount("/api/actions", bots_router.Routes())
return r
}

62
services/bots/bots.go Normal file
View File

@ -0,0 +1,62 @@
// Copyright 2022 Gitea. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package bots
import (
"fmt"
bots_model "code.gitea.io/gitea/models/bots"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/queue"
//"code.gitea.io/gitea/modules/json"
)
// taskQueue is a global queue of tasks
var taskQueue queue.Queue
// PushToQueue
func PushToQueue(task *bots_model.Task) {
taskQueue.Push(task)
}
// Dispatch assign a task to a runner
func Dispatch(task *bots_model.Task) (*bots_model.Runner, error) {
runner, err := bots_model.GetUsableRunner(bots_model.GetRunnerOptions{
RepoID: task.RepoID,
})
if err != nil {
return nil, err
}
return runner, bots_model.AssignTaskToRunner(task.ID, runner.ID)
}
// Init will start the service to get all unfinished tasks and run them
func Init() error {
taskQueue = queue.CreateQueue("actions_task", handle, &bots_model.Task{})
if taskQueue == nil {
return fmt.Errorf("Unable to create Task Queue")
}
go graceful.GetManager().RunWithShutdownFns(taskQueue.Run)
return nil
}
func handle(data ...queue.Data) []queue.Data {
var unhandled []queue.Data
for _, datum := range data {
task := datum.(*bots_model.Task)
runner, err := Dispatch(task)
if err != nil {
log.Error("Run task failed: %v", err)
unhandled = append(unhandled, task)
} else {
log.Trace("task %v assigned to %s", task.UUID, runner.UUID)
}
}
return unhandled
}