Removed EventProcessingInfo, and pre-computing of protocols that should

receive an event. Solves potential problem that an event might arrive at
a client before the destination protocol is up and running. Also improved
handling of (dis)connects, now only protocols that can handle a disconnect
event will receive them, hopefully reducing the number of crashes when
a client disconnects.
This commit is contained in:
hiker
2016-03-09 07:46:33 +11:00
parent c6cef53c99
commit 087d491445
10 changed files with 87 additions and 134 deletions

View File

@@ -29,6 +29,8 @@
*/
Event::Event(ENetEvent* event)
{
m_arrival_time = (double)StkTime::getTimeSinceEpoch();
switch (event->type)
{
case ENET_EVENT_TYPE_CONNECT:

View File

@@ -24,12 +24,12 @@
#ifndef EVENT_HPP
#define EVENT_HPP
#include "network/network_string.hpp"
#include "utils/leak_check.hpp"
#include "utils/types.hpp"
#include "enet/enet.h"
class NetworkString;
class STKPeer;
/*!
@@ -67,6 +67,9 @@ private:
/** Pointer to the peer that triggered that event. */
STKPeer* m_peer;
/** Arrivial time of the event, for timeouts. */
double m_arrival_time;
public:
Event(ENetEvent* event);
~Event();
@@ -89,6 +92,15 @@ public:
* connection or disconnections. */
NetworkString& data() { return *m_data; }
// ------------------------------------------------------------------------
/** Determines if this event should be delivered synchronous or not.
* Only messages can be delivered synchronous. */
bool isSynchronous() const { return m_type==EVENT_TYPE_MESSAGE &&
m_data->isSynchronous(); }
// ------------------------------------------------------------------------
/** Returns the arrival time of this event. */
double getArrivalTime() const { return m_arrival_time; }
// ------------------------------------------------------------------------
}; // class Event

View File

@@ -34,10 +34,12 @@
*/
Protocol::Protocol(ProtocolType type, CallbackObject* callback_object)
{
m_callback_object = callback_object;
m_type = type;
m_state = PROTOCOL_STATE_INITIALISING;
m_id = 0;
m_callback_object = callback_object;
m_type = type;
m_state = PROTOCOL_STATE_INITIALISING;
m_id = 0;
m_handle_connections = false;
m_handle_disconnections = false;
} // Protocol
// ----------------------------------------------------------------------------

View File

@@ -105,6 +105,11 @@ protected:
/** The unique id of the protocol. */
uint32_t m_id;
/** True if this protocol should receive connection events. */
bool m_handle_connections;
/** TRue if this protocol should recceiver disconnection events. */
bool m_handle_disconnections;
public:
Protocol(ProtocolType type, CallbackObject* callback_object=NULL);
virtual ~Protocol();
@@ -177,7 +182,18 @@ public:
/** \brief Method to get a protocol's type.
* \return The protocol type. */
ProtocolType getProtocolType() const { return m_type; }
// ------------------------------------------------------------------------
/** Sets if this protocol should receive connection events. */
void setHandleConnections(bool b) { m_handle_connections = b; }
// ------------------------------------------------------------------------
/** Sets if this protocol should receive disconnection events. */
void setHandleDisconnections(bool b) { m_handle_disconnections = b; }
// ------------------------------------------------------------------------
/** Return true if this protocol should be informed about connects. */
virtual bool handleConnects() const { return m_handle_connections; }
// ------------------------------------------------------------------------
/** Return true if this protocol should be informed about disconnects. */
virtual bool handleDisconnects() const { return m_handle_disconnections; }
}; // class Protocol

View File

