1
0

cNetwork: Added link creation callback.

This allows the callback classes to store the link inside them and use it internally later on, mainly for sending data.
This commit is contained in:
Mattes D 2015-01-22 13:00:32 +01:00
parent 5b4c5cf2be
commit dbf7f13bd4
6 changed files with 90 additions and 26 deletions

View File

@ -13,6 +13,14 @@
// fwd:
class cTCPLink;
typedef SharedPtr<cTCPLink> cTCPLinkPtr;
/** Interface that provides the methods available on a single TCP connection. */ /** Interface that provides the methods available on a single TCP connection. */
class cTCPLink class cTCPLink
{ {
@ -25,16 +33,20 @@ public:
// Force a virtual destructor for all descendants: // Force a virtual destructor for all descendants:
virtual ~cCallbacks() {} virtual ~cCallbacks() {}
/** Called when the cTCPLink for the connection is created.
The callback may store the cTCPLink instance for later use, but it should remove it in OnError(), OnRemoteClosed() or right after Close(). */
virtual void OnLinkCreated(cTCPLinkPtr a_Link) = 0;
/** Called when there's data incoming from the remote peer. */ /** Called when there's data incoming from the remote peer. */
virtual void OnReceivedData(cTCPLink & a_Link, const char * a_Data, size_t a_Length) = 0; virtual void OnReceivedData(const char * a_Data, size_t a_Length) = 0;
/** Called when the remote end closes the connection. /** Called when the remote end closes the connection.
The link is still available for connection information query (IP / port). The link is still available for connection information query (IP / port).
Sending data on the link is not an error, but the data won't be delivered. */ Sending data on the link is not an error, but the data won't be delivered. */
virtual void OnRemoteClosed(cTCPLink & a_Link) = 0; virtual void OnRemoteClosed(void) = 0;
/** Called when an error is detected on the connection. */ /** Called when an error is detected on the connection. */
virtual void OnError(cTCPLink & a_Link, int a_ErrorCode, const AString & a_ErrorMsg) = 0; virtual void OnError(int a_ErrorCode, const AString & a_ErrorMsg) = 0;
}; };
typedef SharedPtr<cCallbacks> cCallbacksPtr; typedef SharedPtr<cCallbacks> cCallbacksPtr;

View File

@ -283,6 +283,8 @@ void cServerHandleImpl::Callback(evconnlistener * a_Listener, evutil_socket_t a_
cCSLock Lock(Self->m_CS); cCSLock Lock(Self->m_CS);
Self->m_Connections.push_back(Link); Self->m_Connections.push_back(Link);
} // Lock(m_CS) } // Lock(m_CS)
LinkCallbacks->OnLinkCreated(Link);
Link->Enable();
// Call the OnAccepted callback: // Call the OnAccepted callback:
Self->m_ListenCallbacks->OnAccepted(*Link); Self->m_ListenCallbacks->OnAccepted(*Link);

View File

