Allow start and terminate protocol directly
This commit is contained in:
parent
242f1ecfe2
commit
3573f64ae1
@ -131,15 +131,17 @@ ProtocolManager::~ProtocolManager()
|
||||
delete *i;
|
||||
m_controller_events_list.clear();
|
||||
|
||||
m_requests.lock();
|
||||
m_requests.getData().clear();
|
||||
m_requests.unlock();
|
||||
|
||||
} // ~ProtocolManager
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
void ProtocolManager::OneProtocolType::abort()
|
||||
{
|
||||
for (auto& p : m_protocols)
|
||||
{
|
||||
Protocol* protocol_ptr = p.get();
|
||||
Log::info("ProtocolManager", "A %s protocol has been terminated.",
|
||||
typeid(*protocol_ptr).name());
|
||||
}
|
||||
m_protocols.clear();
|
||||
} // OneProtocolType::abort
|
||||
|
||||
@ -194,60 +196,31 @@ void ProtocolManager::propagateEvent(Event* event)
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
/** \brief Asks the manager to start a protocol.
|
||||
* This function will store the request, and process it at a time when it is
|
||||
* thread-safe.
|
||||
* \param protocol : A pointer to the protocol to start
|
||||
* \return The unique id of the protocol that is being started.
|
||||
* Add the protocol to the protocols vector.
|
||||
* \param protocol : Protocol concerned.
|
||||
*/
|
||||
void ProtocolManager::requestStart(std::shared_ptr<Protocol> protocol)
|
||||
{
|
||||
// create the request
|
||||
ProtocolRequest req(PROTOCOL_REQUEST_START, protocol);
|
||||
// add it to the request stack
|
||||
m_requests.lock();
|
||||
m_requests.getData().push_back(req);
|
||||
m_requests.unlock();
|
||||
if (!protocol)
|
||||
return;
|
||||
std::lock_guard<std::mutex> lock(m_protocols_mutex);
|
||||
OneProtocolType &opt = m_all_protocols[protocol->getProtocolType()];
|
||||
opt.addProtocol(protocol);
|
||||
} // requestStart
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
/** \brief Notifies the manager that a protocol is terminated.
|
||||
* This function will store the request, and process it at a time it is
|
||||
* thread-safe.
|
||||
* \param protocol : A pointer to the protocol that is finished
|
||||
* Remove a protocol from the protocols vector.
|
||||
* \param protocol : Protocol concerned.
|
||||
*/
|
||||
void ProtocolManager::requestTerminate(std::shared_ptr<Protocol> protocol)
|
||||
{
|
||||
if (!protocol)
|
||||
return;
|
||||
// create the request
|
||||
ProtocolRequest req(PROTOCOL_REQUEST_TERMINATE, protocol);
|
||||
// add it to the request stack
|
||||
m_requests.lock();
|
||||
// check that the request does not already exist :
|
||||
for (unsigned int i = 0; i < m_requests.getData().size(); i++)
|
||||
{
|
||||
if (m_requests.getData()[i].m_protocol == protocol)
|
||||
{
|
||||
m_requests.unlock();
|
||||
return;
|
||||
}
|
||||
}
|
||||
m_requests.getData().push_back(req);
|
||||
m_requests.unlock();
|
||||
} // requestTerminate
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
/** \brief Starts a protocol.
|
||||
* Add the protocol info to the m_protocols vector.
|
||||
* \param protocol : ProtocolInfo to start.
|
||||
*/
|
||||
void ProtocolManager::startProtocol(std::shared_ptr<Protocol> protocol)
|
||||
{
|
||||
assert(std::this_thread::get_id() == m_asynchronous_update_thread.get_id());
|
||||
std::lock_guard<std::mutex> lock(m_protocols_mutex);
|
||||
OneProtocolType &opt = m_all_protocols[protocol->getProtocolType()];
|
||||
opt.addProtocol(protocol);
|
||||
} // startProtocol
|
||||
opt.removeProtocol(protocol);
|
||||
} // requestTerminate
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
void ProtocolManager::OneProtocolType::addProtocol(std::shared_ptr<Protocol> p)
|
||||
@ -292,50 +265,16 @@ void ProtocolManager::OneProtocolType::removeProtocol(std::shared_ptr<Protocol>
|
||||
}
|
||||
} // deleteProtocol
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
/** \brief Notes that a protocol is terminated.
|
||||
* Remove a protocol from the protocols vector.
|
||||
* \param protocol : Protocol concerned.
|
||||
*/
|
||||
void ProtocolManager::terminateProtocol(std::shared_ptr<Protocol> protocol)
|
||||
{
|
||||
assert(std::this_thread::get_id() == m_asynchronous_update_thread.get_id());
|
||||
|
||||
OneProtocolType &opt = m_all_protocols[protocol->getProtocolType()];
|
||||
opt.removeProtocol(protocol);
|
||||
} // terminateProtocol
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
/** Requests to terminate all protocols of the given protocol type.
|
||||
* This function must be called from the ProtocolManager thread in order
|
||||
* to avoid a race condition (only the ProtocolManager thread can change the
|
||||
* number of elements in that list).
|
||||
*/
|
||||
void ProtocolManager::OneProtocolType::requestTerminateAll()
|
||||
{
|
||||
for (unsigned int i = 0; i < m_protocols.size(); i++)
|
||||
{
|
||||
m_protocols[i]->requestTerminate();
|
||||
}
|
||||
} // requestTerminateAll
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
/** Finds a protocol with the given type and requests it to be terminated.
|
||||
* If no such protocol exist, log an error message.
|
||||
* This function must be called from the ProtocolManager thread in order
|
||||
* to avoid a race condition (only the ProtocolManager thread can change the
|
||||
* number of elements in that list).
|
||||
* If no such protocol exist it will do nothing
|
||||
* \param type The protocol type to delete.
|
||||
*/
|
||||
void ProtocolManager::findAndTerminate(ProtocolType type)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(m_protocols_mutex);
|
||||
OneProtocolType &opt = m_all_protocols[type];
|
||||
if (opt.isEmpty())
|
||||
{
|
||||
Log::debug("ProtocolManager", "findAndTerminate:"
|
||||
" No protocol %d registered, ignore.", type);
|
||||
}
|
||||
opt.requestTerminateAll();
|
||||
opt.abort();
|
||||
} // findAndTerminate
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
@ -484,6 +423,11 @@ void ProtocolManager::asynchronousUpdate()
|
||||
PROFILER_PUSH_CPU_MARKER("Message delivery", 255, 0, 0);
|
||||
// First deliver asynchronous messages for all protocols
|
||||
// =====================================================
|
||||
// Get a copied of protocols to prevent long time locking;
|
||||
std::unique_lock<std::mutex> ul(m_protocols_mutex);
|
||||
auto all_protocols = m_all_protocols;
|
||||
ul.unlock();
|
||||
|
||||
m_async_events_to_process.lock();
|
||||
EventList::iterator i = m_async_events_to_process.getData().begin();
|
||||
while (i != m_async_events_to_process.getData().end())
|
||||
@ -493,7 +437,7 @@ void ProtocolManager::asynchronousUpdate()
|
||||
bool result = true;
|
||||
try
|
||||
{
|
||||
result = sendEvent(*i, m_all_protocols);
|
||||
result = sendEvent(*i, all_protocols);
|
||||
}
|
||||
catch (std::exception& e)
|
||||
{
|
||||
@ -525,46 +469,13 @@ void ProtocolManager::asynchronousUpdate()
|
||||
// Second: update all running protocols
|
||||
// ====================================
|
||||
// Now update all protocols.
|
||||
for (unsigned int i = 0; i < m_all_protocols.size(); i++)
|
||||
for (unsigned int i = 0; i < all_protocols.size(); i++)
|
||||
{
|
||||
OneProtocolType &opt = m_all_protocols[i];
|
||||
// We don't need lock here because it can hang the GUI when connecting
|
||||
// to or creating server, since this function is only called from
|
||||
// the ProtocolManager thread, and this thread is also
|
||||
// the only one who changes the number of protocols.
|
||||
// But you need to make sure async and non-async
|
||||
// update in each protocol will have atomic or mutex-protected write
|
||||
OneProtocolType &opt = all_protocols[i];
|
||||
opt.update(0, /*async*/true); // ticks does not matter, so set it to 0
|
||||
}
|
||||
|
||||
PROFILER_POP_CPU_MARKER();
|
||||
PROFILER_PUSH_CPU_MARKER("Process events", 0, 255, 0);
|
||||
|
||||
// Process queued events (start, pause, ...) for protocols asynchronously
|
||||
// ======================================================================
|
||||
m_requests.lock();
|
||||
while(m_requests.getData().size()>0)
|
||||
{
|
||||
ProtocolRequest request = m_requests.getData()[0];
|
||||
m_requests.getData().erase(m_requests.getData().begin());
|
||||
// Make sure new requests can be queued up while handling requests.
|
||||
m_requests.unlock();
|
||||
// This is often used that terminating a protocol unpauses another,
|
||||
// so the m_requests queue must not be locked while executing requests.
|
||||
std::lock_guard<std::mutex> lock(m_protocols_mutex);
|
||||
switch (request.getType())
|
||||
{
|
||||
case PROTOCOL_REQUEST_START:
|
||||
startProtocol(request.getProtocol());
|
||||
break;
|
||||
case PROTOCOL_REQUEST_TERMINATE:
|
||||
terminateProtocol(request.getProtocol());
|
||||
break;
|
||||
} // switch (type)
|
||||
m_requests.lock();
|
||||
} // while m_requests.size()>0
|
||||
m_requests.unlock();
|
||||
PROFILER_POP_CPU_MARKER();
|
||||
} // asynchronousUpdate
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
|
@ -42,46 +42,6 @@
|
||||
class Event;
|
||||
class STKPeer;
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
/** \enum ProtocolRequestType
|
||||
* \brief Defines actions that can be done about protocols.
|
||||
* This enum is used essentially to keep the manager thread-safe and
|
||||
* to avoid protocols modifying directly their state.
|
||||
*/
|
||||
enum ProtocolRequestType
|
||||
{
|
||||
PROTOCOL_REQUEST_START, //!< Start a protocol
|
||||
PROTOCOL_REQUEST_TERMINATE //!< Terminate a protocol
|
||||
}; // ProtocolRequestType
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
/** \struct ProtocolRequest
|
||||
* \brief Represents a request to do an action about a protocol, e.g. to
|
||||
* start, pause, unpause or terminate a protocol.
|
||||
*/
|
||||
class ProtocolRequest
|
||||
{
|
||||
public:
|
||||
/** The type of request. */
|
||||
ProtocolRequestType m_type;
|
||||
|
||||
/** The concerned protocol information. */
|
||||
std::shared_ptr<Protocol> m_protocol;
|
||||
|
||||
public:
|
||||
ProtocolRequest(ProtocolRequestType type, std::shared_ptr<Protocol> protocol)
|
||||
{
|
||||
m_type = type;
|
||||
m_protocol = protocol;
|
||||
} // ProtocolRequest
|
||||
// ------------------------------------------------------------------------
|
||||
/** Returns the request type. */
|
||||
ProtocolRequestType getType() const { return m_type; }
|
||||
// ------------------------------------------------------------------------
|
||||
/** Returns the protocol for this request. */
|
||||
std::shared_ptr<Protocol> getProtocol() { return m_protocol; }
|
||||
}; // class ProtocolRequest;
|
||||
|
||||
// ============================================================================
|
||||
/** \class ProtocolManager
|
||||
* \brief Manages the protocols at runtime.
|
||||
@ -146,7 +106,6 @@ private:
|
||||
std::vector<std::shared_ptr<Protocol> > m_protocols;
|
||||
public:
|
||||
void removeProtocol(std::shared_ptr<Protocol> p);
|
||||
void requestTerminateAll();
|
||||
bool notifyEvent(Event *event);
|
||||
void update(int ticks, bool async);
|
||||
void abort();
|
||||
@ -198,9 +157,6 @@ private:
|
||||
* (i.e. from the separate ProtocolManager thread). */
|
||||
Synchronised<EventList> m_async_events_to_process;
|
||||
|
||||
/** Contains the requests to start/pause etc... protocols. */
|
||||
Synchronised< std::vector<ProtocolRequest> > m_requests;
|
||||
|
||||
/** When set to true, the main thread will exit. */
|
||||
std::atomic_bool m_exit;
|
||||
|
||||
@ -223,15 +179,13 @@ private:
|
||||
bool sendEvent(Event* event,
|
||||
std::array<OneProtocolType, PROTOCOL_MAX>& protocols);
|
||||
|
||||
virtual void startProtocol(std::shared_ptr<Protocol> protocol);
|
||||
virtual void terminateProtocol(std::shared_ptr<Protocol> protocol);
|
||||
virtual void asynchronousUpdate();
|
||||
void asynchronousUpdate();
|
||||
|
||||
public:
|
||||
// ===========================================
|
||||
// Public constructor is required for shared_ptr
|
||||
ProtocolManager();
|
||||
virtual ~ProtocolManager();
|
||||
~ProtocolManager();
|
||||
void abort();
|
||||
void propagateEvent(Event* event);
|
||||
std::shared_ptr<Protocol> getProtocol(ProtocolType type);
|
||||
|
Loading…
x
Reference in New Issue
Block a user