From b1c9fa7f1a6f73982462fa99dd411fbf1dd14f04 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Mon, 30 Dec 2019 15:54:19 +0000 Subject: [PATCH] Add MaxWorker settings to queues --- modules/queue/manager.go | 100 +++++++++++++++++------ modules/queue/queue_channel.go | 20 +++-- modules/queue/queue_channel_test.go | 2 + modules/queue/queue_disk.go | 20 +++-- modules/queue/queue_disk_channel.go | 5 +- modules/queue/queue_disk_channel_test.go | 2 + modules/queue/queue_disk_test.go | 2 + modules/queue/queue_redis.go | 20 +++-- modules/queue/queue_wrapped.go | 2 +- modules/queue/workerpool.go | 87 ++++++++++++++++---- modules/setting/queue.go | 4 + options/locale/locale_en-US.ini | 22 ++++- routers/admin/admin.go | 68 +++++++++++++++ routers/routes/routes.go | 1 + templates/admin/queue.tmpl | 30 +++++++ 15 files changed, 312 insertions(+), 73 deletions(-) diff --git a/modules/queue/manager.go b/modules/queue/manager.go index 81478019e5..a2deb8ff7c 100644 --- a/modules/queue/manager.go +++ b/modules/queue/manager.go @@ -28,17 +28,28 @@ type Manager struct { // Description represents a working queue inheriting from Gitea. type Description struct { - mutex sync.Mutex - QID int64 - Queue Queue - Type Type - Name string - Configuration interface{} - ExemplarType string - addWorkers func(number int, timeout time.Duration) context.CancelFunc - numberOfWorkers func() int - counter int64 - PoolWorkers map[int64]*PoolWorkers + mutex sync.Mutex + QID int64 + Queue Queue + Type Type + Name string + Configuration interface{} + ExemplarType string + Pool PoolManager + counter int64 + PoolWorkers map[int64]*PoolWorkers +} + +// PoolManager is a simple interface to get certain details from a worker pool +type PoolManager interface { + AddWorkers(number int, timeout time.Duration) context.CancelFunc + NumberOfWorkers() int + MaxNumberOfWorkers() int + SetMaxNumberOfWorkers(int) + BoostTimeout() time.Duration + BlockTimeout() time.Duration + BoostWorkers() int + SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) } // DescriptionList implements the sort.Interface @@ -76,18 +87,16 @@ func (m *Manager) Add(queue Queue, t Type, configuration, exemplar interface{}, - addWorkers func(number int, timeout time.Duration) context.CancelFunc, - numberOfWorkers func() int) int64 { + pool PoolManager) int64 { cfg, _ := json.Marshal(configuration) desc := &Description{ - Queue: queue, - Type: t, - Configuration: string(cfg), - ExemplarType: reflect.TypeOf(exemplar).String(), - PoolWorkers: make(map[int64]*PoolWorkers), - addWorkers: addWorkers, - numberOfWorkers: numberOfWorkers, + Queue: queue, + Type: t, + Configuration: string(cfg), + ExemplarType: reflect.TypeOf(exemplar).String(), + PoolWorkers: make(map[int64]*PoolWorkers), + Pool: pool, } m.mutex.Lock() m.counter++ @@ -177,20 +186,61 @@ func (q *Description) RemoveWorkers(pid int64) { } // AddWorkers adds workers to the queue if it has registered an add worker function -func (q *Description) AddWorkers(number int, timeout time.Duration) { - if q.addWorkers != nil { - _ = q.addWorkers(number, timeout) +func (q *Description) AddWorkers(number int, timeout time.Duration) context.CancelFunc { + if q.Pool != nil { + // the cancel will be added to the pool workers description above + return q.Pool.AddWorkers(number, timeout) } + return nil } // NumberOfWorkers returns the number of workers in the queue func (q *Description) NumberOfWorkers() int { - if q.numberOfWorkers != nil { - return q.numberOfWorkers() + if q.Pool != nil { + return q.Pool.NumberOfWorkers() } return -1 } +// MaxNumberOfWorkers returns the maximum number of workers for the pool +func (q *Description) MaxNumberOfWorkers() int { + if q.Pool != nil { + return q.Pool.MaxNumberOfWorkers() + } + return 0 +} + +// BoostWorkers returns the number of workers for a boost +func (q *Description) BoostWorkers() int { + if q.Pool != nil { + return q.Pool.BoostWorkers() + } + return -1 +} + +// BoostTimeout returns the timeout of the next boost +func (q *Description) BoostTimeout() time.Duration { + if q.Pool != nil { + return q.Pool.BoostTimeout() + } + return 0 +} + +// BlockTimeout returns the timeout til the next boost +func (q *Description) BlockTimeout() time.Duration { + if q.Pool != nil { + return q.Pool.BlockTimeout() + } + return 0 +} + +// SetSettings sets the setable boost values +func (q *Description) SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) { + if q.Pool != nil { + q.Pool.SetSettings(maxNumberOfWorkers, boostWorkers, timeout) + } +} + func (l DescriptionList) Len() int { return len(l) } diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go index 265a5c88f1..5f41ef7574 100644 --- a/modules/queue/queue_channel.go +++ b/modules/queue/queue_channel.go @@ -21,6 +21,7 @@ type ChannelQueueConfiguration struct { QueueLength int BatchLength int Workers int + MaxWorkers int BlockTimeout time.Duration BoostTimeout time.Duration BoostWorkers int @@ -50,20 +51,21 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro ctx, cancel := context.WithCancel(context.Background()) queue := &ChannelQueue{ pool: &WorkerPool{ - baseCtx: ctx, - cancel: cancel, - batchLength: config.BatchLength, - handle: handle, - dataChan: dataChan, - blockTimeout: config.BlockTimeout, - boostTimeout: config.BoostTimeout, - boostWorkers: config.BoostWorkers, + baseCtx: ctx, + cancel: cancel, + batchLength: config.BatchLength, + handle: handle, + dataChan: dataChan, + blockTimeout: config.BlockTimeout, + boostTimeout: config.BoostTimeout, + boostWorkers: config.BoostWorkers, + maxNumberOfWorkers: config.MaxWorkers, }, exemplar: exemplar, workers: config.Workers, name: config.Name, } - queue.pool.qid = GetManager().Add(queue, ChannelQueueType, config, exemplar, queue.pool.AddWorkers, queue.pool.NumberOfWorkers) + queue.pool.qid = GetManager().Add(queue, ChannelQueueType, config, exemplar, queue.pool) return queue, nil } diff --git a/modules/queue/queue_channel_test.go b/modules/queue/queue_channel_test.go index c04407aa24..fafc1e3303 100644 --- a/modules/queue/queue_channel_test.go +++ b/modules/queue/queue_channel_test.go @@ -27,6 +27,7 @@ func TestChannelQueue(t *testing.T) { ChannelQueueConfiguration{ QueueLength: 20, Workers: 1, + MaxWorkers: 10, BlockTimeout: 1 * time.Second, BoostTimeout: 5 * time.Minute, BoostWorkers: 5, @@ -62,6 +63,7 @@ func TestChannelQueue_Batch(t *testing.T) { QueueLength: 20, BatchLength: 2, Workers: 1, + MaxWorkers: 10, BlockTimeout: 1 * time.Second, BoostTimeout: 5 * time.Minute, BoostWorkers: 5, diff --git a/modules/queue/queue_disk.go b/modules/queue/queue_disk.go index 41e8a9e7c0..b74ce378b0 100644 --- a/modules/queue/queue_disk.go +++ b/modules/queue/queue_disk.go @@ -25,6 +25,7 @@ type LevelQueueConfiguration struct { QueueLength int BatchLength int Workers int + MaxWorkers int BlockTimeout time.Duration BoostTimeout time.Duration BoostWorkers int @@ -60,14 +61,15 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) queue := &LevelQueue{ pool: &WorkerPool{ - baseCtx: ctx, - cancel: cancel, - batchLength: config.BatchLength, - handle: handle, - dataChan: dataChan, - blockTimeout: config.BlockTimeout, - boostTimeout: config.BoostTimeout, - boostWorkers: config.BoostWorkers, + baseCtx: ctx, + cancel: cancel, + batchLength: config.BatchLength, + handle: handle, + dataChan: dataChan, + blockTimeout: config.BlockTimeout, + boostTimeout: config.BoostTimeout, + boostWorkers: config.BoostWorkers, + maxNumberOfWorkers: config.MaxWorkers, }, queue: internal, exemplar: exemplar, @@ -76,7 +78,7 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) workers: config.Workers, name: config.Name, } - queue.pool.qid = GetManager().Add(queue, LevelQueueType, config, exemplar, queue.pool.AddWorkers, queue.pool.NumberOfWorkers) + queue.pool.qid = GetManager().Add(queue, LevelQueueType, config, exemplar, queue.pool) return queue, nil } diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index 884fc410df..e2ee0bda56 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -23,6 +23,7 @@ type PersistableChannelQueueConfiguration struct { Timeout time.Duration MaxAttempts int Workers int + MaxWorkers int BlockTimeout time.Duration BoostTimeout time.Duration BoostWorkers int @@ -48,6 +49,7 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( QueueLength: config.QueueLength, BatchLength: config.BatchLength, Workers: config.Workers, + MaxWorkers: config.MaxWorkers, BlockTimeout: config.BlockTimeout, BoostTimeout: config.BoostTimeout, BoostWorkers: config.BoostWorkers, @@ -63,6 +65,7 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( QueueLength: config.QueueLength, BatchLength: config.BatchLength, Workers: 1, + MaxWorkers: 6, BlockTimeout: 1 * time.Second, BoostTimeout: 5 * time.Minute, BoostWorkers: 5, @@ -96,7 +99,7 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( }, closed: make(chan struct{}), } - _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar, nil, nil) + _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar, nil) return queue, nil } diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go index 01a90ebcfb..4ef68961c6 100644 --- a/modules/queue/queue_disk_channel_test.go +++ b/modules/queue/queue_disk_channel_test.go @@ -36,6 +36,7 @@ func TestPersistableChannelQueue(t *testing.T) { BatchLength: 2, QueueLength: 20, Workers: 1, + MaxWorkers: 10, }, &testData{}) assert.NoError(t, err) @@ -89,6 +90,7 @@ func TestPersistableChannelQueue(t *testing.T) { BatchLength: 2, QueueLength: 20, Workers: 1, + MaxWorkers: 10, }, &testData{}) assert.NoError(t, err) diff --git a/modules/queue/queue_disk_test.go b/modules/queue/queue_disk_test.go index 03de451760..c5959d606f 100644 --- a/modules/queue/queue_disk_test.go +++ b/modules/queue/queue_disk_test.go @@ -35,6 +35,7 @@ func TestLevelQueue(t *testing.T) { DataDir: tmpDir, BatchLength: 2, Workers: 1, + MaxWorkers: 10, QueueLength: 20, BlockTimeout: 1 * time.Second, BoostTimeout: 5 * time.Minute, @@ -94,6 +95,7 @@ func TestLevelQueue(t *testing.T) { DataDir: tmpDir, BatchLength: 2, Workers: 1, + MaxWorkers: 10, QueueLength: 20, BlockTimeout: 1 * time.Second, BoostTimeout: 5 * time.Minute, diff --git a/modules/queue/queue_redis.go b/modules/queue/queue_redis.go index 724e22b7b5..21fa4462b8 100644 --- a/modules/queue/queue_redis.go +++ b/modules/queue/queue_redis.go @@ -49,6 +49,7 @@ type RedisQueueConfiguration struct { QueueLength int QueueName string Workers int + MaxWorkers int BlockTimeout time.Duration BoostTimeout time.Duration BoostWorkers int @@ -70,14 +71,15 @@ func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) var queue = &RedisQueue{ pool: &WorkerPool{ - baseCtx: ctx, - cancel: cancel, - batchLength: config.BatchLength, - handle: handle, - dataChan: dataChan, - blockTimeout: config.BlockTimeout, - boostTimeout: config.BoostTimeout, - boostWorkers: config.BoostWorkers, + baseCtx: ctx, + cancel: cancel, + batchLength: config.BatchLength, + handle: handle, + dataChan: dataChan, + blockTimeout: config.BlockTimeout, + boostTimeout: config.BoostTimeout, + boostWorkers: config.BoostWorkers, + maxNumberOfWorkers: config.MaxWorkers, }, queueName: config.QueueName, exemplar: exemplar, @@ -102,7 +104,7 @@ func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) if err := queue.client.Ping().Err(); err != nil { return nil, err } - queue.pool.qid = GetManager().Add(queue, RedisQueueType, config, exemplar, queue.pool.AddWorkers, queue.pool.NumberOfWorkers) + queue.pool.qid = GetManager().Add(queue, RedisQueueType, config, exemplar, queue.pool) return queue, nil } diff --git a/modules/queue/queue_wrapped.go b/modules/queue/queue_wrapped.go index 46557ea318..c218749b65 100644 --- a/modules/queue/queue_wrapped.go +++ b/modules/queue/queue_wrapped.go @@ -123,7 +123,7 @@ func NewWrappedQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro name: config.Name, }, } - _ = GetManager().Add(queue, WrappedQueueType, config, exemplar, nil, nil) + _ = GetManager().Add(queue, WrappedQueueType, config, exemplar, nil) return queue, nil } diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go index fe05e7fe6e..98a68cd041 100644 --- a/modules/queue/workerpool.go +++ b/modules/queue/workerpool.go @@ -14,24 +14,25 @@ import ( // WorkerPool takes type WorkerPool struct { - lock sync.Mutex - baseCtx context.Context - cancel context.CancelFunc - cond *sync.Cond - qid int64 - numberOfWorkers int - batchLength int - handle HandlerFunc - dataChan chan Data - blockTimeout time.Duration - boostTimeout time.Duration - boostWorkers int + lock sync.Mutex + baseCtx context.Context + cancel context.CancelFunc + cond *sync.Cond + qid int64 + maxNumberOfWorkers int + numberOfWorkers int + batchLength int + handle HandlerFunc + dataChan chan Data + blockTimeout time.Duration + boostTimeout time.Duration + boostWorkers int } // Push pushes the data to the internal channel func (p *WorkerPool) Push(data Data) { p.lock.Lock() - if p.blockTimeout > 0 && p.boostTimeout > 0 { + if p.blockTimeout > 0 && p.boostTimeout > 0 && (p.numberOfWorkers <= p.maxNumberOfWorkers || p.maxNumberOfWorkers < 0) { p.lock.Unlock() p.pushBoost(data) } else { @@ -63,7 +64,7 @@ func (p *WorkerPool) pushBoost(data Data) { } case <-timer.C: p.lock.Lock() - if p.blockTimeout > ourTimeout { + if p.blockTimeout > ourTimeout || (p.numberOfWorkers > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0) { p.lock.Unlock() p.dataChan <- data return @@ -71,11 +72,15 @@ func (p *WorkerPool) pushBoost(data Data) { p.blockTimeout *= 2 ctx, cancel := context.WithCancel(p.baseCtx) desc := GetManager().GetDescription(p.qid) + boost := p.boostWorkers + if (boost+p.numberOfWorkers) > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0 { + boost = p.maxNumberOfWorkers - p.numberOfWorkers + } if desc != nil { - log.Warn("WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, desc.Name, ourTimeout, p.boostWorkers, p.boostTimeout, p.blockTimeout) + log.Warn("WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, desc.Name, ourTimeout, boost, p.boostTimeout, p.blockTimeout) start := time.Now() - pid := desc.RegisterWorkers(p.boostWorkers, start, false, start, cancel) + pid := desc.RegisterWorkers(boost, start, false, start, cancel) go func() { <-ctx.Done() desc.RemoveWorkers(pid) @@ -91,7 +96,7 @@ func (p *WorkerPool) pushBoost(data Data) { p.blockTimeout /= 2 p.lock.Unlock() }() - p.addWorkers(ctx, p.boostWorkers) + p.addWorkers(ctx, boost) p.lock.Unlock() p.dataChan <- data } @@ -105,7 +110,53 @@ func (p *WorkerPool) NumberOfWorkers() int { return p.numberOfWorkers } -// AddWorkers adds workers to the pool +// MaxNumberOfWorkers returns the maximum number of workers automatically added to the pool +func (p *WorkerPool) MaxNumberOfWorkers() int { + p.lock.Lock() + defer p.lock.Unlock() + return p.maxNumberOfWorkers +} + +// BoostWorkers returns the number of workers for a boost +func (p *WorkerPool) BoostWorkers() int { + p.lock.Lock() + defer p.lock.Unlock() + return p.boostWorkers +} + +// BoostTimeout returns the timeout of the next boost +func (p *WorkerPool) BoostTimeout() time.Duration { + p.lock.Lock() + defer p.lock.Unlock() + return p.boostTimeout +} + +// BlockTimeout returns the timeout til the next boost +func (p *WorkerPool) BlockTimeout() time.Duration { + p.lock.Lock() + defer p.lock.Unlock() + return p.blockTimeout +} + +// SetSettings sets the setable boost values +func (p *WorkerPool) SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) { + p.lock.Lock() + defer p.lock.Unlock() + p.maxNumberOfWorkers = maxNumberOfWorkers + p.boostWorkers = boostWorkers + p.boostTimeout = timeout +} + +// SetMaxNumberOfWorkers sets the maximum number of workers automatically added to the pool +// Changing this number will not change the number of current workers but will change the limit +// for future additions +func (p *WorkerPool) SetMaxNumberOfWorkers(newMax int) { + p.lock.Lock() + defer p.lock.Unlock() + p.maxNumberOfWorkers = newMax +} + +// AddWorkers adds workers to the pool - this allows the number of workers to go above the limit func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc { var ctx context.Context var cancel context.CancelFunc diff --git a/modules/setting/queue.go b/modules/setting/queue.go index 778ddeb217..0170834391 100644 --- a/modules/setting/queue.go +++ b/modules/setting/queue.go @@ -31,6 +31,7 @@ type queueSettings struct { MaxAttempts int Timeout time.Duration Workers int + MaxWorkers int BlockTimeout time.Duration BoostTimeout time.Duration BoostWorkers int @@ -53,6 +54,7 @@ func CreateQueue(name string, handle queue.HandlerFunc, exemplar interface{}) qu opts["DBIndex"] = q.DBIndex opts["QueueName"] = q.QueueName opts["Workers"] = q.Workers + opts["MaxWorkers"] = q.MaxWorkers opts["BlockTimeout"] = q.BlockTimeout opts["BoostTimeout"] = q.BoostTimeout opts["BoostWorkers"] = q.BoostWorkers @@ -108,6 +110,7 @@ func getQueueSettings(name string) queueSettings { q.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(Queue.MaxAttempts) q.Timeout = sec.Key("TIMEOUT").MustDuration(Queue.Timeout) q.Workers = sec.Key("WORKERS").MustInt(Queue.Workers) + q.MaxWorkers = sec.Key("MAX_WORKERS").MustInt(Queue.MaxWorkers) q.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(Queue.BlockTimeout) q.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(Queue.BoostTimeout) q.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(Queue.BoostWorkers) @@ -135,6 +138,7 @@ func NewQueueService() { Queue.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(10) Queue.Timeout = sec.Key("TIMEOUT").MustDuration(GracefulHammerTime + 30*time.Second) Queue.Workers = sec.Key("WORKERS").MustInt(1) + Queue.MaxWorkers = sec.Key("MAX_WORKERS").MustInt(10) Queue.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(1 * time.Second) Queue.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(5 * time.Minute) Queue.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(5) diff --git a/options/locale/locale_en-US.ini b/options/locale/locale_en-US.ini index d6a96b55a5..4e30cf08f6 100644 --- a/options/locale/locale_en-US.ini +++ b/options/locale/locale_en-US.ini @@ -1408,7 +1408,7 @@ settings.protect_check_status_contexts_list = Status checks found in the last we settings.protect_required_approvals = Required approvals: settings.protect_required_approvals_desc = Allow only to merge pull request with enough positive reviews. settings.protect_approvals_whitelist_enabled = Restrict approvals to whitelisted users or teams -settings.protect_approvals_whitelist_enabled_desc = Only reviews from whitelisted users or teams will count to the required approvals. Without approval whitelist, reviews from anyone with write access count to the required approvals. +settings.protect_approvals_whitelist_enabled_desc = Only reviews from whitelisted users or teams will count to the required approvals. Without approval whitelist, reviews from anyone with write access count to the required approvals. settings.protect_approvals_whitelist_users = Whitelisted reviewers: settings.protect_approvals_whitelist_teams = Whitelisted teams for reviews: settings.add_protected_branch = Enable protection @@ -2028,6 +2028,7 @@ monitor.queue.name = Name monitor.queue.type = Type monitor.queue.exemplar = Exemplar Type monitor.queue.numberworkers = Number of Workers +monitor.queue.maxnumberworkers = Max Number of Workers monitor.queue.review = Review Config monitor.queue.review_add = Review/Add Workers monitor.queue.configuration = Initial Configuration @@ -2043,7 +2044,26 @@ monitor.queue.pool.addworkers.numberworkers.placeholder = Number of Workers monitor.queue.pool.addworkers.timeout.placeholder = Set to 0 for no timeout monitor.queue.pool.addworkers.mustnumbergreaterzero = Number of Workers to add must be greater than zero monitor.queue.pool.addworkers.musttimeoutduration = Timeout must be a golang duration eg. 5m or be 0 + +monitor.queue.settings.title = Pool Settings +monitor.queue.settings.desc = Pools dynamically grow with a boost in response to their worker queue blocking. These changes will not affect current worker groups. +monitor.queue.settings.timeout = Boost Timeout +monitor.queue.settings.timeout.placeholder = Currently %[1]v +monitor.queue.settings.timeout.error = Timeout must be a golang duration eg. 5m or be 0 +monitor.queue.settings.numberworkers = Boost Number of Workers +monitor.queue.settings.numberworkers.placeholder = Currently %[1]d +monitor.queue.settings.numberworkers.error = Number of Workers to add must be greater than or equal to zero +monitor.queue.settings.maxnumberworkers = Max Number of workers +monitor.queue.settings.maxnumberworkers.placeholder = Currently %[1]d +monitor.queue.settings.maxnumberworkers.error = Max number of workers must be a number +monitor.queue.settings.submit = Change Settings +monitor.queue.settings.changed = Settings Updated +monitor.queue.settings.blocktimeout = Current Block Timeout +monitor.queue.settings.blocktimeout.value = %[1]v + +monitor.queue.pool.none = This queue does not have a Pool monitor.queue.pool.added = Worker Group Added +monitor.queue.pool.max_changed = Maximum number of workers changed monitor.queue.pool.workers.title = Active Worker Groups monitor.queue.pool.workers.none = No worker groups. monitor.queue.pool.cancel = Shutdown Worker Group diff --git a/routers/admin/admin.go b/routers/admin/admin.go index 7fc57edf31..299da8b46c 100644 --- a/routers/admin/admin.go +++ b/routers/admin/admin.go @@ -11,6 +11,7 @@ import ( "net/url" "os" "runtime" + "strconv" "strings" "time" @@ -421,7 +422,74 @@ func AddWorkers(ctx *context.Context) { ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid)) return } + if desc.Pool == nil { + ctx.Flash.Error(ctx.Tr("admin.monitor.queue.pool.none")) + ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid)) + return + } desc.AddWorkers(number, timeout) ctx.Flash.Success(ctx.Tr("admin.monitor.queue.pool.added")) ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid)) } + +// SetQueueSettings sets the maximum number of workers for this queue +func SetQueueSettings(ctx *context.Context) { + qid := ctx.ParamsInt64("qid") + desc := queue.GetManager().GetDescription(qid) + if desc == nil { + ctx.Status(404) + return + } + if desc.Pool == nil { + ctx.Flash.Error(ctx.Tr("admin.monitor.queue.pool.none")) + ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid)) + return + } + + maxNumberStr := ctx.Query("max-number") + numberStr := ctx.Query("number") + timeoutStr := ctx.Query("timeout") + + var err error + var maxNumber, number int + var timeout time.Duration + if len(maxNumberStr) > 0 { + maxNumber, err = strconv.Atoi(maxNumberStr) + if err != nil { + ctx.Flash.Error(ctx.Tr("admin.monitor.queue.settings.maxnumberworkers.error")) + ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid)) + return + } + if maxNumber < -1 { + maxNumber = -1 + } + } else { + maxNumber = desc.MaxNumberOfWorkers() + } + + if len(numberStr) > 0 { + number, err = strconv.Atoi(numberStr) + if err != nil || number < 0 { + ctx.Flash.Error(ctx.Tr("admin.monitor.queue.settings.numberworkers.error")) + ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid)) + return + } + } else { + number = desc.BoostWorkers() + } + + if len(timeoutStr) > 0 { + timeout, err = time.ParseDuration(timeoutStr) + if err != nil { + ctx.Flash.Error(ctx.Tr("admin.monitor.queue.settings.timeout.error")) + ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid)) + return + } + } else { + timeout = desc.Pool.BoostTimeout() + } + + desc.SetSettings(maxNumber, number, timeout) + ctx.Flash.Success(ctx.Tr("admin.monitor.queue.settings.changed")) + ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid)) +} diff --git a/routers/routes/routes.go b/routers/routes/routes.go index e97a932692..11cc297580 100644 --- a/routers/routes/routes.go +++ b/routers/routes/routes.go @@ -416,6 +416,7 @@ func RegisterRoutes(m *macaron.Macaron) { m.Post("/cancel/:pid", admin.MonitorCancel) m.Group("/queue/:qid", func() { m.Get("", admin.Queue) + m.Post("/set", admin.SetQueueSettings) m.Post("/add", admin.AddWorkers) m.Post("/cancel/:pid", admin.WorkerCancel) }) diff --git a/templates/admin/queue.tmpl b/templates/admin/queue.tmpl index ab84228243..4f422210e7 100644 --- a/templates/admin/queue.tmpl +++ b/templates/admin/queue.tmpl @@ -14,6 +14,7 @@ {{.i18n.Tr "admin.monitor.queue.type"}} {{.i18n.Tr "admin.monitor.queue.exemplar"}} {{.i18n.Tr "admin.monitor.queue.numberworkers"}} + {{.i18n.Tr "admin.monitor.queue.maxnumberworkers"}} @@ -22,6 +23,7 @@ {{.Queue.Type}} {{.Queue.ExemplarType}} {{$sum := .Queue.NumberOfWorkers}}{{if lt $sum 0}}-{{else}}{{$sum}}{{end}} + {{if lt $sum 0}}-{{else}}{{.Queue.MaxNumberOfWorkers}}{{end}} @@ -40,6 +42,34 @@ {{end}} {{else}} +

+ {{.i18n.Tr "admin.monitor.queue.settings.title"}} +

+
+

{{.i18n.Tr "admin.monitor.queue.settings.desc"}}

+
+ {{$.CsrfTokenHtml}} +
+
+ + +
+
+ + +
+
+ + +
+
+ + {{.i18n.Tr "admin.monitor.queue.settings.blocktimeout.value" .Queue.BlockTimeout}} +
+ +
+
+

{{.i18n.Tr "admin.monitor.queue.pool.addworkers.title"}}