diff --git a/proxy/freedom/freedom.go b/proxy/freedom/freedom.go index ec891c60b..db2a7273c 100644 --- a/proxy/freedom/freedom.go +++ b/proxy/freedom/freedom.go @@ -2,6 +2,7 @@ package freedom import ( "net" + "sync" "github.com/v2ray/v2ray-core" "github.com/v2ray/v2ray-core/common/log" @@ -41,31 +42,32 @@ func (vconn *FreedomConnection) Start(ray core.OutboundRay) error { input := ray.OutboundInput() output := ray.OutboundOutput() - readFinish := make(chan bool) - writeFinish := make(chan bool) + var readMutex, writeMutex sync.Mutex + readMutex.Lock() + writeMutex.Lock() - go dumpInput(conn, input, writeFinish) - go dumpOutput(conn, output, readFinish) + go dumpInput(conn, input, writeMutex) + go dumpOutput(conn, output, readMutex) go func() { - <-writeFinish + writeMutex.Lock() if tcpConn, ok := conn.(*net.TCPConn); ok { tcpConn.CloseWrite() } - <-readFinish + readMutex.Lock() conn.Close() }() return nil } -func dumpInput(conn net.Conn, input <-chan []byte, finish chan<- bool) { +func dumpInput(conn net.Conn, input <-chan []byte, finish sync.Mutex) { v2net.ChanToWriter(conn, input) - close(finish) + finish.Unlock() } -func dumpOutput(conn net.Conn, output chan<- []byte, finish chan<- bool) { +func dumpOutput(conn net.Conn, output chan<- []byte, finish sync.Mutex) { v2net.ReaderToChan(output, conn) + finish.Unlock() close(output) - close(finish) } diff --git a/proxy/socks/socks.go b/proxy/socks/socks.go index 4237a0311..f6b19ed1d 100644 --- a/proxy/socks/socks.go +++ b/proxy/socks/socks.go @@ -6,6 +6,7 @@ import ( "io" "net" "strconv" + "sync" "github.com/v2ray/v2ray-core" "github.com/v2ray/v2ray-core/common/log" @@ -173,23 +174,24 @@ func (server *SocksServer) HandleConnection(connection net.Conn) error { ray := server.vPoint.DispatchToOutbound(v2net.NewTCPPacket(dest)) input := ray.InboundInput() output := ray.InboundOutput() - readFinish := make(chan bool) - writeFinish := make(chan bool) + var readFinish, writeFinish sync.Mutex + readFinish.Lock() + writeFinish.Lock() go dumpInput(reader, input, readFinish) go dumpOutput(connection, output, writeFinish) - <-writeFinish + writeFinish.Lock() return nil } -func dumpInput(reader io.Reader, input chan<- []byte, finish chan<- bool) { +func dumpInput(reader io.Reader, input chan<- []byte, finish sync.Mutex) { v2net.ReaderToChan(input, reader) + finish.Unlock() close(input) - close(finish) } -func dumpOutput(writer io.Writer, output <-chan []byte, finish chan<- bool) { +func dumpOutput(writer io.Writer, output <-chan []byte, finish sync.Mutex) { v2net.ChanToWriter(writer, output) - close(finish) + finish.Unlock() } diff --git a/proxy/vmess/vmessin.go b/proxy/vmess/vmessin.go index e0c86ff34..7ae7b7cfe 100644 --- a/proxy/vmess/vmessin.go +++ b/proxy/vmess/vmessin.go @@ -5,6 +5,7 @@ import ( "io" "net" "strconv" + "sync" "time" "github.com/v2ray/v2ray-core" @@ -77,9 +78,9 @@ func (handler *VMessInboundHandler) HandleConnection(connection net.Conn) error ray := handler.vPoint.DispatchToOutbound(v2net.NewTCPPacket(request.Destination())) input := ray.InboundInput() output := ray.InboundOutput() - - readFinish := make(chan bool) - writeFinish := make(chan bool) + var readFinish, writeFinish sync.Mutex + readFinish.Lock() + writeFinish.Lock() go handleInput(request, connection, input, readFinish) @@ -100,20 +101,20 @@ func (handler *VMessInboundHandler) HandleConnection(connection net.Conn) error buffer = append(buffer, data...) responseWriter.Write(buffer) go handleOutput(request, responseWriter, output, writeFinish) - <-writeFinish + writeFinish.Lock() } if tcpConn, ok := connection.(*net.TCPConn); ok { tcpConn.CloseWrite() } - <-readFinish + readFinish.Lock() return nil } -func handleInput(request *protocol.VMessRequest, reader io.Reader, input chan<- []byte, finish chan<- bool) { +func handleInput(request *protocol.VMessRequest, reader io.Reader, input chan<- []byte, finish sync.Mutex) { defer close(input) - defer close(finish) + defer finish.Unlock() requestReader, err := v2io.NewAesDecryptReader(request.RequestKey[:], request.RequestIV[:], reader) if err != nil { @@ -124,9 +125,9 @@ func handleInput(request *protocol.VMessRequest, reader io.Reader, input chan<- v2net.ReaderToChan(input, requestReader) } -func handleOutput(request *protocol.VMessRequest, writer io.Writer, output <-chan []byte, finish chan<- bool) { +func handleOutput(request *protocol.VMessRequest, writer io.Writer, output <-chan []byte, finish sync.Mutex) { v2net.ChanToWriter(writer, output) - close(finish) + finish.Unlock() } type VMessInboundHandlerFactory struct { diff --git a/proxy/vmess/vmessout.go b/proxy/vmess/vmessout.go index 69b04b47d..86337663e 100644 --- a/proxy/vmess/vmessout.go +++ b/proxy/vmess/vmessout.go @@ -6,6 +6,7 @@ import ( "crypto/rand" mrand "math/rand" "net" + "sync" "github.com/v2ray/v2ray-core" v2io "github.com/v2ray/v2ray-core/common/io" @@ -109,21 +110,21 @@ func startCommunicate(request *protocol.VMessRequest, dest v2net.Destination, ra input := ray.OutboundInput() output := ray.OutboundOutput() - - requestFinish := make(chan bool) - responseFinish := make(chan bool) + var requestFinish, responseFinish sync.Mutex + requestFinish.Lock() + responseFinish.Lock() go handleRequest(conn, request, input, requestFinish) go handleResponse(conn, request, output, responseFinish) - <-requestFinish + requestFinish.Lock() conn.CloseWrite() - <-responseFinish + responseFinish.Lock() return nil } -func handleRequest(conn *net.TCPConn, request *protocol.VMessRequest, input <-chan []byte, finish chan<- bool) { - defer close(finish) +func handleRequest(conn *net.TCPConn, request *protocol.VMessRequest, input <-chan []byte, finish sync.Mutex) { + defer finish.Unlock() encryptRequestWriter, err := v2io.NewAesEncryptWriter(request.RequestKey[:], request.RequestIV[:], conn) if err != nil { log.Error("VMessOut: Failed to create encrypt writer: %v", err) @@ -154,8 +155,8 @@ func handleRequest(conn *net.TCPConn, request *protocol.VMessRequest, input <-ch return } -func handleResponse(conn *net.TCPConn, request *protocol.VMessRequest, output chan<- []byte, finish chan<- bool) { - defer close(finish) +func handleResponse(conn *net.TCPConn, request *protocol.VMessRequest, output chan<- []byte, finish sync.Mutex) { + defer finish.Unlock() defer close(output) responseKey := md5.Sum(request.RequestKey[:]) responseIV := md5.Sum(request.RequestIV[:])