@@ -80,7 +80,7 @@ void ProtocolManager::abort()
m_events_to_process.lock();
for (unsigned int i = 0; i < m_events_to_process.getData().size() ; i++)
delete m_events_to_process.getData()[i].m_event;
delete m_events_to_process.getData()[i];
m_events_to_process.getData().clear();
m_events_to_process.unlock();
@@ -103,77 +103,9 @@ void ProtocolManager::abort()
void ProtocolManager::propagateEvent(Event* event)
{
m_events_to_process.lock();
// register protocols that will receive this event
ProtocolType searched_protocol = PROTOCOL_NONE;
if (event->getType() == EVENT_TYPE_MESSAGE)
{
if (event->data().size() > 0)
{
searched_protocol = event->data().getProtocolType();
}
else
{
Log::warn("ProtocolManager", "Not enough data.");
}
}
else if (event->getType() == EVENT_TYPE_CONNECTED)
{
searched_protocol = PROTOCOL_CONNECTION;
}
Log::verbose("ProtocolManager", "Received event for protocols of type %d",
searched_protocol);
std::vector<unsigned int> protocols_ids;
m_protocols.lock();
for (unsigned int i = 0; i < m_protocols.getData().size() ; i++)
{
const Protocol *p = m_protocols.getData()[i];
// Pass data to protocols even when paused
if (p->getProtocolType() == searched_protocol ||
event->getType() == EVENT_TYPE_DISCONNECTED)
{
protocols_ids.push_back(p->getId());
}
} // for i in m_protocols
m_protocols.unlock();
// no protocol was aimed, show the msg to debug
if (searched_protocol == PROTOCOL_NONE &&
event->getType() != EVENT_TYPE_DISCONNECTED)
{
if(event->getType()==EVENT_TYPE_MESSAGE)
{
Log::warn("ProtocolManager", "NO PROTOCOL. Message is:");
Log::warn("ProtocolManager", event->data().getLogMessage().c_str());
}
else
Log::debug("ProtocolManager", "NO PROTOCOL, no data");
}
if (protocols_ids.size() != 0)
{
EventProcessingInfo epi;
epi.m_arrival_time = (double)StkTime::getTimeSinceEpoch();
// Only message events will optionally be delivered synchronously
epi.m_is_synchronous = event->getType()==EVENT_TYPE_MESSAGE &&
event->data().isSynchronous();
epi.m_event = event;
epi.m_protocols_ids = protocols_ids;
// Add the event to the queue. After the event is handled
// its memory will be freed.
m_events_to_process.getData().push_back(epi);
}
else
{
Log::warn("ProtocolManager",
"Received an event for %d that has no destination protocol.",
searched_protocol);
// Free the memory for the vent
delete event;
}
m_events_to_process.getData().push_back(event);
m_events_to_process.unlock();
return;
} // propagateEvent
// ----------------------------------------------------------------------------
@@ -339,31 +271,41 @@ void ProtocolManager::terminateProtocol(Protocol *protocol)
// ----------------------------------------------------------------------------
/** Sends the event to the corresponding protocol.
*/
bool ProtocolManager::sendEvent(EventProcessingInfo* event, bool synchronous)
bool ProtocolManager::sendEvent(Event* event)
{
unsigned int index = 0;
while(index < event->m_protocols_ids.size())
m_protocols.lock();
int count=0;
for(unsigned int i=0; i<m_protocols.getData().size(); i++)
{
Protocol *p = getProtocol(event->m_protocols_ids[index]);
if(!p)
Protocol *p = m_protocols.getData()[i];
bool is_right_protocol = false;
switch(event->getType())
{
index++;
continue;
}
bool result = synchronous ? p->notifyEvent(event->m_event)
: p->notifyEventAsynchronous(event->m_event);
if (result)
{
event->m_protocols_ids.erase(event->m_protocols_ids.begin()+index);
}
else // !result
index++;
}
case EVENT_TYPE_MESSAGE:
is_right_protocol = event->data().getProtocolType()==p->getProtocolType();
break;
case EVENT_TYPE_DISCONNECTED:
is_right_protocol = p->handleDisconnects();
break;
case EVENT_TYPE_CONNECTED:
is_right_protocol = p->handleConnects();
break;
} // switch event->getType()
if (event->m_protocols_ids.size() == 0 ||
(StkTime::getTimeSinceEpoch()-event->m_arrival_time) >= TIME_TO_KEEP_EVENTS)
if( is_right_protocol)
{
count ++;
event->isSynchronous() ? p->notifyEvent(event)
: p->notifyEventAsynchronous(event);
}
} // for i in protocols
m_protocols.unlock();
if (count>0 || StkTime::getTimeSinceEpoch()-event->getArrivalTime()
>= TIME_TO_KEEP_EVENTS )
{
delete event->m_event;
delete event;
return true;
}
return false;
@@ -388,8 +330,8 @@ void ProtocolManager::update()
for (int i = 0; i < size; i++)
{
// Don't handle asynchronous events here.
if(!m_events_to_process.getData()[i+offset].m_is_synchronous) continue;
bool result = sendEvent(&m_events_to_process.getData()[i+offset], true);
if(!m_events_to_process.getData()[i+offset]->isSynchronous()) continue;
bool result = sendEvent(m_events_to_process.getData()[i+offset]);
if (result)
{
m_events_to_process.getData()
@@ -427,8 +369,8 @@ void ProtocolManager::asynchronousUpdate()
for (int i = 0; i < size; i++)
{
// Don't handle synchronous events here.
if(m_events_to_process.getData()[i+offset].m_is_synchronous) continue;
bool result = sendEvent(&m_events_to_process.getData()[i+offset], false);
if(m_events_to_process.getData()[i+offset]->isSynchronous()) continue;
bool result = sendEvent(m_events_to_process.getData()[i+offset]);
if (result)
{
m_events_to_process.getData()

View File

@@ -78,28 +78,6 @@ public:
Protocol *getProtocol() { return m_protocol; }
}; // class ProtocolRequest;
// ============================================================================
/** \struct ProtocolRequest
* \brief Used to pass the event to protocols that need it
*/
struct EventProcessingInfo
{
/** The event to process. */
Event* m_event;
/** Arrival time of the event. Used to time out events that are not
* handled in time (e.g. because the receiving protocol is not running).*/
double m_arrival_time;
/** The list of protocol ids to which this event can be
* sent to. */
std::vector<unsigned int> m_protocols_ids;
/** Indicates if this received message must be handled synchronously or
* asynchronously. */
bool m_is_synchronous;
}; // EventProcessingInfo
// ============================================================================
/** \class ProtocolManager
* \brief Manages the protocols at runtime.
@@ -125,7 +103,7 @@ private:
/** Contains the network events to pass asynchronously to protocols
* (i.e. from the separate ProtocolManager thread). */
Synchronised<std::vector<EventProcessingInfo> > m_events_to_process;
Synchronised<std::vector<Event*> > m_events_to_process;
/** Contains the requests to start/pause etc... protocols. */
Synchronised< std::vector<ProtocolRequest> > m_requests;
@@ -151,7 +129,7 @@ private:
virtual ~ProtocolManager();
static void* mainLoop(void *data);
uint32_t getNextProtocolId();
bool sendEvent(EventProcessingInfo* event, bool synchronous);
bool sendEvent(Event* event);
virtual void startProtocol(Protocol *protocol);
virtual void terminateProtocol(Protocol *protocol);

View File

@@ -40,6 +40,7 @@ ClientLobbyRoomProtocol(const TransportAddress& server_address)
{
m_server_address.copy(server_address);
m_server = NULL;
setHandleDisconnections(true);
} // ClientLobbyRoomProtocol
//-----------------------------------------------------------------------------
@@ -198,17 +199,13 @@ bool ClientLobbyRoomProtocol::notifyEventAsynchronous(Event* event)
return true;
} // message
else if (event->getType() == EVENT_TYPE_CONNECTED)
{
return true;
} // connection
else if (event->getType() == EVENT_TYPE_DISCONNECTED)
{
// This means we left essentially.
// We can't delete STKHost from this thread, since the main
// thread might still test if STKHost exists and then call
// the ProtocolManager, which might already have been deleted.
// So only signal tha the STKHost should exit, which will be tested
// So only signal that STKHost should exit, which will be tested
// from the main thread.
STKHost::get()->requestShutdown();
return true;

View File

@@ -42,9 +42,13 @@ ConnectToPeer::ConnectToPeer(uint32_t peer_id) : Protocol(PROTOCOL_CONNECTION)
m_state = NONE;
m_current_protocol = NULL;
m_is_lan = false;
setHandleConnections(true);
} // ConnectToPeer(peer_id)
// ----------------------------------------------------------------------------
/** Constructor for a LAN connection.
* \param address The address to connect to.
*/
ConnectToPeer::ConnectToPeer(const TransportAddress &address)
: Protocol(PROTOCOL_CONNECTION)
{
@@ -54,6 +58,7 @@ ConnectToPeer::ConnectToPeer(const TransportAddress &address)
m_state = RECEIVED_PEER_ADDRESS;
m_current_protocol = NULL;
m_is_lan = true;
setHandleConnections(true);
} // ConnectToPeers(TransportAddress)
// ----------------------------------------------------------------------------

View File

@@ -50,6 +50,7 @@ ConnectToServer::ConnectToServer() : Protocol(PROTOCOL_CONNECTION)
m_host_id = 0;
m_quick_join = true;
m_server_address.clear();
setHandleConnections(true);
} // ConnectToServer()
// ----------------------------------------------------------------------------
@@ -65,7 +66,7 @@ ConnectToServer::ConnectToServer(uint32_t server_id, uint32_t host_id)
m_quick_join = false;
const Server *server = ServersManager::get()->getServerByID(server_id);
m_server_address.copy(server->getAddress());
setHandleConnections(true);
} // ConnectToServer(server, host)
// ----------------------------------------------------------------------------

View File

@@ -44,6 +44,7 @@
*/
ServerLobbyRoomProtocol::ServerLobbyRoomProtocol() : LobbyRoomProtocol(NULL)
{
setHandleDisconnections(true);
} // ServerLobbyRoomProtocol
//-----------------------------------------------------------------------------
@@ -99,9 +100,6 @@ bool ServerLobbyRoomProtocol::notifyEventAsynchronous(Event* event)
} // switch
} // if (event->getType() == EVENT_TYPE_MESSAGE)
else if (event->getType() == EVENT_TYPE_CONNECTED)
{
} // if (event->getType() == EVENT_TYPE_CONNECTED)
else if (event->getType() == EVENT_TYPE_DISCONNECTED)
{
kartDisconnected(event);