diff --git a/app/commander/outbound.go b/app/commander/outbound.go index 7b46f9791..a5886060c 100644 --- a/app/commander/outbound.go +++ b/app/commander/outbound.go @@ -80,7 +80,7 @@ func (co *Outbound) Dispatch(ctx context.Context, link *core.Link) { } closeSignal := signal.NewNotifier() - c := net.NewConnection(net.ConnectionInputMulti(link.Writer), net.ConnectionOutputMulti(link.Reader), net.ConnectionOnClose(closeSignal)) + c := net.NewConnection(net.ConnectionInputMulti(link.Writer), net.ConnectionOutputMulti(link.Reader), net.ConnectionOnClose(signal.NotifyClose(closeSignal))) co.listener.add(c) co.access.RUnlock() <-closeSignal.Wait() diff --git a/common/net/connection.go b/common/net/connection.go index 12998fbe8..cd3dfb07d 100644 --- a/common/net/connection.go +++ b/common/net/connection.go @@ -48,15 +48,23 @@ func ConnectionOutputMulti(reader buf.Reader) ConnectionOption { } } -func ConnectionOnClose(s *signal.Notifier) ConnectionOption { +func ConnectionOnClose(n io.Closer) ConnectionOption { return func(c *connection) { - c.onClose = s + c.onClose = n } } func NewConnection(opts ...ConnectionOption) net.Conn { c := &connection{ done: signal.NewDone(), + local: &net.TCPAddr{ + IP: []byte{0, 0, 0, 0}, + Port: 0, + }, + remote: &net.TCPAddr{ + IP: []byte{0, 0, 0, 0}, + Port: 0, + }, } for _, opt := range opts { @@ -70,7 +78,7 @@ type connection struct { reader *buf.BufferedReader writer buf.Writer done *signal.Done - onClose *signal.Notifier + onClose io.Closer local Addr remote Addr } @@ -110,7 +118,7 @@ func (c *connection) Close() error { common.Close(c.reader) common.Close(c.writer) if c.onClose != nil { - c.onClose.Signal() + return c.onClose.Close() } return nil diff --git a/common/signal/notifier.go b/common/signal/notifier.go index 19836e54f..908bc9e81 100755 --- a/common/signal/notifier.go +++ b/common/signal/notifier.go @@ -1,5 +1,7 @@ package signal +import "io" + // Notifier is a utility for notifying changes. The change producer may notify changes multiple time, and the consumer may get notified asynchronously. type Notifier struct { c chan struct{} @@ -24,3 +26,18 @@ func (n *Notifier) Signal() { func (n *Notifier) Wait() <-chan struct{} { return n.c } + +type nCloser struct { + n *Notifier +} + +func (c *nCloser) Close() error { + c.n.Signal() + return nil +} + +func NotifyClose(n *Notifier) io.Closer { + return &nCloser{ + n: n, + } +} diff --git a/transport/internet/http/connection.go b/transport/internet/http/connection.go deleted file mode 100644 index 8f6808672..000000000 --- a/transport/internet/http/connection.go +++ /dev/null @@ -1,49 +0,0 @@ -package http - -import ( - "io" - "time" - - "v2ray.com/core/common" - "v2ray.com/core/common/net" -) - -type Connection struct { - Reader io.Reader - Writer io.Writer - Closer common.Closable - Local net.Addr - Remote net.Addr -} - -func (c *Connection) Read(b []byte) (int, error) { - return c.Reader.Read(b) -} - -func (c *Connection) Write(b []byte) (int, error) { - return c.Writer.Write(b) -} - -func (c *Connection) Close() error { - return c.Closer.Close() -} - -func (c *Connection) LocalAddr() net.Addr { - return c.Local -} - -func (c *Connection) RemoteAddr() net.Addr { - return c.Remote -} - -func (c *Connection) SetDeadline(t time.Time) error { - return nil -} - -func (c *Connection) SetReadDeadline(t time.Time) error { - return nil -} - -func (c *Connection) SetWriteDeadline(t time.Time) error { - return nil -} diff --git a/transport/internet/http/dialer.go b/transport/internet/http/dialer.go index ba4f20220..a1daf4b4e 100644 --- a/transport/internet/http/dialer.go +++ b/transport/internet/http/dialer.go @@ -112,19 +112,11 @@ func Dial(ctx context.Context, dest net.Destination) (internet.Connection, error bwriter := buf.NewBufferedWriter(pwriter) common.Must(bwriter.SetBuffered(false)) - return &Connection{ - Reader: response.Body, - Writer: bwriter, - Closer: common.NewChainedClosable(breader, bwriter, response.Body), - Local: &net.TCPAddr{ - IP: []byte{0, 0, 0, 0}, - Port: 0, - }, - Remote: &net.TCPAddr{ - IP: []byte{0, 0, 0, 0}, - Port: 0, - }, - }, nil + return net.NewConnection( + net.ConnectionOutput(response.Body), + net.ConnectionInput(bwriter), + net.ConnectionOnClose(common.NewChainedClosable(breader, bwriter, response.Body)), + ), nil } func init() { diff --git a/transport/internet/http/hub.go b/transport/internet/http/hub.go index 6c9828452..c5bd8d79c 100644 --- a/transport/internet/http/hub.go +++ b/transport/internet/http/hub.go @@ -64,13 +64,14 @@ func (l *Listener) ServeHTTP(writer http.ResponseWriter, request *http.Request) f.Flush() } done := signal.NewDone() - l.handler(&Connection{ - Reader: request.Body, - Writer: flushWriter{w: writer, d: done}, - Closer: common.NewChainedClosable(done, request.Body), - Local: l.Addr(), - Remote: l.Addr(), - }) + conn := net.NewConnection( + net.ConnectionOutput(request.Body), + net.ConnectionInput(flushWriter{w: writer, d: done}), + net.ConnectionOnClose(common.NewChainedClosable(done, request.Body)), + net.ConnectionLocalAddr(l.Addr()), + net.ConnectionRemoteAddr(l.Addr()), + ) + l.handler(conn) <-done.Wait() }