From d12737b3b8d2516515b61349e8513810503ad524 Mon Sep 17 00:00:00 2001 From: V2Ray Date: Sun, 13 Sep 2015 20:01:50 +0200 Subject: [PATCH] Simplify channel operations --- config.go | 2 +- io/bufferset.go | 75 ------------------------------------ io/vmess/vmess.go | 7 ++-- net/{vdest.go => address.go} | 2 +- net/freedom/freedom.go | 28 +++----------- net/socks/socks.go | 29 +++----------- net/transport.go | 34 ++++++++++++++++ net/vmess/vmess.go | 5 --- net/vmess/vmessin.go | 28 +++----------- net/vmess/vmessout.go | 27 +++---------- release/server/main.go | 2 +- 11 files changed, 63 insertions(+), 176 deletions(-) delete mode 100644 io/bufferset.go rename net/{vdest.go => address.go} (98%) create mode 100644 net/transport.go delete mode 100644 net/vmess/vmess.go diff --git a/config.go b/config.go index 58781d57f..26d87e171 100644 --- a/config.go +++ b/config.go @@ -16,7 +16,7 @@ type ConnectionConfig struct { // Config is the config for Point server. type Config struct { - Port uint16 `json:"port"` // Port of this Point server. + Port uint16 `json:"port"` // Port of this Point server. InboundConfig ConnectionConfig `json:"inbound"` OutboundConfig ConnectionConfig `json:"outbound"` } diff --git a/io/bufferset.go b/io/bufferset.go deleted file mode 100644 index 720257200..000000000 --- a/io/bufferset.go +++ /dev/null @@ -1,75 +0,0 @@ -package io - -import ( - "errors" -) - -const ( - SizeSmall = 16 - SizeMedium = 128 - SizeLarge = 512 -) - -var ( - ErrorNoChannel = errors.New("No suitable channels found.") -) - -type BufferSet struct { - small chan []byte - medium chan []byte - large chan []byte -} - -func NewBufferSet() *BufferSet { - bSet := new(BufferSet) - bSet.small = make(chan []byte, 128) - bSet.medium = make(chan []byte, 128) - bSet.large = make(chan []byte, 128) - return bSet -} - -func (bSet *BufferSet) detectBucket(size int, strict bool) (chan []byte, error) { - if strict { - if size == SizeSmall { - return bSet.small, nil - } else if size == SizeMedium { - return bSet.medium, nil - } else if size == SizeLarge { - return bSet.large, nil - } - } else { - if size <= SizeSmall { - return bSet.small, nil - } else if size <= SizeMedium { - return bSet.medium, nil - } else if size <= SizeLarge { - return bSet.large, nil - } - } - return nil, ErrorNoChannel -} - -func (bSet *BufferSet) FetchBuffer(minSize int) []byte { - var buffer []byte - byteChan, err := bSet.detectBucket(minSize, false) - if err != nil { - return make([]byte, minSize) - } - select { - case buffer = <-byteChan: - default: - buffer = make([]byte, minSize) - } - return buffer -} - -func (bSet *BufferSet) ReturnBuffer(buffer []byte) { - byteChan, err := bSet.detectBucket(len(buffer), true) - if err != nil { - return - } - select { - case byteChan <- buffer: - default: - } -} diff --git a/io/vmess/vmess.go b/io/vmess/vmess.go index 86cb46815..69158aa3b 100644 --- a/io/vmess/vmess.go +++ b/io/vmess/vmess.go @@ -9,7 +9,6 @@ import ( "errors" "fmt" "io" - _ "log" mrand "math/rand" "github.com/v2ray/v2ray-core" @@ -27,6 +26,8 @@ const ( var ( ErrorInvalidUser = errors.New("Invalid User") + + emptyIV = make([]byte, blockSize) ) // VMessRequest implements the request message of VMess protocol. It only contains @@ -75,7 +76,7 @@ func (r *VMessRequestReader) Read(reader io.Reader) (*VMessRequest, error) { } request.UserId = *userId - decryptor, err := NewDecryptionReader(reader, userId.Hash([]byte("PWD")), make([]byte, blockSize)) + decryptor, err := NewDecryptionReader(reader, userId.Hash([]byte("PWD")), emptyIV) if err != nil { return nil, err } @@ -224,7 +225,7 @@ func (w *VMessRequestWriter) Write(writer io.Writer, request *VMessRequest) erro if err != nil { return err } - aesStream := cipher.NewCFBEncrypter(aesCipher, make([]byte, blockSize)) + aesStream := cipher.NewCFBEncrypter(aesCipher, emptyIV) cWriter := v2io.NewCryptionWriter(aesStream, writer) _, err = writer.Write(buffer[0:encryptionBegin]) diff --git a/net/vdest.go b/net/address.go similarity index 98% rename from net/vdest.go rename to net/address.go index 6855b4053..5d6f62a94 100644 --- a/net/vdest.go +++ b/net/address.go @@ -1,4 +1,4 @@ -package core +package net import ( "net" diff --git a/net/freedom/freedom.go b/net/freedom/freedom.go index 8be772d4b..dd496764c 100644 --- a/net/freedom/freedom.go +++ b/net/freedom/freedom.go @@ -1,7 +1,6 @@ package freedom import ( - "io" "net" "github.com/v2ray/v2ray-core" @@ -36,31 +35,14 @@ func (vconn *FreedomConnection) Start(ray core.OutboundRay) error { } func (vconn *FreedomConnection) DumpInput(conn net.Conn, input <-chan []byte, finish chan<- bool) { - for { - data, open := <-input - if !open { - finish <- true - log.Debug("Freedom finishing input.") - break - } - nBytes, err := conn.Write(data) - log.Debug("Freedom wrote %d bytes with error %v", nBytes, err) - } + v2net.ChanToWriter(conn, input) + finish <- true } func (vconn *FreedomConnection) DumpOutput(conn net.Conn, output chan<- []byte, finish chan<- bool) { - for { - buffer := make([]byte, 512) - nBytes, err := conn.Read(buffer) - log.Debug("Freedom reading %d bytes with error %v", nBytes, err) - if err == io.EOF { - close(output) - finish <- true - log.Debug("Freedom finishing output.") - break - } - output <- buffer[:nBytes] - } + v2net.ReaderToChan(output, conn) + close(output) + finish <- true } func (vconn *FreedomConnection) CloseConn(conn net.Conn, finish <-chan bool) { diff --git a/net/socks/socks.go b/net/socks/socks.go index 6bc888891..ee2b1a272 100644 --- a/net/socks/socks.go +++ b/net/socks/socks.go @@ -2,13 +2,13 @@ package socks import ( "errors" - "io" "net" "strconv" "github.com/v2ray/v2ray-core" socksio "github.com/v2ray/v2ray-core/io/socks" "github.com/v2ray/v2ray-core/log" + v2net "github.com/v2ray/v2ray-core/net" ) var ( @@ -127,31 +127,14 @@ func (server *SocksServer) HandleConnection(connection net.Conn) error { } func (server *SocksServer) dumpInput(conn net.Conn, input chan<- []byte, finish chan<- bool) { - for { - buffer := make([]byte, 512) - nBytes, err := conn.Read(buffer) - log.Debug("Reading %d bytes, with error %v", nBytes, err) - if err == io.EOF { - close(input) - finish <- true - log.Debug("Socks finishing input.") - break - } - input <- buffer[:nBytes] - } + v2net.ReaderToChan(input, conn) + close(input) + finish <- true } func (server *SocksServer) dumpOutput(conn net.Conn, output <-chan []byte, finish chan<- bool) { - for { - buffer, open := <-output - if !open { - finish <- true - log.Debug("Socks finishing output") - break - } - nBytes, err := conn.Write(buffer) - log.Debug("Writing %d bytes with error %v", nBytes, err) - } + v2net.ChanToWriter(conn, output) + finish <- true } func (server *SocksServer) waitForFinish(finish <-chan bool) { diff --git a/net/transport.go b/net/transport.go new file mode 100644 index 000000000..d37dcc28d --- /dev/null +++ b/net/transport.go @@ -0,0 +1,34 @@ +package net + +import ( + "io" + + "github.com/v2ray/v2ray-core/log" +) + +const ( + bufferSize = 8192 +) + +func ReaderToChan(stream chan<- []byte, reader io.Reader) error { + for { + buffer := make([]byte, bufferSize) + nBytes, err := reader.Read(buffer) + if err != nil { + return err + } + stream <- buffer[:nBytes] + } + return nil +} + +func ChanToWriter(writer io.Writer, stream <-chan []byte) error { + for buffer := range stream { + nBytes, err := writer.Write(buffer) + log.Debug("Writing %d bytes with error %v", nBytes, err) + if err != nil { + return err + } + } + return nil +} diff --git a/net/vmess/vmess.go b/net/vmess/vmess.go deleted file mode 100644 index 59eb0ac8f..000000000 --- a/net/vmess/vmess.go +++ /dev/null @@ -1,5 +0,0 @@ -package vmess - -const ( - BufferSize = 512 -) diff --git a/net/vmess/vmessin.go b/net/vmess/vmessin.go index 3ac96c969..580597b63 100644 --- a/net/vmess/vmessin.go +++ b/net/vmess/vmessin.go @@ -10,6 +10,7 @@ import ( v2io "github.com/v2ray/v2ray-core/io" vmessio "github.com/v2ray/v2ray-core/io/vmess" "github.com/v2ray/v2ray-core/log" + v2net "github.com/v2ray/v2ray-core/net" ) type VMessInboundHandler struct { @@ -92,31 +93,14 @@ func (handler *VMessInboundHandler) HandleConnection(connection net.Conn) error } func (handler *VMessInboundHandler) dumpInput(reader io.Reader, input chan<- []byte, finish chan<- bool) { - for { - buffer := make([]byte, BufferSize) - nBytes, err := reader.Read(buffer) - log.Debug("VMessInbound: Reading %d bytes with error %v", nBytes, err) - if err == io.EOF { - close(input) - log.Debug("VMessInbound finishing input.") - finish <- true - break - } - input <- buffer[:nBytes] - } + v2net.ReaderToChan(input, reader) + close(input) + finish <- true } func (handler *VMessInboundHandler) dumpOutput(writer io.Writer, output <-chan []byte, finish chan<- bool) { - for { - buffer, open := <-output - if !open { - finish <- true - log.Debug("VMessInbound finishing output.") - break - } - nBytes, err := writer.Write(buffer) - log.Debug("VmessInbound: Wrote %d bytes with error %v", nBytes, err) - } + v2net.ChanToWriter(writer, output) + finish <- true } func (handler *VMessInboundHandler) waitForFinish(finish <-chan bool) { diff --git a/net/vmess/vmessout.go b/net/vmess/vmessout.go index 16c07c90c..ec77383f9 100644 --- a/net/vmess/vmessout.go +++ b/net/vmess/vmessout.go @@ -118,31 +118,14 @@ func (handler *VMessOutboundHandler) startCommunicate(request *vmessio.VMessRequ } func (handler *VMessOutboundHandler) dumpOutput(reader io.Reader, output chan<- []byte, finish chan<- bool) { - for { - buffer := make([]byte, BufferSize) - nBytes, err := reader.Read(buffer) - log.Debug("VMessOutbound: Reading %d bytes, with error %v", nBytes, err) - if err == io.EOF { - close(output) - finish <- true - log.Debug("VMessOutbound finishing output.") - break - } - output <- buffer[:nBytes] - } + v2net.ReaderToChan(output, reader) + close(output) + finish <- true } func (handler *VMessOutboundHandler) dumpInput(writer io.Writer, input <-chan []byte, finish chan<- bool) { - for { - buffer, open := <-input - if !open { - finish <- true - log.Debug("VMessOutbound finishing input.") - break - } - nBytes, err := writer.Write(buffer) - log.Debug("VMessOutbound: Wrote %d bytes with error %v", nBytes, err) - } + v2net.ChanToWriter(writer, input) + finish <- true } func (handler *VMessOutboundHandler) waitForFinish(finish <-chan bool) { diff --git a/release/server/main.go b/release/server/main.go index 0d3946086..2ee5365d8 100644 --- a/release/server/main.go +++ b/release/server/main.go @@ -8,7 +8,7 @@ import ( "github.com/v2ray/v2ray-core" "github.com/v2ray/v2ray-core/log" - // The following are neccesary as they register handlers in their init functions. + // The following are neccesary as they register handlers in their init functions. _ "github.com/v2ray/v2ray-core/net/freedom" _ "github.com/v2ray/v2ray-core/net/socks" _ "github.com/v2ray/v2ray-core/net/vmess"