Create a prototype for thread-safe protocol manager

Using weak and shared_ptr, if !lock(), than it was atomtically
destroyed
This commit is contained in:
Benau 2018-02-15 16:47:04 +08:00
parent 3d8efcdfa1
commit 05f7c014dd
13 changed files with 103 additions and 72 deletions

View File

@ -343,7 +343,10 @@ void MainLoop::run()
PROFILER_PUSH_CPU_MARKER("Protocol manager update",
0x7F, 0x00, 0x7F);
ProtocolManager::getInstance()->update(dt);
if (auto pm = ProtocolManager::lock())
{
pm->update(dt);
}
PROFILER_POP_CPU_MARKER();
}
if (World::getWorld()) World::getWorld()->updateTime(dt);

View File

@ -79,7 +79,7 @@ bool Protocol::checkDataSize(Event* event, unsigned int minimum_size)
*/
void Protocol::requestStart()
{
ProtocolManager::getInstance()->requestStart(this);
ProtocolManager::lock()->requestStart(this);
} // requestStart
// ----------------------------------------------------------------------------
@ -87,7 +87,7 @@ void Protocol::requestStart()
*/
void Protocol::requestPause()
{
ProtocolManager::getInstance()->requestPause(this);
ProtocolManager::lock()->requestPause(this);
} // requestPause
// ----------------------------------------------------------------------------
@ -95,7 +95,7 @@ void Protocol::requestPause()
*/
void Protocol::requestUnpause()
{
ProtocolManager::getInstance()->requestUnpause(this);
ProtocolManager::lock()->requestUnpause(this);
} // requestUnpause
// ----------------------------------------------------------------------------
@ -103,7 +103,7 @@ void Protocol::requestUnpause()
*/
void Protocol::requestTerminate()
{
ProtocolManager::getInstance()->requestTerminate(this);
ProtocolManager::lock()->requestTerminate(this);
} // requestTerminate
// ----------------------------------------------------------------------------

View File

