Protocol data structure uses now finer grained locks, avoiding that

asynchronous updates or event delivery can be delayed by synchronous
updates/event deliveries to a different protocol.
This commit is contained in:
hiker
2017-02-15 09:57:38 +11:00
parent efeea8eeb2
commit 17751631f1
7 changed files with 330 additions and 265 deletions

View File

@@ -37,7 +37,6 @@ Protocol::Protocol(ProtocolType type, CallbackObject* callback_object)
m_callback_object = callback_object;
m_type = type;
m_state = PROTOCOL_STATE_INITIALISING;
m_id = 0;
m_handle_connections = false;
m_handle_disconnections = false;
} // Protocol
@@ -107,20 +106,6 @@ void Protocol::requestTerminate()
ProtocolManager::getInstance()->requestTerminate(this);
} // requestTerminate
// ----------------------------------------------------------------------------
/** Finds a protocol with the given type and requests it to be terminated.
* If no such protocol exist, log an error message.
* \param type The protocol type to delete.
*/
void Protocol::findAndTerminateProtocol(ProtocolType type)
{
Protocol* protocol = ProtocolManager::getInstance()->getProtocol(type);
if (protocol)
protocol->requestTerminate();
else
Log::error("Protocol", "No protocol %d registered.", type);
} // findAndTerminateProtocol
// ----------------------------------------------------------------------------
/** Sends a message to all peers, inserting the peer's token into the message.
* The message is composed of a 1-byte message (usually the message type)

View File

@@ -45,12 +45,13 @@ enum ProtocolType
PROTOCOL_CONNECTION = 0x01, //!< Protocol that deals with client-server connection.
PROTOCOL_LOBBY_ROOM = 0x02, //!< Protocol that is used during the lobby room phase.
PROTOCOL_START_GAME = 0x03, //!< Protocol used when starting the game.
PROTOCOL_SYNCHRONIZATION = 0x04, //!<Protocol used to synchronize clocks.
PROTOCOL_SYNCHRONIZATION = 0x04, //!< Protocol used to determine latency
PROTOCOL_KART_UPDATE = 0x05, //!< Protocol to update karts position, rotation etc...
PROTOCOL_GAME_EVENTS = 0x06, //!< Protocol to communicate the game events.
PROTOCOL_CONTROLLER_EVENTS = 0x07, //!< Protocol to transfer controller modifications
PROTOCOL_SILENT = 0x08, //!< Used for protocols that do not subscribe to any network event.
PROTOCOL_MAX , //!< Maximum number of different protocol types
PROTOCOL_SYNCHRONOUS = 0x80, //!< Flag, indicates synchronous delivery
PROTOCOL_SILENT = 0xff //!< Used for protocols that do not subscribe to any network event.
}; // ProtocolType
// ----------------------------------------------------------------------------
@@ -102,9 +103,6 @@ protected:
/** The state this protocol is in (e.g. running, paused, ...). */
ProtocolState m_state;
/** The unique id of the protocol. */
uint32_t m_id;
/** True if this protocol should receive connection events. */
bool m_handle_connections;
@@ -138,7 +136,6 @@ public:
void requestPause();
void requestUnpause();
void requestTerminate();
void findAndTerminateProtocol(ProtocolType type);
// ------------------------------------------------------------------------
/** \brief Called when the protocol is paused (by an other entity or by
@@ -163,12 +160,6 @@ public:
/** Sets the current protocol state. */
void setState(ProtocolState s) { m_state = s; }
// ------------------------------------------------------------------------
/** Returns the unique protocol ID. */
uint32_t getId() const { return m_id; }
// ------------------------------------------------------------------------
/** Sets the unique protocol id. */
void setId(uint32_t id) { m_id = id; }
// ------------------------------------------------------------------------
/** \brief Notify a protocol matching the Event type of that event.
* \param event : Pointer to the event.
* \return True if the event has been treated, false otherwise. */

View File

