1
0

Merge pull request #2224 from cuberite/ChunkQueueCollapsing

Chunk queue collapsing: MK II
This commit is contained in:
worktycho 2015-07-04 14:43:00 +01:00
commit 106e06617a
16 changed files with 255 additions and 334 deletions

@ -1 +1 @@
Subproject commit 94da343b62f0498a5843247f36d6ee00cbeb8f21
Subproject commit 493f2dfa6d39f134e37c4c614cf8d6ffd10c825f

View File

@ -85,6 +85,7 @@ public:
// tolua_begin
// Position, in absolute block coordinates:
Vector3i GetPos(void) const { return Vector3i{m_PosX, m_PosY, m_PosZ}; }
int GetPosX(void) const { return m_PosX; }
int GetPosY(void) const { return m_PosY; }
int GetPosZ(void) const { return m_PosZ; }

View File

@ -2858,22 +2858,6 @@ void cChunk::BroadcastBlockEntity(int a_BlockX, int a_BlockY, int a_BlockZ, cons
void cChunk::BroadcastChunkData(cChunkDataSerializer & a_Serializer, const cClientHandle * a_Exclude)
{
for (cClientHandleList::iterator itr = m_LoadedByClient.begin(); itr != m_LoadedByClient.end(); ++itr)
{
if (*itr == a_Exclude)
{
continue;
}
(*itr)->SendChunkData(m_PosX, m_PosZ, a_Serializer);
} // for itr - LoadedByClient[]
}
void cChunk::BroadcastCollectEntity(const cEntity & a_Entity, const cPlayer & a_Player, const cClientHandle * a_Exclude)
{
for (cClientHandleList::iterator itr = m_LoadedByClient.begin(); itr != m_LoadedByClient.end(); ++itr)

View File

@ -319,7 +319,6 @@ public:
void BroadcastBlockAction (int a_BlockX, int a_BlockY, int a_BlockZ, char a_Byte1, char a_Byte2, BLOCKTYPE a_BlockType, const cClientHandle * a_Exclude = nullptr);
void BroadcastBlockBreakAnimation(UInt32 a_EntityID, int a_BlockX, int a_BlockY, int a_BlockZ, char a_Stage, const cClientHandle * a_Exclude = nullptr);
void BroadcastBlockEntity (int a_BlockX, int a_BlockY, int a_BlockZ, const cClientHandle * a_Exclude = nullptr);
void BroadcastChunkData (cChunkDataSerializer & a_Serializer, const cClientHandle * a_Exclude = nullptr);
void BroadcastCollectEntity (const cEntity & a_Entity, const cPlayer & a_Player, const cClientHandle * a_Exclude = nullptr);
void BroadcastDestroyEntity (const cEntity & a_Entity, const cClientHandle * a_Exclude = nullptr);
void BroadcastEntityEffect (const cEntity & a_Entity, int a_EffectID, int a_Amplifier, short a_Duration, const cClientHandle * a_Exclude = nullptr);

View File

@ -409,22 +409,6 @@ void cChunkMap::BroadcastBlockEntity(int a_BlockX, int a_BlockY, int a_BlockZ, c
void cChunkMap::BroadcastChunkData(int a_ChunkX, int a_ChunkZ, cChunkDataSerializer & a_Serializer, const cClientHandle * a_Exclude)
{
cCSLock Lock(m_CSLayers);
cChunkPtr Chunk = GetChunkNoGen(a_ChunkX, a_ChunkZ);
if (Chunk == nullptr)
{
return;
}
// It's perfectly legal to broadcast packets even to invalid chunks!
Chunk->BroadcastChunkData(a_Serializer, a_Exclude);
}
void cChunkMap::BroadcastCollectEntity(const cEntity & a_Entity, const cPlayer & a_Player, const cClientHandle * a_Exclude)
{
cCSLock Lock(m_CSLayers);

View File

@ -73,7 +73,6 @@ public:
void BroadcastBlockAction(int a_BlockX, int a_BlockY, int a_BlockZ, char a_Byte1, char a_Byte2, BLOCKTYPE a_BlockType, const cClientHandle * a_Exclude = nullptr);
void BroadcastBlockBreakAnimation(UInt32 a_EntityID, int a_BlockX, int a_BlockY, int a_BlockZ, char a_Stage, const cClientHandle * a_Exclude = nullptr);
void BroadcastBlockEntity(int a_BlockX, int a_BlockY, int a_BlockZ, const cClientHandle * a_Exclude);
void BroadcastChunkData(int a_ChunkX, int a_ChunkZ, cChunkDataSerializer & a_Serializer, const cClientHandle * a_Exclude = nullptr);
void BroadcastCollectEntity(const cEntity & a_Entity, const cPlayer & a_Player, const cClientHandle * a_Exclude = nullptr);
void BroadcastCollectPickup(const cPickup & a_Pickup, const cPlayer & a_Player, const cClientHandle * a_Exclude = nullptr);
void BroadcastDestroyEntity(const cEntity & a_Entity, const cClientHandle * a_Exclude = nullptr);

View File

@ -13,6 +13,7 @@
#include "BlockEntities/BlockEntity.h"
#include "Protocol/ChunkDataSerializer.h"
#include "ClientHandle.h"
#include "Chunk.h"
@ -28,26 +29,29 @@ class cNotifyChunkSender :
{
virtual void Call(int a_ChunkX, int a_ChunkZ) override
{
m_ChunkSender->ChunkReady(a_ChunkX, a_ChunkZ);
cChunkSender & ChunkSender = m_ChunkSender;
m_World.DoWithChunk(
a_ChunkX, a_ChunkZ,
[&ChunkSender] (cChunk & a_Chunk) -> bool
{
ChunkSender.QueueSendChunkTo(a_Chunk.GetPosX(), a_Chunk.GetPosZ(), cChunkSender::E_CHUNK_PRIORITY_MIDHIGH, a_Chunk.GetAllClients());
return true;
}
);
}
cChunkSender * m_ChunkSender;
cChunkSender & m_ChunkSender;
cWorld & m_World;
public:
cNotifyChunkSender(cChunkSender * a_ChunkSender) : m_ChunkSender(a_ChunkSender) {}
cNotifyChunkSender(cChunkSender & a_ChunkSender, cWorld & a_World) : m_ChunkSender(a_ChunkSender), m_World(a_World) {}
};
////////////////////////////////////////////////////////////////////////////////
// cChunkSender:
cChunkSender::cChunkSender(void) :
cChunkSender::cChunkSender(cWorld & a_World) :
super("ChunkSender"),
m_World(nullptr),
m_RemoveCount(0)
m_World(a_World)
{
}
@ -64,10 +68,9 @@ cChunkSender::~cChunkSender()
bool cChunkSender::Start(cWorld * a_World)
bool cChunkSender::Start()
{
m_ShouldTerminate = false;
m_World = a_World;
return super::Start();
}
@ -86,12 +89,30 @@ void cChunkSender::Stop(void)
void cChunkSender::ChunkReady(int a_ChunkX, int a_ChunkZ)
void cChunkSender::QueueSendChunkTo(int a_ChunkX, int a_ChunkZ, eChunkPriority a_Priority, cClientHandle * a_Client)
{
// This is probably never gonna be called twice for the same chunk, and if it is, we don't mind, so we don't check
ASSERT(a_Client != nullptr);
{
cChunkCoords Chunk{a_ChunkX, a_ChunkZ};
cCSLock Lock(m_CS);
m_ChunksReady.push_back(cChunkCoords(a_ChunkX, a_ChunkZ));
auto iter = m_ChunkInfo.find(Chunk);
if (iter != m_ChunkInfo.end())
{
auto & info = iter->second;
if (info.m_Priority > a_Priority)
{
m_SendChunks.push(sChunkQueue{a_Priority, Chunk});
info.m_Priority = a_Priority;
}
info.m_Clients.insert(a_Client);
}
else
{
m_SendChunks.push(sChunkQueue{a_Priority, Chunk});
auto info = sSendChunk{Chunk, a_Priority};
info.m_Clients.insert(a_Client);
m_ChunkInfo.emplace(Chunk, info);
}
}
m_evtQueue.Set();
}
@ -100,40 +121,29 @@ void cChunkSender::ChunkReady(int a_ChunkX, int a_ChunkZ)
void cChunkSender::QueueSendChunkTo(int a_ChunkX, int a_ChunkZ, eChunkPriority a_Priority, cClientHandle * a_Client)
void cChunkSender::QueueSendChunkTo(int a_ChunkX, int a_ChunkZ, eChunkPriority a_Priority, std::list<cClientHandle *> a_Clients)
{
ASSERT(a_Client != nullptr);
{
sSendChunk Chunk(a_ChunkX, a_ChunkZ, a_Client);
cChunkCoords Chunk{a_ChunkX, a_ChunkZ};
cCSLock Lock(m_CS);
if (
std::find(m_SendChunksLowPriority.begin(), m_SendChunksLowPriority.end(), Chunk) != m_SendChunksLowPriority.end() ||
std::find(m_SendChunksMediumPriority.begin(), m_SendChunksMediumPriority.end(), Chunk) != m_SendChunksMediumPriority.end() ||
std::find(m_SendChunksHighPriority.begin(), m_SendChunksHighPriority.end(), Chunk) != m_SendChunksHighPriority.end()
)
auto iter = m_ChunkInfo.find(Chunk);
if (iter != m_ChunkInfo.end())
{
// Already queued, bail out
return;
auto & info = iter->second;
if (info.m_Priority > a_Priority)
{
m_SendChunks.push(sChunkQueue{a_Priority, Chunk});
info.m_Priority = a_Priority;
}
info.m_Clients.insert(a_Clients.begin(), a_Clients.end());
}
switch (a_Priority)
else
{
case E_CHUNK_PRIORITY_LOW:
{
m_SendChunksLowPriority.push_back(Chunk);
break;
}
case E_CHUNK_PRIORITY_MEDIUM:
{
m_SendChunksMediumPriority.push_back(Chunk);
break;
}
case E_CHUNK_PRIORITY_HIGH:
{
m_SendChunksHighPriority.push_back(Chunk);
break;
}
m_SendChunks.push(sChunkQueue{a_Priority, Chunk});
auto info = sSendChunk{Chunk, a_Priority};
info.m_Clients.insert(a_Clients.begin(), a_Clients.end());
m_ChunkInfo.emplace(Chunk, info);
}
}
m_evtQueue.Set();
@ -147,37 +157,14 @@ void cChunkSender::RemoveClient(cClientHandle * a_Client)
{
{
cCSLock Lock(m_CS);
for (sSendChunkList::iterator itr = m_SendChunksLowPriority.begin(); itr != m_SendChunksLowPriority.end();)
for (auto && pair : m_ChunkInfo)
{
if (itr->m_Client == a_Client)
{
itr = m_SendChunksLowPriority.erase(itr);
continue;
}
++itr;
} // for itr - m_SendChunksLowPriority[]
for (sSendChunkList::iterator itr = m_SendChunksMediumPriority.begin(); itr != m_SendChunksMediumPriority.end();)
{
if (itr->m_Client == a_Client)
{
itr = m_SendChunksMediumPriority.erase(itr);
continue;
}
++itr;
} // for itr - m_SendChunksMediumPriority[]
for (sSendChunkList::iterator itr = m_SendChunksHighPriority.begin(); itr != m_SendChunksHighPriority.end();)
{
if (itr->m_Client == a_Client)
{
itr = m_SendChunksHighPriority.erase(itr);
continue;
}
++itr;
} // for itr - m_SendChunksHighPriority[]
m_RemoveCount++;
auto && clients = pair.second.m_Clients;
clients.erase(a_Client); // nop for sets that do not contain a_Client
}
}
m_evtQueue.Set();
m_evtRemoved.Wait(); // Wait for removal confirmation
m_evtRemoved.Wait(); // Wait for all remaining instances of a_Client to be processed (Execute() makes a copy of m_ChunkInfo)
}
@ -188,132 +175,91 @@ void cChunkSender::Execute(void)
{
while (!m_ShouldTerminate)
{
cCSLock Lock(m_CS);
while (m_ChunksReady.empty() && m_SendChunksLowPriority.empty() && m_SendChunksMediumPriority.empty() && m_SendChunksHighPriority.empty())
m_evtQueue.Wait();
{
int RemoveCount = m_RemoveCount;
m_RemoveCount = 0;
cCSUnlock Unlock(Lock);
for (int i = 0; i < RemoveCount; i++)
cCSLock Lock(m_CS);
while (!m_SendChunks.empty())
{
m_evtRemoved.Set(); // Notify that the removed clients are safe to be deleted
// Take one from the queue:
auto Chunk = m_SendChunks.top().m_Chunk;
m_SendChunks.pop();
auto itr = m_ChunkInfo.find(Chunk);
if (itr == m_ChunkInfo.end())
{
continue;
}
std::unordered_set<cClientHandle *> clients;
std::swap(itr->second.m_Clients, clients);
m_ChunkInfo.erase(itr);
cCSUnlock Unlock(Lock);
SendChunk(Chunk.m_ChunkX, Chunk.m_ChunkZ, clients);
}
m_evtQueue.Wait();
if (m_ShouldTerminate)
{
return;
}
} // while (empty)
}
if (!m_SendChunksHighPriority.empty())
{
// Take one from the queue:
sSendChunk Chunk(m_SendChunksHighPriority.front());
m_SendChunksHighPriority.pop_front();
Lock.Unlock();
SendChunk(Chunk.m_ChunkX, Chunk.m_ChunkZ, Chunk.m_Client);
}
else if (!m_ChunksReady.empty())
{
// Take one from the queue:
cChunkCoords Coords(m_ChunksReady.front());
m_ChunksReady.pop_front();
Lock.Unlock();
SendChunk(Coords.m_ChunkX, Coords.m_ChunkZ, nullptr);
}
else if (!m_SendChunksMediumPriority.empty())
{
// Take one from the queue:
sSendChunk Chunk(m_SendChunksMediumPriority.front());
m_SendChunksMediumPriority.pop_front();
Lock.Unlock();
SendChunk(Chunk.m_ChunkX, Chunk.m_ChunkZ, Chunk.m_Client);
}
else
{
// Take one from the queue:
sSendChunk Chunk(m_SendChunksLowPriority.front());
m_SendChunksLowPriority.pop_front();
Lock.Unlock();
SendChunk(Chunk.m_ChunkX, Chunk.m_ChunkZ, Chunk.m_Client);
}
Lock.Lock();
int RemoveCount = m_RemoveCount;
m_RemoveCount = 0;
Lock.Unlock();
for (int i = 0; i < RemoveCount; i++)
{
m_evtRemoved.Set(); // Notify that the removed clients are safe to be deleted
}
} // while (!mShouldTerminate)
m_evtRemoved.SetAll(); // Notify all waiting threads that all clients are processed and thus safe to destroy
} // while (!m_ShouldTerminate)
}
void cChunkSender::SendChunk(int a_ChunkX, int a_ChunkZ, cClientHandle * a_Client)
void cChunkSender::SendChunk(int a_ChunkX, int a_ChunkZ, std::unordered_set<cClientHandle *> a_Clients)
{
ASSERT(m_World != nullptr);
// Ask the client if it still wants the chunk:
if ((a_Client != nullptr) && !a_Client->WantsSendChunk(a_ChunkX, a_ChunkZ))
for (auto itr = a_Clients.begin(); itr != a_Clients.end();)
{
return;
if (!(*itr)->WantsSendChunk(a_ChunkX, a_ChunkZ))
{
itr = a_Clients.erase(itr);
}
else
{
itr++;
}
}
// If the chunk has no clients, no need to packetize it:
if (!m_World->HasChunkAnyClients(a_ChunkX, a_ChunkZ))
if (!m_World.HasChunkAnyClients(a_ChunkX, a_ChunkZ))
{
return;
}
// If the chunk is not valid, do nothing - whoever needs it has queued it for loading / generating
if (!m_World->IsChunkValid(a_ChunkX, a_ChunkZ))
if (!m_World.IsChunkValid(a_ChunkX, a_ChunkZ))
{
return;
}
// If the chunk is not lighted, queue it for relighting and get notified when it's ready:
if (!m_World->IsChunkLighted(a_ChunkX, a_ChunkZ))
if (!m_World.IsChunkLighted(a_ChunkX, a_ChunkZ))
{
m_World->QueueLightChunk(a_ChunkX, a_ChunkZ, cpp14::make_unique<cNotifyChunkSender>(this));
m_World.QueueLightChunk(a_ChunkX, a_ChunkZ, cpp14::make_unique<cNotifyChunkSender>(*this, m_World));
return;
}
// Query and prepare chunk data:
if (!m_World->GetChunkData(a_ChunkX, a_ChunkZ, *this))
if (!m_World.GetChunkData(a_ChunkX, a_ChunkZ, *this))
{
return;
}
cChunkDataSerializer Data(m_BlockTypes, m_BlockMetas, m_BlockLight, m_BlockSkyLight, m_BiomeMap);
// Send:
if (a_Client == nullptr)
for (const auto client : a_Clients)
{
m_World->BroadcastChunkData(a_ChunkX, a_ChunkZ, Data);
}
else
{
a_Client->SendChunkData(a_ChunkX, a_ChunkZ, Data);
}
// Send:
client->SendChunkData(a_ChunkX, a_ChunkZ, Data);
// Send block-entity packets:
for (sBlockCoords::iterator itr = m_BlockEntities.begin(); itr != m_BlockEntities.end(); ++itr)
{
if (a_Client == nullptr)
// Send block-entity packets:
for (const auto & Pos : m_BlockEntities)
{
m_World->BroadcastBlockEntity(itr->m_BlockX, itr->m_BlockY, itr->m_BlockZ);
}
else
{
m_World->SendBlockEntity(itr->m_BlockX, itr->m_BlockY, itr->m_BlockZ, *a_Client);
}
} // for itr - m_Packets[]
m_World.SendBlockEntity(Pos.x, Pos.y, Pos.z, *client);
} // for itr - m_Packets[]
}
m_BlockEntities.clear();
// TODO: Send entity spawn packets
@ -325,7 +271,7 @@ void cChunkSender::SendChunk(int a_ChunkX, int a_ChunkZ, cClientHandle * a_Clien
void cChunkSender::BlockEntity(cBlockEntity * a_Entity)
{
m_BlockEntities.push_back(sBlockCoord(a_Entity->GetPosX(), a_Entity->GetPosY(), a_Entity->GetPosZ()));
m_BlockEntities.push_back(a_Entity->GetPos());
}

View File

@ -1,4 +1,4 @@

// ChunkSender.h
// Interfaces to the cChunkSender class representing the thread that waits for chunks becoming ready (loaded / generated) and sends them to clients
@ -29,6 +29,9 @@ Note that it may be called by world's BroadcastToChunk() if the client is still
#include "ChunkDef.h"
#include "ChunkDataCallback.h"
#include <unordered_set>
#include <unordered_map>
@ -53,87 +56,73 @@ class cChunkSender:
{
typedef cIsThread super;
public:
cChunkSender(void);
cChunkSender(cWorld & a_World);
~cChunkSender();
enum eChunkPriority
{
E_CHUNK_PRIORITY_HIGH = 0,
E_CHUNK_PRIORITY_MEDIUM = 1,
E_CHUNK_PRIORITY_LOW = 2,
E_CHUNK_PRIORITY_HIGH = 0,
E_CHUNK_PRIORITY_MIDHIGH,
E_CHUNK_PRIORITY_MEDIUM,
E_CHUNK_PRIORITY_LOW,
};
bool Start(cWorld * a_World);
bool Start();
void Stop(void);
/// Notifies that a chunk has become ready and it should be sent to all its clients
void ChunkReady(int a_ChunkX, int a_ChunkZ);
/// Queues a chunk to be sent to a specific client
void QueueSendChunkTo(int a_ChunkX, int a_ChunkZ, eChunkPriority a_Priority, cClientHandle * a_Client);
void QueueSendChunkTo(int a_ChunkX, int a_ChunkZ, eChunkPriority a_Priority, std::list<cClientHandle *> a_Client);
/// Removes the a_Client from all waiting chunk send operations
void RemoveClient(cClientHandle * a_Client);
protected:
struct sChunkQueue
{
eChunkPriority m_Priority;
cChunkCoords m_Chunk;
bool operator <(const sChunkQueue & a_Other) const
{
/* The Standard Priority Queue sorts from biggest to smallest
return true here means you are smaller than the other object, and you get pushed down.
The priorities go from HIGH (0) to LOW (2), so a smaller priority should mean further up the list
therefore, return true (affirm we're "smaller", and get pushed down) only if our priority is bigger than theirs (they're more urgent)
*/
return this->m_Priority > a_Other.m_Priority;
}
};
/// Used for sending chunks to specific clients
struct sSendChunk
{
int m_ChunkX;
int m_ChunkZ;
cClientHandle * m_Client;
sSendChunk(int a_ChunkX, int a_ChunkZ, cClientHandle * a_Client) :
m_ChunkX(a_ChunkX),
m_ChunkZ(a_ChunkZ),
m_Client(a_Client)
cChunkCoords m_Chunk;
std::unordered_set<cClientHandle *> m_Clients;
eChunkPriority m_Priority;
sSendChunk(cChunkCoords a_Chunk, eChunkPriority a_Priority) :
m_Chunk(a_Chunk),
m_Priority(a_Priority)
{
}
bool operator ==(const sSendChunk & a_Other)
{
return (
(a_Other.m_ChunkX == m_ChunkX) &&
(a_Other.m_ChunkZ == m_ChunkZ) &&
(a_Other.m_Client == m_Client)
);
}
} ;
typedef std::list<sSendChunk> sSendChunkList;
struct sBlockCoord
{
int m_BlockX;
int m_BlockY;
int m_BlockZ;
sBlockCoord(int a_BlockX, int a_BlockY, int a_BlockZ) :
m_BlockX(a_BlockX),
m_BlockY(a_BlockY),
m_BlockZ(a_BlockZ)
{
}
} ;
typedef std::vector<sBlockCoord> sBlockCoords;
};
cWorld * m_World;
cWorld & m_World;
cCriticalSection m_CS;
cChunkCoordsList m_ChunksReady;
sSendChunkList m_SendChunksLowPriority;
sSendChunkList m_SendChunksMediumPriority;
sSendChunkList m_SendChunksHighPriority;
cEvent m_evtQueue; // Set when anything is added to m_ChunksReady
cEvent m_evtRemoved; // Set when removed clients are safe to be deleted
int m_RemoveCount; // Number of threads waiting for a client removal (m_evtRemoved needs to be set this many times)
std::priority_queue<sChunkQueue> m_SendChunks;
std::unordered_map<cChunkCoords, sSendChunk, cChunkCoordsHash> m_ChunkInfo;
cEvent m_evtQueue; // Set when anything is added to m_ChunksReady
cEvent m_evtRemoved; // Set when removed clients are safe to be deleted
// Data about the chunk that is being sent:
// NOTE that m_BlockData[] is inherited from the cChunkDataCollector
unsigned char m_BiomeMap[cChunkDef::Width * cChunkDef::Width];
sBlockCoords m_BlockEntities; // Coords of the block entities to send
std::vector<Vector3i> m_BlockEntities; // Coords of the block entities to send
// TODO: sEntityIDs m_Entities; // Entity-IDs of the entities to send
// cIsThread override:
@ -146,9 +135,8 @@ protected:
virtual void BlockEntity (cBlockEntity * a_Entity) override;
/// Sends the specified chunk to a_Client, or to all chunk clients if a_Client == nullptr
void SendChunk(int a_ChunkX, int a_ChunkZ, cClientHandle * a_Client);
void SendChunk(int a_ChunkX, int a_ChunkZ, std::unordered_set<cClientHandle *> a_Clients);
} ;

View File

@ -456,7 +456,7 @@ bool cClientHandle::StreamNextChunk(void)
// If the chunk already loading / loaded -> skip
if (
(std::find(m_ChunksToSend.begin(), m_ChunksToSend.end(), Coords) != m_ChunksToSend.end()) ||
(m_ChunksToSend.find(Coords) != m_ChunksToSend.end()) ||
(std::find(m_LoadedChunks.begin(), m_LoadedChunks.end(), Coords) != m_LoadedChunks.end())
)
{
@ -494,7 +494,7 @@ bool cClientHandle::StreamNextChunk(void)
// If the chunk already loading / loaded -> skip
if (
(std::find(m_ChunksToSend.begin(), m_ChunksToSend.end(), Coords) != m_ChunksToSend.end()) ||
(m_ChunksToSend.find(Coords) != m_ChunksToSend.end()) ||
(std::find(m_LoadedChunks.begin(), m_LoadedChunks.end(), Coords) != m_LoadedChunks.end())
)
{
@ -541,7 +541,7 @@ void cClientHandle::UnloadOutOfRangeChunks(void)
}
}
for (cChunkCoordsList::iterator itr = m_ChunksToSend.begin(); itr != m_ChunksToSend.end();)
for (auto itr = m_ChunksToSend.begin(); itr != m_ChunksToSend.end();)
{
int DiffX = Diff((*itr).m_ChunkX, ChunkPosX);
int DiffZ = Diff((*itr).m_ChunkZ, ChunkPosZ);
@ -583,7 +583,7 @@ void cClientHandle::StreamChunk(int a_ChunkX, int a_ChunkZ, cChunkSender::eChunk
{
cCSLock Lock(m_CSChunkLists);
m_LoadedChunks.push_back(cChunkCoords(a_ChunkX, a_ChunkZ));
m_ChunksToSend.push_back(cChunkCoords(a_ChunkX, a_ChunkZ));
m_ChunksToSend.emplace(a_ChunkX, a_ChunkZ);
}
World->SendChunkTo(a_ChunkX, a_ChunkZ, a_Priority, this);
}
@ -2179,15 +2179,12 @@ void cClientHandle::SendChunkData(int a_ChunkX, int a_ChunkZ, cChunkDataSerializ
bool Found = false;
{
cCSLock Lock(m_CSChunkLists);
for (cChunkCoordsList::iterator itr = m_ChunksToSend.begin(); itr != m_ChunksToSend.end(); ++itr)
auto itr = m_ChunksToSend.find(cChunkCoords{a_ChunkX, a_ChunkZ});
if (itr != m_ChunksToSend.end())
{
if ((itr->m_ChunkX == a_ChunkX) && (itr->m_ChunkZ == a_ChunkZ))
{
m_ChunksToSend.erase(itr);
Found = true;
break;
}
} // for itr - m_ChunksToSend[]
m_ChunksToSend.erase(itr);
Found = true;
}
}
if (!Found)
{
@ -2950,7 +2947,7 @@ bool cClientHandle::WantsSendChunk(int a_ChunkX, int a_ChunkZ)
}
cCSLock Lock(m_CSChunkLists);
return (std::find(m_ChunksToSend.begin(), m_ChunksToSend.end(), cChunkCoords(a_ChunkX, a_ChunkZ)) != m_ChunksToSend.end());
return m_ChunksToSend.find(cChunkCoords(a_ChunkX, a_ChunkZ)) != m_ChunksToSend.end();
}
@ -2966,9 +2963,9 @@ void cClientHandle::AddWantedChunk(int a_ChunkX, int a_ChunkZ)
LOGD("Adding chunk [%d, %d] to wanted chunks for client %p", a_ChunkX, a_ChunkZ, this);
cCSLock Lock(m_CSChunkLists);
if (std::find(m_ChunksToSend.begin(), m_ChunksToSend.end(), cChunkCoords(a_ChunkX, a_ChunkZ)) == m_ChunksToSend.end())
if (m_ChunksToSend.find(cChunkCoords(a_ChunkX, a_ChunkZ)) == m_ChunksToSend.end())
{
m_ChunksToSend.push_back(cChunkCoords(a_ChunkX, a_ChunkZ));
m_ChunksToSend.emplace(a_ChunkX, a_ChunkZ);
}
}

