Make controller events conditional wait to avoid being stalled

This commit is contained in:
Benau 2019-05-05 15:46:59 +08:00
parent fb7c82786a
commit 7d19ccaad8
4 changed files with 78 additions and 86 deletions

View File

@ -19,7 +19,9 @@
#include "network/protocol_manager.hpp"
#include "network/event.hpp"
#include "network/protocol.hpp"
#include "network/network_config.hpp"
#include "network/protocols/game_protocol.hpp"
#include "network/protocols/server_lobby.hpp"
#include "network/stk_peer.hpp"
#include "utils/log.hpp"
#include "utils/profiler.hpp"
@ -56,6 +58,41 @@ std::shared_ptr<ProtocolManager> ProtocolManager::createInstance()
PROFILER_POP_CPU_MARKER();
}
});
if (NetworkConfig::get()->isServer())
{
pm->m_game_protocol_thread = std::thread([pm]()
{
VS::setThreadName("CtrlEvents");
while (true)
{
std::unique_lock<std::mutex> ul(pm->m_game_protocol_mutex);
pm->m_game_protocol_cv.wait(ul, [&pm]
{
return !pm->m_controller_events_list.empty();
});
Event* event_top = pm->m_controller_events_list.front();
pm->m_controller_events_list.pop_front();
ul.unlock();
if (event_top == NULL)
break;
auto sl = LobbyProtocol::get<ServerLobby>();
if (sl)
{
ServerLobby::ServerState ss = sl->getCurrentState();
if (!(ss >= ServerLobby::WAIT_FOR_WORLD_LOADED &&
ss <= ServerLobby::RACING))
{
delete event_top;
continue;
}
}
auto gp = GameProtocol::lock();
if (gp)
gp->notifyEventAsynchronous(event_top);
delete event_top;
}
});
}
m_protocol_manager = pm;
return pm;
} // createInstance
@ -90,6 +127,11 @@ ProtocolManager::~ProtocolManager()
m_async_events_to_process.getData().clear();
m_async_events_to_process.unlock();
for (EventList::iterator i = m_controller_events_list.begin();
i!= m_controller_events_list.end(); ++i)
delete *i;
m_controller_events_list.clear();
m_requests.lock();
m_requests.getData().clear();
m_requests.unlock();
@ -108,6 +150,14 @@ void ProtocolManager::OneProtocolType::abort()
void ProtocolManager::abort()
{
m_exit.store(true);
if (NetworkConfig::get()->isServer())
{
std::unique_lock<std::mutex> ul(m_game_protocol_mutex);
m_controller_events_list.push_back(NULL);
m_game_protocol_cv.notify_one();
ul.unlock();
m_game_protocol_thread.join();
}
// wait the thread to finish
m_asynchronous_update_thread.join();
} // abort
@ -119,6 +169,16 @@ void ProtocolManager::abort()
*/
void ProtocolManager::propagateEvent(Event* event)
{
// Special handling for contoller events in server
if (NetworkConfig::get()->isServer() &&
event->getType() == EVENT_TYPE_MESSAGE &&
event->data().getProtocolType() == PROTOCOL_CONTROLLER_EVENTS)
{
std::lock_guard<std::mutex> lock(m_game_protocol_mutex);
m_controller_events_list.push_back(event);
m_game_protocol_cv.notify_one();
return;
}
if (event->isSynchronous())
{
m_sync_events_to_process.lock();
@ -131,7 +191,6 @@ void ProtocolManager::propagateEvent(Event* event)
m_async_events_to_process.getData().push_back(event);
m_async_events_to_process.unlock();
}
return;
} // propagateEvent
// ----------------------------------------------------------------------------

View File

@ -31,8 +31,10 @@
#include "utils/types.hpp"
#include <atomic>
#include <condition_variable>
#include <list>
#include <memory>
#include <mutex>
#include <vector>
#include <thread>
@ -218,6 +220,16 @@ private:
/*! Asynchronous update thread.*/
std::thread m_asynchronous_update_thread;
/** Asynchronous game protocol thread to handle controller action as fast
* as possible. */
std::thread m_game_protocol_thread;
std::condition_variable m_game_protocol_cv;
std::mutex m_game_protocol_mutex;
EventList m_controller_events_list;
/*! Single instance of protocol manager.*/
static std::weak_ptr<ProtocolManager> m_protocol_manager;

View File

