mirror of
https://github.com/v2fly/v2ray-core.git
synced 2024-12-31 06:26:53 -05:00
265 lines
7.1 KiB
Go
265 lines
7.1 KiB
Go
package quic
|
|
|
|
import (
|
|
"bytes"
|
|
"errors"
|
|
"fmt"
|
|
"net"
|
|
"sync"
|
|
"time"
|
|
|
|
"v2ray.com/core/external/github.com/lucas-clemente/quic-go/internal/protocol"
|
|
"v2ray.com/core/external/github.com/lucas-clemente/quic-go/internal/utils"
|
|
"v2ray.com/core/external/github.com/lucas-clemente/quic-go/internal/wire"
|
|
)
|
|
|
|
type packetHandlerEntry struct {
|
|
handler packetHandler
|
|
resetToken *[16]byte
|
|
}
|
|
|
|
// The packetHandlerMap stores packetHandlers, identified by connection ID.
|
|
// It is used:
|
|
// * by the server to store sessions
|
|
// * when multiplexing outgoing connections to store clients
|
|
type packetHandlerMap struct {
|
|
mutex sync.RWMutex
|
|
|
|
conn net.PacketConn
|
|
connIDLen int
|
|
|
|
handlers map[string] /* string(ConnectionID)*/ packetHandlerEntry
|
|
resetTokens map[[16]byte] /* stateless reset token */ packetHandler
|
|
server unknownPacketHandler
|
|
closed bool
|
|
|
|
deleteRetiredSessionsAfter time.Duration
|
|
|
|
logger utils.Logger
|
|
}
|
|
|
|
var _ packetHandlerManager = &packetHandlerMap{}
|
|
|
|
func newPacketHandlerMap(conn net.PacketConn, connIDLen int, logger utils.Logger) packetHandlerManager {
|
|
m := &packetHandlerMap{
|
|
conn: conn,
|
|
connIDLen: connIDLen,
|
|
handlers: make(map[string]packetHandlerEntry),
|
|
resetTokens: make(map[[16]byte]packetHandler),
|
|
deleteRetiredSessionsAfter: protocol.RetiredConnectionIDDeleteTimeout,
|
|
logger: logger,
|
|
}
|
|
go m.listen()
|
|
return m
|
|
}
|
|
|
|
func (h *packetHandlerMap) Add(id protocol.ConnectionID, handler packetHandler) {
|
|
h.mutex.Lock()
|
|
h.handlers[string(id)] = packetHandlerEntry{handler: handler}
|
|
h.mutex.Unlock()
|
|
}
|
|
|
|
func (h *packetHandlerMap) AddWithResetToken(id protocol.ConnectionID, handler packetHandler, token [16]byte) {
|
|
h.mutex.Lock()
|
|
h.handlers[string(id)] = packetHandlerEntry{handler: handler, resetToken: &token}
|
|
h.resetTokens[token] = handler
|
|
h.mutex.Unlock()
|
|
}
|
|
|
|
func (h *packetHandlerMap) Remove(id protocol.ConnectionID) {
|
|
h.removeByConnectionIDAsString(string(id))
|
|
}
|
|
|
|
func (h *packetHandlerMap) removeByConnectionIDAsString(id string) {
|
|
h.mutex.Lock()
|
|
if handlerEntry, ok := h.handlers[id]; ok {
|
|
if token := handlerEntry.resetToken; token != nil {
|
|
delete(h.resetTokens, *token)
|
|
}
|
|
delete(h.handlers, id)
|
|
}
|
|
h.mutex.Unlock()
|
|
}
|
|
|
|
func (h *packetHandlerMap) Retire(id protocol.ConnectionID) {
|
|
h.retireByConnectionIDAsString(string(id))
|
|
}
|
|
|
|
func (h *packetHandlerMap) retireByConnectionIDAsString(id string) {
|
|
time.AfterFunc(h.deleteRetiredSessionsAfter, func() {
|
|
h.removeByConnectionIDAsString(id)
|
|
})
|
|
}
|
|
|
|
func (h *packetHandlerMap) SetServer(s unknownPacketHandler) {
|
|
h.mutex.Lock()
|
|
h.server = s
|
|
h.mutex.Unlock()
|
|
}
|
|
|
|
func (h *packetHandlerMap) CloseServer() {
|
|
h.mutex.Lock()
|
|
h.server = nil
|
|
var wg sync.WaitGroup
|
|
for id, handlerEntry := range h.handlers {
|
|
handler := handlerEntry.handler
|
|
if handler.GetPerspective() == protocol.PerspectiveServer {
|
|
wg.Add(1)
|
|
go func(id string, handler packetHandler) {
|
|
// session.Close() blocks until the CONNECTION_CLOSE has been sent and the run-loop has stopped
|
|
_ = handler.Close()
|
|
h.retireByConnectionIDAsString(id)
|
|
wg.Done()
|
|
}(id, handler)
|
|
}
|
|
}
|
|
h.mutex.Unlock()
|
|
wg.Wait()
|
|
}
|
|
|
|
func (h *packetHandlerMap) close(e error) error {
|
|
h.mutex.Lock()
|
|
if h.closed {
|
|
h.mutex.Unlock()
|
|
return nil
|
|
}
|
|
h.closed = true
|
|
|
|
var wg sync.WaitGroup
|
|
for _, handlerEntry := range h.handlers {
|
|
wg.Add(1)
|
|
go func(handlerEntry packetHandlerEntry) {
|
|
handlerEntry.handler.destroy(e)
|
|
wg.Done()
|
|
}(handlerEntry)
|
|
}
|
|
|
|
if h.server != nil {
|
|
h.server.closeWithError(e)
|
|
}
|
|
h.mutex.Unlock()
|
|
wg.Wait()
|
|
return getMultiplexer().RemoveConn(h.conn)
|
|
}
|
|
|
|
func (h *packetHandlerMap) listen() {
|
|
for {
|
|
buffer := getPacketBuffer()
|
|
data := buffer.Slice
|
|
// The packet size should not exceed protocol.MaxReceivePacketSize bytes
|
|
// If it does, we only read a truncated packet, which will then end up undecryptable
|
|
n, addr, err := h.conn.ReadFrom(data)
|
|
if err != nil {
|
|
h.close(err)
|
|
return
|
|
}
|
|
h.handlePacket(addr, buffer, data[:n])
|
|
}
|
|
}
|
|
|
|
func (h *packetHandlerMap) handlePacket(
|
|
addr net.Addr,
|
|
buffer *packetBuffer,
|
|
data []byte,
|
|
) {
|
|
packets, err := h.parsePacket(addr, buffer, data)
|
|
if err != nil {
|
|
h.logger.Debugf("error parsing packets from %s: %s", addr, err)
|
|
// This is just the error from parsing the last packet.
|
|
// We still need to process the packets that were successfully parsed before.
|
|
}
|
|
if len(packets) == 0 {
|
|
buffer.Release()
|
|
return
|
|
}
|
|
h.handleParsedPackets(packets)
|
|
}
|
|
|
|
func (h *packetHandlerMap) parsePacket(
|
|
addr net.Addr,
|
|
buffer *packetBuffer,
|
|
data []byte,
|
|
) ([]*receivedPacket, error) {
|
|
rcvTime := time.Now()
|
|
packets := make([]*receivedPacket, 0, 1)
|
|
|
|
var counter int
|
|
var lastConnID protocol.ConnectionID
|
|
for len(data) > 0 {
|
|
hdr, err := wire.ParseHeader(bytes.NewReader(data), h.connIDLen)
|
|
// drop the packet if we can't parse the header
|
|
if err != nil {
|
|
return packets, fmt.Errorf("error parsing header: %s", err)
|
|
}
|
|
if counter > 0 && !hdr.DestConnectionID.Equal(lastConnID) {
|
|
return packets, fmt.Errorf("coalesced packet has different destination connection ID: %s, expected %s", hdr.DestConnectionID, lastConnID)
|
|
}
|
|
lastConnID = hdr.DestConnectionID
|
|
|
|
var rest []byte
|
|
if hdr.IsLongHeader {
|
|
if protocol.ByteCount(len(data)) < hdr.ParsedLen()+hdr.Length {
|
|
return packets, fmt.Errorf("packet length (%d bytes) is smaller than the expected length (%d bytes)", len(data)-int(hdr.ParsedLen()), hdr.Length)
|
|
}
|
|
packetLen := int(hdr.ParsedLen() + hdr.Length)
|
|
rest = data[packetLen:]
|
|
data = data[:packetLen]
|
|
}
|
|
|
|
if counter > 0 {
|
|
buffer.Split()
|
|
}
|
|
counter++
|
|
packets = append(packets, &receivedPacket{
|
|
remoteAddr: addr,
|
|
hdr: hdr,
|
|
rcvTime: rcvTime,
|
|
data: data,
|
|
buffer: buffer,
|
|
})
|
|
|
|
// only log if this actually a coalesced packet
|
|
if h.logger.Debug() && (counter > 1 || len(rest) > 0) {
|
|
h.logger.Debugf("Parsed a coalesced packet. Part %d: %d bytes. Remaining: %d bytes.", counter, len(packets[counter-1].data), len(rest))
|
|
}
|
|
|
|
data = rest
|
|
}
|
|
return packets, nil
|
|
}
|
|
|
|
func (h *packetHandlerMap) handleParsedPackets(packets []*receivedPacket) {
|
|
h.mutex.RLock()
|
|
defer h.mutex.RUnlock()
|
|
|
|
// coalesced packets all have the same destination connection ID
|
|
handlerEntry, handlerFound := h.handlers[string(packets[0].hdr.DestConnectionID)]
|
|
|
|
for _, p := range packets {
|
|
if handlerFound { // existing session
|
|
handlerEntry.handler.handlePacket(p)
|
|
continue
|
|
}
|
|
// No session found.
|
|
// This might be a stateless reset.
|
|
if !p.hdr.IsLongHeader {
|
|
if len(p.data) >= protocol.MinStatelessResetSize {
|
|
var token [16]byte
|
|
copy(token[:], p.data[len(p.data)-16:])
|
|
if sess, ok := h.resetTokens[token]; ok {
|
|
sess.destroy(errors.New("received a stateless reset"))
|
|
continue
|
|
}
|
|
}
|
|
// TODO(#943): send a stateless reset
|
|
h.logger.Debugf("received a short header packet with an unexpected connection ID %s", p.hdr.DestConnectionID)
|
|
break // a short header packet is always the last in a coalesced packet
|
|
}
|
|
if h.server == nil { // no server set
|
|
h.logger.Debugf("received a packet with an unexpected connection ID %s", p.hdr.DestConnectionID)
|
|
continue
|
|
}
|
|
h.server.handlePacket(p)
|
|
}
|
|
}
|