Avoid copying STK Events for networking.

This commit is contained in:
hiker 2015-10-22 00:02:40 +11:00
parent f297c92039
commit d6e7ddf000
5 changed files with 45 additions and 36 deletions

View File

@ -81,18 +81,6 @@ Event::Event(ENetEvent* event)
} }
} // Event(ENetEvent) } // Event(ENetEvent)
// ----------------------------------------------------------------------------
/** \brief Constructor
* \param event : The event to copy.
*/
Event::Event(const Event& event)
{
m_type = event.m_type;
m_packet = NULL;
m_data = event.m_data;
m_peer = event.m_peer;
} // Event(Event)
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
/** \brief Destructor that frees the memory of the package. /** \brief Destructor that frees the memory of the package.
*/ */

View File

@ -26,6 +26,7 @@
#include "network/stk_peer.hpp" #include "network/stk_peer.hpp"
#include "network/network_string.hpp" #include "network/network_string.hpp"
#include "utils/leak_check.hpp"
#include "utils/types.hpp" #include "utils/types.hpp"
/*! /*!
@ -52,6 +53,7 @@ enum EVENT_TYPE
class Event class Event
{ {
private: private:
LEAK_CHECK()
/** Copy of the data passed by the event. */ /** Copy of the data passed by the event. */
NetworkString m_data; NetworkString m_data;
@ -66,7 +68,6 @@ private:
public: public:
Event(ENetEvent* event); Event(ENetEvent* event);
Event(const Event& event);
~Event(); ~Event();
void removeFront(int size); void removeFront(int size);

View File

