1
0

Rewritten SocketThreads for proper shutdown scenario.

This fixes #560 and #390.
This commit is contained in:
madmaxoft 2014-01-19 19:31:43 +01:00
parent cce063b0cd
commit 3c0e8c8da0
8 changed files with 189 additions and 178 deletions

View File

@ -120,9 +120,6 @@ cClientHandle::~cClientHandle()
LOGD("Deleting client \"%s\" at %p", GetUsername().c_str(), this);
// Remove from cSocketThreads, we're not to be called anymore:
cRoot::Get()->GetServer()->ClientDestroying(this);
{
cCSLock Lock(m_CSChunkLists);
m_LoadedChunks.clear();
@ -160,8 +157,7 @@ cClientHandle::~cClientHandle()
cRoot::Get()->GetServer()->WriteToClient(this, Data);
}
// Queue the socket to close as soon as it sends all outgoing data:
cRoot::Get()->GetServer()->QueueClientClose(this);
// Close the socket as soon as it sends all outgoing data:
cRoot::Get()->GetServer()->RemoveClient(this);
delete m_Protocol;

View File

@ -26,7 +26,7 @@ cHTTPConnection::cHTTPConnection(cHTTPServer & a_HTTPServer) :
cHTTPConnection::~cHTTPConnection()
{
// LOGD("HTTP: Del connection at %p", this);
delete m_CurrentRequest;
}

View File

