1
0

Rewritten networking to use non-blocking sockets.

This fixes #592.
This commit is contained in:
madmaxoft 2014-01-27 21:27:13 +01:00
parent 30c431b479
commit cc1284a753
4 changed files with 177 additions and 63 deletions

View File

@ -320,7 +320,7 @@ bool cSocket::ConnectIPv4(const AString & a_HostNameOrAddr, unsigned short a_Por
int cSocket::Receive(char* a_Buffer, unsigned int a_Length, unsigned int a_Flags) int cSocket::Receive(char * a_Buffer, unsigned int a_Length, unsigned int a_Flags)
{ {
return recv(m_Socket, a_Buffer, a_Length, a_Flags); return recv(m_Socket, a_Buffer, a_Length, a_Flags);
} }
@ -354,3 +354,32 @@ unsigned short cSocket::GetPort(void) const
void cSocket::SetNonBlocking(void)
{
#ifdef _WIN32
u_long NonBlocking = 1;
int res = ioctlsocket(m_Socket, FIONBIO, &NonBlocking);
if (res != 0)
{
LOGERROR("Cannot set socket to non-blocking. This would make the server deadlock later on, aborting.\nErr: %d, %d, %s",
res, GetLastError(), GetLastErrorString().c_str()
);
abort();
}
#else
int NonBlocking = 1;
int res = ioctl(m_Socket, FIONBIO, (char *)&NonBlocking);
if (res != 0)
{
LOGERROR("Cannot set socket to non-blocking. This would make the server deadlock later on, aborting.\nErr: %d, %d, %s",
res, GetLastError(), GetLastErrorString().c_str()
);
abort();
}
#endif
}

View File

@ -24,6 +24,12 @@ public:
{ {
IPv4 = AF_INET, IPv4 = AF_INET,
IPv6 = AF_INET6, IPv6 = AF_INET6,
#ifdef _WIN32
ErrWouldBlock = WSAEWOULDBLOCK,
#else
ErrWouldBlock = EWOULDBLOCK,
#endif
} ; } ;
#ifdef _WIN32 #ifdef _WIN32
@ -111,6 +117,9 @@ public:
const AString & GetIPString(void) const { return m_IPString; } const AString & GetIPString(void) const { return m_IPString; }
/** Sets the socket into non-blocking mode */
void SetNonBlocking(void);
private: private:
xSocket m_Socket; xSocket m_Socket;
AString m_IPString; AString m_IPString;

View File

@ -175,6 +175,7 @@ void cSocketThreads::cSocketThread::AddClient(const cSocket & a_Socket, cCallbac
m_Slots[m_NumSlots].m_Client = a_Client; m_Slots[m_NumSlots].m_Client = a_Client;
m_Slots[m_NumSlots].m_Socket = a_Socket; m_Slots[m_NumSlots].m_Socket = a_Socket;
m_Slots[m_NumSlots].m_Socket.SetNonBlocking();
m_Slots[m_NumSlots].m_Outgoing.clear(); m_Slots[m_NumSlots].m_Outgoing.clear();
m_Slots[m_NumSlots].m_State = sSlot::ssNormal; m_Slots[m_NumSlots].m_State = sSlot::ssNormal;
m_NumSlots++; m_NumSlots++;
@ -213,7 +214,9 @@ bool cSocketThreads::cSocketThread::RemoveClient(const cCallback * a_Client)
else else
{ {
// Query and queue the last batch of outgoing data: // Query and queue the last batch of outgoing data:
m_Slots[i].m_Client->GetOutgoingData(m_Slots[i].m_Outgoing); AString Data;
m_Slots[i].m_Client->GetOutgoingData(Data);
m_Slots[i].m_Outgoing.append(Data);
if (m_Slots[i].m_Outgoing.empty()) if (m_Slots[i].m_Outgoing.empty())
{ {
// No more outgoing data, shut the socket down immediately: // No more outgoing data, shut the socket down immediately:
@ -386,38 +389,28 @@ void cSocketThreads::cSocketThread::Execute(void)
// The main thread loop: // The main thread loop:
while (!m_ShouldTerminate) while (!m_ShouldTerminate)
{ {
// Put all sockets into the Read set: // Read outgoing data from the clients:
fd_set fdRead; QueueOutgoingData();
cSocket::xSocket Highest = m_ControlSocket1.GetSocket();
PrepareSet(&fdRead, Highest, false); // Put sockets into the sets
fd_set fdRead;
fd_set fdWrite;
cSocket::xSocket Highest = m_ControlSocket1.GetSocket();
PrepareSets(&fdRead, &fdWrite, Highest);
// Wait for the sockets: // Wait for the sockets:
timeval Timeout; timeval Timeout;
Timeout.tv_sec = 5; Timeout.tv_sec = 5;
Timeout.tv_usec = 0; Timeout.tv_usec = 0;
if (select(Highest + 1, &fdRead, NULL, NULL, &Timeout) == -1) if (select(Highest + 1, &fdRead, &fdWrite, NULL, &Timeout) == -1)
{ {
LOG("select(R) call failed in cSocketThread: \"%s\"", cSocket::GetLastErrorString().c_str()); LOG("select() call failed in cSocketThread: \"%s\"", cSocket::GetLastErrorString().c_str());
continue; continue;
} }
// Perform the IO:
ReadFromSockets(&fdRead); ReadFromSockets(&fdRead);
// Test sockets for writing:
fd_set fdWrite;
Highest = m_ControlSocket1.GetSocket();
PrepareSet(&fdWrite, Highest, true);
Timeout.tv_sec = 0;
Timeout.tv_usec = 0;
if (select(Highest + 1, NULL, &fdWrite, NULL, &Timeout) == -1)
{
LOG("select(W) call failed in cSocketThread: \"%s\"", cSocket::GetLastErrorString().c_str());
continue;
}
WriteToSockets(&fdWrite); WriteToSockets(&fdWrite);
CleanUpShutSockets(); CleanUpShutSockets();
} // while (!mShouldTerminate) } // while (!mShouldTerminate)
} }
@ -426,10 +419,11 @@ void cSocketThreads::cSocketThread::Execute(void)
void cSocketThreads::cSocketThread::PrepareSet(fd_set * a_Set, cSocket::xSocket & a_Highest, bool a_IsForWriting) void cSocketThreads::cSocketThread::PrepareSets(fd_set * a_Read, fd_set * a_Write, cSocket::xSocket & a_Highest)
{ {
FD_ZERO(a_Set); FD_ZERO(a_Read);
FD_SET(m_ControlSocket1.GetSocket(), a_Set); FD_ZERO(a_Write);
FD_SET(m_ControlSocket1.GetSocket(), a_Read);
cCSLock Lock(m_Parent->m_CS); cCSLock Lock(m_Parent->m_CS);
for (int i = m_NumSlots - 1; i >= 0; --i) for (int i = m_NumSlots - 1; i >= 0; --i)
@ -444,11 +438,16 @@ void cSocketThreads::cSocketThread::PrepareSet(fd_set * a_Set, cSocket::xSocket
continue; continue;
} }
cSocket::xSocket s = m_Slots[i].m_Socket.GetSocket(); cSocket::xSocket s = m_Slots[i].m_Socket.GetSocket();
FD_SET(s, a_Set); FD_SET(s, a_Read);
if (s > a_Highest) if (s > a_Highest)
{ {
a_Highest = s; a_Highest = s;
} }
if (!m_Slots[i].m_Outgoing.empty())
{
// There's outgoing data for the socket, put it in the Write set
FD_SET(s, a_Write);
}
} // for i - m_Slots[] } // for i - m_Slots[]
} }
@ -480,34 +479,37 @@ void cSocketThreads::cSocketThread::ReadFromSockets(fd_set * a_Read)
int Received = m_Slots[i].m_Socket.Receive(Buffer, ARRAYCOUNT(Buffer), 0); int Received = m_Slots[i].m_Socket.Receive(Buffer, ARRAYCOUNT(Buffer), 0);
if (Received <= 0) if (Received <= 0)
{ {
// The socket has been closed by the remote party if (cSocket::GetLastError() != cSocket::ErrWouldBlock)
switch (m_Slots[i].m_State)
{ {
case sSlot::ssNormal: // The socket has been closed by the remote party
switch (m_Slots[i].m_State)
{ {
// Notify the callback that the remote has closed the socket; keep the slot case sSlot::ssNormal:
m_Slots[i].m_Client->SocketClosed(); {
m_Slots[i].m_State = sSlot::ssRemoteClosed; // Notify the callback that the remote has closed the socket; keep the slot
break; m_Slots[i].m_Client->SocketClosed();
} m_Slots[i].m_State = sSlot::ssRemoteClosed;
case sSlot::ssWritingRestOut: break;
case sSlot::ssShuttingDown: }
case sSlot::ssShuttingDown2: case sSlot::ssWritingRestOut:
{ case sSlot::ssShuttingDown:
// Force-close the socket and remove the slot: case sSlot::ssShuttingDown2:
m_Slots[i].m_Socket.CloseSocket(); {
m_Slots[i] = m_Slots[--m_NumSlots]; // Force-close the socket and remove the slot:
break; m_Slots[i].m_Socket.CloseSocket();
} m_Slots[i] = m_Slots[--m_NumSlots];
default: break;
{ }
LOG("%s: Unexpected socket state: %d (%s)", default:
__FUNCTION__, m_Slots[i].m_Socket.GetSocket(), m_Slots[i].m_Socket.GetIPString().c_str() {
); LOG("%s: Unexpected socket state: %d (%s)",
ASSERT(!"Unexpected socket state"); __FUNCTION__, m_Slots[i].m_Socket.GetSocket(), m_Slots[i].m_Socket.GetIPString().c_str()
break; );
} ASSERT(!"Unexpected socket state");
} // switch (m_Slots[i].m_State) break;
}
} // switch (m_Slots[i].m_State)
}
} }
else else
{ {
@ -539,7 +541,9 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write)
// Request another chunk of outgoing data: // Request another chunk of outgoing data:
if (m_Slots[i].m_Client != NULL) if (m_Slots[i].m_Client != NULL)
{ {
m_Slots[i].m_Client->GetOutgoingData(m_Slots[i].m_Outgoing); AString Data;
m_Slots[i].m_Client->GetOutgoingData(Data);
m_Slots[i].m_Outgoing.append(Data);
} }
if (m_Slots[i].m_Outgoing.empty()) if (m_Slots[i].m_Outgoing.empty())
{ {
@ -553,8 +557,7 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write)
} }
} // if (outgoing data is empty) } // if (outgoing data is empty)
int Sent = m_Slots[i].m_Socket.Send(m_Slots[i].m_Outgoing.data(), m_Slots[i].m_Outgoing.size()); if (!SendDataThroughSocket(m_Slots[i].m_Socket, m_Slots[i].m_Outgoing))
if (Sent < 0)
{ {
int Err = cSocket::GetLastError(); int Err = cSocket::GetLastError();
LOGWARNING("Error %d while writing to client \"%s\", disconnecting. \"%s\"", Err, m_Slots[i].m_Socket.GetIPString().c_str(), GetOSErrorString(Err).c_str()); LOGWARNING("Error %d while writing to client \"%s\", disconnecting. \"%s\"", Err, m_Slots[i].m_Socket.GetIPString().c_str(), GetOSErrorString(Err).c_str());
@ -565,7 +568,6 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write)
} }
return; return;
} }
m_Slots[i].m_Outgoing.erase(0, Sent);
if (m_Slots[i].m_Outgoing.empty() && (m_Slots[i].m_State == sSlot::ssWritingRestOut)) if (m_Slots[i].m_Outgoing.empty() && (m_Slots[i].m_State == sSlot::ssWritingRestOut))
{ {
@ -590,8 +592,41 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write)
bool cSocketThreads::cSocketThread::SendDataThroughSocket(cSocket & a_Socket, AString & a_Data)
{
// Send data in smaller chunks, so that the OS send buffers aren't overflown easily
while (!a_Data.empty())
{
size_t NumToSend = std::min(a_Data.size(), (size_t)1024);
int Sent = a_Socket.Send(a_Data.data(), NumToSend);
if (Sent < 0)
{
int Err = cSocket::GetLastError();
if (Err == cSocket::ErrWouldBlock)
{
// The OS send buffer is full, leave the outgoing data for the next time
return true;
}
// An error has occured
return false;
}
if (Sent == 0)
{
a_Socket.CloseSocket();
return true;
}
a_Data.erase(0, Sent);
}
return true;
}
void cSocketThreads::cSocketThread::CleanUpShutSockets(void) void cSocketThreads::cSocketThread::CleanUpShutSockets(void)
{ {
cCSLock Lock(m_Parent->m_CS);
for (int i = m_NumSlots - 1; i >= 0; i--) for (int i = m_NumSlots - 1; i >= 0; i--)
{ {
switch (m_Slots[i].m_State) switch (m_Slots[i].m_State)
@ -617,3 +652,32 @@ void cSocketThreads::cSocketThread::CleanUpShutSockets(void)
void cSocketThreads::cSocketThread::QueueOutgoingData(void)
{
cCSLock Lock(m_Parent->m_CS);
for (int i = 0; i < m_NumSlots; i++)
{
if (m_Slots[i].m_Client != NULL)
{
AString Data;
m_Slots[i].m_Client->GetOutgoingData(Data);
m_Slots[i].m_Outgoing.append(Data);
}
if (m_Slots[i].m_Outgoing.empty())
{
// No outgoing data is ready
if (m_Slots[i].m_State == sSlot::ssWritingRestOut)
{
// The socket doesn't want to be kept alive anymore, and doesn't have any remaining data to send.
// Shut it down and then close it after a timeout, or when the other side agrees
m_Slots[i].m_State = sSlot::ssShuttingDown;
m_Slots[i].m_Socket.ShutdownReadWrite();
}
continue;
}
}
}

View File

@ -66,7 +66,8 @@ public:
/** Called when data is received from the remote party */ /** Called when data is received from the remote party */
virtual void DataReceived(const char * a_Data, int a_Size) = 0; virtual void DataReceived(const char * a_Data, int a_Size) = 0;
/** Called when data can be sent to remote party; the function is supposed to *append* outgoing data to a_Data */ /** Called when data can be sent to remote party
The function is supposed to *set* outgoing data to a_Data (overwrite) */
virtual void GetOutgoingData(AString & a_Data) = 0; virtual void GetOutgoingData(AString & a_Data) = 0;
/** Called when the socket has been closed for any reason */ /** Called when the socket has been closed for any reason */
@ -156,16 +157,27 @@ private:
virtual void Execute(void) override; virtual void Execute(void) override;
/** Puts all sockets into the set, along with m_ControlSocket1. /** Prepares the Read and Write socket sets for select()
Only sockets that are able to send and receive data are put in the Set. Puts all sockets into the read set, along with m_ControlSocket1.
Is a_IsForWriting is true, the ssWritingRestOut sockets are added as well. */ Only sockets that have outgoing data queued on them are put in the write set.*/
void PrepareSet(fd_set * a_Set, cSocket::xSocket & a_Highest, bool a_IsForWriting); void PrepareSets(fd_set * a_ReadSet, fd_set * a_WriteSet, cSocket::xSocket & a_Highest);
void ReadFromSockets(fd_set * a_Read); // Reads from sockets indicated in a_Read /** Reads from sockets indicated in a_Read */
void WriteToSockets (fd_set * a_Write); // Writes to sockets indicated in a_Write void ReadFromSockets(fd_set * a_Read);
/** Writes to sockets indicated in a_Write */
void WriteToSockets (fd_set * a_Write);
/** Sends data through the specified socket, trying to fill the OS send buffer in chunks.
Returns true if there was no error while sending, false if an error has occured.
Modifies a_Data to contain only the unsent data. */
bool SendDataThroughSocket(cSocket & a_Socket, AString & a_Data);
/** Removes those slots in ssShuttingDown2 state, sets those with ssShuttingDown state to ssShuttingDown2 */ /** Removes those slots in ssShuttingDown2 state, sets those with ssShuttingDown state to ssShuttingDown2 */
void CleanUpShutSockets(void); void CleanUpShutSockets(void);
/** Calls each client's callback to retrieve outgoing data for that client. */
void QueueOutgoingData(void);
} ; } ;
typedef std::list<cSocketThread *> cSocketThreadList; typedef std::list<cSocketThread *> cSocketThreadList;