diff --git a/testing/scenarios/config/httpupgrade_earlydataShortEarlyData_client.json b/testing/scenarios/config/httpupgrade_earlydataShortEarlyData_client.json new file mode 100644 index 000000000..acaf4be88 --- /dev/null +++ b/testing/scenarios/config/httpupgrade_earlydataShortEarlyData_client.json @@ -0,0 +1,48 @@ +{ + "log": { + "error": { + "level": "Debug", + "type": "Console" + }, + "access": { + "type": "None" + } + }, + "outbounds": [ + { + "protocol": "vmess", + "settings": { + "address": "127.0.0.1", + "port": 17793, + "uuid": "bcc71618-e552-42c2-a2a3-d4c17a9df764" + }, + "streamSettings": { + "transport": "httpupgrade", + "transportSettings": { + "path": "b66efc0c7752", + "maxEarlyData": 32, + "earlyDataHeaderName": "Sec-Websocket-Key" + }, + "security": "tls", + "securitySettings": { + "pinnedPeerCertificateChainSha256": [ + "kqHyvea27Pn+JiSqA72lhu9IKAKeGR+3yCyA8JR1mug=" + ], + "allowInsecureIfPinnedPeerCertificate": true + } + } + } + ], + "inbounds": [ + { + "protocol": "socks", + "settings": { + "udpEnabled": false, + "address": "127.0.0.1", + "packetEncoding": "Packet" + }, + "port": 17794, + "listen": "127.0.0.1" + } + ] +} diff --git a/testing/scenarios/config/httpupgrade_earlydataShortEarlyData_server.json b/testing/scenarios/config/httpupgrade_earlydataShortEarlyData_server.json new file mode 100644 index 000000000..083adf30c --- /dev/null +++ b/testing/scenarios/config/httpupgrade_earlydataShortEarlyData_server.json @@ -0,0 +1,46 @@ +{ + "log": { + "error": { + "level": "Debug", + "type": "Console" + }, + "access": { + "type": "None" + } + }, + "outbounds": [ + { + "protocol": "freedom" + } + ], + "inbounds": [ + { + "listen": "127.0.0.1", + "port": 17793, + "protocol": "vmess", + "settings": { + "users": [ + "bcc71618-e552-42c2-a2a3-d4c17a9df764" + ] + }, + "streamSettings": { + "transport": "httpupgrade", + "transportSettings": { + "path": "b66efc0c7752", + "maxEarlyData": 32, + "earlyDataHeaderName": "Sec-Websocket-Key" + }, + "security": "tls", + "securitySettings": { + "certificate": [ + { + "usage": "ENCIPHERMENT", + "certificateFile": "cert/self-signed_cert.pem", + "keyFile": "cert/self-signed_key.pem" + } + ] + } + } + } + ] +} diff --git a/testing/scenarios/config/httpupgrade_earlydata_client.json b/testing/scenarios/config/httpupgrade_earlydata_client.json new file mode 100644 index 000000000..2e34ccce2 --- /dev/null +++ b/testing/scenarios/config/httpupgrade_earlydata_client.json @@ -0,0 +1,48 @@ +{ + "log": { + "error": { + "level": "Debug", + "type": "Console" + }, + "access": { + "type": "None" + } + }, + "outbounds": [ + { + "protocol": "vmess", + "settings": { + "address": "127.0.0.1", + "port": 17793, + "uuid": "bcc71618-e552-42c2-a2a3-d4c17a9df764" + }, + "streamSettings": { + "transport": "httpupgrade", + "transportSettings": { + "path": "b66efc0c7752", + "maxEarlyData": 2048, + "earlyDataHeaderName": "Sec-Websocket-Key" + }, + "security": "tls", + "securitySettings": { + "pinnedPeerCertificateChainSha256": [ + "kqHyvea27Pn+JiSqA72lhu9IKAKeGR+3yCyA8JR1mug=" + ], + "allowInsecureIfPinnedPeerCertificate": true + } + } + } + ], + "inbounds": [ + { + "protocol": "socks", + "settings": { + "udpEnabled": false, + "address": "127.0.0.1", + "packetEncoding": "Packet" + }, + "port": 17794, + "listen": "127.0.0.1" + } + ] +} diff --git a/testing/scenarios/config/httpupgrade_earlydata_server.json b/testing/scenarios/config/httpupgrade_earlydata_server.json new file mode 100644 index 000000000..dde2515ec --- /dev/null +++ b/testing/scenarios/config/httpupgrade_earlydata_server.json @@ -0,0 +1,46 @@ +{ + "log": { + "error": { + "level": "Debug", + "type": "Console" + }, + "access": { + "type": "None" + } + }, + "outbounds": [ + { + "protocol": "freedom" + } + ], + "inbounds": [ + { + "listen": "127.0.0.1", + "port": 17793, + "protocol": "vmess", + "settings": { + "users": [ + "bcc71618-e552-42c2-a2a3-d4c17a9df764" + ] + }, + "streamSettings": { + "transport": "httpupgrade", + "transportSettings": { + "path": "b66efc0c7752", + "maxEarlyData": 2048, + "earlyDataHeaderName": "Sec-Websocket-Key" + }, + "security": "tls", + "securitySettings": { + "certificate": [ + { + "usage": "ENCIPHERMENT", + "certificateFile": "cert/self-signed_cert.pem", + "keyFile": "cert/self-signed_key.pem" + } + ] + } + } + } + ] +} diff --git a/testing/scenarios/httpupgrade_test.go b/testing/scenarios/httpupgrade_test.go index 6a7366c3b..20bc779fb 100644 --- a/testing/scenarios/httpupgrade_test.go +++ b/testing/scenarios/httpupgrade_test.go @@ -50,3 +50,81 @@ func TestHTTPUpgrade(t *testing.T) { t.Error(err) } } + +func TestHTTPUpgradeWithEarlyData(t *testing.T) { + tcpServer := tcp.Server{ + MsgProcessor: xor, + } + dest, err := tcpServer.Start() + common.Must(err) + defer tcpServer.Close() + + coreInst, InstMgrIfce := NewInstanceManagerCoreInstance() + defer coreInst.Close() + + common.Must(InstMgrIfce.AddInstance( + context.TODO(), + "httpupgrade_client", + common.Must2(os.ReadFile("config/httpupgrade_earlydata_client.json")).([]byte), + "jsonv5")) + + common.Must(InstMgrIfce.AddInstance( + context.TODO(), + "httpupgrade_server", + common.Must2(os.ReadFile("config/httpupgrade_earlydata_server.json")).([]byte), + "jsonv5")) + + common.Must(InstMgrIfce.StartInstance(context.TODO(), "httpupgrade_server")) + common.Must(InstMgrIfce.StartInstance(context.TODO(), "httpupgrade_client")) + + defer func() { + common.Must(InstMgrIfce.StopInstance(context.TODO(), "httpupgrade_server")) + common.Must(InstMgrIfce.StopInstance(context.TODO(), "httpupgrade_client")) + common.Must(InstMgrIfce.UntrackInstance(context.TODO(), "httpupgrade_server")) + common.Must(InstMgrIfce.UntrackInstance(context.TODO(), "httpupgrade_client")) + coreInst.Close() + }() + + if err := testTCPConnViaSocks(17794, dest.Port, 1024, time.Second*2)(); err != nil { + t.Error(err) + } +} + +func TestHTTPUpgradeWithShortEarlyData(t *testing.T) { + tcpServer := tcp.Server{ + MsgProcessor: xor, + } + dest, err := tcpServer.Start() + common.Must(err) + defer tcpServer.Close() + + coreInst, InstMgrIfce := NewInstanceManagerCoreInstance() + defer coreInst.Close() + + common.Must(InstMgrIfce.AddInstance( + context.TODO(), + "httpupgrade_client", + common.Must2(os.ReadFile("config/httpupgrade_earlydataShortEarlyData_client.json")).([]byte), + "jsonv5")) + + common.Must(InstMgrIfce.AddInstance( + context.TODO(), + "httpupgrade_server", + common.Must2(os.ReadFile("config/httpupgrade_earlydataShortEarlyData_server.json")).([]byte), + "jsonv5")) + + common.Must(InstMgrIfce.StartInstance(context.TODO(), "httpupgrade_server")) + common.Must(InstMgrIfce.StartInstance(context.TODO(), "httpupgrade_client")) + + defer func() { + common.Must(InstMgrIfce.StopInstance(context.TODO(), "httpupgrade_server")) + common.Must(InstMgrIfce.StopInstance(context.TODO(), "httpupgrade_client")) + common.Must(InstMgrIfce.UntrackInstance(context.TODO(), "httpupgrade_server")) + common.Must(InstMgrIfce.UntrackInstance(context.TODO(), "httpupgrade_client")) + coreInst.Close() + }() + + if err := testTCPConnViaSocks(17794, dest.Port, 1024, time.Second*2)(); err != nil { + t.Error(err) + } +} diff --git a/transport/internet/httpupgrade/config.pb.go b/transport/internet/httpupgrade/config.pb.go index b6fba507f..7c17b037d 100644 --- a/transport/internet/httpupgrade/config.pb.go +++ b/transport/internet/httpupgrade/config.pb.go @@ -16,21 +16,20 @@ const ( ) type Config struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"` - Host string `protobuf:"bytes,2,opt,name=host,proto3" json:"host,omitempty"` + state protoimpl.MessageState `protogen:"open.v1"` + Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"` + Host string `protobuf:"bytes,2,opt,name=host,proto3" json:"host,omitempty"` + MaxEarlyData int32 `protobuf:"varint,3,opt,name=max_early_data,json=maxEarlyData,proto3" json:"max_early_data,omitempty"` + EarlyDataHeaderName string `protobuf:"bytes,4,opt,name=early_data_header_name,json=earlyDataHeaderName,proto3" json:"early_data_header_name,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *Config) Reset() { *x = Config{} - if protoimpl.UnsafeEnabled { - mi := &file_transport_internet_httpupgrade_config_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } + mi := &file_transport_internet_httpupgrade_config_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) } func (x *Config) String() string { @@ -41,7 +40,7 @@ func (*Config) ProtoMessage() {} func (x *Config) ProtoReflect() protoreflect.Message { mi := &file_transport_internet_httpupgrade_config_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { + if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) @@ -70,6 +69,20 @@ func (x *Config) GetHost() string { return "" } +func (x *Config) GetMaxEarlyData() int32 { + if x != nil { + return x.MaxEarlyData + } + return 0 +} + +func (x *Config) GetEarlyDataHeaderName() string { + if x != nil { + return x.EarlyDataHeaderName + } + return "" +} + var File_transport_internet_httpupgrade_config_proto protoreflect.FileDescriptor var file_transport_internet_httpupgrade_config_proto_rawDesc = []byte{ @@ -81,22 +94,28 @@ var file_transport_internet_httpupgrade_config_proto_rawDesc = []byte{ 0x75, 0x65, 0x73, 0x74, 0x2e, 0x68, 0x74, 0x74, 0x70, 0x75, 0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0x1a, 0x20, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x65, 0x78, 0x74, 0x2f, 0x65, 0x78, 0x74, 0x65, 0x6e, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x22, 0x52, 0x0a, 0x06, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x12, 0x0a, 0x04, - 0x70, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x70, 0x61, 0x74, 0x68, - 0x12, 0x12, 0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, - 0x68, 0x6f, 0x73, 0x74, 0x3a, 0x20, 0x82, 0xb5, 0x18, 0x1c, 0x0a, 0x09, 0x74, 0x72, 0x61, 0x6e, - 0x73, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x0b, 0x68, 0x74, 0x74, 0x70, 0x75, 0x70, 0x67, 0x72, 0x61, - 0x64, 0x65, 0x90, 0xff, 0x29, 0x01, 0x42, 0x9c, 0x01, 0x0a, 0x2d, 0x63, 0x6f, 0x6d, 0x2e, 0x76, - 0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, - 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x68, 0x74, 0x74, - 0x70, 0x75, 0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0x50, 0x01, 0x5a, 0x3d, 0x67, 0x69, 0x74, 0x68, - 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x76, 0x32, 0x66, 0x6c, 0x79, 0x2f, 0x76, 0x32, 0x72, - 0x61, 0x79, 0x2d, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x76, 0x35, 0x2f, 0x74, 0x72, 0x61, 0x6e, 0x73, - 0x70, 0x6f, 0x72, 0x74, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2f, 0x68, 0x74, - 0x74, 0x70, 0x75, 0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0xaa, 0x02, 0x29, 0x56, 0x32, 0x52, 0x61, - 0x79, 0x2e, 0x43, 0x6f, 0x72, 0x65, 0x2e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, - 0x2e, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x48, 0x74, 0x74, 0x70, 0x55, 0x70, - 0x67, 0x72, 0x61, 0x64, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x74, 0x6f, 0x22, 0xad, 0x01, 0x0a, 0x06, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x12, 0x0a, + 0x04, 0x70, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x70, 0x61, 0x74, + 0x68, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x04, 0x68, 0x6f, 0x73, 0x74, 0x12, 0x24, 0x0a, 0x0e, 0x6d, 0x61, 0x78, 0x5f, 0x65, 0x61, 0x72, + 0x6c, 0x79, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0c, 0x6d, + 0x61, 0x78, 0x45, 0x61, 0x72, 0x6c, 0x79, 0x44, 0x61, 0x74, 0x61, 0x12, 0x33, 0x0a, 0x16, 0x65, + 0x61, 0x72, 0x6c, 0x79, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, + 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x13, 0x65, 0x61, 0x72, + 0x6c, 0x79, 0x44, 0x61, 0x74, 0x61, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, + 0x3a, 0x20, 0x82, 0xb5, 0x18, 0x1c, 0x0a, 0x09, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, + 0x74, 0x12, 0x0b, 0x68, 0x74, 0x74, 0x70, 0x75, 0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0x90, 0xff, + 0x29, 0x01, 0x42, 0x9c, 0x01, 0x0a, 0x2d, 0x63, 0x6f, 0x6d, 0x2e, 0x76, 0x32, 0x72, 0x61, 0x79, + 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, + 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x68, 0x74, 0x74, 0x70, 0x75, 0x70, 0x67, + 0x72, 0x61, 0x64, 0x65, 0x50, 0x01, 0x5a, 0x3d, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, + 0x6f, 0x6d, 0x2f, 0x76, 0x32, 0x66, 0x6c, 0x79, 0x2f, 0x76, 0x32, 0x72, 0x61, 0x79, 0x2d, 0x63, + 0x6f, 0x72, 0x65, 0x2f, 0x76, 0x35, 0x2f, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, + 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2f, 0x68, 0x74, 0x74, 0x70, 0x75, 0x70, + 0x67, 0x72, 0x61, 0x64, 0x65, 0xaa, 0x02, 0x29, 0x56, 0x32, 0x52, 0x61, 0x79, 0x2e, 0x43, 0x6f, + 0x72, 0x65, 0x2e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x49, 0x6e, 0x74, + 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x48, 0x74, 0x74, 0x70, 0x55, 0x70, 0x67, 0x72, 0x61, 0x64, + 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -112,7 +131,7 @@ func file_transport_internet_httpupgrade_config_proto_rawDescGZIP() []byte { } var file_transport_internet_httpupgrade_config_proto_msgTypes = make([]protoimpl.MessageInfo, 1) -var file_transport_internet_httpupgrade_config_proto_goTypes = []interface{}{ +var file_transport_internet_httpupgrade_config_proto_goTypes = []any{ (*Config)(nil), // 0: v2ray.core.transport.internet.request.httpupgrade.Config } var file_transport_internet_httpupgrade_config_proto_depIdxs = []int32{ @@ -128,20 +147,6 @@ func file_transport_internet_httpupgrade_config_proto_init() { if File_transport_internet_httpupgrade_config_proto != nil { return } - if !protoimpl.UnsafeEnabled { - file_transport_internet_httpupgrade_config_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Config); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ diff --git a/transport/internet/httpupgrade/connection.go b/transport/internet/httpupgrade/connection.go index e7bd727ee..6a43a4384 100644 --- a/transport/internet/httpupgrade/connection.go +++ b/transport/internet/httpupgrade/connection.go @@ -1,7 +1,6 @@ package httpupgrade import ( - "bytes" "context" "github.com/v2fly/v2ray-core/v5/common/buf" "github.com/v2fly/v2ray-core/v5/common/net" @@ -18,14 +17,12 @@ type connection struct { shouldWait bool delayedDialFinish context.Context finishedDial context.CancelFunc - dialer DelayedDialer + dialer delayedDialer } -type DelayedDialer interface { - Dial(earlyData []byte) (conn net.Conn, earlyReply []byte, err error) -} +type delayedDialer func(earlyData []byte) (conn net.Conn, earlyReply io.Reader, err error) -func newConnectionWithEarlyReply(conn net.Conn, remoteAddr net.Addr, earlyReplyReader io.Reader) *connection { +func newConnectionWithPendingRead(conn net.Conn, remoteAddr net.Addr, earlyReplyReader io.Reader) *connection { return &connection{ conn: conn, remoteAddr: remoteAddr, @@ -33,6 +30,16 @@ func newConnectionWithEarlyReply(conn net.Conn, remoteAddr net.Addr, earlyReplyR } } +func newConnectionWithDelayedDial(dialer delayedDialer) *connection { + ctx, cancel := context.WithCancel(context.Background()) + return &connection{ + shouldWait: true, + delayedDialFinish: ctx, + finishedDial: cancel, + dialer: dialer, + } +} + // Read implements net.Conn.Read() func (c *connection) Read(b []byte) (int, error) { if c.shouldWait { @@ -57,10 +64,10 @@ func (c *connection) Read(b []byte) (int, error) { func (c *connection) Write(b []byte) (int, error) { if c.shouldWait { var err error - var earlyReply []byte - c.conn, earlyReply, err = c.dialer.Dial(b) + var earlyReply io.Reader + c.conn, earlyReply, err = c.dialer(b) if earlyReply != nil { - c.reader = bytes.NewReader(earlyReply) + c.reader = earlyReply } c.finishedDial() if err != nil { diff --git a/transport/internet/httpupgrade/dialer.go b/transport/internet/httpupgrade/dialer.go index 21c4ea75e..5e6d162dd 100644 --- a/transport/internet/httpupgrade/dialer.go +++ b/transport/internet/httpupgrade/dialer.go @@ -3,6 +3,7 @@ package httpupgrade import ( "bufio" "context" + "encoding/base64" "io" "net/http" "strings" @@ -17,39 +18,72 @@ import ( func dialhttpUpgrade(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (net.Conn, error) { transportConfiguration := streamSettings.ProtocolSettings.(*Config) - conn, err := transportcommon.DialWithSecuritySettings(ctx, dest, streamSettings) - if err != nil { - return nil, newError("failed to dial request to ", dest).Base(err) - } - req, err := http.NewRequest("GET", transportConfiguration.GetNormalizedPath(), nil) - if err != nil { - return nil, err + dialer := func(earlyData []byte) (net.Conn, io.Reader, error) { + conn, err := transportcommon.DialWithSecuritySettings(ctx, dest, streamSettings) + if err != nil { + return nil, nil, newError("failed to dial request to ", dest).Base(err) + } + req, err := http.NewRequest("GET", transportConfiguration.GetNormalizedPath(), nil) + if err != nil { + return nil, nil, err + } + + req.Header.Set("Connection", "upgrade") + req.Header.Set("Upgrade", "websocket") + req.Host = transportConfiguration.Host + + earlyDataSize := len(earlyData) + if earlyDataSize > int(transportConfiguration.MaxEarlyData) { + earlyDataSize = int(transportConfiguration.MaxEarlyData) + } + + if earlyData != nil && len(earlyData) > 0 { + if transportConfiguration.EarlyDataHeaderName == "" { + return nil, nil, newError("EarlyDataHeaderName is not set") + } + req.Header.Set(transportConfiguration.EarlyDataHeaderName, base64.URLEncoding.EncodeToString(earlyData)) + } + + err = req.Write(conn) + if err != nil { + return nil, nil, err + } + + if earlyData != nil && len(earlyData[earlyDataSize:]) > 0 { + _, err = conn.Write(earlyData[earlyDataSize:]) + if err != nil { + return nil, nil, newError("failed to finish write early data").Base(err) + } + } + + bufferedConn := bufio.NewReader(conn) + resp, err := http.ReadResponse(bufferedConn, req) // nolint:bodyclose + if err != nil { + return nil, nil, err + } + + if resp.Status == "101 Switching Protocols" && + strings.ToLower(resp.Header.Get("Upgrade")) == "websocket" && + strings.ToLower(resp.Header.Get("Connection")) == "upgrade" { + + earlyReplyReader := io.LimitReader(bufferedConn, int64(bufferedConn.Buffered())) + return conn, earlyReplyReader, nil + } + + return nil, nil, newError("unrecognized reply") } - req.Header.Set("Connection", "upgrade") - req.Header.Set("Upgrade", "websocket") - req.Host = transportConfiguration.Host - - err = req.Write(conn) - if err != nil { - return nil, err - } - - bufferedConn := bufio.NewReader(conn) - resp, err := http.ReadResponse(bufferedConn, req) // nolint:bodyclose - if err != nil { - return nil, err - } - - if resp.Status == "101 Switching Protocols" && - strings.ToLower(resp.Header.Get("Upgrade")) == "websocket" && - strings.ToLower(resp.Header.Get("Connection")) == "upgrade" { - + if transportConfiguration.MaxEarlyData == 0 { + conn, earlyReplyReader, err := dialer(nil) + if err != nil { + return nil, err + } remoteAddr := conn.RemoteAddr() - earlyReplyReader := io.LimitReader(bufferedConn, int64(bufferedConn.Buffered())) - return newConnectionWithEarlyReply(conn, remoteAddr, earlyReplyReader), nil + + return newConnectionWithPendingRead(conn, remoteAddr, earlyReplyReader), nil } - return nil, newError("unrecognized reply") + + return newConnectionWithDelayedDial(dialer), nil } func dial(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (internet.Connection, error) { diff --git a/transport/internet/httpupgrade/hub.go b/transport/internet/httpupgrade/hub.go index ecbe441c1..a3989ca5a 100644 --- a/transport/internet/httpupgrade/hub.go +++ b/transport/internet/httpupgrade/hub.go @@ -2,7 +2,9 @@ package httpupgrade import ( "bufio" + "bytes" "context" + "encoding/base64" "net/http" "strings" @@ -13,6 +15,8 @@ import ( ) type server struct { + config *Config + addConn internet.ConnHandler innnerListener net.Listener } @@ -52,6 +56,19 @@ func (s *server) Handle(conn net.Conn) (internet.Connection, error) { _ = conn.Close() return nil, err } + if s.config.MaxEarlyData != 0 { + if s.config.EarlyDataHeaderName == "" { + return nil, newError("EarlyDataHeaderName is not set") + } + earlyData := req.Header.Get(s.config.EarlyDataHeaderName) + if earlyData != "" { + earlyDataBytes, err := base64.URLEncoding.DecodeString(earlyData) + if err != nil { + return nil, err + } + return newConnectionWithPendingRead(conn, conn.RemoteAddr(), bytes.NewReader(earlyDataBytes)), nil + } + } return internet.Connection(conn), nil } @@ -72,8 +89,7 @@ func (s *server) keepAccepting() { func listenHTTPUpgrade(ctx context.Context, address net.Address, port net.Port, streamSettings *internet.MemoryStreamConfig, addConn internet.ConnHandler) (internet.Listener, error) { transportConfiguration := streamSettings.ProtocolSettings.(*Config) - _ = transportConfiguration - serverInstance := &server{addConn: addConn} + serverInstance := &server{config: transportConfiguration, addConn: addConn} listener, err := transportcommon.ListenWithSecuritySettings(ctx, address, port, streamSettings) if err != nil {