@@ -34,12 +34,11 @@
ProtocolManager::ProtocolManager()
{
pthread_mutex_init(&m_asynchronous_protocols_mutex, NULL);
m_exit.setAtomic(false);
m_next_protocol_id.setAtomic(0);
m_asynchronous_update_thread = (pthread_t*)(malloc(sizeof(pthread_t)));
pthread_create(m_asynchronous_update_thread, NULL,
m_all_protocols.resize(PROTOCOL_MAX);
pthread_create(&m_asynchronous_update_thread, NULL,
ProtocolManager::mainLoop, this);
} // ProtocolManager
@@ -64,37 +63,46 @@ ProtocolManager::~ProtocolManager()
{
} // ~ProtocolManager
// ----------------------------------------------------------------------------
void ProtocolManager::OneProtocolType::abort()
{
for (unsigned int i = 0; i < m_protocols.getData().size(); i++)
delete m_protocols.getData()[i];
m_protocols.getData().clear();
} // OneProtocolType::abort
// ----------------------------------------------------------------------------
/** \brief Stops the protocol manager.
*/
void ProtocolManager::abort()
{
m_exit.setAtomic(true);
pthread_mutex_lock(&m_asynchronous_protocols_mutex);
pthread_join(m_asynchronous_update_thread, NULL); // wait the thread to finish
m_protocols.lock();
for (unsigned int i = 0; i < m_protocols.getData().size() ; i++)
delete m_protocols.getData()[i];
m_protocols.getData().clear();
m_protocols.unlock();
// Now only this main thread is active, no more need for locks
for (unsigned int i = 0; i < m_all_protocols.size(); i++)
{
m_all_protocols[i].abort();
}
m_events_to_process.lock();
EventList::iterator i;
for (EventList::iterator i =m_events_to_process.getData().begin();
i!=m_events_to_process.getData().end(); ++i)
m_sync_events_to_process.lock();
for (EventList::iterator i =m_sync_events_to_process.getData().begin();
i!=m_sync_events_to_process.getData().end(); ++i)
delete *i;
m_events_to_process.getData().clear();
m_events_to_process.unlock();
m_sync_events_to_process.getData().clear();
m_sync_events_to_process.unlock();
m_async_events_to_process.lock();
for (EventList::iterator i = m_async_events_to_process.getData().begin();
i!= m_async_events_to_process.getData().end(); ++i)
delete *i;
m_async_events_to_process.getData().clear();
m_async_events_to_process.unlock();
m_requests.lock();
m_requests.getData().clear();
m_requests.unlock();
pthread_mutex_unlock(&m_asynchronous_protocols_mutex);
pthread_mutex_destroy(&m_asynchronous_protocols_mutex);
pthread_join(*m_asynchronous_update_thread, NULL); // wait the thread to finish
} // abort
// ----------------------------------------------------------------------------
@@ -104,31 +112,36 @@ void ProtocolManager::abort()
*/
void ProtocolManager::propagateEvent(Event* event)
{
m_events_to_process.lock();
m_events_to_process.getData().push_back(event);
m_events_to_process.unlock();
if (event->isSynchronous())
{
m_sync_events_to_process.lock();
m_sync_events_to_process.getData().push_back(event);
m_sync_events_to_process.unlock();
}
else
{
m_async_events_to_process.lock();
m_async_events_to_process.getData().push_back(event);
m_async_events_to_process.unlock();
}
return;
} // propagateEvent
// ----------------------------------------------------------------------------
/** \brief Asks the manager to start a protocol.
* This function will store the request, and process it at a time it is
* This function will store the request, and process it at a time when it is
* thread-safe.
* \param protocol : A pointer to the protocol to start
* \return The unique id of the protocol that is being started.
*/
uint32_t ProtocolManager::requestStart(Protocol* protocol)
void ProtocolManager::requestStart(Protocol* protocol)
{
// assign a unique id to the protocol.
protocol->setId(getNextProtocolId());
// create the request
ProtocolRequest req(PROTOCOL_REQUEST_START, protocol);
// add it to the request stack
m_requests.lock();
m_requests.getData().push_back(req);
m_requests.unlock();
return req.getProtocol()->getId();
} // requestStart
// ----------------------------------------------------------------------------
@@ -195,25 +208,24 @@ void ProtocolManager::requestTerminate(Protocol* protocol)
} // requestTerminate
// ----------------------------------------------------------------------------
/** \brief Starts a protocol.
* Add the protocol info to the m_protocols vector.
* \param protocol : ProtocolInfo to start.
*/
void ProtocolManager::startProtocol(Protocol *protocol)
{
// add the protocol to the protocol vector so that it's updated
m_protocols.lock();
pthread_mutex_lock(&m_asynchronous_protocols_mutex);
Log::info("ProtocolManager",
"A %s protocol with id=%u has been started. There are %ld protocols running.",
typeid(*protocol).name(), protocol->getId(),
m_protocols.getData().size()+1);
m_protocols.getData().push_back(protocol);
// setup the protocol and notify it that it's started
assert(pthread_equal(pthread_self(), m_asynchronous_update_thread));
OneProtocolType &opt = m_all_protocols[protocol->getProtocolType()];
opt.lock();
opt.addProtocol(protocol);
protocol->setup();
protocol->setState(PROTOCOL_STATE_RUNNING);
m_protocols.unlock();
pthread_mutex_unlock(&m_asynchronous_protocols_mutex);
opt.unlock();
Log::info("ProtocolManager",
"A %s protocol has been started.", typeid(*protocol).name());
// setup the protocol and notify it that it's started
} // startProtocol
// ----------------------------------------------------------------------------
@@ -223,9 +235,14 @@ void ProtocolManager::startProtocol(Protocol *protocol)
*/
void ProtocolManager::pauseProtocol(Protocol *protocol)
{
assert(pthread_equal(pthread_self(), m_asynchronous_update_thread));
assert(protocol->getState() == PROTOCOL_STATE_RUNNING);
// We lock the protocol to avoid that paused() is called at the same
// time that the main thread delivers an event or calls update
m_all_protocols[protocol->getProtocolType()].lock();
protocol->setState(PROTOCOL_STATE_PAUSED);
protocol->paused();
m_all_protocols[protocol->getProtocolType()].unlock();
} // pauseProtocol
// ----------------------------------------------------------------------------
@@ -235,11 +252,36 @@ void ProtocolManager::pauseProtocol(Protocol *protocol)
*/
void ProtocolManager::unpauseProtocol(Protocol *protocol)
{
assert(pthread_equal(pthread_self(), m_asynchronous_update_thread));
assert(protocol->getState() == PROTOCOL_STATE_PAUSED);
// No lock necessary, since the protocol is paused, no other thread will
// be executing
protocol->setState(PROTOCOL_STATE_RUNNING);
protocol->unpaused();
} // unpauseProtocol
// ----------------------------------------------------------------------------
/** Removes a protocol from the list of protocols of a certain type.
* Note that the protocol is not deleted.
* \param p The protocol to be removed.
*/
void ProtocolManager::OneProtocolType::removeProtocol(Protocol *p)
{
std::vector<Protocol*>::iterator i =
std::find(m_protocols.getData().begin(),
m_protocols.getData().end(), p);
if (i == m_protocols.getData().end())
{
Log::error("ProtocolManager",
"Trying to delete protocol '%s', which was not found",
typeid(*p).name());
}
else
{
m_protocols.getData().erase(i);
}
} // deleteProtocol
// ----------------------------------------------------------------------------
/** \brief Notes that a protocol is terminated.
* Remove a protocol from the protocols vector.
@@ -247,68 +289,116 @@ void ProtocolManager::unpauseProtocol(Protocol *protocol)
*/
void ProtocolManager::terminateProtocol(Protocol *protocol)
{
// Be sure that noone accesses the protocols vector while we erase a protocol
m_protocols.lock();
pthread_mutex_lock(&m_asynchronous_protocols_mutex);
int offset = 0;
std::string protocol_type = typeid(*protocol).name();
for (unsigned int i = 0; i < m_protocols.getData().size(); i++)
{
if (m_protocols.getData()[i-offset] == protocol)
{
protocol->setState(PROTOCOL_STATE_TERMINATED);
m_protocols.getData().erase(m_protocols.getData().begin()+(i-offset),
m_protocols.getData().begin()+(i-offset)+1);
offset++;
}
}
Log::info("ProtocolManager",
"A %s protocol has been terminated. There are %ld protocols running.",
protocol_type.c_str(), m_protocols.getData().size());
pthread_mutex_unlock(&m_asynchronous_protocols_mutex);
m_protocols.unlock();
assert(pthread_equal(pthread_self(), m_asynchronous_update_thread));
OneProtocolType &opt = m_all_protocols[protocol->getProtocolType()];
// Be sure that noone accesses the protocols vector
// while the protocol is being removed.
opt.lock();
opt.removeProtocol(protocol);
opt.unlock();
protocol->setState(PROTOCOL_STATE_TERMINATED);
protocol->terminated();
} // terminateProtocol
// ----------------------------------------------------------------------------
/** Requests to terminate all protocols of the given protocol type.
* This function must be called from the ProtocolManager thread in order
* to avoid a race condition (only the ProtocolManager thread can change the
* number of elements in that list).
*/
void ProtocolManager::OneProtocolType::requestTerminateAll()
{
for (unsigned int i = 0; i < m_protocols.getData().size(); i++)
{
m_protocols.getData()[i]->requestTerminate();
}
} // requestTerminateAll
// ----------------------------------------------------------------------------
/** Finds a protocol with the given type and requests it to be terminated.
* If no such protocol exist, log an error message.
* \param type The protocol type to delete.
*/
void ProtocolManager::findAndTerminate(ProtocolType type)
{
OneProtocolType &opt = m_all_protocols[type];
if (opt.isEmpty())
Log::error("ProtocolManager",
"findAndTerminate: No protocol %d registered.", type);
opt.requestTerminateAll();
} // findAndTerminate
// ----------------------------------------------------------------------------
/** Calls either notifyEvent(event) or notifyEventAsynchronous(evet) on all
* protocols. Note that no locking is done, it is the responsibility of the
* caller to avoid race conditions.
* \param event The event to deliver to the protocols.
*/
bool ProtocolManager::OneProtocolType::notifyEvent(Event *event)
{
if (m_protocols.getData().empty()) return false;
// Either all protocols of a certain type handle connects, or none.
// So we tet only one of them
if (event->getType() == EVENT_TYPE_CONNECTED &&
!m_protocols.getData()[0]->handleConnects()) return false;
if (event->getType() == EVENT_TYPE_DISCONNECTED &&
!m_protocols.getData()[0]->handleDisconnects()) return false;
bool can_be_deleted = false;
for (unsigned int i = 0; i < m_protocols.getData().size(); i++)
{
bool done = event->isSynchronous()
? m_protocols.getData()[i]->notifyEvent(event)
: m_protocols.getData()[i]->notifyEventAsynchronous(event);
can_be_deleted |= done;
}
return can_be_deleted;
} // notifyEvent
// ----------------------------------------------------------------------------
/** Sends the event to the corresponding protocol. Returns true if the event
* can be ignored, or false otherwise.
*/
bool ProtocolManager::sendEvent(Event* event)
{
m_protocols.lock();
int count=0;
for(unsigned int i=0; i<m_protocols.getData().size(); i++)
bool can_be_deleted = false;
if (event->getType() == EVENT_TYPE_MESSAGE)
{
Protocol *p = m_protocols.getData()[i];
bool is_right_protocol = false;
switch(event->getType())
OneProtocolType &opt = m_all_protocols[event->data().getProtocolType()];
can_be_deleted = opt.notifyEvent(event);
}
else // connect or disconnect event --> test all protocols
{
for (unsigned int i = 0; i < m_all_protocols.size(); i++)
{
case EVENT_TYPE_MESSAGE:
is_right_protocol = event->data().getProtocolType()==p->getProtocolType();
break;
case EVENT_TYPE_DISCONNECTED:
is_right_protocol = p->handleDisconnects();
break;
case EVENT_TYPE_CONNECTED:
is_right_protocol = p->handleConnects();
break;
} // switch event->getType()
if( is_right_protocol)
{
count ++;
event->isSynchronous() ? p->notifyEvent(event)
: p->notifyEventAsynchronous(event);
can_be_deleted |= m_all_protocols[i].notifyEvent(event);
}
} // for i in protocols
m_protocols.unlock();
return (count > 0 || StkTime::getTimeSinceEpoch() - event->getArrivalTime()
>= TIME_TO_KEEP_EVENTS );
}
return can_be_deleted || StkTime::getTimeSinceEpoch() - event->getArrivalTime()
>= TIME_TO_KEEP_EVENTS;
} // sendEvent
// ----------------------------------------------------------------------------
/** Calls either the synchronous update or asynchronous update function in all
* protocols of this type.
* \param dt Time step size.
* \param async True if asynchronousUpdate() should be called.
*/
void ProtocolManager::OneProtocolType::update(float dt, bool async)
{
for (unsigned int i = 0; i < m_protocols.getData().size(); i++)
{
if (m_protocols.getData()[i]->getState() == PROTOCOL_STATE_RUNNING)
{
async ? m_protocols.getData()[i]->asynchronousUpdate()
: m_protocols.getData()[i]->update(dt);
}
}
} // update
// ----------------------------------------------------------------------------
/** \brief Updates the manager.
*
@@ -321,25 +411,22 @@ bool ProtocolManager::sendEvent(Event* event)
*/
void ProtocolManager::update(float dt)
{
// before updating, notify protocols that they have received events
m_events_to_process.lock();
EventList::iterator i = m_events_to_process.getData().begin();
// Update from main thread only:
assert(!pthread_equal(pthread_self(), m_asynchronous_update_thread));
while (i != m_events_to_process.getData().end())
// before updating, notify protocols that they have received events
m_sync_events_to_process.lock();
EventList::iterator i = m_sync_events_to_process.getData().begin();
while (i != m_sync_events_to_process.getData().end())
{
// Don't handle asynchronous events here
if (!(*i)->isSynchronous())
{
++i;
continue;
}
m_events_to_process.unlock();
bool result = sendEvent(*i);
m_events_to_process.lock();
if (result)
m_sync_events_to_process.unlock();
bool can_be_deleted = sendEvent(*i);
m_sync_events_to_process.lock();
if (can_be_deleted)
{
delete *i;
i = m_events_to_process.getData().erase(i);
i = m_sync_events_to_process.getData().erase(i);
}
else
{
@@ -347,16 +434,16 @@ void ProtocolManager::update(float dt)
++i;
}
}
m_events_to_process.unlock();
m_sync_events_to_process.unlock();
// now update all protocols
m_protocols.lock();
for (unsigned int i = 0; i < m_protocols.getData().size(); i++)
// Now update all protocols.
for (unsigned int i = 0; i < m_all_protocols.size(); i++)
{
if (m_protocols.getData()[i]->getState() == PROTOCOL_STATE_RUNNING)
m_protocols.getData()[i]->update(dt);
OneProtocolType &opt = m_all_protocols[i];
opt.lock();
opt.update(dt, /*async*/false);
opt.unlock();
}
m_protocols.unlock();
} // update
// ----------------------------------------------------------------------------
@@ -370,53 +457,59 @@ void ProtocolManager::update(float dt)
*/
void ProtocolManager::asynchronousUpdate()
{
// before updating, notice protocols that they have received information
m_events_to_process.lock();
int size = (int)m_events_to_process.getData().size();
EventList::iterator i = m_events_to_process.getData().begin();
while (i != m_events_to_process.getData().end())
// Update from ProtocolManager thread only:
assert(pthread_equal(pthread_self(), m_asynchronous_update_thread));
// First deliver asynchronous messages for all protocols
// =====================================================
m_async_events_to_process.lock();
EventList::iterator i = m_async_events_to_process.getData().begin();
while (i != m_async_events_to_process.getData().end())
{
// Don't handle synchronous events here.
if ((*i)->isSynchronous())
{
++i;
continue;
}
m_events_to_process.unlock();
m_async_events_to_process.unlock();
m_all_protocols[(*i)->getType()].lock();
bool result = sendEvent(*i);
m_events_to_process.lock();
m_all_protocols[(*i)->getType()].unlock();
m_async_events_to_process.lock();
if (result)
{
delete *i;
i = m_events_to_process.getData().erase(i);
i = m_async_events_to_process.getData().erase(i);
}
else
{
// This should only happen if the protocol has not been started
// or already terminated (e.g. late ping answer)
++i;
}
} // while i != m_events_to_process.end()
m_events_to_process.unlock();
m_async_events_to_process.unlock();
// now update all protocols that need to be updated in asynchronous mode
pthread_mutex_lock(&m_asynchronous_protocols_mutex);
// FIXME: does m_protocols need to be locked???
for (unsigned int i = 0; i < m_protocols.getData().size(); i++)
// Second: update all running protocols
// ====================================
// Now update all protocols.
for (unsigned int i = 0; i < m_all_protocols.size(); i++)
{
if (m_protocols.getData()[i]->getState() == PROTOCOL_STATE_RUNNING)
m_protocols.getData()[i]->asynchronousUpdate();
OneProtocolType &opt = m_all_protocols[i];
// The lock is likely not necessary, since this function is only
// called from the ProtocolManager thread, and this thread is also
// the only one who changes the number of protocols.
opt.lock();
opt.update(0, /*async*/true); // dt does not matter, so set it to 0
opt.unlock();
}
pthread_mutex_unlock(&m_asynchronous_protocols_mutex);
// Process queued events for protocols
// these requests are asynchronous
// Process queued events (start, pause, ...) for protocols asynchronously
// ======================================================================
m_requests.lock();
while(m_requests.getData().size()>0)
{
ProtocolRequest request = m_requests.getData()[0];
m_requests.getData().erase(m_requests.getData().begin());
m_requests.unlock();
// Make sure new requests can be queued up while handling requests.
m_requests.unlock();
// This is often used that terminating a protocol unpauses another,
// so the m_requests queue must not be locked while executing requests.
switch (request.getType())
@@ -439,22 +532,6 @@ void ProtocolManager::asynchronousUpdate()
m_requests.unlock();
} // asynchronousUpdate
// ----------------------------------------------------------------------------
/** \brief Get a protocol using its id.
* \param id : Unique ID of the seek protocol.
* \return The protocol that has the ID id.
*/
Protocol* ProtocolManager::getProtocol(uint32_t id)
{
// FIXME: does m_protocols need to be locked??
for (unsigned int i = 0; i < m_protocols.getData().size(); i++)
{
if (m_protocols.getData()[i]->getId() == id)
return m_protocols.getData()[i];
}
return NULL;
} // getProtocol
// ----------------------------------------------------------------------------
/** \brief Get a protocol using its type.
* \param type : The type of the protocol.
@@ -462,29 +539,8 @@ Protocol* ProtocolManager::getProtocol(uint32_t id)
*/
Protocol* ProtocolManager::getProtocol(ProtocolType type)
{
// FIXME: Does m_protocols need to be locked?
for (unsigned int i = 0; i < m_protocols.getData().size(); i++)
{
if (m_protocols.getData()[i]->getProtocolType() == type)
return m_protocols.getData()[i];
}
return NULL;
} // getProtocol
// ----------------------------------------------------------------------------
/** \brief Assign an id to a protocol.
* This function will assign m_next_protocol_id as the protocol id.
* This id starts at 0 at the beginning and is increased by 1 each time
* a protocol starts.
* \param protocol_info : The protocol info that needs an id.
*/
uint32_t ProtocolManager::getNextProtocolId()
{
m_next_protocol_id.lock();
uint32_t id = m_next_protocol_id.getData();
m_next_protocol_id.getData()++;
m_next_protocol_id.unlock();
return id;
} // getNextProtocolId
OneProtocolType &opt = m_all_protocols[type];
if (opt.isEmpty()) return NULL;
return opt.getFirstProtocol();
} // getProtocol

