Used Synchronised instead of pthread mutex.

This commit is contained in:
hiker 2015-10-12 08:18:15 +11:00
parent 124420b90a
commit 47cb6ac2e3
2 changed files with 30 additions and 34 deletions

View File

@ -31,7 +31,6 @@
ProtocolManager::ProtocolManager() ProtocolManager::ProtocolManager()
{ {
pthread_mutex_init(&m_events_mutex, NULL);
pthread_mutex_init(&m_protocols_mutex, NULL); pthread_mutex_init(&m_protocols_mutex, NULL);
pthread_mutex_init(&m_asynchronous_protocols_mutex, NULL); pthread_mutex_init(&m_asynchronous_protocols_mutex, NULL);
pthread_mutex_init(&m_requests_mutex, NULL); pthread_mutex_init(&m_requests_mutex, NULL);
@ -72,25 +71,24 @@ void ProtocolManager::abort()
{ {
pthread_mutex_unlock(&m_exit_mutex); // will stop the update function pthread_mutex_unlock(&m_exit_mutex); // will stop the update function
pthread_join(*m_asynchronous_update_thread, NULL); // wait the thread to finish pthread_join(*m_asynchronous_update_thread, NULL); // wait the thread to finish
pthread_mutex_lock(&m_events_mutex); m_events_to_process.lock();
pthread_mutex_lock(&m_protocols_mutex); pthread_mutex_lock(&m_protocols_mutex);
pthread_mutex_lock(&m_asynchronous_protocols_mutex); pthread_mutex_lock(&m_asynchronous_protocols_mutex);
pthread_mutex_lock(&m_requests_mutex); pthread_mutex_lock(&m_requests_mutex);
pthread_mutex_lock(&m_id_mutex); pthread_mutex_lock(&m_id_mutex);
for (unsigned int i = 0; i < m_protocols.size() ; i++) for (unsigned int i = 0; i < m_protocols.size() ; i++)
delete m_protocols[i].protocol; delete m_protocols[i].protocol;
for (unsigned int i = 0; i < m_events_to_process.size() ; i++) for (unsigned int i = 0; i < m_events_to_process.getData().size() ; i++)
delete m_events_to_process[i].event; delete m_events_to_process.getData()[i].event;
m_protocols.clear(); m_protocols.clear();
m_requests.clear(); m_requests.clear();
m_events_to_process.clear(); m_events_to_process.getData().clear();
pthread_mutex_unlock(&m_events_mutex); m_events_to_process.unlock();
pthread_mutex_unlock(&m_protocols_mutex); pthread_mutex_unlock(&m_protocols_mutex);
pthread_mutex_unlock(&m_asynchronous_protocols_mutex); pthread_mutex_unlock(&m_asynchronous_protocols_mutex);
pthread_mutex_unlock(&m_requests_mutex); pthread_mutex_unlock(&m_requests_mutex);
pthread_mutex_unlock(&m_id_mutex); pthread_mutex_unlock(&m_id_mutex);
pthread_mutex_destroy(&m_events_mutex);
pthread_mutex_destroy(&m_protocols_mutex); pthread_mutex_destroy(&m_protocols_mutex);
pthread_mutex_destroy(&m_asynchronous_protocols_mutex); pthread_mutex_destroy(&m_asynchronous_protocols_mutex);
pthread_mutex_destroy(&m_requests_mutex); pthread_mutex_destroy(&m_requests_mutex);
@ -101,7 +99,7 @@ void ProtocolManager::abort()
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
void ProtocolManager::notifyEvent(Event* event) void ProtocolManager::notifyEvent(Event* event)
{ {
pthread_mutex_lock(&m_events_mutex); m_events_to_process.lock();
Event* event2 = new Event(*event); Event* event2 = new Event(*event);
// register protocols that will receive this event // register protocols that will receive this event
std::vector<unsigned int> protocols_ids; std::vector<unsigned int> protocols_ids;
@ -148,13 +146,13 @@ void ProtocolManager::notifyEvent(Event* event)
epi.arrival_time = (double)StkTime::getTimeSinceEpoch(); epi.arrival_time = (double)StkTime::getTimeSinceEpoch();
epi.event = event2; epi.event = event2;
epi.protocols_ids = protocols_ids; epi.protocols_ids = protocols_ids;
m_events_to_process.push_back(epi); // add the event to the queue m_events_to_process.getData().push_back(epi); // add the event to the queue
} }
else else
Log::warn("ProtocolManager", Log::warn("ProtocolManager",
"Received an event for %d that has no destination protocol.", "Received an event for %d that has no destination protocol.",
searched_protocol); searched_protocol);
pthread_mutex_unlock(&m_events_mutex); m_events_to_process.lock();
} // notifyEvent } // notifyEvent
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
@ -386,20 +384,21 @@ bool ProtocolManager::propagateEvent(EventProcessingInfo* event, bool synchronou
void ProtocolManager::update() void ProtocolManager::update()
{ {
// before updating, notice protocols that they have received events // before updating, notice protocols that they have received events
pthread_mutex_lock(&m_events_mutex); // secure threads m_events_to_process.lock();
int size = (int)m_events_to_process.size(); int size = (int)m_events_to_process.getData().size();
int offset = 0; int offset = 0;
for (int i = 0; i < size; i++) for (int i = 0; i < size; i++)
{ {
bool result = propagateEvent(&m_events_to_process[i+offset], true); bool result = propagateEvent(&m_events_to_process.getData()[i+offset], true);
if (result) if (result)
{ {
m_events_to_process.erase(m_events_to_process.begin()+i+offset, m_events_to_process.getData()
m_events_to_process.begin()+i+offset+1); .erase(m_events_to_process.getData().begin()+i+offset,
m_events_to_process.getData().begin()+i+offset+1);
offset --; offset --;
} }
} }
pthread_mutex_unlock(&m_events_mutex); // release the mutex m_events_to_process.unlock();
// now update all protocols // now update all protocols
pthread_mutex_lock(&m_protocols_mutex); pthread_mutex_lock(&m_protocols_mutex);
for (unsigned int i = 0; i < m_protocols.size(); i++) for (unsigned int i = 0; i < m_protocols.size(); i++)
@ -414,20 +413,21 @@ void ProtocolManager::update()
void ProtocolManager::asynchronousUpdate() void ProtocolManager::asynchronousUpdate()
{ {
// before updating, notice protocols that they have received information // before updating, notice protocols that they have received information
pthread_mutex_lock(&m_events_mutex); // secure threads m_events_to_process.lock();
int size = (int)m_events_to_process.size(); int size = (int)m_events_to_process.getData().size();
int offset = 0; int offset = 0;
for (int i = 0; i < size; i++) for (int i = 0; i < size; i++)
{ {
bool result = propagateEvent(&m_events_to_process[i+offset], false); bool result = propagateEvent(&m_events_to_process.getData()[i+offset], false);
if (result) if (result)
{ {
m_events_to_process.erase(m_events_to_process.begin()+i+offset, m_events_to_process.getData()
m_events_to_process.begin()+i+offset+1); .erase(m_events_to_process.getData().begin()+i+offset,
m_events_to_process.getData().begin()+i+offset+1);
offset --; offset --;
} }
} }
pthread_mutex_unlock(&m_events_mutex); // release the mutex m_events_to_process.unlock();
// now update all protocols that need to be updated in asynchronous mode // now update all protocols that need to be updated in asynchronous mode
pthread_mutex_lock(&m_asynchronous_protocols_mutex); pthread_mutex_lock(&m_asynchronous_protocols_mutex);

View File

@ -28,6 +28,7 @@
#include "network/protocol.hpp" #include "network/protocol.hpp"
#include "utils/no_copy.hpp" #include "utils/no_copy.hpp"
#include "utils/singleton.hpp" #include "utils/singleton.hpp"
#include "utils/synchronised.hpp"
#include "utils/types.hpp" #include "utils/types.hpp"
#include <vector> #include <vector>
@ -289,19 +290,16 @@ class ProtocolManager : public AbstractSingleton<ProtocolManager>,
bool propagateEvent(EventProcessingInfo* event, bool synchronous); bool propagateEvent(EventProcessingInfo* event, bool synchronous);
// protected members // protected members
/*! /** Contains the running protocols.
* \brief Contains the running protocols. * This stores the protocols that are either running or paused, their
* This stores the protocols that are either running or paused, their
* state and their unique id. * state and their unique id.
*/ */
std::vector<ProtocolInfo> m_protocols; std::vector<ProtocolInfo> m_protocols;
/*!
* \brief Contains the network events to pass to protocols. /** Contains the network events to pass to protocols. */
*/ Synchronised<std::vector<EventProcessingInfo> > m_events_to_process;
std::vector<EventProcessingInfo> m_events_to_process;
/*! /** Contains the requests to start/stop etc... protocols. */
* \brief Contains the requests to start/stop etc... protocols.
*/
std::vector<ProtocolRequest> m_requests; std::vector<ProtocolRequest> m_requests;
/*! \brief The next id to assign to a protocol. /*! \brief The next id to assign to a protocol.
* This value is incremented by 1 each time a protocol is started. * This value is incremented by 1 each time a protocol is started.
@ -311,8 +309,6 @@ class ProtocolManager : public AbstractSingleton<ProtocolManager>,
uint32_t m_next_protocol_id; uint32_t m_next_protocol_id;
// mutexes: // mutexes:
/*! Used to ensure that the event queue is used thread-safely. */
pthread_mutex_t m_events_mutex;
/*! Used to ensure that the protocol vector is used thread-safely. */ /*! Used to ensure that the protocol vector is used thread-safely. */
pthread_mutex_t m_protocols_mutex; pthread_mutex_t m_protocols_mutex;
/*! Used to ensure that the protocol vector is used thread-safely. */ /*! Used to ensure that the protocol vector is used thread-safely. */