2012-02-07 02:49:00 -05:00
// cSocketThreads.cpp
// Implements the cSocketThreads class representing the heart of MCS's client networking.
// This object takes care of network communication, groups sockets into threads and uses as few threads as possible for full read / write support
// For more detail, see http://forum.mc-server.org/showthread.php?tid=327
# include "Globals.h"
# include "cSocketThreads.h"
# include "cClientHandle.h"
# include "packets/cPacket_RelativeEntityMoveLook.h"
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// cSocketThreads:
cSocketThreads::cSocketThreads()
{
	// Nothing to set up here; worker threads are created on demand in AddClient()
}
cSocketThreads::~cSocketThreads()
{
	// Delete every cSocketThread object held in m_Threads, then drop the stale pointers
	cSocketThreadList::iterator Current = m_Threads.begin();
	while (Current != m_Threads.end())
	{
		delete *Current;
		++Current;
	}  // while Current - m_Threads[]
	m_Threads.clear();
}
2012-02-08 05:02:46 -05:00
bool cSocketThreads : : AddClient ( cSocket * a_Socket , cCallback * a_Client )
2012-02-07 02:49:00 -05:00
{
// Add a (socket, client) pair for processing, data from a_Socket is to be sent to a_Client
// Try to add to existing threads:
cCSLock Lock ( m_CS ) ;
for ( cSocketThreadList : : iterator itr = m_Threads . begin ( ) ; itr ! = m_Threads . end ( ) ; + + itr )
{
2012-02-08 05:02:46 -05:00
if ( ( * itr ) - > IsValid ( ) & & ( * itr ) - > HasEmptySlot ( ) )
2012-02-07 02:49:00 -05:00
{
( * itr ) - > AddClient ( a_Socket , a_Client ) ;
2012-02-08 05:02:46 -05:00
return true ;
2012-02-07 02:49:00 -05:00
}
}
// No thread has free space, create a new one:
2012-02-08 05:02:46 -05:00
LOG ( " Creating a new cSocketThread (currently have %d) " , m_Threads . size ( ) ) ;
2012-02-07 02:49:00 -05:00
cSocketThread * Thread = new cSocketThread ( this ) ;
2012-02-08 05:02:46 -05:00
if ( ! Thread - > Start ( ) )
{
// There was an error launching the thread (but it was already logged along with the reason)
delete Thread ;
return false ;
}
2012-02-07 02:49:00 -05:00
Thread - > AddClient ( a_Socket , a_Client ) ;
m_Threads . push_back ( Thread ) ;
2012-02-08 05:02:46 -05:00
return true ;
2012-02-07 02:49:00 -05:00
}
2012-02-08 05:02:46 -05:00
void cSocketThreads : : RemoveClient ( const cSocket * a_Socket )
2012-02-07 02:49:00 -05:00
{
// Remove the socket (and associated client) from processing
cCSLock Lock ( m_CS ) ;
for ( cSocketThreadList : : iterator itr = m_Threads . begin ( ) ; itr ! = m_Threads . end ( ) ; + + itr )
{
if ( ( * itr ) - > RemoveSocket ( a_Socket ) )
{
return ;
}
}
}
2012-02-08 05:02:46 -05:00
void cSocketThreads : : RemoveClient ( const cCallback * a_Client )
2012-02-07 02:49:00 -05:00
{
// Remove the associated socket and the client from processing
cCSLock Lock ( m_CS ) ;
for ( cSocketThreadList : : iterator itr = m_Threads . begin ( ) ; itr ! = m_Threads . end ( ) ; + + itr )
{
if ( ( * itr ) - > RemoveClient ( a_Client ) )
{
return ;
}
}
}
2012-02-08 05:02:46 -05:00
void cSocketThreads : : NotifyWrite ( const cCallback * a_Client )
2012-02-07 02:49:00 -05:00
{
// Notifies the thread responsible for a_Client that the client has something to write
cCSLock Lock ( m_CS ) ;
for ( cSocketThreadList : : iterator itr = m_Threads . begin ( ) ; itr ! = m_Threads . end ( ) ; + + itr )
{
if ( ( * itr ) - > NotifyWrite ( a_Client ) )
{
return ;
}
}
}
////////////////////////////////////////////////////////////////////////////////
// cSocketThreads::cSocketThread:
cSocketThreads::cSocketThread::cSocketThread(cSocketThreads * a_Parent) :
	cIsThread(" cSocketThread "), m_Parent(a_Parent), m_NumSlots(0)
{
	// The thread starts with no occupied slots; the control sockets are set up later, in Start()
}
void cSocketThreads : : cSocketThread : : AddClient ( cSocket * a_Socket , cCallback * a_Client )
{
assert ( m_NumSlots < MAX_SLOTS ) ; // Use HasEmptySlot() to check before adding
m_Slots [ m_NumSlots ] . m_Client = a_Client ;
m_Slots [ m_NumSlots ] . m_Socket = a_Socket ;
m_Slots [ m_NumSlots ] . m_Outgoing . clear ( ) ;
m_NumSlots + + ;
// Notify the thread of the change:
assert ( m_ControlSocket2 . IsValid ( ) ) ;
m_ControlSocket2 . Send ( " a " , 1 ) ;
}
2012-02-08 05:02:46 -05:00
bool cSocketThreads : : cSocketThread : : RemoveClient ( const cCallback * a_Client )
2012-02-07 02:49:00 -05:00
{
// Returns true if removed, false if not found
if ( m_NumSlots = = 0 )
{
return false ;
}
2012-02-08 05:02:46 -05:00
for ( int i = m_NumSlots - 1 ; i > = 0 ; - - i )
2012-02-07 02:49:00 -05:00
{
if ( m_Slots [ i ] . m_Client ! = a_Client )
{
continue ;
}
// Found, remove it:
m_Slots [ i ] = m_Slots [ m_NumSlots - 1 ] ;
m_NumSlots - - ;
// Notify the thread of the change:
assert ( m_ControlSocket2 . IsValid ( ) ) ;
m_ControlSocket2 . Send ( " r " , 1 ) ;
return true ;
} // for i - m_Slots[]
// Not found
return false ;
}
2012-02-08 05:02:46 -05:00
bool cSocketThreads : : cSocketThread : : RemoveSocket ( const cSocket * a_Socket )
2012-02-07 02:49:00 -05:00
{
// Returns true if removed, false if not found
if ( m_NumSlots = = 0 )
{
return false ;
}
2012-02-08 05:02:46 -05:00
for ( int i = m_NumSlots - 1 ; i > = 0 ; - - i )
2012-02-07 02:49:00 -05:00
{
if ( m_Slots [ i ] . m_Socket ! = a_Socket )
{
continue ;
}
// Found, remove it:
m_Slots [ i ] = m_Slots [ m_NumSlots - 1 ] ;
m_NumSlots - - ;
// Notify the thread of the change:
assert ( m_ControlSocket2 . IsValid ( ) ) ;
m_ControlSocket2 . Send ( " r " , 1 ) ;
return true ;
} // for i - m_Slots[]
// Not found
return false ;
}
2012-02-08 05:02:46 -05:00
bool cSocketThreads : : cSocketThread : : NotifyWrite ( const cCallback * a_Client )
2012-02-07 02:49:00 -05:00
{
if ( HasClient ( a_Client ) )
{
// Notify the thread that there's another packet in the queue:
assert ( m_ControlSocket2 . IsValid ( ) ) ;
m_ControlSocket2 . Send ( " q " , 1 ) ;
return true ;
}
return false ;
}
2012-02-08 05:02:46 -05:00
bool cSocketThreads : : cSocketThread : : HasClient ( const cCallback * a_Client ) const
2012-02-07 02:49:00 -05:00
{
for ( int i = m_NumSlots - 1 ; i > = 0 ; - - i )
{
if ( m_Slots [ i ] . m_Client = = a_Client )
{
return true ;
}
} // for i - m_Slots[]
return false ;
}
2012-02-08 05:02:46 -05:00
bool cSocketThreads : : cSocketThread : : HasSocket ( const cSocket * a_Socket ) const
2012-02-07 02:49:00 -05:00
{
for ( int i = m_NumSlots - 1 ; i > = 0 ; - - i )
{
2012-02-08 05:02:46 -05:00
if ( m_Slots [ i ] . m_Socket - > GetSocket ( ) = = a_Socket - > GetSocket ( ) )
2012-02-07 02:49:00 -05:00
{
return true ;
}
} // for i - m_Slots[]
return false ;
}
bool cSocketThreads : : cSocketThread : : Start ( void )
{
// Create the control socket listener
2012-02-08 05:02:46 -05:00
m_ControlSocket2 = cSocket : : CreateSocket ( ) ;
if ( ! m_ControlSocket2 . IsValid ( ) )
2012-02-07 02:49:00 -05:00
{
LOGERROR ( " Cannot create a Control socket for a cSocketThread ( \" %s \" ); continuing, but server may be unreachable from now on. " , cSocket : : GetLastErrorString ( ) . c_str ( ) ) ;
return false ;
}
cSocket : : SockAddr_In Addr ;
Addr . Family = cSocket : : ADDRESS_FAMILY_INTERNET ;
2012-02-07 16:16:34 -05:00
Addr . Address = cSocket : : INTERNET_ADDRESS_LOCALHOST ( ) ;
2012-02-07 02:49:00 -05:00
Addr . Port = 0 ; // Any free port is okay
2012-02-08 05:02:46 -05:00
if ( m_ControlSocket2 . Bind ( Addr ) ! = 0 )
2012-02-07 02:49:00 -05:00
{
LOGERROR ( " Cannot bind a Control socket for a cSocketThread ( \" %s \" ); continuing, but server may be unreachable from now on. " , cSocket : : GetLastErrorString ( ) . c_str ( ) ) ;
2012-02-08 05:02:46 -05:00
m_ControlSocket2 . CloseSocket ( ) ;
2012-02-07 02:49:00 -05:00
return false ;
}
2012-02-08 05:02:46 -05:00
if ( m_ControlSocket2 . Listen ( 1 ) ! = 0 )
{
LOGERROR ( " Cannot listen on a Control socket for a cSocketThread ( \" %s \" ); continuing, but server may be unreachable from now on. " , cSocket : : GetLastErrorString ( ) . c_str ( ) ) ;
m_ControlSocket2 . CloseSocket ( ) ;
return false ;
}
if ( m_ControlSocket2 . GetPort ( ) = = 0 )
2012-02-07 02:49:00 -05:00
{
LOGERROR ( " Cannot determine Control socket port ( \" %s \" ); conitnuing, but the server may be unreachable from now on. " , cSocket : : GetLastErrorString ( ) . c_str ( ) ) ;
2012-02-08 05:02:46 -05:00
m_ControlSocket2 . CloseSocket ( ) ;
2012-02-07 02:49:00 -05:00
return false ;
}
// Start the thread
if ( ! super : : Start ( ) )
{
2012-02-08 05:02:46 -05:00
m_ControlSocket2 . CloseSocket ( ) ;
2012-02-07 02:49:00 -05:00
return false ;
}
// Finish connecting the control socket by accepting connection from the thread's socket
2012-02-08 05:02:46 -05:00
cSocket tmp = m_ControlSocket2 . Accept ( ) ;
2012-02-07 02:49:00 -05:00
if ( ! tmp . IsValid ( ) )
{
LOGERROR ( " Cannot link Control sockets for a cSocketThread ( \" %s \" ); continuing, but server may be unreachable from now on. " , cSocket : : GetLastErrorString ( ) . c_str ( ) ) ;
2012-02-08 05:02:46 -05:00
m_ControlSocket2 . CloseSocket ( ) ;
2012-02-07 02:49:00 -05:00
return false ;
}
2012-02-08 05:02:46 -05:00
m_ControlSocket2 . CloseSocket ( ) ;
m_ControlSocket2 = tmp ;
2012-02-07 02:49:00 -05:00
return true ;
}
void cSocketThreads : : cSocketThread : : Execute ( void )
{
// Connect the "client" part of the Control socket:
2012-02-08 05:02:46 -05:00
m_ControlSocket1 = cSocket : : CreateSocket ( ) ;
2012-02-07 02:49:00 -05:00
cSocket : : SockAddr_In Addr ;
Addr . Family = cSocket : : ADDRESS_FAMILY_INTERNET ;
2012-02-07 16:16:34 -05:00
Addr . Address = cSocket : : INTERNET_ADDRESS_LOCALHOST ( ) ;
2012-02-08 05:02:46 -05:00
Addr . Port = m_ControlSocket2 . GetPort ( ) ;
2012-02-07 02:49:00 -05:00
assert ( Addr . Port ! = 0 ) ; // We checked in the Start() method, but let's be sure
2012-02-08 05:02:46 -05:00
if ( m_ControlSocket1 . Connect ( Addr ) ! = 0 )
2012-02-07 02:49:00 -05:00
{
LOGERROR ( " Cannot connect Control sockets for a cSocketThread ( \" %s \" ); continuing, but the server may be unreachable from now on. " , cSocket : : GetLastErrorString ( ) . c_str ( ) ) ;
2012-02-08 05:02:46 -05:00
m_ControlSocket2 . CloseSocket ( ) ;
2012-02-07 02:49:00 -05:00
return ;
}
// The main thread loop:
while ( ! mShouldTerminate )
{
// Put all sockets into the Read set:
fd_set fdRead ;
2012-02-08 05:02:46 -05:00
cSocket : : xSocket Highest = m_ControlSocket1 . GetSocket ( ) ;
2012-02-07 02:49:00 -05:00
PrepareSet ( & fdRead , Highest ) ;
// Wait for the sockets:
if ( select ( Highest + 1 , & fdRead , NULL , NULL , NULL ) = = - 1 )
{
2012-02-08 05:02:46 -05:00
LOG ( " select(R) call failed in cSocketThread: \" %s \" " , cSocket : : GetLastErrorString ( ) . c_str ( ) ) ;
continue ;
2012-02-07 02:49:00 -05:00
}
ReadFromSockets ( & fdRead ) ;
// Test sockets for writing:
fd_set fdWrite ;
2012-02-08 05:02:46 -05:00
Highest = m_ControlSocket1 . GetSocket ( ) ;
2012-02-07 02:49:00 -05:00
PrepareSet ( & fdWrite , Highest ) ;
timeval Timeout ;
Timeout . tv_sec = 0 ;
Timeout . tv_usec = 0 ;
if ( select ( Highest + 1 , NULL , & fdWrite , NULL , & Timeout ) = = - 1 )
{
2012-02-08 05:02:46 -05:00
LOG ( " select(W) call failed in cSocketThread: \" %s \" " , cSocket : : GetLastErrorString ( ) . c_str ( ) ) ;
continue ;
2012-02-07 02:49:00 -05:00
}
WriteToSockets ( & fdWrite ) ;
2012-02-08 05:02:46 -05:00
RemoveClosedSockets ( ) ;
2012-02-07 02:49:00 -05:00
} // while (!mShouldTerminate)
LOG ( " cSocketThread %p is terminating " , this ) ;
}
void cSocketThreads : : cSocketThread : : PrepareSet ( fd_set * a_Set , cSocket : : xSocket & a_Highest )
{
FD_ZERO ( a_Set ) ;
FD_SET ( m_ControlSocket1 . GetSocket ( ) , a_Set ) ;
cCSLock Lock ( m_Parent - > m_CS ) ;
2012-02-08 05:02:46 -05:00
for ( int i = m_NumSlots - 1 ; i > = 0 ; - - i )
2012-02-07 02:49:00 -05:00
{
if ( ! m_Slots [ i ] . m_Socket - > IsValid ( ) )
{
continue ;
}
cSocket : : xSocket s = m_Slots [ i ] . m_Socket - > GetSocket ( ) ;
FD_SET ( s , a_Set ) ;
if ( s > a_Highest )
{
a_Highest = s ;
}
} // for i - m_Slots[]
}
void cSocketThreads : : cSocketThread : : ReadFromSockets ( fd_set * a_Read )
{
// Read on available sockets:
2012-02-08 05:02:46 -05:00
// Reset Control socket state:
if ( FD_ISSET ( m_ControlSocket1 . GetSocket ( ) , a_Read ) )
{
char Dummy [ 128 ] ;
m_ControlSocket1 . Receive ( Dummy , sizeof ( Dummy ) , 0 ) ;
}
// Read from clients:
2012-02-07 02:49:00 -05:00
cCSLock Lock ( m_Parent - > m_CS ) ;
2012-02-08 05:02:46 -05:00
for ( int i = m_NumSlots - 1 ; i > = 0 ; - - i )
2012-02-07 02:49:00 -05:00
{
if ( ! FD_ISSET ( m_Slots [ i ] . m_Socket - > GetSocket ( ) , a_Read ) )
{
continue ;
}
char Buffer [ 1024 ] ;
int Received = m_Slots [ i ] . m_Socket - > Receive ( Buffer , ARRAYCOUNT ( Buffer ) , 0 ) ;
if ( Received = = 0 )
{
// The socket has been closed by the remote party, close our socket and let it be removed after we process all reading
m_Slots [ i ] . m_Socket - > CloseSocket ( ) ;
m_Slots [ i ] . m_Client - > SocketClosed ( ) ;
}
else if ( Received > 0 )
{
m_Slots [ i ] . m_Client - > DataReceived ( Buffer , Received ) ;
}
else
{
// The socket has encountered an error, close it and let it be removed after we process all reading
m_Slots [ i ] . m_Socket - > CloseSocket ( ) ;
m_Slots [ i ] . m_Client - > SocketClosed ( ) ;
}
} // for i - m_Slots[]
}
void cSocketThreads : : cSocketThread : : WriteToSockets ( fd_set * a_Write )
{
2012-02-08 05:02:46 -05:00
// Write to available client sockets:
2012-02-07 02:49:00 -05:00
cCSLock Lock ( m_Parent - > m_CS ) ;
2012-02-08 05:02:46 -05:00
for ( int i = m_NumSlots - 1 ; i > = 0 ; - - i )
2012-02-07 02:49:00 -05:00
{
if ( ! FD_ISSET ( m_Slots [ i ] . m_Socket - > GetSocket ( ) , a_Write ) )
{
continue ;
}
if ( m_Slots [ i ] . m_Outgoing . empty ( ) )
{
// Request another chunk of outgoing data:
m_Slots [ i ] . m_Client - > GetOutgoingData ( m_Slots [ i ] . m_Outgoing ) ;
if ( m_Slots [ i ] . m_Outgoing . empty ( ) )
{
// Nothing ready yet
continue ;
}
} // if (outgoing data is empty)
int Sent = m_Slots [ i ] . m_Socket - > Send ( m_Slots [ i ] . m_Outgoing . data ( ) , m_Slots [ i ] . m_Outgoing . size ( ) ) ;
if ( Sent < 0 )
{
LOGWARNING ( " Error while writing to client \" %s \" , disconnecting " , m_Slots [ i ] . m_Socket - > GetIPString ( ) . c_str ( ) ) ;
m_Slots [ i ] . m_Socket - > CloseSocket ( ) ;
m_Slots [ i ] . m_Client - > SocketClosed ( ) ;
return ;
}
m_Slots [ i ] . m_Outgoing . erase ( 0 , Sent ) ;
2012-02-08 05:02:46 -05:00
// _X: If there's data left, it means the client is not reading fast enough, the server would unnecessarily spin in the main loop with zero actions taken; so signalling is disabled
// This means that if there's data left, it will be sent only when there's incoming data or someone queues another packet (for any socket handled by this thread)
/*
// If there's any data left, signalize the Control socket:
if ( ! m_Slots [ i ] . m_Outgoing . empty ( ) )
{
assert ( m_ControlSocket2 . IsValid ( ) ) ;
m_ControlSocket2 . Send ( " q " , 1 ) ;
}
*/
2012-02-07 16:16:34 -05:00
} // for i - m_Slots[i]
2012-02-07 02:49:00 -05:00
}
2012-02-08 05:02:46 -05:00
void cSocketThreads : : cSocketThread : : RemoveClosedSockets ( void )
{
// Removes sockets that have closed from m_Slots[]
cCSLock Lock ( m_Parent - > m_CS ) ;
for ( int i = m_NumSlots - 1 ; i > = 0 ; - - i )
{
if ( m_Slots [ i ] . m_Socket - > IsValid ( ) )
{
continue ;
}
m_Slots [ i ] = m_Slots [ m_NumSlots - 1 ] ;
m_NumSlots - - ;
} // for i - m_Slots[]
}