Removed ListenThread and SocketThreads.
They have been replaced by the cNetwork API. Socket.cpp is still used by RCONClient.
This commit is contained in:
parent
86f2f82d2a
commit
f243aa387c
@ -13,12 +13,9 @@ SET (SRCS
|
||||
HostnameLookup.cpp
|
||||
IPLookup.cpp
|
||||
IsThread.cpp
|
||||
ListenThread.cpp
|
||||
NetworkSingleton.cpp
|
||||
Semaphore.cpp
|
||||
ServerHandleImpl.cpp
|
||||
Socket.cpp
|
||||
SocketThreads.cpp
|
||||
StackTrace.cpp
|
||||
TCPLinkImpl.cpp
|
||||
)
|
||||
@ -32,14 +29,11 @@ SET (HDRS
|
||||
HostnameLookup.h
|
||||
IPLookup.h
|
||||
IsThread.h
|
||||
ListenThread.h
|
||||
Network.h
|
||||
NetworkSingleton.h
|
||||
Queue.h
|
||||
Semaphore.h
|
||||
ServerHandleImpl.h
|
||||
Socket.h
|
||||
SocketThreads.h
|
||||
StackTrace.h
|
||||
TCPLinkImpl.h
|
||||
)
|
||||
|
@ -1,238 +0,0 @@
|
||||
|
||||
// ListenThread.cpp
|
||||
|
||||
// Implements the cListenThread class representing the thread that listens for client connections
|
||||
|
||||
#include "Globals.h"
|
||||
#include "ListenThread.h"
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
cListenThread::cListenThread(cCallback & a_Callback, cSocket::eFamily a_Family, const AString & a_ServiceName) :
|
||||
super(Printf("ListenThread %s", a_ServiceName.c_str())),
|
||||
m_Callback(a_Callback),
|
||||
m_Family(a_Family),
|
||||
m_ShouldReuseAddr(false),
|
||||
m_ServiceName(a_ServiceName)
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
cListenThread::~cListenThread()
|
||||
{
|
||||
Stop();
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
bool cListenThread::Initialize(const AString & a_PortsString)
|
||||
{
|
||||
ASSERT(m_Sockets.empty()); // Not yet started
|
||||
|
||||
if (!CreateSockets(a_PortsString))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
bool cListenThread::Start(void)
|
||||
{
|
||||
if (m_Sockets.empty())
|
||||
{
|
||||
// There are no sockets listening, either forgotten to initialize or the user specified no listening ports
|
||||
// Report as successful, though
|
||||
return true;
|
||||
}
|
||||
return super::Start();
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
void cListenThread::Stop(void)
|
||||
{
|
||||
if (m_Sockets.empty())
|
||||
{
|
||||
// No sockets means no thread was running in the first place
|
||||
return;
|
||||
}
|
||||
|
||||
m_ShouldTerminate = true;
|
||||
|
||||
// Close one socket to wake the thread up from the select() call
|
||||
m_Sockets[0].CloseSocket();
|
||||
|
||||
// Wait for the thread to finish
|
||||
super::Wait();
|
||||
|
||||
// Close all the listening sockets:
|
||||
for (cSockets::iterator itr = m_Sockets.begin() + 1, end = m_Sockets.end(); itr != end; ++itr)
|
||||
{
|
||||
itr->CloseSocket();
|
||||
} // for itr - m_Sockets[]
|
||||
m_Sockets.clear();
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
void cListenThread::SetReuseAddr(bool a_Reuse)
|
||||
{
|
||||
ASSERT(m_Sockets.empty()); // Must not have been Initialize()d yet
|
||||
|
||||
m_ShouldReuseAddr = a_Reuse;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
bool cListenThread::CreateSockets(const AString & a_PortsString)
|
||||
{
|
||||
AStringVector Ports = StringSplitAndTrim(a_PortsString, ",");
|
||||
|
||||
if (Ports.empty())
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
AString FamilyStr = m_ServiceName;
|
||||
switch (m_Family)
|
||||
{
|
||||
case cSocket::IPv4: FamilyStr.append(" IPv4"); break;
|
||||
case cSocket::IPv6: FamilyStr.append(" IPv6"); break;
|
||||
default:
|
||||
{
|
||||
ASSERT(!"Unknown address family");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
for (AStringVector::const_iterator itr = Ports.begin(), end = Ports.end(); itr != end; ++itr)
|
||||
{
|
||||
int Port = atoi(itr->c_str());
|
||||
if ((Port <= 0) || (Port > 65535))
|
||||
{
|
||||
LOGWARNING("%s: Invalid port specified: \"%s\".", FamilyStr.c_str(), itr->c_str());
|
||||
continue;
|
||||
}
|
||||
m_Sockets.push_back(cSocket::CreateSocket(m_Family));
|
||||
if (!m_Sockets.back().IsValid())
|
||||
{
|
||||
LOGWARNING("%s: Cannot create listening socket for port %d: \"%s\"", FamilyStr.c_str(), Port, cSocket::GetLastErrorString().c_str());
|
||||
m_Sockets.pop_back();
|
||||
continue;
|
||||
}
|
||||
|
||||
if (m_ShouldReuseAddr)
|
||||
{
|
||||
if (!m_Sockets.back().SetReuseAddress())
|
||||
{
|
||||
LOG("%s: Port %d cannot reuse addr, syscall failed: \"%s\".", FamilyStr.c_str(), Port, cSocket::GetLastErrorString().c_str());
|
||||
}
|
||||
}
|
||||
|
||||
// Bind to port:
|
||||
bool res = false;
|
||||
switch (m_Family)
|
||||
{
|
||||
case cSocket::IPv4: res = m_Sockets.back().BindToAnyIPv4(Port); break;
|
||||
case cSocket::IPv6: res = m_Sockets.back().BindToAnyIPv6(Port); break;
|
||||
default:
|
||||
{
|
||||
ASSERT(!"Unknown address family");
|
||||
res = false;
|
||||
}
|
||||
}
|
||||
if (!res)
|
||||
{
|
||||
LOGWARNING("%s: Cannot bind port %d: \"%s\".", FamilyStr.c_str(), Port, cSocket::GetLastErrorString().c_str());
|
||||
m_Sockets.pop_back();
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!m_Sockets.back().Listen())
|
||||
{
|
||||
LOGWARNING("%s: Cannot listen on port %d: \"%s\".", FamilyStr.c_str(), Port, cSocket::GetLastErrorString().c_str());
|
||||
m_Sockets.pop_back();
|
||||
continue;
|
||||
}
|
||||
|
||||
LOGINFO("%s: Port %d is open for connections", FamilyStr.c_str(), Port);
|
||||
} // for itr - Ports[]
|
||||
|
||||
return !(m_Sockets.empty());
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
void cListenThread::Execute(void)
|
||||
{
|
||||
if (m_Sockets.empty())
|
||||
{
|
||||
LOGD("Empty cListenThread, ending thread now.");
|
||||
return;
|
||||
}
|
||||
|
||||
// Find the highest socket number:
|
||||
cSocket::xSocket Highest = m_Sockets[0].GetSocket();
|
||||
for (cSockets::iterator itr = m_Sockets.begin(), end = m_Sockets.end(); itr != end; ++itr)
|
||||
{
|
||||
if (itr->GetSocket() > Highest)
|
||||
{
|
||||
Highest = itr->GetSocket();
|
||||
}
|
||||
} // for itr - m_Sockets[]
|
||||
|
||||
while (!m_ShouldTerminate)
|
||||
{
|
||||
// Put all sockets into a FD set:
|
||||
fd_set fdRead;
|
||||
FD_ZERO(&fdRead);
|
||||
for (cSockets::iterator itr = m_Sockets.begin(), end = m_Sockets.end(); itr != end; ++itr)
|
||||
{
|
||||
FD_SET(itr->GetSocket(), &fdRead);
|
||||
} // for itr - m_Sockets[]
|
||||
|
||||
timeval tv; // On Linux select() doesn't seem to wake up when socket is closed, so let's kinda busy-wait:
|
||||
tv.tv_sec = 1;
|
||||
tv.tv_usec = 0;
|
||||
if (select((int)Highest + 1, &fdRead, nullptr, nullptr, &tv) == -1)
|
||||
{
|
||||
LOG("select(R) call failed in cListenThread: \"%s\"", cSocket::GetLastErrorString().c_str());
|
||||
continue;
|
||||
}
|
||||
for (cSockets::iterator itr = m_Sockets.begin(), end = m_Sockets.end(); itr != end; ++itr)
|
||||
{
|
||||
if (itr->IsValid() && FD_ISSET(itr->GetSocket(), &fdRead))
|
||||
{
|
||||
cSocket Client = (m_Family == cSocket::IPv4) ? itr->AcceptIPv4() : itr->AcceptIPv6();
|
||||
if (Client.IsValid())
|
||||
{
|
||||
m_Callback.OnConnectionAccepted(Client);
|
||||
}
|
||||
}
|
||||
} // for itr - m_Sockets[]
|
||||
} // while (!m_ShouldTerminate)
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
@ -1,85 +0,0 @@
|
||||
|
||||
// ListenThread.h
|
||||
|
||||
// Declares the cListenThread class representing the thread that listens for client connections
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "IsThread.h"
|
||||
#include "Socket.h"
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
// fwd:
|
||||
class cServer;
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
class cListenThread :
|
||||
public cIsThread
|
||||
{
|
||||
typedef cIsThread super;
|
||||
|
||||
public:
|
||||
/** Used as the callback for connection events */
|
||||
class cCallback
|
||||
{
|
||||
public:
|
||||
virtual ~cCallback() {}
|
||||
|
||||
/** This callback is called whenever a socket connection is accepted */
|
||||
virtual void OnConnectionAccepted(cSocket & a_Socket) = 0;
|
||||
} ;
|
||||
|
||||
cListenThread(cCallback & a_Callback, cSocket::eFamily a_Family, const AString & a_ServiceName = "");
|
||||
~cListenThread();
|
||||
|
||||
/** Creates all the sockets, returns trus if successful, false if not. */
|
||||
bool Initialize(const AString & a_PortsString);
|
||||
|
||||
bool Start(void);
|
||||
|
||||
void Stop(void);
|
||||
|
||||
/** Call before Initialize() to set the "reuse" flag on the sockets */
|
||||
void SetReuseAddr(bool a_Reuse = true);
|
||||
|
||||
protected:
|
||||
typedef std::vector<cSocket> cSockets;
|
||||
|
||||
/** The callback which to notify of incoming connections */
|
||||
cCallback & m_Callback;
|
||||
|
||||
/** Socket address family to use */
|
||||
cSocket::eFamily m_Family;
|
||||
|
||||
/** Sockets that are being monitored */
|
||||
cSockets m_Sockets;
|
||||
|
||||
/** If set to true, the SO_REUSEADDR socket option is set to true */
|
||||
bool m_ShouldReuseAddr;
|
||||
|
||||
/** Name of the service that's listening on the ports; for logging purposes only */
|
||||
AString m_ServiceName;
|
||||
|
||||
|
||||
/** Fills in m_Sockets with individual sockets, each for one port specified in a_PortsString.
|
||||
Returns true if successful and at least one socket has been created
|
||||
*/
|
||||
bool CreateSockets(const AString & a_PortsString);
|
||||
|
||||
// cIsThread override:
|
||||
virtual void Execute(void) override;
|
||||
} ;
|
||||
|
||||
|
||||
|
||||
|
@ -1,702 +0,0 @@
|
||||
|
||||
// cSocketThreads.cpp
|
||||
|
||||
// Implements the cSocketThreads class representing the heart of MCS's client networking.
|
||||
// This object takes care of network communication, groups sockets into threads and uses as little threads as possible for full read / write support
|
||||
// For more detail, see http://forum.mc-server.org/showthread.php?tid=327
|
||||
|
||||
#include "Globals.h"
|
||||
#include "SocketThreads.h"
|
||||
#include "Errors.h"
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
// cSocketThreads:
|
||||
|
||||
cSocketThreads::cSocketThreads(void)
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
cSocketThreads::~cSocketThreads()
|
||||
{
|
||||
for (cSocketThreadList::iterator itr = m_Threads.begin(); itr != m_Threads.end(); ++itr)
|
||||
{
|
||||
delete *itr;
|
||||
} // for itr - m_Threads[]
|
||||
m_Threads.clear();
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
bool cSocketThreads::AddClient(const cSocket & a_Socket, cCallback * a_Client)
|
||||
{
|
||||
// Add a (socket, client) pair for processing, data from a_Socket is to be sent to a_Client
|
||||
|
||||
// Try to add to existing threads:
|
||||
cCSLock Lock(m_CS);
|
||||
for (cSocketThreadList::iterator itr = m_Threads.begin(); itr != m_Threads.end(); ++itr)
|
||||
{
|
||||
if ((*itr)->IsValid() && (*itr)->HasEmptySlot())
|
||||
{
|
||||
(*itr)->AddClient(a_Socket, a_Client);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
// No thread has free space, create a new one:
|
||||
LOGD("Creating a new cSocketThread (currently have " SIZE_T_FMT ")", m_Threads.size());
|
||||
cSocketThread * Thread = new cSocketThread(this);
|
||||
if (!Thread->Start())
|
||||
{
|
||||
// There was an error launching the thread (but it was already logged along with the reason)
|
||||
LOGERROR("A new cSocketThread failed to start");
|
||||
delete Thread;
|
||||
Thread = nullptr;
|
||||
return false;
|
||||
}
|
||||
Thread->AddClient(a_Socket, a_Client);
|
||||
m_Threads.push_back(Thread);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
void cSocketThreads::RemoveClient(const cCallback * a_Client)
|
||||
{
|
||||
// Remove the associated socket and the client from processing
|
||||
|
||||
cCSLock Lock(m_CS);
|
||||
for (cSocketThreadList::iterator itr = m_Threads.begin(); itr != m_Threads.end(); ++itr)
|
||||
{
|
||||
if ((*itr)->RemoveClient(a_Client))
|
||||
{
|
||||
return;
|
||||
}
|
||||
} // for itr - m_Threads[]
|
||||
|
||||
// This client wasn't found.
|
||||
// It's not an error, because it may have been removed by a different thread in the meantime.
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
void cSocketThreads::NotifyWrite(const cCallback * a_Client)
|
||||
{
|
||||
// Notifies the thread responsible for a_Client that the client has something to write
|
||||
|
||||
cCSLock Lock(m_CS);
|
||||
for (cSocketThreadList::iterator itr = m_Threads.begin(); itr != m_Threads.end(); ++itr)
|
||||
{
|
||||
if ((*itr)->NotifyWrite(a_Client))
|
||||
{
|
||||
return;
|
||||
}
|
||||
} // for itr - m_Threads[]
|
||||
|
||||
// Cannot assert - this normally happens if a client disconnects and has pending packets, the cServer::cNotifyWriteThread will call this on invalid clients too
|
||||
// ASSERT(!"Notifying write to an unknown client");
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
void cSocketThreads::Write(const cCallback * a_Client, const AString & a_Data)
|
||||
{
|
||||
// Puts a_Data into outgoing data queue for a_Client
|
||||
cCSLock Lock(m_CS);
|
||||
for (cSocketThreadList::iterator itr = m_Threads.begin(); itr != m_Threads.end(); ++itr)
|
||||
{
|
||||
if ((*itr)->Write(a_Client, a_Data))
|
||||
{
|
||||
return;
|
||||
}
|
||||
} // for itr - m_Threads[]
|
||||
|
||||
// This may be perfectly legal, if the socket has been destroyed and the client is finishing up
|
||||
// ASSERT(!"Writing to an unknown socket");
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
// cSocketThreads::cSocketThread:
|
||||
|
||||
cSocketThreads::cSocketThread::cSocketThread(cSocketThreads * a_Parent) :
|
||||
cIsThread("cSocketThread"),
|
||||
m_Parent(a_Parent),
|
||||
m_NumSlots(0)
|
||||
{
|
||||
// Nothing needed yet
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
cSocketThreads::cSocketThread::~cSocketThread()
|
||||
{
|
||||
m_ShouldTerminate = true;
|
||||
|
||||
// Notify the thread:
|
||||
ASSERT(m_ControlSocket2.IsValid());
|
||||
m_ControlSocket2.Send("a", 1);
|
||||
|
||||
// Wait for the thread to finish:
|
||||
Wait();
|
||||
|
||||
// Close the control sockets:
|
||||
m_ControlSocket1.CloseSocket();
|
||||
m_ControlSocket2.CloseSocket();
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
void cSocketThreads::cSocketThread::AddClient(const cSocket & a_Socket, cCallback * a_Client)
|
||||
{
|
||||
ASSERT(m_Parent->m_CS.IsLockedByCurrentThread());
|
||||
ASSERT(m_NumSlots < MAX_SLOTS); // Use HasEmptySlot() to check before adding
|
||||
|
||||
m_Slots[m_NumSlots].m_Client = a_Client;
|
||||
m_Slots[m_NumSlots].m_Socket = a_Socket;
|
||||
m_Slots[m_NumSlots].m_Socket.SetNonBlocking();
|
||||
m_Slots[m_NumSlots].m_Outgoing.clear();
|
||||
m_Slots[m_NumSlots].m_State = sSlot::ssNormal;
|
||||
m_NumSlots++;
|
||||
|
||||
// Notify the thread of the change:
|
||||
ASSERT(m_ControlSocket2.IsValid());
|
||||
m_ControlSocket2.Send("a", 1);
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
bool cSocketThreads::cSocketThread::RemoveClient(const cCallback * a_Client)
|
||||
{
|
||||
ASSERT(m_Parent->m_CS.IsLockedByCurrentThread());
|
||||
|
||||
if (m_NumSlots == 0)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
for (int i = m_NumSlots - 1; i >= 0 ; --i)
|
||||
{
|
||||
if (m_Slots[i].m_Client != a_Client)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
// Found the slot:
|
||||
if (m_Slots[i].m_State == sSlot::ssRemoteClosed)
|
||||
{
|
||||
// The remote has already closed the socket, remove the slot altogether:
|
||||
if (m_Slots[i].m_Socket.IsValid())
|
||||
{
|
||||
m_Slots[i].m_Socket.CloseSocket();
|
||||
}
|
||||
m_Slots[i] = m_Slots[--m_NumSlots];
|
||||
}
|
||||
else
|
||||
{
|
||||
// Query and queue the last batch of outgoing data:
|
||||
AString Data;
|
||||
m_Slots[i].m_Client->GetOutgoingData(Data);
|
||||
m_Slots[i].m_Outgoing.append(Data);
|
||||
if (m_Slots[i].m_Outgoing.empty())
|
||||
{
|
||||
// No more outgoing data, shut the socket down immediately:
|
||||
m_Slots[i].m_Socket.ShutdownReadWrite();
|
||||
m_Slots[i].m_State = sSlot::ssShuttingDown;
|
||||
}
|
||||
else
|
||||
{
|
||||
// More data to send, shut down reading and wait for the rest to get sent:
|
||||
m_Slots[i].m_State = sSlot::ssWritingRestOut;
|
||||
}
|
||||
m_Slots[i].m_Client = nullptr;
|
||||
}
|
||||
|
||||
// Notify the thread of the change:
|
||||
ASSERT(m_ControlSocket2.IsValid());
|
||||
m_ControlSocket2.Send("r", 1);
|
||||
return true;
|
||||
} // for i - m_Slots[]
|
||||
|
||||
// Not found
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
bool cSocketThreads::cSocketThread::HasClient(const cCallback * a_Client) const
|
||||
{
|
||||
ASSERT(m_Parent->m_CS.IsLockedByCurrentThread());
|
||||
|
||||
for (int i = m_NumSlots - 1; i >= 0; --i)
|
||||
{
|
||||
if (m_Slots[i].m_Client == a_Client)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
} // for i - m_Slots[]
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
bool cSocketThreads::cSocketThread::HasSocket(const cSocket * a_Socket) const
|
||||
{
|
||||
for (int i = m_NumSlots - 1; i >= 0; --i)
|
||||
{
|
||||
if (m_Slots[i].m_Socket == *a_Socket)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
} // for i - m_Slots[]
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
bool cSocketThreads::cSocketThread::NotifyWrite(const cCallback * a_Client)
|
||||
{
|
||||
ASSERT(m_Parent->m_CS.IsLockedByCurrentThread());
|
||||
|
||||
if (HasClient(a_Client))
|
||||
{
|
||||
// Notify the thread that there's another packet in the queue:
|
||||
ASSERT(m_ControlSocket2.IsValid());
|
||||
m_ControlSocket2.Send("q", 1);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
bool cSocketThreads::cSocketThread::Write(const cCallback * a_Client, const AString & a_Data)
|
||||
{
|
||||
ASSERT(m_Parent->m_CS.IsLockedByCurrentThread());
|
||||
for (int i = m_NumSlots - 1; i >= 0; --i)
|
||||
{
|
||||
if (m_Slots[i].m_Client == a_Client)
|
||||
{
|
||||
m_Slots[i].m_Outgoing.append(a_Data);
|
||||
|
||||
// Notify the thread that there's data in the queue:
|
||||
ASSERT(m_ControlSocket2.IsValid());
|
||||
m_ControlSocket2.Send("q", 1);
|
||||
|
||||
return true;
|
||||
}
|
||||
} // for i - m_Slots[]
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
bool cSocketThreads::cSocketThread::Start(void)
|
||||
{
|
||||
// Create the control socket listener
|
||||
m_ControlSocket2 = cSocket::CreateSocket(cSocket::IPv4);
|
||||
if (!m_ControlSocket2.IsValid())
|
||||
{
|
||||
LOGERROR("Cannot create a Control socket for a cSocketThread (\"%s\"); continuing, but server may be unreachable from now on.", cSocket::GetLastErrorString().c_str());
|
||||
return false;
|
||||
}
|
||||
if (!m_ControlSocket2.BindToLocalhostIPv4(cSocket::ANY_PORT))
|
||||
{
|
||||
LOGERROR("Cannot bind a Control socket for a cSocketThread (\"%s\"); continuing, but server may be unreachable from now on.", cSocket::GetLastErrorString().c_str());
|
||||
m_ControlSocket2.CloseSocket();
|
||||
return false;
|
||||
}
|
||||
if (!m_ControlSocket2.Listen(1))
|
||||
{
|
||||
LOGERROR("Cannot listen on a Control socket for a cSocketThread (\"%s\"); continuing, but server may be unreachable from now on.", cSocket::GetLastErrorString().c_str());
|
||||
m_ControlSocket2.CloseSocket();
|
||||
return false;
|
||||
}
|
||||
if (m_ControlSocket2.GetPort() == 0)
|
||||
{
|
||||
LOGERROR("Cannot determine Control socket port (\"%s\"); conitnuing, but the server may be unreachable from now on.", cSocket::GetLastErrorString().c_str());
|
||||
m_ControlSocket2.CloseSocket();
|
||||
return false;
|
||||
}
|
||||
|
||||
// Start the thread
|
||||
if (!super::Start())
|
||||
{
|
||||
LOGERROR("Cannot start new cSocketThread");
|
||||
m_ControlSocket2.CloseSocket();
|
||||
return false;
|
||||
}
|
||||
|
||||
// Finish connecting the control socket by accepting connection from the thread's socket
|
||||
cSocket tmp = m_ControlSocket2.AcceptIPv4();
|
||||
if (!tmp.IsValid())
|
||||
{
|
||||
LOGERROR("Cannot link Control sockets for a cSocketThread (\"%s\"); continuing, but server may be unreachable from now on.", cSocket::GetLastErrorString().c_str());
|
||||
m_ControlSocket2.CloseSocket();
|
||||
return false;
|
||||
}
|
||||
m_ControlSocket2.CloseSocket();
|
||||
m_ControlSocket2 = tmp;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
void cSocketThreads::cSocketThread::Execute(void)
|
||||
{
|
||||
// Connect the "client" part of the Control socket:
|
||||
m_ControlSocket1 = cSocket::CreateSocket(cSocket::IPv4);
|
||||
ASSERT(m_ControlSocket2.GetPort() != 0); // We checked in the Start() method, but let's be sure
|
||||
if (!m_ControlSocket1.ConnectToLocalhostIPv4(m_ControlSocket2.GetPort()))
|
||||
{
|
||||
LOGERROR("Cannot connect Control sockets for a cSocketThread (\"%s\"); continuing, but the server may be unreachable from now on.", cSocket::GetLastErrorString().c_str());
|
||||
m_ControlSocket2.CloseSocket();
|
||||
return;
|
||||
}
|
||||
|
||||
// The main thread loop:
|
||||
while (!m_ShouldTerminate)
|
||||
{
|
||||
// Read outgoing data from the clients:
|
||||
QueueOutgoingData();
|
||||
|
||||
// Put sockets into the sets
|
||||
fd_set fdRead;
|
||||
fd_set fdWrite;
|
||||
cSocket::xSocket Highest = m_ControlSocket1.GetSocket();
|
||||
PrepareSets(&fdRead, &fdWrite, Highest);
|
||||
|
||||
// Wait for the sockets:
|
||||
timeval Timeout;
|
||||
Timeout.tv_sec = 5;
|
||||
Timeout.tv_usec = 0;
|
||||
if (select((int)Highest + 1, &fdRead, &fdWrite, nullptr, &Timeout) == -1)
|
||||
{
|
||||
LOG("select() call failed in cSocketThread: \"%s\"", cSocket::GetLastErrorString().c_str());
|
||||
continue;
|
||||
}
|
||||
|
||||
// Perform the IO:
|
||||
ReadFromSockets(&fdRead);
|
||||
WriteToSockets(&fdWrite);
|
||||
CleanUpShutSockets();
|
||||
} // while (!mShouldTerminate)
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
void cSocketThreads::cSocketThread::PrepareSets(fd_set * a_Read, fd_set * a_Write, cSocket::xSocket & a_Highest)
|
||||
{
|
||||
FD_ZERO(a_Read);
|
||||
FD_ZERO(a_Write);
|
||||
FD_SET(m_ControlSocket1.GetSocket(), a_Read);
|
||||
|
||||
cCSLock Lock(m_Parent->m_CS);
|
||||
for (int i = m_NumSlots - 1; i >= 0; --i)
|
||||
{
|
||||
if (!m_Slots[i].m_Socket.IsValid())
|
||||
{
|
||||
continue;
|
||||
}
|
||||
if (m_Slots[i].m_State == sSlot::ssRemoteClosed)
|
||||
{
|
||||
// This socket won't provide nor consume any data anymore, don't put it in the Set
|
||||
continue;
|
||||
}
|
||||
cSocket::xSocket s = m_Slots[i].m_Socket.GetSocket();
|
||||
FD_SET(s, a_Read);
|
||||
if (s > a_Highest)
|
||||
{
|
||||
a_Highest = s;
|
||||
}
|
||||
if (!m_Slots[i].m_Outgoing.empty())
|
||||
{
|
||||
// There's outgoing data for the socket, put it in the Write set
|
||||
FD_SET(s, a_Write);
|
||||
}
|
||||
} // for i - m_Slots[]
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
void cSocketThreads::cSocketThread::ReadFromSockets(fd_set * a_Read)
|
||||
{
|
||||
// Read on available sockets:
|
||||
|
||||
// Reset Control socket state:
|
||||
if (FD_ISSET(m_ControlSocket1.GetSocket(), a_Read))
|
||||
{
|
||||
char Dummy[128];
|
||||
m_ControlSocket1.Receive(Dummy, sizeof(Dummy), 0);
|
||||
}
|
||||
|
||||
// Read from clients:
|
||||
cCSLock Lock(m_Parent->m_CS);
|
||||
for (int i = m_NumSlots - 1; i >= 0; --i)
|
||||
{
|
||||
cSocket::xSocket Socket = m_Slots[i].m_Socket.GetSocket();
|
||||
if (!cSocket::IsValidSocket(Socket) || !FD_ISSET(Socket, a_Read))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
char Buffer[1024];
|
||||
int Received = m_Slots[i].m_Socket.Receive(Buffer, ARRAYCOUNT(Buffer), 0);
|
||||
if (Received <= 0)
|
||||
{
|
||||
if (cSocket::GetLastError() != cSocket::ErrWouldBlock)
|
||||
{
|
||||
// The socket has been closed by the remote party
|
||||
switch (m_Slots[i].m_State)
|
||||
{
|
||||
case sSlot::ssNormal:
|
||||
{
|
||||
// Close the socket on our side:
|
||||
m_Slots[i].m_State = sSlot::ssRemoteClosed;
|
||||
m_Slots[i].m_Socket.CloseSocket();
|
||||
|
||||
// Notify the callback that the remote has closed the socket, *after* removing the socket:
|
||||
cCallback * client = m_Slots[i].m_Client;
|
||||
m_Slots[i] = m_Slots[--m_NumSlots];
|
||||
if (client != nullptr)
|
||||
{
|
||||
client->SocketClosed();
|
||||
}
|
||||
break;
|
||||
}
|
||||
case sSlot::ssWritingRestOut:
|
||||
case sSlot::ssShuttingDown:
|
||||
case sSlot::ssShuttingDown2:
|
||||
{
|
||||
// Force-close the socket and remove the slot:
|
||||
m_Slots[i].m_Socket.CloseSocket();
|
||||
m_Slots[i] = m_Slots[--m_NumSlots];
|
||||
break;
|
||||
}
|
||||
default:
|
||||
{
|
||||
LOG("%s: Unexpected socket state: %d (%s)",
|
||||
__FUNCTION__, m_Slots[i].m_Socket.GetSocket(), m_Slots[i].m_Socket.GetIPString().c_str()
|
||||
);
|
||||
ASSERT(!"Unexpected socket state");
|
||||
break;
|
||||
}
|
||||
} // switch (m_Slots[i].m_State)
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (m_Slots[i].m_Client != nullptr)
|
||||
{
|
||||
m_Slots[i].m_Client->DataReceived(Buffer, Received);
|
||||
}
|
||||
}
|
||||
} // for i - m_Slots[]
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write)
|
||||
{
|
||||
// Write to available client sockets:
|
||||
cCSLock Lock(m_Parent->m_CS);
|
||||
for (int i = m_NumSlots - 1; i >= 0; --i)
|
||||
{
|
||||
cSocket::xSocket Socket = m_Slots[i].m_Socket.GetSocket();
|
||||
if (!cSocket::IsValidSocket(Socket) || !FD_ISSET(Socket, a_Write))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
if (m_Slots[i].m_Outgoing.empty())
|
||||
{
|
||||
// Request another chunk of outgoing data:
|
||||
if (m_Slots[i].m_Client != nullptr)
|
||||
{
|
||||
AString Data;
|
||||
m_Slots[i].m_Client->GetOutgoingData(Data);
|
||||
m_Slots[i].m_Outgoing.append(Data);
|
||||
}
|
||||
if (m_Slots[i].m_Outgoing.empty())
|
||||
{
|
||||
// No outgoing data is ready
|
||||
if (m_Slots[i].m_State == sSlot::ssWritingRestOut)
|
||||
{
|
||||
m_Slots[i].m_State = sSlot::ssShuttingDown;
|
||||
m_Slots[i].m_Socket.ShutdownReadWrite();
|
||||
}
|
||||
continue;
|
||||
}
|
||||
} // if (outgoing data is empty)
|
||||
|
||||
if (m_Slots[i].m_State == sSlot::ssRemoteClosed)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!SendDataThroughSocket(m_Slots[i].m_Socket, m_Slots[i].m_Outgoing))
|
||||
{
|
||||
int Err = cSocket::GetLastError();
|
||||
LOGWARNING("Error %d while writing to client \"%s\", disconnecting. \"%s\"", Err, m_Slots[i].m_Socket.GetIPString().c_str(), GetOSErrorString(Err).c_str());
|
||||
m_Slots[i].m_Socket.CloseSocket();
|
||||
if (m_Slots[i].m_Client != nullptr)
|
||||
{
|
||||
m_Slots[i].m_Client->SocketClosed();
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (m_Slots[i].m_Outgoing.empty() && (m_Slots[i].m_State == sSlot::ssWritingRestOut))
|
||||
{
|
||||
m_Slots[i].m_State = sSlot::ssShuttingDown;
|
||||
m_Slots[i].m_Socket.ShutdownReadWrite();
|
||||
}
|
||||
|
||||
// _X: If there's data left, it means the client is not reading fast enough, the server would unnecessarily spin in the main loop with zero actions taken; so signalling is disabled
|
||||
// This means that if there's data left, it will be sent only when there's incoming data or someone queues another packet (for any socket handled by this thread)
|
||||
/*
|
||||
// If there's any data left, signalize the Control socket:
|
||||
if (!m_Slots[i].m_Outgoing.empty())
|
||||
{
|
||||
ASSERT(m_ControlSocket2.IsValid());
|
||||
m_ControlSocket2.Send("q", 1);
|
||||
}
|
||||
*/
|
||||
} // for i - m_Slots[i]
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
bool cSocketThreads::cSocketThread::SendDataThroughSocket(cSocket & a_Socket, AString & a_Data)
|
||||
{
|
||||
// Send data in smaller chunks, so that the OS send buffers aren't overflown easily
|
||||
while (!a_Data.empty())
|
||||
{
|
||||
size_t NumToSend = std::min(a_Data.size(), (size_t)1024);
|
||||
int Sent = a_Socket.Send(a_Data.data(), NumToSend);
|
||||
if (Sent < 0)
|
||||
{
|
||||
int Err = cSocket::GetLastError();
|
||||
if (Err == cSocket::ErrWouldBlock)
|
||||
{
|
||||
// The OS send buffer is full, leave the outgoing data for the next time
|
||||
return true;
|
||||
}
|
||||
// An error has occured
|
||||
return false;
|
||||
}
|
||||
if (Sent == 0)
|
||||
{
|
||||
a_Socket.CloseSocket();
|
||||
return true;
|
||||
}
|
||||
a_Data.erase(0, Sent);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
void cSocketThreads::cSocketThread::CleanUpShutSockets(void)
|
||||
{
|
||||
cCSLock Lock(m_Parent->m_CS);
|
||||
for (int i = m_NumSlots - 1; i >= 0; i--)
|
||||
{
|
||||
switch (m_Slots[i].m_State)
|
||||
{
|
||||
case sSlot::ssShuttingDown2:
|
||||
{
|
||||
// The socket has reached the shutdown timeout, close it and clear its slot:
|
||||
m_Slots[i].m_Socket.CloseSocket();
|
||||
m_Slots[i] = m_Slots[--m_NumSlots];
|
||||
break;
|
||||
}
|
||||
case sSlot::ssShuttingDown:
|
||||
{
|
||||
// The socket has been shut down for a single thread loop, let it loop once more before closing:
|
||||
m_Slots[i].m_State = sSlot::ssShuttingDown2;
|
||||
break;
|
||||
}
|
||||
default: break;
|
||||
}
|
||||
} // for i - m_Slots[]
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
void cSocketThreads::cSocketThread::QueueOutgoingData(void)
|
||||
{
|
||||
cCSLock Lock(m_Parent->m_CS);
|
||||
for (int i = 0; i < m_NumSlots; i++)
|
||||
{
|
||||
if (m_Slots[i].m_Client != nullptr)
|
||||
{
|
||||
AString Data;
|
||||
m_Slots[i].m_Client->GetOutgoingData(Data);
|
||||
m_Slots[i].m_Outgoing.append(Data);
|
||||
}
|
||||
if (m_Slots[i].m_Outgoing.empty())
|
||||
{
|
||||
// No outgoing data is ready
|
||||
if (m_Slots[i].m_State == sSlot::ssWritingRestOut)
|
||||
{
|
||||
// The socket doesn't want to be kept alive anymore, and doesn't have any remaining data to send.
|
||||
// Shut it down and then close it after a timeout, or when the other side agrees
|
||||
m_Slots[i].m_State = sSlot::ssShuttingDown;
|
||||
m_Slots[i].m_Socket.ShutdownReadWrite();
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
@ -1,194 +0,0 @@
|
||||
|
||||
// SocketThreads.h
|
||||
|
||||
// Interfaces to the cSocketThreads class representing the heart of MCS's client networking.
|
||||
// This object takes care of network communication, groups sockets into threads and uses as little threads as possible for full read / write support
|
||||
// For more detail, see http://forum.mc-server.org/showthread.php?tid=327
|
||||
|
||||
/*
|
||||
Additional details:
|
||||
When a client wants to terminate the connection, they call the RemoveClient() function. This calls the
|
||||
callback one last time to read all the available outgoing data, putting it in the slot's m_OutgoingData
|
||||
buffer. Then it marks the slot as having no callback. The socket is kept alive until its outgoing data
|
||||
queue is empty, then shutdown is called on it and finally the socket is closed after a timeout.
|
||||
If at any time within this the remote end closes the socket, then the socket is closed directly.
|
||||
As soon as the socket is closed, the slot is finally removed from the SocketThread.
|
||||
The graph in $/docs/SocketThreads States.gv shows the state-machine transitions of the slot.
|
||||
*/
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
/** How many clients should one thread handle? (must be less than FD_SETSIZE for your platform) */
|
||||
#define MAX_SLOTS 63
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "Socket.h"
|
||||
#include "IsThread.h"
|
||||
|
||||
|
||||
|
||||
|
||||
// Check MAX_SLOTS:
|
||||
#if MAX_SLOTS >= FD_SETSIZE
|
||||
#error "MAX_SLOTS must be less than FD_SETSIZE for your platform! (otherwise select() won't work)"
|
||||
#endif
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
// fwd:
|
||||
class cSocket;
|
||||
class cClientHandle;
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
class cSocketThreads
|
||||
{
|
||||
public:
|
||||
|
||||
// Clients of cSocketThreads must implement this interface to be able to communicate
|
||||
class cCallback
|
||||
{
|
||||
public:
|
||||
// Force a virtual destructor in all subclasses:
|
||||
virtual ~cCallback() {}
|
||||
|
||||
/** Called when data is received from the remote party.
|
||||
SocketThreads does not care about the return value, others can use it for their specific purpose -
|
||||
for example HTTPServer uses it to signal if the connection was terminated as a result of the data received. */
|
||||
virtual bool DataReceived(const char * a_Data, size_t a_Size) = 0;
|
||||
|
||||
/** Called when data can be sent to remote party
|
||||
The function is supposed to *set* outgoing data to a_Data (overwrite) */
|
||||
virtual void GetOutgoingData(AString & a_Data) = 0;
|
||||
|
||||
/** Called when the socket has been closed for any reason */
|
||||
virtual void SocketClosed(void) = 0;
|
||||
} ;
|
||||
|
||||
|
||||
cSocketThreads(void);
|
||||
~cSocketThreads();
|
||||
|
||||
/** Add a (socket, client) pair for processing, data from a_Socket is to be sent to a_Client; returns true if successful */
|
||||
bool AddClient(const cSocket & a_Socket, cCallback * a_Client);
|
||||
|
||||
/** Remove the associated socket and the client from processing.
|
||||
The socket is left to send its last outgoing data and is removed only after all its m_Outgoing is sent
|
||||
and after the socket is properly shutdown (unless the remote disconnects before that)
|
||||
*/
|
||||
void RemoveClient(const cCallback * a_Client);
|
||||
|
||||
/** Notify the thread responsible for a_Client that the client has something to write */
|
||||
void NotifyWrite(const cCallback * a_Client);
|
||||
|
||||
/** Puts a_Data into outgoing data queue for a_Client */
|
||||
void Write(const cCallback * a_Client, const AString & a_Data);
|
||||
|
||||
private:
|
||||
|
||||
class cSocketThread :
|
||||
public cIsThread
|
||||
{
|
||||
typedef cIsThread super;
|
||||
|
||||
public:
|
||||
|
||||
cSocketThread(cSocketThreads * a_Parent);
|
||||
virtual ~cSocketThread();
|
||||
|
||||
// All these methods assume parent's m_CS is locked
|
||||
bool HasEmptySlot(void) const {return m_NumSlots < MAX_SLOTS; }
|
||||
bool IsEmpty (void) const {return m_NumSlots == 0; }
|
||||
|
||||
void AddClient (const cSocket & a_Socket, cCallback * a_Client); // Takes ownership of the socket
|
||||
bool RemoveClient(const cCallback * a_Client); // Returns true if removed, false if not found
|
||||
bool HasClient (const cCallback * a_Client) const;
|
||||
bool HasSocket (const cSocket * a_Socket) const;
|
||||
bool NotifyWrite (const cCallback * a_Client); // Returns true if client handled by this thread
|
||||
bool Write (const cCallback * a_Client, const AString & a_Data); // Returns true if client handled by this thread
|
||||
|
||||
bool Start(void); // Hide the cIsThread's Start method, we need to provide our own startup to create the control socket
|
||||
|
||||
bool IsValid(void) const {return m_ControlSocket2.IsValid(); } // If the Control socket dies, the thread is not valid anymore
|
||||
|
||||
private:
|
||||
|
||||
cSocketThreads * m_Parent;
|
||||
|
||||
// Two ends of the control socket, the first is select()-ed, the second is written to for notifications
|
||||
cSocket m_ControlSocket1;
|
||||
cSocket m_ControlSocket2;
|
||||
|
||||
// Socket-client-dataqueues-state quadruplets.
|
||||
// Manipulation with these assumes that the parent's m_CS is locked
|
||||
struct sSlot
|
||||
{
|
||||
/** The socket is primarily owned by this object */
|
||||
cSocket m_Socket;
|
||||
|
||||
/** The callback to call for events. May be nullptr */
|
||||
cCallback * m_Client;
|
||||
|
||||
/** If sending writes only partial data, the rest is stored here for another send.
|
||||
Also used when the slot is being removed to store the last batch of outgoing data. */
|
||||
AString m_Outgoing;
|
||||
|
||||
enum eState
|
||||
{
|
||||
ssNormal, ///< Normal read / write operations
|
||||
ssWritingRestOut, ///< The client callback was removed, continue to send outgoing data
|
||||
ssShuttingDown, ///< The last outgoing data has been sent, the socket has called shutdown()
|
||||
ssShuttingDown2, ///< The shutdown has been done at least 1 thread loop ago (timeout detection)
|
||||
ssRemoteClosed, ///< The remote end has closed the connection (and we still have a client callback)
|
||||
} m_State;
|
||||
} ;
|
||||
|
||||
sSlot m_Slots[MAX_SLOTS];
|
||||
int m_NumSlots; // Number of slots actually used
|
||||
|
||||
virtual void Execute(void) override;
|
||||
|
||||
/** Prepares the Read and Write socket sets for select()
|
||||
Puts all sockets into the read set, along with m_ControlSocket1.
|
||||
Only sockets that have outgoing data queued on them are put in the write set.*/
|
||||
void PrepareSets(fd_set * a_ReadSet, fd_set * a_WriteSet, cSocket::xSocket & a_Highest);
|
||||
|
||||
/** Reads from sockets indicated in a_Read */
|
||||
void ReadFromSockets(fd_set * a_Read);
|
||||
|
||||
/** Writes to sockets indicated in a_Write */
|
||||
void WriteToSockets (fd_set * a_Write);
|
||||
|
||||
/** Sends data through the specified socket, trying to fill the OS send buffer in chunks.
|
||||
Returns true if there was no error while sending, false if an error has occured.
|
||||
Modifies a_Data to contain only the unsent data. */
|
||||
bool SendDataThroughSocket(cSocket & a_Socket, AString & a_Data);
|
||||
|
||||
/** Removes those slots in ssShuttingDown2 state, sets those with ssShuttingDown state to ssShuttingDown2 */
|
||||
void CleanUpShutSockets(void);
|
||||
|
||||
/** Calls each client's callback to retrieve outgoing data for that client. */
|
||||
void QueueOutgoingData(void);
|
||||
} ;
|
||||
|
||||
typedef std::list<cSocketThread *> cSocketThreadList;
|
||||
|
||||
|
||||
cCriticalSection m_CS;
|
||||
cSocketThreadList m_Threads;
|
||||
} ;
|
||||
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user