From 609dbc1f130c97fa2a8e144151a15667afd963ea Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Fri, 30 Dec 2016 00:32:20 +0100 Subject: [PATCH] task engine for all proxies --- common/signal/exec_test.go | 87 ++++++++++++++++++++++++++++ proxy/freedom/freedom.go | 29 +++++++--- proxy/http/server.go | 79 ++++++++++++------------- proxy/shadowsocks/client.go | 85 +++++++++++++++------------ proxy/shadowsocks/server.go | 50 +++++++++------- proxy/socks/server.go | 31 ++++++---- proxy/vmess/inbound/inbound.go | 11 ++-- proxy/vmess/outbound/outbound.go | 98 +++++++++++++++----------------- transport/ray/direct.go | 6 +- transport/ray/ray.go | 2 + 10 files changed, 304 insertions(+), 174 deletions(-) create mode 100644 common/signal/exec_test.go diff --git a/common/signal/exec_test.go b/common/signal/exec_test.go new file mode 100644 index 000000000..75c76f27f --- /dev/null +++ b/common/signal/exec_test.go @@ -0,0 +1,87 @@ +package signal_test + +import ( + "errors" + "testing" + + . "v2ray.com/core/common/signal" + "v2ray.com/core/testing/assert" +) + +func TestErrorOrFinish2_Error(t *testing.T) { + assert := assert.On(t) + + c1 := make(chan error, 1) + c2 := make(chan error, 2) + c := make(chan error, 1) + + go func() { + c <- ErrorOrFinish2(c1, c2) + }() + + c1 <- errors.New("test") + err := <-c + assert.String(err.Error()).Equals("test") +} + +func TestErrorOrFinish2_Error2(t *testing.T) { + assert := assert.On(t) + + c1 := make(chan error, 1) + c2 := make(chan error, 2) + c := make(chan error, 1) + + go func() { + c <- ErrorOrFinish2(c1, c2) + }() + + c2 <- errors.New("test") + err := <-c + assert.String(err.Error()).Equals("test") +} + +func TestErrorOrFinish2_NoneError(t *testing.T) { + assert := assert.On(t) + + c1 := make(chan error, 1) + c2 := make(chan error, 2) + c := make(chan error, 1) + + go func() { + c <- ErrorOrFinish2(c1, c2) + }() + + close(c1) + select { + case <-c: + t.Fail() + default: + } + + close(c2) + err := <-c + assert.Error(err).IsNil() +} + +func TestErrorOrFinish2_NoneError2(t *testing.T) { + assert := assert.On(t) + + c1 := make(chan error, 1) + c2 := make(chan error, 2) + c := make(chan error, 1) + + go func() { + c <- ErrorOrFinish2(c1, c2) + }() + + close(c2) + select { + case <-c: + t.Fail() + default: + } + + close(c1) + err := <-c + assert.Error(err).IsNil() +} diff --git a/proxy/freedom/freedom.go b/proxy/freedom/freedom.go index 28ca93d4e..3249a8fa5 100644 --- a/proxy/freedom/freedom.go +++ b/proxy/freedom/freedom.go @@ -13,6 +13,7 @@ import ( v2net "v2ray.com/core/common/net" "v2ray.com/core/common/retry" "v2ray.com/core/common/serial" + "v2ray.com/core/common/signal" "v2ray.com/core/proxy" "v2ray.com/core/transport/internet" "v2ray.com/core/transport/ray" @@ -101,14 +102,17 @@ func (v *Handler) Dispatch(destination v2net.Destination, payload *buf.Buffer, r } } - go func() { + requestDone := signal.ExecuteAsync(func() error { + defer input.Release() + v2writer := buf.NewWriter(conn) defer v2writer.Release() if err := buf.PipeUntilEOF(input, v2writer); err != nil { - log.Info("Freedom: Failed to transport all TCP request: ", err) + return err } - }() + return nil + }) var reader io.Reader = conn @@ -120,12 +124,21 @@ func (v *Handler) Dispatch(destination v2net.Destination, payload *buf.Buffer, r reader = v2net.NewTimeOutReader(timeout /* seconds */, conn) } - v2reader := buf.NewReader(reader) - if err := buf.PipeUntilEOF(v2reader, output); err != nil { - log.Info("Freedom: Failed to transport all TCP response: ", err) + responseDone := signal.ExecuteAsync(func() error { + defer output.Close() + + v2reader := buf.NewReader(reader) + defer v2reader.Release() + + if err := buf.PipeUntilEOF(v2reader, output); err != nil { + return err + } + return nil + }) + + if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil { + log.Info("Freedom: Connection ending with ", err) } - v2reader.Release() - ray.OutboundOutput().Close() } type Factory struct{} diff --git a/proxy/http/server.go b/proxy/http/server.go index 30cfec087..bd06edede 100644 --- a/proxy/http/server.go +++ b/proxy/http/server.go @@ -17,9 +17,9 @@ import ( "v2ray.com/core/common/log" v2net "v2ray.com/core/common/net" "v2ray.com/core/common/serial" + "v2ray.com/core/common/signal" "v2ray.com/core/proxy" "v2ray.com/core/transport/internet" - "v2ray.com/core/transport/ray" ) // Server is a HTTP proxy server. @@ -155,35 +155,32 @@ func (v *Server) handleConnect(request *http.Request, session *proxy.SessionInfo } ray := v.packetDispatcher.DispatchToOutbound(session) - v.transport(reader, writer, ray) -} -func (v *Server) transport(input io.Reader, output io.Writer, ray ray.InboundRay) { - var wg sync.WaitGroup - wg.Add(2) - defer wg.Wait() + requestDone := signal.ExecuteAsync(func() error { + defer ray.InboundInput().Close() - go func() { - v2reader := buf.NewReader(input) + v2reader := buf.NewReader(reader) defer v2reader.Release() if err := buf.PipeUntilEOF(v2reader, ray.InboundInput()); err != nil { - log.Info("HTTP: Failed to transport all TCP request: ", err) + return err } - ray.InboundInput().Close() - wg.Done() - }() + return nil + }) - go func() { - v2writer := buf.NewWriter(output) + responseDone := signal.ExecuteAsync(func() error { + defer ray.InboundOutput().Release() + + v2writer := buf.NewWriter(writer) defer v2writer.Release() if err := buf.PipeUntilEOF(ray.InboundOutput(), v2writer); err != nil { - log.Info("HTTP: Failed to transport all TCP response: ", err) + return err } - ray.InboundOutput().Release() - wg.Done() - }() + return nil + }) + + signal.ErrorOrFinish2(requestDone, responseDone) } // @VisibleForTesting @@ -239,27 +236,26 @@ func (v *Server) handlePlainHTTP(request *http.Request, session *proxy.SessionIn StripHopByHopHeaders(request) ray := v.packetDispatcher.DispatchToOutbound(session) - defer ray.InboundInput().Close() - defer ray.InboundOutput().Release() - var finish sync.WaitGroup - finish.Add(1) - go func() { - defer finish.Done() + requestDone := signal.ExecuteAsync(func() error { + defer ray.InboundInput().Close() + requestWriter := bufio.NewWriter(buf.NewBytesWriter(ray.InboundInput())) defer requestWriter.Release() err := request.Write(requestWriter) if err != nil { - log.Warning("HTTP: Failed to write request: ", err) - return + return err } - requestWriter.Flush() - }() + if err := requestWriter.Flush(); err != nil { + return err + } + return nil + }) + + responseDone := signal.ExecuteAsync(func() error { + defer ray.InboundOutput().Release() - finish.Add(1) - go func() { - defer finish.Done() responseReader := bufio.OriginalReader(buf.NewBytesReader(ray.InboundOutput())) response, err := http.ReadResponse(responseReader, request) if err != nil { @@ -267,14 +263,19 @@ func (v *Server) handlePlainHTTP(request *http.Request, session *proxy.SessionIn response = v.GenerateResponse(503, "Service Unavailable") } responseWriter := bufio.NewWriter(writer) - err = response.Write(responseWriter) - if err != nil { - log.Warning("HTTP: Failed to write response: ", err) - return + if err := response.Write(responseWriter); err != nil { + return err } - responseWriter.Flush() - }() - finish.Wait() + + if err := responseWriter.Flush(); err != nil { + return err + } + return nil + }) + + if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil { + log.Info("HTTP|Server: Connecton ending with ", err) + } } // ServerFactory is a InboundHandlerFactory. diff --git a/proxy/shadowsocks/client.go b/proxy/shadowsocks/client.go index 8ab3616fe..12b41e5e4 100644 --- a/proxy/shadowsocks/client.go +++ b/proxy/shadowsocks/client.go @@ -1,8 +1,6 @@ package shadowsocks import ( - "sync" - "v2ray.com/core/app" "v2ray.com/core/common/buf" "v2ray.com/core/common/bufio" @@ -10,6 +8,7 @@ import ( v2net "v2ray.com/core/common/net" "v2ray.com/core/common/protocol" "v2ray.com/core/common/retry" + "v2ray.com/core/common/signal" "v2ray.com/core/proxy" "v2ray.com/core/transport/internet" "v2ray.com/core/transport/ray" @@ -38,8 +37,6 @@ func NewClient(config *ClientConfig, space app.Space, meta *proxy.OutboundHandle // Dispatch implements OutboundHandler.Dispatch(). func (v *Client) Dispatch(destination v2net.Destination, payload *buf.Buffer, ray ray.OutboundRay) { defer payload.Release() - defer ray.OutboundInput().Release() - defer ray.OutboundOutput().Close() network := destination.Network @@ -109,48 +106,38 @@ func (v *Client) Dispatch(destination v2net.Destination, payload *buf.Buffer, ra } } - var responseMutex sync.Mutex - responseMutex.Lock() + bufferedWriter.SetBuffered(false) - go func() { - defer responseMutex.Unlock() + requestDone := signal.ExecuteAsync(func() error { + defer ray.OutboundInput().Release() + + if err := buf.PipeUntilEOF(ray.OutboundInput(), bodyWriter); err != nil { + return err + } + return nil + }) + + responseDone := signal.ExecuteAsync(func() error { + defer ray.OutboundOutput().Close() responseReader, err := ReadTCPResponse(user, conn) if err != nil { - log.Warning("Shadowsocks|Client: Failed to read response: ", err) - return + return err } if err := buf.PipeUntilEOF(responseReader, ray.OutboundOutput()); err != nil { - log.Info("Shadowsocks|Client: Failed to transport all TCP response: ", err) + return err } - }() - bufferedWriter.SetBuffered(false) - if err := buf.PipeUntilEOF(ray.OutboundInput(), bodyWriter); err != nil { - log.Info("Shadowsocks|Client: Failed to trasnport all TCP request: ", err) + return nil + }) + + if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil { + log.Info("Shadowsocks|Client: Connection ends with ", err) } - - responseMutex.Lock() } if request.Command == protocol.RequestCommandUDP { - timedReader := v2net.NewTimeOutReader(16, conn) - var responseMutex sync.Mutex - responseMutex.Lock() - - go func() { - defer responseMutex.Unlock() - - reader := &UDPReader{ - Reader: timedReader, - User: user, - } - - if err := buf.PipeUntilEOF(reader, ray.OutboundOutput()); err != nil { - log.Info("Shadowsocks|Client: Failed to transport all UDP response: ", err) - } - }() writer := &UDPWriter{ Writer: conn, @@ -162,11 +149,35 @@ func (v *Client) Dispatch(destination v2net.Destination, payload *buf.Buffer, ra return } } - if err := buf.PipeUntilEOF(ray.OutboundInput(), writer); err != nil { - log.Info("Shadowsocks|Client: Failed to transport all UDP request: ", err) - } - responseMutex.Lock() + requestDone := signal.ExecuteAsync(func() error { + defer ray.OutboundInput().Release() + + if err := buf.PipeUntilEOF(ray.OutboundInput(), writer); err != nil { + log.Info("Shadowsocks|Client: Failed to transport all UDP request: ", err) + return err + } + return nil + }) + + timedReader := v2net.NewTimeOutReader(16, conn) + + responseDone := signal.ExecuteAsync(func() error { + defer ray.OutboundOutput().Close() + + reader := &UDPReader{ + Reader: timedReader, + User: user, + } + + if err := buf.PipeUntilEOF(reader, ray.OutboundOutput()); err != nil { + log.Info("Shadowsocks|Client: Failed to transport all UDP response: ", err) + return err + } + return nil + }) + + signal.ErrorOrFinish2(requestDone, responseDone) } } diff --git a/proxy/shadowsocks/server.go b/proxy/shadowsocks/server.go index 01bd2fc31..854c32729 100644 --- a/proxy/shadowsocks/server.go +++ b/proxy/shadowsocks/server.go @@ -1,8 +1,6 @@ package shadowsocks import ( - "sync" - "v2ray.com/core/app" "v2ray.com/core/app/dispatcher" "v2ray.com/core/common" @@ -12,6 +10,7 @@ import ( "v2ray.com/core/common/log" v2net "v2ray.com/core/common/net" "v2ray.com/core/common/protocol" + "v2ray.com/core/common/signal" "v2ray.com/core/proxy" "v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet/udp" @@ -177,11 +176,10 @@ func (v *Server) handleConnection(conn internet.Connection) { Inbound: v.meta, }) defer ray.InboundOutput().Release() + defer ray.InboundInput().Close() - var writeFinish sync.Mutex - writeFinish.Lock() - go func() { - defer writeFinish.Unlock() + requestDone := signal.ExecuteAsync(func() error { + defer ray.InboundOutput().Release() bufferedWriter := bufio.NewWriter(conn) defer bufferedWriter.Release() @@ -189,26 +187,38 @@ func (v *Server) handleConnection(conn internet.Connection) { responseWriter, err := WriteTCPResponse(request, bufferedWriter) if err != nil { log.Warning("Shadowsocks|Server: Failed to write response: ", err) - return + return err } defer responseWriter.Release() - if payload, err := ray.InboundOutput().Read(); err == nil { - responseWriter.Write(payload) - bufferedWriter.SetBuffered(false) - - if err := buf.PipeUntilEOF(ray.InboundOutput(), responseWriter); err != nil { - log.Info("Shadowsocks|Server: Failed to transport all TCP response: ", err) - } + payload, err := ray.InboundOutput().Read() + if err != nil { + return err } - }() + responseWriter.Write(payload) + bufferedWriter.SetBuffered(false) - if err := buf.PipeUntilEOF(bodyReader, ray.InboundInput()); err != nil { - log.Info("Shadowsocks|Server: Failed to transport all TCP request: ", err) + if err := buf.PipeUntilEOF(ray.InboundOutput(), responseWriter); err != nil { + log.Info("Shadowsocks|Server: Failed to transport all TCP response: ", err) + return err + } + + return nil + }) + + responseDone := signal.ExecuteAsync(func() error { + defer ray.InboundInput().Close() + + if err := buf.PipeUntilEOF(bodyReader, ray.InboundInput()); err != nil { + log.Info("Shadowsocks|Server: Failed to transport all TCP request: ", err) + return err + } + return nil + }) + + if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil { + log.Info("Shadowsocks|Server: Connection ends with ", err) } - ray.InboundInput().Close() - - writeFinish.Lock() } type ServerFactory struct{} diff --git a/proxy/socks/server.go b/proxy/socks/server.go index 6ba3f6292..0ec3d03e5 100644 --- a/proxy/socks/server.go +++ b/proxy/socks/server.go @@ -15,6 +15,7 @@ import ( "v2ray.com/core/common/log" v2net "v2ray.com/core/common/net" "v2ray.com/core/common/serial" + "v2ray.com/core/common/signal" "v2ray.com/core/proxy" "v2ray.com/core/proxy/socks/protocol" "v2ray.com/core/transport/internet" @@ -299,26 +300,36 @@ func (v *Server) transport(reader io.Reader, writer io.Writer, session *proxy.Se input := ray.InboundInput() output := ray.InboundOutput() - defer input.Close() - defer output.Release() + requestDone := signal.ExecuteAsync(func() error { + defer input.Close() - go func() { v2reader := buf.NewReader(reader) defer v2reader.Release() if err := buf.PipeUntilEOF(v2reader, input); err != nil { log.Info("Socks|Server: Failed to transport all TCP request: ", err) + return err } - input.Close() - }() + return nil + }) - v2writer := buf.NewWriter(writer) - defer v2writer.Release() + responseDone := signal.ExecuteAsync(func() error { + defer output.Release() - if err := buf.PipeUntilEOF(output, v2writer); err != nil { - log.Info("Socks|Server: Failed to transport all TCP response: ", err) + v2writer := buf.NewWriter(writer) + defer v2writer.Release() + + if err := buf.PipeUntilEOF(output, v2writer); err != nil { + log.Info("Socks|Server: Failed to transport all TCP response: ", err) + return err + } + return nil + + }) + + if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil { + log.Info("Socks|Server: Connection ends with ", err) } - output.Release() } type ServerFactory struct{} diff --git a/proxy/vmess/inbound/inbound.go b/proxy/vmess/inbound/inbound.go index dcb6667ea..04ae2a45a 100644 --- a/proxy/vmess/inbound/inbound.go +++ b/proxy/vmess/inbound/inbound.go @@ -137,7 +137,6 @@ func transferRequest(session *encoding.ServerSession, request *protocol.RequestH defer bodyReader.Release() if err := buf.PipeUntilEOF(bodyReader, output); err != nil { - log.Debug("VMess|Inbound: Error when sending data to outbound: ", err) return err } return nil @@ -160,7 +159,6 @@ func transferResponse(session *encoding.ServerSession, request *protocol.Request } if err := buf.PipeUntilEOF(input, bodyWriter); err != nil { - log.Debug("VMess|Inbound: Error when sending data to downstream: ", err) return err } } @@ -201,13 +199,13 @@ func (v *VMessInboundHandler) HandleConnection(connection internet.Connection) { if err != nil { if errors.Cause(err) != io.EOF { log.Access(connection.RemoteAddr(), "", log.AccessRejected, err) - log.Info("VMessIn: Invalid request from ", connection.RemoteAddr(), ": ", err) + log.Info("VMess|Inbound: Invalid request from ", connection.RemoteAddr(), ": ", err) } connection.SetReusable(false) return } log.Access(connection.RemoteAddr(), request.Destination(), log.AccessAccepted, "") - log.Info("VMessIn: Received request for ", request.Destination()) + log.Info("VMess|Inbound: Received request for ", request.Destination()) connection.SetReusable(request.Option.Has(protocol.RequestOptionConnectionReuse)) @@ -245,13 +243,14 @@ func (v *VMessInboundHandler) HandleConnection(connection internet.Connection) { return transferResponse(session, request, response, output, writer) }) - err = signal.ErrorOrFinish2(requestDone, responseDone) - if err != nil { + if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil { + log.Info("VMess|Inbound: Connection ending with ", err) connection.SetReusable(false) return } if err := writer.Flush(); err != nil { + log.Info("VMess|Inbound: Failed to flush remain data: ", err) connection.SetReusable(false) return } diff --git a/proxy/vmess/outbound/outbound.go b/proxy/vmess/outbound/outbound.go index 5bc96cbcf..41ad0e39d 100644 --- a/proxy/vmess/outbound/outbound.go +++ b/proxy/vmess/outbound/outbound.go @@ -1,8 +1,6 @@ package outbound import ( - "sync" - "v2ray.com/core/app" "v2ray.com/core/common" "v2ray.com/core/common/buf" @@ -12,6 +10,7 @@ import ( "v2ray.com/core/common/protocol" "v2ray.com/core/common/retry" "v2ray.com/core/common/serial" + "v2ray.com/core/common/signal" "v2ray.com/core/proxy" "v2ray.com/core/proxy/vmess" "v2ray.com/core/proxy/vmess/encoding" @@ -28,6 +27,7 @@ type VMessOutboundHandler struct { // Dispatch implements OutboundHandler.Dispatch(). func (v *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *buf.Buffer, ray ray.OutboundRay) { + defer payload.Release() defer ray.OutboundInput().Release() defer ray.OutboundOutput().Close() @@ -80,73 +80,65 @@ func (v *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *buf.B input := ray.OutboundInput() output := ray.OutboundOutput() - var requestFinish, responseFinish sync.Mutex - requestFinish.Lock() - responseFinish.Lock() - session := encoding.NewClientSession(protocol.DefaultIDHash) - go v.handleRequest(session, conn, request, payload, input, &requestFinish) - go v.handleResponse(session, conn, request, rec.Destination(), output, &responseFinish) + requestDone := signal.ExecuteAsync(func() error { + defer input.Release() - requestFinish.Lock() - responseFinish.Lock() - return -} + writer := bufio.NewWriter(conn) + defer writer.Release() -func (v *VMessOutboundHandler) handleRequest(session *encoding.ClientSession, conn internet.Connection, request *protocol.RequestHeader, payload *buf.Buffer, input buf.Reader, finish *sync.Mutex) { - defer finish.Unlock() + session.EncodeRequestHeader(request, writer) - writer := bufio.NewWriter(conn) - defer writer.Release() - session.EncodeRequestHeader(request, writer) + bodyWriter := session.EncodeRequestBody(request, writer) + defer bodyWriter.Release() - bodyWriter := session.EncodeRequestBody(request, writer) - defer bodyWriter.Release() - - if !payload.IsEmpty() { - if err := bodyWriter.Write(payload); err != nil { - log.Info("VMess|Outbound: Failed to write payload. Disabling connection reuse.", err) - conn.SetReusable(false) + if !payload.IsEmpty() { + if err := bodyWriter.Write(payload); err != nil { + return err + } } - payload.Release() - } - writer.SetBuffered(false) + writer.SetBuffered(false) - if err := buf.PipeUntilEOF(input, bodyWriter); err != nil { - conn.SetReusable(false) - } + if err := buf.PipeUntilEOF(input, bodyWriter); err != nil { + return err + } - if request.Option.Has(protocol.RequestOptionChunkStream) { - err := bodyWriter.Write(buf.NewLocal(8)) + if request.Option.Has(protocol.RequestOptionChunkStream) { + if err := bodyWriter.Write(buf.NewLocal(8)); err != nil { + return err + } + } + return nil + }) + + responseDone := signal.ExecuteAsync(func() error { + defer output.Close() + + reader := bufio.NewReader(conn) + defer reader.Release() + + header, err := session.DecodeResponseHeader(reader) if err != nil { - conn.SetReusable(false) + return err } - } - return -} + v.handleCommand(rec.Destination(), header.Command) -func (v *VMessOutboundHandler) handleResponse(session *encoding.ClientSession, conn internet.Connection, request *protocol.RequestHeader, dest v2net.Destination, output buf.Writer, finish *sync.Mutex) { - defer finish.Unlock() + conn.SetReusable(header.Option.Has(protocol.ResponseOptionConnectionReuse)) - reader := bufio.NewReader(conn) - defer reader.Release() + reader.SetBuffered(false) + bodyReader := session.DecodeResponseBody(request, reader) + defer bodyReader.Release() - header, err := session.DecodeResponseHeader(reader) - if err != nil { - conn.SetReusable(false) - log.Warning("VMess|Outbound: Failed to read response from ", request.Destination(), ": ", err) - return - } - v.handleCommand(dest, header.Command) + if err := buf.PipeUntilEOF(bodyReader, output); err != nil { + return err + } - conn.SetReusable(header.Option.Has(protocol.ResponseOptionConnectionReuse)) + return nil + }) - reader.SetBuffered(false) - bodyReader := session.DecodeResponseBody(request, reader) - defer bodyReader.Release() - - if err := buf.PipeUntilEOF(bodyReader, output); err != nil { + if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil { + log.Info("VMess|Outbound: Connection ending with ", err) conn.SetReusable(false) } diff --git a/transport/ray/direct.go b/transport/ray/direct.go index d326836d2..e70026431 100644 --- a/transport/ray/direct.go +++ b/transport/ray/direct.go @@ -65,6 +65,8 @@ func (v *Stream) Read() (*buf.Buffer, error) { return b, nil case <-v.srcClose: return nil, io.EOF + case <-v.destClose: + return nil, io.ErrClosedPipe } } } @@ -97,7 +99,7 @@ func (v *Stream) Close() { close(v.srcClose) } -func (v *Stream) Release() { +func (v *Stream) ForceClose() { defer swallowPanic() close(v.destClose) @@ -114,6 +116,8 @@ func (v *Stream) Release() { } } +func (v *Stream) Release() {} + func swallowPanic() { recover() } diff --git a/transport/ray/ray.go b/transport/ray/ray.go index b949d1195..a71ccb8af 100644 --- a/transport/ray/ray.go +++ b/transport/ray/ray.go @@ -36,9 +36,11 @@ type Ray interface { type InputStream interface { buf.Reader Close() + ForceClose() } type OutputStream interface { buf.Writer Close() + ForceClose() }