1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2024-12-21 17:46:58 -05:00

refactor buffer

This commit is contained in:
Darien Raymond 2016-12-09 12:08:25 +01:00
parent 7a80409e30
commit 055023fdd5
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
53 changed files with 220 additions and 284 deletions

View File

@ -62,7 +62,7 @@ func (v *DefaultDispatcher) DispatchToOutbound(session *proxy.SessionInfo) ray.I
} }
if session.Inbound != nil && session.Inbound.AllowPassiveConnection { if session.Inbound != nil && session.Inbound.AllowPassiveConnection {
go dispatcher.Dispatch(destination, buf.NewLocalBuffer(32), direct) go dispatcher.Dispatch(destination, buf.NewLocal(32), direct)
} else { } else {
go v.FilterPacketAndDispatch(destination, direct, dispatcher) go v.FilterPacketAndDispatch(destination, direct, dispatcher)
} }

View File

@ -20,7 +20,7 @@ func NewTestPacketDispatcher(handler func(destination v2net.Destination, traffic
if err != nil { if err != nil {
break break
} }
output := buf.NewBuffer() output := buf.New()
output.Append([]byte("Processed: ")) output.Append([]byte("Processed: "))
output.Append(payload.Bytes()) output.Append(payload.Bytes())
payload.Release() payload.Release()

View File

@ -145,7 +145,7 @@ func (v *UDPNameServer) HandleResponse(dest v2net.Destination, payload *buf.Buff
} }
func (v *UDPNameServer) BuildQueryA(domain string, id uint16) *buf.Buffer { func (v *UDPNameServer) BuildQueryA(domain string, id uint16) *buf.Buffer {
buffer := buf.NewBuffer() buffer := buf.New()
msg := new(dns.Msg) msg := new(dns.Msg)
msg.Id = id msg.Id = id
msg.RecursionDesired = true msg.RecursionDesired = true

View File

@ -1,12 +1,12 @@
// Package alloc provides a light-weight memory allocation mechanism. // Package buf provides a light-weight memory allocation mechanism.
package buf package buf
import ( import (
"io" "io"
) )
// BytesWriter is a writer that writes contents into the given buffer. // Supplier is a writer that writes contents into the given buffer.
type BytesWriter func([]byte) int type Supplier func([]byte) (int, error)
// Buffer is a recyclable allocation of a byte array. Buffer.Release() recycles // Buffer is a recyclable allocation of a byte array. Buffer.Release() recycles
// the buffer into an internal buffer pool, in order to recreate a buffer more // the buffer into an internal buffer pool, in order to recreate a buffer more
@ -59,10 +59,11 @@ func (b *Buffer) Append(data []byte) {
b.end += nBytes b.end += nBytes
} }
// AppendFunc appends the content of a BytesWriter to the buffer. // AppendSupplier appends the content of a BytesWriter to the buffer.
func (b *Buffer) AppendFunc(writer BytesWriter) { func (b *Buffer) AppendSupplier(writer Supplier) error {
nBytes := writer(b.v[b.end:]) nBytes, err := writer(b.v[b.end:])
b.end += nBytes b.end += nBytes
return err
} }
// Byte returns the bytes at index. // Byte returns the bytes at index.
@ -80,9 +81,11 @@ func (b *Buffer) Bytes() []byte {
return b.v[b.start:b.end] return b.v[b.start:b.end]
} }
func (b *Buffer) SetBytesFunc(writer BytesWriter) { func (b *Buffer) Reset(writer Supplier) error {
b.start = 0 b.start = 0
b.end = b.start + writer(b.v[b.start:]) nBytes, err := writer(b.v[b.start:])
b.end = b.start + nBytes
return err
} }
// BytesRange returns a slice of this buffer with given from and to bounary. // BytesRange returns a slice of this buffer with given from and to bounary.
@ -172,33 +175,21 @@ func (b *Buffer) Read(data []byte) (int, error) {
return nBytes, nil return nBytes, nil
} }
func (b *Buffer) FillFrom(reader io.Reader) (int, error) {
nBytes, err := reader.Read(b.v[b.end:])
b.end += nBytes
return nBytes, err
}
func (b *Buffer) FillFullFrom(reader io.Reader, amount int) (int, error) {
nBytes, err := io.ReadFull(reader, b.v[b.end:b.end+amount])
b.end += nBytes
return nBytes, err
}
func (b *Buffer) String() string { func (b *Buffer) String() string {
return string(b.Bytes()) return string(b.Bytes())
} }
// NewBuffer creates a Buffer with 8K bytes of arbitrary content. // New creates a Buffer with 8K bytes of arbitrary content.
func NewBuffer() *Buffer { func New() *Buffer {
return mediumPool.Allocate() return mediumPool.Allocate()
} }
// NewSmallBuffer returns a buffer with 2K bytes capacity. // NewSmall returns a buffer with 2K bytes capacity.
func NewSmallBuffer() *Buffer { func NewSmall() *Buffer {
return smallPool.Allocate() return smallPool.Allocate()
} }
// NewLocalBuffer creates and returns a buffer on current thread. // NewLocal creates and returns a buffer on current thread.
func NewLocalBuffer(size int) *Buffer { func NewLocal(size int) *Buffer {
return CreateBuffer(make([]byte, size), nil) return CreateBuffer(make([]byte, size), nil)
} }

View File

@ -84,15 +84,15 @@ func (p *BufferPool) Free(buffer *Buffer) {
} }
const ( const (
BufferSize = 8 * 1024 Size = 8 * 1024
SmallBufferSize = 2 * 1024 SizeSmall = 2 * 1024
PoolSizeEnvKey = "v2ray.buffer.size" PoolSizeEnvKey = "v2ray.buffer.size"
) )
var ( var (
mediumPool Pool mediumPool Pool
smallPool = NewSyncPool(2048) smallPool = NewSyncPool(SizeSmall)
) )
func init() { func init() {
@ -106,8 +106,8 @@ func init() {
} }
if size > 0 { if size > 0 {
totalByteSize := size * 1024 * 1024 totalByteSize := size * 1024 * 1024
mediumPool = NewBufferPool(BufferSize, totalByteSize/BufferSize) mediumPool = NewBufferPool(Size, totalByteSize/Size)
} else { } else {
mediumPool = NewSyncPool(BufferSize) mediumPool = NewSyncPool(Size)
} }
} }

View File

