mirror of
https://github.com/v2fly/v2ray-core.git
synced 2025-01-10 19:36:32 -05:00
175 lines
4.4 KiB
Go
175 lines
4.4 KiB
Go
package simple
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/rand"
|
|
"io"
|
|
"time"
|
|
|
|
"github.com/v2fly/v2ray-core/v5/common"
|
|
|
|
"github.com/v2fly/v2ray-core/v5/transport/internet/request"
|
|
)
|
|
|
|
func newClient(config *ClientConfig) request.SessionAssemblerClient {
|
|
return &simpleAssemblerClient{config: config}
|
|
}
|
|
|
|
type simpleAssemblerClient struct {
|
|
assembly request.TransportClientAssembly
|
|
config *ClientConfig
|
|
}
|
|
|
|
func (s *simpleAssemblerClient) OnTransportClientAssemblyReady(assembly request.TransportClientAssembly) {
|
|
s.assembly = assembly
|
|
}
|
|
|
|
func (s *simpleAssemblerClient) NewSession(ctx context.Context, opts ...request.SessionOption) (request.Session, error) {
|
|
sessionID := make([]byte, 16)
|
|
_, err := io.ReadFull(rand.Reader, sessionID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
sessionContext, finish := context.WithCancel(ctx)
|
|
session := &simpleAssemblerClientSession{
|
|
sessionID: sessionID, tripper: s.assembly.Tripper(), readBuffer: bytes.NewBuffer(nil),
|
|
ctx: sessionContext, finish: finish, writerChan: make(chan []byte), readerChan: make(chan []byte, 16), assembler: s,
|
|
}
|
|
go session.keepRunning()
|
|
return session, nil
|
|
}
|
|
|
|
type simpleAssemblerClientSession struct {
|
|
sessionID []byte
|
|
currentWriteWait int
|
|
|
|
assembler *simpleAssemblerClient
|
|
tripper request.Tripper
|
|
readBuffer *bytes.Buffer
|
|
writerChan chan []byte
|
|
readerChan chan []byte
|
|
ctx context.Context
|
|
finish func()
|
|
}
|
|
|
|
func (s *simpleAssemblerClientSession) keepRunning() {
|
|
s.currentWriteWait = int(s.assembler.config.InitialPollingIntervalMs)
|
|
for s.ctx.Err() == nil {
|
|
s.runOnce()
|
|
}
|
|
}
|
|
|
|
func (s *simpleAssemblerClientSession) runOnce() {
|
|
sendBuffer := bytes.NewBuffer(nil)
|
|
if s.currentWriteWait != 0 {
|
|
waitTimer := time.NewTimer(time.Millisecond * time.Duration(s.currentWriteWait))
|
|
waitForFirstWrite := true
|
|
copyFromWriterLoop:
|
|
for {
|
|
select {
|
|
case <-s.ctx.Done():
|
|
return
|
|
case data := <-s.writerChan:
|
|
sendBuffer.Write(data)
|
|
if sendBuffer.Len() >= int(s.assembler.config.MaxWriteSize) {
|
|
break copyFromWriterLoop
|
|
}
|
|
if waitForFirstWrite {
|
|
waitForFirstWrite = false
|
|
waitTimer.Reset(time.Millisecond * time.Duration(s.assembler.config.WaitSubsequentWriteMs))
|
|
}
|
|
case <-waitTimer.C:
|
|
break copyFromWriterLoop
|
|
}
|
|
}
|
|
waitTimer.Stop()
|
|
}
|
|
|
|
firstRound := true
|
|
pollConnection := true
|
|
for sendBuffer.Len() != 0 || firstRound {
|
|
firstRound = false
|
|
sendAmount := sendBuffer.Len()
|
|
if sendAmount > int(s.assembler.config.MaxWriteSize) {
|
|
sendAmount = int(s.assembler.config.MaxWriteSize)
|
|
}
|
|
data := sendBuffer.Next(sendAmount)
|
|
if len(data) != 0 {
|
|
pollConnection = false
|
|
}
|
|
for {
|
|
resp, err := s.tripper.RoundTrip(s.ctx, request.Request{Data: data, ConnectionTag: s.sessionID})
|
|
if err != nil {
|
|
newError("failed to send data").Base(err).WriteToLog()
|
|
if s.ctx.Err() != nil {
|
|
return
|
|
}
|
|
time.Sleep(time.Millisecond * time.Duration(s.assembler.config.FailedRetryIntervalMs))
|
|
continue
|
|
}
|
|
if len(resp.Data) != 0 {
|
|
s.readerChan <- resp.Data
|
|
}
|
|
if len(resp.Data) != 0 {
|
|
pollConnection = false
|
|
}
|
|
break
|
|
}
|
|
}
|
|
if pollConnection {
|
|
s.currentWriteWait = int(s.assembler.config.BackoffFactor * float32(s.currentWriteWait))
|
|
if s.currentWriteWait > int(s.assembler.config.MaxPollingIntervalMs) {
|
|
s.currentWriteWait = int(s.assembler.config.MaxPollingIntervalMs)
|
|
}
|
|
if s.currentWriteWait < int(s.assembler.config.MinPollingIntervalMs) {
|
|
s.currentWriteWait = int(s.assembler.config.MinPollingIntervalMs)
|
|
}
|
|
} else {
|
|
s.currentWriteWait = int(0)
|
|
}
|
|
}
|
|
|
|
func (s *simpleAssemblerClientSession) Read(p []byte) (n int, err error) {
|
|
if s.readBuffer.Len() == 0 {
|
|
select {
|
|
case <-s.ctx.Done():
|
|
return 0, s.ctx.Err()
|
|
case data := <-s.readerChan:
|
|
s.readBuffer.Write(data)
|
|
}
|
|
}
|
|
n, err = s.readBuffer.Read(p)
|
|
if err == io.EOF {
|
|
s.readBuffer.Reset()
|
|
return 0, nil
|
|
}
|
|
return
|
|
}
|
|
|
|
func (s *simpleAssemblerClientSession) Write(p []byte) (n int, err error) {
|
|
buf := make([]byte, len(p))
|
|
copy(buf, p)
|
|
select {
|
|
case <-s.ctx.Done():
|
|
return 0, s.ctx.Err()
|
|
case s.writerChan <- buf:
|
|
return len(p), nil
|
|
}
|
|
}
|
|
|
|
func (s *simpleAssemblerClientSession) Close() error {
|
|
s.finish()
|
|
return nil
|
|
}
|
|
|
|
func init() {
|
|
common.Must(common.RegisterConfig((*ClientConfig)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
|
|
clientConfig, ok := config.(*ClientConfig)
|
|
if !ok {
|
|
return nil, newError("not a ClientConfig")
|
|
}
|
|
return newClient(clientConfig), nil
|
|
}))
|
|
}
|