@ -126,9 +126,10 @@ bool GameProtocol::notifyEventAsynchronous(Event* event)
{
case GP_CONTROLLER_ACTION: handleControllerAction(event); break;
case GP_STATE: handleState(event); break;
case GP_ADJUST_TIME: handleAdjustTime(event); break;
//case GP_ITEM_UPDATE: handleItemUpdate(event); break;
case GP_ITEM_CONFIRMATION: handleItemEventConfirmation(event); break;
case GP_ADJUST_TIME:
case GP_ITEM_UPDATE:
break;
default: Log::error("GameProtocol",
"Received unknown message type %d - ignored.",
message_type); break;
@ -239,77 +240,10 @@ void GameProtocol::handleControllerAction(Event *event)
peer->updateLastActivity();
if (!will_trigger_rewind)
STKHost::get()->sendPacketExcept(peer, &data, false);
// FIXME unless there is a network jitter more than 100ms (more than
// server delay), time adjust is not necessary
/*if (not_rewound == 0 ||
m_initial_ticks.find(event->getPeer()) == m_initial_ticks.end())
return;
int cur_diff = cur_ticks - not_rewound;
const int max_adjustment = 12;
const int ticks_difference = m_initial_ticks.at(event->getPeer());
if (will_trigger_rewind)
{
rewind_delta += max_adjustment;
Log::info("GameProtocol", "At %d %f %d requesting time adjust"
" (speed up) of %d for host %d",
World::getWorld()->getTicksSinceStart(), StkTime::getRealTime(),
not_rewound, rewind_delta, event->getPeer()->getHostId());
// This message from a client triggered a rewind in the server.
// To avoid this, signal to the client that it should speed up.
adjustTimeForClient(event->getPeer(), rewind_delta);
return;
}
if (cur_diff > 0 &&
cur_diff - ticks_difference > max_adjustment)
{
const int adjustment = ticks_difference - cur_diff;
Log::info("GameProtocol", "At %d %f %d requesting time adjust"
" (slow down) of %d for host %d",
World::getWorld()->getTicksSinceStart(), StkTime::getRealTime(),
not_rewound, adjustment, event->getPeer()->getHostId());
adjustTimeForClient(event->getPeer(), adjustment);
}*/
} // if server
} // handleControllerAction
// ----------------------------------------------------------------------------
/** The server might request that a client adjusts its world clock (in order to
* reduce rewinds). This function sends a a (unreliable) message to the
* client.
* \param peer The peer that triggered the rewind.
* \param t Time that the peer needs to slowdown (<0) or speed up(>0).
*/
void GameProtocol::adjustTimeForClient(STKPeer *peer, int ticks)
{
assert(NetworkConfig::get()->isServer());
if (m_last_adjustments.find(peer) != m_last_adjustments.end() &&
StkTime::getRealTime() - m_last_adjustments.at(peer) < 3.0)
return;
NetworkString *ns = getNetworkString(5);
ns->addUInt8(GP_ADJUST_TIME).addUInt32(ticks);
// This message can be send unreliable, it's not critical if it doesn't
// get delivered, the server will request again later anyway.
peer->sendPacket(ns, /*reliable*/false);
m_last_adjustments[peer] = StkTime::getRealTime();
delete ns;
} // adjustTimeForClient
// ----------------------------------------------------------------------------
/** Called on a client when the server requests an adjustment of this client's
* world clock time (in order to reduce rewinds).
*/
void GameProtocol::handleAdjustTime(Event *event)
{
int ticks = event->data().getUInt32();
main_loop->setTicksAdjustment(ticks);
Log::verbose("GameProtocol", "%d ticks adjustment", ticks);
} // handleAdjustTime
// ----------------------------------------------------------------------------
/** Sends a confirmation to the server that all item events up to 'ticks'
* have been received.
@ -403,7 +337,8 @@ void GameProtocol::sendState()
*/
void GameProtocol::handleState(Event *event)
{
assert(NetworkConfig::get()->isClient());
if (!NetworkConfig::get()->isClient())
return;
NetworkString &data = event->data();
int ticks = data.getUInt32();
@ -454,11 +389,3 @@ void GameProtocol::rewind(BareNetworkString *buffer)
std::get<3>(a));
}
} // rewind
// ----------------------------------------------------------------------------
void GameProtocol::addInitialTicks(STKPeer* p, int ticks)
{
Log::verbose("GameProtocol", "Host %d with ticks difference %d",
p->getHostId(), ticks);
m_initial_ticks[p] = ticks;
} // addInitialTicks

View File

@ -27,7 +27,6 @@
#include "utils/singleton.hpp"
#include <cstdlib>
#include <map>
#include <mutex>
#include <vector>
#include <tuple>
@ -79,8 +78,6 @@ private:
void handleAdjustTime(Event *event);
void handleItemEventConfirmation(Event *event);
static std::weak_ptr<GameProtocol> m_game_protocol;
std::map<STKPeer*, int> m_initial_ticks;
std::map<STKPeer*, double> m_last_adjustments;
// Maximum value of values are only 32768
std::tuple<uint8_t, uint16_t, uint16_t, uint16_t>
compressAction(const Action& a)
@ -116,7 +113,6 @@ public:
void addState(BareNetworkString *buffer);
void sendState();
void finalizeState(std::vector<std::string>& cur_rewinder);
void adjustTimeForClient(STKPeer *peer, int ticks);
void sendItemEventConfirmation(int ticks);
virtual void undo(BareNetworkString *buffer) OVERRIDE;
@ -141,8 +137,6 @@ public:
/** Returns the NetworkString in which a state was saved. */
NetworkString* getState() const { return m_data_to_send; }
// ------------------------------------------------------------------------
void addInitialTicks(STKPeer* p, int ticks);
// ------------------------------------------------------------------------
std::unique_lock<std::mutex> acquireWorldDeletingMutex() const
{ return std::unique_lock<std::mutex>(m_world_deleting_mutex); }