1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2024-12-06 18:26:47 -05:00

refactor io package

This commit is contained in:
Darien Raymond 2016-12-09 13:17:34 +01:00
parent 055023fdd5
commit 1948d0738f
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
36 changed files with 390 additions and 414 deletions

View File

@ -7,8 +7,8 @@ import (
"v2ray.com/core/app"
"v2ray.com/core/app/proxyman"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/errors"
v2io "v2ray.com/core/common/io"
"v2ray.com/core/common/log"
v2net "v2ray.com/core/common/net"
"v2ray.com/core/transport/internet"
@ -63,8 +63,8 @@ type ProxyConnection struct {
localAddr net.Addr
remoteAddr net.Addr
reader *v2io.ChanReader
writer *v2io.ChainWriter
reader *buf.BufferToBytesReader
writer *buf.BytesToBufferWriter
}
func NewProxyConnection(src v2net.Address, dest v2net.Destination, stream ray.Ray) *ProxyConnection {
@ -78,8 +78,8 @@ func NewProxyConnection(src v2net.Address, dest v2net.Destination, stream ray.Ra
IP: []byte{0, 0, 0, 0},
Port: 0,
},
reader: v2io.NewChanReader(stream.InboundOutput()),
writer: v2io.NewChainWriter(stream.InboundInput()),
reader: buf.NewBytesReader(stream.InboundOutput()),
writer: buf.NewBytesWriter(stream.InboundInput()),
}
}

View File

