1
0

Fixed minor errors in Tycho's code

Everything should work now :)
This commit is contained in:
Tiger Wang 2015-06-22 21:27:13 +01:00
parent dd4b415051
commit 33fc1474d9
7 changed files with 84 additions and 93 deletions

View File

@ -34,7 +34,7 @@ class cNotifyChunkSender :
a_ChunkX, a_ChunkZ, a_ChunkX, a_ChunkZ,
[&ChunkSender] (cChunk & a_Chunk) -> bool [&ChunkSender] (cChunk & a_Chunk) -> bool
{ {
ChunkSender.QueueSendChunkTo(a_Chunk.GetPosX(), a_Chunk.GetPosZ(), cChunkSender::PRIORITY_BROADCAST, a_Chunk.GetAllClients()); ChunkSender.QueueSendChunkTo(a_Chunk.GetPosX(), a_Chunk.GetPosZ(), cChunkSender::E_CHUNK_PRIORITY_MIDHIGH, a_Chunk.GetAllClients());
return true; return true;
} }
); );
@ -51,8 +51,7 @@ public:
cChunkSender::cChunkSender(cWorld & a_World) : cChunkSender::cChunkSender(cWorld & a_World) :
super("ChunkSender"), super("ChunkSender"),
m_World(a_World), m_World(a_World)
m_RemoveCount(0)
{ {
} }
@ -163,11 +162,9 @@ void cChunkSender::RemoveClient(cClientHandle * a_Client)
auto && clients = pair.second.m_Clients; auto && clients = pair.second.m_Clients;
clients.erase(a_Client); // nop for sets that do not contain a_Client clients.erase(a_Client); // nop for sets that do not contain a_Client
} }
m_RemoveCount++;
} }
m_evtQueue.Set(); m_evtQueue.Set();
m_evtRemoved.Wait(); // Wait for removal confirmation m_evtRemoved.Wait(); // Wait for all remaining instances of a_Client to be processed (Execute() makes a copy of m_ChunkInfo)
} }
@ -178,40 +175,32 @@ void cChunkSender::Execute(void)
{ {
while (!m_ShouldTerminate) while (!m_ShouldTerminate)
{ {
cCSLock Lock(m_CS); m_evtQueue.Wait();
do
{
int RemoveCount = m_RemoveCount;
m_RemoveCount = 0;
cCSUnlock Unlock(Lock);
for (int i = 0; i < RemoveCount; i++)
{
m_evtRemoved.Set(); // Notify that the removed clients are safe to be deleted
}
m_evtQueue.Wait();
if (m_ShouldTerminate)
{
return;
}
} while (m_SendChunks.empty());
// Take one from the queue:
auto Chunk = m_SendChunks.top().m_Chunk;
m_SendChunks.pop();
auto itr = m_ChunkInfo.find(Chunk);
if (itr == m_ChunkInfo.end())
{ {
continue; cCSLock Lock(m_CS);
while (!m_SendChunks.empty())
{
// Take one from the queue:
auto Chunk = m_SendChunks.top().m_Chunk;
m_SendChunks.pop();
auto itr = m_ChunkInfo.find(Chunk);
if (itr == m_ChunkInfo.end())
{
continue;
}
std::unordered_set<cClientHandle *> clients;
std::swap(itr->second.m_Clients, clients);
m_ChunkInfo.erase(itr);
cCSUnlock Unlock(Lock);
SendChunk(Chunk.m_ChunkX, Chunk.m_ChunkZ, clients);
}
} }
std::unordered_set<cClientHandle *> clients;
std::swap(itr->second.m_Clients, clients);
m_ChunkInfo.erase(itr);
Lock.Unlock(); m_evtRemoved.SetAll(); // Notify all waiting threads that all clients are processed and thus safe to destroy
} // while (!m_ShouldTerminate)
SendChunk(Chunk.m_ChunkX, Chunk.m_ChunkZ, clients);
} // while (!mShouldTerminate)
} }
@ -220,7 +209,6 @@ void cChunkSender::Execute(void)
void cChunkSender::SendChunk(int a_ChunkX, int a_ChunkZ, std::unordered_set<cClientHandle *> a_Clients) void cChunkSender::SendChunk(int a_ChunkX, int a_ChunkZ, std::unordered_set<cClientHandle *> a_Clients)
{ {
// Ask the client if it still wants the chunk: // Ask the client if it still wants the chunk:
for (auto itr = a_Clients.begin(); itr != a_Clients.end();) for (auto itr = a_Clients.begin(); itr != a_Clients.end();)
{ {
@ -260,13 +248,13 @@ void cChunkSender::SendChunk(int a_ChunkX, int a_ChunkZ, std::unordered_set<cCli
} }
cChunkDataSerializer Data(m_BlockTypes, m_BlockMetas, m_BlockLight, m_BlockSkyLight, m_BiomeMap); cChunkDataSerializer Data(m_BlockTypes, m_BlockMetas, m_BlockLight, m_BlockSkyLight, m_BiomeMap);
for (auto client : a_Clients) for (const auto client : a_Clients)
{ {
// Send: // Send:
client->SendChunkData(a_ChunkX, a_ChunkZ, Data); client->SendChunkData(a_ChunkX, a_ChunkZ, Data);
// Send block-entity packets: // Send block-entity packets:
for (auto Pos : m_BlockEntities) for (const auto & Pos : m_BlockEntities)
{ {
m_World.SendBlockEntity(Pos.x, Pos.y, Pos.z, *client); m_World.SendBlockEntity(Pos.x, Pos.y, Pos.z, *client);
} // for itr - m_Packets[] } // for itr - m_Packets[]

View File

@ -1,4 +1,4 @@

// ChunkSender.h // ChunkSender.h
// Interfaces to the cChunkSender class representing the thread that waits for chunks becoming ready (loaded / generated) and sends them to clients // Interfaces to the cChunkSender class representing the thread that waits for chunks becoming ready (loaded / generated) and sends them to clients
@ -61,8 +61,8 @@ public:
enum eChunkPriority enum eChunkPriority
{ {
E_CHUNK_PRIORITY_HIGH = 0, E_CHUNK_PRIORITY_HIGH = 0,
PRIORITY_BROADCAST, E_CHUNK_PRIORITY_MIDHIGH,
E_CHUNK_PRIORITY_MEDIUM, E_CHUNK_PRIORITY_MEDIUM,
E_CHUNK_PRIORITY_LOW, E_CHUNK_PRIORITY_LOW,
@ -86,7 +86,16 @@ protected:
eChunkPriority m_Priority; eChunkPriority m_Priority;
cChunkCoords m_Chunk; cChunkCoords m_Chunk;
bool operator <(const sChunkQueue & a_Other) const { return this->m_Priority < a_Other.m_Priority; } bool operator <(const sChunkQueue & a_Other) const
{
/* The Standard Priority Queue sorts from biggest to smallest
return true here means you are smaller than the other object, and you get pushed down.
The priorities go from HIGH (0) to LOW (2), so a smaller priority should mean further up the list
therefore, return true (affirm we're "smaller", and get pushed down) only if our priority is bigger than theirs (they're more urgent)
*/
return this->m_Priority > a_Other.m_Priority;
}
}; };
/// Used for sending chunks to specific clients /// Used for sending chunks to specific clients
@ -107,9 +116,9 @@ protected:
cCriticalSection m_CS; cCriticalSection m_CS;
std::priority_queue<sChunkQueue> m_SendChunks; std::priority_queue<sChunkQueue> m_SendChunks;
std::unordered_map<cChunkCoords, sSendChunk, cChunkCoordsHash> m_ChunkInfo; std::unordered_map<cChunkCoords, sSendChunk, cChunkCoordsHash> m_ChunkInfo;
cEvent m_evtQueue; // Set when anything is added to m_ChunksReady cEvent m_evtQueue; // Set when anything is added to m_ChunksReady
cEvent m_evtRemoved; // Set when removed clients are safe to be deleted cEvent m_evtRemoved; // Set when removed clients are safe to be deleted
int m_RemoveCount; // Number of threads waiting for a client removal (m_evtRemoved needs to be set this many times)
// Data about the chunk that is being sent: // Data about the chunk that is being sent:
// NOTE that m_BlockData[] is inherited from the cChunkDataCollector // NOTE that m_BlockData[] is inherited from the cChunkDataCollector
unsigned char m_BiomeMap[cChunkDef::Width * cChunkDef::Width]; unsigned char m_BiomeMap[cChunkDef::Width * cChunkDef::Width];

View File

@ -13,7 +13,7 @@
cEvent::cEvent(void) : cEvent::cEvent(void) :
m_ShouldWait(true) m_ShouldContinue(false)
{ {
} }
@ -23,12 +23,11 @@ cEvent::cEvent(void) :
void cEvent::Wait(void) void cEvent::Wait(void)
{ {
std::unique_lock<std::mutex> Lock(m_Mutex);
while (m_ShouldWait)
{ {
m_CondVar.wait(Lock); std::unique_lock<std::mutex> Lock(m_Mutex);
m_CondVar.wait(Lock, [this](){ return m_ShouldContinue.load(); });
} }
m_ShouldWait = true; m_ShouldContinue = false;
} }
@ -38,33 +37,13 @@ void cEvent::Wait(void)
bool cEvent::Wait(unsigned a_TimeoutMSec) bool cEvent::Wait(unsigned a_TimeoutMSec)
{ {
auto dst = std::chrono::system_clock::now() + std::chrono::milliseconds(a_TimeoutMSec); auto dst = std::chrono::system_clock::now() + std::chrono::milliseconds(a_TimeoutMSec);
std::unique_lock<std::mutex> Lock(m_Mutex); // We assume that this lock is acquired without much delay - we are the only user of the mutex bool Result;
while (m_ShouldWait && (std::chrono::system_clock::now() <= dst))
{ {
switch (m_CondVar.wait_until(Lock, dst)) std::unique_lock<std::mutex> Lock(m_Mutex); // We assume that this lock is acquired without much delay - we are the only user of the mutex
{ Result = m_CondVar.wait_until(Lock, dst, [this](){ return m_ShouldContinue.load(); });
case std::cv_status::no_timeout: }
{ m_ShouldContinue = false;
// The wait was successful, check for spurious wakeup: return Result;
if (!m_ShouldWait)
{
m_ShouldWait = true;
return true;
}
// This was a spurious wakeup, wait again:
continue;
}
case std::cv_status::timeout:
{
// The wait timed out, return failure:
return false;
}
} // switch (wait_until())
} // while (m_ShouldWait && not timeout)
// The wait timed out in the while condition:
return false;
} }
@ -73,13 +52,20 @@ bool cEvent::Wait(unsigned a_TimeoutMSec)
void cEvent::Set(void) void cEvent::Set(void)
{ {
{ m_ShouldContinue = true;
std::unique_lock<std::mutex> Lock(m_Mutex);
m_ShouldWait = false;
}
m_CondVar.notify_one(); m_CondVar.notify_one();
} }
void cEvent::SetAll(void)
{
m_ShouldContinue = true;
m_CondVar.notify_all();
}

View File

@ -12,6 +12,7 @@
#include <mutex> #include <mutex>
#include <condition_variable> #include <condition_variable>
#include <atomic>
@ -28,7 +29,11 @@ public:
/** Sets the event - releases one thread that has been waiting in Wait(). /** Sets the event - releases one thread that has been waiting in Wait().
If there was no thread waiting, the next call to Wait() will not block. */ If there was no thread waiting, the next call to Wait() will not block. */
void Set (void); void Set(void);
/** Sets the event - releases all threads that have been waiting in Wait().
If there was no thread waiting, the next call to Wait() will not block. */
void SetAll(void);
/** Waits for the event until either it is signalled, or the (relative) timeout is passed. /** Waits for the event until either it is signalled, or the (relative) timeout is passed.
Returns true if the event was signalled, false if the timeout was hit or there was an error. */ Returns true if the event was signalled, false if the timeout was hit or there was an error. */
@ -37,9 +42,9 @@ public:
private: private:
/** Used for checking for spurious wakeups. */ /** Used for checking for spurious wakeups. */
bool m_ShouldWait; std::atomic<bool> m_ShouldContinue;
/** Mutex protecting m_ShouldWait from multithreaded access. */ /** Mutex protecting m_ShouldContinue from multithreaded access. */
std::mutex m_Mutex; std::mutex m_Mutex;
/** The condition variable used as the Event. */ /** The condition variable used as the Event. */

View File

@ -134,9 +134,9 @@ bool cIsThread::Wait(void)
m_Thread.join(); m_Thread.join();
return true; return true;
} }
catch (std::system_error & a_Exception) catch (const std::system_error & a_Exception)
{ {
LOGERROR("cIsThread::Wait error %i: could not join thread %s; %s", a_Exception.code().value(), m_ThreadName.c_str(), a_Exception.code().message().c_str()); LOGERROR("%s error %i: could not join thread %s; %s", __FUNCTION__, a_Exception.code().value(), m_ThreadName.c_str(), a_Exception.code().message().c_str());
return false; return false;
} }
} }

View File

@ -32,10 +32,6 @@ protected:
/** The overriden Execute() method should check this value periodically and terminate if this is true. */ /** The overriden Execute() method should check this value periodically and terminate if this is true. */
volatile bool m_ShouldTerminate; volatile bool m_ShouldTerminate;
private:
/** Wrapper for Execute() that waits for the initialization event, to prevent race conditions in thread initialization. */
void DoExecute(void);
public: public:
cIsThread(const AString & a_ThreadName); cIsThread(const AString & a_ThreadName);
virtual ~cIsThread(); virtual ~cIsThread();
@ -51,14 +47,21 @@ public:
/** Returns true if the thread calling this function is the thread contained within this object. */ /** Returns true if the thread calling this function is the thread contained within this object. */
bool IsCurrentThread(void) const { return std::this_thread::get_id() == m_Thread.get_id(); } bool IsCurrentThread(void) const { return std::this_thread::get_id() == m_Thread.get_id(); }
private:
protected: /** The name of the thread, used to aid debugging in IDEs which support named threads */
AString m_ThreadName; AString m_ThreadName;
/** The thread object which holds the created thread for later manipulation */
std::thread m_Thread; std::thread m_Thread;
/** The event that is used to wait with the thread's execution until the thread object is fully initialized. /** The event that is used to wait with the thread's execution until the thread object is fully initialized.
This prevents the IsCurrentThread() call to fail because of a race-condition. */ This prevents the IsCurrentThread() call to fail because of a race-condition where the thread starts before m_Thread has been fully assigned. */
cEvent m_evtStart; cEvent m_evtStart;
/** Wrapper for Execute() that waits for the initialization event, to prevent race conditions in thread initialization. */
void DoExecute(void);
} ; } ;

View File

@ -2487,7 +2487,7 @@ void cWorld::SetChunkData(cSetChunkData & a_SetChunkData)
ChunkSender.QueueSendChunkTo( ChunkSender.QueueSendChunkTo(
a_Chunk.GetPosX(), a_Chunk.GetPosX(),
a_Chunk.GetPosZ(), a_Chunk.GetPosZ(),
cChunkSender::PRIORITY_BROADCAST, cChunkSender::E_CHUNK_PRIORITY_MEDIUM,
a_Chunk.GetAllClients() a_Chunk.GetAllClients()
); );
} }