1
0

cSocketThreads plugged in for cClientHandle reading. Sending still kept the old way. Please help me test this commit thoroughly, this is a change that can break on subtleties.

git-svn-id: http://mc-server.googlecode.com/svn/trunk@244 0a769ca7-a7f5-676a-18bf-c427514a06d6
This commit is contained in:
madmaxoft@gmail.com 2012-02-08 10:02:46 +00:00
parent dcd82b6988
commit c82c636d8c
8 changed files with 267 additions and 233 deletions

View File

@ -91,7 +91,6 @@ extern std::string GetWSAError();
cClientHandle::cClientHandle(const cSocket & a_Socket)
: m_ProtocolVersion(23)
, m_pReceiveThread(NULL)
, m_pSendThread(NULL)
, m_Socket(a_Socket)
, m_Semaphore(MAX_SEMAPHORES)
@ -105,8 +104,6 @@ cClientHandle::cClientHandle(const cSocket & a_Socket)
, m_Ping(1000)
, m_bPositionConfirmed(false)
{
LOG("cClientHandle::cClientHandle");
cTimer t1;
m_LastPingTime = t1.GetNowTime();
@ -143,13 +140,11 @@ cClientHandle::cClientHandle(const cSocket & a_Socket)
memset(m_LoadedChunks, 0x00, sizeof(m_LoadedChunks));
//////////////////////////////////////////////////////////////////////////
m_pReceiveThread = new cThread(ReceiveThread, this, "cClientHandle::ReceiveThread");
m_pSendThread = new cThread(SendThread, this, "cClientHandle::SendThread");
m_pReceiveThread->Start(true);
m_pSendThread->Start (true);
//////////////////////////////////////////////////////////////////////////
LOG("New ClientHandle");
LOG("New ClientHandle created at %p", this);
}
@ -158,7 +153,7 @@ cClientHandle::cClientHandle(const cSocket & a_Socket)
cClientHandle::~cClientHandle()
{
LOG("Deleting client %s", GetUsername().c_str());
LOG("Deleting client \"%s\"", GetUsername().c_str());
for(unsigned int i = 0; i < VIEWDISTANCE*VIEWDISTANCE; i++)
{
@ -185,7 +180,6 @@ cClientHandle::~cClientHandle()
// First stop sending thread
m_bKeepThreadGoing = false;
cCSLock Lock(m_SocketCriticalSection);
if (m_Socket.IsValid())
{
cPacket_Disconnect Disconnect;
@ -193,17 +187,10 @@ cClientHandle::~cClientHandle()
m_Socket.Send(&Disconnect);
m_Socket.CloseSocket();
}
Lock.Unlock();
m_Semaphore.Signal();
delete m_pReceiveThread;
delete m_pSendThread;
while (!m_PendingParsePackets.empty())
{
delete *m_PendingParsePackets.begin();
m_PendingParsePackets.erase(m_PendingParsePackets.begin());
}
while (!m_PendingNrmSendPackets.empty())
{
delete *m_PendingNrmSendPackets.begin();
@ -224,6 +211,8 @@ cClientHandle::~cClientHandle()
{
delete m_PacketMap[i];
}
LOG("ClientHandle at %p destroyed", this);
}
@ -233,11 +222,13 @@ cClientHandle::~cClientHandle()
void cClientHandle::Destroy()
{
m_bDestroyed = true;
cCSLock Lock(m_SocketCriticalSection);
if (m_Socket.IsValid())
{
m_Socket.CloseSocket();
}
// Synchronize with the cSocketThreads (so that they don't call us anymore)
cRoot::Get()->GetServer()->ClientDestroying(this);
}
@ -419,31 +410,6 @@ void cClientHandle::RemoveFromAllChunks()
void cClientHandle::AddPacket(cPacket * a_Packet)
{
cCSLock Lock(m_CriticalSection);
m_PendingParsePackets.push_back(a_Packet->Clone());
}
void cClientHandle::HandlePendingPackets()
{
cCSLock Lock(m_CriticalSection);
for (PacketList::iterator itr = m_PendingParsePackets.begin(); itr != m_PendingParsePackets.end(); ++itr)
{
HandlePacket(*itr);
delete *itr;
}
m_PendingParsePackets.clear();
}
void cClientHandle::HandlePacket(cPacket * a_Packet)
{
m_TimeLastPacket = cWorld::GetTime();
@ -1766,14 +1732,11 @@ void cClientHandle::SendThread(void *lpParam)
}
Lock.Unlock();
cCSLock SocketLock(self->m_SocketCriticalSection);
if (!self->m_Socket.IsValid())
{
break;
}
bool bSuccess = self->m_Socket.Send(Packet);
SocketLock.Unlock();
if (!bSuccess)
{
@ -1799,43 +1762,46 @@ void cClientHandle::SendThread(void *lpParam)
void cClientHandle::ReceiveThread(void *lpParam)
const AString & cClientHandle::GetUsername(void) const
{
LOG("ReceiveThread");
return m_Username;
}
cClientHandle* self = (cClientHandle*)lpParam;
char temp = 0;
int iStat = 0;
cSocket socket = self->GetSocket();
AString Received;
while (self->m_bKeepThreadGoing)
void cClientHandle::DataReceived(const char * a_Data, int a_Size)
{
// Data is received from the client
m_ReceivedData.append(a_Data, a_Size);
// Parse and handle all complete packets in m_ReceivedData:
while (!m_ReceivedData.empty())
{
char Buffer[1024];
iStat = socket.Receive(Buffer, sizeof(Buffer), 0);
if (cSocket::IsSocketError(iStat) || (iStat == 0))
cPacket* pPacket = m_PacketMap[(unsigned char)m_ReceivedData[0]];
if (pPacket == NULL)
{
LOG("CLIENT DISCONNECTED (%i bytes):%s", iStat, cSocket::GetLastErrorString().c_str());
break;
LOGERROR("Unknown packet type 0x%02x from client \"%s\"", (unsigned char)m_ReceivedData[0], m_Username.c_str());
AString Reason;
Printf(Reason, "[C->S] Unknown PacketID: 0x%02x", m_ReceivedData[0]);
cPacket_Disconnect DC(Reason);
m_Socket.Send(&DC);
cSleep::MilliSleep(1000); // Give packet some time to be received
Destroy();
return;
}
Received.append(Buffer, iStat);
// Parse all complete packets in Received:
while (!Received.empty())
{
cPacket* pPacket = self->m_PacketMap[(unsigned char)Received[0]];
if (pPacket)
{
int NumBytes = pPacket->Parse(Received.data() + 1, Received.size() - 1);
int NumBytes = pPacket->Parse(m_ReceivedData.data() + 1, m_ReceivedData.size() - 1);
if (NumBytes == PACKET_ERROR)
{
LOGERROR("Protocol error while parsing packet type 0x%x; disconnecting client \"%s\"", Received[0], self->m_Username.c_str());
LOGERROR("Protocol error while parsing packet type 0x%02x; disconnecting client \"%s\"", (unsigned char)m_ReceivedData[0], m_Username.c_str());
cPacket_Disconnect DC("Protocol error");
socket.Send(&DC);
m_Socket.Send(&DC);
cSleep::MilliSleep(1000); // Give packet some time to be received
Destroy();
return;
}
else if (NumBytes == PACKET_INCOMPLETE)
@ -1846,49 +1812,38 @@ void cClientHandle::ReceiveThread(void *lpParam)
else
{
// Packet parsed successfully, add it to internal queue:
self->AddPacket(pPacket);
HandlePacket(pPacket);
// Erase the packet from the buffer:
assert(Received.size() > (size_t)NumBytes);
Received.erase(0, NumBytes + 1);
}
}
else
{
LOGERROR("Unknown packet type: 0x%2x", Received[0]);
AString Reason;
Printf(Reason, "[C->S] Unknown PacketID: 0x%02x", Received[0]);
cPacket_Disconnect DC(Reason);
socket.Send(&DC);
cSleep::MilliSleep(1000); // Give packet some time to be received
break;
assert(m_ReceivedData.size() > (size_t)NumBytes);
m_ReceivedData.erase(0, NumBytes + 1);
}
} // while (!Received.empty())
} // while (self->m_bKeepThreadGoing)
}
void cClientHandle::GetOutgoingData(AString & a_Data)
{
// Data can be sent to client
// TODO
}
void cClientHandle::SocketClosed(void)
{
// The socket has been closed for any reason
// TODO
/*
self->Destroy();
LOG("ReceiveThread STOPPED");
return;
}
const AString & cClientHandle::GetUsername(void) const
{
return m_Username;
}
const cSocket & cClientHandle::GetSocket()
{
return m_Socket;
LOG("Client \"%s\" disconnected", GetLogName().c_str());
*/
}

View File

@ -13,6 +13,7 @@
#include "packets/cPacket.h"
#include "Vector3d.h"
#include "cSocketThreads.h"
#include "packets/cPacket_KeepAlive.h"
#include "packets/cPacket_PlayerPosition.h"
@ -54,7 +55,8 @@ class cRedstone;
class cClientHandle // tolua_export
class cClientHandle : // tolua_export
public cSocketThreads::cCallback
{ // tolua_export
public:
enum ENUM_PRIORITY
@ -71,15 +73,14 @@ public:
static const int VIEWDISTANCE = 17; // MUST be odd number or CRASH!
static const int GENERATEDISTANCE = 2; // Server generates this many chunks AHEAD of player sight.
const cSocket & GetSocket();
const cSocket & GetSocket(void) const {return m_Socket; }
cSocket & GetSocket(void) {return m_Socket; }
cPlayer* GetPlayer() { return m_Player; } // tolua_export
void Kick(const AString & a_Reason); //tolua_export
void Authenticate(void); // Called by cAuthenticator when the user passes authentication
void AddPacket( cPacket * a_Packet );
void HandlePendingPackets();
void StreamChunks();
void StreamChunksSmart( cChunk** a_Chunks, unsigned int a_NumChunks );
void RemoveFromAllChunks();
@ -97,7 +98,6 @@ public:
void Send( const cPacket & a_Packet, ENUM_PRIORITY a_Priority = E_PRIORITY_NORMAL );
static void SendThread( void *lpParam );
static void ReceiveThread( void *lpParam );
const AString & GetUsername(void) const;
@ -109,18 +109,18 @@ private:
AString m_Username;
AString m_Password;
PacketList m_PendingParsePackets;
AString m_ReceivedData; // Accumulator for the data received from the socket, waiting to be parsed; accessed from the cSocketThreads' thread only!
PacketList m_PendingNrmSendPackets;
PacketList m_PendingLowSendPackets;
cThread * m_pReceiveThread;
cThread * m_pSendThread;
cSocket m_Socket;
cCriticalSection m_CriticalSection;
cCriticalSection m_SendCriticalSection;
cCriticalSection m_SocketCriticalSection;
// cCriticalSection m_SocketCriticalSection;
cSemaphore m_Semaphore;
Vector3d m_ConfirmPosition;
@ -180,6 +180,12 @@ private:
bool CheckBlockInteractionsRate(void);
void SendLoginResponse();
// cSocketThreads::cCallback overrides:
virtual void DataReceived (const char * a_Data, int a_Size) override; // Data is received from the client
virtual void GetOutgoingData(AString & a_Data) override; // Data can be sent to client
virtual void SocketClosed (void) override; // The socket has been closed for any reason
}; // tolua_export

View File

@ -100,46 +100,9 @@ void cServer::ServerListenThread( void *a_Args )
std::string GetWSAError()
void cServer::ClientDestroying(const cClientHandle * a_Client)
{
#ifdef _WIN32
switch( WSAGetLastError() )
{
case WSANOTINITIALISED:
return "WSANOTINITIALISED";
case WSAENETDOWN:
return "WSAENETDOWN";
case WSAEFAULT:
return "WSAEFAULT";
case WSAENOTCONN:
return "WSAENOTCONN";
case WSAEINTR:
return "WSAEINTR";
case WSAEINPROGRESS:
return "WSAEINPROGRESS";
case WSAENETRESET:
return "WSAENETRESET";
case WSAENOTSOCK:
return "WSAENOTSOCK";
case WSAEOPNOTSUPP:
return "WSAEOPNOTSUPP";
case WSAESHUTDOWN:
return "WSAESHUTDOWN";
case WSAEWOULDBLOCK:
return "WSAEWOULDBLOCK";
case WSAEMSGSIZE:
return "WSAEMSGSIZE";
case WSAEINVAL:
return "WSAEINVAL";
case WSAECONNABORTED:
return "WSAECONNABORTED";
case WSAETIMEDOUT:
return "WSAETIMEDOUT";
case WSAECONNRESET:
return "WSAECONNRESET";
}
#endif
return "No Error";
m_SocketThreads.RemoveClient(a_Client);
}
@ -308,9 +271,16 @@ void cServer::StartListenClient()
return;
}
LOG("%s connected!", ClientIP.c_str());
LOG("Client \"%s\" connected!", ClientIP.c_str());
cClientHandle *NewHandle = new cClientHandle( SClient );
cClientHandle *NewHandle = new cClientHandle(SClient);
if (!m_SocketThreads.AddClient(&(NewHandle->GetSocket()), NewHandle))
{
// For some reason SocketThreads have rejected the handle, clean it up
SClient.CloseSocket();
delete NewHandle;
return;
}
m_pState->Clients.push_back( NewHandle ); // TODO - lock list
}
@ -335,11 +305,8 @@ bool cServer::Tick(float a_Dt)
//World->LockClientHandle(); // TODO - Lock client list
for( ClientList::iterator itr = m_pState->Clients.begin(); itr != m_pState->Clients.end();)
{
(*itr)->HandlePendingPackets();
if( (*itr)->IsDestroyed() )
{
cClientHandle* RemoveMe = *itr;
++itr;
m_pState->Clients.remove( RemoveMe );
@ -464,7 +431,7 @@ void cServer::ServerCommand( const char* a_Cmd )
{
if( split[0].compare( "help" ) == 0 )
{
printf("===================ALL COMMANDS====================\n");
printf("================== ALL COMMANDS ===================\n");
printf("help - Shows this message\n");
printf("save-all - Saves all loaded chunks to disk\n");
printf("list - Lists all players currently in server\n");

View File

@ -1,9 +1,30 @@
// cServer.h
// Interfaces to the cServer object representing the network server
#pragma once
#ifndef CSERVER_H_INCLUDED
#define CSERVER_H_INCLUDED
#include "cSocketThreads.h"
class cPlayer;
class cClientHandle;
class cPacket;
class cServer //tolua_export
{ //tolua_export
public: //tolua_export
@ -34,14 +55,21 @@ public: //tolua_export
static void ServerListenThread( void* a_Args );
const AString & GetServerID(void) const;
void ClientDestroying(const cClientHandle * a_Client); // Called by cClientHandle::Destroy(); removes the client from m_SocketThreads
private:
friend class cRoot; // so cRoot can create and destroy cServer
cServer();
~cServer();
struct sServerState;
sServerState* m_pState;
cSocketThreads m_SocketThreads;
// Time since server was started
float m_Millisecondsf;
unsigned int m_Milliseconds;
@ -51,3 +79,13 @@ private:
bool m_bRestarting;
}; //tolua_export
#endif // CSERVER_H_INCLUDED

View File

@ -27,6 +27,7 @@ cSocket::cSocket(xSocket a_Socket)
cSocket::~cSocket()
{
// Do NOT close the socket; this class is an API wrapper, not a RAII!
}
@ -100,6 +101,10 @@ AString cSocket::GetErrorString( int a_ErrNo )
FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, a_ErrNo, 0, buffer, ARRAYCOUNT(buffer), NULL);
Printf(Out, "%d: %s", a_ErrNo, buffer);
if (!Out.empty() && (Out[Out.length() - 1] == '\n'))
{
Out.erase(Out.length() - 2);
}
return Out;
#else // _WIN32
@ -312,7 +317,7 @@ unsigned short cSocket::GetPort(void) const
{
return 0;
}
return Addr.sin_port;
return ntohs(Addr.sin_port);
}

View File

@ -29,6 +29,9 @@ public:
operator const xSocket() const;
xSocket GetSocket() const;
bool operator == (const cSocket & a_Other) {return m_Socket == a_Other.m_Socket; }
void SetSocket( xSocket a_Socket );
int SetReuseAddress();

View File

@ -39,7 +39,7 @@ cSocketThreads::~cSocketThreads()
void cSocketThreads::AddClient(cSocket * a_Socket, cCallback * a_Client)
bool cSocketThreads::AddClient(cSocket * a_Socket, cCallback * a_Client)
{
// Add a (socket, client) pair for processing, data from a_Socket is to be sent to a_Client
@ -47,25 +47,32 @@ void cSocketThreads::AddClient(cSocket * a_Socket, cCallback * a_Client)
cCSLock Lock(m_CS);
for (cSocketThreadList::iterator itr = m_Threads.begin(); itr != m_Threads.end(); ++itr)
{
if ((*itr)->HasEmptySlot())
if ((*itr)->IsValid() && (*itr)->HasEmptySlot())
{
(*itr)->AddClient(a_Socket, a_Client);
return;
return true;
}
}
// No thread has free space, create a new one:
LOG("Creating a new cSocketThread (currently have %d)", m_Threads.size());
cSocketThread * Thread = new cSocketThread(this);
Thread->Start();
if (!Thread->Start())
{
// There was an error launching the thread (but it was already logged along with the reason)
delete Thread;
return false;
}
Thread->AddClient(a_Socket, a_Client);
m_Threads.push_back(Thread);
return true;
}
void cSocketThreads::RemoveClient(cSocket * a_Socket)
void cSocketThreads::RemoveClient(const cSocket * a_Socket)
{
// Remove the socket (and associated client) from processing
@ -83,7 +90,7 @@ void cSocketThreads::RemoveClient(cSocket * a_Socket)
void cSocketThreads::RemoveClient(cCallback * a_Client)
void cSocketThreads::RemoveClient(const cCallback * a_Client)
{
// Remove the associated socket and the client from processing
@ -101,7 +108,7 @@ void cSocketThreads::RemoveClient(cCallback * a_Client)
void cSocketThreads::NotifyWrite(cCallback * a_Client)
void cSocketThreads::NotifyWrite(const cCallback * a_Client)
{
// Notifies the thread responsible for a_Client that the client has something to write
@ -152,7 +159,7 @@ void cSocketThreads::cSocketThread::AddClient(cSocket * a_Socket, cCallback * a_
bool cSocketThreads::cSocketThread::RemoveClient(cCallback * a_Client)
bool cSocketThreads::cSocketThread::RemoveClient(const cCallback * a_Client)
{
// Returns true if removed, false if not found
@ -161,7 +168,7 @@ bool cSocketThreads::cSocketThread::RemoveClient(cCallback * a_Client)
return false;
}
for (int i = m_NumSlots - 1; i > 0 ; --i)
for (int i = m_NumSlots - 1; i >= 0 ; --i)
{
if (m_Slots[i].m_Client != a_Client)
{
@ -186,7 +193,7 @@ bool cSocketThreads::cSocketThread::RemoveClient(cCallback * a_Client)
bool cSocketThreads::cSocketThread::RemoveSocket(cSocket * a_Socket)
bool cSocketThreads::cSocketThread::RemoveSocket(const cSocket * a_Socket)
{
// Returns true if removed, false if not found
@ -195,7 +202,7 @@ bool cSocketThreads::cSocketThread::RemoveSocket(cSocket * a_Socket)
return false;
}
for (int i = m_NumSlots - 1; i > 0 ; --i)
for (int i = m_NumSlots - 1; i >= 0 ; --i)
{
if (m_Slots[i].m_Socket != a_Socket)
{
@ -220,7 +227,7 @@ bool cSocketThreads::cSocketThread::RemoveSocket(cSocket * a_Socket)
bool cSocketThreads::cSocketThread::NotifyWrite(cCallback * a_Client)
bool cSocketThreads::cSocketThread::NotifyWrite(const cCallback * a_Client)
{
if (HasClient(a_Client))
{
@ -236,7 +243,7 @@ bool cSocketThreads::cSocketThread::NotifyWrite(cCallback * a_Client)
bool cSocketThreads::cSocketThread::HasClient(cCallback * a_Client) const
bool cSocketThreads::cSocketThread::HasClient(const cCallback * a_Client) const
{
for (int i = m_NumSlots - 1; i >= 0; --i)
{
@ -252,11 +259,11 @@ bool cSocketThreads::cSocketThread::HasClient(cCallback * a_Client) const
bool cSocketThreads::cSocketThread::HasSocket(cSocket * a_Socket) const
bool cSocketThreads::cSocketThread::HasSocket(const cSocket * a_Socket) const
{
for (int i = m_NumSlots - 1; i >= 0; --i)
{
if (m_Slots[i].m_Socket == a_Socket)
if (m_Slots[i].m_Socket->GetSocket() == a_Socket->GetSocket())
{
return true;
}
@ -271,8 +278,8 @@ bool cSocketThreads::cSocketThread::HasSocket(cSocket * a_Socket) const
bool cSocketThreads::cSocketThread::Start(void)
{
// Create the control socket listener
m_ControlSocket1 = cSocket::CreateSocket();
if (!m_ControlSocket1.IsValid())
m_ControlSocket2 = cSocket::CreateSocket();
if (!m_ControlSocket2.IsValid())
{
LOGERROR("Cannot create a Control socket for a cSocketThread (\"%s\"); continuing, but server may be unreachable from now on.", cSocket::GetLastErrorString().c_str());
return false;
@ -281,36 +288,42 @@ bool cSocketThreads::cSocketThread::Start(void)
Addr.Family = cSocket::ADDRESS_FAMILY_INTERNET;
Addr.Address = cSocket::INTERNET_ADDRESS_LOCALHOST();
Addr.Port = 0; // Any free port is okay
if (m_ControlSocket1.Bind(Addr) != 0)
if (m_ControlSocket2.Bind(Addr) != 0)
{
LOGERROR("Cannot bind a Control socket for a cSocketThread (\"%s\"); continuing, but server may be unreachable from now on.", cSocket::GetLastErrorString().c_str());
m_ControlSocket1.CloseSocket();
m_ControlSocket2.CloseSocket();
return false;
}
if (m_ControlSocket1.GetPort() == 0)
if (m_ControlSocket2.Listen(1) != 0)
{
LOGERROR("Cannot listen on a Control socket for a cSocketThread (\"%s\"); continuing, but server may be unreachable from now on.", cSocket::GetLastErrorString().c_str());
m_ControlSocket2.CloseSocket();
return false;
}
if (m_ControlSocket2.GetPort() == 0)
{
LOGERROR("Cannot determine Control socket port (\"%s\"); conitnuing, but the server may be unreachable from now on.", cSocket::GetLastErrorString().c_str());
m_ControlSocket1.CloseSocket();
m_ControlSocket2.CloseSocket();
return false;
}
// Start the thread
if (!super::Start())
{
m_ControlSocket1.CloseSocket();
m_ControlSocket2.CloseSocket();
return false;
}
// Finish connecting the control socket by accepting connection from the thread's socket
cSocket tmp = m_ControlSocket1.Accept();
cSocket tmp = m_ControlSocket2.Accept();
if (!tmp.IsValid())
{
LOGERROR("Cannot link Control sockets for a cSocketThread (\"%s\"); continuing, but server may be unreachable from now on.", cSocket::GetLastErrorString().c_str());
m_ControlSocket1.CloseSocket();
m_ControlSocket2.CloseSocket();
return false;
}
m_ControlSocket1.CloseSocket();
m_ControlSocket1 = tmp;
m_ControlSocket2.CloseSocket();
m_ControlSocket2 = tmp;
return true;
}
@ -322,15 +335,16 @@ bool cSocketThreads::cSocketThread::Start(void)
void cSocketThreads::cSocketThread::Execute(void)
{
// Connect the "client" part of the Control socket:
m_ControlSocket1 = cSocket::CreateSocket();
cSocket::SockAddr_In Addr;
Addr.Family = cSocket::ADDRESS_FAMILY_INTERNET;
Addr.Address = cSocket::INTERNET_ADDRESS_LOCALHOST();
Addr.Port = m_ControlSocket1.GetPort();
Addr.Port = m_ControlSocket2.GetPort();
assert(Addr.Port != 0); // We checked in the Start() method, but let's be sure
if (m_ControlSocket2.Connect(Addr) != 0)
if (m_ControlSocket1.Connect(Addr) != 0)
{
LOGERROR("Cannot connect Control sockets for a cSocketThread (\"%s\"); continuing, but the server may be unreachable from now on.", cSocket::GetLastErrorString().c_str());
m_ControlSocket1.CloseSocket();
m_ControlSocket2.CloseSocket();
return;
}
@ -339,33 +353,35 @@ void cSocketThreads::cSocketThread::Execute(void)
{
// Put all sockets into the Read set:
fd_set fdRead;
cSocket::xSocket Highest = 0;
cSocket::xSocket Highest = m_ControlSocket1.GetSocket();
PrepareSet(&fdRead, Highest);
// Wait for the sockets:
if (select(Highest + 1, &fdRead, NULL, NULL, NULL) == -1)
{
LOGWARNING("select(R) call failed in cSocketThread: \"%s\"", cSocket::GetLastErrorString().c_str());
break;
LOG("select(R) call failed in cSocketThread: \"%s\"", cSocket::GetLastErrorString().c_str());
continue;
}
ReadFromSockets(&fdRead);
// Test sockets for writing:
fd_set fdWrite;
Highest = 0;
Highest = m_ControlSocket1.GetSocket();
PrepareSet(&fdWrite, Highest);
timeval Timeout;
Timeout.tv_sec = 0;
Timeout.tv_usec = 0;
if (select(Highest + 1, NULL, &fdWrite, NULL, &Timeout) == -1)
{
LOGWARNING("select(W) call failed in cSocketThread: \"%s\"", cSocket::GetLastErrorString().c_str());
break;
LOG("select(W) call failed in cSocketThread: \"%s\"", cSocket::GetLastErrorString().c_str());
continue;
}
WriteToSockets(&fdWrite);
RemoveClosedSockets();
} // while (!mShouldTerminate)
LOG("cSocketThread %p is terminating", this);
@ -381,7 +397,7 @@ void cSocketThreads::cSocketThread::PrepareSet(fd_set * a_Set, cSocket::xSocket
FD_SET(m_ControlSocket1.GetSocket(), a_Set);
cCSLock Lock(m_Parent->m_CS);
for (int i = m_NumSlots - 1; i > 0; --i)
for (int i = m_NumSlots - 1; i >= 0; --i)
{
if (!m_Slots[i].m_Socket->IsValid())
{
@ -403,8 +419,17 @@ void cSocketThreads::cSocketThread::PrepareSet(fd_set * a_Set, cSocket::xSocket
void cSocketThreads::cSocketThread::ReadFromSockets(fd_set * a_Read)
{
// Read on available sockets:
// Reset Control socket state:
if (FD_ISSET(m_ControlSocket1.GetSocket(), a_Read))
{
char Dummy[128];
m_ControlSocket1.Receive(Dummy, sizeof(Dummy), 0);
}
// Read from clients:
cCSLock Lock(m_Parent->m_CS);
for (int i = m_NumSlots - 1; i > 0; --i)
for (int i = m_NumSlots - 1; i >= 0; --i)
{
if (!FD_ISSET(m_Slots[i].m_Socket->GetSocket(), a_Read))
{
@ -437,8 +462,9 @@ void cSocketThreads::cSocketThread::ReadFromSockets(fd_set * a_Read)
void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write)
{
// Write to available client sockets:
cCSLock Lock(m_Parent->m_CS);
for (int i = m_NumSlots - 1; i > 0; --i)
for (int i = m_NumSlots - 1; i >= 0; --i)
{
if (!FD_ISSET(m_Slots[i].m_Socket->GetSocket(), a_Write))
{
@ -464,9 +490,40 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write)
return;
}
m_Slots[i].m_Outgoing.erase(0, Sent);
// _X: If there's data left, it means the client is not reading fast enough, the server would unnecessarily spin in the main loop with zero actions taken; so signalling is disabled
// This means that if there's data left, it will be sent only when there's incoming data or someone queues another packet (for any socket handled by this thread)
/*
// If there's any data left, signalize the Control socket:
if (!m_Slots[i].m_Outgoing.empty())
{
assert(m_ControlSocket2.IsValid());
m_ControlSocket2.Send("q", 1);
}
*/
} // for i - m_Slots[i]
}
void cSocketThreads::cSocketThread::RemoveClosedSockets(void)
{
// Removes sockets that have closed from m_Slots[]
cCSLock Lock(m_Parent->m_CS);
for (int i = m_NumSlots - 1; i >= 0; --i)
{
if (m_Slots[i].m_Socket->IsValid())
{
continue;
}
m_Slots[i] = m_Slots[m_NumSlots - 1];
m_NumSlots--;
} // for i - m_Slots[]
}

View File

@ -10,7 +10,7 @@
/// How many clients should one thread handle? (must be less than FD_SETSIZE - 1 for your platform)
#define MAX_SLOTS 1
#define MAX_SLOTS 63
@ -65,17 +65,17 @@ public:
cSocketThreads(void);
~cSocketThreads();
/// Add a (socket, client) pair for processing, data from a_Socket is to be sent to a_Client
void AddClient(cSocket * a_Socket, cCallback * a_Client);
/// Add a (socket, client) pair for processing, data from a_Socket is to be sent to a_Client; returns true if successful
bool AddClient(cSocket * a_Socket, cCallback * a_Client);
/// Remove the socket (and associated client) from processing
void RemoveClient(cSocket * a_Socket);
void RemoveClient(const cSocket * a_Socket);
/// Remove the associated socket and the client from processing
void RemoveClient(cCallback * a_Client);
void RemoveClient(const cCallback * a_Client);
/// Notify the thread responsible for a_Client that the client has something to write
void NotifyWrite(cCallback * a_Client);
void NotifyWrite(const cCallback * a_Client);
private:
@ -93,14 +93,16 @@ private:
bool IsEmpty (void) const {return m_NumSlots == 0; }
void AddClient (cSocket * a_Socket, cCallback * a_Client);
bool RemoveClient(cCallback * a_Client); // Returns true if removed, false if not found
bool RemoveSocket(cSocket * a_Socket); // Returns true if removed, false if not found
bool HasClient (cCallback * a_Client) const;
bool HasSocket (cSocket * a_Socket) const;
bool NotifyWrite (cCallback * a_Client); // Returns true if client handled by this thread
bool RemoveClient(const cCallback * a_Client); // Returns true if removed, false if not found
bool RemoveSocket(const cSocket * a_Socket); // Returns true if removed, false if not found
bool HasClient (const cCallback * a_Client) const;
bool HasSocket (const cSocket * a_Socket) const;
bool NotifyWrite (const cCallback * a_Client); // Returns true if client handled by this thread
bool Start(void); // Hide the cIsThread's Start method, we need to provide our own startup to create the control socket
bool IsValid(void) const {return m_ControlSocket2.IsValid(); } // If the Control socket dies, the thread is not valid anymore
private:
cSocketThreads * m_Parent;
@ -127,6 +129,7 @@ private:
void PrepareSet (fd_set * a_Set, cSocket::xSocket & a_Highest); // Puts all sockets into the set, along with m_ControlSocket1
void ReadFromSockets(fd_set * a_Read); // Reads from sockets indicated in a_Read
void WriteToSockets (fd_set * a_Write); // Writes to sockets indicated in a_Write
void RemoveClosedSockets(void); // Removes sockets that have closed from m_Slots[]
} ;
typedef std::list<cSocketThread *> cSocketThreadList;