mirror of
https://github.com/v2fly/v2ray-core.git
synced 2024-12-22 01:57:12 -05:00
rename buf.Copy
This commit is contained in:
parent
e362c74fa9
commit
ab9349ec31
@ -154,7 +154,7 @@ func fetchInput(ctx context.Context, s *Session, output buf.Writer) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err := buf.PipeUntilEOF(signal.BackgroundTimer(), s.input, writer); err != nil {
|
if err := buf.Copy(signal.BackgroundTimer(), s.input, writer); err != nil {
|
||||||
log.Trace(newError("failed to fetch all input").Base(err))
|
log.Trace(newError("failed to fetch all input").Base(err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -309,7 +309,7 @@ type ServerWorker struct {
|
|||||||
|
|
||||||
func handle(ctx context.Context, s *Session, output buf.Writer) {
|
func handle(ctx context.Context, s *Session, output buf.Writer) {
|
||||||
writer := NewResponseWriter(s.ID, output)
|
writer := NewResponseWriter(s.ID, output)
|
||||||
if err := buf.PipeUntilEOF(signal.BackgroundTimer(), s.input, writer); err != nil {
|
if err := buf.Copy(signal.BackgroundTimer(), s.input, writer); err != nil {
|
||||||
log.Trace(newError("session ", s.ID, " ends: ").Base(err))
|
log.Trace(newError("session ", s.ID, " ends: ").Base(err))
|
||||||
}
|
}
|
||||||
writer.Close()
|
writer.Close()
|
||||||
|
@ -40,9 +40,7 @@ func ReadFullFrom(reader io.Reader, size int) Supplier {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pipe dumps all payload from reader to writer, until an error occurs.
|
func copyInternal(timer signal.ActivityTimer, reader Reader, writer Writer) error {
|
||||||
// ActivityTimer gets updated as soon as there is a payload.
|
|
||||||
func Pipe(timer signal.ActivityTimer, reader Reader, writer Writer) error {
|
|
||||||
for {
|
for {
|
||||||
buffer, err := reader.Read()
|
buffer, err := reader.Read()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -64,9 +62,10 @@ func Pipe(timer signal.ActivityTimer, reader Reader, writer Writer) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// PipeUntilEOF behaves the same as Pipe(). The only difference is PipeUntilEOF returns nil on EOF.
|
// Copy dumps all payload from reader to writer or stops when an error occurs.
|
||||||
func PipeUntilEOF(timer signal.ActivityTimer, reader Reader, writer Writer) error {
|
// ActivityTimer gets updated as soon as there is a payload.
|
||||||
err := Pipe(timer, reader, writer)
|
func Copy(timer signal.ActivityTimer, reader Reader, writer Writer) error {
|
||||||
|
err := copyInternal(timer, reader, writer)
|
||||||
if err != nil && errors.Cause(err) != io.EOF {
|
if err != nil && errors.Cause(err) != io.EOF {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -76,7 +76,7 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in
|
|||||||
|
|
||||||
chunkReader := buf.NewReader(conn)
|
chunkReader := buf.NewReader(conn)
|
||||||
|
|
||||||
if err := buf.PipeUntilEOF(timer, chunkReader, inboundRay.InboundInput()); err != nil {
|
if err := buf.Copy(timer, chunkReader, inboundRay.InboundInput()); err != nil {
|
||||||
return newError("failed to transport request").Base(err)
|
return newError("failed to transport request").Base(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -86,7 +86,7 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in
|
|||||||
responseDone := signal.ExecuteAsync(func() error {
|
responseDone := signal.ExecuteAsync(func() error {
|
||||||
v2writer := buf.NewWriter(conn)
|
v2writer := buf.NewWriter(conn)
|
||||||
|
|
||||||
if err := buf.PipeUntilEOF(timer, inboundRay.InboundOutput(), v2writer); err != nil {
|
if err := buf.Copy(timer, inboundRay.InboundOutput(), v2writer); err != nil {
|
||||||
return newError("failed to transport response").Base(err)
|
return newError("failed to transport response").Base(err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -119,7 +119,7 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial
|
|||||||
} else {
|
} else {
|
||||||
writer = &seqWriter{writer: conn}
|
writer = &seqWriter{writer: conn}
|
||||||
}
|
}
|
||||||
if err := buf.PipeUntilEOF(timer, input, writer); err != nil {
|
if err := buf.Copy(timer, input, writer); err != nil {
|
||||||
return newError("failed to process request").Base(err)
|
return newError("failed to process request").Base(err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -129,7 +129,7 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial
|
|||||||
defer output.Close()
|
defer output.Close()
|
||||||
|
|
||||||
v2reader := buf.NewReader(conn)
|
v2reader := buf.NewReader(conn)
|
||||||
if err := buf.PipeUntilEOF(timer, v2reader, output); err != nil {
|
if err := buf.Copy(timer, v2reader, output); err != nil {
|
||||||
return newError("failed to process response").Base(err)
|
return newError("failed to process response").Base(err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -152,7 +152,7 @@ func (s *Server) handleConnect(ctx context.Context, request *http.Request, reade
|
|||||||
defer ray.InboundInput().Close()
|
defer ray.InboundInput().Close()
|
||||||
|
|
||||||
v2reader := buf.NewReader(reader)
|
v2reader := buf.NewReader(reader)
|
||||||
if err := buf.PipeUntilEOF(timer, v2reader, ray.InboundInput()); err != nil {
|
if err := buf.Copy(timer, v2reader, ray.InboundInput()); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -160,7 +160,7 @@ func (s *Server) handleConnect(ctx context.Context, request *http.Request, reade
|
|||||||
|
|
||||||
responseDone := signal.ExecuteAsync(func() error {
|
responseDone := signal.ExecuteAsync(func() error {
|
||||||
v2writer := buf.NewWriter(writer)
|
v2writer := buf.NewWriter(writer)
|
||||||
if err := buf.PipeUntilEOF(timer, ray.InboundOutput(), v2writer); err != nil {
|
if err := buf.Copy(timer, ray.InboundOutput(), v2writer); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -105,7 +105,7 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale
|
|||||||
}
|
}
|
||||||
|
|
||||||
requestDone := signal.ExecuteAsync(func() error {
|
requestDone := signal.ExecuteAsync(func() error {
|
||||||
if err := buf.PipeUntilEOF(timer, outboundRay.OutboundInput(), bodyWriter); err != nil {
|
if err := buf.Copy(timer, outboundRay.OutboundInput(), bodyWriter); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -119,7 +119,7 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := buf.PipeUntilEOF(timer, responseReader, outboundRay.OutboundOutput()); err != nil {
|
if err := buf.Copy(timer, responseReader, outboundRay.OutboundOutput()); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -141,7 +141,7 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale
|
|||||||
}
|
}
|
||||||
|
|
||||||
requestDone := signal.ExecuteAsync(func() error {
|
requestDone := signal.ExecuteAsync(func() error {
|
||||||
if err := buf.PipeUntilEOF(timer, outboundRay.OutboundInput(), writer); err != nil {
|
if err := buf.Copy(timer, outboundRay.OutboundInput(), writer); err != nil {
|
||||||
return newError("failed to transport all UDP request").Base(err)
|
return newError("failed to transport all UDP request").Base(err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -155,7 +155,7 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale
|
|||||||
User: user,
|
User: user,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := buf.PipeUntilEOF(timer, reader, outboundRay.OutboundOutput()); err != nil {
|
if err := buf.Copy(timer, reader, outboundRay.OutboundOutput()); err != nil {
|
||||||
return newError("failed to transport all UDP response").Base(err)
|
return newError("failed to transport all UDP response").Base(err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -173,7 +173,7 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection,
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := buf.PipeUntilEOF(timer, ray.InboundOutput(), responseWriter); err != nil {
|
if err := buf.Copy(timer, ray.InboundOutput(), responseWriter); err != nil {
|
||||||
return newError("failed to transport all TCP response").Base(err)
|
return newError("failed to transport all TCP response").Base(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -183,7 +183,7 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection,
|
|||||||
requestDone := signal.ExecuteAsync(func() error {
|
requestDone := signal.ExecuteAsync(func() error {
|
||||||
defer ray.InboundInput().Close()
|
defer ray.InboundInput().Close()
|
||||||
|
|
||||||
if err := buf.PipeUntilEOF(timer, bodyReader, ray.InboundInput()); err != nil {
|
if err := buf.Copy(timer, bodyReader, ray.InboundInput()); err != nil {
|
||||||
return newError("failed to transport all TCP request").Base(err)
|
return newError("failed to transport all TCP request").Base(err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -90,11 +90,11 @@ func (c *Client) Process(ctx context.Context, ray ray.OutboundRay, dialer proxy.
|
|||||||
var responseFunc func() error
|
var responseFunc func() error
|
||||||
if request.Command == protocol.RequestCommandTCP {
|
if request.Command == protocol.RequestCommandTCP {
|
||||||
requestFunc = func() error {
|
requestFunc = func() error {
|
||||||
return buf.PipeUntilEOF(timer, ray.OutboundInput(), buf.NewWriter(conn))
|
return buf.Copy(timer, ray.OutboundInput(), buf.NewWriter(conn))
|
||||||
}
|
}
|
||||||
responseFunc = func() error {
|
responseFunc = func() error {
|
||||||
defer ray.OutboundOutput().Close()
|
defer ray.OutboundOutput().Close()
|
||||||
return buf.PipeUntilEOF(timer, buf.NewReader(conn), ray.OutboundOutput())
|
return buf.Copy(timer, buf.NewReader(conn), ray.OutboundOutput())
|
||||||
}
|
}
|
||||||
} else if request.Command == protocol.RequestCommandUDP {
|
} else if request.Command == protocol.RequestCommandUDP {
|
||||||
udpConn, err := dialer.Dial(ctx, udpRequest.Destination())
|
udpConn, err := dialer.Dial(ctx, udpRequest.Destination())
|
||||||
@ -103,12 +103,12 @@ func (c *Client) Process(ctx context.Context, ray ray.OutboundRay, dialer proxy.
|
|||||||
}
|
}
|
||||||
defer udpConn.Close()
|
defer udpConn.Close()
|
||||||
requestFunc = func() error {
|
requestFunc = func() error {
|
||||||
return buf.PipeUntilEOF(timer, ray.OutboundInput(), &UDPWriter{request: request, writer: udpConn})
|
return buf.Copy(timer, ray.OutboundInput(), &UDPWriter{request: request, writer: udpConn})
|
||||||
}
|
}
|
||||||
responseFunc = func() error {
|
responseFunc = func() error {
|
||||||
defer ray.OutboundOutput().Close()
|
defer ray.OutboundOutput().Close()
|
||||||
reader := &UDPReader{reader: udpConn}
|
reader := &UDPReader{reader: udpConn}
|
||||||
return buf.PipeUntilEOF(timer, reader, ray.OutboundOutput())
|
return buf.Copy(timer, reader, ray.OutboundOutput())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -125,7 +125,7 @@ func (v *Server) transport(ctx context.Context, reader io.Reader, writer io.Writ
|
|||||||
defer input.Close()
|
defer input.Close()
|
||||||
|
|
||||||
v2reader := buf.NewReader(reader)
|
v2reader := buf.NewReader(reader)
|
||||||
if err := buf.PipeUntilEOF(timer, v2reader, input); err != nil {
|
if err := buf.Copy(timer, v2reader, input); err != nil {
|
||||||
return newError("failed to transport all TCP request").Base(err)
|
return newError("failed to transport all TCP request").Base(err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -133,7 +133,7 @@ func (v *Server) transport(ctx context.Context, reader io.Reader, writer io.Writ
|
|||||||
|
|
||||||
responseDone := signal.ExecuteAsync(func() error {
|
responseDone := signal.ExecuteAsync(func() error {
|
||||||
v2writer := buf.NewWriter(writer)
|
v2writer := buf.NewWriter(writer)
|
||||||
if err := buf.PipeUntilEOF(timer, output, v2writer); err != nil {
|
if err := buf.Copy(timer, output, v2writer); err != nil {
|
||||||
return newError("failed to transport all TCP response").Base(err)
|
return newError("failed to transport all TCP response").Base(err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -129,7 +129,7 @@ func transferRequest(timer signal.ActivityTimer, session *encoding.ServerSession
|
|||||||
defer output.Close()
|
defer output.Close()
|
||||||
|
|
||||||
bodyReader := session.DecodeRequestBody(request, input)
|
bodyReader := session.DecodeRequestBody(request, input)
|
||||||
if err := buf.PipeUntilEOF(timer, bodyReader, output); err != nil {
|
if err := buf.Copy(timer, bodyReader, output); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -157,7 +157,7 @@ func transferResponse(timer signal.ActivityTimer, session *encoding.ServerSessio
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := buf.PipeUntilEOF(timer, input, bodyWriter); err != nil {
|
if err := buf.Copy(timer, input, bodyWriter); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -123,7 +123,7 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := buf.PipeUntilEOF(timer, input, bodyWriter); err != nil {
|
if err := buf.Copy(timer, input, bodyWriter); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -147,7 +147,7 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial
|
|||||||
|
|
||||||
reader.SetBuffered(false)
|
reader.SetBuffered(false)
|
||||||
bodyReader := session.DecodeResponseBody(request, reader)
|
bodyReader := session.DecodeResponseBody(request, reader)
|
||||||
if err := buf.PipeUntilEOF(timer, bodyReader, output); err != nil {
|
if err := buf.Copy(timer, bodyReader, output); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user