View File

@ -377,10 +377,10 @@ private:
AString m_Password;
Json::Value m_Properties;
cCriticalSection m_CSChunkLists;
cChunkCoordsList m_LoadedChunks; // Chunks that the player belongs to
cChunkCoordsList m_ChunksToSend; // Chunks that need to be sent to the player (queued because they weren't generated yet or there's not enough time to send them)
cChunkCoordsList m_SentChunks; // Chunks that are currently sent to the client
cCriticalSection m_CSChunkLists;
cChunkCoordsList m_LoadedChunks; // Chunks that the player belongs to
std::unordered_set<cChunkCoords, cChunkCoordsHash> m_ChunksToSend; // Chunks that need to be sent to the player (queued because they weren't generated yet or there's not enough time to send them)
cChunkCoordsList m_SentChunks; // Chunks that are currently sent to the client
cProtocol * m_Protocol;

View File

@ -13,7 +13,7 @@
cEvent::cEvent(void) :
m_ShouldWait(true)
m_ShouldContinue(false)
{
}
@ -23,12 +23,11 @@ cEvent::cEvent(void) :
void cEvent::Wait(void)
{
std::unique_lock<std::mutex> Lock(m_Mutex);
while (m_ShouldWait)
{
m_CondVar.wait(Lock);
std::unique_lock<std::mutex> Lock(m_Mutex);
m_CondVar.wait(Lock, [this](){ return m_ShouldContinue.load(); });
}
m_ShouldWait = true;
m_ShouldContinue = false;
}
@ -38,33 +37,13 @@ void cEvent::Wait(void)
bool cEvent::Wait(unsigned a_TimeoutMSec)
{
auto dst = std::chrono::system_clock::now() + std::chrono::milliseconds(a_TimeoutMSec);
std::unique_lock<std::mutex> Lock(m_Mutex); // We assume that this lock is acquired without much delay - we are the only user of the mutex
while (m_ShouldWait && (std::chrono::system_clock::now() <= dst))
bool Result;
{
switch (m_CondVar.wait_until(Lock, dst))
{
case std::cv_status::no_timeout:
{
// The wait was successful, check for spurious wakeup:
if (!m_ShouldWait)
{
m_ShouldWait = true;
return true;
}
// This was a spurious wakeup, wait again:
continue;
}
case std::cv_status::timeout:
{
// The wait timed out, return failure:
return false;
}
} // switch (wait_until())
} // while (m_ShouldWait && not timeout)
// The wait timed out in the while condition:
return false;
std::unique_lock<std::mutex> Lock(m_Mutex); // We assume that this lock is acquired without much delay - we are the only user of the mutex
Result = m_CondVar.wait_until(Lock, dst, [this](){ return m_ShouldContinue.load(); });
}
m_ShouldContinue = false;
return Result;
}
@ -73,13 +52,20 @@ bool cEvent::Wait(unsigned a_TimeoutMSec)
void cEvent::Set(void)
{
{
std::unique_lock<std::mutex> Lock(m_Mutex);
m_ShouldWait = false;
}
m_ShouldContinue = true;
m_CondVar.notify_one();
}
void cEvent::SetAll(void)
{
m_ShouldContinue = true;
m_CondVar.notify_all();
}

