From 05f7c014dd4a31812962ef56319356aea08bb730 Mon Sep 17 00:00:00 2001 From: Benau Date: Thu, 15 Feb 2018 16:47:04 +0800 Subject: [PATCH] Create a prototype for thread-safe protocol manager Using weak and shared_ptr, if !lock(), than it was atomtically destroyed --- src/main_loop.cpp | 5 +- src/network/protocol.cpp | 8 +- src/network/protocol_manager.cpp | 80 +++++++++++-------- src/network/protocol_manager.hpp | 28 ++++--- src/network/protocols/client_lobby.cpp | 8 +- src/network/protocols/connect_to_peer.cpp | 2 +- src/network/protocols/lobby_protocol.cpp | 2 +- src/network/protocols/server_lobby.cpp | 3 +- src/network/race_event_manager.cpp | 6 +- src/network/stk_host.cpp | 25 +++--- src/states_screens/network_kart_selection.cpp | 2 +- src/states_screens/online_profile_servers.cpp | 2 +- src/utils/singleton.hpp | 4 +- 13 files changed, 103 insertions(+), 72 deletions(-) diff --git a/src/main_loop.cpp b/src/main_loop.cpp index b2c57b497..a9d185049 100644 --- a/src/main_loop.cpp +++ b/src/main_loop.cpp @@ -343,7 +343,10 @@ void MainLoop::run() PROFILER_PUSH_CPU_MARKER("Protocol manager update", 0x7F, 0x00, 0x7F); - ProtocolManager::getInstance()->update(dt); + if (auto pm = ProtocolManager::lock()) + { + pm->update(dt); + } PROFILER_POP_CPU_MARKER(); } if (World::getWorld()) World::getWorld()->updateTime(dt); diff --git a/src/network/protocol.cpp b/src/network/protocol.cpp index 90f5ee610..29d54192e 100644 --- a/src/network/protocol.cpp +++ b/src/network/protocol.cpp @@ -79,7 +79,7 @@ bool Protocol::checkDataSize(Event* event, unsigned int minimum_size) */ void Protocol::requestStart() { - ProtocolManager::getInstance()->requestStart(this); + ProtocolManager::lock()->requestStart(this); } // requestStart // ---------------------------------------------------------------------------- @@ -87,7 +87,7 @@ void Protocol::requestStart() */ void Protocol::requestPause() { - ProtocolManager::getInstance()->requestPause(this); + ProtocolManager::lock()->requestPause(this); } // requestPause // ---------------------------------------------------------------------------- @@ -95,7 +95,7 @@ void Protocol::requestPause() */ void Protocol::requestUnpause() { - ProtocolManager::getInstance()->requestUnpause(this); + ProtocolManager::lock()->requestUnpause(this); } // requestUnpause // ---------------------------------------------------------------------------- @@ -103,7 +103,7 @@ void Protocol::requestUnpause() */ void Protocol::requestTerminate() { - ProtocolManager::getInstance()->requestTerminate(this); + ProtocolManager::lock()->requestTerminate(this); } // requestTerminate // ---------------------------------------------------------------------------- diff --git a/src/network/protocol_manager.cpp b/src/network/protocol_manager.cpp index 689edb771..6327732b9 100644 --- a/src/network/protocol_manager.cpp +++ b/src/network/protocol_manager.cpp @@ -30,42 +30,58 @@ #include #include #include +#include #include - -ProtocolManager::ProtocolManager() +// ============================================================================ +std::weak_ptr ProtocolManager::m_protocol_manager; +// ============================================================================ +std::shared_ptr ProtocolManager::createInstance() { - m_exit.setAtomic(false); - - m_all_protocols.resize(PROTOCOL_MAX); - - pthread_create(&m_asynchronous_update_thread, NULL, - ProtocolManager::mainLoop, this); -} // ProtocolManager + if (!m_protocol_manager.expired()) + { + Log::fatal("ProtocolManager", + "Create only 1 instance of ProtocolManager!"); + return NULL; + } + auto pm = std::make_shared(); + pm->m_asynchronous_update_thread = std::thread( + std::bind(&ProtocolManager::startAsynchronousUpdateThread, pm)); + m_protocol_manager = pm; + return pm; +} // createInstance // ---------------------------------------------------------------------------- - -void* ProtocolManager::mainLoop(void* data) +std::shared_ptr ProtocolManager::lock() { - VS::setThreadName("ProtocolManager"); - - ProtocolManager* manager = static_cast(data); - while(manager && !manager->m_exit.getAtomic()) - { - manager->asynchronousUpdate(); - PROFILER_PUSH_CPU_MARKER("sleep", 0, 255, 255); - StkTime::sleep(2); - PROFILER_POP_CPU_MARKER(); - } - return NULL; -} // mainLoop + return m_protocol_manager.lock(); +} // lock +// ---------------------------------------------------------------------------- +ProtocolManager::ProtocolManager() +{ + m_exit.store(false); + m_all_protocols.resize(PROTOCOL_MAX); +} // ProtocolManager // ---------------------------------------------------------------------------- ProtocolManager::~ProtocolManager() { } // ~ProtocolManager +// ---------------------------------------------------------------------------- +void ProtocolManager::startAsynchronousUpdateThread() +{ + VS::setThreadName("ProtocolManager"); + while(!m_exit.load()) + { + asynchronousUpdate(); + PROFILER_PUSH_CPU_MARKER("sleep", 0, 255, 255); + StkTime::sleep(2); + PROFILER_POP_CPU_MARKER(); + } +} // startAsynchronousUpdateThread + // ---------------------------------------------------------------------------- void ProtocolManager::OneProtocolType::abort() { @@ -79,8 +95,8 @@ void ProtocolManager::OneProtocolType::abort() */ void ProtocolManager::abort() { - m_exit.setAtomic(true); - pthread_join(m_asynchronous_update_thread, NULL); // wait the thread to finish + m_exit.store(true); + m_asynchronous_update_thread.join(); // wait the thread to finish // Now only this main thread is active, no more need for locks for (unsigned int i = 0; i < m_all_protocols.size(); i++) @@ -218,7 +234,7 @@ void ProtocolManager::requestTerminate(Protocol* protocol) */ void ProtocolManager::startProtocol(Protocol *protocol) { - assert(pthread_equal(pthread_self(), m_asynchronous_update_thread)); + assert(std::this_thread::get_id() == m_asynchronous_update_thread.get_id()); OneProtocolType &opt = m_all_protocols[protocol->getProtocolType()]; opt.lock(); opt.addProtocol(protocol); @@ -238,7 +254,7 @@ void ProtocolManager::startProtocol(Protocol *protocol) */ void ProtocolManager::pauseProtocol(Protocol *protocol) { - assert(pthread_equal(pthread_self(), m_asynchronous_update_thread)); + assert(std::this_thread::get_id() == m_asynchronous_update_thread.get_id()); assert(protocol->getState() == PROTOCOL_STATE_RUNNING); // We lock the protocol to avoid that paused() is called at the same // time that the main thread delivers an event or calls update @@ -255,7 +271,7 @@ void ProtocolManager::pauseProtocol(Protocol *protocol) */ void ProtocolManager::unpauseProtocol(Protocol *protocol) { - assert(pthread_equal(pthread_self(), m_asynchronous_update_thread)); + assert(std::this_thread::get_id() == m_asynchronous_update_thread.get_id()); assert(protocol->getState() == PROTOCOL_STATE_PAUSED); // No lock necessary, since the protocol is paused, no other thread will // be executing @@ -292,7 +308,7 @@ void ProtocolManager::OneProtocolType::removeProtocol(Protocol *p) */ void ProtocolManager::terminateProtocol(Protocol *protocol) { - assert(pthread_equal(pthread_self(), m_asynchronous_update_thread)); + assert(std::this_thread::get_id() == m_asynchronous_update_thread.get_id()); OneProtocolType &opt = m_all_protocols[protocol->getProtocolType()]; // Be sure that noone accesses the protocols vector @@ -415,7 +431,7 @@ void ProtocolManager::OneProtocolType::update(float dt, bool async) void ProtocolManager::update(float dt) { // Update from main thread only: - assert(!pthread_equal(pthread_self(), m_asynchronous_update_thread)); + assert(std::this_thread::get_id() != m_asynchronous_update_thread.get_id()); // before updating, notify protocols that they have received events m_sync_events_to_process.lock(); @@ -462,7 +478,7 @@ void ProtocolManager::asynchronousUpdate() { PROFILER_PUSH_CPU_MARKER("Message delivery", 255, 0, 0); // Update from ProtocolManager thread only: - assert(pthread_equal(pthread_self(), m_asynchronous_update_thread)); + assert(std::this_thread::get_id() == m_asynchronous_update_thread.get_id()); // First deliver asynchronous messages for all protocols // ===================================================== @@ -554,4 +570,4 @@ Protocol* ProtocolManager::getProtocol(ProtocolType type) if (opt.isEmpty()) return NULL; return opt.getFirstProtocol(); -} // getProtocol \ No newline at end of file +} // getProtocol diff --git a/src/network/protocol_manager.hpp b/src/network/protocol_manager.hpp index b0752ae71..e4424cc5d 100644 --- a/src/network/protocol_manager.hpp +++ b/src/network/protocol_manager.hpp @@ -30,8 +30,11 @@ #include "utils/synchronised.hpp" #include "utils/types.hpp" +#include #include +#include #include +#include class Event; class STKPeer; @@ -130,10 +133,8 @@ public: * responsible to forward events to all protocols with the same id. * */ -class ProtocolManager : public AbstractSingleton, - public NoCopy +class ProtocolManager : public NoCopy { - friend class AbstractSingleton; private: /** A simple class that stores all protocols of a certain type. While @@ -211,14 +212,15 @@ private: Synchronised< std::vector > m_requests; /** When set to true, the main thread will exit. */ - Synchronised m_exit; + std::atomic_bool m_exit; /*! Asynchronous update thread.*/ - pthread_t m_asynchronous_update_thread; + std::thread m_asynchronous_update_thread; - ProtocolManager(); - virtual ~ProtocolManager(); - static void* mainLoop(void *data); + /*! Single instance of protocol manager.*/ + static std::weak_ptr m_protocol_manager; + + void startAsynchronousUpdateThread(); bool sendEvent(Event* event); virtual void startProtocol(Protocol *protocol); @@ -228,6 +230,8 @@ private: virtual void unpauseProtocol(Protocol *protocol); public: + ProtocolManager(); + virtual ~ProtocolManager(); void abort(); void propagateEvent(Event* event); Protocol* getProtocol(ProtocolType type); @@ -238,10 +242,16 @@ public: void findAndTerminate(ProtocolType type); void update(float dt); // ------------------------------------------------------------------------ - const pthread_t & getThreadID() const + bool isExiting() const { return m_exit.load(); } + // ------------------------------------------------------------------------ + const std::thread& getThread() const { return m_asynchronous_update_thread; } // getThreadID + // ------------------------------------------------------------------------ + static std::shared_ptr createInstance(); + // ------------------------------------------------------------------------ + static std::shared_ptr lock(); }; // class ProtocolManager diff --git a/src/network/protocols/client_lobby.cpp b/src/network/protocols/client_lobby.cpp index fa9ef0f6b..d0ec62168 100644 --- a/src/network/protocols/client_lobby.cpp +++ b/src/network/protocols/client_lobby.cpp @@ -370,7 +370,7 @@ void ClientLobby::update(float dt) break; case DONE: m_state = EXITING; - ProtocolManager::getInstance()->requestTerminate(this); + ProtocolManager::lock()->requestTerminate(this); break; case EXITING: break; @@ -722,7 +722,8 @@ void ClientLobby::raceFinished(Event* event) "Server notified that the race is finished."); // stop race protocols - ProtocolManager *pm = ProtocolManager::getInstance(); + auto pm = ProtocolManager::lock(); + assert(pm); pm->findAndTerminate(PROTOCOL_CONTROLLER_EVENTS); pm->findAndTerminate(PROTOCOL_KART_UPDATE); pm->findAndTerminate(PROTOCOL_GAME_EVENTS); @@ -756,7 +757,8 @@ void ClientLobby::exitResultScreen(Event *event) m_game_setup = STKHost::get()->setupNewGame(); STKHost::get()->getServerPeerForClient()->unsetClientServerToken(); // stop race protocols - ProtocolManager *pm = ProtocolManager::getInstance(); + auto pm = ProtocolManager::lock(); + assert(pm); pm->findAndTerminate(PROTOCOL_CONTROLLER_EVENTS); pm->findAndTerminate(PROTOCOL_KART_UPDATE); pm->findAndTerminate(PROTOCOL_GAME_EVENTS); diff --git a/src/network/protocols/connect_to_peer.cpp b/src/network/protocols/connect_to_peer.cpp index 41e0bf6a9..70b505e16 100644 --- a/src/network/protocols/connect_to_peer.cpp +++ b/src/network/protocols/connect_to_peer.cpp @@ -132,7 +132,7 @@ void ConnectToPeer::asynchronousUpdate() { m_current_protocol = new PingProtocol(m_peer_address, /*time-between-ping*/2.0); - ProtocolManager::getInstance()->requestStart(m_current_protocol); + ProtocolManager::lock()->requestStart(m_current_protocol); m_state = CONNECTING; } else diff --git a/src/network/protocols/lobby_protocol.cpp b/src/network/protocols/lobby_protocol.cpp index 0c0985392..4b4f489b0 100644 --- a/src/network/protocols/lobby_protocol.cpp +++ b/src/network/protocols/lobby_protocol.cpp @@ -135,5 +135,5 @@ void LobbyProtocol::loadWorld() */ void LobbyProtocol::terminateLatencyProtocol() { - ProtocolManager::getInstance()->findAndTerminate(PROTOCOL_SYNCHRONIZATION); + ProtocolManager::lock()->findAndTerminate(PROTOCOL_SYNCHRONIZATION); } // stopLatencyProtocol diff --git a/src/network/protocols/server_lobby.cpp b/src/network/protocols/server_lobby.cpp index 1c1b75f20..3ae85c425 100644 --- a/src/network/protocols/server_lobby.cpp +++ b/src/network/protocols/server_lobby.cpp @@ -289,7 +289,8 @@ void ServerLobby::update(float dt) // notify the network world that it is stopped RaceEventManager::getInstance()->stop(); // stop race protocols - ProtocolManager *pm = ProtocolManager::getInstance(); + auto pm = ProtocolManager::lock(); + assert(pm); pm->findAndTerminate(PROTOCOL_CONTROLLER_EVENTS); pm->findAndTerminate(PROTOCOL_KART_UPDATE); pm->findAndTerminate(PROTOCOL_GAME_EVENTS); diff --git a/src/network/race_event_manager.cpp b/src/network/race_event_manager.cpp index 8d2fb5d2f..02a699646 100644 --- a/src/network/race_event_manager.cpp +++ b/src/network/race_event_manager.cpp @@ -27,7 +27,7 @@ void RaceEventManager::update(float dt) { // This can happen in case of disconnects - protocol manager is // shut down, but still events to process. - if(!ProtocolManager::getInstance()) + if(!ProtocolManager::lock()) return; // Replay all recorded events up to the current time (only if the @@ -78,7 +78,7 @@ bool RaceEventManager::isRaceOver() void RaceEventManager::kartFinishedRace(AbstractKart *kart, float time) { GameEventsProtocol* protocol = static_cast( - ProtocolManager::getInstance()->getProtocol(PROTOCOL_GAME_EVENTS)); + ProtocolManager::lock()->getProtocol(PROTOCOL_GAME_EVENTS)); protocol->kartFinishedRace(kart, time); } // kartFinishedRace @@ -94,7 +94,7 @@ void RaceEventManager::collectedItem(Item *item, AbstractKart *kart) assert(NetworkConfig::get()->isServer()); GameEventsProtocol* protocol = static_cast( - ProtocolManager::getInstance()->getProtocol(PROTOCOL_GAME_EVENTS)); + ProtocolManager::lock()->getProtocol(PROTOCOL_GAME_EVENTS)); protocol->collectedItem(item,kart); } // collectedItem diff --git a/src/network/stk_host.cpp b/src/network/stk_host.cpp index 0b6f12d54..85c71a053 100644 --- a/src/network/stk_host.cpp +++ b/src/network/stk_host.cpp @@ -290,7 +290,7 @@ STKHost::STKHost(const irr::core::stringw &server_name) startListening(); Protocol *p = LobbyProtocol::create(); - ProtocolManager::getInstance()->requestStart(p); + ProtocolManager::lock()->requestStart(p); } // STKHost(server_name) @@ -320,7 +320,7 @@ void STKHost::init() Log::info("STKHost", "Host initialized."); Network::openLog(); // Open packet log file - ProtocolManager::getInstance(); + ProtocolManager::createInstance(); // Optional: start the network console m_network_console = NULL; @@ -337,7 +337,6 @@ void STKHost::init() */ STKHost::~STKHost() { - ProtocolManager::kill(); // delete the game setup if (m_game_setup) delete m_game_setup; @@ -375,7 +374,7 @@ void STKHost::requestShutdown() void STKHost::shutdown() { ServersManager::get()->unsetJoinedServer(); - ProtocolManager::getInstance()->abort(); + ProtocolManager::lock()->abort(); deleteAllPeers(); destroy(); } // shutdown @@ -414,7 +413,7 @@ void STKHost::abort() { // Finish protocol manager first, to avoid that it access data // in STKHost. - ProtocolManager::getInstance()->abort(); + ProtocolManager::lock()->abort(); stopListening(); } // abort @@ -548,6 +547,13 @@ void* STKHost::mainLoop(void* self) if (event.type == ENET_EVENT_TYPE_NONE) continue; + auto pm = ProtocolManager::lock(); + if (!pm || pm->isExiting()) + { + // Don't create more event if no protocol manager or it will + // be exiting + continue; + } // Create an STKEvent with the event data. This will also // create the peer if it doesn't exist already Event* stk_event = new Event(&event); @@ -579,7 +585,7 @@ void* STKHost::mainLoop(void* self) } // if message event // notify for the event now. - ProtocolManager::getInstance()->propagateEvent(stk_event); + pm->propagateEvent(stk_event); } // while enet_host_service } // while !mustStopListening @@ -638,13 +644,6 @@ void STKHost::handleDirectSocketRequest() { // In case of a LAN connection, we only allow connections from // a LAN address (192.168*, ..., and 127.*). - if (NetworkConfig::get()->isLAN() && !sender.isLAN()) - { - Log::error("STKHost", "Client trying to connect from '%s'", - sender.toString().c_str()); - Log::error("STKHost", "which is outside of LAN - rejected."); - return; - } Protocol *c = new ConnectToPeer(sender); c->requestStart(); } diff --git a/src/states_screens/network_kart_selection.cpp b/src/states_screens/network_kart_selection.cpp index 53f87aac2..559e165f2 100644 --- a/src/states_screens/network_kart_selection.cpp +++ b/src/states_screens/network_kart_selection.cpp @@ -196,7 +196,7 @@ void NetworkKartSelectionScreen::playerSelected(uint8_t player_id, // to the server. if(STKHost::get()->isAuthorisedToControl()) { - Protocol* protocol = ProtocolManager::getInstance() + Protocol* protocol = ProtocolManager::lock() ->getProtocol(PROTOCOL_LOBBY_ROOM); ClientLobby* clrp = dynamic_cast(protocol); diff --git a/src/states_screens/online_profile_servers.cpp b/src/states_screens/online_profile_servers.cpp index 3d070b154..d7ca56184 100644 --- a/src/states_screens/online_profile_servers.cpp +++ b/src/states_screens/online_profile_servers.cpp @@ -151,7 +151,7 @@ void OnlineProfileServers::doQuickPlay() NetworkingLobby::getInstance()->push(); ConnectToServer *cts = new ConnectToServer(server->getServerId(), server->getHostId()); - ProtocolManager::getInstance()->requestStart(cts); + ProtocolManager::lock()->requestStart(cts); } else { diff --git a/src/utils/singleton.hpp b/src/utils/singleton.hpp index 5c04d556c..074887a67 100644 --- a/src/utils/singleton.hpp +++ b/src/utils/singleton.hpp @@ -24,8 +24,8 @@ #include "utils/log.hpp" -/*! \class ProtocolManager - * \brief Manages the protocols at runtime. +/*! \class AbstractSingleton + * \brief Manages the abstract singleton at runtime. * This has been designed to allow multi-inheritance. This is advised to * re-declare getInstance, but whithout templates parameters in the inheriting * classes.