1
0

Moved client socket accepting into a separate thread object, cListenThread

MCServer can now listen on multiple ports
FS #312

git-svn-id: http://mc-server.googlecode.com/svn/trunk@1252 0a769ca7-a7f5-676a-18bf-c427514a06d6
This commit is contained in:
madmaxoft@gmail.com 2013-03-04 21:13:08 +00:00
parent 7e16d48855
commit 28d8d8419a
9 changed files with 360 additions and 123 deletions

View File

@ -499,6 +499,14 @@
RelativePath="..\source\LightingThread.h"
>
</File>
<File
RelativePath="..\source\ListenThread.cpp"
>
</File>
<File
RelativePath="..\source\ListenThread.h"
>
</File>
<File
RelativePath="..\source\Log.cpp"
>

172
source/ListenThread.cpp Normal file
View File

@ -0,0 +1,172 @@
// ListenThread.cpp
// Implements the cListenThread class representing the thread that listens for client connections
#include "Globals.h"
#include "ListenThread.h"
cListenThread::cListenThread(cCallback & a_Callback) :
super("ListenThread"),
m_Callback(a_Callback)
{
}
cListenThread::~cListenThread()
{
// TODO
}
bool cListenThread::Initialize(const AString & a_PortsString)
{
ASSERT(m_Sockets.empty()); // Not yet started
if (!CreateSockets(a_PortsString))
{
return false;
}
return true;
}
bool cListenThread::Start(void)
{
ASSERT(!m_Sockets.empty()); // Has Initialize() been called?
return super::Start();
}
void cListenThread::Stop(void)
{
m_ShouldTerminate = true;
// Close one socket to wake the thread up from the select() call
m_Sockets[0].CloseSocket();
// Wait for the thread to finish
super::Wait();
// Clean up all sockets
m_Sockets.clear();
}
void cListenThread::SetReuseAddr(bool a_Reuse)
{
ASSERT(m_Sockets.empty()); // Must not be started
m_ShouldReuseAddr = a_Reuse;
}
bool cListenThread::CreateSockets(const AString & a_PortsString)
{
AStringVector Ports = StringSplit(a_PortsString, ",");
if (Ports.empty())
{
return false;
}
for (AStringVector::const_iterator itr = Ports.begin(), end = Ports.end(); itr != end; ++itr)
{
int Port = atoi(Trim(*itr).c_str());
if ((Port <= 0) || (Port > 65535))
{
LOGWARNING("Invalid port specified: \"%s\".", Trim(*itr).c_str());
continue;
}
m_Sockets.push_back(cSocket::CreateSocket());
if (!m_Sockets.back().IsValid())
{
LOGERROR("Cannot create listening socket for port %d: \"%s\"", Port, cSocket::GetLastErrorString().c_str());
m_Sockets.pop_back();
continue;
}
if (m_ShouldReuseAddr)
{
if (m_Sockets.back().SetReuseAddress() == -1)
{
LOG("Port %d cannot reuse addr, syscall failed: \"%s\".", Port, cSocket::GetLastErrorString().c_str());
}
}
m_Sockets.back().BindToAny(Port);
m_Sockets.back().Listen();
LOGD("Port %d is open for connections", Port);
} // for itr - Ports[]
return !(m_Sockets.empty());
}
void cListenThread::Execute(void)
{
// Find the highest socket number:
cSocket::xSocket Highest = m_Sockets[0].GetSocket();
for (cSockets::iterator itr = m_Sockets.begin(), end = m_Sockets.end(); itr != end; ++itr)
{
if (itr->GetSocket() > Highest)
{
Highest = itr->GetSocket();
}
} // for itr - m_Sockets[]
while (!m_ShouldTerminate)
{
// Put all sockets into a FD set:
fd_set fdRead;
FD_ZERO(&fdRead);
for (cSockets::iterator itr = m_Sockets.begin(), end = m_Sockets.end(); itr != end; ++itr)
{
FD_SET(itr->GetSocket(), &fdRead);
} // for itr - m_Sockets[]
if (select(Highest + 1, &fdRead, NULL, NULL, NULL) == -1)
{
LOG("select(R) call failed in cListenThread: \"%s\"", cSocket::GetLastErrorString().c_str());
continue;
}
for (cSockets::iterator itr = m_Sockets.begin(), end = m_Sockets.end(); itr != end; ++itr)
{
if (FD_ISSET(itr->GetSocket(), &fdRead))
{
cSocket Client = itr->Accept();
m_Callback.OnConnectionAccepted(Client);
}
} // for itr - m_Sockets[]
} // while (!m_ShouldTerminate)
}