View File

@ -12,6 +12,7 @@
#include <mutex>
#include <condition_variable>
#include <atomic>
@ -28,7 +29,11 @@ public:
/** Sets the event - releases one thread that has been waiting in Wait().
If there was no thread waiting, the next call to Wait() will not block. */
void Set (void);
void Set(void);
/** Sets the event - releases all threads that have been waiting in Wait().
If there was no thread waiting, the next call to Wait() will not block. */
void SetAll(void);
/** Waits for the event until either it is signalled, or the (relative) timeout is passed.
Returns true if the event was signalled, false if the timeout was hit or there was an error. */
@ -37,9 +42,9 @@ public:
private:
/** Used for checking for spurious wakeups. */
bool m_ShouldWait;
std::atomic<bool> m_ShouldContinue;
/** Mutex protecting m_ShouldWait from multithreaded access. */
/** Mutex protecting m_ShouldContinue from multithreaded access. */
std::mutex m_Mutex;
/** The condition variable used as the Event. */

View File

@ -134,9 +134,9 @@ bool cIsThread::Wait(void)
m_Thread.join();
return true;
}
catch (std::system_error & a_Exception)
catch (const std::system_error & a_Exception)
{
LOGERROR("cIsThread::Wait error %i: could not join thread %s; %s", a_Exception.code().value(), m_ThreadName.c_str(), a_Exception.code().message().c_str());
LOGERROR("%s error %i: could not join thread %s; %s", __FUNCTION__, a_Exception.code().value(), m_ThreadName.c_str(), a_Exception.code().message().c_str());
return false;
}
}

