1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2025-01-20 00:07:06 -05:00

fix usage of ray stream.

This commit is contained in:
Darien Raymond 2016-12-30 00:51:39 +01:00
parent 609dbc1f13
commit f6aa7a0053
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
9 changed files with 29 additions and 24 deletions

View File

@ -167,7 +167,8 @@ func (v *DokodemoDoor) HandleTCPConnection(conn internet.Connection) {
Destination: dest, Destination: dest,
Inbound: v.meta, Inbound: v.meta,
}) })
defer ray.InboundOutput().Release() output := ray.InboundOutput()
defer output.ForceClose()
reader := v2net.NewTimeOutReader(v.config.Timeout, conn) reader := v2net.NewTimeOutReader(v.config.Timeout, conn)
defer reader.Release() defer reader.Release()
@ -187,19 +188,21 @@ func (v *DokodemoDoor) HandleTCPConnection(conn internet.Connection) {
}) })
responseDone := signal.ExecuteAsync(func() error { responseDone := signal.ExecuteAsync(func() error {
defer ray.InboundOutput().Release() defer output.ForceClose()
v2writer := buf.NewWriter(conn) v2writer := buf.NewWriter(conn)
defer v2writer.Release() defer v2writer.Release()
if err := buf.PipeUntilEOF(ray.InboundOutput(), v2writer); err != nil { if err := buf.PipeUntilEOF(output, v2writer); err != nil {
log.Info("Dokodemo: Failed to transport all TCP response: ", err) log.Info("Dokodemo: Failed to transport all TCP response: ", err)
return err return err
} }
return nil return nil
}) })
signal.ErrorOrFinish2(requestDone, responseDone) if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil {
log.Info("Dokodemo: Connection ends with ", err)
}
} }
type Factory struct{} type Factory struct{}

View File