@ -30,42 +30,58 @@
#include <assert.h>
#include <cstdlib>
#include <errno.h>
#include <functional>
#include <typeinfo>
ProtocolManager::ProtocolManager()
// ============================================================================
std::weak_ptr<ProtocolManager> ProtocolManager::m_protocol_manager;
// ============================================================================
std::shared_ptr<ProtocolManager> ProtocolManager::createInstance()
{
m_exit.setAtomic(false);
m_all_protocols.resize(PROTOCOL_MAX);
pthread_create(&m_asynchronous_update_thread, NULL,
ProtocolManager::mainLoop, this);
} // ProtocolManager
if (!m_protocol_manager.expired())
{
Log::fatal("ProtocolManager",
"Create only 1 instance of ProtocolManager!");
return NULL;
}
auto pm = std::make_shared<ProtocolManager>();
pm->m_asynchronous_update_thread = std::thread(
std::bind(&ProtocolManager::startAsynchronousUpdateThread, pm));
m_protocol_manager = pm;
return pm;
} // createInstance
// ----------------------------------------------------------------------------
void* ProtocolManager::mainLoop(void* data)
std::shared_ptr<ProtocolManager> ProtocolManager::lock()
{
VS::setThreadName("ProtocolManager");
ProtocolManager* manager = static_cast<ProtocolManager*>(data);
while(manager && !manager->m_exit.getAtomic())
{
manager->asynchronousUpdate();
PROFILER_PUSH_CPU_MARKER("sleep", 0, 255, 255);
StkTime::sleep(2);
PROFILER_POP_CPU_MARKER();
}
return NULL;
} // mainLoop
return m_protocol_manager.lock();
} // lock
// ----------------------------------------------------------------------------
ProtocolManager::ProtocolManager()
{
m_exit.store(false);
m_all_protocols.resize(PROTOCOL_MAX);
} // ProtocolManager
// ----------------------------------------------------------------------------
ProtocolManager::~ProtocolManager()
{
} // ~ProtocolManager
// ----------------------------------------------------------------------------
void ProtocolManager::startAsynchronousUpdateThread()
{
VS::setThreadName("ProtocolManager");
while(!m_exit.load())
{
asynchronousUpdate();
PROFILER_PUSH_CPU_MARKER("sleep", 0, 255, 255);
StkTime::sleep(2);
PROFILER_POP_CPU_MARKER();
}
} // startAsynchronousUpdateThread
// ----------------------------------------------------------------------------
void ProtocolManager::OneProtocolType::abort()
{
@ -79,8 +95,8 @@ void ProtocolManager::OneProtocolType::abort()
*/
void ProtocolManager::abort()
{
m_exit.setAtomic(true);
pthread_join(m_asynchronous_update_thread, NULL); // wait the thread to finish
m_exit.store(true);
m_asynchronous_update_thread.join(); // wait the thread to finish
// Now only this main thread is active, no more need for locks
for (unsigned int i = 0; i < m_all_protocols.size(); i++)
@ -218,7 +234,7 @@ void ProtocolManager::requestTerminate(Protocol* protocol)
*/
void ProtocolManager::startProtocol(Protocol *protocol)
{
assert(pthread_equal(pthread_self(), m_asynchronous_update_thread));
assert(std::this_thread::get_id() == m_asynchronous_update_thread.get_id());
OneProtocolType &opt = m_all_protocols[protocol->getProtocolType()];
opt.lock();
opt.addProtocol(protocol);
@ -238,7 +254,7 @@ void ProtocolManager::startProtocol(Protocol *protocol)
*/
void ProtocolManager::pauseProtocol(Protocol *protocol)
{
assert(pthread_equal(pthread_self(), m_asynchronous_update_thread));
assert(std::this_thread::get_id() == m_asynchronous_update_thread.get_id());
assert(protocol->getState() == PROTOCOL_STATE_RUNNING);
// We lock the protocol to avoid that paused() is called at the same
// time that the main thread delivers an event or calls update
@ -255,7 +271,7 @@ void ProtocolManager::pauseProtocol(Protocol *protocol)
*/
void ProtocolManager::unpauseProtocol(Protocol *protocol)
{
assert(pthread_equal(pthread_self(), m_asynchronous_update_thread));
assert(std::this_thread::get_id() == m_asynchronous_update_thread.get_id());
assert(protocol->getState() == PROTOCOL_STATE_PAUSED);
// No lock necessary, since the protocol is paused, no other thread will
// be executing
@ -292,7 +308,7 @@ void ProtocolManager::OneProtocolType::removeProtocol(Protocol *p)
*/
void ProtocolManager::terminateProtocol(Protocol *protocol)
{
assert(pthread_equal(pthread_self(), m_asynchronous_update_thread));
assert(std::this_thread::get_id() == m_asynchronous_update_thread.get_id());
OneProtocolType &opt = m_all_protocols[protocol->getProtocolType()];
// Be sure that noone accesses the protocols vector
@ -415,7 +431,7 @@ void ProtocolManager::OneProtocolType::update(float dt, bool async)
void ProtocolManager::update(float dt)
{
// Update from main thread only:
assert(!pthread_equal(pthread_self(), m_asynchronous_update_thread));
assert(std::this_thread::get_id() != m_asynchronous_update_thread.get_id());
// before updating, notify protocols that they have received events
m_sync_events_to_process.lock();
@ -462,7 +478,7 @@ void ProtocolManager::asynchronousUpdate()
{
PROFILER_PUSH_CPU_MARKER("Message delivery", 255, 0, 0);
// Update from ProtocolManager thread only:
assert(pthread_equal(pthread_self(), m_asynchronous_update_thread));
assert(std::this_thread::get_id() == m_asynchronous_update_thread.get_id());
// First deliver asynchronous messages for all protocols
// =====================================================
@ -554,4 +570,4 @@ Protocol* ProtocolManager::getProtocol(ProtocolType type)
if (opt.isEmpty()) return NULL;
return opt.getFirstProtocol();
} // getProtocol
} // getProtocol

View File

@ -30,8 +30,11 @@
#include "utils/synchronised.hpp"
#include "utils/types.hpp"
#include <atomic>
#include <list>
#include <memory>
#include <vector>
#include <thread>
class Event;
class STKPeer;
@ -130,10 +133,8 @@ public:
* responsible to forward events to all protocols with the same id.
*
*/
class ProtocolManager : public AbstractSingleton<ProtocolManager>,
public NoCopy
class ProtocolManager : public NoCopy
{
friend class AbstractSingleton<ProtocolManager>;
private:
/** A simple class that stores all protocols of a certain type. While
@ -211,14 +212,15 @@ private:
Synchronised< std::vector<ProtocolRequest> > m_requests;
/** When set to true, the main thread will exit. */
Synchronised<bool> m_exit;
std::atomic_bool m_exit;
/*! Asynchronous update thread.*/
pthread_t m_asynchronous_update_thread;
std::thread m_asynchronous_update_thread;
ProtocolManager();
virtual ~ProtocolManager();
static void* mainLoop(void *data);
/*! Single instance of protocol manager.*/
static std::weak_ptr<ProtocolManager> m_protocol_manager;
void startAsynchronousUpdateThread();
bool sendEvent(Event* event);
virtual void startProtocol(Protocol *protocol);
@ -228,6 +230,8 @@ private:
virtual void unpauseProtocol(Protocol *protocol);
public:
ProtocolManager();
virtual ~ProtocolManager();
void abort();
void propagateEvent(Event* event);
Protocol* getProtocol(ProtocolType type);
@ -238,10 +242,16 @@ public:
void findAndTerminate(ProtocolType type);
void update(float dt);
// ------------------------------------------------------------------------
const pthread_t & getThreadID() const
bool isExiting() const { return m_exit.load(); }
// ------------------------------------------------------------------------
const std::thread& getThread() const
{
return m_asynchronous_update_thread;
} // getThreadID
// ------------------------------------------------------------------------
static std::shared_ptr<ProtocolManager> createInstance();
// ------------------------------------------------------------------------
static std::shared_ptr<ProtocolManager> lock();
}; // class ProtocolManager

View File

@ -370,7 +370,7 @@ void ClientLobby::update(float dt)
break;
case DONE:
m_state = EXITING;
ProtocolManager::getInstance()->requestTerminate(this);
ProtocolManager::lock()->requestTerminate(this);
break;
case EXITING:
break;
@ -722,7 +722,8 @@ void ClientLobby::raceFinished(Event* event)
"Server notified that the race is finished.");
// stop race protocols
ProtocolManager *pm = ProtocolManager::getInstance();
auto pm = ProtocolManager::lock();
assert(pm);
pm->findAndTerminate(PROTOCOL_CONTROLLER_EVENTS);
pm->findAndTerminate(PROTOCOL_KART_UPDATE);
pm->findAndTerminate(PROTOCOL_GAME_EVENTS);
@ -756,7 +757,8 @@ void ClientLobby::exitResultScreen(Event *event)
m_game_setup = STKHost::get()->setupNewGame();
STKHost::get()->getServerPeerForClient()->unsetClientServerToken();
// stop race protocols
ProtocolManager *pm = ProtocolManager::getInstance();
auto pm = ProtocolManager::lock();
assert(pm);
pm->findAndTerminate(PROTOCOL_CONTROLLER_EVENTS);
pm->findAndTerminate(PROTOCOL_KART_UPDATE);
pm->findAndTerminate(PROTOCOL_GAME_EVENTS);

View File

@ -132,7 +132,7 @@ void ConnectToPeer::asynchronousUpdate()
{
m_current_protocol = new PingProtocol(m_peer_address,
/*time-between-ping*/2.0);
ProtocolManager::getInstance()->requestStart(m_current_protocol);
ProtocolManager::lock()->requestStart(m_current_protocol);
m_state = CONNECTING;
}
else

View File

@ -135,5 +135,5 @@ void LobbyProtocol::loadWorld()
*/
void LobbyProtocol::terminateLatencyProtocol()
{
ProtocolManager::getInstance()->findAndTerminate(PROTOCOL_SYNCHRONIZATION);
ProtocolManager::lock()->findAndTerminate(PROTOCOL_SYNCHRONIZATION);
} // stopLatencyProtocol

View File

@ -289,7 +289,8 @@ void ServerLobby::update(float dt)
// notify the network world that it is stopped
RaceEventManager::getInstance()->stop();
// stop race protocols
ProtocolManager *pm = ProtocolManager::getInstance();
auto pm = ProtocolManager::lock();
assert(pm);
pm->findAndTerminate(PROTOCOL_CONTROLLER_EVENTS);
pm->findAndTerminate(PROTOCOL_KART_UPDATE);
pm->findAndTerminate(PROTOCOL_GAME_EVENTS);

View File

@ -27,7 +27,7 @@ void RaceEventManager::update(float dt)
{
// This can happen in case of disconnects - protocol manager is
// shut down, but still events to process.
if(!ProtocolManager::getInstance())
if(!ProtocolManager::lock())
return;
// Replay all recorded events up to the current time (only if the
@ -78,7 +78,7 @@ bool RaceEventManager::isRaceOver()
void RaceEventManager::kartFinishedRace(AbstractKart *kart, float time)
{
GameEventsProtocol* protocol = static_cast<GameEventsProtocol*>(
ProtocolManager::getInstance()->getProtocol(PROTOCOL_GAME_EVENTS));
ProtocolManager::lock()->getProtocol(PROTOCOL_GAME_EVENTS));
protocol->kartFinishedRace(kart, time);
} // kartFinishedRace
@ -94,7 +94,7 @@ void RaceEventManager::collectedItem(Item *item, AbstractKart *kart)
assert(NetworkConfig::get()->isServer());
GameEventsProtocol* protocol = static_cast<GameEventsProtocol*>(
ProtocolManager::getInstance()->getProtocol(PROTOCOL_GAME_EVENTS));
ProtocolManager::lock()->getProtocol(PROTOCOL_GAME_EVENTS));
protocol->collectedItem(item,kart);
} // collectedItem

View File

@ -290,7 +290,7 @@ STKHost::STKHost(const irr::core::stringw &server_name)
startListening();
Protocol *p = LobbyProtocol::create<ServerLobby>();
ProtocolManager::getInstance()->requestStart(p);
ProtocolManager::lock()->requestStart(p);
} // STKHost(server_name)
@ -320,7 +320,7 @@ void STKHost::init()
Log::info("STKHost", "Host initialized.");
Network::openLog(); // Open packet log file
ProtocolManager::getInstance<ProtocolManager>();
ProtocolManager::createInstance();
// Optional: start the network console
m_network_console = NULL;
@ -337,7 +337,6 @@ void STKHost::init()
*/
STKHost::~STKHost()
{
ProtocolManager::kill();
// delete the game setup
if (m_game_setup)
delete m_game_setup;
@ -375,7 +374,7 @@ void STKHost::requestShutdown()
void STKHost::shutdown()
{
ServersManager::get()->unsetJoinedServer();
ProtocolManager::getInstance()->abort();
ProtocolManager::lock()->abort();
deleteAllPeers();
destroy();
} // shutdown
@ -414,7 +413,7 @@ void STKHost::abort()
{
// Finish protocol manager first, to avoid that it access data
// in STKHost.
ProtocolManager::getInstance()->abort();
ProtocolManager::lock()->abort();
stopListening();
} // abort
@ -548,6 +547,13 @@ void* STKHost::mainLoop(void* self)
if (event.type == ENET_EVENT_TYPE_NONE)
continue;
auto pm = ProtocolManager::lock();
if (!pm || pm->isExiting())
{
// Don't create more event if no protocol manager or it will
// be exiting
continue;
}
// Create an STKEvent with the event data. This will also
// create the peer if it doesn't exist already
Event* stk_event = new Event(&event);
@ -579,7 +585,7 @@ void* STKHost::mainLoop(void* self)
} // if message event
// notify for the event now.
ProtocolManager::getInstance()->propagateEvent(stk_event);
pm->propagateEvent(stk_event);
} // while enet_host_service
} // while !mustStopListening
@ -638,13 +644,6 @@ void STKHost::handleDirectSocketRequest()
{
// In case of a LAN connection, we only allow connections from
// a LAN address (192.168*, ..., and 127.*).
if (NetworkConfig::get()->isLAN() && !sender.isLAN())
{
Log::error("STKHost", "Client trying to connect from '%s'",
sender.toString().c_str());
Log::error("STKHost", "which is outside of LAN - rejected.");
return;
}
Protocol *c = new ConnectToPeer(sender);
c->requestStart();
}

View File

@ -196,7 +196,7 @@ void NetworkKartSelectionScreen::playerSelected(uint8_t player_id,
// to the server.
if(STKHost::get()->isAuthorisedToControl())
{
Protocol* protocol = ProtocolManager::getInstance()
Protocol* protocol = ProtocolManager::lock()
->getProtocol(PROTOCOL_LOBBY_ROOM);
ClientLobby* clrp =
dynamic_cast<ClientLobby*>(protocol);

View File

@ -151,7 +151,7 @@ void OnlineProfileServers::doQuickPlay()
NetworkingLobby::getInstance()->push();
ConnectToServer *cts = new ConnectToServer(server->getServerId(),
server->getHostId());
ProtocolManager::getInstance()->requestStart(cts);
ProtocolManager::lock()->requestStart(cts);
}
else
{

View File

@ -24,8 +24,8 @@
#include "utils/log.hpp"
/*! \class ProtocolManager
* \brief Manages the protocols at runtime.
/*! \class AbstractSingleton
* \brief Manages the abstract singleton at runtime.
* This has been designed to allow multi-inheritance. This is advised to
* re-declare getInstance, but whithout templates parameters in the inheriting
* classes.