View File

@ -32,10 +32,6 @@ protected:
/** The overriden Execute() method should check this value periodically and terminate if this is true. */
volatile bool m_ShouldTerminate;
private:
/** Wrapper for Execute() that waits for the initialization event, to prevent race conditions in thread initialization. */
void DoExecute(void);
public:
cIsThread(const AString & a_ThreadName);
virtual ~cIsThread();
@ -51,14 +47,21 @@ public:
/** Returns true if the thread calling this function is the thread contained within this object. */
bool IsCurrentThread(void) const { return std::this_thread::get_id() == m_Thread.get_id(); }
private:
protected:
/** The name of the thread, used to aid debugging in IDEs which support named threads */
AString m_ThreadName;
/** The thread object which holds the created thread for later manipulation */
std::thread m_Thread;
/** The event that is used to wait with the thread's execution until the thread object is fully initialized.
This prevents the IsCurrentThread() call to fail because of a race-condition. */
This prevents the IsCurrentThread() call to fail because of a race-condition where the thread starts before m_Thread has been fully assigned. */
cEvent m_evtStart;
/** Wrapper for Execute() that waits for the initialization event, to prevent race conditions in thread initialization. */
void DoExecute(void);
} ;

View File

@ -186,6 +186,7 @@ cWorld::cWorld(const AString & a_WorldName, eDimension a_Dimension, const AStrin
m_Scoreboard(this),
m_MapManager(this),
m_GeneratorCallbacks(*this),
m_ChunkSender(*this),
m_TickThread(*this)
{
LOGD("cWorld::cWorld(\"%s\")", a_WorldName.c_str());
@ -513,7 +514,7 @@ void cWorld::Start(void)
m_Lighting.Start(this);
m_Storage.Start(this, m_StorageSchema, m_StorageCompressionFactor);
m_Generator.Start(m_GeneratorCallbacks, m_GeneratorCallbacks, IniFile);
m_ChunkSender.Start(this);
m_ChunkSender.Start();
m_TickThread.Start();
// Init of the spawn monster time (as they are supposed to have different spawn rate)
@ -1330,6 +1331,30 @@ bool cWorld::DoWithChunk(int a_ChunkX, int a_ChunkZ, cChunkCallback & a_Callback
bool cWorld::DoWithChunk(int a_ChunkX, int a_ChunkZ, std::function<bool(cChunk &)> a_Callback)
{
struct cCallBackWrapper : cChunkCallback
{
cCallBackWrapper(std::function<bool(cChunk &)> a_InnerCallback) :
m_Callback(a_InnerCallback)
{
}
virtual bool Item(cChunk * a_Chunk)
{
return m_Callback(*a_Chunk);
}
private:
std::function<bool(cChunk &)> m_Callback;
} callback(a_Callback);
return m_ChunkMap->DoWithChunk(a_ChunkX, a_ChunkZ, callback);
}
bool cWorld::DoWithChunkAt(Vector3i a_BlockPos, std::function<bool(cChunk &)> a_Callback)
{
return m_ChunkMap->DoWithChunkAt(a_BlockPos, a_Callback);
@ -2005,15 +2030,6 @@ void cWorld::BroadcastChat(const cCompositeChat & a_Message, const cClientHandle
void cWorld::BroadcastChunkData(int a_ChunkX, int a_ChunkZ, cChunkDataSerializer & a_Serializer, const cClientHandle * a_Exclude)
{
m_ChunkMap->BroadcastChunkData(a_ChunkX, a_ChunkZ, a_Serializer, a_Exclude);
}
void cWorld::BroadcastCollectEntity(const cEntity & a_Entity, const cPlayer & a_Player, const cClientHandle * a_Exclude)
{
m_ChunkMap->BroadcastCollectEntity(a_Entity, a_Player, a_Exclude);
@ -2465,10 +2481,23 @@ void cWorld::SetChunkData(cSetChunkData & a_SetChunkData)
// If a client is requesting this chunk, send it to them:
int ChunkX = a_SetChunkData.GetChunkX();
int ChunkZ = a_SetChunkData.GetChunkZ();
if (m_ChunkMap->HasChunkAnyClients(ChunkX, ChunkZ))
{
m_ChunkSender.ChunkReady(ChunkX, ChunkZ);
}
cChunkSender & ChunkSender = m_ChunkSender;
DoWithChunk(
ChunkX, ChunkZ,
[&ChunkSender] (cChunk & a_Chunk) -> bool
{
if (a_Chunk.HasAnyClients())
{
ChunkSender.QueueSendChunkTo(
a_Chunk.GetPosX(),
a_Chunk.GetPosZ(),
cChunkSender::E_CHUNK_PRIORITY_MEDIUM,
a_Chunk.GetAllClients()
);
}
return true;
}
);
// Save the chunk right after generating, so that we don't have to generate it again on next run
if (a_SetChunkData.ShouldMarkDirty())

View File

@ -250,7 +250,6 @@ public:
void BroadcastChat (const cCompositeChat & a_Message, const cClientHandle * a_Exclude = nullptr);
// tolua_end
void BroadcastChunkData (int a_ChunkX, int a_ChunkZ, cChunkDataSerializer & a_Serializer, const cClientHandle * a_Exclude = nullptr);
void BroadcastCollectEntity (const cEntity & a_Pickup, const cPlayer & a_Player, const cClientHandle * a_Exclude = nullptr);
void BroadcastDestroyEntity (const cEntity & a_Entity, const cClientHandle * a_Exclude = nullptr);
void BroadcastEntityEffect (const cEntity & a_Entity, int a_EffectID, int a_Amplifier, short a_Duration, const cClientHandle * a_Exclude = nullptr);
@ -628,6 +627,7 @@ public:
/** Calls the callback for the chunk specified, with ChunkMapCS locked; returns false if the chunk doesn't exist, otherwise returns the same value as the callback */
bool DoWithChunk(int a_ChunkX, int a_ChunkZ, cChunkCallback & a_Callback);
bool DoWithChunk(int a_ChunkX, int a_ChunkZ, std::function<bool(cChunk &)> a_Callback);
/** Calls the callback for the chunk at the block position specified, with ChunkMapCS locked; returns false if the chunk doesn't exist, otherwise returns the same value as the callback **/
bool DoWithChunkAt(Vector3i a_BlockPos, std::function<bool(cChunk &)> a_Callback);