@ -99,58 +99,67 @@ void ProtocolManager::abort()
void ProtocolManager::propagateEvent(Event* event) void ProtocolManager::propagateEvent(Event* event)
{ {
m_events_to_process.lock(); m_events_to_process.lock();
Event* event2 = new Event(*event);
// register protocols that will receive this event // register protocols that will receive this event
std::vector<unsigned int> protocols_ids;
PROTOCOL_TYPE searched_protocol = PROTOCOL_NONE; PROTOCOL_TYPE searched_protocol = PROTOCOL_NONE;
if (event->getType() == EVENT_TYPE_MESSAGE) if (event->getType() == EVENT_TYPE_MESSAGE)
{ {
if (event2->data().size() > 0) if (event->data().size() > 0)
{ {
searched_protocol = (PROTOCOL_TYPE)(event2->data()[0]); searched_protocol = (PROTOCOL_TYPE)(event->data()[0]);
event2->removeFront(1); event->removeFront(1);
} }
else else
{ {
Log::warn("ProtocolManager", "Not enough data."); Log::warn("ProtocolManager", "Not enough data.");
} }
} }
if (event->getType() == EVENT_TYPE_CONNECTED) else if (event->getType() == EVENT_TYPE_CONNECTED)
{ {
searched_protocol = PROTOCOL_CONNECTION; searched_protocol = PROTOCOL_CONNECTION;
} }
Log::verbose("ProtocolManager", "Received event for protocols of type %d", Log::verbose("ProtocolManager", "Received event for protocols of type %d",
searched_protocol); searched_protocol);
std::vector<unsigned int> protocols_ids;
m_protocols.lock(); m_protocols.lock();
for (unsigned int i = 0; i < m_protocols.getData().size() ; i++) for (unsigned int i = 0; i < m_protocols.getData().size() ; i++)
{ {
const ProtocolInfo &pi = m_protocols.getData()[i];
// Pass data to protocols even when paused // Pass data to protocols even when paused
if (m_protocols.getData()[i].protocol->getProtocolType() == searched_protocol || if (pi.protocol->getProtocolType() == searched_protocol ||
event->getType() == EVENT_TYPE_DISCONNECTED) event->getType() == EVENT_TYPE_DISCONNECTED)
{ {
protocols_ids.push_back(m_protocols.getData()[i].id); protocols_ids.push_back(pi.id);
}
} }
} // for i in m_protocols
m_protocols.unlock(); m_protocols.unlock();
// no protocol was aimed, show the msg to debug // no protocol was aimed, show the msg to debug
if (searched_protocol == PROTOCOL_NONE) if (searched_protocol == PROTOCOL_NONE)
{ {
Log::debug("ProtocolManager", "NO PROTOCOL : Message is \"%s\"", Log::debug("ProtocolManager", "NO PROTOCOL : Message is \"%s\"",
event2->data().std_string().c_str()); event->data().std_string().c_str());
} }
if (protocols_ids.size() != 0) if (protocols_ids.size() != 0)
{ {
EventProcessingInfo epi; EventProcessingInfo epi;
epi.arrival_time = (double)StkTime::getTimeSinceEpoch(); epi.arrival_time = (double)StkTime::getTimeSinceEpoch();
epi.event = event2; epi.event = event;
epi.protocols_ids = protocols_ids; epi.protocols_ids = protocols_ids;
m_events_to_process.getData().push_back(epi); // add the event to the queue // Add the event to the queue. After the event is handled
// its memory will be freed.
m_events_to_process.getData().push_back(epi);
} }
else else
{
Log::warn("ProtocolManager", Log::warn("ProtocolManager",
"Received an event for %d that has no destination protocol.", "Received an event for %d that has no destination protocol.",
searched_protocol); searched_protocol);
// Free the memory for the vent
delete event;
}
m_events_to_process.unlock(); m_events_to_process.unlock();
} // propagateEvent } // propagateEvent
@ -354,8 +363,11 @@ void ProtocolManager::protocolTerminated(ProtocolInfo protocol)
} // protocolTerminated } // protocolTerminated
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
bool ProtocolManager::propagateEvent(EventProcessingInfo* event, bool synchronous) /** Sends the event to the corresponding protocol.
*/
bool ProtocolManager::sendEvent(EventProcessingInfo* event, bool synchronous)
{ {
m_protocols.lock();
int index = 0; int index = 0;
for (unsigned int i = 0; i < m_protocols.getData().size(); i++) for (unsigned int i = 0; i < m_protocols.getData().size(); i++)
{ {
@ -374,26 +386,27 @@ bool ProtocolManager::propagateEvent(EventProcessingInfo* event, bool synchronou
index++; index++;
} }
} }
m_protocols.unlock();
if (event->protocols_ids.size() == 0 || if (event->protocols_ids.size() == 0 ||
(StkTime::getTimeSinceEpoch()-event->arrival_time) >= TIME_TO_KEEP_EVENTS) (StkTime::getTimeSinceEpoch()-event->arrival_time) >= TIME_TO_KEEP_EVENTS)
{ {
// because we made a copy of the event
delete event->event; delete event->event;
return true; return true;
} }
return false; return false;
} // propagateEvent } // sendEvent
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
void ProtocolManager::update() void ProtocolManager::update()
{ {
// before updating, notice protocols that they have received events // before updating, notify protocols that they have received events
m_events_to_process.lock(); m_events_to_process.lock();
int size = (int)m_events_to_process.getData().size(); int size = (int)m_events_to_process.getData().size();
int offset = 0; int offset = 0;
for (int i = 0; i < size; i++) for (int i = 0; i < size; i++)
{ {
bool result = propagateEvent(&m_events_to_process.getData()[i+offset], true); bool result = sendEvent(&m_events_to_process.getData()[i+offset], true);
if (result) if (result)
{ {
m_events_to_process.getData() m_events_to_process.getData()
@ -422,7 +435,7 @@ void ProtocolManager::asynchronousUpdate()
int offset = 0; int offset = 0;
for (int i = 0; i < size; i++) for (int i = 0; i < size; i++)
{ {
bool result = propagateEvent(&m_events_to_process.getData()[i+offset], false); bool result = sendEvent(&m_events_to_process.getData()[i+offset], false);
if (result) if (result)
{ {
m_events_to_process.getData() m_events_to_process.getData()

View File

@ -284,7 +284,7 @@ class ProtocolManager : public AbstractSingleton<ProtocolManager>,
*/ */
virtual void protocolTerminated(ProtocolInfo protocol); virtual void protocolTerminated(ProtocolInfo protocol);
bool propagateEvent(EventProcessingInfo* event, bool synchronous); bool sendEvent(EventProcessingInfo* event, bool synchronous);
// protected members // protected members
/** Contains the running protocols. /** Contains the running protocols.

View File

@ -179,12 +179,19 @@ void* STKHost::mainLoop(void* self)
{ {
while (enet_host_service(host, &event, 20) != 0) while (enet_host_service(host, &event, 20) != 0)
{ {
if (event.type == ENET_EVENT_TYPE_NONE)
continue;
// Create an STKEvent with the event data
Event* stk_event = new Event(&event); Event* stk_event = new Event(&event);
if (stk_event->getType() == EVENT_TYPE_MESSAGE) if (stk_event->getType() == EVENT_TYPE_MESSAGE)
logPacket(stk_event->data(), true); logPacket(stk_event->data(), true);
if (event.type != ENET_EVENT_TYPE_NONE)
// The event is forwarded to the NetworkManger and from there
// there to the ProtocolManager. The ProtocolManager is
// responsible for freeing the memory.
NetworkManager::getInstance()->propagateEvent(stk_event); NetworkManager::getInstance()->propagateEvent(stk_event);
delete stk_event;
} // while enet_host_service } // while enet_host_service
} // while !mustStopListening } // while !mustStopListening
@ -373,7 +380,7 @@ uint8_t* STKHost::receiveRawPacket(const TransportAddress& sender,
} }
if (addr.sin_family == AF_INET) if (addr.sin_family == AF_INET)
{ {
TransportAddress a(addr.sin_addr.s_addr); TransportAddress a(ntohl(addr.sin_addr.s_addr));
Log::info("STKHost", "IPv4 Address of the sender was %s", Log::info("STKHost", "IPv4 Address of the sender was %s",
a.toString(false).c_str()); a.toString(false).c_str());
} }