@ -20,9 +20,6 @@ cTCPLinkImpl::cTCPLinkImpl(cTCPLink::cCallbacksPtr a_LinkCallbacks):
m_BufferEvent(bufferevent_socket_new(cNetworkSingleton::Get().GetEventBase(), -1, BEV_OPT_CLOSE_ON_FREE)), m_BufferEvent(bufferevent_socket_new(cNetworkSingleton::Get().GetEventBase(), -1, BEV_OPT_CLOSE_ON_FREE)),
m_Server(nullptr) m_Server(nullptr)
{ {
// Create the LibEvent handle, but don't assign a socket to it yet (will be assigned within Connect() method):
bufferevent_setcb(m_BufferEvent, ReadCallback, nullptr, EventCallback, this);
bufferevent_enable(m_BufferEvent, EV_READ | EV_WRITE);
} }
@ -37,10 +34,6 @@ cTCPLinkImpl::cTCPLinkImpl(evutil_socket_t a_Socket, cTCPLink::cCallbacksPtr a_L
// Update the endpoint addresses: // Update the endpoint addresses:
UpdateLocalAddress(); UpdateLocalAddress();
UpdateAddress(a_Address, a_AddrLen, m_RemoteIP, m_RemotePort); UpdateAddress(a_Address, a_AddrLen, m_RemoteIP, m_RemotePort);
// Create the LibEvent handle:
bufferevent_setcb(m_BufferEvent, ReadCallback, nullptr, EventCallback, this);
bufferevent_enable(m_BufferEvent, EV_READ | EV_WRITE);
} }
@ -65,6 +58,8 @@ cTCPLinkImplPtr cTCPLinkImpl::Connect(const AString & a_Host, UInt16 a_Port, cTC
cTCPLinkImplPtr res{new cTCPLinkImpl(a_LinkCallbacks)}; // Cannot use std::make_shared here, constructor is not accessible cTCPLinkImplPtr res{new cTCPLinkImpl(a_LinkCallbacks)}; // Cannot use std::make_shared here, constructor is not accessible
res->m_ConnectCallbacks = a_ConnectCallbacks; res->m_ConnectCallbacks = a_ConnectCallbacks;
cNetworkSingleton::Get().AddLink(res); cNetworkSingleton::Get().AddLink(res);
res->m_Callbacks->OnLinkCreated(res);
res->Enable();
// If a_Host is an IP address, schedule a connection immediately: // If a_Host is an IP address, schedule a connection immediately:
sockaddr_storage sa; sockaddr_storage sa;
@ -107,6 +102,17 @@ cTCPLinkImplPtr cTCPLinkImpl::Connect(const AString & a_Host, UInt16 a_Port, cTC
void cTCPLinkImpl::Enable(void)
{
// Set the LibEvent callbacks and enable processing:
bufferevent_setcb(m_BufferEvent, ReadCallback, nullptr, EventCallback, this);
bufferevent_enable(m_BufferEvent, EV_READ | EV_WRITE);
}
bool cTCPLinkImpl::Send(const void * a_Data, size_t a_Length) bool cTCPLinkImpl::Send(const void * a_Data, size_t a_Length)
{ {
return (bufferevent_write(m_BufferEvent, a_Data, a_Length) == 0); return (bufferevent_write(m_BufferEvent, a_Data, a_Length) == 0);
@ -160,7 +166,7 @@ void cTCPLinkImpl::ReadCallback(bufferevent * a_BufferEvent, void * a_Self)
size_t length; size_t length;
while ((length = bufferevent_read(a_BufferEvent, data, sizeof(data))) > 0) while ((length = bufferevent_read(a_BufferEvent, data, sizeof(data))) > 0)
{ {
Self->m_Callbacks->OnReceivedData(*Self, data, length); Self->m_Callbacks->OnReceivedData(data, length);
} }
} }
@ -189,7 +195,7 @@ void cTCPLinkImpl::EventCallback(bufferevent * a_BufferEvent, short a_What, void
} }
else else
{ {
Self->m_Callbacks->OnError(*Self, err, evutil_socket_error_to_string(err)); Self->m_Callbacks->OnError(err, evutil_socket_error_to_string(err));
if (Self->m_Server == nullptr) if (Self->m_Server == nullptr)
{ {
cNetworkSingleton::Get().RemoveLink(Self); cNetworkSingleton::Get().RemoveLink(Self);
@ -219,7 +225,7 @@ void cTCPLinkImpl::EventCallback(bufferevent * a_BufferEvent, short a_What, void
// If the connection has been closed, call the link callback and remove the connection: // If the connection has been closed, call the link callback and remove the connection:
if (a_What & BEV_EVENT_EOF) if (a_What & BEV_EVENT_EOF)
{ {
Self->m_Callbacks->OnRemoteClosed(*Self); Self->m_Callbacks->OnRemoteClosed();
if (Self->m_Server != nullptr) if (Self->m_Server != nullptr)
{ {
Self->m_Server->RemoveLink(Self); Self->m_Server->RemoveLink(Self);

View File

@ -37,7 +37,8 @@ class cTCPLinkImpl:
public: public:
/** Creates a new link based on the given socket. /** Creates a new link based on the given socket.
Used for connections accepted in a server using cNetwork::Listen(). Used for connections accepted in a server using cNetwork::Listen().
a_Address and a_AddrLen describe the remote peer that has connected. */ a_Address and a_AddrLen describe the remote peer that has connected.
The link is created disabled, you need to call Enable() to start the regular communication. */
cTCPLinkImpl(evutil_socket_t a_Socket, cCallbacksPtr a_LinkCallbacks, cServerHandleImpl * a_Server, const sockaddr * a_Address, socklen_t a_AddrLen); cTCPLinkImpl(evutil_socket_t a_Socket, cCallbacksPtr a_LinkCallbacks, cServerHandleImpl * a_Server, const sockaddr * a_Address, socklen_t a_AddrLen);
/** Destroys the LibEvent handle representing the link. */ /** Destroys the LibEvent handle representing the link. */
@ -48,6 +49,11 @@ public:
Returns a link that has the connection request queued, or NULL for failure. */ Returns a link that has the connection request queued, or NULL for failure. */
static cTCPLinkImplPtr Connect(const AString & a_Host, UInt16 a_Port, cTCPLink::cCallbacksPtr a_LinkCallbacks, cNetwork::cConnectCallbacksPtr a_ConnectCallbacks); static cTCPLinkImplPtr Connect(const AString & a_Host, UInt16 a_Port, cTCPLink::cCallbacksPtr a_LinkCallbacks, cNetwork::cConnectCallbacksPtr a_ConnectCallbacks);
/** Enables communication over the link.
Links are created with communication disabled, so that creation callbacks can be called first.
This function then enables the regular communication to be reported. */
void Enable(void);
// cTCPLink overrides: // cTCPLink overrides:
virtual bool Send(const void * a_Data, size_t a_Length) override; virtual bool Send(const void * a_Data, size_t a_Length) override;
virtual AString GetLocalIP(void) const override { return m_LocalIP; } virtual AString GetLocalIP(void) const override { return m_LocalIP; }
@ -85,7 +91,8 @@ protected:
/** Creates a new link to be queued to connect to a specified host:port. /** Creates a new link to be queued to connect to a specified host:port.
Used for outgoing connections created using cNetwork::Connect(). Used for outgoing connections created using cNetwork::Connect().
To be used only by the Connect() factory function. */ To be used only by the Connect() factory function.
The link is created disabled, you need to call Enable() to start the regular communication. */
cTCPLinkImpl(const cCallbacksPtr a_LinkCallbacks); cTCPLinkImpl(const cCallbacksPtr a_LinkCallbacks);
/** Callback that LibEvent calls when there's data available from the remote peer. */ /** Callback that LibEvent calls when there's data available from the remote peer. */

View File

@ -16,11 +16,21 @@
class cEchoLinkCallbacks: class cEchoLinkCallbacks:
public cTCPLink::cCallbacks public cTCPLink::cCallbacks
{ {
virtual void OnReceivedData(cTCPLink & a_Link, const char * a_Data, size_t a_Size) override // cTCPLink::cCallbacks overrides:
virtual void OnLinkCreated(cTCPLinkPtr a_Link) override
{ {
ASSERT(m_Link == nullptr);
m_Link = a_Link;
}
virtual void OnReceivedData(const char * a_Data, size_t a_Size) override
{
ASSERT(m_Link != nullptr);
// Echo the incoming data back to outgoing data: // Echo the incoming data back to outgoing data:
LOGD("%p (%s:%d): Data received (%u bytes), echoing back.", &a_Link, a_Link.GetRemoteIP().c_str(), a_Link.GetRemotePort(), static_cast<unsigned>(a_Size)); LOGD("%p (%s:%d): Data received (%u bytes), echoing back.", m_Link.get(), m_Link->GetRemoteIP().c_str(), m_Link->GetRemotePort(), static_cast<unsigned>(a_Size));
a_Link.Send(a_Data, a_Size); m_Link->Send(a_Data, a_Size);
LOGD("Echo queued"); LOGD("Echo queued");
// Search for a Ctrl+Z, if found, drop the connection: // Search for a Ctrl+Z, if found, drop the connection:
@ -28,21 +38,33 @@ class cEchoLinkCallbacks:
{ {
if (a_Data[i] == '\x1a') if (a_Data[i] == '\x1a')
{ {
a_Link.Close(); m_Link->Close();
m_Link.reset();
return; return;
} }
} }
} }
virtual void OnRemoteClosed(cTCPLink & a_Link) override
virtual void OnRemoteClosed(void) override
{ {
LOGD("%p (%s:%d): Remote has closed the connection.", &a_Link, a_Link.GetRemoteIP().c_str(), a_Link.GetRemotePort()); ASSERT(m_Link != nullptr);
LOGD("%p (%s:%d): Remote has closed the connection.", m_Link.get(), m_Link->GetRemoteIP().c_str(), m_Link->GetRemotePort());
m_Link.reset();
} }
virtual void OnError(cTCPLink & a_Link, int a_ErrorCode, const AString & a_ErrorMsg) override
virtual void OnError(int a_ErrorCode, const AString & a_ErrorMsg) override
{ {
LOGD("%p (%s:%d): Error %d in the cEchoLinkCallbacks: %s", &a_Link, a_Link.GetRemoteIP().c_str(), a_Link.GetRemotePort(), a_ErrorCode, a_ErrorMsg.c_str()); ASSERT(m_Link != nullptr);
LOGD("%p (%s:%d): Error %d in the cEchoLinkCallbacks: %s", m_Link.get(), m_Link->GetRemoteIP().c_str(), m_Link->GetRemotePort(), a_ErrorCode, a_ErrorMsg.c_str());
m_Link.reset();
} }
/** The link attached to this callbacks instance. */
cTCPLinkPtr m_Link;
}; };

View File

@ -49,24 +49,39 @@ class cDumpCallbacks:
public cTCPLink::cCallbacks public cTCPLink::cCallbacks
{ {
cEvent & m_Event; cEvent & m_Event;
cTCPLinkPtr m_Link;
virtual void OnReceivedData(cTCPLink & a_Link, const char * a_Data, size_t a_Size) override virtual void OnLinkCreated(cTCPLinkPtr a_Link) override
{ {
ASSERT(m_Link == nullptr);
m_Link = a_Link;
}
virtual void OnReceivedData(const char * a_Data, size_t a_Size) override
{
ASSERT(m_Link != nullptr);
// Log the incoming data size: // Log the incoming data size:
AString Hex; AString Hex;
CreateHexDump(Hex, a_Data, a_Size, 16); CreateHexDump(Hex, a_Data, a_Size, 16);
LOGD("Incoming data: %u bytes:\n%s", static_cast<unsigned>(a_Size), Hex.c_str()); LOGD("Incoming data: %u bytes:\n%s", static_cast<unsigned>(a_Size), Hex.c_str());
} }
virtual void OnRemoteClosed(cTCPLink & a_Link) override virtual void OnRemoteClosed(void) override
{ {
ASSERT(m_Link != nullptr);
LOGD("Remote has closed the connection."); LOGD("Remote has closed the connection.");
m_Link.reset();
m_Event.Set(); m_Event.Set();
} }
virtual void OnError(cTCPLink & a_Link, int a_ErrorCode, const AString & a_ErrorMsg) override virtual void OnError(int a_ErrorCode, const AString & a_ErrorMsg) override
{ {
ASSERT(m_Link != nullptr);
LOGD("Error %d (%s) in the cDumpCallbacks.", a_ErrorCode, a_ErrorMsg.c_str()); LOGD("Error %d (%s) in the cDumpCallbacks.", a_ErrorCode, a_ErrorMsg.c_str());
m_Link.reset();
m_Event.Set(); m_Event.Set();
} }