75
source/ListenThread.h Normal file
View File

@ -0,0 +1,75 @@
// ListenThread.h
// Declares the cListenThread class representing the thread that listens for client connections
#pragma once
#include "OSSupport/IsThread.h"
#include "OSSupport/Socket.h"
// fwd:
class cServer;
class cListenThread :
public cIsThread
{
typedef cIsThread super;
public:
/// Used as the callback for connection events
class cCallback
{
public:
/// This callback is called whenever a socket connection is accepted
virtual void OnConnectionAccepted(cSocket & a_Socket) = 0;
} ;
cListenThread(cCallback & a_Callback);
~cListenThread();
/// Creates all the sockets, returns trus if successful, false if not.
bool Initialize(const AString & a_PortsString);
bool Start(void);
void Stop(void);
/// Call before Initialize() to set the "reuse" flag on the sockets
void SetReuseAddr(bool a_Reuse = true);
protected:
typedef std::vector<cSocket> cSockets;
/// The callback which to notify of incoming connections
cCallback & m_Callback;
/// Sockets that are being monitored
cSockets m_Sockets;
bool m_ShouldReuseAddr;
/** Fills in m_Sockets with individual sockets, each for one port specified in a_PortsString.
Returns true if successful and at least one socket has been created
*/
bool CreateSockets(const AString & a_PortsString);
// cIsThread override:
virtual void Execute(void) override;
} ;

View File

@ -63,6 +63,7 @@ public:
static const unsigned long INTERNET_ADDRESS_ANY = 0;
static unsigned long INTERNET_ADDRESS_LOCALHOST(void); // 127.0.0.1 represented in network byteorder; must be a function due to GCC :(
static const unsigned short ANY_PORT = 0; // When given to Bind() functions, they will find a free port
static const int DEFAULT_BACKLOG = 10;
/// Binds to the specified port on "any" interface (0.0.0.0)
int BindToAny(unsigned short a_Port);
@ -76,9 +77,11 @@ public:
/// Binds to the specified port on localhost interface (127.0.0.1) through IPv4
int BindToLocalhost(unsigned short a_Port);
int Listen( int a_Backlog );
int Listen(int a_Backlog = DEFAULT_BACKLOG);
cSocket Accept();
int Connect(SockAddr_In & a_Address); // Returns 0 on success, !0 on failure
int Connect(const AString & a_HostNameOrAddr, unsigned short a_Port); // Returns 0 on success, !0 on failure
int Receive(char * a_Buffer, unsigned int a_Length, unsigned int a_Flags);
int Send (const char * a_Buffer, unsigned int a_Length);

View File

@ -163,13 +163,12 @@ void cRoot::Start(void)
StartWorlds();
LOG("Starting server...");
m_Server->StartListenThread();
//cHeartBeat* HeartBeat = new cHeartBeat();
m_Server->Start();
#if !defined(ANDROID_NDK)
LOG("Starting InputThread...");
m_InputThread = new cThread( InputThread, this, "cRoot::InputThread" );
m_InputThread->Start( false ); //we should NOT wait? Otherwise we can´t stop the server from other threads than the input thread
m_InputThread->Start( false ); // We should NOT wait? Otherwise we can´t stop the server from other threads than the input thread
#endif
LOG("Initialization done, server running now.");
@ -179,7 +178,7 @@ void cRoot::Start(void)
}
#if !defined(ANDROID_NDK)
delete m_InputThread; m_InputThread = 0;
delete m_InputThread; m_InputThread = NULL;
#endif
// Deallocate stuffs

View File

