1
0

Fixed TCP link shutdown.

The shutdown is postponed until there's no more outgoing data in the LibEvent buffers.
This commit is contained in:
Mattes D 2015-02-14 13:55:54 +01:00
parent 1ca0a4915e
commit d336a3ea9e
3 changed files with 71 additions and 11 deletions

View File

@ -163,14 +163,15 @@ local g_Services =
OnReceivedData = function (a_Link, a_Data) OnReceivedData = function (a_Link, a_Data)
IncomingData = IncomingData .. a_Data IncomingData = IncomingData .. a_Data
if (IncomingData:find("\r\n\r\n")) then if (IncomingData:find("\r\n\r\n")) then
-- We have received the entire request headers, just send the response and shutdown the link:
local Content = os.date() local Content = os.date()
a_Link:Send("HTTP/1.0 200 OK\r\nContent-type: text/plain\r\nContent-length: " .. #Content .. "\r\n\r\n" .. Content) a_Link:Send("HTTP/1.0 200 OK\r\nContent-type: text/plain\r\nContent-length: " .. #Content .. "\r\n\r\n" .. Content)
-- TODO: shutdown is not yet properly implemented in cTCPLink a_Link:Shutdown()
-- a_Link:Shutdown()
end end
end, end,
OnRemoteClosed = function (a_Link) OnRemoteClosed = function (a_Link)
LOG("httpstime: link closed by remote")
end end
} -- Link callbacks } -- Link callbacks
end, -- OnIncomingConnection() end, -- OnIncomingConnection()

View File