View File

@@ -54,7 +54,8 @@ enum ProtocolRequestType
// ----------------------------------------------------------------------------
/** \struct ProtocolRequest
* \brief Represents a request to do an action about a protocol.
* \brief Represents a request to do an action about a protocol, e.g. to
* start, pause, unpause or terminate a protocol.
*/
class ProtocolRequest
{
@@ -97,41 +98,89 @@ class ProtocolManager : public AbstractSingleton<ProtocolManager>,
friend class AbstractSingleton<ProtocolManager>;
private:
/** Contains the running protocols.
* This stores the protocols that are either running or paused, their
* state and their unique id. */
Synchronised<std::vector<Protocol*> >m_protocols;
/** A simple class that stores all protocols of a certain type. While
* many protocols have at most one instance running, some (e.g.
* GetPublicAddress, ConntectToPeer, ...) can have several instances
* active at the same time. */
class OneProtocolType
{
private:
Synchronised< std::vector<Protocol*> > m_protocols;
public:
void removeProtocol(Protocol *p);
void requestTerminateAll();
bool notifyEvent(Event *event);
void update(float dt, bool async);
void abort();
// --------------------------------------------------------------------
/** Returns the first protocol of a given type. It is assumed that
* there is a protocol of that type. */
Protocol *getFirstProtocol() { return m_protocols.getData()[0]; }
// --------------------------------------------------------------------
/** Returns if this protocol class handles connect events. Protocols
* of the same class either all handle a connect event, or none, so
* only the first protocol is actually tested. */
bool handleConnects() const
{
return !m_protocols.getData().empty() &&
m_protocols.getData()[0]->handleConnects();
} // handleConnects
// --------------------------------------------------------------------
/** Returns if this protocol class handles disconnect events. Protocols
* of the same class either all handle a disconnect event, or none, so
* only the first protocol is actually tested. */
bool handleDisconnects() const
{
return !m_protocols.getData().empty() &&
m_protocols.getData()[0]->handleDisconnects();
} // handleDisconnects
// --------------------------------------------------------------------
/** Locks access to this list of all protocols of a certain type. */
void lock() { m_protocols.lock(); }
// --------------------------------------------------------------------
/** Locks access to this list of all protocols of a certain type. */
void unlock() { m_protocols.unlock(); }
// --------------------------------------------------------------------
void addProtocol(Protocol *p)
{
m_protocols.getData().push_back(p);
} // addProtocol
// --------------------------------------------------------------------
/** Returns if there are no protocols of this type registered. */
bool isEmpty() const { return m_protocols.getData().empty(); }
// --------------------------------------------------------------------
}; // class OneProtocolType
// ------------------------------------------------------------------------
/** The list of all protocol types, each one containing a (potentially
* empty) list of protocols. */
std::vector<OneProtocolType> m_all_protocols;
/** A list of network events - messages, disconnect and disconnects. */
typedef std::list<Event*> EventList;
/** Contains the network events to pass synchronously to protocols
* (i.e. from the main thread). */
Synchronised<EventList> m_sync_events_to_process;
/** Contains the network events to pass asynchronously to protocols
* (i.e. from the separate ProtocolManager thread). */
Synchronised<EventList> m_events_to_process;
* (i.e. from the separate ProtocolManager thread). */
Synchronised<EventList> m_async_events_to_process;
/** Contains the requests to start/pause etc... protocols. */
Synchronised< std::vector<ProtocolRequest> > m_requests;
/*! \brief The next id to assign to a protocol.
* This value is incremented by 1 each time a protocol is started.
* If a protocol has an id lower than this value, it means that it has
* been formerly started.
*/
Synchronised<uint32_t> m_next_protocol_id;
/** When set to true, the main thread will exit. */
Synchronised<bool> m_exit;
// mutexes:
/*! Used to ensure that the protocol vector is used thread-safely. */
pthread_mutex_t m_asynchronous_protocols_mutex;
/*! Asynchronous update thread.*/
pthread_t* m_asynchronous_update_thread;
pthread_t m_asynchronous_update_thread;
ProtocolManager();
virtual ~ProtocolManager();
static void* mainLoop(void *data);
uint32_t getNextProtocolId();
bool sendEvent(Event* event);
virtual void startProtocol(Protocol *protocol);
@@ -141,15 +190,21 @@ private:
virtual void unpauseProtocol(Protocol *protocol);
public:
virtual void abort();
virtual void propagateEvent(Event* event);
virtual uint32_t requestStart(Protocol* protocol);
virtual void requestPause(Protocol* protocol);
virtual void requestUnpause(Protocol* protocol);
virtual void requestTerminate(Protocol* protocol);
virtual void update(float dt);
virtual Protocol* getProtocol(uint32_t id);
virtual Protocol* getProtocol(ProtocolType type);
void abort();
void propagateEvent(Event* event);
Protocol* getProtocol(ProtocolType type);
void requestStart(Protocol* protocol);
void requestPause(Protocol* protocol);
void requestUnpause(Protocol* protocol);
void requestTerminate(Protocol* protocol);
void findAndTerminate(ProtocolType type);
void update(float dt);
// ------------------------------------------------------------------------
const pthread_t & getThreadID() const
{
return m_asynchronous_update_thread;
} // getThreadID
}; // class ProtocolManager
#endif // PROTOCOL_MANAGER_HPP