@ -62,14 +62,10 @@ typedef std::list< cClientHandle* > ClientList;
struct cServer::sServerState
{
sServerState()
: pListenThread( 0 )
, pTickThread( 0 )
, bStopListenThread( false )
: pTickThread(NULL)
, bStopTickThread(false)
{}
cSocket SListenClient; // socket listening for client calls
cThread* pListenThread; bool bStopListenThread;
cThread* pTickThread; bool bStopTickThread;
cEvent RestartEvent;
@ -80,21 +76,6 @@ struct cServer::sServerState
void cServer::ServerListenThread( void *a_Args )
{
LOG("ServerListenThread");
cServer* self = (cServer*)a_Args;
sServerState* m_pState = self->m_pState;
while( !m_pState->bStopListenThread )
{
self->StartListenClient();
}
}
void cServer::ClientDestroying(const cClientHandle * a_Client)
{
m_SocketThreads.StopReading(a_Client);
@ -172,36 +153,13 @@ bool cServer::InitServer(cIniFile & a_SettingsIni)
return false;
}
m_pState->SListenClient = cSocket::CreateSocket();
if( !m_pState->SListenClient.IsValid() )
AString Ports = a_SettingsIni.GetValueSet("Server", "Port", "25565");
m_ListenThread.SetReuseAddr(true);
if (!m_ListenThread.Initialize(Ports))
{
LOGERROR("m_SListenClient==INVALID_SOCKET (%s)", cSocket::GetErrorString( cSocket::GetLastError() ).c_str() );
return false;
}
if( m_pState->SListenClient.SetReuseAddress() == -1 )
{
LOGERROR("setsockopt == -1");
return false;
}
int Port = a_SettingsIni.GetValueSetI("Server", "Port", 25565);
if (m_pState->SListenClient.BindToAny(Port) != 0)
{
LOGERROR("bind fail (%s)", cSocket::GetErrorString( cSocket::GetLastError() ).c_str() );
return false;
}
if( m_pState->SListenClient.Listen( 10 ) != 0)
{
LOGERROR("listen fail (%s)", cSocket::GetErrorString( cSocket::GetLastError() ).c_str() );
return false;
}
m_iServerPort = Port;
LOG("Port %i has been bound", m_iServerPort);
m_bIsConnected = true;
m_pState->ServerID = "-";
@ -241,12 +199,12 @@ bool cServer::InitServer(cIniFile & a_SettingsIni)
cServer::cServer()
cServer::cServer(void)
: m_pState(new sServerState)
, m_ListenThread(*this)
, m_Millisecondsf(0)
, m_Milliseconds(0)
, m_bIsConnected(false)
, m_iServerPort( 0 )
, m_bRestarting(false)
{
}
@ -258,14 +216,6 @@ cServer::cServer()
cServer::~cServer()
{
// TODO: Shut down the server gracefully
if ( m_pState->SListenClient )
{
m_pState->SListenClient.CloseSocket();
}
m_pState->SListenClient = 0;
m_pState->bStopListenThread = true;
delete m_pState->pListenThread; m_pState->pListenThread = NULL;
m_pState->bStopTickThread = true;
delete m_pState->pTickThread; m_pState->pTickThread = NULL;
@ -295,6 +245,41 @@ void cServer::PrepareKeys(void)
void cServer::OnConnectionAccepted(cSocket & a_Socket)
{
if (!a_Socket.IsValid())
{
return;
}
const AString & ClientIP = a_Socket.GetIPString();
if (ClientIP.empty())
{
LOGWARN("cServer: A client connected, but didn't present its IP, disconnecting.");
a_Socket.CloseSocket();
return;
}
LOG("Client \"%s\" connected!", ClientIP.c_str());
cClientHandle * NewHandle = new cClientHandle(&a_Socket, m_ClientViewDistance);
if (!m_SocketThreads.AddClient(a_Socket, NewHandle))
{
// For some reason SocketThreads have rejected the handle, clean it up
LOGERROR("Client \"%s\" cannot be handled, server probably unstable", ClientIP.c_str());
a_Socket.CloseSocket();
delete NewHandle;
return;
}
cCSLock Lock(m_CSClients);
m_Clients.push_back(NewHandle);
}
void cServer::BroadcastChat(const AString & a_Message, const cClientHandle * a_Exclude)
{
cCSLock Lock(m_CSClients);
@ -312,43 +297,6 @@ void cServer::BroadcastChat(const AString & a_Message, const cClientHandle * a_E
void cServer::StartListenClient()
{
cSocket SClient = m_pState->SListenClient.Accept();
if (!SClient.IsValid())
{
return;
}
const AString & ClientIP = SClient.GetIPString();
if (ClientIP.empty())
{
LOGWARN("cServer: A client connected, but didn't present its IP, disconnecting.");
SClient.CloseSocket();
return;
}
LOG("Client \"%s\" connected!", ClientIP.c_str());
cClientHandle * NewHandle = new cClientHandle(&SClient, m_ClientViewDistance);
if (!m_SocketThreads.AddClient(SClient, NewHandle))
{
// For some reason SocketThreads have rejected the handle, clean it up
LOGERROR("Client \"%s\" cannot be handled, server probably unstable", SClient.GetIPString().c_str());
SClient.CloseSocket();
delete NewHandle;
return;
}
cCSLock Lock(m_CSClients);
m_Clients.push_back( NewHandle );
}
bool cServer::Tick(float a_Dt)
{
//LOG("1. Tick %0.2f", a_Dt);
@ -434,12 +382,15 @@ void ServerTickThread( void * a_Param )
void cServer::StartListenThread()
bool cServer::Start(void)
{
m_pState->pListenThread = new cThread( ServerListenThread, this, "cServer::ServerListenThread" );
m_pState->pTickThread = new cThread( ServerTickThread, this, "cServer::ServerTickThread" );
m_pState->pListenThread->Start( true );
if (!m_ListenThread.Start())
{
return false;
}
m_pState->pTickThread->Start( true );
return true;
}
@ -532,6 +483,8 @@ void cServer::SendMessage(const AString & a_Message, cPlayer * a_Player /* = NUL
void cServer::Shutdown()
{
m_ListenThread.Stop();
m_bRestarting = true;
m_pState->RestartEvent.Wait();

View File

@ -14,6 +14,7 @@
#include "OSSupport/SocketThreads.h"
#include "CryptoPP/rsa.h"
#include "CryptoPP/randpool.h"
#include "ListenThread.h"
@ -30,19 +31,18 @@ typedef std::list<cClientHandle *> cClientHandleList;
class cServer // tolua_export
: public cListenThread::cCallback
{ // tolua_export
public: // tolua_export
bool InitServer(cIniFile & a_SettingsIni);
int GetPort() { return m_iServerPort; }
bool IsConnected(){return m_bIsConnected;} // returns connection status
void StartListenClient(); // Listen to client
bool IsConnected(void) const { return m_bIsConnected;} // returns connection status
void BroadcastChat(const AString & a_Message, const cClientHandle * a_Exclude = NULL); // tolua_export
bool Tick(float a_Dt);
void StartListenThread();
bool Start(void);
bool Command(cClientHandle & a_Client, const AString & a_Cmd);
void ExecuteConsoleCommand(const AString & a_Cmd);
@ -57,8 +57,6 @@ public: // tolua_export
void KickUser(int a_ClientID, const AString & a_Reason);
void AuthenticateUser(int a_ClientID); // Called by cAuthenticator to auth the specified user
static void ServerListenThread( void* a_Args );
const AString & GetServerID(void) const;
void ClientDestroying(const cClientHandle * a_Client); // Called by cClientHandle::Destroy(); stop m_SocketThreads from calling back into a_Client
@ -106,6 +104,7 @@ private:
sServerState* m_pState;
cNotifyWriteThread m_NotifyWriteThread;
cListenThread m_ListenThread;
cCriticalSection m_CSClients; // Locks client list
cClientHandleList m_Clients; // Clients that are connected to the server
@ -126,11 +125,14 @@ private:
CryptoPP::RSA::PrivateKey m_PrivateKey;
CryptoPP::RSA::PublicKey m_PublicKey;
cServer();
cServer(void);
~cServer();
/// Loads, or generates, if missing, RSA keys for protocol encryption
void PrepareKeys(void);
// cListenThread::cCallback overrides:
virtual void OnConnectionAccepted(cSocket & a_Socket) override;
}; // tolua_export

View File

@ -535,3 +535,26 @@ AString & CreateHexDump(AString & a_Out, const void * a_Data, int a_Size, int a_
AString Trim(const AString & a_Text)
{
if (a_Text.empty())
{
return "";
}
size_t Beginning = a_Text.find_first_not_of(" \r\n\t");
if (Beginning == AString::npos)
{
Beginning = 0;
}
size_t End = a_Text.find_last_not_of(" \r\n\t");
if (End == AString::npos)
{
End = a_Text.length();
}
return a_Text.substr(Beginning, End - Beginning + 1);
}

View File

@ -63,6 +63,8 @@ extern AString & UTF8ToRawBEUTF16(const char * a_UTF8, size_t a_UTF8Length, AStr
/// Creates a nicely formatted HEX dump of the given memory block. Max a_BytesPerLine is 120
extern AString & CreateHexDump(AString & a_Out, const void * a_Data, int a_Size, int a_BytesPerLine);
/// Removes whitespace at the beginning and left of the string
extern AString Trim(const AString & a_Text);
// If you have any other string helper functions, declare them here