From 28d8d8419a5b900e9d20ce91dc63e28349b6470a Mon Sep 17 00:00:00 2001 From: "madmaxoft@gmail.com" Date: Mon, 4 Mar 2013 21:13:08 +0000 Subject: [PATCH] Moved client socket accepting into a separate thread object, cListenThread MCServer can now listen on multiple ports FS #312 git-svn-id: http://mc-server.googlecode.com/svn/trunk@1252 0a769ca7-a7f5-676a-18bf-c427514a06d6 --- VC2008/MCServer.vcproj | 8 ++ source/ListenThread.cpp | 172 ++++++++++++++++++++++++++++++++++++++ source/ListenThread.h | 75 +++++++++++++++++ source/OSSupport/Socket.h | 7 +- source/Root.cpp | 21 +++-- source/Server.cpp | 157 ++++++++++++---------------------- source/Server.h | 18 ++-- source/StringUtils.cpp | 23 +++++ source/StringUtils.h | 2 + 9 files changed, 360 insertions(+), 123 deletions(-) create mode 100644 source/ListenThread.cpp create mode 100644 source/ListenThread.h diff --git a/VC2008/MCServer.vcproj b/VC2008/MCServer.vcproj index e4c0e7490..a1b7e186e 100644 --- a/VC2008/MCServer.vcproj +++ b/VC2008/MCServer.vcproj @@ -499,6 +499,14 @@ RelativePath="..\source\LightingThread.h" > + + + + diff --git a/source/ListenThread.cpp b/source/ListenThread.cpp new file mode 100644 index 000000000..52a4df9e4 --- /dev/null +++ b/source/ListenThread.cpp @@ -0,0 +1,172 @@ + +// ListenThread.cpp + +// Implements the cListenThread class representing the thread that listens for client connections + +#include "Globals.h" +#include "ListenThread.h" + + + + + +cListenThread::cListenThread(cCallback & a_Callback) : + super("ListenThread"), + m_Callback(a_Callback) +{ +} + + + + + +cListenThread::~cListenThread() +{ + // TODO +} + + + + + +bool cListenThread::Initialize(const AString & a_PortsString) +{ + ASSERT(m_Sockets.empty()); // Not yet started + + if (!CreateSockets(a_PortsString)) + { + return false; + } + + return true; +} + + + + + +bool cListenThread::Start(void) +{ + ASSERT(!m_Sockets.empty()); // Has Initialize() been called? + + return super::Start(); +} + + + + + +void cListenThread::Stop(void) +{ + m_ShouldTerminate = true; + + // Close one socket to wake the thread up from the select() call + m_Sockets[0].CloseSocket(); + + // Wait for the thread to finish + super::Wait(); + + // Clean up all sockets + m_Sockets.clear(); +} + + + + + +void cListenThread::SetReuseAddr(bool a_Reuse) +{ + ASSERT(m_Sockets.empty()); // Must not be started + + m_ShouldReuseAddr = a_Reuse; +} + + + + + +bool cListenThread::CreateSockets(const AString & a_PortsString) +{ + AStringVector Ports = StringSplit(a_PortsString, ","); + + if (Ports.empty()) + { + return false; + } + + for (AStringVector::const_iterator itr = Ports.begin(), end = Ports.end(); itr != end; ++itr) + { + int Port = atoi(Trim(*itr).c_str()); + if ((Port <= 0) || (Port > 65535)) + { + LOGWARNING("Invalid port specified: \"%s\".", Trim(*itr).c_str()); + continue; + } + m_Sockets.push_back(cSocket::CreateSocket()); + if (!m_Sockets.back().IsValid()) + { + LOGERROR("Cannot create listening socket for port %d: \"%s\"", Port, cSocket::GetLastErrorString().c_str()); + m_Sockets.pop_back(); + continue; + } + + if (m_ShouldReuseAddr) + { + if (m_Sockets.back().SetReuseAddress() == -1) + { + LOG("Port %d cannot reuse addr, syscall failed: \"%s\".", Port, cSocket::GetLastErrorString().c_str()); + } + } + m_Sockets.back().BindToAny(Port); + m_Sockets.back().Listen(); + LOGD("Port %d is open for connections", Port); + } // for itr - Ports[] + + return !(m_Sockets.empty()); +} + + + + + +void cListenThread::Execute(void) +{ + // Find the highest socket number: + cSocket::xSocket Highest = m_Sockets[0].GetSocket(); + for (cSockets::iterator itr = m_Sockets.begin(), end = m_Sockets.end(); itr != end; ++itr) + { + if (itr->GetSocket() > Highest) + { + Highest = itr->GetSocket(); + } + } // for itr - m_Sockets[] + + while (!m_ShouldTerminate) + { + // Put all sockets into a FD set: + fd_set fdRead; + FD_ZERO(&fdRead); + for (cSockets::iterator itr = m_Sockets.begin(), end = m_Sockets.end(); itr != end; ++itr) + { + FD_SET(itr->GetSocket(), &fdRead); + } // for itr - m_Sockets[] + + if (select(Highest + 1, &fdRead, NULL, NULL, NULL) == -1) + { + LOG("select(R) call failed in cListenThread: \"%s\"", cSocket::GetLastErrorString().c_str()); + continue; + } + for (cSockets::iterator itr = m_Sockets.begin(), end = m_Sockets.end(); itr != end; ++itr) + { + if (FD_ISSET(itr->GetSocket(), &fdRead)) + { + cSocket Client = itr->Accept(); + m_Callback.OnConnectionAccepted(Client); + } + } // for itr - m_Sockets[] + } // while (!m_ShouldTerminate) +} + + + + diff --git a/source/ListenThread.h b/source/ListenThread.h new file mode 100644 index 000000000..90523ea4f --- /dev/null +++ b/source/ListenThread.h @@ -0,0 +1,75 @@ + +// ListenThread.h + +// Declares the cListenThread class representing the thread that listens for client connections + + + + + +#pragma once + +#include "OSSupport/IsThread.h" +#include "OSSupport/Socket.h" + + + + + +// fwd: +class cServer; + + + + + +class cListenThread : + public cIsThread +{ + typedef cIsThread super; + +public: + /// Used as the callback for connection events + class cCallback + { + public: + /// This callback is called whenever a socket connection is accepted + virtual void OnConnectionAccepted(cSocket & a_Socket) = 0; + } ; + + cListenThread(cCallback & a_Callback); + ~cListenThread(); + + /// Creates all the sockets, returns trus if successful, false if not. + bool Initialize(const AString & a_PortsString); + + bool Start(void); + + void Stop(void); + + /// Call before Initialize() to set the "reuse" flag on the sockets + void SetReuseAddr(bool a_Reuse = true); + +protected: + typedef std::vector cSockets; + + /// The callback which to notify of incoming connections + cCallback & m_Callback; + + /// Sockets that are being monitored + cSockets m_Sockets; + + bool m_ShouldReuseAddr; + + /** Fills in m_Sockets with individual sockets, each for one port specified in a_PortsString. + Returns true if successful and at least one socket has been created + */ + bool CreateSockets(const AString & a_PortsString); + + // cIsThread override: + virtual void Execute(void) override; +} ; + + + + diff --git a/source/OSSupport/Socket.h b/source/OSSupport/Socket.h index 71ec99fad..c1e510387 100644 --- a/source/OSSupport/Socket.h +++ b/source/OSSupport/Socket.h @@ -63,6 +63,7 @@ public: static const unsigned long INTERNET_ADDRESS_ANY = 0; static unsigned long INTERNET_ADDRESS_LOCALHOST(void); // 127.0.0.1 represented in network byteorder; must be a function due to GCC :( static const unsigned short ANY_PORT = 0; // When given to Bind() functions, they will find a free port + static const int DEFAULT_BACKLOG = 10; /// Binds to the specified port on "any" interface (0.0.0.0) int BindToAny(unsigned short a_Port); @@ -76,11 +77,13 @@ public: /// Binds to the specified port on localhost interface (127.0.0.1) through IPv4 int BindToLocalhost(unsigned short a_Port); - int Listen( int a_Backlog ); + int Listen(int a_Backlog = DEFAULT_BACKLOG); cSocket Accept(); + int Connect(SockAddr_In & a_Address); // Returns 0 on success, !0 on failure + int Connect(const AString & a_HostNameOrAddr, unsigned short a_Port); // Returns 0 on success, !0 on failure - int Receive( char* a_Buffer, unsigned int a_Length, unsigned int a_Flags ); + int Receive(char * a_Buffer, unsigned int a_Length, unsigned int a_Flags); int Send (const char * a_Buffer, unsigned int a_Length); unsigned short GetPort(void) const; // Returns 0 on failure diff --git a/source/Root.cpp b/source/Root.cpp index 41788ff8d..ea33afe7e 100644 --- a/source/Root.cpp +++ b/source/Root.cpp @@ -163,28 +163,27 @@ void cRoot::Start(void) StartWorlds(); LOG("Starting server..."); - m_Server->StartListenThread(); - //cHeartBeat* HeartBeat = new cHeartBeat(); + m_Server->Start(); -#if !defined(ANDROID_NDK) + #if !defined(ANDROID_NDK) LOG("Starting InputThread..."); m_InputThread = new cThread( InputThread, this, "cRoot::InputThread" ); - m_InputThread->Start( false ); //we should NOT wait? Otherwise we canīt stop the server from other threads than the input thread -#endif + m_InputThread->Start( false ); // We should NOT wait? Otherwise we canīt stop the server from other threads than the input thread + #endif LOG("Initialization done, server running now."); - while( !m_bStop && !m_bRestart ) // These are modified by external threads + while (!m_bStop && !m_bRestart) // These are modified by external threads { - cSleep::MilliSleep( 1000 ); + cSleep::MilliSleep(1000); } -#if !defined(ANDROID_NDK) - delete m_InputThread; m_InputThread = 0; -#endif + #if !defined(ANDROID_NDK) + delete m_InputThread; m_InputThread = NULL; + #endif // Deallocate stuffs LOG("Shutting down server..."); - m_Server->Shutdown(); // This waits for threads to stop and d/c clients + m_Server->Shutdown(); // This waits for threads to stop and d/c clients LOG("Stopping world threads..."); StopWorlds(); LOG("Stopping authenticator..."); diff --git a/source/Server.cpp b/source/Server.cpp index 1e830874e..3fcaa6e8e 100644 --- a/source/Server.cpp +++ b/source/Server.cpp @@ -62,14 +62,10 @@ typedef std::list< cClientHandle* > ClientList; struct cServer::sServerState { sServerState() - : pListenThread( 0 ) - , pTickThread( 0 ) - , bStopListenThread( false ) - , bStopTickThread( false ) + : pTickThread(NULL) + , bStopTickThread(false) {} - cSocket SListenClient; // socket listening for client calls - cThread* pListenThread; bool bStopListenThread; cThread* pTickThread; bool bStopTickThread; cEvent RestartEvent; @@ -80,21 +76,6 @@ struct cServer::sServerState -void cServer::ServerListenThread( void *a_Args ) -{ - LOG("ServerListenThread"); - cServer* self = (cServer*)a_Args; - sServerState* m_pState = self->m_pState; - while( !m_pState->bStopListenThread ) - { - self->StartListenClient(); - } -} - - - - - void cServer::ClientDestroying(const cClientHandle * a_Client) { m_SocketThreads.StopReading(a_Client); @@ -172,36 +153,13 @@ bool cServer::InitServer(cIniFile & a_SettingsIni) return false; } - m_pState->SListenClient = cSocket::CreateSocket(); - - if( !m_pState->SListenClient.IsValid() ) + AString Ports = a_SettingsIni.GetValueSet("Server", "Port", "25565"); + m_ListenThread.SetReuseAddr(true); + if (!m_ListenThread.Initialize(Ports)) { - LOGERROR("m_SListenClient==INVALID_SOCKET (%s)", cSocket::GetErrorString( cSocket::GetLastError() ).c_str() ); return false; } - if( m_pState->SListenClient.SetReuseAddress() == -1 ) - { - LOGERROR("setsockopt == -1"); - return false; - } - - int Port = a_SettingsIni.GetValueSetI("Server", "Port", 25565); - - if (m_pState->SListenClient.BindToAny(Port) != 0) - { - LOGERROR("bind fail (%s)", cSocket::GetErrorString( cSocket::GetLastError() ).c_str() ); - return false; - } - - if( m_pState->SListenClient.Listen( 10 ) != 0) - { - LOGERROR("listen fail (%s)", cSocket::GetErrorString( cSocket::GetLastError() ).c_str() ); - return false; - } - - m_iServerPort = Port; - LOG("Port %i has been bound", m_iServerPort); m_bIsConnected = true; m_pState->ServerID = "-"; @@ -241,13 +199,13 @@ bool cServer::InitServer(cIniFile & a_SettingsIni) -cServer::cServer() - : m_pState( new sServerState ) - , m_Millisecondsf( 0 ) - , m_Milliseconds( 0 ) - , m_bIsConnected( false ) - , m_iServerPort( 0 ) - , m_bRestarting( false ) +cServer::cServer(void) + : m_pState(new sServerState) + , m_ListenThread(*this) + , m_Millisecondsf(0) + , m_Milliseconds(0) + , m_bIsConnected(false) + , m_bRestarting(false) { } @@ -258,14 +216,6 @@ cServer::cServer() cServer::~cServer() { // TODO: Shut down the server gracefully - if ( m_pState->SListenClient ) - { - m_pState->SListenClient.CloseSocket(); - } - m_pState->SListenClient = 0; - - m_pState->bStopListenThread = true; - delete m_pState->pListenThread; m_pState->pListenThread = NULL; m_pState->bStopTickThread = true; delete m_pState->pTickThread; m_pState->pTickThread = NULL; @@ -295,6 +245,41 @@ void cServer::PrepareKeys(void) +void cServer::OnConnectionAccepted(cSocket & a_Socket) +{ + if (!a_Socket.IsValid()) + { + return; + } + + const AString & ClientIP = a_Socket.GetIPString(); + if (ClientIP.empty()) + { + LOGWARN("cServer: A client connected, but didn't present its IP, disconnecting."); + a_Socket.CloseSocket(); + return; + } + + LOG("Client \"%s\" connected!", ClientIP.c_str()); + + cClientHandle * NewHandle = new cClientHandle(&a_Socket, m_ClientViewDistance); + if (!m_SocketThreads.AddClient(a_Socket, NewHandle)) + { + // For some reason SocketThreads have rejected the handle, clean it up + LOGERROR("Client \"%s\" cannot be handled, server probably unstable", ClientIP.c_str()); + a_Socket.CloseSocket(); + delete NewHandle; + return; + } + + cCSLock Lock(m_CSClients); + m_Clients.push_back(NewHandle); +} + + + + + void cServer::BroadcastChat(const AString & a_Message, const cClientHandle * a_Exclude) { cCSLock Lock(m_CSClients); @@ -312,43 +297,6 @@ void cServer::BroadcastChat(const AString & a_Message, const cClientHandle * a_E -void cServer::StartListenClient() -{ - cSocket SClient = m_pState->SListenClient.Accept(); - - if (!SClient.IsValid()) - { - return; - } - - const AString & ClientIP = SClient.GetIPString(); - if (ClientIP.empty()) - { - LOGWARN("cServer: A client connected, but didn't present its IP, disconnecting."); - SClient.CloseSocket(); - return; - } - - LOG("Client \"%s\" connected!", ClientIP.c_str()); - - cClientHandle * NewHandle = new cClientHandle(&SClient, m_ClientViewDistance); - if (!m_SocketThreads.AddClient(SClient, NewHandle)) - { - // For some reason SocketThreads have rejected the handle, clean it up - LOGERROR("Client \"%s\" cannot be handled, server probably unstable", SClient.GetIPString().c_str()); - SClient.CloseSocket(); - delete NewHandle; - return; - } - - cCSLock Lock(m_CSClients); - m_Clients.push_back( NewHandle ); -} - - - - - bool cServer::Tick(float a_Dt) { //LOG("1. Tick %0.2f", a_Dt); @@ -434,12 +382,15 @@ void ServerTickThread( void * a_Param ) -void cServer::StartListenThread() +bool cServer::Start(void) { - m_pState->pListenThread = new cThread( ServerListenThread, this, "cServer::ServerListenThread" ); m_pState->pTickThread = new cThread( ServerTickThread, this, "cServer::ServerTickThread" ); - m_pState->pListenThread->Start( true ); + if (!m_ListenThread.Start()) + { + return false; + } m_pState->pTickThread->Start( true ); + return true; } @@ -532,6 +483,8 @@ void cServer::SendMessage(const AString & a_Message, cPlayer * a_Player /* = NUL void cServer::Shutdown() { + m_ListenThread.Stop(); + m_bRestarting = true; m_pState->RestartEvent.Wait(); diff --git a/source/Server.h b/source/Server.h index 707f91261..ec91bb6c0 100644 --- a/source/Server.h +++ b/source/Server.h @@ -14,6 +14,7 @@ #include "OSSupport/SocketThreads.h" #include "CryptoPP/rsa.h" #include "CryptoPP/randpool.h" +#include "ListenThread.h" @@ -30,19 +31,18 @@ typedef std::list cClientHandleList; class cServer // tolua_export + : public cListenThread::cCallback { // tolua_export public: // tolua_export bool InitServer(cIniFile & a_SettingsIni); - int GetPort() { return m_iServerPort; } - bool IsConnected(){return m_bIsConnected;} // returns connection status - void StartListenClient(); // Listen to client - + bool IsConnected(void) const { return m_bIsConnected;} // returns connection status + void BroadcastChat(const AString & a_Message, const cClientHandle * a_Exclude = NULL); // tolua_export bool Tick(float a_Dt); - void StartListenThread(); + bool Start(void); bool Command(cClientHandle & a_Client, const AString & a_Cmd); void ExecuteConsoleCommand(const AString & a_Cmd); @@ -57,8 +57,6 @@ public: // tolua_export void KickUser(int a_ClientID, const AString & a_Reason); void AuthenticateUser(int a_ClientID); // Called by cAuthenticator to auth the specified user - static void ServerListenThread( void* a_Args ); - const AString & GetServerID(void) const; void ClientDestroying(const cClientHandle * a_Client); // Called by cClientHandle::Destroy(); stop m_SocketThreads from calling back into a_Client @@ -106,6 +104,7 @@ private: sServerState* m_pState; cNotifyWriteThread m_NotifyWriteThread; + cListenThread m_ListenThread; cCriticalSection m_CSClients; // Locks client list cClientHandleList m_Clients; // Clients that are connected to the server @@ -126,11 +125,14 @@ private: CryptoPP::RSA::PrivateKey m_PrivateKey; CryptoPP::RSA::PublicKey m_PublicKey; - cServer(); + cServer(void); ~cServer(); /// Loads, or generates, if missing, RSA keys for protocol encryption void PrepareKeys(void); + + // cListenThread::cCallback overrides: + virtual void OnConnectionAccepted(cSocket & a_Socket) override; }; // tolua_export diff --git a/source/StringUtils.cpp b/source/StringUtils.cpp index 161a8a168..dc128e61d 100644 --- a/source/StringUtils.cpp +++ b/source/StringUtils.cpp @@ -535,3 +535,26 @@ AString & CreateHexDump(AString & a_Out, const void * a_Data, int a_Size, int a_ + +AString Trim(const AString & a_Text) +{ + if (a_Text.empty()) + { + return ""; + } + size_t Beginning = a_Text.find_first_not_of(" \r\n\t"); + if (Beginning == AString::npos) + { + Beginning = 0; + } + size_t End = a_Text.find_last_not_of(" \r\n\t"); + if (End == AString::npos) + { + End = a_Text.length(); + } + return a_Text.substr(Beginning, End - Beginning + 1); +} + + + + diff --git a/source/StringUtils.h b/source/StringUtils.h index dbf553773..f8a7d7106 100644 --- a/source/StringUtils.h +++ b/source/StringUtils.h @@ -63,6 +63,8 @@ extern AString & UTF8ToRawBEUTF16(const char * a_UTF8, size_t a_UTF8Length, AStr /// Creates a nicely formatted HEX dump of the given memory block. Max a_BytesPerLine is 120 extern AString & CreateHexDump(AString & a_Out, const void * a_Data, int a_Size, int a_BytesPerLine); +/// Removes whitespace at the beginning and left of the string +extern AString Trim(const AString & a_Text); // If you have any other string helper functions, declare them here