mirror of
https://github.com/go-gitea/gitea.git
synced 2025-01-03 14:57:55 -05:00
archiver: setup infrastructure for notifying consumers of completion
This API will *not* allow consumers to subscribe to specific requests being completed, just *any* request being completed. The caller is responsible for determining if their request is satisfied and waiting again if needed.
This commit is contained in:
parent
eb15781d98
commit
d134c3f3ce
@ -28,6 +28,7 @@ import (
|
|||||||
"code.gitea.io/gitea/modules/ssh"
|
"code.gitea.io/gitea/modules/ssh"
|
||||||
"code.gitea.io/gitea/modules/task"
|
"code.gitea.io/gitea/modules/task"
|
||||||
"code.gitea.io/gitea/modules/webhook"
|
"code.gitea.io/gitea/modules/webhook"
|
||||||
|
"code.gitea.io/gitea/services/archiver"
|
||||||
"code.gitea.io/gitea/services/mailer"
|
"code.gitea.io/gitea/services/mailer"
|
||||||
mirror_service "code.gitea.io/gitea/services/mirror"
|
mirror_service "code.gitea.io/gitea/services/mirror"
|
||||||
pull_service "code.gitea.io/gitea/services/pull"
|
pull_service "code.gitea.io/gitea/services/pull"
|
||||||
@ -50,6 +51,7 @@ func checkRunMode() {
|
|||||||
// NewServices init new services
|
// NewServices init new services
|
||||||
func NewServices() {
|
func NewServices() {
|
||||||
setting.NewServices()
|
setting.NewServices()
|
||||||
|
archiver.NewContext()
|
||||||
mailer.NewContext()
|
mailer.NewContext()
|
||||||
_ = cache.NewContext()
|
_ = cache.NewContext()
|
||||||
notification.NewContext()
|
notification.NewContext()
|
||||||
|
@ -40,6 +40,7 @@ type ArchiveRequest struct {
|
|||||||
|
|
||||||
var archiveInProgress []*ArchiveRequest
|
var archiveInProgress []*ArchiveRequest
|
||||||
var archiveMutex sync.Mutex
|
var archiveMutex sync.Mutex
|
||||||
|
var archiveCond *sync.Cond
|
||||||
|
|
||||||
// These facilitate testing, by allowing the unit tests to control (to some extent)
|
// These facilitate testing, by allowing the unit tests to control (to some extent)
|
||||||
// the goroutine used for processing the queue.
|
// the goroutine used for processing the queue.
|
||||||
@ -198,30 +199,33 @@ func doArchive(r *ArchiveRequest) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Block any attempt to finalize creating a new request if we're marking
|
||||||
r.archiveComplete = true
|
r.archiveComplete = true
|
||||||
}
|
}
|
||||||
|
|
||||||
// ArchiveRepository satisfies the ArchiveRequest being passed in. Processing
|
// ArchiveRepository satisfies the ArchiveRequest being passed in. Processing
|
||||||
// will occur in a separate goroutine, as this phase may take a while to
|
// will occur in a separate goroutine, as this phase may take a while to
|
||||||
// complete. If the archive already exists, ArchiveRepository will not do
|
// complete. If the archive already exists, ArchiveRepository will not do
|
||||||
// anything.
|
// anything. In all cases, the caller should be examining the *ArchiveRequest
|
||||||
func ArchiveRepository(request *ArchiveRequest) {
|
// being returned for completion, as it may be different than the one they passed
|
||||||
if request.archiveComplete {
|
// in.
|
||||||
return
|
func ArchiveRepository(request *ArchiveRequest) *ArchiveRequest {
|
||||||
}
|
// We'll return the request that's already been enqueued if it has been
|
||||||
go func() {
|
// enqueued, or we'll immediately enqueue it if it has not been enqueued
|
||||||
// We'll take some liberties here, in that the caller may not assume that the
|
// and it is not marked complete.
|
||||||
// specific request they submitted is the one getting enqueued. We'll just drop
|
|
||||||
// it if it turns out we've already enqueued an identical request, as they'll keep
|
|
||||||
// checking back for the status anyways.
|
|
||||||
archiveMutex.Lock()
|
archiveMutex.Lock()
|
||||||
if rExisting := getArchiveRequest(request.repo, request.commit, request.archiveType); rExisting != nil {
|
if rExisting := getArchiveRequest(request.repo, request.commit, request.archiveType); rExisting != nil {
|
||||||
archiveMutex.Unlock()
|
archiveMutex.Unlock()
|
||||||
return
|
return rExisting
|
||||||
}
|
}
|
||||||
|
if request.archiveComplete {
|
||||||
|
archiveMutex.Unlock()
|
||||||
|
return request
|
||||||
|
}
|
||||||
|
|
||||||
archiveInProgress = append(archiveInProgress, request)
|
archiveInProgress = append(archiveInProgress, request)
|
||||||
archiveMutex.Unlock()
|
archiveMutex.Unlock()
|
||||||
|
go func() {
|
||||||
// Wait to start, if we have the Cond for it. This is currently only
|
// Wait to start, if we have the Cond for it. This is currently only
|
||||||
// useful for testing, so that the start and release of queued entries
|
// useful for testing, so that the start and release of queued entries
|
||||||
// can be controlled to examine the queue.
|
// can be controlled to examine the queue.
|
||||||
@ -251,6 +255,11 @@ func ArchiveRepository(request *ArchiveRequest) {
|
|||||||
// correctness.
|
// correctness.
|
||||||
archiveMutex.Lock()
|
archiveMutex.Lock()
|
||||||
defer archiveMutex.Unlock()
|
defer archiveMutex.Unlock()
|
||||||
|
// Wake up all other goroutines that may be waiting on a request to
|
||||||
|
// complete. They should all wake up, see if that particular request
|
||||||
|
// is complete, then return to waiting if it is not.
|
||||||
|
archiveCond.Broadcast()
|
||||||
|
|
||||||
idx := -1
|
idx := -1
|
||||||
for _idx, req := range archiveInProgress {
|
for _idx, req := range archiveInProgress {
|
||||||
if req == request {
|
if req == request {
|
||||||
@ -268,4 +277,33 @@ func ArchiveRepository(request *ArchiveRequest) {
|
|||||||
}
|
}
|
||||||
archiveInProgress = archiveInProgress[:lastidx]
|
archiveInProgress = archiveInProgress[:lastidx]
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
return request
|
||||||
|
}
|
||||||
|
|
||||||
|
// LockQueue will obtain the archiveMutex for the caller. This allows the
|
||||||
|
// underlying locking mechanism to remain opaque.
|
||||||
|
func LockQueue() {
|
||||||
|
archiveMutex.Lock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnlockQueue will release the archiveMutex for the caller, again allowing the
|
||||||
|
// underlying locking mechanism to remain opaque.
|
||||||
|
func UnlockQueue() {
|
||||||
|
archiveMutex.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// WaitForCompletion should be called with the queue locked (LockQueue), and will
|
||||||
|
// return with the queue lock held when a single archive request has finished.
|
||||||
|
// There is currently no API for getting notified of a particular request being
|
||||||
|
// completed.
|
||||||
|
func WaitForCompletion() {
|
||||||
|
archiveCond.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewContext will initialize local state, e.g. primitives needed to be able to
|
||||||
|
// synchronize with the lock queue and allow callers to wait for an archive to
|
||||||
|
// finish.
|
||||||
|
func NewContext() {
|
||||||
|
archiveCond = sync.NewCond(&archiveMutex)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user