@ -87,6 +87,25 @@ void cSocket::CloseSocket()
void cSocket::ShutdownReadWrite(void)
{
#ifdef _WIN32
int res = shutdown(m_Socket, SD_BOTH);
#else
int res = shutdown(m_Socket, SHUT_RDWR);
#endif
if (res != 0)
{
LOGWARN("%s: Error shutting down socket %d (%s): %d (%s)",
__FUNCTION__, m_Socket, m_IPString.c_str(), this->GetLastError(), GetLastErrorString().c_str()
);
}
}
AString cSocket::GetErrorString( int a_ErrNo )
{
char buffer[ 1024 ];

View File

@ -40,6 +40,10 @@ public:
bool IsValid(void) const { return IsValidSocket(m_Socket); }
void CloseSocket(void);
/** Notifies the socket that we don't expect any more reads nor writes on it.
Most TCPIP implementations use this to send the FIN flag in a packet */
void ShutdownReadWrite(void);
operator xSocket(void) const;
xSocket GetSocket(void) const;

View File

@ -132,47 +132,6 @@ void cSocketThreads::Write(const cCallback * a_Client, const AString & a_Data)
/// Stops reading from the socket - when this call returns, no more calls to the callbacks are made
void cSocketThreads::StopReading(const cCallback * a_Client)
{
cCSLock Lock(m_CS);
for (cSocketThreadList::iterator itr = m_Threads.begin(); itr != m_Threads.end(); ++itr)
{
if ((*itr)->StopReading(a_Client))
{
return;
}
} // for itr - m_Threads[]
// Cannot assert, this normally happens if the socket is closed before the client deinitializes
// ASSERT(!"Stopping reading on an unknown client");
}
/// Queues the socket for closing, as soon as its outgoing data is sent
void cSocketThreads::QueueClose(const cCallback * a_Client)
{
LOGD("QueueClose(client %p)", a_Client);
cCSLock Lock(m_CS);
for (cSocketThreadList::iterator itr = m_Threads.begin(); itr != m_Threads.end(); ++itr)
{
if ((*itr)->QueueClose(a_Client))
{
return;
}
} // for itr - m_Threads[]
ASSERT(!"Queueing close of an unknown client");
}
////////////////////////////////////////////////////////////////////////////////
// cSocketThreads::cSocketThread:
@ -210,13 +169,13 @@ cSocketThreads::cSocketThread::~cSocketThread()
void cSocketThreads::cSocketThread::AddClient(const cSocket & a_Socket, cCallback * a_Client)
{
ASSERT(m_Parent->m_CS.IsLockedByCurrentThread());
ASSERT(m_NumSlots < MAX_SLOTS); // Use HasEmptySlot() to check before adding
m_Slots[m_NumSlots].m_Client = a_Client;
m_Slots[m_NumSlots].m_Socket = a_Socket;
m_Slots[m_NumSlots].m_Outgoing.clear();
m_Slots[m_NumSlots].m_ShouldClose = false;
m_Slots[m_NumSlots].m_ShouldCallClient = true;
m_Slots[m_NumSlots].m_State = sSlot::ssNormal;
m_NumSlots++;
// Notify the thread of the change:
@ -230,7 +189,7 @@ void cSocketThreads::cSocketThread::AddClient(const cSocket & a_Socket, cCallbac
bool cSocketThreads::cSocketThread::RemoveClient(const cCallback * a_Client)
{
// Returns true if removed, false if not found
ASSERT(m_Parent->m_CS.IsLockedByCurrentThread());
if (m_NumSlots == 0)
{
@ -244,8 +203,29 @@ bool cSocketThreads::cSocketThread::RemoveClient(const cCallback * a_Client)
continue;
}
// Found, remove it:
// Found the slot:
if (m_Slots[i].m_State == sSlot::ssRemoteClosed)
{
// The remote has already closed the socket, remove the slot altogether:
m_Slots[i] = m_Slots[--m_NumSlots];
}
else
{
// Query and queue the last batch of outgoing data:
m_Slots[i].m_Client->GetOutgoingData(m_Slots[i].m_Outgoing);
if (m_Slots[i].m_Outgoing.empty())
{
// No more outgoing data, shut the socket down immediately:
m_Slots[i].m_Socket.ShutdownReadWrite();
m_Slots[i].m_State = sSlot::ssShuttingDown;
}
else
{
// More data to send, shut down reading and wait for the rest to get sent:
m_Slots[i].m_State = sSlot::ssWritingRestOut;
}
m_Slots[i].m_Client = NULL;
}
// Notify the thread of the change:
ASSERT(m_ControlSocket2.IsValid());
@ -263,6 +243,8 @@ bool cSocketThreads::cSocketThread::RemoveClient(const cCallback * a_Client)
bool cSocketThreads::cSocketThread::HasClient(const cCallback * a_Client) const
{
ASSERT(m_Parent->m_CS.IsLockedByCurrentThread());
for (int i = m_NumSlots - 1; i >= 0; --i)
{
if (m_Slots[i].m_Client == a_Client)
@ -295,6 +277,8 @@ bool cSocketThreads::cSocketThread::HasSocket(const cSocket * a_Socket) const
bool cSocketThreads::cSocketThread::NotifyWrite(const cCallback * a_Client)
{
ASSERT(m_Parent->m_CS.IsLockedByCurrentThread());
if (HasClient(a_Client))
{
// Notify the thread that there's another packet in the queue:
@ -311,7 +295,7 @@ bool cSocketThreads::cSocketThread::NotifyWrite(const cCallback * a_Client)
bool cSocketThreads::cSocketThread::Write(const cCallback * a_Client, const AString & a_Data)
{
// Returns true if socket handled by this thread
ASSERT(m_Parent->m_CS.IsLockedByCurrentThread());
for (int i = m_NumSlots - 1; i >= 0; --i)
{
if (m_Slots[i].m_Client == a_Client)
@ -332,47 +316,6 @@ bool cSocketThreads::cSocketThread::Write(const cCallback * a_Client, const AStr
bool cSocketThreads::cSocketThread::StopReading (const cCallback * a_Client)
{
// Returns true if client handled by this thread
for (int i = m_NumSlots - 1; i >= 0; --i)
{
if (m_Slots[i].m_Client == a_Client)
{
m_Slots[i].m_ShouldCallClient = false;
return true;
}
} // for i - m_Slots[]
return false;
}
bool cSocketThreads::cSocketThread::QueueClose(const cCallback * a_Client)
{
// Returns true if socket handled by this thread
for (int i = m_NumSlots - 1; i >= 0; --i)
{
if (m_Slots[i].m_Client == a_Client)
{
m_Slots[i].m_ShouldClose = true;
// Notify the thread that there's a close queued (in case its conditions are already met):
ASSERT(m_ControlSocket2.IsValid());
m_ControlSocket2.Send("c", 1);
return true;
}
} // for i - m_Slots[]
return false;
}
bool cSocketThreads::cSocketThread::Start(void)
{
// Create the control socket listener
@ -446,10 +389,13 @@ void cSocketThreads::cSocketThread::Execute(void)
fd_set fdRead;
cSocket::xSocket Highest = m_ControlSocket1.GetSocket();
PrepareSet(&fdRead, Highest);
PrepareSet(&fdRead, Highest, false);
// Wait for the sockets:
if (select(Highest + 1, &fdRead, NULL, NULL, NULL) == -1)
timeval Timeout;
Timeout.tv_sec = 5;
Timeout.tv_usec = 0;
if (select(Highest + 1, &fdRead, NULL, NULL, &Timeout) == -1)
{
LOG("select(R) call failed in cSocketThread: \"%s\"", cSocket::GetLastErrorString().c_str());
continue;
@ -460,8 +406,7 @@ void cSocketThreads::cSocketThread::Execute(void)
// Test sockets for writing:
fd_set fdWrite;
Highest = m_ControlSocket1.GetSocket();
PrepareSet(&fdWrite, Highest);
timeval Timeout;
PrepareSet(&fdWrite, Highest, true);
Timeout.tv_sec = 0;
Timeout.tv_usec = 0;
if (select(Highest + 1, NULL, &fdWrite, NULL, &Timeout) == -1)
@ -471,6 +416,8 @@ void cSocketThreads::cSocketThread::Execute(void)
}
WriteToSockets(&fdWrite);
CleanUpShutSockets();
} // while (!mShouldTerminate)
}
@ -478,7 +425,7 @@ void cSocketThreads::cSocketThread::Execute(void)
void cSocketThreads::cSocketThread::PrepareSet(fd_set * a_Set, cSocket::xSocket & a_Highest)
void cSocketThreads::cSocketThread::PrepareSet(fd_set * a_Set, cSocket::xSocket & a_Highest, bool a_IsForWriting)
{
FD_ZERO(a_Set);
FD_SET(m_ControlSocket1.GetSocket(), a_Set);
@ -490,6 +437,11 @@ void cSocketThreads::cSocketThread::PrepareSet(fd_set * a_Set, cSocket::xSocket
{
continue;
}
if (m_Slots[i].m_State == sSlot::ssRemoteClosed)
{
// This socket won't provide nor consume any data anymore, don't put it in the Set
continue;
}
cSocket::xSocket s = m_Slots[i].m_Socket.GetSocket();
FD_SET(s, a_Set);
if (s > a_Highest)
@ -525,29 +477,42 @@ void cSocketThreads::cSocketThread::ReadFromSockets(fd_set * a_Read)
}
char Buffer[1024];
int Received = m_Slots[i].m_Socket.Receive(Buffer, ARRAYCOUNT(Buffer), 0);
if (Received == 0)
if (Received <= 0)
{
// The socket has been closed by the remote party, close our socket and let it be removed after we process all reading
m_Slots[i].m_Socket.CloseSocket();
if (m_Slots[i].m_ShouldCallClient)
// The socket has been closed by the remote party
switch (m_Slots[i].m_State)
{
case sSlot::ssNormal:
{
// Notify the callback that the remote has closed the socket; keep the slot
m_Slots[i].m_Client->SocketClosed();
m_Slots[i].m_State = sSlot::ssRemoteClosed;
break;
}
}
else if (Received > 0)
case sSlot::ssWritingRestOut:
case sSlot::ssShuttingDown:
case sSlot::ssShuttingDown2:
{
if (m_Slots[i].m_ShouldCallClient)
{
m_Slots[i].m_Client->DataReceived(Buffer, Received);
// Force-close the socket and remove the slot:
m_Slots[i].m_Socket.CloseSocket();
m_Slots[i] = m_Slots[--m_NumSlots];
break;
}
default:
{
LOG("%s: Unexpected socket state: %d (%s)",
__FUNCTION__, m_Slots[i].m_Socket.GetSocket(), m_Slots[i].m_Socket.GetIPString().c_str()
);
ASSERT(!"Unexpected socket state");
break;
}
} // switch (m_Slots[i].m_State)
}
else
{
// The socket has encountered an error, close it and let it be removed after we process all reading
m_Slots[i].m_Socket.CloseSocket();
if (m_Slots[i].m_ShouldCallClient)
if (m_Slots[i].m_Client != NULL)
{
m_Slots[i].m_Client->SocketClosed();
m_Slots[i].m_Client->DataReceived(Buffer, Received);
}
}
} // for i - m_Slots[]
@ -571,22 +536,17 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write)
if (m_Slots[i].m_Outgoing.empty())
{
// Request another chunk of outgoing data:
if (m_Slots[i].m_ShouldCallClient)
if (m_Slots[i].m_Client != NULL)
{
m_Slots[i].m_Client->GetOutgoingData(m_Slots[i].m_Outgoing);
}
if (m_Slots[i].m_Outgoing.empty())
{
// Nothing ready
if (m_Slots[i].m_ShouldClose)
// No outgoing data is ready
if (m_Slots[i].m_State == sSlot::ssWritingRestOut)
{
// Socket was queued for closing and there's no more data to send, close it now:
// DEBUG
LOGD("Socket was queued for closing, closing now. Slot %d, client %p, socket %d", i, m_Slots[i].m_Client, m_Slots[i].m_Socket.GetSocket());
m_Slots[i].m_Socket.CloseSocket();
// The slot must be freed actively by the client, using RemoveClient()
m_Slots[i].m_State = sSlot::ssShuttingDown;
m_Slots[i].m_Socket.ShutdownReadWrite();
}
continue;
}
@ -598,7 +558,7 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write)
int Err = cSocket::GetLastError();
LOGWARNING("Error %d while writing to client \"%s\", disconnecting. \"%s\"", Err, m_Slots[i].m_Socket.GetIPString().c_str(), cSocket::GetErrorString(Err).c_str());
m_Slots[i].m_Socket.CloseSocket();
if (m_Slots[i].m_ShouldCallClient)
if (m_Slots[i].m_Client != NULL)
{
m_Slots[i].m_Client->SocketClosed();
}
@ -606,6 +566,12 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write)
}
m_Slots[i].m_Outgoing.erase(0, Sent);
if (m_Slots[i].m_Outgoing.empty() && (m_Slots[i].m_State == sSlot::ssWritingRestOut))
{
m_Slots[i].m_State = sSlot::ssShuttingDown;
m_Slots[i].m_Socket.ShutdownReadWrite();
}
// _X: If there's data left, it means the client is not reading fast enough, the server would unnecessarily spin in the main loop with zero actions taken; so signalling is disabled
// This means that if there's data left, it will be sent only when there's incoming data or someone queues another packet (for any socket handled by this thread)
/*
@ -622,3 +588,31 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write)
void cSocketThreads::cSocketThread::CleanUpShutSockets(void)
{
for (int i = m_NumSlots - 1; i >= 0; i--)
{
switch (m_Slots[i].m_State)
{
case sSlot::ssShuttingDown2:
{
// The socket has reached the shutdown timeout, close it and clear its slot:
m_Slots[i].m_Socket.CloseSocket();
m_Slots[i] = m_Slots[--m_NumSlots];
break;
}
case sSlot::ssShuttingDown:
{
// The socket has been shut down for a single thread loop, let it loop once more before closing:
m_Slots[i].m_State = sSlot::ssShuttingDown2;
break;
}
default: break;
}
} // for i - m_Slots[]
}

View File

@ -7,19 +7,20 @@
/*
Additional details:
When a client is terminating a connection:
- they call the StopReading() method to disable callbacks for the incoming data
- they call the Write() method to queue any outstanding outgoing data
- they call the QueueClose() method to queue the socket to close after outgoing data has been sent.
When a socket slot is marked as having no callback, it is kept alive until its outgoing data queue is empty and its m_ShouldClose flag is set.
This means that the socket can be written to several times before finally closing it via QueueClose()
When a client wants to terminate the connection, they call the RemoveClient() function. This calls the
callback one last time to read all the available outgoing data, putting it in the slot's m_OutgoingData
buffer. Then it marks the slot as having no callback. The socket is kept alive until its outgoing data
queue is empty, then shutdown is called on it and finally the socket is closed after a timeout.
If at any time within this the remote end closes the socket, then the socket is closed directly.
As soon as the socket is closed, the slot is finally removed from the SocketThread.
The graph in $/docs/SocketThreads States.gv shows the state-machine transitions of the slot.
*/
/// How many clients should one thread handle? (must be less than FD_SETSIZE for your platform)
/** How many clients should one thread handle? (must be less than FD_SETSIZE for your platform) */
#define MAX_SLOTS 63
@ -27,8 +28,6 @@ This means that the socket can be written to several times before finally closin
#pragma once
#ifndef CSOCKETTHREADS_H_INCLUDED
#define CSOCKETTHREADS_H_INCLUDED
#include "Socket.h"
#include "IsThread.h"
@ -64,13 +63,13 @@ public:
// Force a virtual destructor in all subclasses:
virtual ~cCallback() {}
/// Called when data is received from the remote party
/** Called when data is received from the remote party */
virtual void DataReceived(const char * a_Data, int a_Size) = 0;
/// Called when data can be sent to remote party; the function is supposed to append outgoing data to a_Data
/** Called when data can be sent to remote party; the function is supposed to *append* outgoing data to a_Data */
virtual void GetOutgoingData(AString & a_Data) = 0;
/// Called when the socket has been closed for any reason
/** Called when the socket has been closed for any reason */
virtual void SocketClosed(void) = 0;
} ;
@ -78,26 +77,21 @@ public:
cSocketThreads(void);
~cSocketThreads();
/// Add a (socket, client) pair for processing, data from a_Socket is to be sent to a_Client; returns true if successful
/** Add a (socket, client) pair for processing, data from a_Socket is to be sent to a_Client; returns true if successful */
bool AddClient(const cSocket & a_Socket, cCallback * a_Client);
/** Remove the associated socket and the client from processing.
The socket is left to send its data and is removed only after all its m_OutgoingData is sent
The socket is left to send its last outgoing data and is removed only after all its m_Outgoing is sent
and after the socket is properly shutdown (unless the remote disconnects before that)
*/
void RemoveClient(const cCallback * a_Client);
/// Notify the thread responsible for a_Client that the client has something to write
/** Notify the thread responsible for a_Client that the client has something to write */
void NotifyWrite(const cCallback * a_Client);
/// Puts a_Data into outgoing data queue for a_Client
/** Puts a_Data into outgoing data queue for a_Client */
void Write(const cCallback * a_Client, const AString & a_Data);
/// Stops reading from the client - when this call returns, no more calls to the callbacks are made
void StopReading(const cCallback * a_Client);
/// Queues the client for closing, as soon as its outgoing data is sent
void QueueClose(const cCallback * a_Client);
private:
class cSocketThread :
@ -120,8 +114,6 @@ private:
bool HasSocket (const cSocket * a_Socket) const;
bool NotifyWrite (const cCallback * a_Client); // Returns true if client handled by this thread
bool Write (const cCallback * a_Client, const AString & a_Data); // Returns true if client handled by this thread
bool StopReading (const cCallback * a_Client); // Returns true if client handled by this thread
bool QueueClose (const cCallback * a_Client); // Returns true if client handled by this thread
bool Start(void); // Hide the cIsThread's Start method, we need to provide our own startup to create the control socket
@ -135,24 +127,45 @@ private:
cSocket m_ControlSocket1;
cSocket m_ControlSocket2;
// Socket-client-packetqueues triplets.
// Socket-client-dataqueues-state quadruplets.
// Manipulation with these assumes that the parent's m_CS is locked
struct sSlot
{
cSocket m_Socket; // The socket is primarily owned by this
/** The socket is primarily owned by this object */
cSocket m_Socket;
/** The callback to call for events. May be NULL */
cCallback * m_Client;
AString m_Outgoing; // If sending writes only partial data, the rest is stored here for another send
bool m_ShouldClose; // If true, the socket is to be closed after sending all outgoing data
bool m_ShouldCallClient; // If true, the client callbacks are called. Set to false in StopReading()
/** If sending writes only partial data, the rest is stored here for another send.
Also used when the slot is being removed to store the last batch of outgoing data. */
AString m_Outgoing;
enum eState
{
ssNormal, ///< Normal read / write operations
ssWritingRestOut, ///< The client callback was removed, continue to send outgoing data
ssShuttingDown, ///< The last outgoing data has been sent, the socket has called shutdown()
ssShuttingDown2, ///< The shutdown has been done at least 1 thread loop ago (timeout detection)
ssRemoteClosed, ///< The remote end has closed the connection (and we still have a client callback)
} m_State;
} ;
sSlot m_Slots[MAX_SLOTS];
int m_NumSlots; // Number of slots actually used
virtual void Execute(void) override;
void PrepareSet (fd_set * a_Set, cSocket::xSocket & a_Highest); // Puts all sockets into the set, along with m_ControlSocket1
/** Puts all sockets into the set, along with m_ControlSocket1.
Only sockets that are able to send and receive data are put in the Set.
Is a_IsForWriting is true, the ssWritingRestOut sockets are added as well. */
void PrepareSet(fd_set * a_Set, cSocket::xSocket & a_Highest, bool a_IsForWriting);
void ReadFromSockets(fd_set * a_Read); // Reads from sockets indicated in a_Read
void WriteToSockets (fd_set * a_Write); // Writes to sockets indicated in a_Write
/** Removes those slots in ssShuttingDown2 state, sets those with ssShuttingDown state to ssShuttingDown2 */
void CleanUpShutSockets(void);
} ;
typedef std::list<cSocketThread *> cSocketThreadList;
@ -165,9 +178,3 @@ private:
#endif // CSOCKETTHREADS_H_INCLUDED

View File

@ -118,7 +118,7 @@ cServer::cServer(void) :
void cServer::ClientDestroying(const cClientHandle * a_Client)
{
m_SocketThreads.StopReading(a_Client);
m_SocketThreads.RemoveClient(a_Client);
}
@ -143,15 +143,6 @@ void cServer::WriteToClient(const cClientHandle * a_Client, const AString & a_Da
void cServer::QueueClientClose(const cClientHandle * a_Client)
{
m_SocketThreads.QueueClose(a_Client);
}
void cServer::RemoveClient(const cClientHandle * a_Client)
{
m_SocketThreads.RemoveClient(a_Client);

View File

@ -88,14 +88,14 @@ public: // tolua_export
const AString & GetServerID(void) const { return m_ServerID; } // tolua_export
void ClientDestroying(const cClientHandle * a_Client); // Called by cClientHandle::Destroy(); stop m_SocketThreads from calling back into a_Client
/** Called by cClientHandle's destructor; stop m_SocketThreads from calling back into a_Client */
void ClientDestroying(const cClientHandle * a_Client);
void NotifyClientWrite(const cClientHandle * a_Client); // Notifies m_SocketThreads that client has something to be written
/** Notifies m_SocketThreads that client has something to be written */
void NotifyClientWrite(const cClientHandle * a_Client);
void WriteToClient(const cClientHandle * a_Client, const AString & a_Data); // Queues outgoing data for the client through m_SocketThreads
void QueueClientClose(const cClientHandle * a_Client); // Queues the clienthandle to close when all its outgoing data is sent
void RemoveClient(const cClientHandle * a_Client); // Removes the clienthandle from m_SocketThreads
/// Don't tick a_Client anymore, it will be ticked from its cPlayer instead