@ -11,7 +11,7 @@ import (
func TestBufferClear(t *testing.T) { func TestBufferClear(t *testing.T) {
assert := assert.On(t) assert := assert.On(t)
buffer := NewBuffer() buffer := New()
defer buffer.Release() defer buffer.Release()
payload := "Bytes" payload := "Bytes"
@ -25,7 +25,7 @@ func TestBufferClear(t *testing.T) {
func TestBufferIsEmpty(t *testing.T) { func TestBufferIsEmpty(t *testing.T) {
assert := assert.On(t) assert := assert.On(t)
buffer := NewBuffer() buffer := New()
defer buffer.Release() defer buffer.Release()
assert.Bool(buffer.IsEmpty()).IsTrue() assert.Bool(buffer.IsEmpty()).IsTrue()
@ -34,17 +34,17 @@ func TestBufferIsEmpty(t *testing.T) {
func TestBufferString(t *testing.T) { func TestBufferString(t *testing.T) {
assert := assert.On(t) assert := assert.On(t)
buffer := NewBuffer() buffer := New()
defer buffer.Release() defer buffer.Release()
buffer.AppendFunc(serial.WriteString("Test String")) buffer.AppendSupplier(serial.WriteString("Test String"))
assert.String(buffer.String()).Equals("Test String") assert.String(buffer.String()).Equals("Test String")
} }
func TestBufferWrite(t *testing.T) { func TestBufferWrite(t *testing.T) {
assert := assert.On(t) assert := assert.On(t)
buffer := NewLocalBuffer(8) buffer := NewLocal(8)
nBytes, err := buffer.Write([]byte("abcd")) nBytes, err := buffer.Write([]byte("abcd"))
assert.Error(err).IsNil() assert.Error(err).IsNil()
assert.Int(nBytes).Equals(4) assert.Int(nBytes).Equals(4)
@ -56,28 +56,28 @@ func TestBufferWrite(t *testing.T) {
func BenchmarkNewBuffer8192(b *testing.B) { func BenchmarkNewBuffer8192(b *testing.B) {
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
buffer := NewBuffer() buffer := New()
buffer.Release() buffer.Release()
} }
} }
func BenchmarkNewLocalBuffer8192(b *testing.B) { func BenchmarkNewLocalBuffer8192(b *testing.B) {
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
buffer := NewLocalBuffer(8192) buffer := NewLocal(8192)
buffer.Release() buffer.Release()
} }
} }
func BenchmarkNewBuffer2048(b *testing.B) { func BenchmarkNewBuffer2048(b *testing.B) {
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
buffer := NewSmallBuffer() buffer := NewSmall()
buffer.Release() buffer.Release()
} }
} }
func BenchmarkNewLocalBuffer2048(b *testing.B) { func BenchmarkNewLocalBuffer2048(b *testing.B) {
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
buffer := NewLocalBuffer(2048) buffer := NewLocal(2048)
buffer.Release() buffer.Release()
} }
} }
@ -94,7 +94,7 @@ func BenchmarkBufferValue(b *testing.B) {
} }
func BenchmarkBufferPointer(b *testing.B) { func BenchmarkBufferPointer(b *testing.B) {
x := NewSmallBuffer() x := NewSmall()
doSomething := func(a *Buffer) { doSomething := func(a *Buffer) {
_ = a.Len() _ = a.Len()
} }

15
common/buf/io.go Normal file
View File

@ -0,0 +1,15 @@
package buf
import "io"
func ReadFrom(reader io.Reader) Supplier {
return func(b []byte) (int, error) {
return reader.Read(b)
}
}
func ReadFullFrom(reader io.Reader, size int) Supplier {
return func(b []byte) (int, error) {
return io.ReadFull(reader, b[:size])
}
}

View File

@ -81,7 +81,7 @@ type AuthenticationReader struct {
func NewAuthenticationReader(auth Authenticator, reader io.Reader, aggressive bool) *AuthenticationReader { func NewAuthenticationReader(auth Authenticator, reader io.Reader, aggressive bool) *AuthenticationReader {
return &AuthenticationReader{ return &AuthenticationReader{
auth: auth, auth: auth,
buffer: buf.NewLocalBuffer(32 * 1024), buffer: buf.NewLocal(32 * 1024),
reader: reader, reader: reader,
aggressive: aggressive, aggressive: aggressive,
} }
@ -135,11 +135,11 @@ func (v *AuthenticationReader) EnsureChunk() error {
v.buffer.Clear() v.buffer.Clear()
} else { } else {
leftover := v.buffer.Bytes() leftover := v.buffer.Bytes()
v.buffer.SetBytesFunc(func(b []byte) int { v.buffer.Reset(func(b []byte) (int, error) {
return copy(b, leftover) return copy(b, leftover), nil
}) })
} }
_, err = v.buffer.FillFrom(v.reader) err = v.buffer.AppendSupplier(buf.ReadFrom(v.reader))
} }
return err return err
} }

View File

@ -17,7 +17,7 @@ type BufferedReader struct {
func NewBufferedReader(rawReader io.Reader) *BufferedReader { func NewBufferedReader(rawReader io.Reader) *BufferedReader {
return &BufferedReader{ return &BufferedReader{
reader: rawReader, reader: rawReader,
buffer: buf.NewBuffer(), buffer: buf.New(),
cached: true, cached: true,
} }
} }
@ -54,7 +54,7 @@ func (v *BufferedReader) Read(b []byte) (int, error) {
return v.reader.Read(b) return v.reader.Read(b)
} }
if v.buffer.IsEmpty() { if v.buffer.IsEmpty() {
_, err := v.buffer.FillFrom(v.reader) err := v.buffer.AppendSupplier(buf.ReadFrom(v.reader))
if err != nil { if err != nil {
return 0, err return 0, err
} }

View File

@ -1,9 +1,9 @@
package io_test package io_test
import ( import (
"crypto/rand"
"testing" "testing"
"crypto/rand"
"v2ray.com/core/common/buf" "v2ray.com/core/common/buf"
. "v2ray.com/core/common/io" . "v2ray.com/core/common/io"
"v2ray.com/core/testing/assert" "v2ray.com/core/testing/assert"
@ -12,8 +12,8 @@ import (
func TestBufferedReader(t *testing.T) { func TestBufferedReader(t *testing.T) {
assert := assert.On(t) assert := assert.On(t)
content := buf.NewBuffer() content := buf.New()
content.FillFrom(rand.Reader) content.AppendSupplier(buf.ReadFrom(rand.Reader))
len := content.Len() len := content.Len()

View File

@ -3,6 +3,7 @@ package io
import ( import (
"io" "io"
"sync" "sync"
"v2ray.com/core/common/buf" "v2ray.com/core/common/buf"
"v2ray.com/core/common/errors" "v2ray.com/core/common/errors"
) )
@ -17,7 +18,7 @@ type BufferedWriter struct {
func NewBufferedWriter(rawWriter io.Writer) *BufferedWriter { func NewBufferedWriter(rawWriter io.Writer) *BufferedWriter {
return &BufferedWriter{ return &BufferedWriter{
writer: rawWriter, writer: rawWriter,
buffer: buf.NewSmallBuffer(), buffer: buf.NewSmall(),
cached: true, cached: true,
} }
} }
@ -32,8 +33,9 @@ func (v *BufferedWriter) ReadFrom(reader io.Reader) (int64, error) {
totalBytes := int64(0) totalBytes := int64(0)
for { for {
nBytes, err := v.buffer.FillFrom(reader) oriSize := v.buffer.Len()
totalBytes += int64(nBytes) err := v.buffer.AppendSupplier(buf.ReadFrom(reader))
totalBytes += int64(v.buffer.Len() - oriSize)
if err != nil { if err != nil {
if errors.Cause(err) == io.EOF { if errors.Cause(err) == io.EOF {
return totalBytes, nil return totalBytes, nil

View File

@ -12,7 +12,7 @@ import (
func TestBufferedWriter(t *testing.T) { func TestBufferedWriter(t *testing.T) {
assert := assert.On(t) assert := assert.On(t)
content := buf.NewBuffer() content := buf.New()
writer := NewBufferedWriter(content) writer := NewBufferedWriter(content)
assert.Bool(writer.Cached()).IsTrue() assert.Bool(writer.Cached()).IsTrue()
@ -32,7 +32,7 @@ func TestBufferedWriter(t *testing.T) {
func TestBufferedWriterLargePayload(t *testing.T) { func TestBufferedWriterLargePayload(t *testing.T) {
assert := assert.On(t) assert := assert.On(t)
content := buf.NewLocalBuffer(128 * 1024) content := buf.NewLocal(128 * 1024)
writer := NewBufferedWriter(content) writer := NewBufferedWriter(content)
assert.Bool(writer.Cached()).IsTrue() assert.Bool(writer.Cached()).IsTrue()

View File

@ -28,7 +28,7 @@ func (v *ChainWriter) Write(payload []byte) (int, error) {
bytesWritten := 0 bytesWritten := 0
size := len(payload) size := len(payload)
for size > 0 { for size > 0 {
buffer := buf.NewBuffer() buffer := buf.New()
nBytes, _ := buffer.Write(payload) nBytes, _ := buffer.Write(payload)
size -= nBytes size -= nBytes
payload = payload[nBytes:] payload = payload[nBytes:]

View File

@ -33,24 +33,24 @@ func NewAdaptiveReader(reader io.Reader) *AdaptiveReader {
func (v *AdaptiveReader) Read() (*buf.Buffer, error) { func (v *AdaptiveReader) Read() (*buf.Buffer, error) {
if v.highVolumn && v.largeBuffer.IsEmpty() { if v.highVolumn && v.largeBuffer.IsEmpty() {
if v.largeBuffer == nil { if v.largeBuffer == nil {
v.largeBuffer = buf.NewLocalBuffer(32 * 1024) v.largeBuffer = buf.NewLocal(32 * 1024)
} }
nBytes, err := v.largeBuffer.FillFrom(v.reader) err := v.largeBuffer.AppendSupplier(buf.ReadFrom(v.reader))
if err != nil { if err != nil {
return nil, err return nil, err
} }
if nBytes < buf.BufferSize { if v.largeBuffer.Len() < buf.Size {
v.highVolumn = false v.highVolumn = false
} }
} }
buffer := buf.NewBuffer() buffer := buf.New()
if !v.largeBuffer.IsEmpty() { if !v.largeBuffer.IsEmpty() {
buffer.FillFrom(v.largeBuffer) buffer.AppendSupplier(buf.ReadFrom(v.largeBuffer))
return buffer, nil return buffer, nil
} }
_, err := buffer.FillFrom(v.reader) err := buffer.AppendSupplier(buf.ReadFrom(v.reader))
if err != nil { if err != nil {
buffer.Release() buffer.Release()
return nil, err return nil, err

View File

@ -19,8 +19,8 @@ func TestAdaptiveReader(t *testing.T) {
b1, err := reader.Read() b1, err := reader.Read()
assert.Error(err).IsNil() assert.Error(err).IsNil()
assert.Bool(b1.IsFull()).IsTrue() assert.Bool(b1.IsFull()).IsTrue()
assert.Int(b1.Len()).Equals(buf.BufferSize) assert.Int(b1.Len()).Equals(buf.Size)
assert.Int(buffer.Len()).Equals(cap(rawContent) - buf.BufferSize) assert.Int(buffer.Len()).Equals(cap(rawContent) - buf.Size)
b2, err := reader.Read() b2, err := reader.Read()
assert.Error(err).IsNil() assert.Error(err).IsNil()

View File

@ -13,8 +13,8 @@ import (
func TestAdaptiveWriter(t *testing.T) { func TestAdaptiveWriter(t *testing.T) {
assert := assert.On(t) assert := assert.On(t)
lb := buf.NewBuffer() lb := buf.New()
lb.FillFrom(rand.Reader) lb.AppendSupplier(buf.ReadFrom(rand.Reader))
expectedBytes := append([]byte(nil), lb.Bytes()...) expectedBytes := append([]byte(nil), lb.Bytes()...)

View File

@ -6,9 +6,9 @@ import (
"v2ray.com/core/common/buf" "v2ray.com/core/common/buf"
) )
func WriteHash(h hash.Hash) buf.BytesWriter { func WriteHash(h hash.Hash) buf.Supplier {
return func(b []byte) int { return func(b []byte) (int, error) {
h.Sum(b[:0]) h.Sum(b[:0])
return h.Size() return h.Size(), nil
} }
} }

View File

@ -14,10 +14,10 @@ func Uint16ToString(value uint16) string {
return strconv.Itoa(int(value)) return strconv.Itoa(int(value))
} }
func WriteUint16(value uint16) buf.BytesWriter { func WriteUint16(value uint16) buf.Supplier {
return func(b []byte) int { return func(b []byte) (int, error) {
b = Uint16ToBytes(value, b[:0]) b = Uint16ToBytes(value, b[:0])
return 2 return 2, nil
} }
} }
@ -29,10 +29,10 @@ func Uint32ToString(value uint32) string {
return strconv.FormatUint(uint64(value), 10) return strconv.FormatUint(uint64(value), 10)
} }
func WriteUint32(value uint32) buf.BytesWriter { func WriteUint32(value uint32) buf.Supplier {
return func(b []byte) int { return func(b []byte) (int, error) {
b = Uint32ToBytes(value, b[:0]) b = Uint32ToBytes(value, b[:0])
return 4 return 4, nil
} }
} }

View File

@ -36,9 +36,8 @@ func Concat(v ...interface{}) string {
return strings.Join(values, "") return strings.Join(values, "")
} }
func WriteString(s string) buf.BytesWriter { func WriteString(s string) buf.Supplier {
return func(b []byte) int { return func(b []byte) (int, error) {
copy(b, []byte(s)) return copy(b, []byte(s)), nil
return len(s)
} }
} }

View File

@ -32,8 +32,8 @@ func (v *NoneResponse) AsAny() *any.Any {
} }
func (v *HTTPResponse) WriteTo(writer v2io.Writer) { func (v *HTTPResponse) WriteTo(writer v2io.Writer) {
b := buf.NewLocalBuffer(512) b := buf.NewLocal(512)
b.AppendFunc(serial.WriteString(http403response)) b.AppendSupplier(serial.WriteString(http403response))
writer.Write(b) writer.Write(b)
} }

View File

@ -14,7 +14,7 @@ import (
func TestHTTPResponse(t *testing.T) { func TestHTTPResponse(t *testing.T) {
assert := assert.On(t) assert := assert.On(t)
buffer := buf.NewBuffer() buffer := buf.New()
httpResponse := new(HTTPResponse) httpResponse := new(HTTPResponse)
httpResponse.WriteTo(v2io.NewAdaptiveWriter(buffer)) httpResponse.WriteTo(v2io.NewAdaptiveWriter(buffer))

View File

@ -47,7 +47,7 @@ func TestSinglePacket(t *testing.T) {
traffic := ray.NewRay() traffic := ray.NewRay()
data2Send := "Data to be sent to remote" data2Send := "Data to be sent to remote"
payload := buf.NewLocalBuffer(2048) payload := buf.NewLocal(2048)
payload.Append([]byte(data2Send)) payload.Append([]byte(data2Send))
go freedom.Dispatch(v2net.TCPDestination(v2net.LocalHostIP, tcpServer.Port), payload, traffic) go freedom.Dispatch(v2net.TCPDestination(v2net.LocalHostIP, tcpServer.Port), payload, traffic)

View File

@ -27,13 +27,12 @@ func NewAuthenticator(keygen KeyGenerator) *Authenticator {
} }
} }
func (v *Authenticator) Authenticate(data []byte) buf.BytesWriter { func (v *Authenticator) Authenticate(data []byte) buf.Supplier {
hasher := hmac.New(sha1.New, v.key()) hasher := hmac.New(sha1.New, v.key())
hasher.Write(data) hasher.Write(data)
res := hasher.Sum(nil) res := hasher.Sum(nil)
return func(b []byte) int { return func(b []byte) (int, error) {
copy(b, res[:AuthSize]) return copy(b, res[:AuthSize]), nil
return AuthSize
} }
} }
@ -75,22 +74,22 @@ func (v *ChunkReader) Release() {
} }
func (v *ChunkReader) Read() (*buf.Buffer, error) { func (v *ChunkReader) Read() (*buf.Buffer, error) {
buffer := buf.NewBuffer() buffer := buf.New()
if _, err := buffer.FillFullFrom(v.reader, 2); err != nil { if err := buffer.AppendSupplier(buf.ReadFullFrom(v.reader, 2)); err != nil {
buffer.Release() buffer.Release()
return nil, err return nil, err
} }
// There is a potential buffer overflow here. Large buffer is 64K bytes, // There is a potential buffer overflow here. Large buffer is 64K bytes,
// while uin16 + 10 will be more than that // while uin16 + 10 will be more than that
length := serial.BytesToUint16(buffer.BytesTo(2)) + AuthSize length := serial.BytesToUint16(buffer.BytesTo(2)) + AuthSize
if length > buf.BufferSize { if length > buf.Size {
// Theoretically the size of a chunk is 64K, but most Shadowsocks implementations used <4K buffer. // Theoretically the size of a chunk is 64K, but most Shadowsocks implementations used <4K buffer.
buffer.Release() buffer.Release()
buffer = buf.NewLocalBuffer(int(length) + 128) buffer = buf.NewLocal(int(length) + 128)
} }
buffer.Clear() buffer.Clear()
if _, err := buffer.FillFullFrom(v.reader, int(length)); err != nil { if err := buffer.AppendSupplier(buf.ReadFullFrom(v.reader, int(length))); err != nil {
buffer.Release() buffer.Release()
return nil, err return nil, err
} }

View File

@ -11,7 +11,7 @@ import (
func TestNormalChunkReading(t *testing.T) { func TestNormalChunkReading(t *testing.T) {
assert := assert.On(t) assert := assert.On(t)
buffer := buf.NewBuffer() buffer := buf.New()
buffer.AppendBytes( buffer.AppendBytes(
0, 8, 39, 228, 69, 96, 133, 39, 254, 26, 201, 70, 11, 12, 13, 14, 15, 16, 17, 18) 0, 8, 39, 228, 69, 96, 133, 39, 254, 26, 201, 70, 11, 12, 13, 14, 15, 16, 17, 18)
reader := NewChunkReader(buffer, NewAuthenticator(ChunkKeyGenerator( reader := NewChunkReader(buffer, NewAuthenticator(ChunkKeyGenerator(
@ -24,11 +24,11 @@ func TestNormalChunkReading(t *testing.T) {
func TestNormalChunkWriting(t *testing.T) { func TestNormalChunkWriting(t *testing.T) {
assert := assert.On(t) assert := assert.On(t)
buffer := buf.NewLocalBuffer(512) buffer := buf.NewLocal(512)
writer := NewChunkWriter(buffer, NewAuthenticator(ChunkKeyGenerator( writer := NewChunkWriter(buffer, NewAuthenticator(ChunkKeyGenerator(
[]byte{21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36}))) []byte{21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36})))
b := buf.NewLocalBuffer(256) b := buf.NewLocal(256)
b.Append([]byte{11, 12, 13, 14, 15, 16, 17, 18}) b.Append([]byte{11, 12, 13, 14, 15, 16, 17, 18})
err := writer.Write(b) err := writer.Write(b)
assert.Error(err).IsNil() assert.Error(err).IsNil()

View File

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"crypto/rand" "crypto/rand"
"io" "io"
"v2ray.com/core/common/buf" "v2ray.com/core/common/buf"
"v2ray.com/core/common/crypto" "v2ray.com/core/common/crypto"
"v2ray.com/core/common/errors" "v2ray.com/core/common/errors"
@ -29,11 +30,11 @@ func ReadTCPSession(user *protocol.User, reader io.Reader) (*protocol.RequestHea
} }
account := rawAccount.(*ShadowsocksAccount) account := rawAccount.(*ShadowsocksAccount)
buffer := buf.NewLocalBuffer(512) buffer := buf.NewLocal(512)
defer buffer.Release() defer buffer.Release()
ivLen := account.Cipher.IVSize() ivLen := account.Cipher.IVSize()
_, err = buffer.FillFullFrom(reader, ivLen) err = buffer.AppendSupplier(buf.ReadFullFrom(reader, ivLen))
if err != nil { if err != nil {
return nil, nil, errors.Base(err).Message("Shadowsocks|TCP: Failed to read IV.") return nil, nil, errors.Base(err).Message("Shadowsocks|TCP: Failed to read IV.")
} }
@ -54,7 +55,7 @@ func ReadTCPSession(user *protocol.User, reader io.Reader) (*protocol.RequestHea
} }
buffer.Clear() buffer.Clear()
_, err = buffer.FillFullFrom(reader, 1) err = buffer.AppendSupplier(buf.ReadFullFrom(reader, 1))
if err != nil { if err != nil {
return nil, nil, errors.Base(err).Message("Shadowsocks|TCP: Failed to read address type.") return nil, nil, errors.Base(err).Message("Shadowsocks|TCP: Failed to read address type.")
} }
@ -74,24 +75,24 @@ func ReadTCPSession(user *protocol.User, reader io.Reader) (*protocol.RequestHea
switch addrType { switch addrType {
case AddrTypeIPv4: case AddrTypeIPv4:
_, err := buffer.FillFullFrom(reader, 4) err := buffer.AppendSupplier(buf.ReadFullFrom(reader, 4))
if err != nil { if err != nil {
return nil, nil, errors.Base(err).Message("Shadowsocks|TCP: Failed to read IPv4 address.") return nil, nil, errors.Base(err).Message("Shadowsocks|TCP: Failed to read IPv4 address.")
} }
request.Address = v2net.IPAddress(buffer.BytesFrom(-4)) request.Address = v2net.IPAddress(buffer.BytesFrom(-4))
case AddrTypeIPv6: case AddrTypeIPv6:
_, err := buffer.FillFullFrom(reader, 16) err := buffer.AppendSupplier(buf.ReadFullFrom(reader, 16))
if err != nil { if err != nil {
return nil, nil, errors.Base(err).Message("Shadowsocks|TCP: Failed to read IPv6 address.") return nil, nil, errors.Base(err).Message("Shadowsocks|TCP: Failed to read IPv6 address.")
} }
request.Address = v2net.IPAddress(buffer.BytesFrom(-16)) request.Address = v2net.IPAddress(buffer.BytesFrom(-16))
case AddrTypeDomain: case AddrTypeDomain:
_, err := buffer.FillFullFrom(reader, 1) err := buffer.AppendSupplier(buf.ReadFullFrom(reader, 1))
if err != nil { if err != nil {
return nil, nil, errors.Base(err).Message("Shadowsocks|TCP: Failed to read domain lenth.") return nil, nil, errors.Base(err).Message("Shadowsocks|TCP: Failed to read domain lenth.")
} }
domainLength := int(buffer.BytesFrom(-1)[0]) domainLength := int(buffer.BytesFrom(-1)[0])
_, err = buffer.FillFullFrom(reader, domainLength) err = buffer.AppendSupplier(buf.ReadFullFrom(reader, domainLength))
if err != nil { if err != nil {
return nil, nil, errors.Base(err).Message("Shadowsocks|TCP: Failed to read domain.") return nil, nil, errors.Base(err).Message("Shadowsocks|TCP: Failed to read domain.")
} }
@ -100,7 +101,7 @@ func ReadTCPSession(user *protocol.User, reader io.Reader) (*protocol.RequestHea
return nil, nil, errors.New("Shadowsocks|TCP: Unknown address type: ", addrType) return nil, nil, errors.New("Shadowsocks|TCP: Unknown address type: ", addrType)
} }
_, err = buffer.FillFullFrom(reader, 2) err = buffer.AppendSupplier(buf.ReadFullFrom(reader, 2))
if err != nil { if err != nil {
return nil, nil, errors.Base(err).Message("Shadowsocks|TCP: Failed to read port.") return nil, nil, errors.Base(err).Message("Shadowsocks|TCP: Failed to read port.")
} }
@ -110,7 +111,7 @@ func ReadTCPSession(user *protocol.User, reader io.Reader) (*protocol.RequestHea
actualAuth := make([]byte, AuthSize) actualAuth := make([]byte, AuthSize)
authenticator.Authenticate(buffer.Bytes())(actualAuth) authenticator.Authenticate(buffer.Bytes())(actualAuth)
_, err := buffer.FillFullFrom(reader, AuthSize) err := buffer.AppendSupplier(buf.ReadFullFrom(reader, AuthSize))
if err != nil { if err != nil {
return nil, nil, errors.Base(err).Message("Shadowsocks|TCP: Failed to read OTA.") return nil, nil, errors.Base(err).Message("Shadowsocks|TCP: Failed to read OTA.")
} }
@ -152,7 +153,7 @@ func WriteTCPRequest(request *protocol.RequestHeader, writer io.Writer) (v2io.Wr
writer = crypto.NewCryptionWriter(stream, writer) writer = crypto.NewCryptionWriter(stream, writer)
header := buf.NewLocalBuffer(512) header := buf.NewLocal(512)
switch request.Address.Family() { switch request.Address.Family() {
case v2net.AddressFamilyIPv4: case v2net.AddressFamilyIPv4:
@ -168,13 +169,13 @@ func WriteTCPRequest(request *protocol.RequestHeader, writer io.Writer) (v2io.Wr
return nil, errors.New("Shadowsocks|TCP: Unsupported address type: ", request.Address.Family()) return nil, errors.New("Shadowsocks|TCP: Unsupported address type: ", request.Address.Family())
} }
header.AppendFunc(serial.WriteUint16(uint16(request.Port))) header.AppendSupplier(serial.WriteUint16(uint16(request.Port)))
if request.Option.Has(RequestOptionOneTimeAuth) { if request.Option.Has(RequestOptionOneTimeAuth) {
header.SetByte(0, header.Byte(0)|0x10) header.SetByte(0, header.Byte(0)|0x10)
authenticator := NewAuthenticator(HeaderKeyGenerator(account.Key, iv)) authenticator := NewAuthenticator(HeaderKeyGenerator(account.Key, iv))
header.AppendFunc(authenticator.Authenticate(header.Bytes())) header.AppendSupplier(authenticator.Authenticate(header.Bytes()))
} }
_, err = writer.Write(header.Bytes()) _, err = writer.Write(header.Bytes())
@ -243,9 +244,9 @@ func EncodeUDPPacket(request *protocol.RequestHeader, payload *buf.Buffer) (*buf
} }
account := rawAccount.(*ShadowsocksAccount) account := rawAccount.(*ShadowsocksAccount)
buffer := buf.NewSmallBuffer() buffer := buf.NewSmall()
ivLen := account.Cipher.IVSize() ivLen := account.Cipher.IVSize()
buffer.FillFullFrom(rand.Reader, ivLen) buffer.AppendSupplier(buf.ReadFullFrom(rand.Reader, ivLen))
iv := buffer.Bytes() iv := buffer.Bytes()
switch request.Address.Family() { switch request.Address.Family() {
@ -262,14 +263,14 @@ func EncodeUDPPacket(request *protocol.RequestHeader, payload *buf.Buffer) (*buf
return nil, errors.New("Shadowsocks|UDP: Unsupported address type: ", request.Address.Family()) return nil, errors.New("Shadowsocks|UDP: Unsupported address type: ", request.Address.Family())
} }
buffer.AppendFunc(serial.WriteUint16(uint16(request.Port))) buffer.AppendSupplier(serial.WriteUint16(uint16(request.Port)))
buffer.Append(payload.Bytes()) buffer.Append(payload.Bytes())
if request.Option.Has(RequestOptionOneTimeAuth) { if request.Option.Has(RequestOptionOneTimeAuth) {
authenticator := NewAuthenticator(HeaderKeyGenerator(account.Key, iv)) authenticator := NewAuthenticator(HeaderKeyGenerator(account.Key, iv))
buffer.SetByte(ivLen, buffer.Byte(ivLen)|0x10) buffer.SetByte(ivLen, buffer.Byte(ivLen)|0x10)
buffer.AppendFunc(authenticator.Authenticate(buffer.BytesFrom(ivLen))) buffer.AppendSupplier(authenticator.Authenticate(buffer.BytesFrom(ivLen)))
} }
stream, err := account.Cipher.NewEncodingStream(account.Key, iv) stream, err := account.Cipher.NewEncodingStream(account.Key, iv)
@ -360,8 +361,8 @@ type UDPReader struct {
} }
func (v *UDPReader) Read() (*buf.Buffer, error) { func (v *UDPReader) Read() (*buf.Buffer, error) {
buffer := buf.NewSmallBuffer() buffer := buf.NewSmall()
_, err := buffer.FillFrom(v.Reader) err := buffer.AppendSupplier(buf.ReadFrom(v.Reader))
if err != nil { if err != nil {
buffer.Release() buffer.Release()
return nil, err return nil, err

View File

@ -30,8 +30,8 @@ func TestUDPEncoding(t *testing.T) {
}, },
} }
data := buf.NewLocalBuffer(256) data := buf.NewLocal(256)
data.AppendFunc(serial.WriteString("test string")) data.AppendSupplier(serial.WriteString("test string"))
encodedData, err := EncodeUDPPacket(request, data) encodedData, err := EncodeUDPPacket(request, data)
assert.Error(err).IsNil() assert.Error(err).IsNil()
@ -60,9 +60,9 @@ func TestTCPRequest(t *testing.T) {
}, },
} }
data := buf.NewLocalBuffer(256) data := buf.NewLocal(256)
data.AppendFunc(serial.WriteString("test string")) data.AppendSupplier(serial.WriteString("test string"))
cache := buf.NewBuffer() cache := buf.New()
writer, err := WriteTCPRequest(request, cache) writer, err := WriteTCPRequest(request, cache)
assert.Error(err).IsNil() assert.Error(err).IsNil()
@ -88,7 +88,7 @@ func TestUDPReaderWriter(t *testing.T) {
CipherType: CipherType_CHACHA20_IEFT, CipherType: CipherType_CHACHA20_IEFT,
}), }),
} }
cache := buf.NewBuffer() cache := buf.New()
writer := &UDPWriter{ writer := &UDPWriter{
Writer: cache, Writer: cache,
Request: &protocol.RequestHeader{ Request: &protocol.RequestHeader{
@ -105,8 +105,8 @@ func TestUDPReaderWriter(t *testing.T) {
User: user, User: user,
} }
b := buf.NewBuffer() b := buf.New()
b.AppendFunc(serial.WriteString("test payload")) b.AppendSupplier(serial.WriteString("test payload"))
err := writer.Write(b) err := writer.Write(b)
assert.Error(err).IsNil() assert.Error(err).IsNil()
@ -114,8 +114,8 @@ func TestUDPReaderWriter(t *testing.T) {
assert.Error(err).IsNil() assert.Error(err).IsNil()
assert.String(payload.String()).Equals("test payload") assert.String(payload.String()).Equals("test payload")
b = buf.NewBuffer() b = buf.New()
b.AppendFunc(serial.WriteString("test payload 2")) b.AppendSupplier(serial.WriteString("test payload 2"))
err = writer.Write(b) err = writer.Write(b)
assert.Error(err).IsNil() assert.Error(err).IsNil()

View File

@ -3,12 +3,13 @@ package protocol
import ( import (
"fmt" "fmt"
"io" "io"
"v2ray.com/core/common/buf" "v2ray.com/core/common/buf"
"v2ray.com/core/common/crypto"
"v2ray.com/core/common/errors" "v2ray.com/core/common/errors"
"v2ray.com/core/common/log" "v2ray.com/core/common/log"
v2net "v2ray.com/core/common/net" v2net "v2ray.com/core/common/net"
"v2ray.com/core/proxy" "v2ray.com/core/proxy"
"v2ray.com/core/common/crypto"
) )
const ( const (
@ -120,10 +121,10 @@ func (request Socks5UserPassRequest) AuthDetail() string {
} }
func ReadUserPassRequest(reader io.Reader) (request Socks5UserPassRequest, err error) { func ReadUserPassRequest(reader io.Reader) (request Socks5UserPassRequest, err error) {
buffer := buf.NewLocalBuffer(512) buffer := buf.NewLocal(512)
defer buffer.Release() defer buffer.Release()
_, err = buffer.FillFullFrom(reader, 2) err = buffer.AppendSupplier(buf.ReadFullFrom(reader, 2))
if err != nil { if err != nil {
return return
} }
@ -131,18 +132,18 @@ func ReadUserPassRequest(reader io.Reader) (request Socks5UserPassRequest, err e
nUsername := int(buffer.Byte(1)) nUsername := int(buffer.Byte(1))
buffer.Clear() buffer.Clear()
_, err = buffer.FillFullFrom(reader, nUsername) err = buffer.AppendSupplier(buf.ReadFullFrom(reader, nUsername))
if err != nil { if err != nil {
return return
} }
request.username = string(buffer.Bytes()) request.username = string(buffer.Bytes())
_, err = buffer.FillFullFrom(reader, 1) err = buffer.AppendSupplier(buf.ReadFullFrom(reader, 1))
if err != nil { if err != nil {
return return
} }
nPassword := int(buffer.Byte(0)) nPassword := int(buffer.Byte(0))
_, err = buffer.FillFullFrom(reader, nPassword) err = buffer.AppendSupplier(buf.ReadFullFrom(reader, nPassword))
if err != nil { if err != nil {
return return
} }
@ -188,10 +189,10 @@ type Socks5Request struct {
} }
func ReadRequest(reader io.Reader) (request *Socks5Request, err error) { func ReadRequest(reader io.Reader) (request *Socks5Request, err error) {
buffer := buf.NewLocalBuffer(512) buffer := buf.NewLocal(512)
defer buffer.Release() defer buffer.Release()
_, err = buffer.FillFullFrom(reader, 4) err = buffer.AppendSupplier(buf.ReadFullFrom(reader, 4))
if err != nil { if err != nil {
return return
} }
@ -210,12 +211,12 @@ func ReadRequest(reader io.Reader) (request *Socks5Request, err error) {
} }
case AddrTypeDomain: case AddrTypeDomain:
buffer.Clear() buffer.Clear()
_, err = buffer.FillFullFrom(reader, 1) err = buffer.AppendSupplier(buf.ReadFullFrom(reader, 1))
if err != nil { if err != nil {
return return
} }
domainLength := int(buffer.Byte(0)) domainLength := int(buffer.Byte(0))
_, err = buffer.FillFullFrom(reader, domainLength) err = buffer.AppendSupplier(buf.ReadFullFrom(reader, domainLength))
if err != nil { if err != nil {
return return
} }
@ -231,7 +232,7 @@ func ReadRequest(reader io.Reader) (request *Socks5Request, err error) {
return return
} }
_, err = buffer.FillFullFrom(reader, 2) err = buffer.AppendSupplier(buf.ReadFullFrom(reader, 2))
if err != nil { if err != nil {
return return
} }

View File

@ -31,7 +31,7 @@ func TestSocks4AuthenticationResponseToBytes(t *testing.T) {
response := NewSocks4AuthenticationResponse(byte(0x10), 443, []byte{1, 2, 3, 4}) response := NewSocks4AuthenticationResponse(byte(0x10), 443, []byte{1, 2, 3, 4})
buffer := buf.NewLocalBuffer(2048) buffer := buf.NewLocal(2048)
defer buffer.Release() defer buffer.Release()
response.Write(buffer) response.Write(buffer)

View File

@ -6,10 +6,10 @@ import (
"testing" "testing"
"v2ray.com/core/common/buf" "v2ray.com/core/common/buf"
"v2ray.com/core/common/crypto"
v2net "v2ray.com/core/common/net" v2net "v2ray.com/core/common/net"
"v2ray.com/core/proxy" "v2ray.com/core/proxy"
"v2ray.com/core/testing/assert" "v2ray.com/core/testing/assert"
"v2ray.com/core/common/crypto"
) )
func TestHasAuthenticationMethod(t *testing.T) { func TestHasAuthenticationMethod(t *testing.T) {
@ -30,7 +30,7 @@ func TestHasAuthenticationMethod(t *testing.T) {
func TestAuthenticationRequestRead(t *testing.T) { func TestAuthenticationRequestRead(t *testing.T) {
assert := assert.On(t) assert := assert.On(t)
buffer := buf.NewBuffer() buffer := buf.New()
buffer.AppendBytes( buffer.AppendBytes(
0x05, // version 0x05, // version
0x01, // nMethods 0x01, // nMethods
@ -85,7 +85,7 @@ func TestResponseWrite(t *testing.T) {
[16]byte{}, [16]byte{},
v2net.Port(53), v2net.Port(53),
} }
buffer := buf.NewLocalBuffer(2048) buffer := buf.NewLocal(2048)
defer buffer.Release() defer buffer.Release()
response.Write(buffer) response.Write(buffer)
@ -106,7 +106,7 @@ func TestSetIPv6(t *testing.T) {
response := NewSocks5Response() response := NewSocks5Response()
response.SetIPv6([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}) response.SetIPv6([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15})
buffer := buf.NewLocalBuffer(2048) buffer := buf.NewLocal(2048)
defer buffer.Release() defer buffer.Release()
response.Write(buffer) response.Write(buffer)
assert.Bytes(buffer.Bytes()).Equals([]byte{ assert.Bytes(buffer.Bytes()).Equals([]byte{
@ -119,7 +119,7 @@ func TestSetDomain(t *testing.T) {
response := NewSocks5Response() response := NewSocks5Response()
response.SetDomain("v2ray.com") response.SetDomain("v2ray.com")
buffer := buf.NewLocalBuffer(2048) buffer := buf.NewLocal(2048)
defer buffer.Release() defer buffer.Release()
response.Write(buffer) response.Write(buffer)
assert.Bytes(buffer.Bytes()).Equals([]byte{ assert.Bytes(buffer.Bytes()).Equals([]byte{
@ -129,7 +129,7 @@ func TestSetDomain(t *testing.T) {
func TestEmptyAuthRequest(t *testing.T) { func TestEmptyAuthRequest(t *testing.T) {
assert := assert.On(t) assert := assert.On(t)
_, _, err := ReadAuthentication(buf.NewBuffer()) _, _, err := ReadAuthentication(buf.New())
assert.Error(err).Equals(io.EOF) assert.Error(err).Equals(io.EOF)
} }
@ -143,7 +143,7 @@ func TestSingleByteAuthRequest(t *testing.T) {
func TestZeroAuthenticationMethod(t *testing.T) { func TestZeroAuthenticationMethod(t *testing.T) {
assert := assert.On(t) assert := assert.On(t)
buffer := buf.NewBuffer() buffer := buf.New()
buffer.AppendBytes(5, 0) buffer.AppendBytes(5, 0)
_, _, err := ReadAuthentication(buffer) _, _, err := ReadAuthentication(buffer)
assert.Error(err).Equals(crypto.ErrAuthenticationFailed) assert.Error(err).Equals(crypto.ErrAuthenticationFailed)
@ -151,7 +151,7 @@ func TestZeroAuthenticationMethod(t *testing.T) {
func TestWrongProtocolVersion(t *testing.T) { func TestWrongProtocolVersion(t *testing.T) {
assert := assert.On(t) assert := assert.On(t)
buffer := buf.NewBuffer() buffer := buf.New()
buffer.AppendBytes(6, 1, 0) buffer.AppendBytes(6, 1, 0)
_, _, err := ReadAuthentication(buffer) _, _, err := ReadAuthentication(buffer)
assert.Error(err).Equals(proxy.ErrInvalidProtocolVersion) assert.Error(err).Equals(proxy.ErrInvalidProtocolVersion)
@ -160,14 +160,14 @@ func TestWrongProtocolVersion(t *testing.T) {
func TestEmptyRequest(t *testing.T) { func TestEmptyRequest(t *testing.T) {
assert := assert.On(t) assert := assert.On(t)
_, err := ReadRequest(buf.NewBuffer()) _, err := ReadRequest(buf.New())
assert.Error(err).Equals(io.EOF) assert.Error(err).Equals(io.EOF)
} }
func TestIPv6Request(t *testing.T) { func TestIPv6Request(t *testing.T) {
assert := assert.On(t) assert := assert.On(t)
b := buf.NewBuffer() b := buf.New()
b.AppendBytes(5, 1, 0, 4, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 0, 8) b.AppendBytes(5, 1, 0, 4, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 0, 8)
request, err := ReadRequest(b) request, err := ReadRequest(b)
assert.Error(err).IsNil() assert.Error(err).IsNil()

View File

@ -36,7 +36,7 @@ func (request *Socks5UDPRequest) Write(buffer *buf.Buffer) {
buffer.AppendBytes(AddrTypeDomain, byte(len(request.Address.Domain()))) buffer.AppendBytes(AddrTypeDomain, byte(len(request.Address.Domain())))
buffer.Append([]byte(request.Address.Domain())) buffer.Append([]byte(request.Address.Domain()))
} }
buffer.AppendFunc(serial.WriteUint16(request.Port.Value())) buffer.AppendSupplier(serial.WriteUint16(request.Port.Value()))
buffer.Append(request.Data.Bytes()) buffer.Append(request.Data.Bytes())
} }
@ -83,7 +83,7 @@ func ReadUDPRequest(packet []byte) (*Socks5UDPRequest, error) {
} }
if len(packet) > dataBegin { if len(packet) > dataBegin {
b := buf.NewSmallBuffer() b := buf.NewSmall()
b.Append(packet[dataBegin:]) b.Append(packet[dataBegin:])
request.Data = b request.Data = b
} }

View File

@ -55,7 +55,7 @@ func (v *Server) handleUDPPayload(payload *buf.Buffer, session *proxy.SessionInf
} }
log.Info("Socks: Writing back UDP response with ", payload.Len(), " bytes to ", destination) log.Info("Socks: Writing back UDP response with ", payload.Len(), " bytes to ", destination)
udpMessage := buf.NewLocalBuffer(2048) udpMessage := buf.NewLocal(2048)
response.Write(udpMessage) response.Write(udpMessage)
v.udpMutex.RLock() v.udpMutex.RLock()

View File

@ -32,7 +32,7 @@ func MarshalCommand(command interface{}, writer io.Writer) error {
return ErrUnknownCommand return ErrUnknownCommand
} }
buffer := buf.NewLocalBuffer(512) buffer := buf.NewLocal(512)
defer buffer.Release() defer buffer.Release()
err := factory.Marshal(command, buffer) err := factory.Marshal(command, buffer)

View File

@ -21,7 +21,7 @@ func TestSwitchAccount(t *testing.T) {
ValidMin: 16, ValidMin: 16,
} }
buffer := buf.NewBuffer() buffer := buf.New()
err := MarshalCommand(sa, buffer) err := MarshalCommand(sa, buffer)
assert.Error(err).IsNil() assert.Error(err).IsNil()

View File

@ -36,7 +36,7 @@ func TestRequestSerialization(t *testing.T) {
Security: protocol.Security(protocol.SecurityType_AES128_GCM), Security: protocol.Security(protocol.SecurityType_AES128_GCM),
} }
buffer := buf.NewBuffer() buffer := buf.New()
client := NewClientSession(protocol.DefaultIDHash) client := NewClientSession(protocol.DefaultIDHash)
client.EncodeRequestHeader(expectedRequest, buffer) client.EncodeRequestHeader(expectedRequest, buffer)

View File

@ -225,7 +225,7 @@ func (v *VMessInboundHandler) HandleConnection(connection internet.Connection) {
} }
output.Release() output.Release()
if request.Option.Has(protocol.RequestOptionChunkStream) { if request.Option.Has(protocol.RequestOptionChunkStream) {
if err := bodyWriter.Write(buf.NewLocalBuffer(8)); err != nil { if err := bodyWriter.Write(buf.NewLocal(8)); err != nil {
connection.SetReusable(false) connection.SetReusable(false)
} }
} }

View File

@ -116,7 +116,7 @@ func (v *VMessOutboundHandler) handleRequest(session *encoding.ClientSession, co
} }
if request.Option.Has(protocol.RequestOptionChunkStream) { if request.Option.Has(protocol.RequestOptionChunkStream) {
err := bodyWriter.Write(buf.NewLocalBuffer(8)) err := bodyWriter.Write(buf.NewLocal(8))
if err != nil { if err != nil {
conn.SetReusable(false) conn.SetReusable(false)
} }

View File

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"net" "net"
"testing" "testing"
"v2ray.com/core/common/buf" "v2ray.com/core/common/buf"
v2net "v2ray.com/core/common/net" v2net "v2ray.com/core/common/net"
"v2ray.com/core/testing/assert" "v2ray.com/core/testing/assert"
@ -42,11 +43,11 @@ func TestShadowsocksTCP(t *testing.T) {
//conn.CloseWrite() //conn.CloseWrite()
response := buf.NewBuffer() response := buf.New()
finished := false finished := false
expectedResponse := "Processed: " + payload expectedResponse := "Processed: " + payload
for { for {
_, err := response.FillFrom(conn) err := response.AppendSupplier(buf.ReadFrom(conn))
assert.Error(err).IsNil() assert.Error(err).IsNil()
if err != nil { if err != nil {
break break

View File

@ -1,70 +0,0 @@
package internet
import (
"v2ray.com/core/common"
"v2ray.com/core/common/buf"
)
type Authenticator interface {
Seal(*buf.Buffer)
Open(*buf.Buffer) bool
Overhead() int
}
type AuthenticatorFactory interface {
Create(interface{}) Authenticator
}
var (
authenticatorCache = make(map[string]AuthenticatorFactory)
)
func RegisterAuthenticator(name string, factory AuthenticatorFactory) error {
if _, found := authenticatorCache[name]; found {
return common.ErrDuplicatedName
}
authenticatorCache[name] = factory
return nil
}
func CreateAuthenticator(name string, config interface{}) (Authenticator, error) {
factory, found := authenticatorCache[name]
if !found {
return nil, common.ErrObjectNotFound
}
return factory.Create(config), nil
}
type AuthenticatorChain struct {
authenticators []Authenticator
}
func NewAuthenticatorChain(auths ...Authenticator) Authenticator {
return &AuthenticatorChain{
authenticators: auths,
}
}
func (v *AuthenticatorChain) Overhead() int {
total := 0
for _, auth := range v.authenticators {
total += auth.Overhead()
}
return total
}
func (v *AuthenticatorChain) Open(payload *buf.Buffer) bool {
for _, auth := range v.authenticators {
if !auth.Open(payload) {
return false
}
}
return true
}
func (v *AuthenticatorChain) Seal(payload *buf.Buffer) {
for i := len(v.authenticators) - 1; i >= 0; i-- {
auth := v.authenticators[i]
auth.Seal(payload)
}
}

View File

@ -4,7 +4,7 @@ import "v2ray.com/core/common"
type PacketHeader interface { type PacketHeader interface {
Size() int Size() int
Write([]byte) int Write([]byte) (int, error)
} }
type PacketHeaderFactory interface { type PacketHeaderFactory interface {

View File

@ -7,6 +7,7 @@ import (
"net/http" "net/http"
"strings" "strings"
"time" "time"
"v2ray.com/core/common/buf" "v2ray.com/core/common/buf"
"v2ray.com/core/common/loader" "v2ray.com/core/common/loader"
"v2ray.com/core/common/serial" "v2ray.com/core/common/serial"
@ -46,9 +47,9 @@ type HeaderReader struct {
} }
func (*HeaderReader) Read(reader io.Reader) (*buf.Buffer, error) { func (*HeaderReader) Read(reader io.Reader) (*buf.Buffer, error) {
buffer := buf.NewSmallBuffer() buffer := buf.NewSmall()
for { for {
_, err := buffer.FillFrom(reader) err := buffer.AppendSupplier(buf.ReadFrom(reader))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -143,39 +144,39 @@ type HttpAuthenticator struct {
} }
func (v HttpAuthenticator) GetClientWriter() *HeaderWriter { func (v HttpAuthenticator) GetClientWriter() *HeaderWriter {
header := buf.NewSmallBuffer() header := buf.NewSmall()
config := v.config.Request config := v.config.Request
header.AppendFunc(serial.WriteString(strings.Join([]string{config.Method.GetValue(), config.PickUri(), config.GetFullVersion()}, " "))) header.AppendSupplier(serial.WriteString(strings.Join([]string{config.Method.GetValue(), config.PickUri(), config.GetFullVersion()}, " ")))
header.AppendFunc(writeCRLF) header.AppendSupplier(writeCRLF)
headers := config.PickHeaders() headers := config.PickHeaders()
for _, h := range headers { for _, h := range headers {
header.AppendFunc(serial.WriteString(h)) header.AppendSupplier(serial.WriteString(h))
header.AppendFunc(writeCRLF) header.AppendSupplier(writeCRLF)
} }
header.AppendFunc(writeCRLF) header.AppendSupplier(writeCRLF)
return &HeaderWriter{ return &HeaderWriter{
header: header, header: header,
} }
} }
func (v HttpAuthenticator) GetServerWriter() *HeaderWriter { func (v HttpAuthenticator) GetServerWriter() *HeaderWriter {
header := buf.NewSmallBuffer() header := buf.NewSmall()
config := v.config.Response config := v.config.Response
header.AppendFunc(serial.WriteString(strings.Join([]string{config.GetFullVersion(), config.Status.GetCode(), config.Status.GetReason()}, " "))) header.AppendSupplier(serial.WriteString(strings.Join([]string{config.GetFullVersion(), config.Status.GetCode(), config.Status.GetReason()}, " ")))
header.AppendFunc(writeCRLF) header.AppendSupplier(writeCRLF)
headers := config.PickHeaders() headers := config.PickHeaders()
for _, h := range headers { for _, h := range headers {
header.AppendFunc(serial.WriteString(h)) header.AppendSupplier(serial.WriteString(h))
header.AppendFunc(writeCRLF) header.AppendSupplier(writeCRLF)
} }
if !config.HasHeader("Date") { if !config.HasHeader("Date") {
header.AppendFunc(serial.WriteString("Date: ")) header.AppendSupplier(serial.WriteString("Date: "))
header.AppendFunc(serial.WriteString(time.Now().Format(http.TimeFormat))) header.AppendSupplier(serial.WriteString(time.Now().Format(http.TimeFormat)))
header.AppendFunc(writeCRLF) header.AppendSupplier(writeCRLF)
} }
header.AppendFunc(writeCRLF) header.AppendSupplier(writeCRLF)
return &HeaderWriter{ return &HeaderWriter{
header: header, header: header,
} }

View File

@ -12,9 +12,9 @@ import (
func TestReaderWriter(t *testing.T) { func TestReaderWriter(t *testing.T) {
assert := assert.On(t) assert := assert.On(t)
cache := buf.NewBuffer() cache := buf.New()
b := buf.NewLocalBuffer(256) b := buf.NewLocal(256)
b.AppendFunc(serial.WriteString("abcd" + ENDING)) b.AppendSupplier(serial.WriteString("abcd" + ENDING))
writer := NewHeaderWriter(b) writer := NewHeaderWriter(b)
writer.Write(cache) writer.Write(cache)
cache.Write([]byte{'e', 'f', 'g'}) cache.Write([]byte{'e', 'f', 'g'})
@ -41,7 +41,7 @@ func TestRequestHeader(t *testing.T) {
}, },
}).(HttpAuthenticator) }).(HttpAuthenticator)
cache := buf.NewBuffer() cache := buf.New()
err := auth.GetClientWriter().Write(cache) err := auth.GetClientWriter().Write(cache)
assert.Error(err).IsNil() assert.Error(err).IsNil()

View File

@ -12,8 +12,8 @@ type NoOpHeader struct{}
func (v NoOpHeader) Size() int { func (v NoOpHeader) Size() int {
return 0 return 0
} }
func (v NoOpHeader) Write([]byte) int { func (v NoOpHeader) Write([]byte) (int, error) {
return 0 return 0, nil
} }
type NoOpHeaderFactory struct{} type NoOpHeaderFactory struct{}

View File

@ -17,11 +17,11 @@ func (v *SRTP) Size() int {
return 4 return 4
} }
func (v *SRTP) Write(b []byte) int { func (v *SRTP) Write(b []byte) (int, error) {
v.number++ v.number++
b = serial.Uint16ToBytes(v.number, b[:0]) b = serial.Uint16ToBytes(v.number, b[:0])
b = serial.Uint16ToBytes(v.number, b) b = serial.Uint16ToBytes(v.number, b)
return 4 return 4, nil
} }
type SRTPFactory struct { type SRTPFactory struct {

View File

@ -14,8 +14,8 @@ func TestSRTPWrite(t *testing.T) {
content := []byte{'a', 'b', 'c', 'd', 'e', 'f', 'g'} content := []byte{'a', 'b', 'c', 'd', 'e', 'f', 'g'}
srtp := SRTP{} srtp := SRTP{}
payload := buf.NewLocalBuffer(2048) payload := buf.NewLocal(2048)
payload.AppendFunc(srtp.Write) payload.AppendSupplier(srtp.Write)
payload.Append(content) payload.Append(content)
assert.Int(payload.Len()).Equals(len(content) + srtp.Size()) assert.Int(payload.Len()).Equals(len(content) + srtp.Size())

View File

@ -18,10 +18,10 @@ func (v *UTP) Size() int {
return 4 return 4
} }
func (v *UTP) Write(b []byte) int { func (v *UTP) Write(b []byte) (int, error) {
b = serial.Uint16ToBytes(v.connectionId, b[:0]) b = serial.Uint16ToBytes(v.connectionId, b[:0])
b = append(b, v.header, v.extension) b = append(b, v.header, v.extension)
return 4 return 4, nil
} }
type UTPFactory struct{} type UTPFactory struct{}

View File

@ -14,8 +14,8 @@ func TestUTPWrite(t *testing.T) {
content := []byte{'a', 'b', 'c', 'd', 'e', 'f', 'g'} content := []byte{'a', 'b', 'c', 'd', 'e', 'f', 'g'}
utp := UTP{} utp := UTP{}
payload := buf.NewLocalBuffer(2048) payload := buf.NewLocal(2048)
payload.AppendFunc(utp.Write) payload.AppendSupplier(utp.Write)
payload.Append(content) payload.Append(content)
assert.Int(payload.Len()).Equals(len(content) + utp.Size()) assert.Int(payload.Len()).Equals(len(content) + utp.Size())

View File

@ -10,7 +10,6 @@ import (
"v2ray.com/core/common/errors" "v2ray.com/core/common/errors"
"v2ray.com/core/common/log" "v2ray.com/core/common/log"
"v2ray.com/core/common/predicate" "v2ray.com/core/common/predicate"
"v2ray.com/core/transport/internet"
"v2ray.com/core/transport/internet/internal" "v2ray.com/core/transport/internet/internal"
) )
@ -170,7 +169,6 @@ type SystemConnection interface {
type Connection struct { type Connection struct {
conn SystemConnection conn SystemConnection
connRecycler internal.ConnectionRecyler connRecycler internal.ConnectionRecyler
block internet.Authenticator
rd time.Time rd time.Time
wd time.Time // write deadline wd time.Time // write deadline
since int64 since int64

View File

@ -88,12 +88,12 @@ func (o *ClientConnection) ResetSecurity(header internet.PacketHeader, security
} }
func (o *ClientConnection) Run() { func (o *ClientConnection) Run() {
payload := buf.NewSmallBuffer() payload := buf.NewSmall()
defer payload.Release() defer payload.Release()
for { for {
payload.Clear() payload.Clear()
_, err := payload.FillFrom(o.Conn) err := payload.AppendSupplier(buf.ReadFrom(o.Conn))
if err != nil { if err != nil {
payload.Release() payload.Release()
return return

View File

@ -69,7 +69,7 @@ func (v *KCPPacketWriter) Write(b []byte) (int, error) {
x := v.buffer[:] x := v.buffer[:]
size := 0 size := 0
if v.Header != nil { if v.Header != nil {
nBytes := v.Header.Write(x) nBytes, _ := v.Header.Write(x)
size += nBytes size += nBytes
x = x[nBytes:] x = x[nBytes:]
} }

View File

@ -22,7 +22,7 @@ func NewSegmentWriter(writer io.Writer, mtu uint32) *BufferedSegmentWriter {
return &BufferedSegmentWriter{ return &BufferedSegmentWriter{
mtu: mtu, mtu: mtu,
writer: writer, writer: writer,
buffer: buf.NewSmallBuffer(), buffer: buf.NewSmall(),
} }
} }
@ -35,7 +35,7 @@ func (v *BufferedSegmentWriter) Write(seg Segment) {
v.FlushWithoutLock() v.FlushWithoutLock()
} }
v.buffer.AppendFunc(seg.Bytes()) v.buffer.AppendSupplier(seg.Bytes())
} }
func (v *BufferedSegmentWriter) FlushWithoutLock() { func (v *BufferedSegmentWriter) FlushWithoutLock() {

View File

@ -31,7 +31,7 @@ type Segment interface {
Conversation() uint16 Conversation() uint16
Command() Command Command() Command
ByteSize() int ByteSize() int
Bytes() buf.BytesWriter Bytes() buf.Supplier
} }
const ( const (
@ -64,14 +64,14 @@ func (v *DataSegment) Command() Command {
func (v *DataSegment) SetData(b []byte) { func (v *DataSegment) SetData(b []byte) {
if v.Data == nil { if v.Data == nil {
v.Data = buf.NewSmallBuffer() v.Data = buf.NewSmall()
} }
v.Data.Clear() v.Data.Clear()
v.Data.Append(b) v.Data.Append(b)
} }
func (v *DataSegment) Bytes() buf.BytesWriter { func (v *DataSegment) Bytes() buf.Supplier {
return func(b []byte) int { return func(b []byte) (int, error) {
b = serial.Uint16ToBytes(v.Conv, b[:0]) b = serial.Uint16ToBytes(v.Conv, b[:0])
b = append(b, byte(CommandData), byte(v.Option)) b = append(b, byte(CommandData), byte(v.Option))
b = serial.Uint32ToBytes(v.Timestamp, b) b = serial.Uint32ToBytes(v.Timestamp, b)
@ -79,7 +79,7 @@ func (v *DataSegment) Bytes() buf.BytesWriter {
b = serial.Uint32ToBytes(v.SendingNext, b) b = serial.Uint32ToBytes(v.SendingNext, b)
b = serial.Uint16ToBytes(uint16(v.Data.Len()), b) b = serial.Uint16ToBytes(uint16(v.Data.Len()), b)
b = append(b, v.Data.Bytes()...) b = append(b, v.Data.Bytes()...)
return v.ByteSize() return v.ByteSize(), nil
} }
} }
@ -137,8 +137,8 @@ func (v *AckSegment) ByteSize() int {
return 2 + 1 + 1 + 4 + 4 + 4 + 1 + int(v.Count)*4 return 2 + 1 + 1 + 4 + 4 + 4 + 1 + int(v.Count)*4
} }
func (v *AckSegment) Bytes() buf.BytesWriter { func (v *AckSegment) Bytes() buf.Supplier {
return func(b []byte) int { return func(b []byte) (int, error) {
b = serial.Uint16ToBytes(v.Conv, b[:0]) b = serial.Uint16ToBytes(v.Conv, b[:0])
b = append(b, byte(CommandACK), byte(v.Option)) b = append(b, byte(CommandACK), byte(v.Option))
b = serial.Uint32ToBytes(v.ReceivingWindow, b) b = serial.Uint32ToBytes(v.ReceivingWindow, b)
@ -148,7 +148,7 @@ func (v *AckSegment) Bytes() buf.BytesWriter {
for i := byte(0); i < v.Count; i++ { for i := byte(0); i < v.Count; i++ {
b = serial.Uint32ToBytes(v.NumberList[i], b) b = serial.Uint32ToBytes(v.NumberList[i], b)
} }
return v.ByteSize() return v.ByteSize(), nil
} }
} }
@ -181,14 +181,14 @@ func (v *CmdOnlySegment) ByteSize() int {
return 2 + 1 + 1 + 4 + 4 + 4 return 2 + 1 + 1 + 4 + 4 + 4
} }
func (v *CmdOnlySegment) Bytes() buf.BytesWriter { func (v *CmdOnlySegment) Bytes() buf.Supplier {
return func(b []byte) int { return func(b []byte) (int, error) {
b = serial.Uint16ToBytes(v.Conv, b[:0]) b = serial.Uint16ToBytes(v.Conv, b[:0])
b = append(b, byte(v.Cmd), byte(v.Option)) b = append(b, byte(v.Cmd), byte(v.Option))
b = serial.Uint32ToBytes(v.SendingNext, b) b = serial.Uint32ToBytes(v.SendingNext, b)
b = serial.Uint32ToBytes(v.ReceivinNext, b) b = serial.Uint32ToBytes(v.ReceivinNext, b)
b = serial.Uint32ToBytes(v.PeerRTO, b) b = serial.Uint32ToBytes(v.PeerRTO, b)
return v.ByteSize() return v.ByteSize(), nil
} }
} }

View File

@ -19,7 +19,7 @@ func TestBadSegment(t *testing.T) {
func TestDataSegment(t *testing.T) { func TestDataSegment(t *testing.T) {
assert := assert.On(t) assert := assert.On(t)
b := buf.NewLocalBuffer(512) b := buf.NewLocal(512)
b.Append([]byte{'a', 'b', 'c', 'd'}) b.Append([]byte{'a', 'b', 'c', 'd'})
seg := &DataSegment{ seg := &DataSegment{
Conv: 1, Conv: 1,

View File

@ -135,16 +135,14 @@ func (v *UDPHub) start() {
oobBytes := make([]byte, 256) oobBytes := make([]byte, 256)
for v.Running() { for v.Running() {
buffer := buf.NewSmallBuffer() buffer := buf.NewSmall()
var noob int var noob int
var addr *net.UDPAddr var addr *net.UDPAddr
var err error err := buffer.AppendSupplier(func(b []byte) (int, error) {
buffer.AppendFunc(func(b []byte) int {
n, nb, _, a, e := ReadUDPMsg(v.conn, b, oobBytes) n, nb, _, a, e := ReadUDPMsg(v.conn, b, oobBytes)
noob = nb noob = nb
addr = a addr = a
err = e return n, e
return n
}) })
if err != nil { if err != nil {