View File

@@ -679,29 +679,10 @@ void ClientLobby::raceFinished(Event* event)
"Server notified that the race is finished.");
// stop race protocols
Protocol* protocol = ProtocolManager::getInstance()
->getProtocol(PROTOCOL_CONTROLLER_EVENTS);
if (protocol)
ProtocolManager::getInstance()->requestTerminate(protocol);
else
Log::error("ClientLobby",
"No controller events protocol registered.");
protocol = ProtocolManager::getInstance()
->getProtocol(PROTOCOL_KART_UPDATE);
if (protocol)
ProtocolManager::getInstance()->requestTerminate(protocol);
else
Log::error("ClientLobby",
"No kart update protocol registered.");
protocol = ProtocolManager::getInstance()
->getProtocol(PROTOCOL_GAME_EVENTS);
if (protocol)
ProtocolManager::getInstance()->requestTerminate(protocol);
else
Log::error("ClientLobby",
"No game events protocol registered.");
ProtocolManager *pm = ProtocolManager::getInstance();
pm->findAndTerminate(PROTOCOL_CONTROLLER_EVENTS);
pm->findAndTerminate(PROTOCOL_KART_UPDATE);
pm->findAndTerminate(PROTOCOL_GAME_EVENTS);
// finish the race
WorldWithRank* ranked_world = (WorldWithRank*)(World::getWorld());

