1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2025-01-05 00:47:51 -05:00

merge http.Connection into net.Connection

This commit is contained in:
Darien Raymond 2018-04-20 21:30:58 +02:00
parent a7103481d5
commit d7aeb51904
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
6 changed files with 43 additions and 74 deletions

View File

@ -80,7 +80,7 @@ func (co *Outbound) Dispatch(ctx context.Context, link *core.Link) {
} }
closeSignal := signal.NewNotifier() closeSignal := signal.NewNotifier()
c := net.NewConnection(net.ConnectionInputMulti(link.Writer), net.ConnectionOutputMulti(link.Reader), net.ConnectionOnClose(closeSignal)) c := net.NewConnection(net.ConnectionInputMulti(link.Writer), net.ConnectionOutputMulti(link.Reader), net.ConnectionOnClose(signal.NotifyClose(closeSignal)))
co.listener.add(c) co.listener.add(c)
co.access.RUnlock() co.access.RUnlock()
<-closeSignal.Wait() <-closeSignal.Wait()

View File

@ -48,15 +48,23 @@ func ConnectionOutputMulti(reader buf.Reader) ConnectionOption {
} }
} }
func ConnectionOnClose(s *signal.Notifier) ConnectionOption { func ConnectionOnClose(n io.Closer) ConnectionOption {
return func(c *connection) { return func(c *connection) {
c.onClose = s c.onClose = n
} }
} }
func NewConnection(opts ...ConnectionOption) net.Conn { func NewConnection(opts ...ConnectionOption) net.Conn {
c := &connection{ c := &connection{
done: signal.NewDone(), done: signal.NewDone(),
local: &net.TCPAddr{
IP: []byte{0, 0, 0, 0},
Port: 0,
},
remote: &net.TCPAddr{
IP: []byte{0, 0, 0, 0},
Port: 0,
},
} }
for _, opt := range opts { for _, opt := range opts {
@ -70,7 +78,7 @@ type connection struct {
reader *buf.BufferedReader reader *buf.BufferedReader
writer buf.Writer writer buf.Writer
done *signal.Done done *signal.Done
onClose *signal.Notifier onClose io.Closer
local Addr local Addr
remote Addr remote Addr
} }
@ -110,7 +118,7 @@ func (c *connection) Close() error {
common.Close(c.reader) common.Close(c.reader)
common.Close(c.writer) common.Close(c.writer)
if c.onClose != nil { if c.onClose != nil {
c.onClose.Signal() return c.onClose.Close()
} }
return nil return nil

View File

@ -1,5 +1,7 @@
package signal package signal
import "io"
// Notifier is a utility for notifying changes. The change producer may notify changes multiple time, and the consumer may get notified asynchronously. // Notifier is a utility for notifying changes. The change producer may notify changes multiple time, and the consumer may get notified asynchronously.
type Notifier struct { type Notifier struct {
c chan struct{} c chan struct{}
@ -24,3 +26,18 @@ func (n *Notifier) Signal() {
func (n *Notifier) Wait() <-chan struct{} { func (n *Notifier) Wait() <-chan struct{} {
return n.c return n.c
} }
type nCloser struct {
n *Notifier
}
func (c *nCloser) Close() error {
c.n.Signal()
return nil
}
func NotifyClose(n *Notifier) io.Closer {
return &nCloser{
n: n,
}
}

View File

@ -1,49 +0,0 @@
package http
import (
"io"
"time"
"v2ray.com/core/common"
"v2ray.com/core/common/net"
)
type Connection struct {
Reader io.Reader
Writer io.Writer
Closer common.Closable
Local net.Addr
Remote net.Addr
}
func (c *Connection) Read(b []byte) (int, error) {
return c.Reader.Read(b)
}
func (c *Connection) Write(b []byte) (int, error) {
return c.Writer.Write(b)
}
func (c *Connection) Close() error {
return c.Closer.Close()
}
func (c *Connection) LocalAddr() net.Addr {
return c.Local
}
func (c *Connection) RemoteAddr() net.Addr {
return c.Remote
}
func (c *Connection) SetDeadline(t time.Time) error {
return nil
}
func (c *Connection) SetReadDeadline(t time.Time) error {
return nil
}
func (c *Connection) SetWriteDeadline(t time.Time) error {
return nil
}

View File

@ -112,19 +112,11 @@ func Dial(ctx context.Context, dest net.Destination) (internet.Connection, error
bwriter := buf.NewBufferedWriter(pwriter) bwriter := buf.NewBufferedWriter(pwriter)
common.Must(bwriter.SetBuffered(false)) common.Must(bwriter.SetBuffered(false))
return &Connection{ return net.NewConnection(
Reader: response.Body, net.ConnectionOutput(response.Body),
Writer: bwriter, net.ConnectionInput(bwriter),
Closer: common.NewChainedClosable(breader, bwriter, response.Body), net.ConnectionOnClose(common.NewChainedClosable(breader, bwriter, response.Body)),
Local: &net.TCPAddr{ ), nil
IP: []byte{0, 0, 0, 0},
Port: 0,
},
Remote: &net.TCPAddr{
IP: []byte{0, 0, 0, 0},
Port: 0,
},
}, nil
} }
func init() { func init() {

View File

@ -64,13 +64,14 @@ func (l *Listener) ServeHTTP(writer http.ResponseWriter, request *http.Request)
f.Flush() f.Flush()
} }
done := signal.NewDone() done := signal.NewDone()
l.handler(&Connection{ conn := net.NewConnection(
Reader: request.Body, net.ConnectionOutput(request.Body),
Writer: flushWriter{w: writer, d: done}, net.ConnectionInput(flushWriter{w: writer, d: done}),
Closer: common.NewChainedClosable(done, request.Body), net.ConnectionOnClose(common.NewChainedClosable(done, request.Body)),
Local: l.Addr(), net.ConnectionLocalAddr(l.Addr()),
Remote: l.Addr(), net.ConnectionRemoteAddr(l.Addr()),
}) )
l.handler(conn)
<-done.Wait() <-done.Wait()
} }