package kcp

import (
	"io"
	"net"
	"sync"
	"sync/atomic"
	"time"

	"v2ray.com/core/common/errors"
	"v2ray.com/core/common/log"
	"v2ray.com/core/common/predicate"
	"v2ray.com/core/transport/internet"
	"v2ray.com/core/transport/internet/internal"
)

// Errors reported by KCP connections and listeners.
var (
	// ErrIOTimeout is returned by Read and Write once their deadline has passed.
	ErrIOTimeout = errors.New("Read/Write timeout")
	// ErrClosedListener is not referenced in this file; presumably it is
	// returned by the KCP listener after it has been closed.
	ErrClosedListener = errors.New("Listener closed.")
	// ErrClosedConnection is returned when an operation is attempted on a nil
	// connection or on one that is no longer in a usable state.
	ErrClosedConnection = errors.New("Connection closed.")
)

// State is the lifecycle state of a Connection. It is an int32 so that it can
// be loaded and stored with sync/atomic.
type State int32

// Is reports whether v equals at least one of the given states.
func (v State) Is(states ...State) bool {
	for i := range states {
		if states[i] == v {
			return true
		}
	}
	return false
}

// Lifecycle states of a Connection, numbered 0 through 5.
const (
	// StateActive is the zero value, i.e. the state of a fresh connection.
	StateActive State = iota
	// StateReadyToClose is entered when Close is called on an active connection.
	StateReadyToClose
	// StatePeerClosed is entered when the peer signals close while the
	// connection is active.
	StatePeerClosed
	// StateTerminating is the state in which flush keeps sending terminate
	// commands to the peer.
	StateTerminating
	// StatePeerTerminating is entered when a terminate command arrives while
	// the connection is active or peer-closed.
	StatePeerTerminating
	// StateTerminated is the final state; entering it triggers Terminate.
	StateTerminated
)

const (
	// headerSize is not referenced in this file; presumably the size in bytes
	// of a per-packet header - confirm against its users.
	headerSize uint32 = 2
)

// nowMillisec returns the current wall-clock time in milliseconds since the
// Unix epoch.
func nowMillisec() int64 {
	t := time.Now()
	seconds := t.Unix()
	millis := int64(t.Nanosecond() / 1000000)
	return seconds*1000 + millis
}

// RoundTripInfo keeps a smoothed round-trip estimate and the retransmission
// timeout derived from it. The embedded RWMutex guards every field. Values
// are presumably in milliseconds (timestamps come from Connection.Elapsed).
type RoundTripInfo struct {
	sync.RWMutex
	variation        uint32 // smoothed deviation of the rtt samples
	srtt             uint32 // smoothed rtt; 0 means no sample has been seen yet
	rto              uint32 // current retransmission timeout
	minRtt           uint32 // lower bound applied to srtt
	updatedTimestamp uint32 // timestamp of the last change to rto
}

// UpdatePeerRTO overwrites the timeout with the value advertised by the peer,
// unless the timeout was already updated less than 3000 time units before
// current.
func (v *RoundTripInfo) UpdatePeerRTO(rto uint32, current uint32) {
	v.Lock()
	defer v.Unlock()

	if sinceUpdate := current - v.updatedTimestamp; sinceUpdate >= 3000 {
		v.updatedTimestamp = current
		v.rto = rto
	}
}

// Update folds a new rtt sample into the estimate and recomputes the timeout.
// Samples above 0x7FFFFFFF are ignored. current is recorded as the time of
// this update.
func (v *RoundTripInfo) Update(rtt uint32, current uint32) {
	if rtt > 0x7FFFFFFF {
		return
	}

	v.Lock()
	defer v.Unlock()

	// https://tools.ietf.org/html/rfc6298
	if v.srtt != 0 {
		// Absolute difference between the sample and the smoothed value.
		var delta uint32
		if v.srtt > rtt {
			delta = v.srtt - rtt
		} else {
			delta = rtt - v.srtt
		}
		v.variation = (3*v.variation + delta) / 4
		v.srtt = (7*v.srtt + rtt) / 8
		if v.srtt < v.minRtt {
			v.srtt = v.minRtt
		}
	} else {
		// First sample seeds the estimate directly.
		v.srtt = rtt
		v.variation = rtt / 2
	}

	rto := v.srtt + v.variation
	if v.minRtt < 4*v.variation {
		rto = v.srtt + 4*v.variation
	}
	if rto > 10000 {
		rto = 10000
	}

	// The stored timeout carries a 25% margin over the computed value.
	v.rto = rto * 5 / 4
	v.updatedTimestamp = current
}

