mirror of
https://github.com/go-gitea/gitea.git
synced 2025-04-18 00:47:48 -04:00
chore(router): Add grpc and grep-web api
Using connect-go package https://github.com/bufbuild/connect-go Signed-off-by: Bo-Yi Wu <appleboy.tw@gmail.com>
This commit is contained in:
parent
92d15afd18
commit
0e79fc556a
6
go.mod
6
go.mod
@ -5,6 +5,8 @@ go 1.18
|
|||||||
require (
|
require (
|
||||||
code.gitea.io/gitea-vet v0.2.2-0.20220122151748-48ebc902541b
|
code.gitea.io/gitea-vet v0.2.2-0.20220122151748-48ebc902541b
|
||||||
code.gitea.io/sdk/gitea v0.15.1
|
code.gitea.io/sdk/gitea v0.15.1
|
||||||
|
gitea.com/gitea/proto v0.0.0-20220802024851-7ee5947f928a
|
||||||
|
gitea.com/go-chi/binding v0.0.0-20220309004920-114340dabecb
|
||||||
codeberg.org/gusted/mcaptcha v0.0.0-20220723083913-4f3072e1d570
|
codeberg.org/gusted/mcaptcha v0.0.0-20220723083913-4f3072e1d570
|
||||||
gitea.com/go-chi/binding v0.0.0-20221013104517-b29891619681
|
gitea.com/go-chi/binding v0.0.0-20221013104517-b29891619681
|
||||||
gitea.com/go-chi/cache v0.2.0
|
gitea.com/go-chi/cache v0.2.0
|
||||||
@ -54,6 +56,7 @@ require (
|
|||||||
github.com/google/uuid v1.3.0
|
github.com/google/uuid v1.3.0
|
||||||
github.com/gorilla/feeds v1.1.1
|
github.com/gorilla/feeds v1.1.1
|
||||||
github.com/gorilla/sessions v1.2.1
|
github.com/gorilla/sessions v1.2.1
|
||||||
|
github.com/gorilla/websocket v1.4.2
|
||||||
github.com/hashicorp/go-version v1.6.0
|
github.com/hashicorp/go-version v1.6.0
|
||||||
github.com/hashicorp/golang-lru v0.5.4
|
github.com/hashicorp/golang-lru v0.5.4
|
||||||
github.com/huandu/xstrings v1.3.2
|
github.com/huandu/xstrings v1.3.2
|
||||||
@ -71,6 +74,7 @@ require (
|
|||||||
github.com/microcosm-cc/bluemonday v1.0.20
|
github.com/microcosm-cc/bluemonday v1.0.20
|
||||||
github.com/minio/minio-go/v7 v7.0.39
|
github.com/minio/minio-go/v7 v7.0.39
|
||||||
github.com/msteinert/pam v1.1.0
|
github.com/msteinert/pam v1.1.0
|
||||||
|
github.com/nektos/act v0.2.24
|
||||||
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646
|
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646
|
||||||
github.com/niklasfasching/go-org v1.6.5
|
github.com/niklasfasching/go-org v1.6.5
|
||||||
github.com/oliamb/cutter v0.2.2
|
github.com/oliamb/cutter v0.2.2
|
||||||
@ -198,7 +202,6 @@ require (
|
|||||||
github.com/gorilla/handlers v1.5.1 // indirect
|
github.com/gorilla/handlers v1.5.1 // indirect
|
||||||
github.com/gorilla/mux v1.8.0 // indirect
|
github.com/gorilla/mux v1.8.0 // indirect
|
||||||
github.com/gorilla/securecookie v1.1.1 // indirect
|
github.com/gorilla/securecookie v1.1.1 // indirect
|
||||||
github.com/gorilla/websocket v1.4.2 // indirect
|
|
||||||
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
|
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
|
||||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
|
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
|
||||||
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
|
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
|
||||||
@ -233,7 +236,6 @@ require (
|
|||||||
github.com/modern-go/reflect2 v1.0.2 // indirect
|
github.com/modern-go/reflect2 v1.0.2 // indirect
|
||||||
github.com/mrjones/oauth v0.0.0-20190623134757-126b35219450 // indirect
|
github.com/mrjones/oauth v0.0.0-20190623134757-126b35219450 // indirect
|
||||||
github.com/mschoch/smat v0.2.0 // indirect
|
github.com/mschoch/smat v0.2.0 // indirect
|
||||||
github.com/nektos/act v0.2.26 // indirect
|
|
||||||
github.com/nwaples/rardecode v1.1.3 // indirect
|
github.com/nwaples/rardecode v1.1.3 // indirect
|
||||||
github.com/oklog/ulid v1.3.1 // indirect
|
github.com/oklog/ulid v1.3.1 // indirect
|
||||||
github.com/olekukonko/tablewriter v0.0.5 // indirect
|
github.com/olekukonko/tablewriter v0.0.5 // indirect
|
||||||
|
@ -9,6 +9,9 @@ import (
|
|||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
|
"golang.org/x/net/http2"
|
||||||
|
"golang.org/x/net/http2/h2c"
|
||||||
)
|
)
|
||||||
|
|
||||||
func newHTTPServer(network, address, name string, handler http.Handler) (*Server, ServeFunction) {
|
func newHTTPServer(network, address, name string, handler http.Handler) (*Server, ServeFunction) {
|
||||||
@ -17,7 +20,7 @@ func newHTTPServer(network, address, name string, handler http.Handler) (*Server
|
|||||||
ReadTimeout: DefaultReadTimeOut,
|
ReadTimeout: DefaultReadTimeOut,
|
||||||
WriteTimeout: DefaultWriteTimeOut,
|
WriteTimeout: DefaultWriteTimeOut,
|
||||||
MaxHeaderBytes: DefaultMaxHeaderBytes,
|
MaxHeaderBytes: DefaultMaxHeaderBytes,
|
||||||
Handler: handler,
|
Handler: h2c.NewHandler(handler, &http2.Server{}),
|
||||||
BaseContext: func(net.Listener) context.Context { return GetManager().HammerContext() },
|
BaseContext: func(net.Listener) context.Context { return GetManager().HammerContext() },
|
||||||
}
|
}
|
||||||
server.OnShutdown = func() {
|
server.OnShutdown = func() {
|
||||||
|
@ -5,207 +5,48 @@
|
|||||||
package bots
|
package bots
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"net/http"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
bots_model "code.gitea.io/gitea/models/bots"
|
|
||||||
"code.gitea.io/gitea/modules/log"
|
|
||||||
"code.gitea.io/gitea/modules/timeutil"
|
|
||||||
"code.gitea.io/gitea/modules/web"
|
"code.gitea.io/gitea/modules/web"
|
||||||
|
"gitea.com/gitea/proto/gen/proto/v1/v1connect"
|
||||||
|
|
||||||
"github.com/gorilla/websocket"
|
"github.com/bufbuild/connect-go"
|
||||||
|
grpchealth "github.com/bufbuild/connect-grpchealth-go"
|
||||||
|
grpcreflect "github.com/bufbuild/connect-grpcreflect-go"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Routes() *web.Route {
|
func Routes(r *web.Route) {
|
||||||
r := web.NewRoute()
|
compress1KB := connect.WithCompressMinBytes(1024)
|
||||||
r.Get("/", Serve)
|
|
||||||
return r
|
service := &RunnerService{}
|
||||||
}
|
path, handler := v1connect.NewBuildServiceHandler(
|
||||||
|
service,
|
||||||
var upgrader = websocket.Upgrader{
|
compress1KB,
|
||||||
ReadBufferSize: 4096,
|
)
|
||||||
WriteBufferSize: 4096,
|
|
||||||
EnableCompression: true,
|
// grpcV1
|
||||||
CheckOrigin: func(r *http.Request) bool {
|
grpcPath, gHandler := grpcreflect.NewHandlerV1(
|
||||||
return true
|
grpcreflect.NewStaticReflector(v1connect.BuildServiceName),
|
||||||
},
|
compress1KB,
|
||||||
}
|
)
|
||||||
|
|
||||||
var pongWait = 60 * time.Second
|
// grpcV1Alpha
|
||||||
|
grpcAlphaPath, gAlphaHandler := grpcreflect.NewHandlerV1Alpha(
|
||||||
type Message struct {
|
grpcreflect.NewStaticReflector(v1connect.BuildServiceName),
|
||||||
Version int //
|
compress1KB,
|
||||||
Type int // message type, 1 register 2 error 3 task 4 no task
|
)
|
||||||
RunnerUUID string // runner uuid
|
|
||||||
BuildUUID string // build uuid
|
// grpcHealthCheck
|
||||||
ErrCode int // error code
|
grpcHealthPath, gHealthHandler := grpchealth.NewHandler(
|
||||||
ErrContent string // errors message
|
grpchealth.NewStaticChecker(v1connect.BuildServiceName),
|
||||||
EventName string
|
compress1KB,
|
||||||
EventPayload string
|
)
|
||||||
JobID string // only run the special job, empty means run all the jobs
|
|
||||||
}
|
// socket connection
|
||||||
|
r.Get("/socket", socketServe)
|
||||||
const (
|
// restful connection
|
||||||
version1 = 1
|
r.Post(path+"{name}", giteaHandler(handler))
|
||||||
)
|
// grpc connection
|
||||||
|
r.Post(grpcPath+"{name}", giteaHandler(gHandler))
|
||||||
const (
|
r.Post(grpcAlphaPath+"{name}", giteaHandler(gAlphaHandler))
|
||||||
MsgTypeRegister = iota + 1 // register
|
// healthy check connection
|
||||||
MsgTypeError // error
|
r.Post(grpcHealthPath+"{name}", giteaHandler(gHealthHandler))
|
||||||
MsgTypeRequestBuild // request build task
|
|
||||||
MsgTypeIdle // no task
|
|
||||||
MsgTypeBuildResult // build result
|
|
||||||
MsgTypeBuildJobResult // build job result
|
|
||||||
)
|
|
||||||
|
|
||||||
func handleVersion1(r *http.Request, c *websocket.Conn, mt int, message []byte, msg *Message) error {
|
|
||||||
switch msg.Type {
|
|
||||||
case MsgTypeRegister:
|
|
||||||
log.Info("websocket[%s] registered", r.RemoteAddr)
|
|
||||||
runner, err := bots_model.GetRunnerByUUID(msg.RunnerUUID)
|
|
||||||
if err != nil {
|
|
||||||
if !errors.Is(err, bots_model.ErrRunnerNotExist{}) {
|
|
||||||
return fmt.Errorf("websocket[%s] get runner [%s] failed: %v", r.RemoteAddr, msg.RunnerUUID, err)
|
|
||||||
}
|
|
||||||
err = c.WriteMessage(mt, message)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("websocket[%s] sent message failed: %v", r.RemoteAddr, err)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
fmt.Printf("-----%v\n", runner)
|
|
||||||
// TODO: handle read message
|
|
||||||
err = c.WriteMessage(mt, message)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("websocket[%s] sent message failed: %v", r.RemoteAddr, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case MsgTypeRequestBuild:
|
|
||||||
// TODO: find new task and send to client
|
|
||||||
build, err := bots_model.GetCurBuildByUUID(msg.RunnerUUID)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("websocket[%s] get task[%s] failed: %v", r.RemoteAddr, msg.RunnerUUID, err)
|
|
||||||
}
|
|
||||||
var returnMsg *Message
|
|
||||||
if build == nil {
|
|
||||||
time.Sleep(3 * time.Second)
|
|
||||||
returnMsg = &Message{
|
|
||||||
Version: version1,
|
|
||||||
Type: MsgTypeIdle,
|
|
||||||
RunnerUUID: msg.RunnerUUID,
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
returnMsg = &Message{
|
|
||||||
Version: version1,
|
|
||||||
Type: MsgTypeRequestBuild,
|
|
||||||
RunnerUUID: msg.RunnerUUID,
|
|
||||||
BuildUUID: build.UUID,
|
|
||||||
EventName: build.Event.Event(),
|
|
||||||
EventPayload: build.EventPayload,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
bs, err := json.Marshal(&returnMsg)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("websocket[%s] marshal message failed: %v", r.RemoteAddr, err)
|
|
||||||
}
|
|
||||||
err = c.WriteMessage(mt, bs)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("websocket[%s] sent message failed: %v", r.RemoteAddr, err)
|
|
||||||
}
|
|
||||||
case MsgTypeBuildResult:
|
|
||||||
log.Info("websocket[%s] returned CI result: %v", r.RemoteAddr, msg)
|
|
||||||
build, err := bots_model.GetBuildByUUID(msg.BuildUUID)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("websocket[%s] get build by uuid failed: %v", r.RemoteAddr, err)
|
|
||||||
}
|
|
||||||
cols := []string{"status", "end_time"}
|
|
||||||
if msg.ErrCode == 0 {
|
|
||||||
build.Status = bots_model.BuildFinished
|
|
||||||
} else {
|
|
||||||
build.Status = bots_model.BuildFailed
|
|
||||||
}
|
|
||||||
build.EndTime = timeutil.TimeStampNow()
|
|
||||||
if err := bots_model.UpdateBuild(build, cols...); err != nil {
|
|
||||||
log.Error("websocket[%s] update build failed: %v", r.RemoteAddr, err)
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
returnMsg := Message{
|
|
||||||
Version: version1,
|
|
||||||
Type: MsgTypeError,
|
|
||||||
ErrCode: 1,
|
|
||||||
ErrContent: fmt.Sprintf("message type %d is not supported", msg.Type),
|
|
||||||
}
|
|
||||||
bs, err := json.Marshal(&returnMsg)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("websocket[%s] marshal message failed: %v", r.RemoteAddr, err)
|
|
||||||
}
|
|
||||||
err = c.WriteMessage(mt, bs)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("websocket[%s] sent message failed: %v", r.RemoteAddr, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func Serve(w http.ResponseWriter, r *http.Request) {
|
|
||||||
log.Trace("websocket init request begin from %s", r.RemoteAddr)
|
|
||||||
c, err := upgrader.Upgrade(w, r, nil)
|
|
||||||
if err != nil {
|
|
||||||
log.Error("websocket upgrade failed: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer c.Close()
|
|
||||||
log.Trace("websocket upgrade from %s successfully", r.RemoteAddr)
|
|
||||||
|
|
||||||
c.SetReadDeadline(time.Now().Add(pongWait))
|
|
||||||
c.SetPongHandler(func(string) error { c.SetReadDeadline(time.Now().Add(pongWait)); return nil })
|
|
||||||
|
|
||||||
for {
|
|
||||||
// read message from client
|
|
||||||
mt, message, err := c.ReadMessage()
|
|
||||||
if err != nil {
|
|
||||||
if websocket.IsCloseError(err, websocket.CloseAbnormalClosure) ||
|
|
||||||
websocket.IsCloseError(err, websocket.CloseNormalClosure) {
|
|
||||||
c.Close()
|
|
||||||
} else if !strings.Contains(err.Error(), "i/o timeout") {
|
|
||||||
log.Error("websocket[%s] read failed: %#v", r.RemoteAddr, err)
|
|
||||||
}
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Trace("websocket[%s] received message: %s", r.RemoteAddr, string(message))
|
|
||||||
|
|
||||||
// read message first
|
|
||||||
var msg Message
|
|
||||||
if err = json.Unmarshal(message, &msg); err != nil {
|
|
||||||
log.Error("websocket[%s] unmarshal failed: %#v", r.RemoteAddr, err)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
switch msg.Version {
|
|
||||||
case 1:
|
|
||||||
if err := handleVersion1(r, c, mt, message, &msg); err != nil {
|
|
||||||
log.Error("%v", err)
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
returnMsg := Message{
|
|
||||||
Version: 1,
|
|
||||||
Type: MsgTypeError,
|
|
||||||
ErrCode: 1,
|
|
||||||
ErrContent: "version is not supported",
|
|
||||||
}
|
|
||||||
bs, err := json.Marshal(&returnMsg)
|
|
||||||
if err != nil {
|
|
||||||
log.Error("websocket[%s] marshal message failed: %v", r.RemoteAddr, err)
|
|
||||||
} else {
|
|
||||||
err = c.WriteMessage(mt, bs)
|
|
||||||
if err != nil {
|
|
||||||
log.Error("websocket[%s] sent message failed: %v", r.RemoteAddr, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
48
routers/api/bots/process.go
Normal file
48
routers/api/bots/process.go
Normal file
@ -0,0 +1,48 @@
|
|||||||
|
// Copyright 2022 The Gitea Authors. All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
package bots
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"code.gitea.io/gitea/modules/log"
|
||||||
|
v1 "gitea.com/gitea/proto/gen/proto/v1"
|
||||||
|
|
||||||
|
"github.com/bufbuild/connect-go"
|
||||||
|
)
|
||||||
|
|
||||||
|
type RunnerService struct{}
|
||||||
|
|
||||||
|
func (s *RunnerService) Connect(
|
||||||
|
ctx context.Context,
|
||||||
|
req *connect.Request[v1.ConnectRequest],
|
||||||
|
) (*connect.Response[v1.ConnectResponse], error) {
|
||||||
|
log.Info("Request headers: %v", req.Header())
|
||||||
|
res := connect.NewResponse(&v1.ConnectResponse{
|
||||||
|
JobId: 100,
|
||||||
|
})
|
||||||
|
res.Header().Set("Gitea-Version", "v1")
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *RunnerService) Accept(
|
||||||
|
ctx context.Context,
|
||||||
|
req *connect.Request[v1.AcceptRequest],
|
||||||
|
) (*connect.Response[v1.AcceptResponse], error) {
|
||||||
|
log.Info("Request headers: %v", req.Header())
|
||||||
|
res := connect.NewResponse(&v1.AcceptResponse{
|
||||||
|
JobId: 100,
|
||||||
|
})
|
||||||
|
res.Header().Set("Gitea-Version", "v1")
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func giteaHandler(h http.Handler) http.HandlerFunc {
|
||||||
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
log.Info("Got connection: %v", r.Proto)
|
||||||
|
h.ServeHTTP(w, r)
|
||||||
|
})
|
||||||
|
}
|
206
routers/api/bots/socket.go
Normal file
206
routers/api/bots/socket.go
Normal file
@ -0,0 +1,206 @@
|
|||||||
|
// Copyright 2022 The Gitea Authors. All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
package bots
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
bots_model "code.gitea.io/gitea/models/bots"
|
||||||
|
"code.gitea.io/gitea/modules/json"
|
||||||
|
"code.gitea.io/gitea/modules/log"
|
||||||
|
"code.gitea.io/gitea/modules/timeutil"
|
||||||
|
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
|
)
|
||||||
|
|
||||||
|
var upgrader = websocket.Upgrader{
|
||||||
|
ReadBufferSize: 4096,
|
||||||
|
WriteBufferSize: 4096,
|
||||||
|
EnableCompression: true,
|
||||||
|
CheckOrigin: func(r *http.Request) bool {
|
||||||
|
return true
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
var pongWait = 60 * time.Second
|
||||||
|
|
||||||
|
type Message struct {
|
||||||
|
Version int //
|
||||||
|
Type int // message type, 1 register 2 error 3 task 4 no task
|
||||||
|
RunnerUUID string // runner uuid
|
||||||
|
BuildUUID string // build uuid
|
||||||
|
ErrCode int // error code
|
||||||
|
ErrContent string // errors message
|
||||||
|
EventName string
|
||||||
|
EventPayload string
|
||||||
|
JobID string // only run the special job, empty means run all the jobs
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
version1 = 1
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
MsgTypeRegister = iota + 1 // register
|
||||||
|
MsgTypeError // error
|
||||||
|
MsgTypeRequestBuild // request build task
|
||||||
|
MsgTypeIdle // no task
|
||||||
|
MsgTypeBuildResult // build result
|
||||||
|
MsgTypeBuildJobResult // build job result
|
||||||
|
)
|
||||||
|
|
||||||
|
func handleVersion1(r *http.Request, c *websocket.Conn, mt int, message []byte, msg *Message) error {
|
||||||
|
switch msg.Type {
|
||||||
|
case MsgTypeRegister:
|
||||||
|
log.Info("websocket[%s] registered", r.RemoteAddr)
|
||||||
|
runner, err := bots_model.GetRunnerByUUID(msg.RunnerUUID)
|
||||||
|
if err != nil {
|
||||||
|
if !errors.Is(err, bots_model.ErrRunnerNotExist{}) {
|
||||||
|
return fmt.Errorf("websocket[%s] get runner [%s] failed: %v", r.RemoteAddr, msg.RunnerUUID, err)
|
||||||
|
}
|
||||||
|
err = c.WriteMessage(mt, message)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("websocket[%s] sent message failed: %v", r.RemoteAddr, err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
fmt.Printf("-----%v\n", runner)
|
||||||
|
// TODO: handle read message
|
||||||
|
err = c.WriteMessage(mt, message)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("websocket[%s] sent message failed: %v", r.RemoteAddr, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case MsgTypeRequestBuild:
|
||||||
|
// TODO: find new task and send to client
|
||||||
|
build, err := bots_model.GetCurBuildByUUID(msg.RunnerUUID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("websocket[%s] get task[%s] failed: %v", r.RemoteAddr, msg.RunnerUUID, err)
|
||||||
|
}
|
||||||
|
var returnMsg *Message
|
||||||
|
if build == nil {
|
||||||
|
time.Sleep(3 * time.Second)
|
||||||
|
returnMsg = &Message{
|
||||||
|
Version: version1,
|
||||||
|
Type: MsgTypeIdle,
|
||||||
|
RunnerUUID: msg.RunnerUUID,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
returnMsg = &Message{
|
||||||
|
Version: version1,
|
||||||
|
Type: MsgTypeRequestBuild,
|
||||||
|
RunnerUUID: msg.RunnerUUID,
|
||||||
|
BuildUUID: build.UUID,
|
||||||
|
EventName: build.Event.Event(),
|
||||||
|
EventPayload: build.EventPayload,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
bs, err := json.Marshal(&returnMsg)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("websocket[%s] marshal message failed: %v", r.RemoteAddr, err)
|
||||||
|
}
|
||||||
|
err = c.WriteMessage(mt, bs)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("websocket[%s] sent message failed: %v", r.RemoteAddr, err)
|
||||||
|
}
|
||||||
|
case MsgTypeBuildResult:
|
||||||
|
log.Info("websocket[%s] returned CI result: %v", r.RemoteAddr, msg)
|
||||||
|
build, err := bots_model.GetBuildByUUID(msg.BuildUUID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("websocket[%s] get build by uuid failed: %v", r.RemoteAddr, err)
|
||||||
|
}
|
||||||
|
cols := []string{"status", "end_time"}
|
||||||
|
if msg.ErrCode == 0 {
|
||||||
|
build.Status = bots_model.BuildFinished
|
||||||
|
} else {
|
||||||
|
build.Status = bots_model.BuildFailed
|
||||||
|
}
|
||||||
|
build.EndTime = timeutil.TimeStampNow()
|
||||||
|
if err := bots_model.UpdateBuild(build, cols...); err != nil {
|
||||||
|
log.Error("websocket[%s] update build failed: %v", r.RemoteAddr, err)
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
returnMsg := Message{
|
||||||
|
Version: version1,
|
||||||
|
Type: MsgTypeError,
|
||||||
|
ErrCode: 1,
|
||||||
|
ErrContent: fmt.Sprintf("message type %d is not supported", msg.Type),
|
||||||
|
}
|
||||||
|
bs, err := json.Marshal(&returnMsg)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("websocket[%s] marshal message failed: %v", r.RemoteAddr, err)
|
||||||
|
}
|
||||||
|
err = c.WriteMessage(mt, bs)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("websocket[%s] sent message failed: %v", r.RemoteAddr, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func socketServe(w http.ResponseWriter, r *http.Request) {
|
||||||
|
log.Trace("websocket init request begin from %s", r.RemoteAddr)
|
||||||
|
c, err := upgrader.Upgrade(w, r, nil)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("websocket upgrade failed: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer c.Close()
|
||||||
|
log.Trace("websocket upgrade from %s successfully", r.RemoteAddr)
|
||||||
|
|
||||||
|
_ = c.SetReadDeadline(time.Now().Add(pongWait))
|
||||||
|
c.SetPongHandler(func(string) error {
|
||||||
|
return c.SetReadDeadline(time.Now().Add(pongWait))
|
||||||
|
})
|
||||||
|
|
||||||
|
for {
|
||||||
|
// read message from client
|
||||||
|
mt, message, err := c.ReadMessage()
|
||||||
|
if err != nil {
|
||||||
|
if websocket.IsCloseError(err, websocket.CloseAbnormalClosure) ||
|
||||||
|
websocket.IsCloseError(err, websocket.CloseNormalClosure) {
|
||||||
|
c.Close()
|
||||||
|
} else if !strings.Contains(err.Error(), "i/o timeout") {
|
||||||
|
log.Error("websocket[%s] read failed: %#v", r.RemoteAddr, err)
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Trace("websocket[%s] received message: %s", r.RemoteAddr, string(message))
|
||||||
|
|
||||||
|
// read message first
|
||||||
|
var msg Message
|
||||||
|
if err = json.Unmarshal(message, &msg); err != nil {
|
||||||
|
log.Error("websocket[%s] unmarshal failed: %#v", r.RemoteAddr, err)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
switch msg.Version {
|
||||||
|
case 1:
|
||||||
|
if err := handleVersion1(r, c, mt, message, &msg); err != nil {
|
||||||
|
log.Error("%v", err)
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
returnMsg := Message{
|
||||||
|
Version: 1,
|
||||||
|
Type: MsgTypeError,
|
||||||
|
ErrCode: 1,
|
||||||
|
ErrContent: "version is not supported",
|
||||||
|
}
|
||||||
|
bs, err := json.Marshal(&returnMsg)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("websocket[%s] marshal message failed: %v", r.RemoteAddr, err)
|
||||||
|
} else {
|
||||||
|
err = c.WriteMessage(mt, bs)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("websocket[%s] sent message failed: %v", r.RemoteAddr, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -198,6 +198,6 @@ func NormalRoutes(ctx context.Context) *web.Route {
|
|||||||
// This implements the OCI API (Note this is not preceded by /api but is instead /v2)
|
// This implements the OCI API (Note this is not preceded by /api but is instead /v2)
|
||||||
r.Mount("/v2", packages_router.ContainerRoutes(ctx))
|
r.Mount("/v2", packages_router.ContainerRoutes(ctx))
|
||||||
}
|
}
|
||||||
r.Mount("/api/actions", bots_router.Routes())
|
bots_router.Routes(r)
|
||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user