@ -1,6 +1,24 @@
package buf
import "io"
import (
"io"
"v2ray.com/core/common/errors"
)
// Reader extends io.Reader with alloc.Buffer.
type Reader interface {
Release()
// Read reads content from underlying reader, and put it into an alloc.Buffer.
Read() (*Buffer, error)
}
// Writer extends io.Writer with alloc.Buffer.
type Writer interface {
Release()
// Write writes an alloc.Buffer into underlying writer.
Write(*Buffer) error
}
func ReadFrom(reader io.Reader) Supplier {
return func(b []byte) (int, error) {
@ -13,3 +31,60 @@ func ReadFullFrom(reader io.Reader, size int) Supplier {
return io.ReadFull(reader, b[:size])
}
}
// Pipe dumps all content from reader to writer, until an error happens.
func Pipe(reader Reader, writer Writer) error {
for {
buffer, err := reader.Read()
if err != nil {
return err
}
if buffer.IsEmpty() {
buffer.Release()
continue
}
err = writer.Write(buffer)
if err != nil {
buffer.Release()
return err
}
}
}
// PipeUntilEOF behaves the same as Pipe(). The only difference is PipeUntilEOF returns nil on EOF.
func PipeUntilEOF(reader Reader, writer Writer) error {
err := Pipe(reader, writer)
if err != nil && errors.Cause(err) != io.EOF {
return err
}
return nil
}
// NewReader creates a new Reader.
// The Reader instance doesn't take the ownership of reader.
func NewReader(reader io.Reader) Reader {
return &BytesToBufferReader{
reader: reader,
}
}
func NewBytesReader(stream Reader) *BufferToBytesReader {
return &BufferToBytesReader{
stream: stream,
}
}
// NewWriter creates a new Writer.
func NewWriter(writer io.Writer) Writer {
return &BufferToBytesWriter{
writer: writer,
}
}
func NewBytesWriter(writer Writer) *BytesToBufferWriter {
return &BytesToBufferWriter{
writer: writer,
}
}

102
common/buf/reader.go Normal file
View File

@ -0,0 +1,102 @@
package buf
import (
"io"
"sync"
)
// BytesToBufferReader is a Reader that adjusts its reading speed automatically.
type BytesToBufferReader struct {
reader io.Reader
largeBuffer *Buffer
highVolumn bool
}
// Read implements Reader.Read().
func (v *BytesToBufferReader) Read() (*Buffer, error) {
if v.highVolumn && v.largeBuffer.IsEmpty() {
if v.largeBuffer == nil {
v.largeBuffer = NewLocal(32 * 1024)
}
err := v.largeBuffer.AppendSupplier(ReadFrom(v.reader))
if err != nil {
return nil, err
}
if v.largeBuffer.Len() < Size {
v.highVolumn = false
}
}
buffer := New()
if !v.largeBuffer.IsEmpty() {
buffer.AppendSupplier(ReadFrom(v.largeBuffer))
return buffer, nil
}
err := buffer.AppendSupplier(ReadFrom(v.reader))
if err != nil {
buffer.Release()
return nil, err
}
if buffer.IsFull() {
v.highVolumn = true
}
return buffer, nil
}
// Release implements Releasable.Release().
func (v *BytesToBufferReader) Release() {
v.reader = nil
}
type BufferToBytesReader struct {
sync.Mutex
stream Reader
current *Buffer
eof bool
}
// Private: Visible for testing.
func (v *BufferToBytesReader) Fill() {
b, err := v.stream.Read()
v.current = b
if err != nil {
v.eof = true
v.current = nil
}
}
func (v *BufferToBytesReader) Read(b []byte) (int, error) {
if v.eof {
return 0, io.EOF
}
v.Lock()
defer v.Unlock()
if v.current == nil {
v.Fill()
if v.eof {
return 0, io.EOF
}
}
nBytes, err := v.current.Read(b)
if v.current.IsEmpty() {
v.current.Release()
v.current = nil
}
return nBytes, err
}
func (v *BufferToBytesReader) Release() {
v.Lock()
defer v.Unlock()
v.eof = true
v.current.Release()
v.current = nil
v.stream = nil
}

View File

@ -1,11 +1,10 @@
package io_test
package buf_test
import (
"bytes"
"testing"
"v2ray.com/core/common/buf"
. "v2ray.com/core/common/io"
. "v2ray.com/core/common/buf"
"v2ray.com/core/testing/assert"
)
@ -15,12 +14,12 @@ func TestAdaptiveReader(t *testing.T) {
rawContent := make([]byte, 1024*1024)
buffer := bytes.NewBuffer(rawContent)
reader := NewAdaptiveReader(buffer)
reader := NewReader(buffer)
b1, err := reader.Read()
assert.Error(err).IsNil()
assert.Bool(b1.IsFull()).IsTrue()
assert.Int(b1.Len()).Equals(buf.Size)
assert.Int(buffer.Len()).Equals(cap(rawContent) - buf.Size)
assert.Int(b1.Len()).Equals(Size)
assert.Int(buffer.Len()).Equals(cap(rawContent) - Size)
b2, err := reader.Read()
assert.Error(err).IsNil()

68
common/buf/writer.go Normal file
View File

@ -0,0 +1,68 @@
package buf
import (
"io"
"sync"
)
// BufferToBytesWriter is a Writer that writes alloc.Buffer into underlying writer.
type BufferToBytesWriter struct {
writer io.Writer
}
// Write implements Writer.Write(). Write() takes ownership of the given buffer.
func (v *BufferToBytesWriter) Write(buffer *Buffer) error {
defer buffer.Release()
for {
nBytes, err := v.writer.Write(buffer.Bytes())
if err != nil {
return err
}
if nBytes == buffer.Len() {
break
}
buffer.SliceFrom(nBytes)
}
return nil
}
// Release implements Releasable.Release().
func (v *BufferToBytesWriter) Release() {
v.writer = nil
}
type BytesToBufferWriter struct {
sync.Mutex
writer Writer
}
func (v *BytesToBufferWriter) Write(payload []byte) (int, error) {
v.Lock()
defer v.Unlock()
if v.writer == nil {
return 0, io.ErrClosedPipe
}
bytesWritten := 0
size := len(payload)
for size > 0 {
buffer := New()
nBytes, _ := buffer.Write(payload)
size -= nBytes
payload = payload[nBytes:]
bytesWritten += nBytes
err := v.writer.Write(buffer)
if err != nil {
return bytesWritten, err
}
}
return bytesWritten, nil
}
func (v *BytesToBufferWriter) Release() {
v.Lock()
v.writer.Release()
v.writer = nil
v.Unlock()
}

View File

@ -1,26 +1,26 @@
package io_test
package buf_test
import (
"bytes"
"crypto/rand"
"testing"
"v2ray.com/core/common/buf"
. "v2ray.com/core/common/io"
. "v2ray.com/core/common/buf"
"v2ray.com/core/common/bufio"
"v2ray.com/core/testing/assert"
)
func TestAdaptiveWriter(t *testing.T) {
func TestWriter(t *testing.T) {
assert := assert.On(t)
lb := buf.New()
lb.AppendSupplier(buf.ReadFrom(rand.Reader))
lb := New()
lb.AppendSupplier(ReadFrom(rand.Reader))
expectedBytes := append([]byte(nil), lb.Bytes()...)
writeBuffer := bytes.NewBuffer(make([]byte, 0, 1024*1024))
writer := NewAdaptiveWriter(NewBufferedWriter(writeBuffer))
writer := NewWriter(bufio.NewWriter(writeBuffer))
err := writer.Write(lb)
assert.Error(err).IsNil()
assert.Bytes(expectedBytes).Equals(writeBuffer.Bytes())

View File

@ -1,4 +1,4 @@
package io
package bufio
import (
"io"
@ -14,7 +14,7 @@ type BufferedReader struct {
cached bool
}
func NewBufferedReader(rawReader io.Reader) *BufferedReader {
func NewReader(rawReader io.Reader) *BufferedReader {
return &BufferedReader{
reader: rawReader,
buffer: buf.New(),

View File

@ -1,11 +1,11 @@
package io_test
package bufio_test
import (
"crypto/rand"
"testing"
"v2ray.com/core/common/buf"
. "v2ray.com/core/common/io"
. "v2ray.com/core/common/bufio"
"v2ray.com/core/testing/assert"
)
@ -17,7 +17,7 @@ func TestBufferedReader(t *testing.T) {
len := content.Len()
reader := NewBufferedReader(content)
reader := NewReader(content)
assert.Bool(reader.Cached()).IsTrue()
payload := make([]byte, 16)

View File

@ -1,4 +1,4 @@
package io
package bufio
import (
"io"
@ -15,7 +15,7 @@ type BufferedWriter struct {
cached bool
}
func NewBufferedWriter(rawWriter io.Writer) *BufferedWriter {
func NewWriter(rawWriter io.Writer) *BufferedWriter {
return &BufferedWriter{
writer: rawWriter,
buffer: buf.NewSmall(),

View File

@ -1,11 +1,11 @@
package io_test
package bufio_test
import (
"crypto/rand"
"testing"
"v2ray.com/core/common/buf"
. "v2ray.com/core/common/io"
. "v2ray.com/core/common/bufio"
"v2ray.com/core/testing/assert"
)
@ -14,7 +14,7 @@ func TestBufferedWriter(t *testing.T) {
content := buf.New()
writer := NewBufferedWriter(content)
writer := NewWriter(content)
assert.Bool(writer.Cached()).IsTrue()
payload := make([]byte, 16)
@ -34,7 +34,7 @@ func TestBufferedWriterLargePayload(t *testing.T) {
content := buf.NewLocal(128 * 1024)
writer := NewBufferedWriter(content)
writer := NewWriter(content)
assert.Bool(writer.Cached()).IsTrue()
payload := make([]byte, 64*1024)

15
common/bufio/bufio.go Normal file
View File

@ -0,0 +1,15 @@
// Package bufio is a replacement of the standard golang package bufio.
package bufio
import (
"bufio"
"io"
)
func OriginalReader(reader io.Reader) *bufio.Reader {
return bufio.NewReader(reader)
}
func OriginalReaderSize(reader io.Reader, size int) *bufio.Reader {
return bufio.NewReaderSize(reader, size)
}

View File

@ -3,7 +3,7 @@
package common
import (
"v2ray.com/core/common/errors"
"errors"
)
var (

View File

@ -1,50 +0,0 @@
package io
import (
"io"
"sync"
"v2ray.com/core/common/buf"
)
type ChainWriter struct {
sync.Mutex
writer Writer
}
func NewChainWriter(writer Writer) *ChainWriter {
return &ChainWriter{
writer: writer,
}
}
func (v *ChainWriter) Write(payload []byte) (int, error) {
v.Lock()
defer v.Unlock()
if v.writer == nil {
return 0, io.ErrClosedPipe
}
bytesWritten := 0
size := len(payload)
for size > 0 {
buffer := buf.New()
nBytes, _ := buffer.Write(payload)
size -= nBytes
payload = payload[nBytes:]
bytesWritten += nBytes
err := v.writer.Write(buffer)
if err != nil {
return bytesWritten, err
}
}
return bytesWritten, nil
}
func (v *ChainWriter) Release() {
v.Lock()
v.writer.Release()
v.writer = nil
v.Unlock()
}

View File

@ -1,62 +0,0 @@
package io
import (
"io"
"sync"
"v2ray.com/core/common/buf"
)
type ChanReader struct {
sync.Mutex
stream Reader
current *buf.Buffer
eof bool
}
func NewChanReader(stream Reader) *ChanReader {
return &ChanReader{
stream: stream,
}
}
// Private: Visible for testing.
func (v *ChanReader) Fill() {
b, err := v.stream.Read()
v.current = b
if err != nil {
v.eof = true
v.current = nil
}
}
func (v *ChanReader) Read(b []byte) (int, error) {
if v.eof {
return 0, io.EOF
}
v.Lock()
defer v.Unlock()
if v.current == nil {
v.Fill()
if v.eof {
return 0, io.EOF
}
}
nBytes, err := v.current.Read(b)
if v.current.IsEmpty() {
v.current.Release()
v.current = nil
}
return nBytes, err
}
func (v *ChanReader) Release() {
v.Lock()
defer v.Unlock()
v.eof = true
v.current.Release()
v.current = nil
v.stream = nil
}

View File

@ -1,69 +0,0 @@
package io
import (
"io"
"v2ray.com/core/common"
"v2ray.com/core/common/buf"
)
// Reader extends io.Reader with alloc.Buffer.
type Reader interface {
common.Releasable
// Read reads content from underlying reader, and put it into an alloc.Buffer.
Read() (*buf.Buffer, error)
}
// AdaptiveReader is a Reader that adjusts its reading speed automatically.
type AdaptiveReader struct {
reader io.Reader
largeBuffer *buf.Buffer
highVolumn bool
}
// NewAdaptiveReader creates a new AdaptiveReader.
// The AdaptiveReader instance doesn't take the ownership of reader.
func NewAdaptiveReader(reader io.Reader) *AdaptiveReader {
return &AdaptiveReader{
reader: reader,
}
}
// Read implements Reader.Read().
func (v *AdaptiveReader) Read() (*buf.Buffer, error) {
if v.highVolumn && v.largeBuffer.IsEmpty() {
if v.largeBuffer == nil {
v.largeBuffer = buf.NewLocal(32 * 1024)
}
err := v.largeBuffer.AppendSupplier(buf.ReadFrom(v.reader))
if err != nil {
return nil, err
}
if v.largeBuffer.Len() < buf.Size {
v.highVolumn = false
}
}
buffer := buf.New()
if !v.largeBuffer.IsEmpty() {
buffer.AppendSupplier(buf.ReadFrom(v.largeBuffer))
return buffer, nil
}
err := buffer.AppendSupplier(buf.ReadFrom(v.reader))
if err != nil {
buffer.Release()
return nil, err
}
if buffer.IsFull() {
v.highVolumn = true
}
return buffer, nil
}
// Release implements Releasable.Release().
func (v *AdaptiveReader) Release() {
v.reader = nil
}

View File

@ -1,40 +0,0 @@
package io
import (
"io"
"v2ray.com/core/common/errors"
"v2ray.com/core/common/log"
)
// Pipe dumps all content from reader to writer, until an error happens.
func Pipe(reader Reader, writer Writer) error {
for {
buffer, err := reader.Read()
if err != nil {
log.Debug("IO: Pipe exits as ", err)
return err
}
if buffer.IsEmpty() {
buffer.Release()
continue
}
err = writer.Write(buffer)
if err != nil {
log.Debug("IO: Pipe exits as ", err)
buffer.Release()
return err
}
}
}
// PipeUntilEOF behaves the same as Pipe(). The only difference is PipeUntilEOF returns nil on EOF.
func PipeUntilEOF(reader Reader, writer Writer) error {
err := Pipe(reader, writer)
if err != nil && errors.Cause(err) != io.EOF {
return err
}
return nil
}

View File

@ -1,48 +0,0 @@
package io
import (
"io"
"v2ray.com/core/common"
"v2ray.com/core/common/buf"
)
// Writer extends io.Writer with alloc.Buffer.
type Writer interface {
common.Releasable
// Write writes an alloc.Buffer into underlying writer.
Write(*buf.Buffer) error
}
// AdaptiveWriter is a Writer that writes alloc.Buffer into underlying writer.
type AdaptiveWriter struct {
writer io.Writer
}
// NewAdaptiveWriter creates a new AdaptiveWriter.
func NewAdaptiveWriter(writer io.Writer) *AdaptiveWriter {
return &AdaptiveWriter{
writer: writer,
}
}
// Write implements Writer.Write(). Write() takes ownership of the given buffer.
func (v *AdaptiveWriter) Write(buffer *buf.Buffer) error {
defer buffer.Release()
for {
nBytes, err := v.writer.Write(buffer.Bytes())
if err != nil {
return err
}
if nBytes == buffer.Len() {
break
}
buffer.SliceFrom(nBytes)
}
return nil
}
// Release implements Releasable.Release().
func (v *AdaptiveWriter) Release() {
v.writer = nil
}

View File

@ -1,12 +1,8 @@
package serial
import (
"hash"
import "hash"
"v2ray.com/core/common/buf"
)
func WriteHash(h hash.Hash) buf.Supplier {
func WriteHash(h hash.Hash) func([]byte) (int, error) {
return func(b []byte) (int, error) {
h.Sum(b[:0])
return h.Size(), nil

View File

@ -1,10 +1,6 @@
package serial
import (
"strconv"
"v2ray.com/core/common/buf"
)
import "strconv"
func Uint16ToBytes(value uint16, b []byte) []byte {
return append(b, byte(value>>8), byte(value))
@ -14,7 +10,7 @@ func Uint16ToString(value uint16) string {
return strconv.Itoa(int(value))
}
func WriteUint16(value uint16) buf.Supplier {
func WriteUint16(value uint16) func([]byte) (int, error) {
return func(b []byte) (int, error) {
b = Uint16ToBytes(value, b[:0])
return 2, nil
@ -29,7 +25,7 @@ func Uint32ToString(value uint32) string {
return strconv.FormatUint(uint64(value), 10)
}
func WriteUint32(value uint32) buf.Supplier {
func WriteUint32(value uint32) func([]byte) (int, error) {
return func(b []byte) (int, error) {
b = Uint32ToBytes(value, b[:0])
return 4, nil

View File

@ -3,8 +3,6 @@ package serial
import (
"fmt"
"strings"
"v2ray.com/core/common/buf"
)
func ToString(v interface{}) string {
@ -36,7 +34,7 @@ func Concat(v ...interface{}) string {
return strings.Join(values, "")
}
func WriteString(s string) buf.Supplier {
func WriteString(s string) func([]byte) (int, error) {
return func(b []byte) (int, error) {
return copy(b, []byte(s)), nil
}

View File

@ -1,11 +1,10 @@
package blackhole
import (
"v2ray.com/core/common/buf"
v2io "v2ray.com/core/common/io"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/any"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/serial"
)
@ -21,17 +20,17 @@ Content-Length: 0
type ResponseConfig interface {
AsAny() *any.Any
WriteTo(v2io.Writer)
WriteTo(buf.Writer)
}
func (v *NoneResponse) WriteTo(v2io.Writer) {}
func (v *NoneResponse) WriteTo(buf.Writer) {}
func (v *NoneResponse) AsAny() *any.Any {
r, _ := ptypes.MarshalAny(v)
return r
}
func (v *HTTPResponse) WriteTo(writer v2io.Writer) {
func (v *HTTPResponse) WriteTo(writer buf.Writer) {
b := buf.NewLocal(512)
b.AppendSupplier(serial.WriteString(http403response))
writer.Write(b)

View File

@ -6,7 +6,6 @@ import (
"testing"
"v2ray.com/core/common/buf"
v2io "v2ray.com/core/common/io"
. "v2ray.com/core/proxy/blackhole"
"v2ray.com/core/testing/assert"
)
@ -17,7 +16,7 @@ func TestHTTPResponse(t *testing.T) {
buffer := buf.New()
httpResponse := new(HTTPResponse)
httpResponse.WriteTo(v2io.NewAdaptiveWriter(buffer))
httpResponse.WriteTo(buf.NewWriter(buffer))
reader := bufio.NewReader(buffer)
response, err := http.ReadResponse(reader, nil)

View File

@ -7,7 +7,6 @@ import (
"v2ray.com/core/app/dispatcher"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/errors"
v2io "v2ray.com/core/common/io"
"v2ray.com/core/common/loader"
"v2ray.com/core/common/log"
v2net "v2ray.com/core/common/net"
@ -176,10 +175,10 @@ func (v *DokodemoDoor) HandleTCPConnection(conn internet.Connection) {
wg.Add(1)
go func() {
v2reader := v2io.NewAdaptiveReader(reader)
v2reader := buf.NewReader(reader)
defer v2reader.Release()
if err := v2io.PipeUntilEOF(v2reader, ray.InboundInput()); err != nil {
if err := buf.PipeUntilEOF(v2reader, ray.InboundInput()); err != nil {
log.Info("Dokodemo: Failed to transport all TCP request: ", err)
}
wg.Done()
@ -188,10 +187,10 @@ func (v *DokodemoDoor) HandleTCPConnection(conn internet.Connection) {
wg.Add(1)
go func() {
v2writer := v2io.NewAdaptiveWriter(conn)
v2writer := buf.NewWriter(conn)
defer v2writer.Release()
if err := v2io.PipeUntilEOF(ray.InboundOutput(), v2writer); err != nil {
if err := buf.PipeUntilEOF(ray.InboundOutput(), v2writer); err != nil {
log.Info("Dokodemo: Failed to transport all TCP response: ", err)
}
wg.Done()

View File

@ -2,12 +2,12 @@ package freedom
import (
"io"
"v2ray.com/core/app"
"v2ray.com/core/app/dns"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/dice"
"v2ray.com/core/common/errors"
v2io "v2ray.com/core/common/io"
"v2ray.com/core/common/loader"
"v2ray.com/core/common/log"
v2net "v2ray.com/core/common/net"
@ -100,10 +100,10 @@ func (v *FreedomConnection) Dispatch(destination v2net.Destination, payload *buf
}
go func() {
v2writer := v2io.NewAdaptiveWriter(conn)
v2writer := buf.NewWriter(conn)
defer v2writer.Release()
if err := v2io.PipeUntilEOF(input, v2writer); err != nil {
if err := buf.PipeUntilEOF(input, v2writer); err != nil {
log.Info("Freedom: Failed to transport all TCP request: ", err)
}
if tcpConn, ok := conn.(*tcp.RawConnection); ok {
@ -121,8 +121,8 @@ func (v *FreedomConnection) Dispatch(destination v2net.Destination, payload *buf
reader = v2net.NewTimeOutReader(timeout /* seconds */, conn)
}
v2reader := v2io.NewAdaptiveReader(reader)
if err := v2io.PipeUntilEOF(v2reader, output); err != nil {
v2reader := buf.NewReader(reader)
if err := buf.PipeUntilEOF(v2reader, output); err != nil {
log.Info("Freedom: Failed to transport all TCP response: ", err)
}
v2reader.Release()

View File

@ -1,7 +1,6 @@
package http
import (
"bufio"
"io"
"net"
"net/http"
@ -12,8 +11,9 @@ import (
"v2ray.com/core/app"
"v2ray.com/core/app/dispatcher"
"v2ray.com/core/common"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/bufio"
"v2ray.com/core/common/errors"
v2io "v2ray.com/core/common/io"
"v2ray.com/core/common/loader"
"v2ray.com/core/common/log"
v2net "v2ray.com/core/common/net"
@ -98,7 +98,7 @@ func parseHost(rawHost string, defaultPort v2net.Port) (v2net.Destination, error
func (v *Server) handleConnection(conn internet.Connection) {
defer conn.Close()
timedReader := v2net.NewTimeOutReader(v.config.Timeout, conn)
reader := bufio.NewReaderSize(timedReader, 2048)
reader := bufio.OriginalReaderSize(timedReader, 2048)
request, err := http.ReadRequest(reader)
if err != nil {
@ -158,10 +158,10 @@ func (v *Server) transport(input io.Reader, output io.Writer, ray ray.InboundRay
defer wg.Wait()
go func() {
v2reader := v2io.NewAdaptiveReader(input)
v2reader := buf.NewReader(input)
defer v2reader.Release()
if err := v2io.PipeUntilEOF(v2reader, ray.InboundInput()); err != nil {
if err := buf.PipeUntilEOF(v2reader, ray.InboundInput()); err != nil {
log.Info("HTTP: Failed to transport all TCP request: ", err)
}
ray.InboundInput().Close()
@ -169,10 +169,10 @@ func (v *Server) transport(input io.Reader, output io.Writer, ray ray.InboundRay
}()
go func() {
v2writer := v2io.NewAdaptiveWriter(output)
v2writer := buf.NewWriter(output)
defer v2writer.Release()
if err := v2io.PipeUntilEOF(ray.InboundOutput(), v2writer); err != nil {
if err := buf.PipeUntilEOF(ray.InboundOutput(), v2writer); err != nil {
log.Info("HTTP: Failed to transport all TCP response: ", err)
}
ray.InboundOutput().Release()
@ -221,7 +221,7 @@ func (v *Server) GenerateResponse(statusCode int, status string) *http.Response
}
}
func (v *Server) handlePlainHTTP(request *http.Request, session *proxy.SessionInfo, reader *bufio.Reader, writer io.Writer) {
func (v *Server) handlePlainHTTP(request *http.Request, session *proxy.SessionInfo, reader io.Reader, writer io.Writer) {
if len(request.URL.Host) <= 0 {
response := v.GenerateResponse(400, "Bad Request")
response.Write(writer)
@ -240,7 +240,7 @@ func (v *Server) handlePlainHTTP(request *http.Request, session *proxy.SessionIn
finish.Add(1)
go func() {
defer finish.Done()
requestWriter := v2io.NewBufferedWriter(v2io.NewChainWriter(ray.InboundInput()))
requestWriter := bufio.NewWriter(buf.NewBytesWriter(ray.InboundInput()))
err := request.Write(requestWriter)
if err != nil {
log.Warning("HTTP: Failed to write request: ", err)
@ -252,13 +252,13 @@ func (v *Server) handlePlainHTTP(request *http.Request, session *proxy.SessionIn
finish.Add(1)
go func() {
defer finish.Done()
responseReader := bufio.NewReader(v2io.NewChanReader(ray.InboundOutput()))
responseReader := bufio.OriginalReader(buf.NewBytesReader(ray.InboundOutput()))
response, err := http.ReadResponse(responseReader, request)
if err != nil {
log.Warning("HTTP: Failed to read response: ", err)
response = v.GenerateResponse(503, "Service Unavailable")
}
responseWriter := v2io.NewBufferedWriter(writer)
responseWriter := bufio.NewWriter(writer)
err = response.Write(responseWriter)
if err != nil {
log.Warning("HTTP: Failed to write response: ", err)

View File

@ -2,9 +2,10 @@ package shadowsocks
import (
"sync"
"v2ray.com/core/app"
"v2ray.com/core/common/buf"
v2io "v2ray.com/core/common/io"
"v2ray.com/core/common/bufio"
"v2ray.com/core/common/log"
v2net "v2ray.com/core/common/net"
"v2ray.com/core/common/protocol"
@ -87,7 +88,7 @@ func (v *Client) Dispatch(destination v2net.Destination, payload *buf.Buffer, ra
}
if request.Command == protocol.RequestCommandTCP {
bufferedWriter := v2io.NewBufferedWriter(conn)
bufferedWriter := bufio.NewWriter(conn)
defer bufferedWriter.Release()
bodyWriter, err := WriteTCPRequest(request, bufferedWriter)
@ -115,13 +116,13 @@ func (v *Client) Dispatch(destination v2net.Destination, payload *buf.Buffer, ra
return
}
if err := v2io.PipeUntilEOF(responseReader, ray.OutboundOutput()); err != nil {
if err := buf.PipeUntilEOF(responseReader, ray.OutboundOutput()); err != nil {
log.Info("Shadowsocks|Client: Failed to transport all TCP response: ", err)
}
}()
bufferedWriter.SetCached(false)
if err := v2io.PipeUntilEOF(ray.OutboundInput(), bodyWriter); err != nil {
if err := buf.PipeUntilEOF(ray.OutboundInput(), bodyWriter); err != nil {
log.Info("Shadowsocks|Client: Failed to trasnport all TCP request: ", err)
}
@ -141,7 +142,7 @@ func (v *Client) Dispatch(destination v2net.Destination, payload *buf.Buffer, ra
User: user,
}
if err := v2io.PipeUntilEOF(reader, ray.OutboundOutput()); err != nil {
if err := buf.PipeUntilEOF(reader, ray.OutboundOutput()); err != nil {
log.Info("Shadowsocks|Client: Failed to transport all UDP response: ", err)
}
}()
@ -156,7 +157,7 @@ func (v *Client) Dispatch(destination v2net.Destination, payload *buf.Buffer, ra
return
}
}
if err := v2io.PipeUntilEOF(ray.OutboundInput(), writer); err != nil {
if err := buf.PipeUntilEOF(ray.OutboundInput(), writer); err != nil {
log.Info("Shadowsocks|Client: Failed to transport all UDP request: ", err)
}

View File

@ -8,7 +8,6 @@ import (
"v2ray.com/core/common/buf"
"v2ray.com/core/common/crypto"
"v2ray.com/core/common/errors"
v2io "v2ray.com/core/common/io"
v2net "v2ray.com/core/common/net"
"v2ray.com/core/common/protocol"
"v2ray.com/core/common/serial"
@ -23,7 +22,7 @@ const (
AddrTypeDomain = 3
)
func ReadTCPSession(user *protocol.User, reader io.Reader) (*protocol.RequestHeader, v2io.Reader, error) {
func ReadTCPSession(user *protocol.User, reader io.Reader) (*protocol.RequestHeader, buf.Reader, error) {
rawAccount, err := user.GetTypedAccount()
if err != nil {
return nil, nil, errors.Base(err).Message("Shadowsocks|TCP: Failed to parse account.")
@ -121,17 +120,17 @@ func ReadTCPSession(user *protocol.User, reader io.Reader) (*protocol.RequestHea
}
}
var chunkReader v2io.Reader
var chunkReader buf.Reader
if request.Option.Has(RequestOptionOneTimeAuth) {
chunkReader = NewChunkReader(reader, NewAuthenticator(ChunkKeyGenerator(iv)))
} else {
chunkReader = v2io.NewAdaptiveReader(reader)
chunkReader = buf.NewReader(reader)
}
return request, chunkReader, nil
}
func WriteTCPRequest(request *protocol.RequestHeader, writer io.Writer) (v2io.Writer, error) {
func WriteTCPRequest(request *protocol.RequestHeader, writer io.Writer) (buf.Writer, error) {
user := request.User
rawAccount, err := user.GetTypedAccount()
if err != nil {
@ -183,17 +182,17 @@ func WriteTCPRequest(request *protocol.RequestHeader, writer io.Writer) (v2io.Wr
return nil, errors.Base(err).Message("Shadowsocks|TCP: Failed to write header.")
}
var chunkWriter v2io.Writer
var chunkWriter buf.Writer
if request.Option.Has(RequestOptionOneTimeAuth) {
chunkWriter = NewChunkWriter(writer, NewAuthenticator(ChunkKeyGenerator(iv)))
} else {
chunkWriter = v2io.NewAdaptiveWriter(writer)
chunkWriter = buf.NewWriter(writer)
}
return chunkWriter, nil
}
func ReadTCPResponse(user *protocol.User, reader io.Reader) (v2io.Reader, error) {
func ReadTCPResponse(user *protocol.User, reader io.Reader) (buf.Reader, error) {
rawAccount, err := user.GetTypedAccount()
if err != nil {
return nil, errors.Base(err).Message("Shadowsocks|TCP: Failed to parse account.")
@ -210,10 +209,10 @@ func ReadTCPResponse(user *protocol.User, reader io.Reader) (v2io.Reader, error)
if err != nil {
return nil, errors.Base(err).Message("Shadowsocks|TCP: Failed to initialize decoding stream.")
}
return v2io.NewAdaptiveReader(crypto.NewCryptionReader(stream, reader)), nil
return buf.NewReader(crypto.NewCryptionReader(stream, reader)), nil
}
func WriteTCPResponse(request *protocol.RequestHeader, writer io.Writer) (v2io.Writer, error) {
func WriteTCPResponse(request *protocol.RequestHeader, writer io.Writer) (buf.Writer, error) {
user := request.User
rawAccount, err := user.GetTypedAccount()
if err != nil {
@ -233,7 +232,7 @@ func WriteTCPResponse(request *protocol.RequestHeader, writer io.Writer) (v2io.W
return nil, errors.Base(err).Message("Shadowsocks|TCP: Failed to create encoding stream.")
}
return v2io.NewAdaptiveWriter(crypto.NewCryptionWriter(stream, writer)), nil
return buf.NewWriter(crypto.NewCryptionWriter(stream, writer)), nil
}
func EncodeUDPPacket(request *protocol.RequestHeader, payload *buf.Buffer) (*buf.Buffer, error) {

View File

@ -3,12 +3,13 @@ package shadowsocks
import (
"sync"
"v2ray.com/core/app"
"v2ray.com/core/app/dispatcher"
"v2ray.com/core/common"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/bufio"
"v2ray.com/core/common/errors"
v2io "v2ray.com/core/common/io"
"v2ray.com/core/common/log"
v2net "v2ray.com/core/common/net"
"v2ray.com/core/common/protocol"
@ -150,7 +151,7 @@ func (v *Server) handleConnection(conn internet.Connection) {
timedReader := v2net.NewTimeOutReader(16, conn)
defer timedReader.Release()
bufferedReader := v2io.NewBufferedReader(timedReader)
bufferedReader := bufio.NewReader(timedReader)
defer bufferedReader.Release()
request, bodyReader, err := ReadTCPSession(v.user, bufferedReader)
@ -183,7 +184,7 @@ func (v *Server) handleConnection(conn internet.Connection) {
go func() {
defer writeFinish.Unlock()
bufferedWriter := v2io.NewBufferedWriter(conn)
bufferedWriter := bufio.NewWriter(conn)
defer bufferedWriter.Release()
responseWriter, err := WriteTCPResponse(request, bufferedWriter)
@ -197,13 +198,13 @@ func (v *Server) handleConnection(conn internet.Connection) {
responseWriter.Write(payload)
bufferedWriter.SetCached(false)
if err := v2io.PipeUntilEOF(ray.InboundOutput(), responseWriter); err != nil {
if err := buf.PipeUntilEOF(ray.InboundOutput(), responseWriter); err != nil {
log.Info("Shadowsocks|Server: Failed to transport all TCP response: ", err)
}
}
}()
if err := v2io.PipeUntilEOF(bodyReader, ray.InboundInput()); err != nil {
if err := buf.PipeUntilEOF(bodyReader, ray.InboundInput()); err != nil {
log.Info("Shadowsocks|Server: Failed to transport all TCP request: ", err)
}
ray.InboundInput().Close()

View File

@ -7,9 +7,10 @@ import (
"v2ray.com/core/app"
"v2ray.com/core/app/dispatcher"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/bufio"
"v2ray.com/core/common/crypto"
"v2ray.com/core/common/errors"
v2io "v2ray.com/core/common/io"
"v2ray.com/core/common/loader"
"v2ray.com/core/common/log"
v2net "v2ray.com/core/common/net"
@ -106,10 +107,10 @@ func (v *Server) handleConnection(connection internet.Connection) {
defer connection.Close()
timedReader := v2net.NewTimeOutReader(v.config.Timeout, connection)
reader := v2io.NewBufferedReader(timedReader)
reader := bufio.NewReader(timedReader)
defer reader.Release()
writer := v2io.NewBufferedWriter(connection)
writer := bufio.NewWriter(connection)
defer writer.Release()
auth, auth4, err := protocol.ReadAuthentication(reader)
@ -128,7 +129,7 @@ func (v *Server) handleConnection(connection internet.Connection) {
}
}
func (v *Server) handleSocks5(clientAddr v2net.Destination, reader *v2io.BufferedReader, writer *v2io.BufferedWriter, auth protocol.Socks5AuthenticationRequest) error {
func (v *Server) handleSocks5(clientAddr v2net.Destination, reader *bufio.BufferedReader, writer *bufio.BufferedWriter, auth protocol.Socks5AuthenticationRequest) error {
expectedAuthMethod := protocol.AuthNotRequired
if v.config.AuthType == AuthType_PASSWORD {
expectedAuthMethod = protocol.AuthUserPass
@ -232,7 +233,7 @@ func (v *Server) handleSocks5(clientAddr v2net.Destination, reader *v2io.Buffere
return nil
}
func (v *Server) handleUDP(reader io.Reader, writer *v2io.BufferedWriter) error {
func (v *Server) handleUDP(reader io.Reader, writer *bufio.BufferedWriter) error {
response := protocol.NewSocks5Response()
response.Error = protocol.ErrorSuccess
@ -264,7 +265,7 @@ func (v *Server) handleUDP(reader io.Reader, writer *v2io.BufferedWriter) error
return nil
}
func (v *Server) handleSocks4(clientAddr v2net.Destination, reader *v2io.BufferedReader, writer *v2io.BufferedWriter, auth protocol.Socks4AuthenticationRequest) error {
func (v *Server) handleSocks4(clientAddr v2net.Destination, reader *bufio.BufferedReader, writer *bufio.BufferedWriter, auth protocol.Socks4AuthenticationRequest) error {
result := protocol.Socks4RequestGranted
if auth.Command == protocol.CmdBind {
result = protocol.Socks4RequestRejected
@ -302,19 +303,19 @@ func (v *Server) transport(reader io.Reader, writer io.Writer, session *proxy.Se
defer output.Release()
go func() {
v2reader := v2io.NewAdaptiveReader(reader)
v2reader := buf.NewReader(reader)
defer v2reader.Release()
if err := v2io.PipeUntilEOF(v2reader, input); err != nil {
if err := buf.PipeUntilEOF(v2reader, input); err != nil {
log.Info("Socks|Server: Failed to transport all TCP request: ", err)
}
input.Close()
}()
v2writer := v2io.NewAdaptiveWriter(writer)
v2writer := buf.NewWriter(writer)