diff --git a/src/network/protocol_manager.cpp b/src/network/protocol_manager.cpp index b156b8397..3d3a944b6 100644 --- a/src/network/protocol_manager.cpp +++ b/src/network/protocol_manager.cpp @@ -31,7 +31,6 @@ ProtocolManager::ProtocolManager() { - pthread_mutex_init(&m_events_mutex, NULL); pthread_mutex_init(&m_protocols_mutex, NULL); pthread_mutex_init(&m_asynchronous_protocols_mutex, NULL); pthread_mutex_init(&m_requests_mutex, NULL); @@ -72,25 +71,24 @@ void ProtocolManager::abort() { pthread_mutex_unlock(&m_exit_mutex); // will stop the update function pthread_join(*m_asynchronous_update_thread, NULL); // wait the thread to finish - pthread_mutex_lock(&m_events_mutex); + m_events_to_process.lock(); pthread_mutex_lock(&m_protocols_mutex); pthread_mutex_lock(&m_asynchronous_protocols_mutex); pthread_mutex_lock(&m_requests_mutex); pthread_mutex_lock(&m_id_mutex); for (unsigned int i = 0; i < m_protocols.size() ; i++) delete m_protocols[i].protocol; - for (unsigned int i = 0; i < m_events_to_process.size() ; i++) - delete m_events_to_process[i].event; + for (unsigned int i = 0; i < m_events_to_process.getData().size() ; i++) + delete m_events_to_process.getData()[i].event; m_protocols.clear(); m_requests.clear(); - m_events_to_process.clear(); - pthread_mutex_unlock(&m_events_mutex); + m_events_to_process.getData().clear(); + m_events_to_process.unlock(); pthread_mutex_unlock(&m_protocols_mutex); pthread_mutex_unlock(&m_asynchronous_protocols_mutex); pthread_mutex_unlock(&m_requests_mutex); pthread_mutex_unlock(&m_id_mutex); - pthread_mutex_destroy(&m_events_mutex); pthread_mutex_destroy(&m_protocols_mutex); pthread_mutex_destroy(&m_asynchronous_protocols_mutex); pthread_mutex_destroy(&m_requests_mutex); @@ -101,7 +99,7 @@ void ProtocolManager::abort() // ---------------------------------------------------------------------------- void ProtocolManager::notifyEvent(Event* event) { - pthread_mutex_lock(&m_events_mutex); + m_events_to_process.lock(); Event* event2 = new Event(*event); // register protocols that will receive this event std::vector protocols_ids; @@ -148,13 +146,13 @@ void ProtocolManager::notifyEvent(Event* event) epi.arrival_time = (double)StkTime::getTimeSinceEpoch(); epi.event = event2; epi.protocols_ids = protocols_ids; - m_events_to_process.push_back(epi); // add the event to the queue + m_events_to_process.getData().push_back(epi); // add the event to the queue } else Log::warn("ProtocolManager", "Received an event for %d that has no destination protocol.", searched_protocol); - pthread_mutex_unlock(&m_events_mutex); + m_events_to_process.lock(); } // notifyEvent // ---------------------------------------------------------------------------- @@ -386,20 +384,21 @@ bool ProtocolManager::propagateEvent(EventProcessingInfo* event, bool synchronou void ProtocolManager::update() { // before updating, notice protocols that they have received events - pthread_mutex_lock(&m_events_mutex); // secure threads - int size = (int)m_events_to_process.size(); + m_events_to_process.lock(); + int size = (int)m_events_to_process.getData().size(); int offset = 0; for (int i = 0; i < size; i++) { - bool result = propagateEvent(&m_events_to_process[i+offset], true); + bool result = propagateEvent(&m_events_to_process.getData()[i+offset], true); if (result) { - m_events_to_process.erase(m_events_to_process.begin()+i+offset, - m_events_to_process.begin()+i+offset+1); + m_events_to_process.getData() + .erase(m_events_to_process.getData().begin()+i+offset, + m_events_to_process.getData().begin()+i+offset+1); offset --; } } - pthread_mutex_unlock(&m_events_mutex); // release the mutex + m_events_to_process.unlock(); // now update all protocols pthread_mutex_lock(&m_protocols_mutex); for (unsigned int i = 0; i < m_protocols.size(); i++) @@ -414,20 +413,21 @@ void ProtocolManager::update() void ProtocolManager::asynchronousUpdate() { // before updating, notice protocols that they have received information - pthread_mutex_lock(&m_events_mutex); // secure threads - int size = (int)m_events_to_process.size(); + m_events_to_process.lock(); + int size = (int)m_events_to_process.getData().size(); int offset = 0; for (int i = 0; i < size; i++) { - bool result = propagateEvent(&m_events_to_process[i+offset], false); + bool result = propagateEvent(&m_events_to_process.getData()[i+offset], false); if (result) { - m_events_to_process.erase(m_events_to_process.begin()+i+offset, - m_events_to_process.begin()+i+offset+1); + m_events_to_process.getData() + .erase(m_events_to_process.getData().begin()+i+offset, + m_events_to_process.getData().begin()+i+offset+1); offset --; } } - pthread_mutex_unlock(&m_events_mutex); // release the mutex + m_events_to_process.unlock(); // now update all protocols that need to be updated in asynchronous mode pthread_mutex_lock(&m_asynchronous_protocols_mutex); diff --git a/src/network/protocol_manager.hpp b/src/network/protocol_manager.hpp index 4413fac08..edf3a584b 100644 --- a/src/network/protocol_manager.hpp +++ b/src/network/protocol_manager.hpp @@ -28,6 +28,7 @@ #include "network/protocol.hpp" #include "utils/no_copy.hpp" #include "utils/singleton.hpp" +#include "utils/synchronised.hpp" #include "utils/types.hpp" #include @@ -289,19 +290,16 @@ class ProtocolManager : public AbstractSingleton, bool propagateEvent(EventProcessingInfo* event, bool synchronous); // protected members - /*! - * \brief Contains the running protocols. - * This stores the protocols that are either running or paused, their + /** Contains the running protocols. + * This stores the protocols that are either running or paused, their * state and their unique id. */ std::vector m_protocols; - /*! - * \brief Contains the network events to pass to protocols. - */ - std::vector m_events_to_process; - /*! - * \brief Contains the requests to start/stop etc... protocols. - */ + + /** Contains the network events to pass to protocols. */ + Synchronised > m_events_to_process; + + /** Contains the requests to start/stop etc... protocols. */ std::vector m_requests; /*! \brief The next id to assign to a protocol. * This value is incremented by 1 each time a protocol is started. @@ -311,8 +309,6 @@ class ProtocolManager : public AbstractSingleton, uint32_t m_next_protocol_id; // mutexes: - /*! Used to ensure that the event queue is used thread-safely. */ - pthread_mutex_t m_events_mutex; /*! Used to ensure that the protocol vector is used thread-safely. */ pthread_mutex_t m_protocols_mutex; /*! Used to ensure that the protocol vector is used thread-safely. */