1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2024-12-21 17:46:58 -05:00

Switch to Mutex for better readability

This commit is contained in:
V2Ray 2015-09-23 14:14:53 +02:00
parent c59dcc309c
commit 3fbae6795a
4 changed files with 41 additions and 35 deletions

View File

@ -2,6 +2,7 @@ package freedom
import ( import (
"net" "net"
"sync"
"github.com/v2ray/v2ray-core" "github.com/v2ray/v2ray-core"
"github.com/v2ray/v2ray-core/common/log" "github.com/v2ray/v2ray-core/common/log"
@ -41,31 +42,32 @@ func (vconn *FreedomConnection) Start(ray core.OutboundRay) error {
input := ray.OutboundInput() input := ray.OutboundInput()
output := ray.OutboundOutput() output := ray.OutboundOutput()
readFinish := make(chan bool) var readMutex, writeMutex sync.Mutex
writeFinish := make(chan bool) readMutex.Lock()
writeMutex.Lock()
go dumpInput(conn, input, writeFinish) go dumpInput(conn, input, writeMutex)
go dumpOutput(conn, output, readFinish) go dumpOutput(conn, output, readMutex)
go func() { go func() {
<-writeFinish writeMutex.Lock()
if tcpConn, ok := conn.(*net.TCPConn); ok { if tcpConn, ok := conn.(*net.TCPConn); ok {
tcpConn.CloseWrite() tcpConn.CloseWrite()
} }
<-readFinish readMutex.Lock()
conn.Close() conn.Close()
}() }()
return nil return nil
} }
func dumpInput(conn net.Conn, input <-chan []byte, finish chan<- bool) { func dumpInput(conn net.Conn, input <-chan []byte, finish sync.Mutex) {
v2net.ChanToWriter(conn, input) v2net.ChanToWriter(conn, input)
close(finish) finish.Unlock()
} }
func dumpOutput(conn net.Conn, output chan<- []byte, finish chan<- bool) { func dumpOutput(conn net.Conn, output chan<- []byte, finish sync.Mutex) {
v2net.ReaderToChan(output, conn) v2net.ReaderToChan(output, conn)
finish.Unlock()
close(output) close(output)
close(finish)
} }

View File

@ -6,6 +6,7 @@ import (
"io" "io"
"net" "net"
"strconv" "strconv"
"sync"
"github.com/v2ray/v2ray-core" "github.com/v2ray/v2ray-core"
"github.com/v2ray/v2ray-core/common/log" "github.com/v2ray/v2ray-core/common/log"
@ -173,23 +174,24 @@ func (server *SocksServer) HandleConnection(connection net.Conn) error {
ray := server.vPoint.DispatchToOutbound(v2net.NewTCPPacket(dest)) ray := server.vPoint.DispatchToOutbound(v2net.NewTCPPacket(dest))
input := ray.InboundInput() input := ray.InboundInput()
output := ray.InboundOutput() output := ray.InboundOutput()
readFinish := make(chan bool) var readFinish, writeFinish sync.Mutex
writeFinish := make(chan bool) readFinish.Lock()
writeFinish.Lock()
go dumpInput(reader, input, readFinish) go dumpInput(reader, input, readFinish)
go dumpOutput(connection, output, writeFinish) go dumpOutput(connection, output, writeFinish)
<-writeFinish writeFinish.Lock()
return nil return nil
} }
func dumpInput(reader io.Reader, input chan<- []byte, finish chan<- bool) { func dumpInput(reader io.Reader, input chan<- []byte, finish sync.Mutex) {
v2net.ReaderToChan(input, reader) v2net.ReaderToChan(input, reader)
finish.Unlock()
close(input) close(input)
close(finish)
} }
func dumpOutput(writer io.Writer, output <-chan []byte, finish chan<- bool) { func dumpOutput(writer io.Writer, output <-chan []byte, finish sync.Mutex) {
v2net.ChanToWriter(writer, output) v2net.ChanToWriter(writer, output)
close(finish) finish.Unlock()
} }

View File

@ -5,6 +5,7 @@ import (
"io" "io"
"net" "net"
"strconv" "strconv"
"sync"
"time" "time"
"github.com/v2ray/v2ray-core" "github.com/v2ray/v2ray-core"
@ -77,9 +78,9 @@ func (handler *VMessInboundHandler) HandleConnection(connection net.Conn) error
ray := handler.vPoint.DispatchToOutbound(v2net.NewTCPPacket(request.Destination())) ray := handler.vPoint.DispatchToOutbound(v2net.NewTCPPacket(request.Destination()))
input := ray.InboundInput() input := ray.InboundInput()
output := ray.InboundOutput() output := ray.InboundOutput()
var readFinish, writeFinish sync.Mutex
readFinish := make(chan bool) readFinish.Lock()
writeFinish := make(chan bool) writeFinish.Lock()
go handleInput(request, connection, input, readFinish) go handleInput(request, connection, input, readFinish)
@ -100,20 +101,20 @@ func (handler *VMessInboundHandler) HandleConnection(connection net.Conn) error
buffer = append(buffer, data...) buffer = append(buffer, data...)
responseWriter.Write(buffer) responseWriter.Write(buffer)
go handleOutput(request, responseWriter, output, writeFinish) go handleOutput(request, responseWriter, output, writeFinish)
<-writeFinish writeFinish.Lock()
} }
if tcpConn, ok := connection.(*net.TCPConn); ok { if tcpConn, ok := connection.(*net.TCPConn); ok {
tcpConn.CloseWrite() tcpConn.CloseWrite()
} }
<-readFinish readFinish.Lock()
return nil return nil
} }
func handleInput(request *protocol.VMessRequest, reader io.Reader, input chan<- []byte, finish chan<- bool) { func handleInput(request *protocol.VMessRequest, reader io.Reader, input chan<- []byte, finish sync.Mutex) {
defer close(input) defer close(input)
defer close(finish) defer finish.Unlock()
requestReader, err := v2io.NewAesDecryptReader(request.RequestKey[:], request.RequestIV[:], reader) requestReader, err := v2io.NewAesDecryptReader(request.RequestKey[:], request.RequestIV[:], reader)
if err != nil { if err != nil {
@ -124,9 +125,9 @@ func handleInput(request *protocol.VMessRequest, reader io.Reader, input chan<-
v2net.ReaderToChan(input, requestReader) v2net.ReaderToChan(input, requestReader)
} }
func handleOutput(request *protocol.VMessRequest, writer io.Writer, output <-chan []byte, finish chan<- bool) { func handleOutput(request *protocol.VMessRequest, writer io.Writer, output <-chan []byte, finish sync.Mutex) {
v2net.ChanToWriter(writer, output) v2net.ChanToWriter(writer, output)
close(finish) finish.Unlock()
} }
type VMessInboundHandlerFactory struct { type VMessInboundHandlerFactory struct {

View File

@ -6,6 +6,7 @@ import (
"crypto/rand" "crypto/rand"
mrand "math/rand" mrand "math/rand"
"net" "net"
"sync"
"github.com/v2ray/v2ray-core" "github.com/v2ray/v2ray-core"
v2io "github.com/v2ray/v2ray-core/common/io" v2io "github.com/v2ray/v2ray-core/common/io"
@ -109,21 +110,21 @@ func startCommunicate(request *protocol.VMessRequest, dest v2net.Destination, ra
input := ray.OutboundInput() input := ray.OutboundInput()
output := ray.OutboundOutput() output := ray.OutboundOutput()
var requestFinish, responseFinish sync.Mutex
requestFinish := make(chan bool) requestFinish.Lock()
responseFinish := make(chan bool) responseFinish.Lock()
go handleRequest(conn, request, input, requestFinish) go handleRequest(conn, request, input, requestFinish)
go handleResponse(conn, request, output, responseFinish) go handleResponse(conn, request, output, responseFinish)
<-requestFinish requestFinish.Lock()
conn.CloseWrite() conn.CloseWrite()
<-responseFinish responseFinish.Lock()
return nil return nil
} }
func handleRequest(conn *net.TCPConn, request *protocol.VMessRequest, input <-chan []byte, finish chan<- bool) { func handleRequest(conn *net.TCPConn, request *protocol.VMessRequest, input <-chan []byte, finish sync.Mutex) {
defer close(finish) defer finish.Unlock()
encryptRequestWriter, err := v2io.NewAesEncryptWriter(request.RequestKey[:], request.RequestIV[:], conn) encryptRequestWriter, err := v2io.NewAesEncryptWriter(request.RequestKey[:], request.RequestIV[:], conn)
if err != nil { if err != nil {
log.Error("VMessOut: Failed to create encrypt writer: %v", err) log.Error("VMessOut: Failed to create encrypt writer: %v", err)
@ -154,8 +155,8 @@ func handleRequest(conn *net.TCPConn, request *protocol.VMessRequest, input <-ch
return return
} }
func handleResponse(conn *net.TCPConn, request *protocol.VMessRequest, output chan<- []byte, finish chan<- bool) { func handleResponse(conn *net.TCPConn, request *protocol.VMessRequest, output chan<- []byte, finish sync.Mutex) {
defer close(finish) defer finish.Unlock()
defer close(output) defer close(output)
responseKey := md5.Sum(request.RequestKey[:]) responseKey := md5.Sum(request.RequestKey[:])
responseIV := md5.Sum(request.RequestIV[:]) responseIV := md5.Sum(request.RequestIV[:])