From 6785d390f7e8ffbbb836ae1464c8fc2dbd4c9efd Mon Sep 17 00:00:00 2001 From: Kyle Evans Date: Tue, 5 May 2020 16:36:04 -0500 Subject: [PATCH] archiver: restructure a little bit to facilitate testing This introduces two sync.Cond pointers to the archiver package. If they're non-nil when we go to process a request, we'll wait until signalled (at all) to proceed. The tests will then create the sync.Cond so that it can signal at-will and sanity-check the state of the queue at different phases. The author believes that nil-checking these two sync.Cond pointers on every archive processing will introduce minimal overhead with no impact on maintainability. --- services/archiver/archiver.go | 21 +++++++ services/archiver/archiver_test.go | 92 ++++++++++++++++++++++++++---- 2 files changed, 103 insertions(+), 10 deletions(-) diff --git a/services/archiver/archiver.go b/services/archiver/archiver.go index 30b7cc6d0c..ebc060e943 100644 --- a/services/archiver/archiver.go +++ b/services/archiver/archiver.go @@ -41,6 +41,12 @@ type ArchiveRequest struct { var archiveInProgress []*ArchiveRequest var archiveMutex sync.Mutex +// These facilitate testing, by allowing the unit tests to control (to some extent) +// the goroutine used for processing the queue. +var archiveQueueMutex *sync.Mutex +var archiveQueueStartCond *sync.Cond +var archiveQueueReleaseCond *sync.Cond + // GetArchivePath returns the path from which we can serve this archive. func (aReq *ArchiveRequest) GetArchivePath() string { return aReq.archivePath @@ -216,11 +222,26 @@ func ArchiveRepository(request *ArchiveRequest) { archiveInProgress = append(archiveInProgress, request) archiveMutex.Unlock() + // Wait to start, if we have the Cond for it. This is currently only + // useful for testing, so that the start and release of queued entries + // can be controlled to examine the queue. + if archiveQueueStartCond != nil { + archiveQueueMutex.Lock() + archiveQueueStartCond.Wait() + archiveQueueMutex.Unlock() + } + // Drop the mutex while we process the request. This may take a long // time, and it's not necessary now that we've added the reequest to // archiveInProgress. doArchive(request) + if archiveQueueReleaseCond != nil { + archiveQueueMutex.Lock() + archiveQueueReleaseCond.Wait() + archiveQueueMutex.Unlock() + } + // Purge this request from the list. To do so, we'll just take the // index at which we ended up at and swap the final element into that // position, then chop off the now-redundant final element. The slice diff --git a/services/archiver/archiver_test.go b/services/archiver/archiver_test.go index 98ec4050e4..4b6574bfd8 100644 --- a/services/archiver/archiver_test.go +++ b/services/archiver/archiver_test.go @@ -6,6 +6,7 @@ package archiver import ( "path/filepath" + "sync" "testing" "time" @@ -16,8 +17,46 @@ import ( "github.com/unknwon/com" ) +var queueMutex sync.Mutex + func TestMain(m *testing.M) { models.MainTest(m, filepath.Join("..", "..")) + + archiveQueueMutex = &queueMutex + archiveQueueStartCond = sync.NewCond(&queueMutex) + archiveQueueReleaseCond = sync.NewCond(&queueMutex) +} + +func allComplete(inFlight []*ArchiveRequest) bool { + for _, req := range inFlight { + if !req.IsComplete() { + return false + } + } + + return true +} + +func releaseOneEntry(t *testing.T, inFlight []*ArchiveRequest) { + var nowQueued, numQueued int + + numQueued = len(archiveInProgress) + + // Release one, then wait up to 3 seconds for it to complete. + archiveQueueReleaseCond.Signal() + timeout := time.Now().Add(3 * time.Second) + for { + nowQueued = len(archiveInProgress) + if nowQueued != numQueued || time.Now().After(timeout) { + break + } + } + + // Make sure we didn't just timeout. + assert.NotEqual(t, nowQueued, numQueued) + + // Also make sure that we released only one. + assert.Equal(t, nowQueued, numQueued + 1) } func TestArchive_Basic(t *testing.T) { @@ -59,14 +98,31 @@ func TestArchive_Basic(t *testing.T) { secondReq := DeriveRequestFrom(ctx, secondCommit+".zip") assert.NotNil(t, secondReq) - ArchiveRepository(zipReq) - ArchiveRepository(tgzReq) - ArchiveRepository(secondReq) + inFlight := make([]*ArchiveRequest, 3) + inFlight[0] = zipReq + inFlight[1] = tgzReq + inFlight[2] = secondReq - // Wait for those requests to complete, time out after 8 seconds. + ArchiveRepository(zipReq) + assert.Equal(t, len(archiveInProgress), 1) + ArchiveRepository(tgzReq) + assert.Equal(t, len(archiveInProgress), 2) + ArchiveRepository(secondReq) + assert.Equal(t, len(archiveInProgress), 3) + + // Make sure sending an unprocessed request through doesn't affect the queue + // count. + ArchiveRepository(zipReq) + assert.Equal(t, len(archiveInProgress), 3) + + // Release them all, they'll then stall at the archiveQueueReleaseCond while + // we examine the queue state. + archiveQueueStartCond.Broadcast() + + // 8 second timeout for them all to complete. timeout := time.Now().Add(8 * time.Second) for { - if zipReq.IsComplete() && tgzReq.IsComplete() && secondReq.IsComplete() { + if allComplete(inFlight) { break } else if time.Now().After(timeout) { break @@ -80,8 +136,9 @@ func TestArchive_Basic(t *testing.T) { assert.True(t, com.IsExist(tgzReq.GetArchivePath())) assert.True(t, com.IsExist(secondReq.GetArchivePath())) - // The queue should also be drained, if all requests have completed. - assert.Equal(t, len(archiveInProgress), 0) + // Queues should not have drained yet, because we haven't released them. + // Do so now. + assert.Equal(t, len(archiveInProgress), 3) zipReq2 := DeriveRequestFrom(ctx, firstCommit+".zip") // After completion, zipReq should have dropped out of the queue. Make sure @@ -90,10 +147,25 @@ func TestArchive_Basic(t *testing.T) { assert.Equal(t, zipReq, zipReq2) assert.False(t, zipReq == zipReq2) - // Make sure we can submit this follow-up request with no side-effects, to - // the extent that we can. + // We still have the other three stalled at completion, waiting to remove + // from archiveInProgress. Try to submit this new one before its + // predecessor has cleared out of the queue. ArchiveRepository(zipReq2) - assert.Equal(t, zipReq, zipReq2) + + // Make sure we didn't enqueue anything from this new one, and that the + // queue hasn't changed. + assert.Equal(t, len(archiveInProgress), 3) + + for _, req := range archiveInProgress { + assert.False(t, req == zipReq2) + } + + // Make sure the queue drains properly + releaseOneEntry(t, inFlight) + assert.Equal(t, len(archiveInProgress), 2) + releaseOneEntry(t, inFlight) + assert.Equal(t, len(archiveInProgress), 1) + releaseOneEntry(t, inFlight) assert.Equal(t, len(archiveInProgress), 0) // Same commit, different compression formats should have different names.