@ -7,6 +7,7 @@
#include "TCPLinkImpl.h" #include "TCPLinkImpl.h"
#include "NetworkSingleton.h" #include "NetworkSingleton.h"
#include "ServerHandleImpl.h" #include "ServerHandleImpl.h"
#include "event2/buffer.h"
@ -17,7 +18,10 @@
cTCPLinkImpl::cTCPLinkImpl(cTCPLink::cCallbacksPtr a_LinkCallbacks): cTCPLinkImpl::cTCPLinkImpl(cTCPLink::cCallbacksPtr a_LinkCallbacks):
super(a_LinkCallbacks), super(a_LinkCallbacks),
m_BufferEvent(bufferevent_socket_new(cNetworkSingleton::Get().GetEventBase(), -1, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE)) m_BufferEvent(bufferevent_socket_new(cNetworkSingleton::Get().GetEventBase(), -1, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE)),
m_LocalPort(0),
m_RemotePort(0),
m_ShouldShutdown(false)
{ {
LOGD("Created new cTCPLinkImpl at %p with BufferEvent at %p", this, m_BufferEvent); LOGD("Created new cTCPLinkImpl at %p with BufferEvent at %p", this, m_BufferEvent);
} }
@ -29,7 +33,10 @@ cTCPLinkImpl::cTCPLinkImpl(cTCPLink::cCallbacksPtr a_LinkCallbacks):
cTCPLinkImpl::cTCPLinkImpl(evutil_socket_t a_Socket, cTCPLink::cCallbacksPtr a_LinkCallbacks, cServerHandleImplPtr a_Server, const sockaddr * a_Address, socklen_t a_AddrLen): cTCPLinkImpl::cTCPLinkImpl(evutil_socket_t a_Socket, cTCPLink::cCallbacksPtr a_LinkCallbacks, cServerHandleImplPtr a_Server, const sockaddr * a_Address, socklen_t a_AddrLen):
super(a_LinkCallbacks), super(a_LinkCallbacks),
m_BufferEvent(bufferevent_socket_new(cNetworkSingleton::Get().GetEventBase(), a_Socket, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE)), m_BufferEvent(bufferevent_socket_new(cNetworkSingleton::Get().GetEventBase(), a_Socket, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE)),
m_Server(a_Server) m_Server(a_Server),
m_LocalPort(0),
m_RemotePort(0),
m_ShouldShutdown(false)
{ {
LOGD("Created new cTCPLinkImpl at %p with BufferEvent at %p", this, m_BufferEvent); LOGD("Created new cTCPLinkImpl at %p with BufferEvent at %p", this, m_BufferEvent);
@ -111,7 +118,7 @@ void cTCPLinkImpl::Enable(cTCPLinkImplPtr a_Self)
m_Self = a_Self; m_Self = a_Self;
// Set the LibEvent callbacks and enable processing: // Set the LibEvent callbacks and enable processing:
bufferevent_setcb(m_BufferEvent, ReadCallback, nullptr, EventCallback, this); bufferevent_setcb(m_BufferEvent, ReadCallback, WriteCallback, EventCallback, this);
bufferevent_enable(m_BufferEvent, EV_READ | EV_WRITE); bufferevent_enable(m_BufferEvent, EV_READ | EV_WRITE);
} }
@ -121,6 +128,11 @@ void cTCPLinkImpl::Enable(cTCPLinkImplPtr a_Self)
bool cTCPLinkImpl::Send(const void * a_Data, size_t a_Length) bool cTCPLinkImpl::Send(const void * a_Data, size_t a_Length)
{ {
if (m_ShouldShutdown)
{
LOGD("%s: Cannot send data, the link is already shut down.", __FUNCTION__);
return false;
}
return (bufferevent_write(m_BufferEvent, a_Data, a_Length) == 0); return (bufferevent_write(m_BufferEvent, a_Data, a_Length) == 0);
} }
@ -130,12 +142,15 @@ bool cTCPLinkImpl::Send(const void * a_Data, size_t a_Length)
void cTCPLinkImpl::Shutdown(void) void cTCPLinkImpl::Shutdown(void)
{ {
#ifdef _WIN32 // If there's no outgoing data, shutdown the socket directly:
shutdown(bufferevent_getfd(m_BufferEvent), SD_SEND); if (evbuffer_get_length(bufferevent_get_output(m_BufferEvent)) == 0)
#else {
shutdown(bufferevent_getfd(m_BufferEvent), SHUT_WR); DoActualShutdown();
#endif return;
bufferevent_disable(m_BufferEvent, EV_WRITE); }
// There's still outgoing data in the LibEvent buffer, schedule a shutdown when it's written to OS's TCP stack:
m_ShouldShutdown = true;
} }
@ -181,6 +196,24 @@ void cTCPLinkImpl::ReadCallback(bufferevent * a_BufferEvent, void * a_Self)
void cTCPLinkImpl::WriteCallback(bufferevent * a_BufferEvent, void * a_Self)
{
ASSERT(a_Self != nullptr);
auto Self = static_cast<cTCPLinkImpl *>(a_Self);
ASSERT(Self->m_Callbacks != nullptr);
// If there's no more data to write and the link has been scheduled for shutdown, do the shutdown:
auto OutLen = evbuffer_get_length(bufferevent_get_output(Self->m_BufferEvent));
if ((OutLen == 0) && (Self->m_ShouldShutdown))
{
Self->DoActualShutdown();
}
}
void cTCPLinkImpl::EventCallback(bufferevent * a_BufferEvent, short a_What, void * a_Self) void cTCPLinkImpl::EventCallback(bufferevent * a_BufferEvent, short a_What, void * a_Self)
{ {
LOGD("cTCPLink event callback for link %p, BEV %p; what = 0x%02x", a_Self, a_BufferEvent, a_What); LOGD("cTCPLink event callback for link %p, BEV %p; what = 0x%02x", a_Self, a_BufferEvent, a_What);
@ -316,6 +349,20 @@ void cTCPLinkImpl::UpdateRemoteAddress(void)
void cTCPLinkImpl::DoActualShutdown(void)
{
#ifdef _WIN32
shutdown(bufferevent_getfd(m_BufferEvent), SD_SEND);
#else
shutdown(bufferevent_getfd(m_BufferEvent), SHUT_WR);
#endif
bufferevent_disable(m_BufferEvent, EV_WRITE);
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// cNetwork API: // cNetwork API:

View File

@ -94,6 +94,11 @@ protected:
Initialized in Enable(), cleared in Close() and EventCallback(RemoteClosed). */ Initialized in Enable(), cleared in Close() and EventCallback(RemoteClosed). */
cTCPLinkImplPtr m_Self; cTCPLinkImplPtr m_Self;
/** If true, Shutdown() has been called and is in queue.
No more data is allowed to be sent via Send() and after all the currently buffered
data is sent to the OS TCP stack, the socket gets shut down. */
bool m_ShouldShutdown;
/** Creates a new link to be queued to connect to a specified host:port. /** Creates a new link to be queued to connect to a specified host:port.
Used for outgoing connections created using cNetwork::Connect(). Used for outgoing connections created using cNetwork::Connect().
@ -104,6 +109,9 @@ protected:
/** Callback that LibEvent calls when there's data available from the remote peer. */ /** Callback that LibEvent calls when there's data available from the remote peer. */
static void ReadCallback(bufferevent * a_BufferEvent, void * a_Self); static void ReadCallback(bufferevent * a_BufferEvent, void * a_Self);
/** Callback that LibEvent calls when the remote peer can receive more data. */
static void WriteCallback(bufferevent * a_BufferEvent, void * a_Self);
/** Callback that LibEvent calls when there's a non-data-related event on the socket. */ /** Callback that LibEvent calls when there's a non-data-related event on the socket. */
static void EventCallback(bufferevent * a_BufferEvent, short a_What, void * a_Self); static void EventCallback(bufferevent * a_BufferEvent, short a_What, void * a_Self);
@ -115,6 +123,10 @@ protected:
/** Updates m_RemoteIP and m_RemotePort based on the metadata read from the socket. */ /** Updates m_RemoteIP and m_RemotePort based on the metadata read from the socket. */
void UpdateRemoteAddress(void); void UpdateRemoteAddress(void);
/** Calls shutdown on the link and disables LibEvent writing.
Called after all data from LibEvent buffers is sent to the OS TCP stack and shutdown() has been called before. */
void DoActualShutdown(void);
}; };