1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2024-12-21 09:36:34 -05:00

simplify buffer extension

This commit is contained in:
Darien Raymond 2018-11-02 21:34:04 +01:00
parent 35ccc3a49c
commit f7b96507f9
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
31 changed files with 139 additions and 193 deletions

View File

@ -39,9 +39,10 @@ func (r *cachedReader) Cache(b *buf.Buffer) {
if !mb.IsEmpty() {
common.Must(r.cache.WriteMultiBuffer(mb))
}
common.Must(b.Reset(func(x []byte) (int, error) {
return r.cache.Copy(x), nil
}))
b.Clear()
rawBytes := b.Extend(buf.Size)
n := r.cache.Copy(rawBytes)
b.Resize(0, int32(n))
r.Unlock()
}

View File

@ -260,13 +260,13 @@ func (s *ClassicNameServer) buildMsgs(domain string) []*dns.Msg {
func msgToBuffer(msg *dns.Msg) (*buf.Buffer, error) {
buffer := buf.New()
if err := buffer.Reset(func(b []byte) (int, error) {
writtenBuffer, err := msg.PackBuffer(b)
return len(writtenBuffer), err
}); err != nil {
rawBytes := buffer.Extend(buf.Size)
packed, err := msg.PackBuffer(rawBytes)
if err != nil {
buffer.Release()
return nil, err
}
buffer.Resize(0, int32(len(packed)))
return buffer, nil
}

View File

@ -11,9 +11,6 @@ const (
Size = 2048
)
// Supplier is a writer that writes contents into the given buffer.
type Supplier func([]byte) (int, error)
// Buffer is a recyclable allocation of a byte array. Buffer.Release() recycles
// the buffer into an internal buffer pool, in order to recreate a buffer more
// quickly.
@ -40,13 +37,6 @@ func (b *Buffer) Clear() {
b.end = 0
}
// AppendSupplier appends the content of a BytesWriter to the buffer.
func (b *Buffer) AppendSupplier(writer Supplier) error {
nBytes, err := writer(b.v[b.end:])
b.end += int32(nBytes)
return err
}
// Byte returns the bytes at index.
func (b *Buffer) Byte(index int32) byte {
return b.v[b.start+index]
@ -62,15 +52,16 @@ func (b *Buffer) Bytes() []byte {
return b.v[b.start:b.end]
}
// Reset resets the content of the Buffer with a supplier.
func (b *Buffer) Reset(writer Supplier) error {
nBytes, err := writer(b.v)
if nBytes > len(b.v) {
return newError("too many bytes written: ", nBytes, " > ", len(b.v))
// Extend increases the buffer size by n bytes, and returns the extended part.
// It panics if result size is larger than buf.Size.
func (b *Buffer) Extend(n int32) []byte {
end := b.end + n
if end > int32(len(b.v)) {
panic(newError("out of bound: ", end))
}
b.start = 0
b.end = int32(nBytes)
return err
ext := b.v[b.end:end]
b.end = end
return ext
}
// BytesRange returns a slice of this buffer with given from and to boundary.
@ -153,6 +144,11 @@ func (b *Buffer) WriteBytes(bytes ...byte) (int, error) {
return b.Write(bytes)
}
// WriteString implements io.StringWriter.
func (b *Buffer) WriteString(s string) (int, error) {
return b.Write([]byte(s))
}
// Read implements io.Reader.Read().
func (b *Buffer) Read(data []byte) (int, error) {
if b.Len() == 0 {
@ -174,6 +170,7 @@ func (b *Buffer) ReadFrom(reader io.Reader) (int64, error) {
return int64(n), err
}
// ReadFullFrom reads exact size of bytes from given reader, or until error occurs.
func (b *Buffer) ReadFullFrom(reader io.Reader, size int32) (int64, error) {
end := b.end + size
if end > int32(len(b.v)) {

View File

@ -8,7 +8,6 @@ import (
"v2ray.com/core/common"
. "v2ray.com/core/common/buf"
"v2ray.com/core/common/compare"
"v2ray.com/core/common/serial"
. "v2ray.com/ext/assert"
)
@ -41,7 +40,7 @@ func TestBufferString(t *testing.T) {
buffer := New()
defer buffer.Release()
assert(buffer.AppendSupplier(serial.WriteString("Test String")), IsNil)
common.Must2(buffer.WriteString("Test String"))
assert(buffer.String(), Equals, "Test String")
}

View File

@ -245,10 +245,10 @@ func NewAuthenticationWriter(auth Authenticator, sizeParser ChunkSizeEncoder, wr
}
func (w *AuthenticationWriter) seal(b *buf.Buffer) (*buf.Buffer, error) {
encryptedSize := int(b.Len()) + w.auth.Overhead()
var paddingSize int
encryptedSize := b.Len() + int32(w.auth.Overhead())
var paddingSize int32
if w.padding != nil {
paddingSize = int(w.padding.NextPaddingLen())
paddingSize = int32(w.padding.NextPaddingLen())
}
totalSize := encryptedSize + paddingSize
@ -257,14 +257,8 @@ func (w *AuthenticationWriter) seal(b *buf.Buffer) (*buf.Buffer, error) {
}
eb := buf.New()
common.Must(eb.Reset(func(bb []byte) (int, error) {
w.sizeParser.Encode(uint16(encryptedSize+paddingSize), bb)
return int(w.sizeParser.SizeBytes()), nil
}))
if err := eb.AppendSupplier(func(bb []byte) (int, error) {
_, err := w.auth.Seal(bb[:0], b.Bytes())
return encryptedSize, err
}); err != nil {
w.sizeParser.Encode(uint16(encryptedSize+paddingSize), eb.Extend(w.sizeParser.SizeBytes()))
if _, err := w.auth.Seal(eb.Extend(encryptedSize)[:0], b.Bytes()); err != nil {
eb.Release()
return nil, err
}

View File

@ -146,10 +146,7 @@ func (w *ChunkStreamWriter) WriteMultiBuffer(mb buf.MultiBuffer) error {
slice := mb.SliceBySize(sliceSize)
b := buf.New()
common.Must(b.Reset(func(buffer []byte) (int, error) {
w.sizeEncoder.Encode(uint16(slice.Len()), buffer)
return int(w.sizeEncoder.SizeBytes()), nil
}))
w.sizeEncoder.Encode(uint16(slice.Len()), b.Extend(w.sizeEncoder.SizeBytes()))
mb2Write.Append(b)
mb2Write.AppendMulti(slice)

View File

@ -1,12 +0,0 @@
package serial
import (
"hash"
)
func WriteHash(h hash.Hash) func(b []byte) (int, error) {
return func(b []byte) (int, error) {
h.Sum(b[:0])
return h.Size(), nil
}
}

View File

@ -3,7 +3,6 @@ package blackhole
import (
"v2ray.com/core/common"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/serial"
)
const (
@ -28,7 +27,7 @@ func (*NoneResponse) WriteTo(buf.Writer) int32 { return 0 }
// WriteTo implements ResponseConfig.WriteTo().
func (*HTTPResponse) WriteTo(writer buf.Writer) int32 {
b := buf.New()
common.Must(b.Reset(serial.WriteString(http403response)))
common.Must2(b.Write([]byte(http403response)))
n := b.Len()
writer.WriteMultiBuffer(buf.NewMultiBufferValue(b))
return n

View File

@ -198,13 +198,10 @@ func (c *AEADCipher) EncodePacket(key []byte, b *buf.Buffer) error {
ivLen := c.IVSize()
payloadLen := b.Len()
auth := c.createAuthenticator(key, b.BytesTo(ivLen))
return b.Reset(func(bb []byte) (int, error) {
bbb, err := auth.Seal(bb[:ivLen], bb[ivLen:payloadLen])
if err != nil {
return 0, err
}
return len(bbb), nil
})
b.Extend(int32(auth.Overhead()))
_, err := auth.Seal(b.BytesTo(ivLen), b.BytesRange(ivLen, payloadLen))
return err
}
func (c *AEADCipher) DecodePacket(key []byte, b *buf.Buffer) error {
@ -214,16 +211,12 @@ func (c *AEADCipher) DecodePacket(key []byte, b *buf.Buffer) error {
ivLen := c.IVSize()
payloadLen := b.Len()
auth := c.createAuthenticator(key, b.BytesTo(ivLen))
if err := b.Reset(func(bb []byte) (int, error) {
bbb, err := auth.Open(bb[:ivLen], bb[ivLen:payloadLen])
bbb, err := auth.Open(b.BytesTo(ivLen), b.BytesRange(ivLen, payloadLen))
if err != nil {
return 0, err
}
return len(bbb), nil
}); err != nil {
return err
}
b.Advance(ivLen)
b.Resize(ivLen, ivLen+int32(len(bbb)))
return nil
}

View File

@ -30,13 +30,11 @@ func NewAuthenticator(keygen KeyGenerator) *Authenticator {
}
}
func (v *Authenticator) Authenticate(data []byte) buf.Supplier {
func (v *Authenticator) Authenticate(data []byte, dest []byte) {
hasher := hmac.New(sha1.New, v.key())
common.Must2(hasher.Write(data))
res := hasher.Sum(nil)
return func(b []byte) (int, error) {
return copy(b, res[:AuthSize]), nil
}
copy(dest, res[:AuthSize])
}
func HeaderKeyGenerator(key []byte, iv []byte) func() []byte {
@ -89,7 +87,7 @@ func (v *ChunkReader) ReadMultiBuffer() (buf.MultiBuffer, error) {
payload := buffer[AuthSize:size]
actualAuthBytes := make([]byte, AuthSize)
v.auth.Authenticate(payload)(actualAuthBytes)
v.auth.Authenticate(payload, actualAuthBytes)
if !bytes.Equal(authBytes, actualAuthBytes) {
return nil, newError("invalid auth")
}
@ -121,7 +119,7 @@ func (w *ChunkWriter) WriteMultiBuffer(mb buf.MultiBuffer) error {
for {
payloadLen, _ := mb.Read(w.buffer[2+AuthSize:])
binary.BigEndian.PutUint16(w.buffer, uint16(payloadLen))
w.auth.Authenticate(w.buffer[2+AuthSize : 2+AuthSize+payloadLen])(w.buffer[2:])
w.auth.Authenticate(w.buffer[2+AuthSize:2+AuthSize+payloadLen], w.buffer[2:])
if err := buf.WriteAllBytes(w.writer, w.buffer[:2+AuthSize+payloadLen]); err != nil {
return err
}

View File

@ -142,7 +142,8 @@ func WriteTCPRequest(request *protocol.RequestHeader, writer io.Writer) (buf.Wri
header.SetByte(0, header.Byte(0)|0x10)
authenticator := NewAuthenticator(HeaderKeyGenerator(account.Key, iv))
common.Must(header.AppendSupplier(authenticator.Authenticate(header.Bytes())))
authBuffer := header.Extend(AuthSize)
authenticator.Authenticate(header.Bytes(), authBuffer)
}
if err := w.WriteMultiBuffer(buf.NewMultiBufferValue(header)); err != nil {
@ -210,7 +211,9 @@ func EncodeUDPPacket(request *protocol.RequestHeader, payload []byte) (*buf.Buff
authenticator := NewAuthenticator(HeaderKeyGenerator(account.Key, iv))
buffer.SetByte(ivLen, buffer.Byte(ivLen)|0x10)
common.Must(buffer.AppendSupplier(authenticator.Authenticate(buffer.BytesFrom(ivLen))))
authPayload := buffer.BytesFrom(ivLen)
authBuffer := buffer.Extend(AuthSize)
authenticator.Authenticate(authPayload, authBuffer)
}
if err := account.Cipher.EncodePacket(account.Key, buffer); err != nil {
return nil, newError("failed to encrypt UDP payload").Base(err)

View File

@ -7,7 +7,6 @@ import (
"v2ray.com/core/common/buf"
"v2ray.com/core/common/net"
"v2ray.com/core/common/protocol"
"v2ray.com/core/common/serial"
. "v2ray.com/core/proxy/shadowsocks"
. "v2ray.com/ext/assert"
)
@ -37,7 +36,7 @@ func TestUDPEncoding(t *testing.T) {
}
data := buf.New()
data.AppendSupplier(serial.WriteString("test string"))
common.Must2(data.WriteString("test string"))
encodedData, err := EncodeUDPPacket(request, data.Bytes())
assert(err, IsNil)
@ -168,7 +167,7 @@ func TestUDPReaderWriter(t *testing.T) {
{
b := buf.New()
b.AppendSupplier(serial.WriteString("test payload"))
common.Must2(b.WriteString("test payload"))
err := writer.WriteMultiBuffer(buf.NewMultiBufferValue(b))
assert(err, IsNil)
@ -179,7 +178,7 @@ func TestUDPReaderWriter(t *testing.T) {
{
b := buf.New()
b.AppendSupplier(serial.WriteString("test payload 2"))
common.Must2(b.WriteString("test payload 2"))
err := writer.WriteMultiBuffer(buf.NewMultiBufferValue(b))
assert(err, IsNil)

View File

@ -16,7 +16,6 @@ import (
"v2ray.com/core/common/crypto"
"v2ray.com/core/common/dice"
"v2ray.com/core/common/protocol"
"v2ray.com/core/common/serial"
"v2ray.com/core/common/vio"
"v2ray.com/core/proxy/vmess"
)
@ -88,7 +87,8 @@ func (c *ClientSession) EncodeRequestHeader(header *protocol.RequestHeader, writ
{
fnv1a := fnv.New32a()
common.Must2(fnv1a.Write(buffer.Bytes()))
common.Must(buffer.AppendSupplier(serial.WriteHash(fnv1a)))
hashBytes := buffer.Extend(int32(fnv1a.Size()))
fnv1a.Sum(hashBytes[:0])
}
iv := hashTimestamp(md5.New(), timestamp)

View File

@ -9,7 +9,7 @@ import (
type PacketHeader interface {
Size() int32
Write([]byte) (int, error)
Serialize([]byte)
}
func CreatePacketHeader(config interface{}) (PacketHeader, error) {

View File

@ -13,7 +13,6 @@ import (
"v2ray.com/core/common"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/serial"
)
const (
@ -29,7 +28,6 @@ const (
var (
ErrHeaderToLong = newError("Header too long.")
writeCRLF = serial.WriteString(CRLF)
)
type Reader interface {
@ -70,12 +68,12 @@ func (*HeaderReader) Read(reader io.Reader) (*buf.Buffer, error) {
endingDetected = true
break
}
if buffer.Len() >= int32(len(ENDING)) {
totalBytes += buffer.Len() - int32(len(ENDING))
leftover := buffer.BytesFrom(-int32(len(ENDING)))
buffer.Reset(func(b []byte) (int, error) {
return copy(b, leftover), nil
})
lenEnding := int32(len(ENDING))
if buffer.Len() >= lenEnding {
totalBytes += buffer.Len() - lenEnding
leftover := buffer.BytesFrom(-lenEnding)
buffer.Clear()
copy(buffer.Extend(lenEnding), leftover)
}
}
if buffer.IsEmpty() {
@ -175,20 +173,20 @@ func (c *HttpConn) Close() error {
func formResponseHeader(config *ResponseConfig) *HeaderWriter {
header := buf.New()
header.AppendSupplier(serial.WriteString(strings.Join([]string{config.GetFullVersion(), config.GetStatusValue().Code, config.GetStatusValue().Reason}, " ")))
header.AppendSupplier(writeCRLF)
common.Must2(header.WriteString(strings.Join([]string{config.GetFullVersion(), config.GetStatusValue().Code, config.GetStatusValue().Reason}, " ")))
common.Must2(header.WriteString(CRLF))
headers := config.PickHeaders()
for _, h := range headers {
header.AppendSupplier(serial.WriteString(h))
header.AppendSupplier(writeCRLF)
common.Must2(header.WriteString(h))
common.Must2(header.WriteString(CRLF))
}
if !config.HasHeader("Date") {
header.AppendSupplier(serial.WriteString("Date: "))
header.AppendSupplier(serial.WriteString(time.Now().Format(http.TimeFormat)))
header.AppendSupplier(writeCRLF)
common.Must2(header.WriteString("Date: "))
common.Must2(header.WriteString(time.Now().Format(http.TimeFormat)))
common.Must2(header.WriteString(CRLF))
}
header.AppendSupplier(writeCRLF)
common.Must2(header.WriteString(CRLF))
return &HeaderWriter{
header: header,
}
@ -201,15 +199,15 @@ type HttpAuthenticator struct {
func (a HttpAuthenticator) GetClientWriter() *HeaderWriter {
header := buf.New()
config := a.config.Request
header.AppendSupplier(serial.WriteString(strings.Join([]string{config.GetMethodValue(), config.PickUri(), config.GetFullVersion()}, " ")))
header.AppendSupplier(writeCRLF)
common.Must2(header.WriteString(strings.Join([]string{config.GetMethodValue(), config.PickUri(), config.GetFullVersion()}, " ")))
common.Must2(header.WriteString(CRLF))
headers := config.PickHeaders()
for _, h := range headers {
header.AppendSupplier(serial.WriteString(h))
header.AppendSupplier(writeCRLF)
common.Must2(header.WriteString(h))
common.Must2(header.WriteString(CRLF))
}
header.AppendSupplier(writeCRLF)
common.Must2(header.WriteString(CRLF))
return &HeaderWriter{
header: header,
}

View File

@ -5,9 +5,9 @@ import (
"testing"
"time"
"v2ray.com/core/common"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/net"
"v2ray.com/core/common/serial"
. "v2ray.com/core/transport/internet/headers/http"
. "v2ray.com/ext/assert"
)
@ -17,7 +17,7 @@ func TestReaderWriter(t *testing.T) {
cache := buf.New()
b := buf.New()
b.AppendSupplier(serial.WriteString("abcd" + ENDING))
common.Must2(b.WriteString("abcd" + ENDING))
writer := NewHeaderWriter(b)
err := writer.Write(cache)
assert(err, IsNil)

View File

@ -13,10 +13,8 @@ func (NoOpHeader) Size() int32 {
return 0
}
// Write implements io.Writer.
func (NoOpHeader) Write([]byte) (int, error) {
return 0, nil
}
// Serialize implements PacketHeader.
func (NoOpHeader) Serialize([]byte) {}
func NewNoOpHeader(context.Context, interface{}) (interface{}, error) {
return NoOpHeader{}, nil

View File

@ -17,12 +17,11 @@ func (*SRTP) Size() int32 {
return 4
}
// Write implements io.Writer.
func (s *SRTP) Write(b []byte) (int, error) {
// Serialize implements PacketHeader.
func (s *SRTP) Serialize(b []byte) {
s.number++
binary.BigEndian.PutUint16(b, s.number)
binary.BigEndian.PutUint16(b[2:], s.number)
return 4, nil
}
// New returns a new SRTP instance based on the given config.

View File

@ -19,7 +19,7 @@ func TestSRTPWrite(t *testing.T) {
srtp := srtpRaw.(*SRTP)
payload := buf.New()
payload.AppendSupplier(srtp.Write)
srtp.Serialize(payload.Extend(srtp.Size()))
payload.Write(content)
assert(payload.Len(), Equals, int32(len(content))+srtp.Size())

View File

@ -19,8 +19,8 @@ func (*DTLS) Size() int32 {
return 1 + 2 + 2 + 6 + 2
}
// Write implements PacketHeader.
func (d *DTLS) Write(b []byte) (int, error) {
// Serialize implements PacketHeader.
func (d *DTLS) Serialize(b []byte) {
b[0] = 23 // application data
b[1] = 254
b[2] = 253
@ -39,7 +39,6 @@ func (d *DTLS) Write(b []byte) (int, error) {
if d.length > 100 {
d.length -= 50
}
return 13, nil
}
// New creates a new UTP header for the given config.

View File

@ -19,7 +19,7 @@ func TestDTLSWrite(t *testing.T) {
dtls := dtlsRaw.(*DTLS)
payload := buf.New()
payload.AppendSupplier(dtls.Write)
dtls.Serialize(payload.Extend(dtls.Size()))
payload.Write(content)
assert(payload.Len(), Equals, int32(len(content))+dtls.Size())

View File

@ -18,12 +18,11 @@ func (*UTP) Size() int32 {
return 4
}
// Write implements io.Writer.
func (u *UTP) Write(b []byte) (int, error) {
// Serialize implements PacketHeader.
func (u *UTP) Serialize(b []byte) {
binary.BigEndian.PutUint16(b, u.connectionId)
b[2] = u.header
b[3] = u.extension
return 4, nil
}
// New creates a new UTP header for the given config.

View File

@ -19,7 +19,7 @@ func TestUTPWrite(t *testing.T) {
utp := utpRaw.(*UTP)
payload := buf.New()
payload.AppendSupplier(utp.Write)
utp.Serialize(payload.Extend(utp.Size()))
payload.Write(content)
assert(payload.Len(), Equals, int32(len(content))+utp.Size())

View File

@ -16,8 +16,8 @@ func (vc *VideoChat) Size() int32 {
return 13
}
// Write implements io.Writer.
func (vc *VideoChat) Write(b []byte) (int, error) {
// Serialize implements PacketHeader.
func (vc *VideoChat) Serialize(b []byte) {
vc.sn++
b[0] = 0xa1
b[1] = 0x08
@ -29,7 +29,6 @@ func (vc *VideoChat) Write(b []byte) (int, error) {
b[10] = 0x30
b[11] = 0x22
b[12] = 0x30
return 13, nil
}
// NewVideoChat returns a new VideoChat instance based on given config.

View File

@ -18,7 +18,7 @@ func TestUTPWrite(t *testing.T) {
video := videoRaw.(*VideoChat)
payload := buf.New()
payload.AppendSupplier(video.Write)
video.Serialize(payload.Extend(video.Size()))
assert(payload.Len(), Equals, video.Size())
}

View File

@ -12,10 +12,12 @@ func (Wireguard) Size() int32 {
return 4
}
// Write implements io.Writer.
func (Wireguard) Write(b []byte) (int, error) {
b = append(b[:0], 0x04, 0x00, 0x00, 0x00)
return 4, nil
// Serialize implements PacketHeader.
func (Wireguard) Serialize(b []byte) {
b[0] = 0x04
b[1] = 0x00
b[2] = 0x00
b[3] = 0x00
}
// NewWireguard returns a new VideoChat instance based on given config.

View File

@ -74,20 +74,15 @@ func (w *KCPPacketWriter) Write(b []byte) (int, error) {
defer bb.Release()
if w.Header != nil {
common.Must(bb.AppendSupplier(func(x []byte) (int, error) {
return w.Header.Write(x)
}))
w.Header.Serialize(bb.Extend(w.Header.Size()))
}
if w.Security != nil {
nonceSize := w.Security.NonceSize()
common.Must(bb.AppendSupplier(func(x []byte) (int, error) {
return rand.Read(x[:nonceSize])
}))
common.Must2(bb.ReadFullFrom(rand.Reader, int32(nonceSize)))
nonce := bb.BytesFrom(int32(-nonceSize))
common.Must(bb.AppendSupplier(func(x []byte) (int, error) {
eb := w.Security.Seal(x[:0], nonce, b, nil)
return len(eb), nil
}))
encrypted := bb.Extend(int32(w.Security.Overhead() + len(b)))
w.Security.Seal(encrypted[:0], nonce, b, nil)
} else {
bb.Write(b)
}

View File

@ -6,7 +6,6 @@ import (
"v2ray.com/core/common/retry"
"v2ray.com/core/common"
"v2ray.com/core/common/buf"
)
@ -31,7 +30,9 @@ func (w *SimpleSegmentWriter) Write(seg Segment) error {
w.Lock()
defer w.Unlock()
common.Must(w.buffer.Reset(seg.Bytes()))
w.buffer.Clear()
rawBytes := w.buffer.Extend(seg.ByteSize())
seg.Serialize(rawBytes)
_, err := w.writer.Write(w.buffer.Bytes())
return err
}

View File

@ -31,7 +31,7 @@ type Segment interface {
Conversation() uint16
Command() Command
ByteSize() int32
Bytes() buf.Supplier
Serialize([]byte)
parse(conv uint16, cmd Command, opt SegmentOption, buf []byte) (bool, []byte)
}
@ -104,8 +104,7 @@ func (s *DataSegment) Data() *buf.Buffer {
return s.payload
}
func (s *DataSegment) Bytes() buf.Supplier {
return func(b []byte) (int, error) {
func (s *DataSegment) Serialize(b []byte) {
binary.BigEndian.PutUint16(b, s.Conv)
b[2] = byte(CommandData)
b[3] = byte(s.Option)
@ -113,9 +112,7 @@ func (s *DataSegment) Bytes() buf.Supplier {
binary.BigEndian.PutUint32(b[8:], s.Number)
binary.BigEndian.PutUint32(b[12:], s.SendingNext)
binary.BigEndian.PutUint16(b[16:], uint16(s.payload.Len()))
n := copy(b[18:], s.payload.Bytes())
return 18 + n, nil
}
copy(b[18:], s.payload.Bytes())
}
func (s *DataSegment) ByteSize() int32 {
@ -202,8 +199,7 @@ func (s *AckSegment) ByteSize() int32 {
return 2 + 1 + 1 + 4 + 4 + 4 + 1 + int32(len(s.NumberList)*4)
}
func (s *AckSegment) Bytes() buf.Supplier {
return func(b []byte) (int, error) {
func (s *AckSegment) Serialize(b []byte) {
binary.BigEndian.PutUint16(b, s.Conv)
b[2] = byte(CommandACK)
b[3] = byte(s.Option)
@ -216,8 +212,6 @@ func (s *AckSegment) Bytes() buf.Supplier {
binary.BigEndian.PutUint32(b[n:], number)
n += 4
}
return n, nil
}
}
func (s *AckSegment) Release() {}
@ -268,16 +262,13 @@ func (*CmdOnlySegment) ByteSize() int32 {
return 2 + 1 + 1 + 4 + 4 + 4
}
func (s *CmdOnlySegment) Bytes() buf.Supplier {
return func(b []byte) (int, error) {
func (s *CmdOnlySegment) Serialize(b []byte) {
binary.BigEndian.PutUint16(b, s.Conv)
b[2] = byte(s.Cmd)
b[3] = byte(s.Option)
binary.BigEndian.PutUint32(b[4:], s.SendingNext)
binary.BigEndian.PutUint32(b[8:], s.ReceivingNext)
binary.BigEndian.PutUint32(b[12:], s.PeerRTO)
return 16, nil
}
}
func (*CmdOnlySegment) Release() {}

View File

@ -28,7 +28,7 @@ func TestDataSegment(t *testing.T) {
nBytes := seg.ByteSize()
bytes := make([]byte, nBytes)
seg.Bytes()(bytes)
seg.Serialize(bytes)
assert(int32(len(bytes)), Equals, nBytes)
@ -54,7 +54,7 @@ func Test1ByteDataSegment(t *testing.T) {
nBytes := seg.ByteSize()
bytes := make([]byte, nBytes)
seg.Bytes()(bytes)
seg.Serialize(bytes)
assert(int32(len(bytes)), Equals, nBytes)
@ -80,7 +80,7 @@ func TestACKSegment(t *testing.T) {
nBytes := seg.ByteSize()
bytes := make([]byte, nBytes)
seg.Bytes()(bytes)
seg.Serialize(bytes)
assert(int32(len(bytes)), Equals, nBytes)
@ -110,7 +110,7 @@ func TestCmdSegment(t *testing.T) {
nBytes := seg.ByteSize()
bytes := make([]byte, nBytes)
seg.Bytes()(bytes)
seg.Serialize(bytes)
assert(int32(len(bytes)), Equals, nBytes)

View File

@ -88,18 +88,15 @@ func (h *Hub) start() {
buffer := buf.New()
var noob int
var addr *net.UDPAddr
err := buffer.Reset(func(b []byte) (int, error) {
n, nb, _, a, e := ReadUDPMsg(h.conn, b, oobBytes)
noob = nb
addr = a
return n, e
})
rawBytes := buffer.Extend(buf.Size)
n, noob, _, addr, err := ReadUDPMsg(h.conn, rawBytes, oobBytes)
if err != nil {
newError("failed to read UDP msg").Base(err).WriteToLog()
buffer.Release()
break
}
buffer.Resize(0, int32(n))
if buffer.IsEmpty() {
buffer.Release()