// Timeout returns the current retransmission timeout.
func (v *RoundTripInfo) Timeout() uint32 {
	v.RLock()
	defer v.RUnlock()

	return v.rto
}

// SmoothedTime returns the smoothed round-trip time.
func (v *RoundTripInfo) SmoothedTime() uint32 {
	v.RLock()
	defer v.RUnlock()

	return v.srtt
}

// Updater runs updateFunc repeatedly on its own goroutine. It sleeps on
// notifier until woken, then calls updateFunc every interval for as long as
// shouldContinue holds.
type Updater struct {
	interval        time.Duration
	shouldContinue  predicate.Predicate
	shouldTerminate predicate.Predicate
	updateFunc      func()
	notifier        chan bool
}

// NewUpdater builds an Updater whose interval is given in milliseconds and
// immediately starts its Run loop on a new goroutine.
func NewUpdater(interval uint32, shouldContinue predicate.Predicate, shouldTerminate predicate.Predicate, updateFunc func()) *Updater {
	u := new(Updater)
	u.interval = time.Duration(interval) * time.Millisecond
	u.shouldContinue = shouldContinue
	u.shouldTerminate = shouldTerminate
	u.updateFunc = updateFunc
	// Capacity 1 lets a single pending wake-up be recorded without blocking.
	u.notifier = make(chan bool, 1)

	go u.Run()
	return u
}

// WakeUp asks the Run loop to start another round of updates. It never
// blocks: when a wake-up is already pending the call is a no-op.
func (v *Updater) WakeUp() {
	select {
	case v.notifier <- true:
	default:
	}
}

// Run is the body of the updater goroutine. Each wake-up first checks
// shouldTerminate (ending the goroutine when it holds) and then calls
// updateFunc, sleeping interval between calls, until shouldContinue fails.
func (v *Updater) Run() {
	for {
		if woken := <-v.notifier; !woken {
			return
		}
		if v.shouldTerminate() {
			return
		}
		for v.shouldContinue() {
			v.updateFunc()
			time.Sleep(v.interval)
		}
	}
}

// SystemConnection is the underlying connection a KCP Connection runs on.
type SystemConnection interface {
	net.Conn
	Id() internal.ConnectionId
	Reset(internet.Authenticator, func([]byte))
}

// Connection is a KCP connection over UDP.
type Connection struct {
	conn           SystemConnection
	connRecycler   internal.ConnectionRecyler
	block          internet.Authenticator
	rd             time.Time // read deadline
	wd             time.Time // write deadline
	since          int64     // creation time as returned by nowMillisec
	dataInputCond  *sync.Cond // signalled when a data segment arrives
	dataOutputCond *sync.Cond // signalled when an ack segment arrives
	Config         *Config

	conv             uint16 // conversation id carried by every segment
	state            State  // accessed atomically via State and SetState
	stateBeginTime   uint32 // Elapsed() value when the current state began
	lastIncomingTime uint32 // Elapsed() value of the last call to Input
	lastPingTime     uint32 // Elapsed() value of the last Ping
	mss              uint32 // writer MTU minus DataSegmentOverhead

	roundTrip *RoundTripInfo

	receivingWorker *ReceivingWorker
	sendingWorker   *SendingWorker

	output *BufferedSegmentWriter

	dataUpdater *Updater
	pingUpdater *Updater

	reusable bool
}

