From c82c636d8c9ac24c1e73740c35b3a93525783a3d Mon Sep 17 00:00:00 2001 From: "madmaxoft@gmail.com" Date: Wed, 8 Feb 2012 10:02:46 +0000 Subject: [PATCH] cSocketThreads plugged in for cClientHandle reading. Sending still kept the old way. Please help me test this commit thoroughly, this is a change that can break on subtleties. git-svn-id: http://mc-server.googlecode.com/svn/trunk@244 0a769ca7-a7f5-676a-18bf-c427514a06d6 --- source/cClientHandle.cpp | 203 +++++++++++++++----------------------- source/cClientHandle.h | 24 +++-- source/cServer.cpp | 65 +++--------- source/cServer.h | 38 +++++++ source/cSocket.cpp | 7 +- source/cSocket.h | 3 + source/cSocketThreads.cpp | 133 ++++++++++++++++++------- source/cSocketThreads.h | 27 ++--- 8 files changed, 267 insertions(+), 233 deletions(-) diff --git a/source/cClientHandle.cpp b/source/cClientHandle.cpp index bbf196ccb..898e04e72 100644 --- a/source/cClientHandle.cpp +++ b/source/cClientHandle.cpp @@ -91,7 +91,6 @@ extern std::string GetWSAError(); cClientHandle::cClientHandle(const cSocket & a_Socket) : m_ProtocolVersion(23) - , m_pReceiveThread(NULL) , m_pSendThread(NULL) , m_Socket(a_Socket) , m_Semaphore(MAX_SEMAPHORES) @@ -105,8 +104,6 @@ cClientHandle::cClientHandle(const cSocket & a_Socket) , m_Ping(1000) , m_bPositionConfirmed(false) { - LOG("cClientHandle::cClientHandle"); - cTimer t1; m_LastPingTime = t1.GetNowTime(); @@ -143,13 +140,11 @@ cClientHandle::cClientHandle(const cSocket & a_Socket) memset(m_LoadedChunks, 0x00, sizeof(m_LoadedChunks)); ////////////////////////////////////////////////////////////////////////// - m_pReceiveThread = new cThread(ReceiveThread, this, "cClientHandle::ReceiveThread"); m_pSendThread = new cThread(SendThread, this, "cClientHandle::SendThread"); - m_pReceiveThread->Start(true); m_pSendThread->Start (true); ////////////////////////////////////////////////////////////////////////// - LOG("New ClientHandle"); + LOG("New ClientHandle created at %p", this); } @@ -158,7 +153,7 @@ cClientHandle::cClientHandle(const cSocket & a_Socket) cClientHandle::~cClientHandle() { - LOG("Deleting client %s", GetUsername().c_str()); + LOG("Deleting client \"%s\"", GetUsername().c_str()); for(unsigned int i = 0; i < VIEWDISTANCE*VIEWDISTANCE; i++) { @@ -185,7 +180,6 @@ cClientHandle::~cClientHandle() // First stop sending thread m_bKeepThreadGoing = false; - cCSLock Lock(m_SocketCriticalSection); if (m_Socket.IsValid()) { cPacket_Disconnect Disconnect; @@ -193,17 +187,10 @@ cClientHandle::~cClientHandle() m_Socket.Send(&Disconnect); m_Socket.CloseSocket(); } - Lock.Unlock(); m_Semaphore.Signal(); - delete m_pReceiveThread; delete m_pSendThread; - while (!m_PendingParsePackets.empty()) - { - delete *m_PendingParsePackets.begin(); - m_PendingParsePackets.erase(m_PendingParsePackets.begin()); - } while (!m_PendingNrmSendPackets.empty()) { delete *m_PendingNrmSendPackets.begin(); @@ -224,6 +211,8 @@ cClientHandle::~cClientHandle() { delete m_PacketMap[i]; } + + LOG("ClientHandle at %p destroyed", this); } @@ -233,11 +222,13 @@ cClientHandle::~cClientHandle() void cClientHandle::Destroy() { m_bDestroyed = true; - cCSLock Lock(m_SocketCriticalSection); if (m_Socket.IsValid()) { m_Socket.CloseSocket(); } + + // Synchronize with the cSocketThreads (so that they don't call us anymore) + cRoot::Get()->GetServer()->ClientDestroying(this); } @@ -419,31 +410,6 @@ void cClientHandle::RemoveFromAllChunks() -void cClientHandle::AddPacket(cPacket * a_Packet) -{ - cCSLock Lock(m_CriticalSection); - m_PendingParsePackets.push_back(a_Packet->Clone()); -} - - - - - -void cClientHandle::HandlePendingPackets() -{ - cCSLock Lock(m_CriticalSection); - for (PacketList::iterator itr = m_PendingParsePackets.begin(); itr != m_PendingParsePackets.end(); ++itr) - { - HandlePacket(*itr); - delete *itr; - } - m_PendingParsePackets.clear(); -} - - - - - void cClientHandle::HandlePacket(cPacket * a_Packet) { m_TimeLastPacket = cWorld::GetTime(); @@ -1766,14 +1732,11 @@ void cClientHandle::SendThread(void *lpParam) } Lock.Unlock(); - cCSLock SocketLock(self->m_SocketCriticalSection); if (!self->m_Socket.IsValid()) { break; } - bool bSuccess = self->m_Socket.Send(Packet); - SocketLock.Unlock(); if (!bSuccess) { @@ -1799,84 +1762,6 @@ void cClientHandle::SendThread(void *lpParam) -void cClientHandle::ReceiveThread(void *lpParam) -{ - LOG("ReceiveThread"); - - cClientHandle* self = (cClientHandle*)lpParam; - - char temp = 0; - int iStat = 0; - - cSocket socket = self->GetSocket(); - - AString Received; - while (self->m_bKeepThreadGoing) - { - char Buffer[1024]; - iStat = socket.Receive(Buffer, sizeof(Buffer), 0); - if (cSocket::IsSocketError(iStat) || (iStat == 0)) - { - LOG("CLIENT DISCONNECTED (%i bytes):%s", iStat, cSocket::GetLastErrorString().c_str()); - break; - } - Received.append(Buffer, iStat); - - // Parse all complete packets in Received: - while (!Received.empty()) - { - cPacket* pPacket = self->m_PacketMap[(unsigned char)Received[0]]; - if (pPacket) - { - int NumBytes = pPacket->Parse(Received.data() + 1, Received.size() - 1); - if (NumBytes == PACKET_ERROR) - { - LOGERROR("Protocol error while parsing packet type 0x%x; disconnecting client \"%s\"", Received[0], self->m_Username.c_str()); - cPacket_Disconnect DC("Protocol error"); - socket.Send(&DC); - - cSleep::MilliSleep(1000); // Give packet some time to be received - return; - } - else if (NumBytes == PACKET_INCOMPLETE) - { - // Not a complete packet - break; - } - else - { - // Packet parsed successfully, add it to internal queue: - self->AddPacket(pPacket); - // Erase the packet from the buffer: - assert(Received.size() > (size_t)NumBytes); - Received.erase(0, NumBytes + 1); - } - } - else - { - LOGERROR("Unknown packet type: 0x%2x", Received[0]); - - AString Reason; - Printf(Reason, "[C->S] Unknown PacketID: 0x%02x", Received[0]); - cPacket_Disconnect DC(Reason); - socket.Send(&DC); - - cSleep::MilliSleep(1000); // Give packet some time to be received - break; - } - } // while (!Received.empty()) - } // while (self->m_bKeepThreadGoing) - - self->Destroy(); - - LOG("ReceiveThread STOPPED"); - return; -} - - - - - const AString & cClientHandle::GetUsername(void) const { return m_Username; @@ -1886,9 +1771,79 @@ const AString & cClientHandle::GetUsername(void) const -const cSocket & cClientHandle::GetSocket() +void cClientHandle::DataReceived(const char * a_Data, int a_Size) { - return m_Socket; + // Data is received from the client + + m_ReceivedData.append(a_Data, a_Size); + + // Parse and handle all complete packets in m_ReceivedData: + while (!m_ReceivedData.empty()) + { + cPacket* pPacket = m_PacketMap[(unsigned char)m_ReceivedData[0]]; + if (pPacket == NULL) + { + LOGERROR("Unknown packet type 0x%02x from client \"%s\"", (unsigned char)m_ReceivedData[0], m_Username.c_str()); + + AString Reason; + Printf(Reason, "[C->S] Unknown PacketID: 0x%02x", m_ReceivedData[0]); + cPacket_Disconnect DC(Reason); + m_Socket.Send(&DC); + cSleep::MilliSleep(1000); // Give packet some time to be received + Destroy(); + return; + } + + int NumBytes = pPacket->Parse(m_ReceivedData.data() + 1, m_ReceivedData.size() - 1); + if (NumBytes == PACKET_ERROR) + { + LOGERROR("Protocol error while parsing packet type 0x%02x; disconnecting client \"%s\"", (unsigned char)m_ReceivedData[0], m_Username.c_str()); + cPacket_Disconnect DC("Protocol error"); + m_Socket.Send(&DC); + cSleep::MilliSleep(1000); // Give packet some time to be received + Destroy(); + return; + } + else if (NumBytes == PACKET_INCOMPLETE) + { + // Not a complete packet + break; + } + else + { + // Packet parsed successfully, add it to internal queue: + HandlePacket(pPacket); + // Erase the packet from the buffer: + assert(m_ReceivedData.size() > (size_t)NumBytes); + m_ReceivedData.erase(0, NumBytes + 1); + } + } // while (!Received.empty()) +} + + + + + +void cClientHandle::GetOutgoingData(AString & a_Data) +{ + // Data can be sent to client + + // TODO +} + + + + + +void cClientHandle::SocketClosed(void) +{ + // The socket has been closed for any reason + + // TODO + /* + self->Destroy(); + LOG("Client \"%s\" disconnected", GetLogName().c_str()); + */ } diff --git a/source/cClientHandle.h b/source/cClientHandle.h index 56b9382e8..48a51c2df 100644 --- a/source/cClientHandle.h +++ b/source/cClientHandle.h @@ -13,6 +13,7 @@ #include "packets/cPacket.h" #include "Vector3d.h" +#include "cSocketThreads.h" #include "packets/cPacket_KeepAlive.h" #include "packets/cPacket_PlayerPosition.h" @@ -54,7 +55,8 @@ class cRedstone; -class cClientHandle // tolua_export +class cClientHandle : // tolua_export + public cSocketThreads::cCallback { // tolua_export public: enum ENUM_PRIORITY @@ -71,15 +73,14 @@ public: static const int VIEWDISTANCE = 17; // MUST be odd number or CRASH! static const int GENERATEDISTANCE = 2; // Server generates this many chunks AHEAD of player sight. - const cSocket & GetSocket(); + const cSocket & GetSocket(void) const {return m_Socket; } + cSocket & GetSocket(void) {return m_Socket; } + cPlayer* GetPlayer() { return m_Player; } // tolua_export void Kick(const AString & a_Reason); //tolua_export void Authenticate(void); // Called by cAuthenticator when the user passes authentication - void AddPacket( cPacket * a_Packet ); - void HandlePendingPackets(); - void StreamChunks(); void StreamChunksSmart( cChunk** a_Chunks, unsigned int a_NumChunks ); void RemoveFromAllChunks(); @@ -97,7 +98,6 @@ public: void Send( const cPacket & a_Packet, ENUM_PRIORITY a_Priority = E_PRIORITY_NORMAL ); static void SendThread( void *lpParam ); - static void ReceiveThread( void *lpParam ); const AString & GetUsername(void) const; @@ -108,19 +108,19 @@ private: int m_ProtocolVersion; AString m_Username; AString m_Password; + + AString m_ReceivedData; // Accumulator for the data received from the socket, waiting to be parsed; accessed from the cSocketThreads' thread only! - PacketList m_PendingParsePackets; PacketList m_PendingNrmSendPackets; PacketList m_PendingLowSendPackets; - cThread * m_pReceiveThread; cThread * m_pSendThread; cSocket m_Socket; cCriticalSection m_CriticalSection; cCriticalSection m_SendCriticalSection; - cCriticalSection m_SocketCriticalSection; + // cCriticalSection m_SocketCriticalSection; cSemaphore m_Semaphore; Vector3d m_ConfirmPosition; @@ -180,6 +180,12 @@ private: bool CheckBlockInteractionsRate(void); void SendLoginResponse(); + + // cSocketThreads::cCallback overrides: + virtual void DataReceived (const char * a_Data, int a_Size) override; // Data is received from the client + virtual void GetOutgoingData(AString & a_Data) override; // Data can be sent to client + virtual void SocketClosed (void) override; // The socket has been closed for any reason + }; // tolua_export diff --git a/source/cServer.cpp b/source/cServer.cpp index 36fb71875..e3fdb2f86 100644 --- a/source/cServer.cpp +++ b/source/cServer.cpp @@ -100,46 +100,9 @@ void cServer::ServerListenThread( void *a_Args ) -std::string GetWSAError() +void cServer::ClientDestroying(const cClientHandle * a_Client) { -#ifdef _WIN32 - switch( WSAGetLastError() ) - { - case WSANOTINITIALISED: - return "WSANOTINITIALISED"; - case WSAENETDOWN: - return "WSAENETDOWN"; - case WSAEFAULT: - return "WSAEFAULT"; - case WSAENOTCONN: - return "WSAENOTCONN"; - case WSAEINTR: - return "WSAEINTR"; - case WSAEINPROGRESS: - return "WSAEINPROGRESS"; - case WSAENETRESET: - return "WSAENETRESET"; - case WSAENOTSOCK: - return "WSAENOTSOCK"; - case WSAEOPNOTSUPP: - return "WSAEOPNOTSUPP"; - case WSAESHUTDOWN: - return "WSAESHUTDOWN"; - case WSAEWOULDBLOCK: - return "WSAEWOULDBLOCK"; - case WSAEMSGSIZE: - return "WSAEMSGSIZE"; - case WSAEINVAL: - return "WSAEINVAL"; - case WSAECONNABORTED: - return "WSAECONNABORTED"; - case WSAETIMEDOUT: - return "WSAETIMEDOUT"; - case WSAECONNRESET: - return "WSAECONNRESET"; - } -#endif - return "No Error"; + m_SocketThreads.RemoveClient(a_Client); } @@ -183,11 +146,11 @@ bool cServer::InitServer( int a_Port ) return false; } - if( m_pState->SListenClient.SetReuseAddress() == -1 ) + if( m_pState->SListenClient.SetReuseAddress() == -1 ) { - LOGERROR("setsockopt == -1"); - return false; - } + LOGERROR("setsockopt == -1"); + return false; + } cSocket::SockAddr_In local; local.Family = cSocket::ADDRESS_FAMILY_INTERNET; @@ -308,9 +271,16 @@ void cServer::StartListenClient() return; } - LOG("%s connected!", ClientIP.c_str()); + LOG("Client \"%s\" connected!", ClientIP.c_str()); - cClientHandle *NewHandle = new cClientHandle( SClient ); + cClientHandle *NewHandle = new cClientHandle(SClient); + if (!m_SocketThreads.AddClient(&(NewHandle->GetSocket()), NewHandle)) + { + // For some reason SocketThreads have rejected the handle, clean it up + SClient.CloseSocket(); + delete NewHandle; + return; + } m_pState->Clients.push_back( NewHandle ); // TODO - lock list } @@ -335,11 +305,8 @@ bool cServer::Tick(float a_Dt) //World->LockClientHandle(); // TODO - Lock client list for( ClientList::iterator itr = m_pState->Clients.begin(); itr != m_pState->Clients.end();) { - (*itr)->HandlePendingPackets(); - if( (*itr)->IsDestroyed() ) { - cClientHandle* RemoveMe = *itr; ++itr; m_pState->Clients.remove( RemoveMe ); @@ -464,7 +431,7 @@ void cServer::ServerCommand( const char* a_Cmd ) { if( split[0].compare( "help" ) == 0 ) { - printf("===================ALL COMMANDS====================\n"); + printf("================== ALL COMMANDS ===================\n"); printf("help - Shows this message\n"); printf("save-all - Saves all loaded chunks to disk\n"); printf("list - Lists all players currently in server\n"); diff --git a/source/cServer.h b/source/cServer.h index db60a2742..b56047487 100644 --- a/source/cServer.h +++ b/source/cServer.h @@ -1,9 +1,30 @@ +// cServer.h + +// Interfaces to the cServer object representing the network server + + + + + #pragma once +#ifndef CSERVER_H_INCLUDED +#define CSERVER_H_INCLUDED + +#include "cSocketThreads.h" + + + + class cPlayer; class cClientHandle; class cPacket; + + + + + class cServer //tolua_export { //tolua_export public: //tolua_export @@ -34,13 +55,20 @@ public: //tolua_export static void ServerListenThread( void* a_Args ); const AString & GetServerID(void) const; + + void ClientDestroying(const cClientHandle * a_Client); // Called by cClientHandle::Destroy(); removes the client from m_SocketThreads + private: + friend class cRoot; // so cRoot can create and destroy cServer + cServer(); ~cServer(); struct sServerState; sServerState* m_pState; + + cSocketThreads m_SocketThreads; // Time since server was started float m_Millisecondsf; @@ -51,3 +79,13 @@ private: bool m_bRestarting; }; //tolua_export + + + + + +#endif // CSERVER_H_INCLUDED + + + + diff --git a/source/cSocket.cpp b/source/cSocket.cpp index 42cc298a7..00f10154b 100644 --- a/source/cSocket.cpp +++ b/source/cSocket.cpp @@ -27,6 +27,7 @@ cSocket::cSocket(xSocket a_Socket) cSocket::~cSocket() { + // Do NOT close the socket; this class is an API wrapper, not a RAII! } @@ -100,6 +101,10 @@ AString cSocket::GetErrorString( int a_ErrNo ) FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, a_ErrNo, 0, buffer, ARRAYCOUNT(buffer), NULL); Printf(Out, "%d: %s", a_ErrNo, buffer); + if (!Out.empty() && (Out[Out.length() - 1] == '\n')) + { + Out.erase(Out.length() - 2); + } return Out; #else // _WIN32 @@ -312,7 +317,7 @@ unsigned short cSocket::GetPort(void) const { return 0; } - return Addr.sin_port; + return ntohs(Addr.sin_port); } diff --git a/source/cSocket.h b/source/cSocket.h index b98741c4b..81749048b 100644 --- a/source/cSocket.h +++ b/source/cSocket.h @@ -29,6 +29,9 @@ public: operator const xSocket() const; xSocket GetSocket() const; + + bool operator == (const cSocket & a_Other) {return m_Socket == a_Other.m_Socket; } + void SetSocket( xSocket a_Socket ); int SetReuseAddress(); diff --git a/source/cSocketThreads.cpp b/source/cSocketThreads.cpp index 277036e46..796316878 100644 --- a/source/cSocketThreads.cpp +++ b/source/cSocketThreads.cpp @@ -39,7 +39,7 @@ cSocketThreads::~cSocketThreads() -void cSocketThreads::AddClient(cSocket * a_Socket, cCallback * a_Client) +bool cSocketThreads::AddClient(cSocket * a_Socket, cCallback * a_Client) { // Add a (socket, client) pair for processing, data from a_Socket is to be sent to a_Client @@ -47,25 +47,32 @@ void cSocketThreads::AddClient(cSocket * a_Socket, cCallback * a_Client) cCSLock Lock(m_CS); for (cSocketThreadList::iterator itr = m_Threads.begin(); itr != m_Threads.end(); ++itr) { - if ((*itr)->HasEmptySlot()) + if ((*itr)->IsValid() && (*itr)->HasEmptySlot()) { (*itr)->AddClient(a_Socket, a_Client); - return; + return true; } } // No thread has free space, create a new one: + LOG("Creating a new cSocketThread (currently have %d)", m_Threads.size()); cSocketThread * Thread = new cSocketThread(this); - Thread->Start(); + if (!Thread->Start()) + { + // There was an error launching the thread (but it was already logged along with the reason) + delete Thread; + return false; + } Thread->AddClient(a_Socket, a_Client); m_Threads.push_back(Thread); + return true; } -void cSocketThreads::RemoveClient(cSocket * a_Socket) +void cSocketThreads::RemoveClient(const cSocket * a_Socket) { // Remove the socket (and associated client) from processing @@ -83,7 +90,7 @@ void cSocketThreads::RemoveClient(cSocket * a_Socket) -void cSocketThreads::RemoveClient(cCallback * a_Client) +void cSocketThreads::RemoveClient(const cCallback * a_Client) { // Remove the associated socket and the client from processing @@ -101,7 +108,7 @@ void cSocketThreads::RemoveClient(cCallback * a_Client) -void cSocketThreads::NotifyWrite(cCallback * a_Client) +void cSocketThreads::NotifyWrite(const cCallback * a_Client) { // Notifies the thread responsible for a_Client that the client has something to write @@ -152,7 +159,7 @@ void cSocketThreads::cSocketThread::AddClient(cSocket * a_Socket, cCallback * a_ -bool cSocketThreads::cSocketThread::RemoveClient(cCallback * a_Client) +bool cSocketThreads::cSocketThread::RemoveClient(const cCallback * a_Client) { // Returns true if removed, false if not found @@ -161,7 +168,7 @@ bool cSocketThreads::cSocketThread::RemoveClient(cCallback * a_Client) return false; } - for (int i = m_NumSlots - 1; i > 0 ; --i) + for (int i = m_NumSlots - 1; i >= 0 ; --i) { if (m_Slots[i].m_Client != a_Client) { @@ -186,7 +193,7 @@ bool cSocketThreads::cSocketThread::RemoveClient(cCallback * a_Client) -bool cSocketThreads::cSocketThread::RemoveSocket(cSocket * a_Socket) +bool cSocketThreads::cSocketThread::RemoveSocket(const cSocket * a_Socket) { // Returns true if removed, false if not found @@ -195,7 +202,7 @@ bool cSocketThreads::cSocketThread::RemoveSocket(cSocket * a_Socket) return false; } - for (int i = m_NumSlots - 1; i > 0 ; --i) + for (int i = m_NumSlots - 1; i >= 0 ; --i) { if (m_Slots[i].m_Socket != a_Socket) { @@ -220,7 +227,7 @@ bool cSocketThreads::cSocketThread::RemoveSocket(cSocket * a_Socket) -bool cSocketThreads::cSocketThread::NotifyWrite(cCallback * a_Client) +bool cSocketThreads::cSocketThread::NotifyWrite(const cCallback * a_Client) { if (HasClient(a_Client)) { @@ -236,7 +243,7 @@ bool cSocketThreads::cSocketThread::NotifyWrite(cCallback * a_Client) -bool cSocketThreads::cSocketThread::HasClient(cCallback * a_Client) const +bool cSocketThreads::cSocketThread::HasClient(const cCallback * a_Client) const { for (int i = m_NumSlots - 1; i >= 0; --i) { @@ -252,11 +259,11 @@ bool cSocketThreads::cSocketThread::HasClient(cCallback * a_Client) const -bool cSocketThreads::cSocketThread::HasSocket(cSocket * a_Socket) const +bool cSocketThreads::cSocketThread::HasSocket(const cSocket * a_Socket) const { for (int i = m_NumSlots - 1; i >= 0; --i) { - if (m_Slots[i].m_Socket == a_Socket) + if (m_Slots[i].m_Socket->GetSocket() == a_Socket->GetSocket()) { return true; } @@ -271,8 +278,8 @@ bool cSocketThreads::cSocketThread::HasSocket(cSocket * a_Socket) const bool cSocketThreads::cSocketThread::Start(void) { // Create the control socket listener - m_ControlSocket1 = cSocket::CreateSocket(); - if (!m_ControlSocket1.IsValid()) + m_ControlSocket2 = cSocket::CreateSocket(); + if (!m_ControlSocket2.IsValid()) { LOGERROR("Cannot create a Control socket for a cSocketThread (\"%s\"); continuing, but server may be unreachable from now on.", cSocket::GetLastErrorString().c_str()); return false; @@ -281,36 +288,42 @@ bool cSocketThreads::cSocketThread::Start(void) Addr.Family = cSocket::ADDRESS_FAMILY_INTERNET; Addr.Address = cSocket::INTERNET_ADDRESS_LOCALHOST(); Addr.Port = 0; // Any free port is okay - if (m_ControlSocket1.Bind(Addr) != 0) + if (m_ControlSocket2.Bind(Addr) != 0) { LOGERROR("Cannot bind a Control socket for a cSocketThread (\"%s\"); continuing, but server may be unreachable from now on.", cSocket::GetLastErrorString().c_str()); - m_ControlSocket1.CloseSocket(); + m_ControlSocket2.CloseSocket(); return false; } - if (m_ControlSocket1.GetPort() == 0) + if (m_ControlSocket2.Listen(1) != 0) + { + LOGERROR("Cannot listen on a Control socket for a cSocketThread (\"%s\"); continuing, but server may be unreachable from now on.", cSocket::GetLastErrorString().c_str()); + m_ControlSocket2.CloseSocket(); + return false; + } + if (m_ControlSocket2.GetPort() == 0) { LOGERROR("Cannot determine Control socket port (\"%s\"); conitnuing, but the server may be unreachable from now on.", cSocket::GetLastErrorString().c_str()); - m_ControlSocket1.CloseSocket(); + m_ControlSocket2.CloseSocket(); return false; } // Start the thread if (!super::Start()) { - m_ControlSocket1.CloseSocket(); + m_ControlSocket2.CloseSocket(); return false; } // Finish connecting the control socket by accepting connection from the thread's socket - cSocket tmp = m_ControlSocket1.Accept(); + cSocket tmp = m_ControlSocket2.Accept(); if (!tmp.IsValid()) { LOGERROR("Cannot link Control sockets for a cSocketThread (\"%s\"); continuing, but server may be unreachable from now on.", cSocket::GetLastErrorString().c_str()); - m_ControlSocket1.CloseSocket(); + m_ControlSocket2.CloseSocket(); return false; } - m_ControlSocket1.CloseSocket(); - m_ControlSocket1 = tmp; + m_ControlSocket2.CloseSocket(); + m_ControlSocket2 = tmp; return true; } @@ -322,15 +335,16 @@ bool cSocketThreads::cSocketThread::Start(void) void cSocketThreads::cSocketThread::Execute(void) { // Connect the "client" part of the Control socket: + m_ControlSocket1 = cSocket::CreateSocket(); cSocket::SockAddr_In Addr; Addr.Family = cSocket::ADDRESS_FAMILY_INTERNET; Addr.Address = cSocket::INTERNET_ADDRESS_LOCALHOST(); - Addr.Port = m_ControlSocket1.GetPort(); + Addr.Port = m_ControlSocket2.GetPort(); assert(Addr.Port != 0); // We checked in the Start() method, but let's be sure - if (m_ControlSocket2.Connect(Addr) != 0) + if (m_ControlSocket1.Connect(Addr) != 0) { LOGERROR("Cannot connect Control sockets for a cSocketThread (\"%s\"); continuing, but the server may be unreachable from now on.", cSocket::GetLastErrorString().c_str()); - m_ControlSocket1.CloseSocket(); + m_ControlSocket2.CloseSocket(); return; } @@ -339,33 +353,35 @@ void cSocketThreads::cSocketThread::Execute(void) { // Put all sockets into the Read set: fd_set fdRead; - cSocket::xSocket Highest = 0; + cSocket::xSocket Highest = m_ControlSocket1.GetSocket(); PrepareSet(&fdRead, Highest); // Wait for the sockets: if (select(Highest + 1, &fdRead, NULL, NULL, NULL) == -1) { - LOGWARNING("select(R) call failed in cSocketThread: \"%s\"", cSocket::GetLastErrorString().c_str()); - break; + LOG("select(R) call failed in cSocketThread: \"%s\"", cSocket::GetLastErrorString().c_str()); + continue; } ReadFromSockets(&fdRead); // Test sockets for writing: fd_set fdWrite; - Highest = 0; + Highest = m_ControlSocket1.GetSocket(); PrepareSet(&fdWrite, Highest); timeval Timeout; Timeout.tv_sec = 0; Timeout.tv_usec = 0; if (select(Highest + 1, NULL, &fdWrite, NULL, &Timeout) == -1) { - LOGWARNING("select(W) call failed in cSocketThread: \"%s\"", cSocket::GetLastErrorString().c_str()); - break; + LOG("select(W) call failed in cSocketThread: \"%s\"", cSocket::GetLastErrorString().c_str()); + continue; } WriteToSockets(&fdWrite); + + RemoveClosedSockets(); } // while (!mShouldTerminate) LOG("cSocketThread %p is terminating", this); @@ -381,7 +397,7 @@ void cSocketThreads::cSocketThread::PrepareSet(fd_set * a_Set, cSocket::xSocket FD_SET(m_ControlSocket1.GetSocket(), a_Set); cCSLock Lock(m_Parent->m_CS); - for (int i = m_NumSlots - 1; i > 0; --i) + for (int i = m_NumSlots - 1; i >= 0; --i) { if (!m_Slots[i].m_Socket->IsValid()) { @@ -403,8 +419,17 @@ void cSocketThreads::cSocketThread::PrepareSet(fd_set * a_Set, cSocket::xSocket void cSocketThreads::cSocketThread::ReadFromSockets(fd_set * a_Read) { // Read on available sockets: + + // Reset Control socket state: + if (FD_ISSET(m_ControlSocket1.GetSocket(), a_Read)) + { + char Dummy[128]; + m_ControlSocket1.Receive(Dummy, sizeof(Dummy), 0); + } + + // Read from clients: cCSLock Lock(m_Parent->m_CS); - for (int i = m_NumSlots - 1; i > 0; --i) + for (int i = m_NumSlots - 1; i >= 0; --i) { if (!FD_ISSET(m_Slots[i].m_Socket->GetSocket(), a_Read)) { @@ -437,8 +462,9 @@ void cSocketThreads::cSocketThread::ReadFromSockets(fd_set * a_Read) void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write) { + // Write to available client sockets: cCSLock Lock(m_Parent->m_CS); - for (int i = m_NumSlots - 1; i > 0; --i) + for (int i = m_NumSlots - 1; i >= 0; --i) { if (!FD_ISSET(m_Slots[i].m_Socket->GetSocket(), a_Write)) { @@ -464,9 +490,40 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write) return; } m_Slots[i].m_Outgoing.erase(0, Sent); + + // _X: If there's data left, it means the client is not reading fast enough, the server would unnecessarily spin in the main loop with zero actions taken; so signalling is disabled + // This means that if there's data left, it will be sent only when there's incoming data or someone queues another packet (for any socket handled by this thread) + /* + // If there's any data left, signalize the Control socket: + if (!m_Slots[i].m_Outgoing.empty()) + { + assert(m_ControlSocket2.IsValid()); + m_ControlSocket2.Send("q", 1); + } + */ } // for i - m_Slots[i] } + +void cSocketThreads::cSocketThread::RemoveClosedSockets(void) +{ + // Removes sockets that have closed from m_Slots[] + + cCSLock Lock(m_Parent->m_CS); + for (int i = m_NumSlots - 1; i >= 0; --i) + { + if (m_Slots[i].m_Socket->IsValid()) + { + continue; + } + m_Slots[i] = m_Slots[m_NumSlots - 1]; + m_NumSlots--; + } // for i - m_Slots[] +} + + + + diff --git a/source/cSocketThreads.h b/source/cSocketThreads.h index b43d693ba..cbf73a27e 100644 --- a/source/cSocketThreads.h +++ b/source/cSocketThreads.h @@ -10,7 +10,7 @@ /// How many clients should one thread handle? (must be less than FD_SETSIZE - 1 for your platform) -#define MAX_SLOTS 1 +#define MAX_SLOTS 63 @@ -65,17 +65,17 @@ public: cSocketThreads(void); ~cSocketThreads(); - /// Add a (socket, client) pair for processing, data from a_Socket is to be sent to a_Client - void AddClient(cSocket * a_Socket, cCallback * a_Client); + /// Add a (socket, client) pair for processing, data from a_Socket is to be sent to a_Client; returns true if successful + bool AddClient(cSocket * a_Socket, cCallback * a_Client); /// Remove the socket (and associated client) from processing - void RemoveClient(cSocket * a_Socket); + void RemoveClient(const cSocket * a_Socket); /// Remove the associated socket and the client from processing - void RemoveClient(cCallback * a_Client); + void RemoveClient(const cCallback * a_Client); /// Notify the thread responsible for a_Client that the client has something to write - void NotifyWrite(cCallback * a_Client); + void NotifyWrite(const cCallback * a_Client); private: @@ -92,15 +92,17 @@ private: bool HasEmptySlot(void) const {return m_NumSlots < MAX_SLOTS; } bool IsEmpty (void) const {return m_NumSlots == 0; } - void AddClient (cSocket * a_Socket, cCallback * a_Client); - bool RemoveClient(cCallback * a_Client); // Returns true if removed, false if not found - bool RemoveSocket(cSocket * a_Socket); // Returns true if removed, false if not found - bool HasClient (cCallback * a_Client) const; - bool HasSocket (cSocket * a_Socket) const; - bool NotifyWrite (cCallback * a_Client); // Returns true if client handled by this thread + void AddClient (cSocket * a_Socket, cCallback * a_Client); + bool RemoveClient(const cCallback * a_Client); // Returns true if removed, false if not found + bool RemoveSocket(const cSocket * a_Socket); // Returns true if removed, false if not found + bool HasClient (const cCallback * a_Client) const; + bool HasSocket (const cSocket * a_Socket) const; + bool NotifyWrite (const cCallback * a_Client); // Returns true if client handled by this thread bool Start(void); // Hide the cIsThread's Start method, we need to provide our own startup to create the control socket + bool IsValid(void) const {return m_ControlSocket2.IsValid(); } // If the Control socket dies, the thread is not valid anymore + private: cSocketThreads * m_Parent; @@ -127,6 +129,7 @@ private: void PrepareSet (fd_set * a_Set, cSocket::xSocket & a_Highest); // Puts all sockets into the set, along with m_ControlSocket1 void ReadFromSockets(fd_set * a_Read); // Reads from sockets indicated in a_Read void WriteToSockets (fd_set * a_Write); // Writes to sockets indicated in a_Write + void RemoveClosedSockets(void); // Removes sockets that have closed from m_Slots[] } ; typedef std::list cSocketThreadList;