View File

@@ -135,9 +135,5 @@ void LobbyProtocol::loadWorld()
*/
void LobbyProtocol::terminateLatencyProtocol()
{
Protocol *p = ProtocolManager::getInstance()
->getProtocol(PROTOCOL_SYNCHRONIZATION);
LatencyProtocol *sp = dynamic_cast<LatencyProtocol*>(p);
if (sp)
sp->requestTerminate();
ProtocolManager::getInstance()->findAndTerminate(PROTOCOL_SYNCHRONIZATION);
} // stopLatencyProtocol

View File

@@ -279,9 +279,10 @@ void ServerLobby::update(float dt)
// notify the network world that it is stopped
RaceEventManager::getInstance()->stop();
// stop race protocols
findAndTerminateProtocol(PROTOCOL_CONTROLLER_EVENTS);
findAndTerminateProtocol(PROTOCOL_KART_UPDATE);
findAndTerminateProtocol(PROTOCOL_GAME_EVENTS);
ProtocolManager *pm = ProtocolManager::getInstance();
pm->findAndTerminate(PROTOCOL_CONTROLLER_EVENTS);
pm->findAndTerminate(PROTOCOL_KART_UPDATE);
pm->findAndTerminate(PROTOCOL_GAME_EVENTS);
}
break;
case DONE: