1
0

Merge pull request #494 from worktycho/threadsafequeue

refactored the chunk queue into a seperate class
This commit is contained in:
Mattes D 2014-01-04 03:52:58 -08:00
commit 4d7f213998
7 changed files with 220 additions and 145 deletions

1
.gitignore vendored
View File

@ -19,6 +19,7 @@ cloc.xsl
## emacs
*.*~
*~
*.orig
# world inside source
ChunkWorx.ini

View File

@ -35,6 +35,9 @@ CMAKE_COMMAND = /usr/bin/cmake
# The command to remove a file.
RM = /usr/bin/cmake -E remove -f
# Escaping for special characters.
EQUALS = =
# The top-level source directory on which CMake was run.
CMAKE_SOURCE_DIR = /home/tycho/MCServer

View File

@ -1,31 +1,151 @@
// Queue.h
// Implements the cQueue class representing a thread safe queue
#pragma once
/*
Items can be added multiple times to a queue, there are two functions for
adding, EnqueueItem() and EnqueueItemIfNotPresent(). The first one always
enqueues the specified item, the second one checks if the item is already
present and only queues it if it isn't.
Usage:
To create a queue of type T, instantiate a cQueue<T> object. You can also
modify the behavior of the queue when deleting items and when adding items
that are already in the queue by providing a second parameter, a class that
implements the functions Delete() and Combine(). An example is given in
cQueueFuncs and is used as the default behavior.
*/
// this empty struct allows for the callback functions to be inlined
template<class T>
class cDeleter
struct cQueueFuncs
{
public:
// Called when an Item is deleted form the queue without being returned
static void Delete(T) {};
// Called when an Item is inserted with EnqueueItemIfNotPresent and
// there is another equal value already inserted
static void Combine(T& a_existing, const T& a_new) {};
};
template<class T, class D = cDeleter<T>>
template<class ItemType, class Funcs = cQueueFuncs<ItemType> >
class cQueue
{
// internal typedef for a List of Items
typedef typename std::list<ItemType> ListType;
// magic typedef to persuade clang that the iterator is a type
typedef typename ListType::iterator iterator;
public:
cQueue(int warnsize);
cQueue(cQueue<T>& queue);
~cQueue();
cQueue() {}
~cQueue() {}
// Enqueues an item to the queue, may block if other threads are accessing
// the queue.
void EnqueueItem(ItemType a_item)
{
cCSLock Lock(m_CS);
m_contents.push_back(a_item);
m_evtAdded.Set();
}
// Enqueues an item to the queue if not already present as determined with
// operator ==. Will block other threads from accessing the queue.
void EnqueueItemIfNotPresent(ItemType a_item)
{
cCSLock Lock(m_CS);
for (iterator itr = m_contents.begin(); itr != m_contents.end(); ++itr)
{
if((*itr) == a_item) {
Funcs funcTable;
funcTable.Combine(*itr,a_item);
return;
}
}
m_contents.push_back(a_item);
m_evtAdded.Set();
}
// Dequeues an Item from the queue if any are present. Returns true if
// successful. Value of item is undefined if Dequeuing was unsuccessful.
bool TryDequeueItem(ItemType& item)
{
cCSLock Lock(m_CS);
if (m_contents.size() == 0)
{
return false;
}
item = m_contents.front();
m_contents.pop_front();
m_evtRemoved.Set();
return true;
}
// Dequeues an Item from the Queue, blocking until an Item is Available.
ItemType DequeueItem()
{
cCSLock Lock(m_CS);
while (m_contents.size() == 0)
{
cCSUnlock Unlock(m_CS);
m_evtAdded.Wait();
}
ItemType item = m_contents.front();
m_contents.pop_front();
m_evtRemoved.Set();
return item;
}
// Blocks Until the queue is Empty, Has a slight race condition which may
// cause it to miss the queue being empty.
void BlockTillEmpty() {
// There is a very slight race condition here if the load completes between the check
// and the wait.
while(!(Size() == 0)){m_evtRemoved.Wait();}
}
// Removes all Items from the Queue, calling Delete on each of them.
// can all be inlined when delete is a noop
void Clear()
{
cCSLock Lock(m_CS);
Funcs funcTable;
while (!m_contents.empty())
{
funcTable.Delete(m_contents.front());
m_contents.pop_front();
}
}
// Returns the Size at time of being called
// Do not use to detirmine weather to call DequeueItem, use TryDequeue instead
size_t Size()
{
cCSLock Lock(m_CS);
return m_contents.size();
}
// Removes an Item from the queue
bool Remove(ItemType a_item)
{
cCSLock Lock(m_CS);
for (iterator itr = m_contents.begin(); itr != m_contents.end(); ++itr)
{
if((*itr) == a_item) {
m_contents.erase(itr);
m_evtRemoved.Set();
return true;
}
}
return false;
}
void EnqueueItem(T item);
bool TryDequeueItem(T& item);
T DequeueItem();
void BlockTillEmpty(cEvent CancelationEvent);
void Clear();
int Size();
private:
int warnsize;
std::list<T> contents;
ListType m_contents;
cCriticalSection m_CS;
cEvent m_evtAdded;
cEvent m_evtRemoved;
};
//template classes must be implemented in the header
#include "Queue.inc"

View File

@ -367,10 +367,13 @@ void cWorld::InitializeSpawn(void)
cWorldLoadProgress Progress(this);
// Wait for the loader to finish loading
m_Storage.WaitForQueuesEmpty();
m_Storage.WaitForLoadQueueEmpty();
// Wait for the generator to finish generating
m_Generator.WaitForQueueEmpty();
// Wait for the loader to finish saving
m_Storage.WaitForSaveQueueEmpty();
Progress.Stop();
}

View File

@ -9,3 +9,5 @@ file(GLOB SOURCE
)
add_library(WorldStorage ${SOURCE})
target_link_libraries(WorldStorage OSSupport)

View File

@ -17,7 +17,6 @@
/// If a chunk with this Y coord is de-queued, it is a signal to emit the saved-all message (cWorldStorage::QueueSavedMessage())
#define CHUNK_Y_MESSAGE 2
@ -63,8 +62,6 @@ cWorldStorage::~cWorldStorage()
{
delete *itr;
} // for itr - m_Schemas[]
m_LoadQueue.clear();
m_SaveQueue.clear();
}
@ -98,18 +95,15 @@ void cWorldStorage::WaitForFinish(void)
LOG("Waiting for the world storage to finish saving");
{
// Cancel all loading requests:
cCSLock Lock(m_CSQueues);
m_LoadQueue.clear();
m_LoadQueue.Clear();
}
// Wait for the saving to finish:
WaitForQueuesEmpty();
WaitForSaveQueueEmpty();
// Wait for the thread to finish:
m_ShouldTerminate = true;
m_Event.Set();
m_evtRemoved.Set(); // Wake up anybody waiting in the WaitForQueuesEmpty() method
m_Event.Set(); // Wake up the thread if waiting
super::Wait();
LOG("World storage thread finished");
}
@ -118,34 +112,30 @@ void cWorldStorage::WaitForFinish(void)
void cWorldStorage::WaitForQueuesEmpty(void)
void cWorldStorage::WaitForLoadQueueEmpty(void)
{
cCSLock Lock(m_CSQueues);
while (!m_ShouldTerminate && (!m_LoadQueue.empty() || !m_SaveQueue.empty()))
{
cCSUnlock Unlock(Lock);
m_evtRemoved.Wait();
}
m_LoadQueue.BlockTillEmpty();
}
void cWorldStorage::WaitForSaveQueueEmpty(void)
{
m_SaveQueue.BlockTillEmpty();
}
size_t cWorldStorage::GetLoadQueueLength(void)
{
return m_LoadQueue.Size();
}
int cWorldStorage::GetLoadQueueLength(void)
size_t cWorldStorage::GetSaveQueueLength(void)
{
cCSLock Lock(m_CSQueues);
return (int)m_LoadQueue.size();
}
int cWorldStorage::GetSaveQueueLength(void)
{
cCSLock Lock(m_CSQueues);
return (int)m_SaveQueue.size();
return m_SaveQueue.Size();
}
@ -154,22 +144,8 @@ int cWorldStorage::GetSaveQueueLength(void)
void cWorldStorage::QueueLoadChunk(int a_ChunkX, int a_ChunkY, int a_ChunkZ, bool a_Generate)
{
// Queues the chunk for loading; if not loaded, the chunk will be generated
{
cCSLock Lock(m_CSQueues);
// Check if already in the queue:
for (sChunkLoadQueue::iterator itr = m_LoadQueue.begin(); itr != m_LoadQueue.end(); ++itr)
{
if ((itr->m_ChunkX == a_ChunkX) && (itr->m_ChunkY == a_ChunkY) && (itr->m_ChunkZ == a_ChunkZ) && (itr->m_Generate == a_Generate))
{
return;
}
}
m_LoadQueue.push_back(sChunkLoad(a_ChunkX, a_ChunkY, a_ChunkZ, a_Generate));
}
m_Event.Set();
m_LoadQueue.EnqueueItemIfNotPresent(sChunkLoad(a_ChunkX, a_ChunkY, a_ChunkZ, a_Generate));
}
@ -178,12 +154,8 @@ void cWorldStorage::QueueLoadChunk(int a_ChunkX, int a_ChunkY, int a_ChunkZ, boo
void cWorldStorage::QueueSaveChunk(int a_ChunkX, int a_ChunkY, int a_ChunkZ)
{
{
cCSLock Lock(m_CSQueues);
m_SaveQueue.remove (cChunkCoords(a_ChunkX, a_ChunkY, a_ChunkZ)); // Don't add twice
m_SaveQueue.push_back(cChunkCoords(a_ChunkX, a_ChunkY, a_ChunkZ));
}
m_Event.Set();
m_SaveQueue.EnqueueItemIfNotPresent(cChunkCoords(a_ChunkX, a_ChunkY, a_ChunkZ));
}
@ -192,12 +164,8 @@ void cWorldStorage::QueueSaveChunk(int a_ChunkX, int a_ChunkY, int a_ChunkZ)
void cWorldStorage::QueueSavedMessage(void)
{
// Pushes a special coord pair into the queue, signalizing a message instead:
{
cCSLock Lock(m_CSQueues);
m_SaveQueue.push_back(cChunkCoords(0, CHUNK_Y_MESSAGE, 0));
}
m_Event.Set();
// Pushes a special coord pair into the queue, signalizing a message instead
m_SaveQueue.EnqueueItem(cChunkCoords(0, CHUNK_Y_MESSAGE, 0));
}
@ -206,18 +174,7 @@ void cWorldStorage::QueueSavedMessage(void)
void cWorldStorage::UnqueueLoad(int a_ChunkX, int a_ChunkY, int a_ChunkZ)
{
cCSLock Lock(m_CSQueues);
for (sChunkLoadQueue::iterator itr = m_LoadQueue.begin(); itr != m_LoadQueue.end(); ++itr)
{
if ((itr->m_ChunkX != a_ChunkX) || (itr->m_ChunkY != a_ChunkY) || (itr->m_ChunkZ != a_ChunkZ))
{
continue;
}
m_LoadQueue.erase(itr);
Lock.Unlock();
m_evtRemoved.Set();
return;
} // for itr - m_LoadQueue[]
m_LoadQueue.Remove(sChunkLoad(a_ChunkX, a_ChunkY, a_ChunkZ,true));
}
@ -226,11 +183,7 @@ void cWorldStorage::UnqueueLoad(int a_ChunkX, int a_ChunkY, int a_ChunkZ)
void cWorldStorage::UnqueueSave(const cChunkCoords & a_Chunk)
{
{
cCSLock Lock(m_CSQueues);
m_SaveQueue.remove(a_Chunk);
}
m_evtRemoved.Set();
m_SaveQueue.Remove(a_Chunk);
}
@ -279,21 +232,19 @@ void cWorldStorage::Execute(void)
while (!m_ShouldTerminate)
{
m_Event.Wait();
// Process both queues until they are empty again:
bool HasMore;
bool Success;
do
{
HasMore = false;
Success = false;
if (m_ShouldTerminate)
{
return;
}
HasMore = LoadOneChunk();
HasMore = HasMore | SaveOneChunk();
m_evtRemoved.Set();
} while (HasMore);
Success = LoadOneChunk();
Success |= SaveOneChunk();
} while (Success);
}
}
@ -304,19 +255,7 @@ void cWorldStorage::Execute(void)
bool cWorldStorage::LoadOneChunk(void)
{
sChunkLoad ToLoad(0, 0, 0, false);
bool HasMore;
bool ShouldLoad = false;
{
cCSLock Lock(m_CSQueues);
if (!m_LoadQueue.empty())
{
ToLoad = m_LoadQueue.front();
m_LoadQueue.pop_front();
ShouldLoad = true;
}
HasMore = !m_LoadQueue.empty();
}
bool ShouldLoad = m_LoadQueue.TryDequeueItem(ToLoad);
if (ShouldLoad && !LoadChunk(ToLoad.m_ChunkX, ToLoad.m_ChunkY, ToLoad.m_ChunkZ))
{
if (ToLoad.m_Generate)
@ -330,7 +269,7 @@ bool cWorldStorage::LoadOneChunk(void)
// m_World->ChunkLoadFailed(ToLoad.m_ChunkX, ToLoad.m_ChunkY, ToLoad.m_ChunkZ);
}
}
return HasMore;
return ShouldLoad;
}
@ -339,33 +278,24 @@ bool cWorldStorage::LoadOneChunk(void)
bool cWorldStorage::SaveOneChunk(void)
{
cChunkCoords Save(0, 0, 0);
bool HasMore;
bool ShouldSave = false;
{
cCSLock Lock(m_CSQueues);
if (!m_SaveQueue.empty())
cChunkCoords ToSave(0, 0, 0);
bool ShouldSave = m_SaveQueue.TryDequeueItem(ToSave);
if(ShouldSave) {
if (ToSave.m_ChunkY == CHUNK_Y_MESSAGE)
{
Save = m_SaveQueue.front();
m_SaveQueue.pop_front();
ShouldSave = true;
LOGINFO("Saved all chunks in world %s", m_World->GetName().c_str());
return ShouldSave;
}
HasMore = !m_SaveQueue.empty();
}
if (Save.m_ChunkY == CHUNK_Y_MESSAGE)
{
LOGINFO("Saved all chunks in world %s", m_World->GetName().c_str());
return HasMore;
}
if (ShouldSave && m_World->IsChunkValid(Save.m_ChunkX, Save.m_ChunkZ))
{
m_World->MarkChunkSaving(Save.m_ChunkX, Save.m_ChunkZ);
if (m_SaveSchema->SaveChunk(Save))
if (ShouldSave && m_World->IsChunkValid(ToSave.m_ChunkX, ToSave.m_ChunkZ))
{
m_World->MarkChunkSaved(Save.m_ChunkX, Save.m_ChunkZ);
m_World->MarkChunkSaving(ToSave.m_ChunkX, ToSave.m_ChunkZ);
if (m_SaveSchema->SaveChunk(ToSave))
{
m_World->MarkChunkSaved(ToSave.m_ChunkX, ToSave.m_ChunkZ);
}
}
}
return HasMore;
return ShouldSave;
}

View File

@ -16,6 +16,7 @@
#include "../ChunkDef.h"
#include "../OSSupport/IsThread.h"
#include "../OSSupport/Queue.h"
@ -24,6 +25,8 @@
// fwd:
class cWorld;
typedef cQueue<cChunkCoords> cChunkCoordsQueue;
@ -76,10 +79,11 @@ public:
bool Start(cWorld * a_World, const AString & a_StorageSchemaName); // Hide the cIsThread's Start() method, we need to provide args
void Stop(void); // Hide the cIsThread's Stop() method, we need to signal the event
void WaitForFinish(void);
void WaitForQueuesEmpty(void);
void WaitForLoadQueueEmpty(void);
void WaitForSaveQueueEmpty(void);
int GetLoadQueueLength(void);
int GetSaveQueueLength(void);
size_t GetLoadQueueLength(void);
size_t GetSaveQueueLength(void);
protected:
@ -91,20 +95,30 @@ protected:
bool m_Generate; // If true, the chunk will be generated if it cannot be loaded
sChunkLoad(int a_ChunkX, int a_ChunkY, int a_ChunkZ, bool a_Generate) : m_ChunkX(a_ChunkX), m_ChunkY(a_ChunkY), m_ChunkZ(a_ChunkZ), m_Generate(a_Generate) {}
bool operator==(const sChunkLoad other) const
{
return this->m_ChunkX == other.m_ChunkX &&
this->m_ChunkY == other.m_ChunkY &&
this->m_ChunkZ == other.m_ChunkZ;
}
} ;
typedef std::list<sChunkLoad> sChunkLoadQueue;
struct FuncTable {
static void Delete(sChunkLoad) {};
static void Combine(sChunkLoad& a_orig, const sChunkLoad a_new)
{
a_orig.m_Generate |= a_new.m_Generate;
};
};
typedef cQueue<sChunkLoad,FuncTable> sChunkLoadQueue;
cWorld * m_World;
AString m_StorageSchemaName;
// Both queues are locked by the same CS
cCriticalSection m_CSQueues;
sChunkLoadQueue m_LoadQueue;
cChunkCoordsList m_SaveQueue;
cEvent m_Event; // Set when there's any addition to the queues
cEvent m_evtRemoved; // Set when an item has been removed from the queue, either by the worker thread or the Unqueue methods
cChunkCoordsQueue m_SaveQueue;
/// All the storage schemas (all used for loading)
cWSSchemaList m_Schemas;
@ -116,6 +130,8 @@ protected:
virtual void Execute(void) override;
cEvent m_Event; // Set when there's any addition to the queues
/// Loads one chunk from the queue (if any queued); returns true if there are more chunks in the load queue
bool LoadOneChunk(void);