1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2025-01-17 23:06:30 -05:00

faster udp reading

This commit is contained in:
Darien Raymond 2018-05-27 01:19:05 +02:00
parent fde877e276
commit cad07c3a83
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
2 changed files with 34 additions and 5 deletions

View File

@ -124,6 +124,32 @@ func (c *udpConn) updateActivity() {
atomic.StoreInt64(&c.lastActivityTime, time.Now().Unix()) atomic.StoreInt64(&c.lastActivityTime, time.Now().Unix())
} }
// ReadMultiBuffer implements buf.Reader
func (c *udpConn) ReadMultiBuffer() (buf.MultiBuffer, error) {
var payload buf.MultiBuffer
select {
case in := <-c.input:
payload.Append(in)
case <-c.done.Wait():
return nil, io.EOF
}
L:
for {
select {
case in := <-c.input:
payload.Append(in)
case <-c.done.Wait():
break L
default:
break L
}
}
return payload, nil
}
func (c *udpConn) Read(buf []byte) (int, error) { func (c *udpConn) Read(buf []byte) (int, error) {
select { select {
case in := <-c.input: case in := <-c.input:
@ -202,7 +228,7 @@ func (w *udpWorker) getConnection(id connID) (*udpConn, bool) {
w.Lock() w.Lock()
defer w.Unlock() defer w.Unlock()
if conn, found := w.activeConn[id]; found { if conn, found := w.activeConn[id]; found && !conn.done.Done() {
return conn, true return conn, true
} }

View File

@ -1,6 +1,7 @@
package mux package mux
import ( import (
"v2ray.com/core/common"
"v2ray.com/core/common/bitmask" "v2ray.com/core/common/bitmask"
"v2ray.com/core/common/buf" "v2ray.com/core/common/buf"
"v2ray.com/core/common/net" "v2ray.com/core/common/net"
@ -58,21 +59,21 @@ type FrameMetadata struct {
func (f FrameMetadata) WriteTo(b *buf.Buffer) error { func (f FrameMetadata) WriteTo(b *buf.Buffer) error {
lenBytes := b.Bytes() lenBytes := b.Bytes()
b.AppendBytes(0x00, 0x00) common.Must2(b.AppendBytes(0x00, 0x00))
len0 := b.Len() len0 := b.Len()
if err := b.AppendSupplier(serial.WriteUint16(f.SessionID)); err != nil { if err := b.AppendSupplier(serial.WriteUint16(f.SessionID)); err != nil {
return err return err
} }
b.AppendBytes(byte(f.SessionStatus), byte(f.Option)) common.Must2(b.AppendBytes(byte(f.SessionStatus), byte(f.Option)))
if f.SessionStatus == SessionStatusNew { if f.SessionStatus == SessionStatusNew {
switch f.Target.Network { switch f.Target.Network {
case net.Network_TCP: case net.Network_TCP:
b.AppendBytes(byte(TargetNetworkTCP)) common.Must2(b.AppendBytes(byte(TargetNetworkTCP)))
case net.Network_UDP: case net.Network_UDP:
b.AppendBytes(byte(TargetNetworkUDP)) common.Must2(b.AppendBytes(byte(TargetNetworkUDP)))
} }
if err := addrParser.WriteAddressPort(b, f.Target.Address, f.Target.Port); err != nil { if err := addrParser.WriteAddressPort(b, f.Target.Address, f.Target.Port); err != nil {
@ -85,6 +86,8 @@ func (f FrameMetadata) WriteTo(b *buf.Buffer) error {
return nil return nil
} }
// ReadFrameFrom reads a FrameMetadata from the given buffer.
// Visible for testing only.
func ReadFrameFrom(b *buf.Buffer) (*FrameMetadata, error) { func ReadFrameFrom(b *buf.Buffer) (*FrameMetadata, error) {
if b.Len() < 4 { if b.Len() < 4 {
return nil, newError("insufficient buffer: ", b.Len()) return nil, newError("insufficient buffer: ", b.Len())