1
0

Fixed race conditions in cClientHandle's State.

This commit is contained in:
Mattes D 2016-11-19 23:24:01 +01:00
parent 4ff9973987
commit 2aecc7d701
2 changed files with 251 additions and 190 deletions

View File

@ -160,30 +160,34 @@ void cClientHandle::Destroy(void)
m_Link.reset(); m_Link.reset();
} }
{ {
cCSLock Lock(m_CSDestroyingState); cCSLock Lock(m_CSState);
if (m_State >= csDestroying) if (m_State >= csDestroying)
{ {
// Already called // Already called
LOGD("%s: client %p, \"%s\" already destroyed, bailing out", __FUNCTION__, static_cast<void *>(this), m_Username.c_str());
return; return;
} }
m_State = csDestroying; m_State = csDestroying;
} }
LOGD("%s: client %p, \"%s\"", __FUNCTION__, static_cast<void *>(this), m_Username.c_str()); LOGD("%s: destroying client %p, \"%s\" @ %s", __FUNCTION__, static_cast<void *>(this), m_Username.c_str(), m_IPString.c_str());
if (m_Player != nullptr)
{ {
cWorld * World = m_Player->GetWorld(); cCSLock lock(m_CSState);
if (World != nullptr)
{
m_Player->StopEveryoneFromTargetingMe();
m_Player->SetIsTicking(false);
World->RemovePlayer(m_Player, true);
}
m_Player->RemoveClientHandle();
}
m_State = csDestroyed; m_State = csDestroyed;
}
auto player = m_Player;
if (player != nullptr)
{
auto world = player->GetWorld();
if (world != nullptr)
{
player->StopEveryoneFromTargetingMe();
player->SetIsTicking(false);
world->RemovePlayer(player, true);
}
player->RemoveClientHandle();
}
} }
@ -318,6 +322,15 @@ void cClientHandle::Kick(const AString & a_Reason)
void cClientHandle::Authenticate(const AString & a_Name, const AString & a_UUID, const Json::Value & a_Properties) void cClientHandle::Authenticate(const AString & a_Name, const AString & a_UUID, const Json::Value & a_Properties)
{ {
cWorld * World;
{
cCSLock lock(m_CSState);
/*
LOGD("Processing authentication for client %s @ %s (%p), state = %d",
m_Username.c_str(), m_IPString.c_str(), static_cast<void *>(this), m_State.load()
);
//*/
if (m_State != csAuthenticating) if (m_State != csAuthenticating)
{ {
return; return;
@ -342,10 +355,16 @@ void cClientHandle::Authenticate(const AString & a_Name, const AString & a_UUID,
// Spawn player (only serversided, so data is loaded) // Spawn player (only serversided, so data is loaded)
m_Player = new cPlayer(m_Self, GetUsername()); m_Player = new cPlayer(m_Self, GetUsername());
/*
LOGD("Created a new cPlayer object at %p for client %s @ %s (%p)",
static_cast<void *>(m_Player),
m_Username.c_str(), m_IPString.c_str(), static_cast<void *>(this)
);
//*/
InvalidateCachedSentChunk(); InvalidateCachedSentChunk();
m_Self.reset(); m_Self.reset();
cWorld * World = cRoot::Get()->GetWorld(m_Player->GetLoadedWorldName()); World = cRoot::Get()->GetWorld(m_Player->GetLoadedWorldName());
if (World == nullptr) if (World == nullptr)
{ {
World = cRoot::Get()->GetDefaultWorld(); World = cRoot::Get()->GetDefaultWorld();
@ -396,6 +415,7 @@ void cClientHandle::Authenticate(const AString & a_Name, const AString & a_UUID,
m_Player->SetWorld(World); m_Player->SetWorld(World);
m_State = csAuthenticated; m_State = csAuthenticated;
}
// Query player team // Query player team
m_Player->UpdateTeam(); m_Player->UpdateTeam();
@ -411,6 +431,7 @@ void cClientHandle::Authenticate(const AString & a_Name, const AString & a_UUID,
m_PingStartTime = std::chrono::steady_clock::now() + std::chrono::seconds(3); // Send the first KeepAlive packet in 3 seconds m_PingStartTime = std::chrono::steady_clock::now() + std::chrono::seconds(3); // Send the first KeepAlive packet in 3 seconds
cRoot::Get()->GetPluginManager()->CallHookPlayerSpawned(*m_Player); cRoot::Get()->GetPluginManager()->CallHookPlayerSpawned(*m_Player);
// LOGD("Client %s @ %s (%p) has been fully authenticated", m_Username.c_str(), m_IPString.c_str(), static_cast<void *>(this));
} }
@ -661,6 +682,20 @@ void cClientHandle::HandlePing(void)
bool cClientHandle::HandleLogin(UInt32 a_ProtocolVersion, const AString & a_Username) bool cClientHandle::HandleLogin(UInt32 a_ProtocolVersion, const AString & a_Username)
{ {
{
cCSLock lock(m_CSState);
if (m_State != csConnected)
{
/*
LOGD("Client %s @ %s (%p, state %d) has disconnected before logging in, bailing out of login",
a_Username.c_str(), m_IPString.c_str(), static_cast<void *>(this), m_State.load()
);
//*/
return false;
}
// LOGD("Handling login for client %s @ %s (%p), state = %d", a_Username.c_str(), m_IPString.c_str(), static_cast<void *>(this), m_State.load());
// If the protocol version hasn't been set yet, set it now: // If the protocol version hasn't been set yet, set it now:
if (m_ProtocolVersion == 0) if (m_ProtocolVersion == 0)
{ {
@ -675,9 +710,10 @@ bool cClientHandle::HandleLogin(UInt32 a_ProtocolVersion, const AString & a_User
Destroy(); Destroy();
return false; return false;
} }
m_State = csAuthenticating;
} // lock(m_CSState)
// Schedule for authentication; until then, let the player wait (but do not block) // Schedule for authentication; until then, let the player wait (but do not block)
m_State = csAuthenticating;
cRoot::Get()->GetAuthenticator().Authenticate(GetUniqueID(), GetUsername(), m_Protocol->GetAuthServerID()); cRoot::Get()->GetAuthenticator().Authenticate(GetUniqueID(), GetUsername(), m_Protocol->GetAuthServerID());
return true; return true;
} }
@ -2007,31 +2043,7 @@ void cClientHandle::Tick(float a_Dt)
m_BreakProgress += m_Player->GetPlayerRelativeBlockHardness(Block); m_BreakProgress += m_Player->GetPlayerRelativeBlockHardness(Block);
} }
// Process received network data: ProcessProtocolInOut();
AString IncomingData;
{
cCSLock Lock(m_CSIncomingData);
std::swap(IncomingData, m_IncomingData);
}
if (!IncomingData.empty())
{
m_Protocol->DataReceived(IncomingData.data(), IncomingData.size());
}
// Send any queued outgoing data:
AString OutgoingData;
{
cCSLock Lock(m_CSOutgoingData);
std::swap(OutgoingData, m_OutgoingData);
}
if (!OutgoingData.empty())
{
cTCPLinkPtr Link(m_Link); // Grab a copy of the link in a multithread-safe way
if ((Link != nullptr))
{
Link->Send(OutgoingData.data(), OutgoingData.size());
}
}
m_TicksSinceLastPacket += 1; m_TicksSinceLastPacket += 1;
if (m_TicksSinceLastPacket > 600) // 30 seconds time-out if (m_TicksSinceLastPacket > 600) // 30 seconds time-out
@ -2040,17 +2052,27 @@ void cClientHandle::Tick(float a_Dt)
return; return;
} }
// If destruction is queued, destroy now:
if (m_State == csQueuedForDestruction)
{
LOGD("Client %s @ %s (%p) has been queued for destruction, destroying now.",
m_Username.c_str(), m_IPString.c_str(), static_cast<void *>(this)
);
Destroy();
return;
}
// Only process further if the player object is valid:
if (m_Player == nullptr) if (m_Player == nullptr)
{ {
return; return;
} }
// Freeze the player if they are standing in a chunk not yet sent to the client
// Freeze the player if it is standing on a chunk not yet sent to the client
m_HasSentPlayerChunk = false; m_HasSentPlayerChunk = false;
if (m_Player->GetParentChunk() != nullptr) if (m_Player->GetParentChunk() != nullptr)
{ {
// If the chunk is invalid, do not bother checking if it's sent to the client, it is definitely not // If the chunk is invalid, it has definitely not been sent to the client yet
if (m_Player->GetParentChunk()->IsValid()) if (m_Player->GetParentChunk()->IsValid())
{ {
// Before iterating m_SentChunks, see if the player's coords equal m_CachedSentChunk // Before iterating m_SentChunks, see if the player's coords equal m_CachedSentChunk
@ -2075,11 +2097,14 @@ void cClientHandle::Tick(float a_Dt)
} }
// If the chunk the player's in was just sent, spawn the player: // If the chunk the player's in was just sent, spawn the player:
{
cCSLock lock(m_CSState);
if (m_HasSentPlayerChunk && (m_State == csDownloadingWorld)) if (m_HasSentPlayerChunk && (m_State == csDownloadingWorld))
{ {
m_Protocol->SendPlayerMoveLook(); m_Protocol->SendPlayerMoveLook();
m_State = csPlaying; m_State = csPlaying;
} }
} // lock(m_CSState)
// Send a ping packet: // Send a ping packet:
if (m_State == csPlaying) if (m_State == csPlaying)
@ -2092,7 +2117,7 @@ void cClientHandle::Tick(float a_Dt)
} }
} }
if ((m_State >= csAuthenticated) && (m_State < csDestroying)) if ((m_State >= csAuthenticated) && (m_State < csQueuedForDestruction))
{ {
// Stream 4 chunks per tick // Stream 4 chunks per tick
for (int i = 0; i < 4; i++) for (int i = 0; i < 4; i++)
@ -2138,28 +2163,20 @@ void cClientHandle::Tick(float a_Dt)
void cClientHandle::ServerTick(float a_Dt) void cClientHandle::ServerTick(float a_Dt)
{ {
// Process received network data: ProcessProtocolInOut();
AString IncomingData;
// If destruction is queued, destroy now:
if (m_State == csQueuedForDestruction)
{ {
cCSLock Lock(m_CSIncomingData); LOGD("Client %s @ %s (%p) has been queued for destruction, destroying now.",
std::swap(IncomingData, m_IncomingData); m_Username.c_str(), m_IPString.c_str(), static_cast<void *>(this)
} );
if (!IncomingData.empty()) Destroy();
{ return;
m_Protocol->DataReceived(IncomingData.data(), IncomingData.size());
} }
// Send any queued outgoing data:
AString OutgoingData;
{ {
cCSLock Lock(m_CSOutgoingData); cCSLock lock(m_CSState);
std::swap(OutgoingData, m_OutgoingData);
}
if ((m_Link != nullptr) && !OutgoingData.empty())
{
m_Link->Send(OutgoingData.data(), OutgoingData.size());
}
if (m_State == csAuthenticated) if (m_State == csAuthenticated)
{ {
StreamNextChunk(); StreamNextChunk();
@ -2172,6 +2189,7 @@ void cClientHandle::ServerTick(float a_Dt)
m_Player->Initialize(*(m_Player->GetWorld())); m_Player->Initialize(*(m_Player->GetWorld()));
return; return;
} }
} // lock(m_CSState)
m_TicksSinceLastPacket += 1; m_TicksSinceLastPacket += 1;
if (m_TicksSinceLastPacket > 600) // 30 seconds if (m_TicksSinceLastPacket > 600) // 30 seconds
@ -3138,7 +3156,7 @@ bool cClientHandle::HasPluginChannel(const AString & a_PluginChannel)
bool cClientHandle::WantsSendChunk(int a_ChunkX, int a_ChunkZ) bool cClientHandle::WantsSendChunk(int a_ChunkX, int a_ChunkZ)
{ {
if (m_State >= csDestroying) if (m_State >= csQueuedForDestruction)
{ {
return false; return false;
} }
@ -3153,7 +3171,7 @@ bool cClientHandle::WantsSendChunk(int a_ChunkX, int a_ChunkZ)
void cClientHandle::AddWantedChunk(int a_ChunkX, int a_ChunkZ) void cClientHandle::AddWantedChunk(int a_ChunkX, int a_ChunkZ)
{ {
if (m_State >= csDestroying) if (m_State >= csQueuedForDestruction)
{ {
return; return;
} }
@ -3207,24 +3225,22 @@ void cClientHandle::PacketError(UInt32 a_PacketType)
void cClientHandle::SocketClosed(void) void cClientHandle::SocketClosed(void)
{ {
// The socket has been closed for any reason // The socket has been closed for any reason
/*
LOGD("SocketClosed for client %s @ %s (%p), state = %d, m_Player = %p",
m_Username.c_str(), m_IPString.c_str(), static_cast<void *>(this), m_State.load(), static_cast<void *>(m_Player)
);
//*/
if (!m_Username.empty()) // Ignore client pings // Log into console, unless it's a client ping:
if (!m_Username.empty())
{ {
LOGD("Client %s @ %s disconnected", m_Username.c_str(), m_IPString.c_str()); LOGD("Client %s @ %s disconnected", m_Username.c_str(), m_IPString.c_str());
cRoot::Get()->GetPluginManager()->CallHookDisconnect(*this, "Player disconnected"); cRoot::Get()->GetPluginManager()->CallHookDisconnect(*this, "Player disconnected");
} }
if (m_Player != nullptr)
{ // Queue self for destruction:
m_Player->GetWorld()->QueueTask([this](cWorld & World) cCSLock lock(m_CSState);
{ m_State = csQueuedForDestruction;
UNUSED(World);
Destroy();
});
}
else
{
Destroy();
}
} }
@ -3241,6 +3257,36 @@ void cClientHandle::SetSelf(cClientHandlePtr a_Self)
void cClientHandle::ProcessProtocolInOut(void)
{
// Process received network data:
AString IncomingData;
{
cCSLock Lock(m_CSIncomingData);
std::swap(IncomingData, m_IncomingData);
}
if (!IncomingData.empty())
{
m_Protocol->DataReceived(IncomingData.data(), IncomingData.size());
}
// Send any queued outgoing data:
AString OutgoingData;
{
cCSLock Lock(m_CSOutgoingData);
std::swap(OutgoingData, m_OutgoingData);
}
auto link = m_Link;
if ((link != nullptr) && !OutgoingData.empty())
{
link->Send(OutgoingData.data(), OutgoingData.size());
}
}
void cClientHandle::OnLinkCreated(cTCPLinkPtr a_Link) void cClientHandle::OnLinkCreated(cTCPLinkPtr a_Link)
{ {
m_Link = a_Link; m_Link = a_Link;
@ -3266,6 +3312,11 @@ void cClientHandle::OnReceivedData(const char * a_Data, size_t a_Length)
void cClientHandle::OnRemoteClosed(void) void cClientHandle::OnRemoteClosed(void)
{ {
/*
LOGD("Client socket for %s @ %s has been closed.",
m_Username.c_str(), m_IPString.c_str()
);
//*/
{ {
cCSLock Lock(m_CSOutgoingData); cCSLock Lock(m_CSOutgoingData);
m_Link.reset(); m_Link.reset();

View File

@ -471,16 +471,22 @@ private:
csDownloadingWorld, ///< The client is waiting for chunks, we're waiting for the loader to provide and send them csDownloadingWorld, ///< The client is waiting for chunks, we're waiting for the loader to provide and send them
csConfirmingPos, ///< The client has been sent the position packet, waiting for them to repeat the position back csConfirmingPos, ///< The client has been sent the position packet, waiting for them to repeat the position back
csPlaying, ///< Normal gameplay csPlaying, ///< Normal gameplay
csQueuedForDestruction, ///< The client will be destroyed in the next tick (flag set when socket closed)
csDestroying, ///< The client is being destroyed, don't queue any more packets / don't add to chunks csDestroying, ///< The client is being destroyed, don't queue any more packets / don't add to chunks
csDestroyed, ///< The client has been destroyed, the destructor is to be called from the owner thread csDestroyed, ///< The client has been destroyed, the destructor is to be called from the owner thread
// TODO: Add Kicking here as well // TODO: Add Kicking here as well
} ; } ;
std::atomic<eState> m_State; /* Mutex protecting m_State from concurrent writes. */
cCriticalSection m_CSState;
/** m_State needs to be locked in the Destroy() function so that the destruction code doesn't run twice on two different threads */ /** The current (networking) state of the client.
cCriticalSection m_CSDestroyingState; Protected from concurrent writes by m_CSState; but may be read by other threads concurrently.
If a function depends on m_State or wants to change m_State, it needs to lock m_CSState.
However, if it only uses m_State for a quick bail out, or it doesn't break if the client disconnects in the middle of it,
it may just read m_State without locking m_CSState. */
std::atomic<eState> m_State;
/** If set to true during csDownloadingWorld, the tick thread calls CheckIfWorldDownloaded() */ /** If set to true during csDownloadingWorld, the tick thread calls CheckIfWorldDownloaded() */
bool m_ShouldCheckDownloaded; bool m_ShouldCheckDownloaded;
@ -556,6 +562,10 @@ private:
/** Called right after the instance is created to store its SharedPtr inside. */ /** Called right after the instance is created to store its SharedPtr inside. */
void SetSelf(cClientHandlePtr a_Self); void SetSelf(cClientHandlePtr a_Self);
/** Processes the data in the network input and output buffers.
Called by both Tick() and ServerTick(). */
void ProcessProtocolInOut(void);
// cTCPLink::cCallbacks overrides: // cTCPLink::cCallbacks overrides:
virtual void OnLinkCreated(cTCPLinkPtr a_Link) override; virtual void OnLinkCreated(cTCPLinkPtr a_Link) override;
virtual void OnReceivedData(const char * a_Data, size_t a_Length) override; virtual void OnReceivedData(const char * a_Data, size_t a_Length) override;