mirror of
https://github.com/go-gitea/gitea.git
synced 2025-02-02 15:09:33 -05:00
Remove dependency on queue from setting
This commit is contained in:
parent
8798a61ba4
commit
030b6d91c8
@ -127,7 +127,7 @@ func InitIssueIndexer(syncReindex bool) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
issueIndexerQueue = setting.CreateQueue("issue_indexer", handler, &IndexerData{})
|
issueIndexerQueue = queue.CreateQueue("issue_indexer", handler, &IndexerData{})
|
||||||
|
|
||||||
if issueIndexerQueue == nil {
|
if issueIndexerQueue == nil {
|
||||||
log.Fatal("Unable to create issue indexer queue")
|
log.Fatal("Unable to create issue indexer queue")
|
||||||
|
@ -123,8 +123,8 @@ func RegisteredTypesAsString() []string {
|
|||||||
return types
|
return types
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateQueue takes a queue Type and HandlerFunc some options and possibly an exemplar and returns a Queue or an error
|
// NewQueue takes a queue Type and HandlerFunc some options and possibly an exemplar and returns a Queue or an error
|
||||||
func CreateQueue(queueType Type, handlerFunc HandlerFunc, opts, exemplar interface{}) (Queue, error) {
|
func NewQueue(queueType Type, handlerFunc HandlerFunc, opts, exemplar interface{}) (Queue, error) {
|
||||||
newFn, ok := queuesMap[queueType]
|
newFn, ok := queuesMap[queueType]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("Unsupported queue type: %v", queueType)
|
return nil, fmt.Errorf("Unsupported queue type: %v", queueType)
|
||||||
|
@ -59,7 +59,7 @@ func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), h
|
|||||||
q.lock.Unlock()
|
q.lock.Unlock()
|
||||||
log.Fatal("Timedout creating queue %v with cfg %v in %s", q.underlying, q.cfg, q.name)
|
log.Fatal("Timedout creating queue %v with cfg %v in %s", q.underlying, q.cfg, q.name)
|
||||||
default:
|
default:
|
||||||
queue, err := CreateQueue(q.underlying, handle, q.cfg, exemplar)
|
queue, err := NewQueue(q.underlying, handle, q.cfg, exemplar)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
q.internal = queue
|
q.internal = queue
|
||||||
q.lock.Unlock()
|
q.lock.Unlock()
|
||||||
@ -101,7 +101,7 @@ func NewWrappedQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro
|
|||||||
}
|
}
|
||||||
config := configInterface.(WrappedQueueConfiguration)
|
config := configInterface.(WrappedQueueConfiguration)
|
||||||
|
|
||||||
queue, err := CreateQueue(config.Underlying, handle, config.Config, exemplar)
|
queue, err := NewQueue(config.Underlying, handle, config.Config, exemplar)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
// Just return the queue there is no need to wrap
|
// Just return the queue there is no need to wrap
|
||||||
return queue, nil
|
return queue, nil
|
||||||
|
75
modules/queue/setting.go
Normal file
75
modules/queue/setting.go
Normal file
@ -0,0 +1,75 @@
|
|||||||
|
// Copyright 2019 The Gitea Authors. All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
package queue
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"code.gitea.io/gitea/modules/log"
|
||||||
|
"code.gitea.io/gitea/modules/setting"
|
||||||
|
)
|
||||||
|
|
||||||
|
func validType(t string) (Type, error) {
|
||||||
|
if len(t) == 0 {
|
||||||
|
return PersistableChannelQueueType, nil
|
||||||
|
}
|
||||||
|
for _, typ := range RegisteredTypes() {
|
||||||
|
if t == string(typ) {
|
||||||
|
return typ, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return PersistableChannelQueueType, fmt.Errorf("Unknown queue type: %s defaulting to %s", t, string(PersistableChannelQueueType))
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateQueue for name with provided handler and exemplar
|
||||||
|
func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue {
|
||||||
|
q := setting.GetQueueSettings(name)
|
||||||
|
opts := make(map[string]interface{})
|
||||||
|
opts["Name"] = name
|
||||||
|
opts["QueueLength"] = q.Length
|
||||||
|
opts["BatchLength"] = q.BatchLength
|
||||||
|
opts["DataDir"] = q.DataDir
|
||||||
|
opts["Addresses"] = q.Addresses
|
||||||
|
opts["Network"] = q.Network
|
||||||
|
opts["Password"] = q.Password
|
||||||
|
opts["DBIndex"] = q.DBIndex
|
||||||
|
opts["QueueName"] = q.QueueName
|
||||||
|
opts["Workers"] = q.Workers
|
||||||
|
opts["MaxWorkers"] = q.MaxWorkers
|
||||||
|
opts["BlockTimeout"] = q.BlockTimeout
|
||||||
|
opts["BoostTimeout"] = q.BoostTimeout
|
||||||
|
opts["BoostWorkers"] = q.BoostWorkers
|
||||||
|
|
||||||
|
typ, err := validType(q.Type)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Invalid type %s provided for queue named %s defaulting to %s", q.Type, name, string(typ))
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg, err := json.Marshal(opts)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Unable to marshall generic options: %v Error: %v", opts, err)
|
||||||
|
log.Error("Unable to create queue for %s", name, err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
returnable, err := NewQueue(typ, handle, cfg, exemplar)
|
||||||
|
if q.WrapIfNecessary && err != nil {
|
||||||
|
log.Warn("Unable to create queue for %s: %v", name, err)
|
||||||
|
log.Warn("Attempting to create wrapped queue")
|
||||||
|
returnable, err = NewQueue(WrappedQueueType, handle, WrappedQueueConfiguration{
|
||||||
|
Underlying: Type(q.Type),
|
||||||
|
Timeout: q.Timeout,
|
||||||
|
MaxAttempts: q.MaxAttempts,
|
||||||
|
Config: cfg,
|
||||||
|
QueueLength: q.Length,
|
||||||
|
}, exemplar)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Unable to create queue for %s: %v", name, err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return returnable
|
||||||
|
}
|
@ -5,7 +5,6 @@
|
|||||||
package setting
|
package setting
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"path"
|
"path"
|
||||||
"strconv"
|
"strconv"
|
||||||
@ -13,10 +12,10 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"code.gitea.io/gitea/modules/log"
|
"code.gitea.io/gitea/modules/log"
|
||||||
"code.gitea.io/gitea/modules/queue"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type queueSettings struct {
|
// QueueSettings represent the settings for a queue from the ini
|
||||||
|
type QueueSettings struct {
|
||||||
DataDir string
|
DataDir string
|
||||||
Length int
|
Length int
|
||||||
BatchLength int
|
BatchLength int
|
||||||
@ -38,55 +37,11 @@ type queueSettings struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Queue settings
|
// Queue settings
|
||||||
var Queue = queueSettings{}
|
var Queue = QueueSettings{}
|
||||||
|
|
||||||
// CreateQueue for name with provided handler and exemplar
|
// GetQueueSettings returns the queue settings for the appropriately named queue
|
||||||
func CreateQueue(name string, handle queue.HandlerFunc, exemplar interface{}) queue.Queue {
|
func GetQueueSettings(name string) QueueSettings {
|
||||||
q := getQueueSettings(name)
|
q := QueueSettings{}
|
||||||
opts := make(map[string]interface{})
|
|
||||||
opts["Name"] = name
|
|
||||||
opts["QueueLength"] = q.Length
|
|
||||||
opts["BatchLength"] = q.BatchLength
|
|
||||||
opts["DataDir"] = q.DataDir
|
|
||||||
opts["Addresses"] = q.Addresses
|
|
||||||
opts["Network"] = q.Network
|
|
||||||
opts["Password"] = q.Password
|
|
||||||
opts["DBIndex"] = q.DBIndex
|
|
||||||
opts["QueueName"] = q.QueueName
|
|
||||||
opts["Workers"] = q.Workers
|
|
||||||
opts["MaxWorkers"] = q.MaxWorkers
|
|
||||||
opts["BlockTimeout"] = q.BlockTimeout
|
|
||||||
opts["BoostTimeout"] = q.BoostTimeout
|
|
||||||
opts["BoostWorkers"] = q.BoostWorkers
|
|
||||||
|
|
||||||
cfg, err := json.Marshal(opts)
|
|
||||||
if err != nil {
|
|
||||||
log.Error("Unable to marshall generic options: %v Error: %v", opts, err)
|
|
||||||
log.Error("Unable to create queue for %s", name, err)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
returnable, err := queue.CreateQueue(queue.Type(q.Type), handle, cfg, exemplar)
|
|
||||||
if q.WrapIfNecessary && err != nil {
|
|
||||||
log.Warn("Unable to create queue for %s: %v", name, err)
|
|
||||||
log.Warn("Attempting to create wrapped queue")
|
|
||||||
returnable, err = queue.CreateQueue(queue.WrappedQueueType, handle, queue.WrappedQueueConfiguration{
|
|
||||||
Underlying: queue.Type(q.Type),
|
|
||||||
Timeout: q.Timeout,
|
|
||||||
MaxAttempts: q.MaxAttempts,
|
|
||||||
Config: cfg,
|
|
||||||
QueueLength: q.Length,
|
|
||||||
}, exemplar)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
log.Error("Unable to create queue for %s: %v", name, err)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return returnable
|
|
||||||
}
|
|
||||||
|
|
||||||
func getQueueSettings(name string) queueSettings {
|
|
||||||
q := queueSettings{}
|
|
||||||
sec := Cfg.Section("queue." + name)
|
sec := Cfg.Section("queue." + name)
|
||||||
// DataDir is not directly inheritable
|
// DataDir is not directly inheritable
|
||||||
q.DataDir = path.Join(Queue.DataDir, name)
|
q.DataDir = path.Join(Queue.DataDir, name)
|
||||||
@ -104,8 +59,7 @@ func getQueueSettings(name string) queueSettings {
|
|||||||
q.Length = sec.Key("LENGTH").MustInt(Queue.Length)
|
q.Length = sec.Key("LENGTH").MustInt(Queue.Length)
|
||||||
q.BatchLength = sec.Key("BATCH_LENGTH").MustInt(Queue.BatchLength)
|
q.BatchLength = sec.Key("BATCH_LENGTH").MustInt(Queue.BatchLength)
|
||||||
q.ConnectionString = sec.Key("CONN_STR").MustString(Queue.ConnectionString)
|
q.ConnectionString = sec.Key("CONN_STR").MustString(Queue.ConnectionString)
|
||||||
validTypes := queue.RegisteredTypesAsString()
|
q.Type = sec.Key("TYPE").MustString(Queue.Type)
|
||||||
q.Type = sec.Key("TYPE").In(Queue.Type, validTypes)
|
|
||||||
q.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(Queue.WrapIfNecessary)
|
q.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(Queue.WrapIfNecessary)
|
||||||
q.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(Queue.MaxAttempts)
|
q.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(Queue.MaxAttempts)
|
||||||
q.Timeout = sec.Key("TIMEOUT").MustDuration(Queue.Timeout)
|
q.Timeout = sec.Key("TIMEOUT").MustDuration(Queue.Timeout)
|
||||||
@ -131,8 +85,7 @@ func NewQueueService() {
|
|||||||
Queue.Length = sec.Key("LENGTH").MustInt(20)
|
Queue.Length = sec.Key("LENGTH").MustInt(20)
|
||||||
Queue.BatchLength = sec.Key("BATCH_LENGTH").MustInt(20)
|
Queue.BatchLength = sec.Key("BATCH_LENGTH").MustInt(20)
|
||||||
Queue.ConnectionString = sec.Key("CONN_STR").MustString(path.Join(AppDataPath, ""))
|
Queue.ConnectionString = sec.Key("CONN_STR").MustString(path.Join(AppDataPath, ""))
|
||||||
validTypes := queue.RegisteredTypesAsString()
|
Queue.Type = sec.Key("TYPE").MustString("")
|
||||||
Queue.Type = sec.Key("TYPE").In(string(queue.PersistableChannelQueueType), validTypes)
|
|
||||||
Queue.Network, Queue.Addresses, Queue.Password, Queue.DBIndex, _ = ParseQueueConnStr(Queue.ConnectionString)
|
Queue.Network, Queue.Addresses, Queue.Password, Queue.DBIndex, _ = ParseQueueConnStr(Queue.ConnectionString)
|
||||||
Queue.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(true)
|
Queue.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(true)
|
||||||
Queue.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(10)
|
Queue.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(10)
|
||||||
|
@ -4,16 +4,14 @@
|
|||||||
|
|
||||||
package setting
|
package setting
|
||||||
|
|
||||||
import "code.gitea.io/gitea/modules/queue"
|
|
||||||
|
|
||||||
func newTaskService() {
|
func newTaskService() {
|
||||||
taskSec := Cfg.Section("task")
|
taskSec := Cfg.Section("task")
|
||||||
queueTaskSec := Cfg.Section("queue.task")
|
queueTaskSec := Cfg.Section("queue.task")
|
||||||
switch taskSec.Key("QUEUE_TYPE").MustString(ChannelQueueType) {
|
switch taskSec.Key("QUEUE_TYPE").MustString(ChannelQueueType) {
|
||||||
case ChannelQueueType:
|
case ChannelQueueType:
|
||||||
queueTaskSec.Key("TYPE").MustString(string(queue.PersistableChannelQueueType))
|
queueTaskSec.Key("TYPE").MustString("persistable-channel")
|
||||||
case RedisQueueType:
|
case RedisQueueType:
|
||||||
queueTaskSec.Key("TYPE").MustString(string(queue.RedisQueueType))
|
queueTaskSec.Key("TYPE").MustString("redis")
|
||||||
}
|
}
|
||||||
queueTaskSec.Key("LENGTH").MustInt(taskSec.Key("QUEUE_LENGTH").MustInt(1000))
|
queueTaskSec.Key("LENGTH").MustInt(taskSec.Key("QUEUE_LENGTH").MustInt(1000))
|
||||||
queueTaskSec.Key("CONN_STR").MustString(taskSec.Key("QUEUE_CONN_STR").MustString("addrs=127.0.0.1:6379 db=0"))
|
queueTaskSec.Key("CONN_STR").MustString(taskSec.Key("QUEUE_CONN_STR").MustString("addrs=127.0.0.1:6379 db=0"))
|
||||||
|
@ -12,7 +12,6 @@ import (
|
|||||||
"code.gitea.io/gitea/modules/log"
|
"code.gitea.io/gitea/modules/log"
|
||||||
"code.gitea.io/gitea/modules/migrations/base"
|
"code.gitea.io/gitea/modules/migrations/base"
|
||||||
"code.gitea.io/gitea/modules/queue"
|
"code.gitea.io/gitea/modules/queue"
|
||||||
"code.gitea.io/gitea/modules/setting"
|
|
||||||
"code.gitea.io/gitea/modules/structs"
|
"code.gitea.io/gitea/modules/structs"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -31,7 +30,7 @@ func Run(t *models.Task) error {
|
|||||||
|
|
||||||
// Init will start the service to get all unfinished tasks and run them
|
// Init will start the service to get all unfinished tasks and run them
|
||||||
func Init() error {
|
func Init() error {
|
||||||
taskQueue = setting.CreateQueue("task", handle, &models.Task{})
|
taskQueue = queue.CreateQueue("task", handle, &models.Task{})
|
||||||
|
|
||||||
if taskQueue == nil {
|
if taskQueue == nil {
|
||||||
return fmt.Errorf("Unable to create Task Queue")
|
return fmt.Errorf("Unable to create Task Queue")
|
||||||
|
Loading…
Reference in New Issue
Block a user