@ -71,8 +71,10 @@ func (v *Handler) Dispatch(destination v2net.Destination, payload *buf.Buffer, r
log.Info("Freedom: Opening connection to ", destination) log.Info("Freedom: Opening connection to ", destination)
defer payload.Release() defer payload.Release()
defer ray.OutboundInput().Release() input := ray.OutboundInput()
defer ray.OutboundOutput().Close() output := ray.OutboundOutput()
defer input.ForceClose()
defer output.Close()
var conn internet.Connection var conn internet.Connection
if v.domainStrategy == Config_USE_IP && destination.Address.Family().IsDomain() { if v.domainStrategy == Config_USE_IP && destination.Address.Family().IsDomain() {
@ -92,9 +94,6 @@ func (v *Handler) Dispatch(destination v2net.Destination, payload *buf.Buffer, r
} }
defer conn.Close() defer conn.Close()
input := ray.OutboundInput()
output := ray.OutboundOutput()
if !payload.IsEmpty() { if !payload.IsEmpty() {
if _, err := conn.Write(payload.Bytes()); err != nil { if _, err := conn.Write(payload.Bytes()); err != nil {
log.Warning("Freedom: Failed to write to destination: ", destination, ": ", err) log.Warning("Freedom: Failed to write to destination: ", destination, ": ", err)
@ -103,7 +102,7 @@ func (v *Handler) Dispatch(destination v2net.Destination, payload *buf.Buffer, r
} }
requestDone := signal.ExecuteAsync(func() error { requestDone := signal.ExecuteAsync(func() error {
defer input.Release() defer input.ForceClose()
v2writer := buf.NewWriter(conn) v2writer := buf.NewWriter(conn)
defer v2writer.Release() defer v2writer.Release()

View File

@ -169,7 +169,7 @@ func (v *Server) handleConnect(request *http.Request, session *proxy.SessionInfo
}) })
responseDone := signal.ExecuteAsync(func() error { responseDone := signal.ExecuteAsync(func() error {
defer ray.InboundOutput().Release() defer ray.InboundOutput().ForceClose()
v2writer := buf.NewWriter(writer) v2writer := buf.NewWriter(writer)
defer v2writer.Release() defer v2writer.Release()
@ -236,9 +236,14 @@ func (v *Server) handlePlainHTTP(request *http.Request, session *proxy.SessionIn
StripHopByHopHeaders(request) StripHopByHopHeaders(request)
ray := v.packetDispatcher.DispatchToOutbound(session) ray := v.packetDispatcher.DispatchToOutbound(session)
input := ray.InboundInput()
output := ray.InboundOutput()
defer input.Close()
defer output.ForceClose()
requestDone := signal.ExecuteAsync(func() error { requestDone := signal.ExecuteAsync(func() error {
defer ray.InboundInput().Close() defer input.Close()
requestWriter := bufio.NewWriter(buf.NewBytesWriter(ray.InboundInput())) requestWriter := bufio.NewWriter(buf.NewBytesWriter(ray.InboundInput()))
defer requestWriter.Release() defer requestWriter.Release()
@ -254,7 +259,7 @@ func (v *Server) handlePlainHTTP(request *http.Request, session *proxy.SessionIn
}) })
responseDone := signal.ExecuteAsync(func() error { responseDone := signal.ExecuteAsync(func() error {
defer ray.InboundOutput().Release() defer output.ForceClose()
responseReader := bufio.OriginalReader(buf.NewBytesReader(ray.InboundOutput())) responseReader := bufio.OriginalReader(buf.NewBytesReader(ray.InboundOutput()))
response, err := http.ReadResponse(responseReader, request) response, err := http.ReadResponse(responseReader, request)

View File

@ -109,7 +109,7 @@ func (v *Client) Dispatch(destination v2net.Destination, payload *buf.Buffer, ra
bufferedWriter.SetBuffered(false) bufferedWriter.SetBuffered(false)
requestDone := signal.ExecuteAsync(func() error { requestDone := signal.ExecuteAsync(func() error {
defer ray.OutboundInput().Release() defer ray.OutboundInput().ForceClose()
if err := buf.PipeUntilEOF(ray.OutboundInput(), bodyWriter); err != nil { if err := buf.PipeUntilEOF(ray.OutboundInput(), bodyWriter); err != nil {
return err return err
@ -151,7 +151,7 @@ func (v *Client) Dispatch(destination v2net.Destination, payload *buf.Buffer, ra
} }
requestDone := signal.ExecuteAsync(func() error { requestDone := signal.ExecuteAsync(func() error {
defer ray.OutboundInput().Release() defer ray.OutboundInput().ForceClose()
if err := buf.PipeUntilEOF(ray.OutboundInput(), writer); err != nil { if err := buf.PipeUntilEOF(ray.OutboundInput(), writer); err != nil {
log.Info("Shadowsocks|Client: Failed to transport all UDP request: ", err) log.Info("Shadowsocks|Client: Failed to transport all UDP request: ", err)

View File

@ -175,11 +175,11 @@ func (v *Server) handleConnection(conn internet.Connection) {
User: request.User, User: request.User,
Inbound: v.meta, Inbound: v.meta,
}) })
defer ray.InboundOutput().Release() defer ray.InboundOutput().ForceClose()
defer ray.InboundInput().Close() defer ray.InboundInput().Close()
requestDone := signal.ExecuteAsync(func() error { requestDone := signal.ExecuteAsync(func() error {
defer ray.InboundOutput().Release() defer ray.InboundOutput().ForceClose()
bufferedWriter := bufio.NewWriter(conn) bufferedWriter := bufio.NewWriter(conn)
defer bufferedWriter.Release() defer bufferedWriter.Release()

View File

@ -314,7 +314,7 @@ func (v *Server) transport(reader io.Reader, writer io.Writer, session *proxy.Se
}) })
responseDone := signal.ExecuteAsync(func() error { responseDone := signal.ExecuteAsync(func() error {
defer output.Release() defer output.ForceClose()
v2writer := buf.NewWriter(writer) v2writer := buf.NewWriter(writer)
defer v2writer.Release() defer v2writer.Release()

View File

@ -143,7 +143,7 @@ func transferRequest(session *encoding.ServerSession, request *protocol.RequestH
} }
func transferResponse(session *encoding.ServerSession, request *protocol.RequestHeader, response *protocol.ResponseHeader, input ray.InputStream, output io.Writer) error { func transferResponse(session *encoding.ServerSession, request *protocol.RequestHeader, response *protocol.ResponseHeader, input ray.InputStream, output io.Writer) error {
defer input.Release() defer input.ForceClose()
session.EncodeResponseHeader(response, output) session.EncodeResponseHeader(response, output)
bodyWriter := session.EncodeResponseBody(request, output) bodyWriter := session.EncodeResponseBody(request, output)
@ -218,7 +218,7 @@ func (v *VMessInboundHandler) HandleConnection(connection internet.Connection) {
input := ray.InboundInput() input := ray.InboundInput()
output := ray.InboundOutput() output := ray.InboundOutput()
defer input.Close() defer input.Close()
defer output.Release() defer output.ForceClose()
userSettings := request.User.GetSettings() userSettings := request.User.GetSettings()
connReader.SetTimeOut(userSettings.PayloadReadTimeout) connReader.SetTimeOut(userSettings.PayloadReadTimeout)

View File

@ -28,7 +28,7 @@ type VMessOutboundHandler struct {
// Dispatch implements OutboundHandler.Dispatch(). // Dispatch implements OutboundHandler.Dispatch().
func (v *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *buf.Buffer, ray ray.OutboundRay) { func (v *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *buf.Buffer, ray ray.OutboundRay) {
defer payload.Release() defer payload.Release()
defer ray.OutboundInput().Release() defer ray.OutboundInput().ForceClose()
defer ray.OutboundOutput().Close() defer ray.OutboundOutput().Close()
var rec *protocol.ServerSpec var rec *protocol.ServerSpec
@ -83,7 +83,7 @@ func (v *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *buf.B
session := encoding.NewClientSession(protocol.DefaultIDHash) session := encoding.NewClientSession(protocol.DefaultIDHash)
requestDone := signal.ExecuteAsync(func() error { requestDone := signal.ExecuteAsync(func() error {
defer input.Release() defer input.ForceClose()
writer := bufio.NewWriter(conn) writer := bufio.NewWriter(conn)
defer writer.Release() defer writer.Release()

View File

@ -35,12 +35,10 @@ type Ray interface {
type InputStream interface { type InputStream interface {
buf.Reader buf.Reader
Close()
ForceClose() ForceClose()
} }
type OutputStream interface { type OutputStream interface {
buf.Writer buf.Writer
Close() Close()
ForceClose()
} }