From 3936e01afb6c312835e9f1dc3f31bf0e54f7e410 Mon Sep 17 00:00:00 2001 From: Ian Ling Date: Mon, 28 Dec 2020 13:33:17 -0800 Subject: [PATCH] Networking bugfixes and cleanup Make sure connections close properly, without weird error messages Remove player map entity when a player disconnects from a multiplayer game Close server properly when host disconnects, handle ServerClose on remote clients Don't mix JSON decoders and raw TCP writes Actually handle incoming packets from remote clients General code cleanup for simplicity and consistency --- .../d2localclient/local_client_connection.go | 7 +- .../remote_client_connection.go | 131 ++++--------- d2networking/d2client/game_client.go | 18 +- d2networking/d2netpacket/packet_ping.go | 12 +- .../d2netpacket/packet_server_closed.go | 2 +- .../tcp_client_connection.go | 7 +- d2networking/d2server/game_server.go | 172 +++++++++--------- 7 files changed, 152 insertions(+), 197 deletions(-) diff --git a/d2networking/d2client/d2localclient/local_client_connection.go b/d2networking/d2client/d2localclient/local_client_connection.go index 391160b7..3923d42b 100644 --- a/d2networking/d2client/d2localclient/local_client_connection.go +++ b/d2networking/d2client/d2localclient/local_client_connection.go @@ -87,19 +87,16 @@ func (l *LocalClientConnection) Open(_, saveFilePath string) error { // Close disconnects from the server and destroys it. func (l *LocalClientConnection) Close() error { - sc, err := d2netpacket.CreateServerClosedPacket() + disconnectRequest, err := d2netpacket.CreatePlayerDisconnectRequestPacket(l.uniqueID) if err != nil { return err } - err = l.SendPacketToServer(sc) + err = l.SendPacketToServer(disconnectRequest) if err != nil { return err } - l.gameServer.OnClientDisconnected(l) - l.gameServer.Stop() - return nil } diff --git a/d2networking/d2client/d2remoteclient/remote_client_connection.go b/d2networking/d2client/d2remoteclient/remote_client_connection.go index 9f47744e..57df11ef 100644 --- a/d2networking/d2client/d2remoteclient/remote_client_connection.go +++ b/d2networking/d2client/d2remoteclient/remote_client_connection.go @@ -3,6 +3,7 @@ package d2remoteclient import ( "encoding/json" "fmt" + "io" "net" "strings" @@ -131,12 +132,10 @@ func (r *RemoteClientConnection) SetClientListener(listener d2networking.ClientL // SendPacketToServer compresses the JSON encoding of a NetPacket and // sends it to the server. func (r *RemoteClientConnection) SendPacketToServer(packet d2netpacket.NetPacket) error { - data, err := json.Marshal(packet) - if err != nil { - return err - } + encoder := json.NewEncoder(r.tcpConnection) - if _, err = r.tcpConnection.Write(data); err != nil { + err := encoder.Encode(packet) + if err != nil { return err } @@ -146,15 +145,21 @@ func (r *RemoteClientConnection) SendPacketToServer(packet d2netpacket.NetPacket // serverListener runs a while loop, reading from the GameServer's TCP // connection. func (r *RemoteClientConnection) serverListener() { - var packet d2netpacket.NetPacket - decoder := json.NewDecoder(r.tcpConnection) for { + var packet d2netpacket.NetPacket + err := decoder.Decode(&packet) if err != nil { - r.Errorf("failed to decode the packet, err: %v\n", err) - return + switch err { + case io.EOF: + break // the other side closed the connection + default: + r.Errorf("failed to decode the packet, err: %v\n", err) + } + + return // allow the connection to close } p, err := r.decodeToPacket(packet.PacketType, string(packet.PacketData)) @@ -186,102 +191,29 @@ func (r *RemoteClientConnection) bytesToJSON(buffer []byte) (string, d2netpacket func (r *RemoteClientConnection) decodeToPacket( t d2netpackettype.NetPacketType, data string) (d2netpacket.NetPacket, error) { - var np = d2netpacket.NetPacket{} - - var err error + var ( + np = d2netpacket.NetPacket{} + err error + p interface{} + ) switch t { case d2netpackettype.GenerateMap: - var p d2netpacket.GenerateMapPacket - if err = json.Unmarshal([]byte(data), &p); err != nil { - break - } - - mp, marshalErr := d2netpacket.MarshalPacket(p) - if marshalErr != nil { - r.Errorf("MarshalPacket: %v", marshalErr) - } - - np = d2netpacket.NetPacket{PacketType: t, PacketData: mp} - + p, err = d2netpacket.UnmarshalGenerateMap([]byte(data)) case d2netpackettype.MovePlayer: - var p d2netpacket.MovePlayerPacket - if err = json.Unmarshal([]byte(data), &p); err != nil { - break - } - - mp, marshalErr := d2netpacket.MarshalPacket(p) - if marshalErr != nil { - r.Errorf("MarshalPacket: %v", marshalErr) - } - - np = d2netpacket.NetPacket{PacketType: t, PacketData: mp} - + p, err = d2netpacket.UnmarshalMovePlayer([]byte(data)) case d2netpackettype.UpdateServerInfo: - var p d2netpacket.UpdateServerInfoPacket - if err = json.Unmarshal([]byte(data), &p); err != nil { - break - } - - mp, marshalErr := d2netpacket.MarshalPacket(p) - if marshalErr != nil { - r.Errorf("MarshalPacket: %v", marshalErr) - } - - np = d2netpacket.NetPacket{PacketType: t, PacketData: mp} - + p, err = d2netpacket.UnmarshalUpdateServerInfo([]byte(data)) case d2netpackettype.AddPlayer: - var p d2netpacket.AddPlayerPacket - if err = json.Unmarshal([]byte(data), &p); err != nil { - break - } - - mp, marshalErr := d2netpacket.MarshalPacket(p) - if marshalErr != nil { - r.Errorf("MarshalPacket: %v", marshalErr) - } - - np = d2netpacket.NetPacket{PacketType: t, PacketData: mp} - + p, err = d2netpacket.UnmarshalAddPlayer([]byte(data)) case d2netpackettype.CastSkill: - var p d2netpacket.CastPacket - if err = json.Unmarshal([]byte(data), &p); err != nil { - break - } - - mp, marshalErr := d2netpacket.MarshalPacket(p) - if marshalErr != nil { - r.Errorf("MarshalPacket: %v", marshalErr) - } - - np = d2netpacket.NetPacket{PacketType: t, PacketData: mp} - + p, err = d2netpacket.UnmarshalCast([]byte(data)) case d2netpackettype.Ping: - var p d2netpacket.PingPacket - if err = json.Unmarshal([]byte(data), &p); err != nil { - break - } - - mp, marshalErr := d2netpacket.MarshalPacket(p) - if marshalErr != nil { - r.Errorf("MarshalPacket: %v", marshalErr) - } - - np = d2netpacket.NetPacket{PacketType: t, PacketData: mp} - + p, err = d2netpacket.UnmarshalPing([]byte(data)) case d2netpackettype.PlayerDisconnectionNotification: - var p d2netpacket.PlayerDisconnectRequestPacket - if err = json.Unmarshal([]byte(data), &p); err != nil { - break - } - - mp, marshalErr := d2netpacket.MarshalPacket(p) - if marshalErr != nil { - r.Errorf("MarshalPacket: %v", marshalErr) - } - - np = d2netpacket.NetPacket{PacketType: t, PacketData: mp} - + p, err = d2netpacket.UnmarshalPlayerDisconnectionRequest([]byte(data)) + case d2netpackettype.ServerClosed: + p, err = d2netpacket.UnmarshalServerClosed([]byte(data)) default: err = fmt.Errorf("RemoteClientConnection: unrecognized packet type: %v", t) } @@ -290,5 +222,12 @@ func (r *RemoteClientConnection) decodeToPacket( return np, err } + mp, marshalErr := d2netpacket.MarshalPacket(p) + if marshalErr != nil { + r.Errorf("MarshalPacket: %v", marshalErr) + } + + np = d2netpacket.NetPacket{PacketType: t, PacketData: mp} + return np, nil } diff --git a/d2networking/d2client/game_client.go b/d2networking/d2client/game_client.go index d3b197aa..0737223d 100644 --- a/d2networking/d2client/game_client.go +++ b/d2networking/d2client/game_client.go @@ -160,8 +160,9 @@ func (g *GameClient) OnPacketReceived(packet d2netpacket.NetPacket) error { g.Errorf("GameClient: error responding to server ping: %s", err) } case d2netpackettype.PlayerDisconnectionNotification: - // Not implemented - g.Infof("RemoteClientConnection: received disconnect: %s", packet.PacketData) + if err := g.handlePlayerDisconnectionPacket(packet); err != nil { + return err + } case d2netpackettype.ServerClosed: // https://github.com/OpenDiablo2/OpenDiablo2/issues/802 g.Infof("Server has been closed") @@ -446,6 +447,19 @@ func (g *GameClient) handlePingPacket() error { return nil } +func (g *GameClient) handlePlayerDisconnectionPacket(packet d2netpacket.NetPacket) error { + disconnectPacket, err := d2netpacket.UnmarshalPlayerDisconnectionRequest(packet.PacketData) + if err != nil { + return err + } + + player := g.Players[disconnectPacket.ID] + g.MapEngine.RemoveEntity(player) + delete(g.Players, disconnectPacket.ID) + + return nil +} + // IsSinglePlayer returns a bool for whether the game is a single-player game func (g *GameClient) IsSinglePlayer() bool { return g.connectionType == d2clientconnectiontype.Local diff --git a/d2networking/d2netpacket/packet_ping.go b/d2networking/d2netpacket/packet_ping.go index edad91f0..5ce7b61a 100644 --- a/d2networking/d2netpacket/packet_ping.go +++ b/d2networking/d2netpacket/packet_ping.go @@ -1,4 +1,4 @@ -package d2netpacket +package d2netpacket //nolint:dupl // ServerClosed and Ping just happen to be very similar packets import ( "encoding/json" @@ -30,3 +30,13 @@ func CreatePingPacket() (NetPacket, error) { PacketData: b, }, nil } + +// UnmarshalPing unmarshals the given data to a PingPacket struct +func UnmarshalPing(packet []byte) (PingPacket, error) { + var p PingPacket + if err := json.Unmarshal(packet, &p); err != nil { + return p, err + } + + return p, nil +} diff --git a/d2networking/d2netpacket/packet_server_closed.go b/d2networking/d2netpacket/packet_server_closed.go index eb7fcb19..79c05833 100644 --- a/d2networking/d2netpacket/packet_server_closed.go +++ b/d2networking/d2netpacket/packet_server_closed.go @@ -1,4 +1,4 @@ -package d2netpacket +package d2netpacket //nolint:dupl // ServerClosed and Ping just happen to be very similar packets import ( "encoding/json" diff --git a/d2networking/d2server/d2tcpclientconnection/tcp_client_connection.go b/d2networking/d2server/d2tcpclientconnection/tcp_client_connection.go index 36bf953c..7a457feb 100644 --- a/d2networking/d2server/d2tcpclientconnection/tcp_client_connection.go +++ b/d2networking/d2server/d2tcpclientconnection/tcp_client_connection.go @@ -33,12 +33,9 @@ func (t TCPClientConnection) GetUniqueID() string { // SendPacketToClient marshals and sends (writes) NetPackets func (t *TCPClientConnection) SendPacketToClient(p d2netpacket.NetPacket) error { - packet, err := json.Marshal(p) - if err != nil { - return err - } + encoder := json.NewEncoder(t.tcpConnection) - _, err = t.tcpConnection.Write(packet) + err := encoder.Encode(p) if err != nil { return err } diff --git a/d2networking/d2server/game_server.go b/d2networking/d2server/game_server.go index de20b456..8e349281 100644 --- a/d2networking/d2server/game_server.go +++ b/d2networking/d2server/game_server.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "io" "net" "sync" "time" @@ -16,6 +17,7 @@ import ( "github.com/OpenDiablo2/OpenDiablo2/d2core/d2hero" "github.com/OpenDiablo2/OpenDiablo2/d2core/d2map/d2mapengine" "github.com/OpenDiablo2/OpenDiablo2/d2core/d2map/d2mapgen" + "github.com/OpenDiablo2/OpenDiablo2/d2networking/d2client/d2clientconnectiontype" "github.com/OpenDiablo2/OpenDiablo2/d2networking/d2netpacket" "github.com/OpenDiablo2/OpenDiablo2/d2networking/d2netpacket/d2netpackettype" "github.com/OpenDiablo2/OpenDiablo2/d2networking/d2server/d2tcpclientconnection" @@ -50,12 +52,19 @@ type GameServer struct { scriptEngine *d2script.ScriptEngine seed int64 maxConnections int - packetManagerChan chan []byte + packetManagerChan chan ReceivedPacket heroStateFactory *d2hero.HeroStateFactory *d2util.Logger } +// ReceivedPacket encapsulates the data necessary for the packet manager goroutine to process data from clients. +// The packet manager needs to know who sent the data, in addition to the data itself. +type ReceivedPacket struct { + Client ClientConnection + Packet d2netpacket.NetPacket +} + // NewGameServer builds a new GameServer that can be started // // ctx: required context item @@ -84,7 +93,7 @@ func NewGameServer(asset *d2asset.AssetManager, connections: make(map[string]ClientConnection), networkServer: networkServer, maxConnections: maxConnections[0], - packetManagerChan: make(chan []byte), + packetManagerChan: make(chan ReceivedPacket), mapEngines: make([]*d2mapengine.MapEngine, 0), scriptEngine: d2script.CreateScriptEngine(), seed: time.Now().UnixNano(), @@ -142,7 +151,13 @@ func (g *GameServer) Start() error { for { c, err := g.listener.Accept() if err != nil { - g.Errorf("Unable to accept connection: %s", err) + select { + case <-g.ctx.Done(): + // this error was just a result of the server closing, don't worry about it + default: + g.Errorf("Unable to accept connection: %s", err) + } + return } @@ -157,6 +172,7 @@ func (g *GameServer) Start() error { func (g *GameServer) Stop() { g.Lock() g.cancel() + g.connections = make(map[string]ClientConnection) if err := g.listener.Close(); err != nil { g.Errorf("failed to close the listener %s, err: %v\n", g.listener.Addr(), err) @@ -173,45 +189,9 @@ func (g *GameServer) packetManager() { case <-g.ctx.Done(): return case p := <-g.packetManagerChan: - ipt, err := d2netpacket.InspectPacketType(p) + err := g.OnPacketReceived(p.Client, p.Packet) if err != nil { - g.Errorf("InspectPacketType: %v", err) - } - - switch ipt { - case d2netpackettype.PlayerConnectionRequest: - player, err := d2netpacket.UnmarshalNetPacket(p) - if err != nil { - g.Errorf("Unable to unmarshal PlayerConnectionRequestPacket: %s\n", err) - } - - g.sendPacketToClients(player) - case d2netpackettype.MovePlayer: - move, err := d2netpacket.UnmarshalNetPacket(p) - if err != nil { - g.Error(err.Error()) - continue - } - - g.sendPacketToClients(move) - case d2netpackettype.CastSkill: - castSkill, err := d2netpacket.UnmarshalNetPacket(p) - if err != nil { - g.Error(err.Error()) - continue - } - - g.sendPacketToClients(castSkill) - case d2netpackettype.SpawnItem: - item, err := d2netpacket.UnmarshalNetPacket(p) - if err != nil { - g.Error(err.Error()) - continue - } - - g.sendPacketToClients(item) - case d2netpackettype.ServerClosed: - g.Stop() + g.Errorf("failed to handle packet received from client %s: %v", p.Client.GetUniqueID(), err) } } } @@ -228,9 +208,10 @@ func (g *GameServer) sendPacketToClients(packet d2netpacket.NetPacket) { // handleConnection accepts an individual connection and starts pooling for new packets. It is recommended this is called // via Go Routine. Context should be a property of the GameServer Struct. func (g *GameServer) handleConnection(conn net.Conn) { - var connected int - - var packet d2netpacket.NetPacket + var ( + connected int + client ClientConnection + ) g.Infof("Accepting connection: %s\n", conn.RemoteAddr().String()) @@ -243,10 +224,18 @@ func (g *GameServer) handleConnection(conn net.Conn) { decoder := json.NewDecoder(conn) for { + var packet d2netpacket.NetPacket + err := decoder.Decode(&packet) if err != nil { - g.Error(err.Error()) - return // exit this connection as we could not read the first packet + switch err { + case io.EOF: + break // the other side closed the connection + default: + g.Error(err.Error()) + } + + return // allow the connection to close } // If this is the first packet we are seeing from this specific connection we first need to see if the client @@ -257,25 +246,7 @@ func (g *GameServer) handleConnection(conn net.Conn) { g.Infof("Closing connection with %s: did not receive new player connection request...", conn.RemoteAddr().String()) } - if err := g.registerConnection(packet.PacketData, conn); err != nil { - switch err { - case errServerFull: // Server is currently full and not accepting new connections. - sf, serverFullErr := d2netpacket.CreateServerFullPacket() - if serverFullErr != nil { - g.Errorf("ServerFullPacket: %v", serverFullErr) - } - - msf, marshalServerFullErr := d2netpacket.MarshalPacket(sf) - if marshalServerFullErr != nil { - g.Errorf("MarshalPacket: %v", marshalServerFullErr) - } - - _, errServerFullPacket := conn.Write(msf) - g.Warningf("%v", errServerFullPacket) - case errPlayerAlreadyExists: // Player is already registered and did not disconnection correctly. - g.Errorf("%v", err) - } - + if client, err = g.registerConnection(packet.PacketData, conn); err != nil { return } @@ -286,7 +257,10 @@ func (g *GameServer) handleConnection(conn net.Conn) { case <-g.ctx.Done(): return default: - g.packetManagerChan <- packet.PacketData + g.packetManagerChan <- ReceivedPacket{ + Client: client, + Packet: packet, + } } } } @@ -296,12 +270,28 @@ func (g *GameServer) handleConnection(conn net.Conn) { // Errors: // - errServerFull // - errPlayerAlreadyExists -func (g *GameServer) registerConnection(b []byte, conn net.Conn) error { +func (g *GameServer) registerConnection(b []byte, conn net.Conn) (ClientConnection, error) { + var client ClientConnection + g.Lock() + defer g.Unlock() // check to see if the server is full if len(g.connections) >= g.maxConnections { - return errServerFull + sf, serverFullErr := d2netpacket.CreateServerFullPacket() + if serverFullErr != nil { + g.Errorf("ServerFullPacket: %v", serverFullErr) + } + + msf, marshalServerFullErr := d2netpacket.MarshalPacket(sf) + if marshalServerFullErr != nil { + g.Errorf("MarshalPacket: %v", marshalServerFullErr) + } + + _, errServerFullPacket := conn.Write(msf) + g.Warningf("%v", errServerFullPacket) + + return client, errServerFull } // if it is not full, unmarshal the playerConnectionRequest @@ -312,29 +302,17 @@ func (g *GameServer) registerConnection(b []byte, conn net.Conn) error { // check to see if the player is already registered if _, ok := g.connections[packet.ID]; ok { - return errPlayerAlreadyExists + g.Errorf("%v", errPlayerAlreadyExists) + return client, errPlayerAlreadyExists } // Client a new TCP Client Connection and add it to the connections map - client := d2tcpclientconnection.CreateTCPClientConnection(conn, packet.ID) + client = d2tcpclientconnection.CreateTCPClientConnection(conn, packet.ID) client.SetPlayerState(packet.PlayerState) - g.Infof("Client connected with an id of %s", client.GetUniqueID()) - g.connections[client.GetUniqueID()] = client - // Temporary position hack -------------------------------------------- - // https://github.com/OpenDiablo2/OpenDiablo2/issues/829 - sx, sy := g.mapEngines[0].GetStartPosition() - clientPlayerState := client.GetPlayerState() - clientPlayerState.X = sx - clientPlayerState.Y = sy - // --------- + g.OnClientConnected(client) - // This really should be deferred however to much time will be spend holding a lock when we attempt to send a packet - g.Unlock() - - g.handleClientConnection(client, sx, sy) - - return nil + return client, nil } // OnClientConnected initializes the given ClientConnection. It sends the @@ -447,12 +425,27 @@ func (g *GameServer) handleClientConnection(client ClientConnection, x, y float6 // OnClientDisconnected removes the given client from the list // of client connections. +// If this client was the host, disconnects all clients and kills GameServer. func (g *GameServer) OnClientDisconnected(client ClientConnection) { g.Infof("Client disconnected with an id of %s", client.GetUniqueID()) delete(g.connections, client.GetUniqueID()) + + if client.GetConnectionType() == d2clientconnectiontype.Local { + g.Info("Host disconnected, game server shuting down") + + serverClosed, err := d2netpacket.CreateServerClosedPacket() + if err != nil { + g.Errorf("failed to generate ServerClosed packet after host disconnected: %s", err) + } else { + g.sendPacketToClients(serverClosed) + } + + g.Stop() + } } -// OnPacketReceived is called by the local client to 'send' a packet to the server. +// OnPacketReceived is called when a packet has been received from a remote client, +// and by the local client to 'send' a packet to the server, // nolint:gocyclo // switch statement on packet type makes sense, no need to change func (g *GameServer) OnPacketReceived(client ClientConnection, packet d2netpacket.NetPacket) error { if g == nil { @@ -490,8 +483,13 @@ func (g *GameServer) OnPacketReceived(client ClientConnection, packet d2netpacke if err != nil { g.Errorf("GameServer: error saving saving Player: %s", err) } + case d2netpackettype.PlayerConnectionRequest: + break // prevent log message. these are handled by handleConnection + case d2netpackettype.PlayerDisconnectionNotification: + g.sendPacketToClients(packet) + g.OnClientDisconnected(client) default: - g.Warningf("GameServer: received unknown packet %T", packet) + g.Warningf("GameServer: received unknown packet %s", packet.PacketType) } return nil