// NewConnection creates a new KCP connection between local and remote.
func NewConnection(conv uint16, sysConn SystemConnection, recycler internal.ConnectionRecyler, block internet.Authenticator, config *Config) *Connection { log.Info("KCP|Connection: creating connection ", conv) authWriter := &AuthenticationWriter{ Authenticator: block, Writer: sysConn, Config: config, } conn := &Connection{ conv: conv, conn: sysConn, connRecycler: recycler, block: block, since: nowMillisec(), dataInputCond: sync.NewCond(new(sync.Mutex)), dataOutputCond: sync.NewCond(new(sync.Mutex)), Config: config, output: NewSegmentWriter(authWriter), mss: authWriter.Mtu() - DataSegmentOverhead, roundTrip: &RoundTripInfo{ rto: 100, minRtt: config.Tti.GetValue(), }, } sysConn.Reset(block, conn.Input) conn.receivingWorker = NewReceivingWorker(conn) conn.sendingWorker = NewSendingWorker(conn) isTerminating := func() bool { return conn.State().Is(StateTerminating, StateTerminated) } isTerminated := func() bool { return conn.State() == StateTerminated } conn.dataUpdater = NewUpdater( config.Tti.GetValue(), predicate.Not(isTerminating).And(predicate.Any(conn.sendingWorker.UpdateNecessary, conn.receivingWorker.UpdateNecessary)), isTerminating, conn.updateTask) conn.pingUpdater = NewUpdater( 5000, // 5 seconds predicate.Not(isTerminated), isTerminated, conn.updateTask) conn.pingUpdater.WakeUp() return conn } func (v *Connection) Elapsed() uint32 { return uint32(nowMillisec() - v.since) } // Read implements the Conn Read method. 
func (v *Connection) Read(b []byte) (int, error) { if v == nil { return 0, io.EOF } for { if v.State().Is(StateReadyToClose, StateTerminating, StateTerminated) { return 0, io.EOF } nBytes := v.receivingWorker.Read(b) if nBytes > 0 { return nBytes, nil } if v.State() == StatePeerTerminating { return 0, io.EOF } var timer *time.Timer if !v.rd.IsZero() { duration := v.rd.Sub(time.Now()) if duration <= 0 { return 0, ErrIOTimeout } timer = time.AfterFunc(duration, v.dataInputCond.Signal) } v.dataInputCond.L.Lock() v.dataInputCond.Wait() v.dataInputCond.L.Unlock() if timer != nil { timer.Stop() } if !v.rd.IsZero() && v.rd.Before(time.Now()) { return 0, ErrIOTimeout } } } // Write implements the Conn Write method. func (v *Connection) Write(b []byte) (int, error) { totalWritten := 0 for { if v == nil || v.State() != StateActive { return totalWritten, io.ErrClosedPipe } nBytes := v.sendingWorker.Push(b[totalWritten:]) v.dataUpdater.WakeUp() if nBytes > 0 { totalWritten += nBytes if totalWritten == len(b) { return totalWritten, nil } } var timer *time.Timer if !v.wd.IsZero() { duration := v.wd.Sub(time.Now()) if duration <= 0 { return totalWritten, ErrIOTimeout } timer = time.AfterFunc(duration, v.dataOutputCond.Signal) } v.dataOutputCond.L.Lock() v.dataOutputCond.Wait() v.dataOutputCond.L.Unlock() if timer != nil { timer.Stop() } if !v.wd.IsZero() && v.wd.Before(time.Now()) { return totalWritten, ErrIOTimeout } } } func (v *Connection) SetState(state State) { current := v.Elapsed() atomic.StoreInt32((*int32)(&v.state), int32(state)) atomic.StoreUint32(&v.stateBeginTime, current) log.Debug("KCP|Connection: #", v.conv, " entering state ", state, " at ", current) switch state { case StateReadyToClose: v.receivingWorker.CloseRead() case StatePeerClosed: v.sendingWorker.CloseWrite() case StateTerminating: v.receivingWorker.CloseRead() v.sendingWorker.CloseWrite() v.pingUpdater.interval = time.Second case StatePeerTerminating: v.sendingWorker.CloseWrite() v.pingUpdater.interval 
= time.Second case StateTerminated: v.receivingWorker.CloseRead() v.sendingWorker.CloseWrite() v.pingUpdater.interval = time.Second v.dataUpdater.WakeUp() v.pingUpdater.WakeUp() go v.Terminate() } } // Close closes the connection. func (v *Connection) Close() error { if v == nil { return ErrClosedConnection } v.dataInputCond.Broadcast() v.dataOutputCond.Broadcast() state := v.State() if state.Is(StateReadyToClose, StateTerminating, StateTerminated) { return ErrClosedConnection } log.Info("KCP|Connection: Closing connection to ", v.conn.RemoteAddr()) if state == StateActive { v.SetState(StateReadyToClose) } if state == StatePeerClosed { v.SetState(StateTerminating) } if state == StatePeerTerminating { v.SetState(StateTerminated) } return nil } // LocalAddr returns the local network address. The Addr returned is shared by all invocations of LocalAddr, so do not modify it. func (v *Connection) LocalAddr() net.Addr { if v == nil { return nil } return v.conn.LocalAddr() } // RemoteAddr returns the remote network address. The Addr returned is shared by all invocations of RemoteAddr, so do not modify it. func (v *Connection) RemoteAddr() net.Addr { if v == nil { return nil } return v.conn.RemoteAddr() } // SetDeadline sets the deadline associated with the listener. A zero time value disables the deadline. func (v *Connection) SetDeadline(t time.Time) error { if err := v.SetReadDeadline(t); err != nil { return err } if err := v.SetWriteDeadline(t); err != nil { return err } return nil } // SetReadDeadline implements the Conn SetReadDeadline method. func (v *Connection) SetReadDeadline(t time.Time) error { if v == nil || v.State() != StateActive { return ErrClosedConnection } v.rd = t return nil } // SetWriteDeadline implements the Conn SetWriteDeadline method. 
func (v *Connection) SetWriteDeadline(t time.Time) error { if v == nil || v.State() != StateActive { return ErrClosedConnection } v.wd = t return nil } // kcp update, input loop func (v *Connection) updateTask() { v.flush() } func (v *Connection) Reusable() bool { return v.Config.ConnectionReuse.IsEnabled() && v.reusable } func (v *Connection) SetReusable(b bool) { v.reusable = b } func (v *Connection) Terminate() { if v == nil { return } log.Info("KCP|Connection: Terminating connection to ", v.RemoteAddr()) //v.SetState(StateTerminated) v.dataInputCond.Broadcast() v.dataOutputCond.Broadcast() if v.Config.ConnectionReuse.IsEnabled() && v.reusable { v.connRecycler.Put(v.conn.Id(), v.conn) } else { v.conn.Close() } v.sendingWorker.Release() v.receivingWorker.Release() } func (v *Connection) HandleOption(opt SegmentOption) { if (opt & SegmentOptionClose) == SegmentOptionClose { v.OnPeerClosed() } } func (v *Connection) OnPeerClosed() { state := v.State() if state == StateReadyToClose { v.SetState(StateTerminating) } if state == StateActive { v.SetState(StatePeerClosed) } } // Input when you received a low level packet (eg. 
UDP packet), call it func (v *Connection) Input(data []byte) { current := v.Elapsed() atomic.StoreUint32(&v.lastIncomingTime, current) var seg Segment for { seg, data = ReadSegment(data) if seg == nil { break } if seg.Conversation() != v.conv { return } switch seg := seg.(type) { case *DataSegment: v.HandleOption(seg.Option) v.receivingWorker.ProcessSegment(seg) v.dataInputCond.Signal() v.dataUpdater.WakeUp() case *AckSegment: v.HandleOption(seg.Option) v.sendingWorker.ProcessSegment(current, seg, v.roundTrip.Timeout()) v.dataOutputCond.Signal() v.dataUpdater.WakeUp() case *CmdOnlySegment: v.HandleOption(seg.Option) if seg.Command == CommandTerminate { state := v.State() if state == StateActive || state == StatePeerClosed { v.SetState(StatePeerTerminating) } else if state == StateReadyToClose { v.SetState(StateTerminating) } else if state == StateTerminating { v.SetState(StateTerminated) } } v.sendingWorker.ProcessReceivingNext(seg.ReceivinNext) v.receivingWorker.ProcessSendingNext(seg.SendingNext) v.roundTrip.UpdatePeerRTO(seg.PeerRTO, current) seg.Release() default: } } } func (v *Connection) flush() { current := v.Elapsed() if v.State() == StateTerminated { return } if v.State() == StateActive && current-atomic.LoadUint32(&v.lastIncomingTime) >= 30000 { v.Close() } if v.State() == StateReadyToClose && v.sendingWorker.IsEmpty() { v.SetState(StateTerminating) } if v.State() == StateTerminating { log.Debug("KCP|Connection: #", v.conv, " sending terminating cmd.") v.Ping(current, CommandTerminate) v.output.Flush() if current-atomic.LoadUint32(&v.stateBeginTime) > 8000 { v.SetState(StateTerminated) } return } if v.State() == StatePeerTerminating && current-atomic.LoadUint32(&v.stateBeginTime) > 4000 { v.SetState(StateTerminating) } if v.State() == StateReadyToClose && current-atomic.LoadUint32(&v.stateBeginTime) > 15000 { v.SetState(StateTerminating) } // flush acknowledges v.receivingWorker.Flush(current) v.sendingWorker.Flush(current) if 
current-atomic.LoadUint32(&v.lastPingTime) >= 3000 { v.Ping(current, CommandPing) } // flash remain segments v.output.Flush() } func (v *Connection) State() State { return State(atomic.LoadInt32((*int32)(&v.state))) } func (v *Connection) Ping(current uint32, cmd Command) { seg := NewCmdOnlySegment() seg.Conv = v.conv seg.Command = cmd seg.ReceivinNext = v.receivingWorker.nextNumber seg.SendingNext = v.sendingWorker.firstUnacknowledged seg.PeerRTO = v.roundTrip.Timeout() if v.State() == StateReadyToClose { seg.Option = SegmentOptionClose } v.output.Write(seg) atomic.StoreUint32(&v.lastPingTime, current) seg.Release() }