Get a copied list of protocols to prevent long time locking

This commit is contained in:
Benau 2019-05-12 15:43:11 +08:00
parent b0695b08a5
commit 0912758d0e
2 changed files with 46 additions and 92 deletions

View File

@ -141,7 +141,7 @@ ProtocolManager::~ProtocolManager()
// ----------------------------------------------------------------------------
void ProtocolManager::OneProtocolType::abort()
{
m_protocols.getData().clear();
m_protocols.clear();
} // OneProtocolType::abort
// ----------------------------------------------------------------------------
@ -283,11 +283,9 @@ void ProtocolManager::startProtocol(std::shared_ptr<Protocol> protocol)
{
assert(std::this_thread::get_id() == m_asynchronous_update_thread.get_id());
OneProtocolType &opt = m_all_protocols[protocol->getProtocolType()];
opt.lock();
opt.addProtocol(protocol);
protocol->setup();
protocol->setState(PROTOCOL_STATE_RUNNING);
opt.unlock();
Protocol* protocol_ptr = protocol.get();
Log::info("ProtocolManager",
"A %s protocol has been started.", typeid(*protocol_ptr).name());
@ -295,38 +293,6 @@ void ProtocolManager::startProtocol(std::shared_ptr<Protocol> protocol)
// setup the protocol and notify it that it's started
} // startProtocol
// ----------------------------------------------------------------------------
/** \brief Pauses a protocol.
* Pauses a protocol and tells it that it's being paused.
* \param protocol : Protocol to pause.
*/
void ProtocolManager::pauseProtocol(std::shared_ptr<Protocol> protocol)
{
assert(std::this_thread::get_id() == m_asynchronous_update_thread.get_id());
assert(protocol->getState() == PROTOCOL_STATE_RUNNING);
// We lock the protocol to avoid that paused() is called at the same
// time that the main thread delivers an event or calls update
m_all_protocols[protocol->getProtocolType()].lock();
protocol->setState(PROTOCOL_STATE_PAUSED);
protocol->paused();
m_all_protocols[protocol->getProtocolType()].unlock();
} // pauseProtocol
// ----------------------------------------------------------------------------
/** \brief Unpauses a protocol.
* Unpauses a protocol and notifies it.
* \param protocol : Protocol to unpause.
*/
void ProtocolManager::unpauseProtocol(std::shared_ptr<Protocol> protocol)
{
assert(std::this_thread::get_id() == m_asynchronous_update_thread.get_id());
assert(protocol->getState() == PROTOCOL_STATE_PAUSED);
// No lock necessary, since the protocol is paused, no other thread will
// be executing
protocol->setState(PROTOCOL_STATE_RUNNING);
protocol->unpaused();
} // unpauseProtocol
// ----------------------------------------------------------------------------
/** Removes a protocol from the list of protocols of a certain type.
* Note that the protocol is not deleted.
@ -334,9 +300,8 @@ void ProtocolManager::unpauseProtocol(std::shared_ptr<Protocol> protocol)
*/
void ProtocolManager::OneProtocolType::removeProtocol(std::shared_ptr<Protocol> p)
{
auto i = std::find(m_protocols.getData().begin(),
m_protocols.getData().end(), p);
if (i == m_protocols.getData().end())
auto i = std::find(m_protocols.begin(), m_protocols.end(), p);
if (i == m_protocols.end())
{
Protocol* protocol_ptr = p.get();
Log::error("ProtocolManager",
@ -345,7 +310,7 @@ void ProtocolManager::OneProtocolType::removeProtocol(std::shared_ptr<Protocol>
}
else
{
m_protocols.getData().erase(i);
m_protocols.erase(i);
}
} // deleteProtocol
@ -359,11 +324,7 @@ void ProtocolManager::terminateProtocol(std::shared_ptr<Protocol> protocol)
assert(std::this_thread::get_id() == m_asynchronous_update_thread.get_id());
OneProtocolType &opt = m_all_protocols[protocol->getProtocolType()];
// Be sure that noone accesses the protocols vector
// while the protocol is being removed.
opt.lock();
opt.removeProtocol(protocol);
opt.unlock();
protocol->setState(PROTOCOL_STATE_TERMINATED);
protocol->terminated();
Protocol* protocol_ptr = protocol.get();
@ -379,9 +340,9 @@ void ProtocolManager::terminateProtocol(std::shared_ptr<Protocol> protocol)
*/
void ProtocolManager::OneProtocolType::requestTerminateAll()
{
for (unsigned int i = 0; i < m_protocols.getData().size(); i++)
for (unsigned int i = 0; i < m_protocols.size(); i++)
{
m_protocols.getData()[i]->requestTerminate();
m_protocols[i]->requestTerminate();
}
} // requestTerminateAll
@ -409,21 +370,21 @@ void ProtocolManager::findAndTerminate(ProtocolType type)
*/
bool ProtocolManager::OneProtocolType::notifyEvent(Event *event)
{
if (m_protocols.getData().empty()) return false;
if (m_protocols.empty()) return false;
// Either all protocols of a certain type handle connects, or none.
// So we tet only one of them
if (event->getType() == EVENT_TYPE_CONNECTED &&
!m_protocols.getData()[0]->handleConnects()) return false;
!m_protocols[0]->handleConnects()) return false;
if (event->getType() == EVENT_TYPE_DISCONNECTED &&
!m_protocols.getData()[0]->handleDisconnects()) return false;
!m_protocols[0]->handleDisconnects()) return false;
bool can_be_deleted = false;
for (unsigned int i = 0; i < m_protocols.getData().size(); i++)
for (unsigned int i = 0; i < m_protocols.size(); i++)
{
bool done = event->isSynchronous()
? m_protocols.getData()[i]->notifyEvent(event)
: m_protocols.getData()[i]->notifyEventAsynchronous(event);
? m_protocols[i]->notifyEvent(event)
: m_protocols[i]->notifyEventAsynchronous(event);
can_be_deleted |= done;
}
return can_be_deleted;
@ -433,20 +394,21 @@ bool ProtocolManager::OneProtocolType::notifyEvent(Event *event)
/** Sends the event to the corresponding protocol. Returns true if the event
* can be ignored, or false otherwise.
*/
bool ProtocolManager::sendEvent(Event* event)
bool ProtocolManager::sendEvent(Event* event,
std::vector<OneProtocolType>& protocols)
{
bool can_be_deleted = false;
if (event->getType() == EVENT_TYPE_MESSAGE)
{
OneProtocolType &opt =
m_all_protocols.at(event->data().getProtocolType());
protocols.at(event->data().getProtocolType());
can_be_deleted = opt.notifyEvent(event);
}
else // connect or disconnect event --> test all protocols
{
for (unsigned int i = 0; i < m_all_protocols.size(); i++)
for (unsigned int i = 0; i < protocols.size(); i++)
{
can_be_deleted |= m_all_protocols.at(i).notifyEvent(event);
can_be_deleted |= protocols.at(i).notifyEvent(event);
}
}
const uint64_t TIME_TO_KEEP_EVENTS = 1000;
@ -462,12 +424,12 @@ bool ProtocolManager::sendEvent(Event* event)
*/
void ProtocolManager::OneProtocolType::update(int ticks, bool async)
{
for (unsigned int i = 0; i < m_protocols.getData().size(); i++)
for (unsigned int i = 0; i < m_protocols.size(); i++)
{
if (m_protocols.getData()[i]->getState() == PROTOCOL_STATE_RUNNING)
if (m_protocols[i]->getState() == PROTOCOL_STATE_RUNNING)
{
async ? m_protocols.getData()[i]->asynchronousUpdate()
: m_protocols.getData()[i]->update(ticks);
async ? m_protocols[i]->asynchronousUpdate()
: m_protocols[i]->update(ticks);
}
}
} // update
@ -487,6 +449,11 @@ void ProtocolManager::update(int ticks)
// Update from main thread only:
assert(std::this_thread::get_id() != m_asynchronous_update_thread.get_id());
// Get a copied of protocols to prevent long time locking;
std::unique_lock<std::mutex> ul(m_protocols_mutex);
auto all_protocols = m_all_protocols;
ul.unlock();
// before updating, notify protocols that they have received events
m_sync_events_to_process.lock();
EventList::iterator i = m_sync_events_to_process.getData().begin();
@ -497,7 +464,7 @@ void ProtocolManager::update(int ticks)
bool can_be_deleted = true;
try
{
can_be_deleted = sendEvent(*i);
can_be_deleted = sendEvent(*i, all_protocols);
}
catch (std::exception& e)
{
@ -521,12 +488,10 @@ void ProtocolManager::update(int ticks)
m_sync_events_to_process.unlock();
// Now update all protocols.
for (unsigned int i = 0; i < m_all_protocols.size(); i++)
for (unsigned int i = 0; i < all_protocols.size(); i++)
{
OneProtocolType &opt = m_all_protocols[i];
opt.lock();
OneProtocolType &opt = all_protocols[i];
opt.update(ticks, /*async*/false);
opt.unlock();
}
} // update
@ -550,11 +515,10 @@ void ProtocolManager::asynchronousUpdate()
{
m_async_events_to_process.unlock();
m_all_protocols[(*i)->getType()].lock();
bool result = true;
try
{
result = sendEvent(*i);
result = sendEvent(*i, m_all_protocols);
}
catch (std::exception& e)
{
@ -564,7 +528,6 @@ void ProtocolManager::asynchronousUpdate()
Log::error("ProtocolManager",
(*i)->data().getLogMessage().c_str());
}
m_all_protocols[(*i)->getType()].unlock();
m_async_events_to_process.lock();
if (result)
@ -613,20 +576,19 @@ void ProtocolManager::asynchronousUpdate()
m_requests.unlock();
// This is often used that terminating a protocol unpauses another,
// so the m_requests queue must not be locked while executing requests.
std::lock_guard<std::mutex> ul(m_protocols_mutex);
switch (request.getType())
{
case PROTOCOL_REQUEST_START:
startProtocol(request.getProtocol());
break;
case PROTOCOL_REQUEST_PAUSE:
pauseProtocol(request.getProtocol());
break;
case PROTOCOL_REQUEST_UNPAUSE:
unpauseProtocol(request.getProtocol());
break;
case PROTOCOL_REQUEST_TERMINATE:
terminateProtocol(request.getProtocol());
break;
// Unused
case PROTOCOL_REQUEST_PAUSE:
case PROTOCOL_REQUEST_UNPAUSE:
break;
} // switch (type)
m_requests.lock();
} // while m_requests.size()>0

View File

@ -144,7 +144,7 @@ private:
class OneProtocolType
{
private:
Synchronised< std::vector<std::shared_ptr<Protocol> > > m_protocols;
std::vector<std::shared_ptr<Protocol> > m_protocols;
public:
void removeProtocol(std::shared_ptr<Protocol> p);
void requestTerminateAll();
@ -154,16 +154,15 @@ private:
// --------------------------------------------------------------------
/** Returns the first protocol of a given type. It is assumed that
* there is a protocol of that type. */
std::shared_ptr<Protocol> getFirstProtocol()
{ return m_protocols.getData()[0]; }
std::shared_ptr<Protocol> getFirstProtocol() { return m_protocols[0]; }
// --------------------------------------------------------------------
/** Returns if this protocol class handles connect events. Protocols
* of the same class either all handle a connect event, or none, so
* only the first protocol is actually tested. */
bool handleConnects() const
{
return !m_protocols.getData().empty() &&
m_protocols.getData()[0]->handleConnects();
return !m_protocols.empty() &&
m_protocols[0]->handleConnects();
} // handleConnects
// --------------------------------------------------------------------
/** Returns if this protocol class handles disconnect events. Protocols
@ -171,23 +170,17 @@ private:
* only the first protocol is actually tested. */
bool handleDisconnects() const
{
return !m_protocols.getData().empty() &&
m_protocols.getData()[0]->handleDisconnects();
return !m_protocols.empty() &&
m_protocols[0]->handleDisconnects();
} // handleDisconnects
// --------------------------------------------------------------------
/** Locks access to this list of all protocols of a certain type. */
void lock() { m_protocols.lock(); }
// --------------------------------------------------------------------
/** Locks access to this list of all protocols of a certain type. */
void unlock() { m_protocols.unlock(); }
// --------------------------------------------------------------------
void addProtocol(std::shared_ptr<Protocol> p)
{
m_protocols.getData().push_back(p);
m_protocols.push_back(p);
} // addProtocol
// --------------------------------------------------------------------
/** Returns if there are no protocols of this type registered. */
bool isEmpty() const { return m_protocols.getData().empty(); }
bool isEmpty() const { return m_protocols.empty(); }
// --------------------------------------------------------------------
}; // class OneProtocolType
@ -224,20 +217,19 @@ private:
std::condition_variable m_game_protocol_cv;
std::mutex m_game_protocol_mutex;
std::mutex m_game_protocol_mutex, m_protocols_mutex;
EventList m_controller_events_list;
/*! Single instance of protocol manager.*/
static std::weak_ptr<ProtocolManager> m_protocol_manager;
bool sendEvent(Event* event);
bool sendEvent(Event* event,
std::vector<OneProtocolType>& protocols);
virtual void startProtocol(std::shared_ptr<Protocol> protocol);
virtual void terminateProtocol(std::shared_ptr<Protocol> protocol);
virtual void asynchronousUpdate();
virtual void pauseProtocol(std::shared_ptr<Protocol> protocol);
virtual void unpauseProtocol(std::shared_ptr<Protocol> protocol);
public:
// ===========================================