Cosmetic changes only.

This commit is contained in:
hiker 2015-10-12 08:02:20 +11:00
parent 4b0d4cb853
commit 124420b90a
2 changed files with 135 additions and 88 deletions

View File

@ -18,8 +18,8 @@
#include "network/protocol_manager.hpp"
#include "network/protocol.hpp"
#include "network/network_manager.hpp"
#include "network/protocol.hpp"
#include "utils/log.hpp"
#include "utils/time.hpp"
@ -28,28 +28,6 @@
#include <errno.h>
#include <typeinfo>
void* protocolManagerUpdate(void* data)
{
ProtocolManager* manager = static_cast<ProtocolManager*>(data);
while(manager && !manager->exit())
{
manager->update();
StkTime::sleep(2);
}
return NULL;
}
void* protocolManagerAsynchronousUpdate(void* data)
{
ProtocolManager* manager = static_cast<ProtocolManager*>(data);
manager->m_asynchronous_thread_running = true;
while(manager && !manager->exit())
{
manager->asynchronousUpdate();
StkTime::sleep(2);
}
manager->m_asynchronous_thread_running = false;
return NULL;
}
ProtocolManager::ProtocolManager()
{
@ -61,24 +39,35 @@ ProtocolManager::ProtocolManager()
pthread_mutex_init(&m_exit_mutex, NULL);
m_next_protocol_id = 0;
pthread_mutex_lock(&m_exit_mutex); // will let the update function run
/// FIXME used on server because mainloop never running
/*if (NetworkManager::getInstance()->isServer())
{
m_update_thread = (pthread_t*)(malloc(sizeof(pthread_t)));
pthread_create(m_update_thread, NULL, protocolManagerUpdate, this);
}*/
// always run this one
m_asynchronous_update_thread = (pthread_t*)(malloc(sizeof(pthread_t)));
pthread_create(m_asynchronous_update_thread, NULL, protocolManagerAsynchronousUpdate, this);
}
m_asynchronous_update_thread = (pthread_t*)(malloc(sizeof(pthread_t)));
pthread_create(m_asynchronous_update_thread, NULL,
ProtocolManager::mainLoop, this);
} // ProtocolManager
// ----------------------------------------------------------------------------
void* ProtocolManager::mainLoop(void* data)
{
ProtocolManager* manager = static_cast<ProtocolManager*>(data);
manager->m_asynchronous_thread_running = true;
while(manager && !manager->exit())
{
manager->asynchronousUpdate();
StkTime::sleep(2);
}
manager->m_asynchronous_thread_running = false;
return NULL;
} // protocolManagerAsynchronousUpdate
// ----------------------------------------------------------------------------
ProtocolManager::~ProtocolManager()
{
}
} // ~ProtocolManager
// ----------------------------------------------------------------------------
void ProtocolManager::abort()
{
pthread_mutex_unlock(&m_exit_mutex); // will stop the update function
@ -107,20 +96,21 @@ void ProtocolManager::abort()
pthread_mutex_destroy(&m_requests_mutex);
pthread_mutex_destroy(&m_id_mutex);
pthread_mutex_destroy(&m_exit_mutex);
}
} // abort
// ----------------------------------------------------------------------------
void ProtocolManager::notifyEvent(Event* event)
{
pthread_mutex_lock(&m_events_mutex);
Event* event2 = new Event(*event);
// register protocols that will receive this event
std::vector<unsigned int> protocols_ids;
PROTOCOL_TYPE searchedProtocol = PROTOCOL_NONE;
PROTOCOL_TYPE searched_protocol = PROTOCOL_NONE;
if (event2->type == EVENT_TYPE_MESSAGE)
{
if (event2->data().size() > 0)
{
searchedProtocol = (PROTOCOL_TYPE)(event2->data()[0]);
searched_protocol = (PROTOCOL_TYPE)(event2->data()[0]);
event2->removeFront(1);
}
else
@ -130,21 +120,26 @@ void ProtocolManager::notifyEvent(Event* event)
}
if (event2->type == EVENT_TYPE_CONNECTED)
{
searchedProtocol = PROTOCOL_CONNECTION;
searched_protocol = PROTOCOL_CONNECTION;
}
Log::verbose("ProtocolManager", "Received event for protocols of type %d", searchedProtocol);
Log::verbose("ProtocolManager", "Received event for protocols of type %d",
searched_protocol);
pthread_mutex_lock(&m_protocols_mutex);
for (unsigned int i = 0; i < m_protocols.size() ; i++)
{
if (m_protocols[i].protocol->getProtocolType() == searchedProtocol || event2->type == EVENT_TYPE_DISCONNECTED) // pass data to protocols even when paused
// Pass data to protocols even when paused
if (m_protocols[i].protocol->getProtocolType() == searched_protocol ||
event2->type == EVENT_TYPE_DISCONNECTED)
{
protocols_ids.push_back(m_protocols[i].id);
}
}
pthread_mutex_unlock(&m_protocols_mutex);
if (searchedProtocol == PROTOCOL_NONE) // no protocol was aimed, show the msg to debug
// no protocol was aimed, show the msg to debug
if (searched_protocol == PROTOCOL_NONE)
{
Log::debug("ProtocolManager", "NO PROTOCOL : Message is \"%s\"", event2->data().std_string().c_str());
Log::debug("ProtocolManager", "NO PROTOCOL : Message is \"%s\"",
event2->data().std_string().c_str());
}
if (protocols_ids.size() != 0)
@ -156,33 +151,44 @@ void ProtocolManager::notifyEvent(Event* event)
m_events_to_process.push_back(epi); // add the event to the queue
}
else
Log::warn("ProtocolManager", "Received an event for %d that has no destination protocol.", searchedProtocol);
Log::warn("ProtocolManager",
"Received an event for %d that has no destination protocol.",
searched_protocol);
pthread_mutex_unlock(&m_events_mutex);
}
} // notifyEvent
void ProtocolManager::sendMessage(Protocol* sender, const NetworkString& message, bool reliable)
// ----------------------------------------------------------------------------
void ProtocolManager::sendMessage(Protocol* sender, const NetworkString& message,
bool reliable)
{
NetworkString newMessage;
newMessage.ai8(sender->getProtocolType()); // add one byte to add protocol type
newMessage += message;
NetworkManager::getInstance()->sendPacket(newMessage, reliable);
}
} // sendMessage
void ProtocolManager::sendMessage(Protocol* sender, STKPeer* peer, const NetworkString& message, bool reliable)
// ----------------------------------------------------------------------------
void ProtocolManager::sendMessage(Protocol* sender, STKPeer* peer,
const NetworkString& message, bool reliable)
{
NetworkString newMessage;
newMessage.ai8(sender->getProtocolType()); // add one byte to add protocol type
newMessage += message;
NetworkManager::getInstance()->sendPacket(peer, newMessage, reliable);
}
void ProtocolManager::sendMessageExcept(Protocol* sender, STKPeer* peer, const NetworkString& message, bool reliable)
} // sendMessage
// ----------------------------------------------------------------------------
void ProtocolManager::sendMessageExcept(Protocol* sender, STKPeer* peer,
const NetworkString& message,
bool reliable)
{
NetworkString newMessage;
newMessage.ai8(sender->getProtocolType()); // add one byte to add protocol type
newMessage += message;
NetworkManager::getInstance()->sendPacketExcept(peer, newMessage, reliable);
}
} // sendMessageExcept
// ----------------------------------------------------------------------------
uint32_t ProtocolManager::requestStart(Protocol* protocol)
{
// create the request
@ -199,8 +205,9 @@ uint32_t ProtocolManager::requestStart(Protocol* protocol)
pthread_mutex_unlock(&m_requests_mutex);
return info.id;
}
} // requestStart
// ----------------------------------------------------------------------------
void ProtocolManager::requestStop(Protocol* protocol)
{
if (!protocol)
@ -213,8 +220,9 @@ void ProtocolManager::requestStop(Protocol* protocol)
pthread_mutex_lock(&m_requests_mutex);
m_requests.push_back(req);
pthread_mutex_unlock(&m_requests_mutex);
}
} // requestStop
// ----------------------------------------------------------------------------
void ProtocolManager::requestPause(Protocol* protocol)
{
if (!protocol)
@ -227,8 +235,9 @@ void ProtocolManager::requestPause(Protocol* protocol)
pthread_mutex_lock(&m_requests_mutex);
m_requests.push_back(req);
pthread_mutex_unlock(&m_requests_mutex);
}
} // requestPause
// ----------------------------------------------------------------------------
void ProtocolManager::requestUnpause(Protocol* protocol)
{
if (!protocol)
@ -241,8 +250,9 @@ void ProtocolManager::requestUnpause(Protocol* protocol)
pthread_mutex_lock(&m_requests_mutex);
m_requests.push_back(req);
pthread_mutex_unlock(&m_requests_mutex);
}
} // requestUnpause
// ----------------------------------------------------------------------------
void ProtocolManager::requestTerminate(Protocol* protocol)
{
if (!protocol)
@ -264,50 +274,64 @@ void ProtocolManager::requestTerminate(Protocol* protocol)
}
m_requests.push_back(req);
pthread_mutex_unlock(&m_requests_mutex);
}
} // requestTerminate
// ----------------------------------------------------------------------------
void ProtocolManager::startProtocol(ProtocolInfo protocol)
{
// add the protocol to the protocol vector so that it's updated
pthread_mutex_lock(&m_protocols_mutex);
pthread_mutex_lock(&m_asynchronous_protocols_mutex);
Log::info("ProtocolManager", "A %s protocol with id=%u has been started. There are %ld protocols running.", typeid(*protocol.protocol).name(), protocol.id, m_protocols.size()+1);
Log::info("ProtocolManager",
"A %s protocol with id=%u has been started. There are %ld protocols running.",
typeid(*protocol.protocol).name(), protocol.id,
m_protocols.size()+1);
m_protocols.push_back(protocol);
// setup the protocol and notify it that it's started
protocol.protocol->setListener(this);
protocol.protocol->setup();
pthread_mutex_unlock(&m_protocols_mutex);
pthread_mutex_unlock(&m_asynchronous_protocols_mutex);
}
} // startProtocol
// ----------------------------------------------------------------------------
void ProtocolManager::stopProtocol(ProtocolInfo protocol)
{
} // stopProtocol
}
// ----------------------------------------------------------------------------
void ProtocolManager::pauseProtocol(ProtocolInfo protocol)
{
for (unsigned int i = 0; i < m_protocols.size(); i++)
{
if (m_protocols[i].protocol == protocol.protocol && m_protocols[i].state == PROTOCOL_STATE_RUNNING)
if (m_protocols[i].protocol == protocol.protocol &&
m_protocols[i].state == PROTOCOL_STATE_RUNNING)
{
m_protocols[i].state = PROTOCOL_STATE_PAUSED;
m_protocols[i].protocol->pause();
}
}
}
} // pauseProtocol
// ----------------------------------------------------------------------------
void ProtocolManager::unpauseProtocol(ProtocolInfo protocol)
{
for (unsigned int i = 0; i < m_protocols.size(); i++)
{
if (m_protocols[i].protocol == protocol.protocol && m_protocols[i].state == PROTOCOL_STATE_PAUSED)
if (m_protocols[i].protocol == protocol.protocol &&
m_protocols[i].state == PROTOCOL_STATE_PAUSED)
{
m_protocols[i].state = PROTOCOL_STATE_RUNNING;
m_protocols[i].protocol->unpause();
}
}
}
} // unpauseProtocol
// ----------------------------------------------------------------------------
void ProtocolManager::protocolTerminated(ProtocolInfo protocol)
{
pthread_mutex_lock(&m_protocols_mutex); // be sure that noone accesses the protocols vector while we erase a protocol
// Be sure that noone accesses the protocols vector while we erase a protocol
pthread_mutex_lock(&m_protocols_mutex);
pthread_mutex_lock(&m_asynchronous_protocols_mutex);
int offset = 0;
std::string protocol_type = typeid(*protocol.protocol).name();
@ -316,15 +340,19 @@ void ProtocolManager::protocolTerminated(ProtocolInfo protocol)
if (m_protocols[i-offset].protocol == protocol.protocol)
{
delete m_protocols[i].protocol;
m_protocols.erase(m_protocols.begin()+(i-offset), m_protocols.begin()+(i-offset)+1);
m_protocols.erase(m_protocols.begin()+(i-offset),
m_protocols.begin()+(i-offset)+1);
offset++;
}
}
Log::info("ProtocolManager", "A %s protocol has been terminated. There are %ld protocols running.", protocol_type.c_str(), m_protocols.size());
Log::info("ProtocolManager",
"A %s protocol has been terminated. There are %ld protocols running.",
protocol_type.c_str(), m_protocols.size());
pthread_mutex_unlock(&m_asynchronous_protocols_mutex);
pthread_mutex_unlock(&m_protocols_mutex);
}
} // protocolTerminated
// ----------------------------------------------------------------------------
bool ProtocolManager::propagateEvent(EventProcessingInfo* event, bool synchronous)
{
int index = 0;
@ -343,7 +371,8 @@ bool ProtocolManager::propagateEvent(EventProcessingInfo* event, bool synchronou
index++;
}
}
if (event->protocols_ids.size() == 0 || (StkTime::getTimeSinceEpoch()-event->arrival_time) >= TIME_TO_KEEP_EVENTS)
if (event->protocols_ids.size() == 0 ||
(StkTime::getTimeSinceEpoch()-event->arrival_time) >= TIME_TO_KEEP_EVENTS)
{
// because we made a copy of the event
delete event->event->peer; // no more need of that
@ -351,8 +380,9 @@ bool ProtocolManager::propagateEvent(EventProcessingInfo* event, bool synchronou
return true;
}
return false;
}
} // propagateEvent
// ----------------------------------------------------------------------------
void ProtocolManager::update()
{
// before updating, notice protocols that they have received events
@ -364,7 +394,8 @@ void ProtocolManager::update()
bool result = propagateEvent(&m_events_to_process[i+offset], true);
if (result)
{
m_events_to_process.erase(m_events_to_process.begin()+i+offset,m_events_to_process.begin()+i+offset+1);
m_events_to_process.erase(m_events_to_process.begin()+i+offset,
m_events_to_process.begin()+i+offset+1);
offset --;
}
}
@ -377,8 +408,9 @@ void ProtocolManager::update()
m_protocols[i].protocol->update();
}
pthread_mutex_unlock(&m_protocols_mutex);
}
} // update
// ----------------------------------------------------------------------------
void ProtocolManager::asynchronousUpdate()
{
// before updating, notice protocols that they have received information
@ -390,7 +422,8 @@ void ProtocolManager::asynchronousUpdate()
bool result = propagateEvent(&m_events_to_process[i+offset], false);
if (result)
{
m_events_to_process.erase(m_events_to_process.begin()+i+offset,m_events_to_process.begin()+i+offset+1);
m_events_to_process.erase(m_events_to_process.begin()+i+offset,
m_events_to_process.begin()+i+offset+1);
offset --;
}
}
@ -431,13 +464,15 @@ void ProtocolManager::asynchronousUpdate()
}
m_requests.clear();
pthread_mutex_unlock(&m_requests_mutex);
}
} // asynchronousUpdate
// ----------------------------------------------------------------------------
int ProtocolManager::runningProtocolsCount()
{
return (int)m_protocols.size();
}
} // runningProtocolsCount
// ----------------------------------------------------------------------------
PROTOCOL_STATE ProtocolManager::getProtocolState(uint32_t id)
{
for (unsigned int i = 0; i < m_protocols.size(); i++)
@ -448,12 +483,14 @@ PROTOCOL_STATE ProtocolManager::getProtocolState(uint32_t id)
// the protocol isn't running right now
for (unsigned int i = 0; i < m_requests.size(); i++)
{
if (m_requests[i].protocol_info.id == id) // the protocol is going to be started
// the protocol is going to be started
if (m_requests[i].protocol_info.id == id)
return PROTOCOL_STATE_RUNNING; // we can say it's running
}
return PROTOCOL_STATE_TERMINATED; // else, it's already finished
}
} // getProtocolState
// ----------------------------------------------------------------------------
PROTOCOL_STATE ProtocolManager::getProtocolState(Protocol* protocol)
{
for (unsigned int i = 0; i < m_protocols.size(); i++)
@ -463,12 +500,15 @@ PROTOCOL_STATE ProtocolManager::getProtocolState(Protocol* protocol)
}
for (unsigned int i = 0; i < m_requests.size(); i++)
{
if (m_requests[i].protocol_info.protocol == protocol) // the protocol is going to be started
// the protocol is going to be started
if (m_requests[i].protocol_info.protocol == protocol)
return PROTOCOL_STATE_RUNNING; // we can say it's running
}
return PROTOCOL_STATE_TERMINATED; // we don't know this protocol at all, it's finished
}
// we don't know this protocol at all, it's finished
return PROTOCOL_STATE_TERMINATED;
} // getProtocolState
// ----------------------------------------------------------------------------
uint32_t ProtocolManager::getProtocolID(Protocol* protocol)
{
for (unsigned int i = 0; i < m_protocols.size(); i++)
@ -477,8 +517,9 @@ uint32_t ProtocolManager::getProtocolID(Protocol* protocol)
return m_protocols[i].id;
}
return 0;
}
} // getProtocolID
// ----------------------------------------------------------------------------
Protocol* ProtocolManager::getProtocol(uint32_t id)
{
for (unsigned int i = 0; i < m_protocols.size(); i++)
@ -487,8 +528,9 @@ Protocol* ProtocolManager::getProtocol(uint32_t id)
return m_protocols[i].protocol;
}
return NULL;
}
} // getProtocol
// ----------------------------------------------------------------------------
Protocol* ProtocolManager::getProtocol(PROTOCOL_TYPE type)
{
for (unsigned int i = 0; i < m_protocols.size(); i++)
@ -497,13 +539,15 @@ Protocol* ProtocolManager::getProtocol(PROTOCOL_TYPE type)
return m_protocols[i].protocol;
}
return NULL;
}
} // getProtocol
// ----------------------------------------------------------------------------
bool ProtocolManager::isServer()
{
return NetworkManager::getInstance()->isServer();
}
} // isServer
// ----------------------------------------------------------------------------
int ProtocolManager::exit()
{
switch(pthread_mutex_trylock(&m_exit_mutex)) {
@ -514,14 +558,15 @@ int ProtocolManager::exit()
return 0;
}
return 1;
}
} // exit
// ----------------------------------------------------------------------------
void ProtocolManager::assignProtocolId(ProtocolInfo* protocol_info)
{
pthread_mutex_lock(&m_id_mutex);
protocol_info->id = m_next_protocol_id;
m_next_protocol_id++;
pthread_mutex_unlock(&m_id_mutex);
}
} // assignProtocolId

View File

@ -26,6 +26,7 @@
#include "network/event.hpp"
#include "network/network_string.hpp"
#include "network/protocol.hpp"
#include "utils/no_copy.hpp"
#include "utils/singleton.hpp"
#include "utils/types.hpp"
@ -102,10 +103,11 @@ typedef struct EventProcessingInfo
* frames per second. Then, the management of protocols is thread-safe: any
* object can start/pause/stop protocols whithout problems.
*/
class ProtocolManager : public AbstractSingleton<ProtocolManager>
class ProtocolManager : public AbstractSingleton<ProtocolManager>,
public NoCopy
{
friend class AbstractSingleton<ProtocolManager>;
friend void* protocolManagerAsynchronousUpdate(void* data);
static void* mainLoop(void *data);
public:
/*! \brief Stops the protocol manager. */