diff --git a/common/bufio/reader.go b/common/bufio/reader.go index 182323a37..1d61776f8 100644 --- a/common/bufio/reader.go +++ b/common/bufio/reader.go @@ -9,17 +9,17 @@ import ( // BufferedReader is a reader with internal cache. type BufferedReader struct { - reader io.Reader - buffer *buf.Buffer - cached bool + reader io.Reader + buffer *buf.Buffer + buffered bool } // NewReader creates a new BufferedReader based on an io.Reader. func NewReader(rawReader io.Reader) *BufferedReader { return &BufferedReader{ - reader: rawReader, - buffer: buf.New(), - cached: true, + reader: rawReader, + buffer: buf.New(), + buffered: true, } } @@ -33,20 +33,20 @@ func (v *BufferedReader) Release() { common.Release(v.reader) } -// Cached returns true if the internal cache is effective. -func (v *BufferedReader) Cached() bool { - return v.cached +// IsBuffered returns true if the internal cache is effective. +func (v *BufferedReader) IsBuffered() bool { + return v.buffered } -// SetCached is to enable or disable internal cache. If cache is disabled, -// Read() and Write() calls will be delegated to the underlying io.Reader directly. -func (v *BufferedReader) SetCached(cached bool) { - v.cached = cached +// SetBuffered is to enable or disable internal cache. If cache is disabled, +// Read() calls will be delegated to the underlying io.Reader directly. +func (v *BufferedReader) SetBuffered(cached bool) { + v.buffered = cached } // Read implements io.Reader.Read(). func (v *BufferedReader) Read(b []byte) (int, error) { - if !v.cached || v.buffer == nil { + if !v.buffered || v.buffer == nil { if !v.buffer.IsEmpty() { return v.buffer.Read(b) } diff --git a/common/bufio/reader_test.go b/common/bufio/reader_test.go index 55c3e9405..48884bec1 100644 --- a/common/bufio/reader_test.go +++ b/common/bufio/reader_test.go @@ -18,7 +18,7 @@ func TestBufferedReader(t *testing.T) { len := content.Len() reader := NewReader(content) - assert.Bool(reader.Cached()).IsTrue() + assert.Bool(reader.IsBuffered()).IsTrue() payload := make([]byte, 16) diff --git a/common/bufio/writer.go b/common/bufio/writer.go index 0ade5f697..71e6f0526 100644 --- a/common/bufio/writer.go +++ b/common/bufio/writer.go @@ -8,17 +8,20 @@ import ( "v2ray.com/core/common/errors" ) +// BufferedWriter is an io.Writer with internal buffer. It writes to underlying writer when buffer is full or on demand. +// This type is not thread safe. type BufferedWriter struct { - writer io.Writer - buffer *buf.Buffer - cached bool + writer io.Writer + buffer *buf.Buffer + buffered bool } +// NewWriter creates a new BufferedWriter. func NewWriter(rawWriter io.Writer) *BufferedWriter { return &BufferedWriter{ - writer: rawWriter, - buffer: buf.NewSmall(), - cached: true, + writer: rawWriter, + buffer: buf.NewSmall(), + buffered: true, } } @@ -41,7 +44,7 @@ func (v *BufferedWriter) ReadFrom(reader io.Reader) (int64, error) { } func (v *BufferedWriter) Write(b []byte) (int, error) { - if !v.cached || v.buffer == nil { + if !v.buffered || v.buffer == nil { return v.writer.Write(b) } nBytes, err := v.buffer.Write(b) @@ -74,12 +77,12 @@ func (v *BufferedWriter) Flush() error { return nil } -func (v *BufferedWriter) Cached() bool { - return v.cached +func (v *BufferedWriter) Buffered() bool { + return v.buffered } -func (v *BufferedWriter) SetCached(cached bool) { - v.cached = cached +func (v *BufferedWriter) SetBuffered(cached bool) { + v.buffered = cached if !cached && !v.buffer.IsEmpty() { v.Flush() } diff --git a/common/bufio/writer_test.go b/common/bufio/writer_test.go index 09994f387..77ec700a2 100644 --- a/common/bufio/writer_test.go +++ b/common/bufio/writer_test.go @@ -15,7 +15,7 @@ func TestBufferedWriter(t *testing.T) { content := buf.New() writer := NewWriter(content) - assert.Bool(writer.Cached()).IsTrue() + assert.Bool(writer.Buffered()).IsTrue() payload := make([]byte, 16) @@ -25,7 +25,7 @@ func TestBufferedWriter(t *testing.T) { assert.Bool(content.IsEmpty()).IsTrue() - writer.SetCached(false) + writer.SetBuffered(false) assert.Int(content.Len()).Equals(16) } @@ -35,7 +35,7 @@ func TestBufferedWriterLargePayload(t *testing.T) { content := buf.NewLocal(128 * 1024) writer := NewWriter(content) - assert.Bool(writer.Cached()).IsTrue() + assert.Bool(writer.Buffered()).IsTrue() payload := make([]byte, 64*1024) rand.Read(payload) diff --git a/proxy/shadowsocks/client.go b/proxy/shadowsocks/client.go index 355874f5e..8ab3616fe 100644 --- a/proxy/shadowsocks/client.go +++ b/proxy/shadowsocks/client.go @@ -126,7 +126,7 @@ func (v *Client) Dispatch(destination v2net.Destination, payload *buf.Buffer, ra } }() - bufferedWriter.SetCached(false) + bufferedWriter.SetBuffered(false) if err := buf.PipeUntilEOF(ray.OutboundInput(), bodyWriter); err != nil { log.Info("Shadowsocks|Client: Failed to trasnport all TCP request: ", err) } diff --git a/proxy/shadowsocks/server.go b/proxy/shadowsocks/server.go index 7d90ba959..01bd2fc31 100644 --- a/proxy/shadowsocks/server.go +++ b/proxy/shadowsocks/server.go @@ -161,7 +161,7 @@ func (v *Server) handleConnection(conn internet.Connection) { } defer bodyReader.Release() - bufferedReader.SetCached(false) + bufferedReader.SetBuffered(false) userSettings := v.user.GetSettings() timedReader.SetTimeOut(userSettings.PayloadReadTimeout) @@ -195,7 +195,7 @@ func (v *Server) handleConnection(conn internet.Connection) { if payload, err := ray.InboundOutput().Read(); err == nil { responseWriter.Write(payload) - bufferedWriter.SetCached(false) + bufferedWriter.SetBuffered(false) if err := buf.PipeUntilEOF(ray.InboundOutput(), responseWriter); err != nil { log.Info("Shadowsocks|Server: Failed to transport all TCP response: ", err) diff --git a/proxy/socks/server.go b/proxy/socks/server.go index 3fa27c364..1d28ed2c5 100644 --- a/proxy/socks/server.go +++ b/proxy/socks/server.go @@ -216,8 +216,8 @@ func (v *Server) handleSocks5(clientAddr v2net.Destination, reader *bufio.Buffer return err } - reader.SetCached(false) - writer.SetCached(false) + reader.SetBuffered(false) + writer.SetBuffered(false) dest := request.Destination() session := &proxy.SessionInfo{ @@ -279,8 +279,8 @@ func (v *Server) handleSocks4(clientAddr v2net.Destination, reader *bufio.Buffer return ErrUnsupportedSocksCommand } - reader.SetCached(false) - writer.SetCached(false) + reader.SetBuffered(false) + writer.SetBuffered(false) dest := v2net.TCPDestination(v2net.IPAddress(auth.IP[:]), auth.Port) session := &proxy.SessionInfo{ diff --git a/proxy/vmess/inbound/inbound.go b/proxy/vmess/inbound/inbound.go index ef0e0aab6..b8ab3fc65 100644 --- a/proxy/vmess/inbound/inbound.go +++ b/proxy/vmess/inbound/inbound.go @@ -181,7 +181,7 @@ func (v *VMessInboundHandler) HandleConnection(connection internet.Connection) { userSettings := request.User.GetSettings() connReader.SetTimeOut(userSettings.PayloadReadTimeout) - reader.SetCached(false) + reader.SetBuffered(false) go func() { bodyReader := session.DecodeRequestBody(request, reader) @@ -216,7 +216,7 @@ func (v *VMessInboundHandler) HandleConnection(connection internet.Connection) { connection.SetReusable(false) } - writer.SetCached(false) + writer.SetBuffered(false) if err := buf.PipeUntilEOF(output, bodyWriter); err != nil { log.Debug("VMess|Inbound: Error when sending data to downstream: ", err) diff --git a/proxy/vmess/outbound/outbound.go b/proxy/vmess/outbound/outbound.go index 40794b659..22a0f3e62 100644 --- a/proxy/vmess/outbound/outbound.go +++ b/proxy/vmess/outbound/outbound.go @@ -110,7 +110,7 @@ func (v *VMessOutboundHandler) handleRequest(session *encoding.ClientSession, co } payload.Release() } - writer.SetCached(false) + writer.SetBuffered(false) if err := buf.PipeUntilEOF(input, bodyWriter); err != nil { conn.SetReusable(false) @@ -141,7 +141,7 @@ func (v *VMessOutboundHandler) handleResponse(session *encoding.ClientSession, c conn.SetReusable(header.Option.Has(protocol.ResponseOptionConnectionReuse)) - reader.SetCached(false) + reader.SetBuffered(false) bodyReader := session.DecodeResponseBody(request, reader) defer bodyReader.Release()