Networking bugfixes and cleanup

Make sure connections close properly, without weird error messages
Remove player map entity when a player disconnects from a multiplayer game
Close server properly when host disconnects, handle ServerClose on remote clients
Don't mix JSON decoders and raw TCP writes
Actually handle incoming packets from remote clients
General code cleanup for simplicity and consistency
This commit is contained in:
Ian Ling 2020-12-28 13:33:17 -08:00
parent f3a869c2be
commit 3936e01afb
7 changed files with 152 additions and 197 deletions

View File

@ -87,19 +87,16 @@ func (l *LocalClientConnection) Open(_, saveFilePath string) error {
// Close disconnects from the server and destroys it.
func (l *LocalClientConnection) Close() error {
sc, err := d2netpacket.CreateServerClosedPacket()
disconnectRequest, err := d2netpacket.CreatePlayerDisconnectRequestPacket(l.uniqueID)
if err != nil {
return err
}
err = l.SendPacketToServer(sc)
err = l.SendPacketToServer(disconnectRequest)
if err != nil {
return err
}
l.gameServer.OnClientDisconnected(l)
l.gameServer.Stop()
return nil
}

View File

@ -3,6 +3,7 @@ package d2remoteclient
import (
"encoding/json"
"fmt"
"io"
"net"
"strings"
@ -131,12 +132,10 @@ func (r *RemoteClientConnection) SetClientListener(listener d2networking.ClientL
// SendPacketToServer compresses the JSON encoding of a NetPacket and
// sends it to the server.
func (r *RemoteClientConnection) SendPacketToServer(packet d2netpacket.NetPacket) error {
data, err := json.Marshal(packet)
if err != nil {
return err
}
encoder := json.NewEncoder(r.tcpConnection)
if _, err = r.tcpConnection.Write(data); err != nil {
err := encoder.Encode(packet)
if err != nil {
return err
}
@ -146,15 +145,21 @@ func (r *RemoteClientConnection) SendPacketToServer(packet d2netpacket.NetPacket
// serverListener runs a while loop, reading from the GameServer's TCP
// connection.
func (r *RemoteClientConnection) serverListener() {
var packet d2netpacket.NetPacket
decoder := json.NewDecoder(r.tcpConnection)
for {
var packet d2netpacket.NetPacket
err := decoder.Decode(&packet)
if err != nil {
r.Errorf("failed to decode the packet, err: %v\n", err)
return
switch err {
case io.EOF:
break // the other side closed the connection
default:
r.Errorf("failed to decode the packet, err: %v\n", err)
}
return // allow the connection to close
}
p, err := r.decodeToPacket(packet.PacketType, string(packet.PacketData))
@ -186,102 +191,29 @@ func (r *RemoteClientConnection) bytesToJSON(buffer []byte) (string, d2netpacket
func (r *RemoteClientConnection) decodeToPacket(
t d2netpackettype.NetPacketType,
data string) (d2netpacket.NetPacket, error) {
var np = d2netpacket.NetPacket{}
var err error
var (
np = d2netpacket.NetPacket{}
err error
p interface{}
)
switch t {
case d2netpackettype.GenerateMap:
var p d2netpacket.GenerateMapPacket
if err = json.Unmarshal([]byte(data), &p); err != nil {
break
}
mp, marshalErr := d2netpacket.MarshalPacket(p)
if marshalErr != nil {
r.Errorf("MarshalPacket: %v", marshalErr)
}
np = d2netpacket.NetPacket{PacketType: t, PacketData: mp}
p, err = d2netpacket.UnmarshalGenerateMap([]byte(data))
case d2netpackettype.MovePlayer:
var p d2netpacket.MovePlayerPacket
if err = json.Unmarshal([]byte(data), &p); err != nil {
break
}
mp, marshalErr := d2netpacket.MarshalPacket(p)
if marshalErr != nil {
r.Errorf("MarshalPacket: %v", marshalErr)
}
np = d2netpacket.NetPacket{PacketType: t, PacketData: mp}
p, err = d2netpacket.UnmarshalMovePlayer([]byte(data))
case d2netpackettype.UpdateServerInfo:
var p d2netpacket.UpdateServerInfoPacket
if err = json.Unmarshal([]byte(data), &p); err != nil {
break
}
mp, marshalErr := d2netpacket.MarshalPacket(p)
if marshalErr != nil {
r.Errorf("MarshalPacket: %v", marshalErr)
}
np = d2netpacket.NetPacket{PacketType: t, PacketData: mp}
p, err = d2netpacket.UnmarshalUpdateServerInfo([]byte(data))
case d2netpackettype.AddPlayer:
var p d2netpacket.AddPlayerPacket
if err = json.Unmarshal([]byte(data), &p); err != nil {
break
}
mp, marshalErr := d2netpacket.MarshalPacket(p)
if marshalErr != nil {
r.Errorf("MarshalPacket: %v", marshalErr)
}
np = d2netpacket.NetPacket{PacketType: t, PacketData: mp}
p, err = d2netpacket.UnmarshalAddPlayer([]byte(data))
case d2netpackettype.CastSkill:
var p d2netpacket.CastPacket
if err = json.Unmarshal([]byte(data), &p); err != nil {
break
}
mp, marshalErr := d2netpacket.MarshalPacket(p)
if marshalErr != nil {
r.Errorf("MarshalPacket: %v", marshalErr)
}
np = d2netpacket.NetPacket{PacketType: t, PacketData: mp}
p, err = d2netpacket.UnmarshalCast([]byte(data))
case d2netpackettype.Ping:
var p d2netpacket.PingPacket
if err = json.Unmarshal([]byte(data), &p); err != nil {
break
}
mp, marshalErr := d2netpacket.MarshalPacket(p)
if marshalErr != nil {
r.Errorf("MarshalPacket: %v", marshalErr)
}
np = d2netpacket.NetPacket{PacketType: t, PacketData: mp}
p, err = d2netpacket.UnmarshalPing([]byte(data))
case d2netpackettype.PlayerDisconnectionNotification:
var p d2netpacket.PlayerDisconnectRequestPacket
if err = json.Unmarshal([]byte(data), &p); err != nil {
break
}
mp, marshalErr := d2netpacket.MarshalPacket(p)
if marshalErr != nil {
r.Errorf("MarshalPacket: %v", marshalErr)
}
np = d2netpacket.NetPacket{PacketType: t, PacketData: mp}
p, err = d2netpacket.UnmarshalPlayerDisconnectionRequest([]byte(data))
case d2netpackettype.ServerClosed:
p, err = d2netpacket.UnmarshalServerClosed([]byte(data))
default:
err = fmt.Errorf("RemoteClientConnection: unrecognized packet type: %v", t)
}
@ -290,5 +222,12 @@ func (r *RemoteClientConnection) decodeToPacket(
return np, err
}
mp, marshalErr := d2netpacket.MarshalPacket(p)
if marshalErr != nil {
r.Errorf("MarshalPacket: %v", marshalErr)
}
np = d2netpacket.NetPacket{PacketType: t, PacketData: mp}
return np, nil
}

View File

@ -160,8 +160,9 @@ func (g *GameClient) OnPacketReceived(packet d2netpacket.NetPacket) error {
g.Errorf("GameClient: error responding to server ping: %s", err)
}
case d2netpackettype.PlayerDisconnectionNotification:
// Not implemented
g.Infof("RemoteClientConnection: received disconnect: %s", packet.PacketData)
if err := g.handlePlayerDisconnectionPacket(packet); err != nil {
return err
}
case d2netpackettype.ServerClosed:
// https://github.com/OpenDiablo2/OpenDiablo2/issues/802
g.Infof("Server has been closed")
@ -446,6 +447,19 @@ func (g *GameClient) handlePingPacket() error {
return nil
}
func (g *GameClient) handlePlayerDisconnectionPacket(packet d2netpacket.NetPacket) error {
disconnectPacket, err := d2netpacket.UnmarshalPlayerDisconnectionRequest(packet.PacketData)
if err != nil {
return err
}
player := g.Players[disconnectPacket.ID]
g.MapEngine.RemoveEntity(player)
delete(g.Players, disconnectPacket.ID)
return nil
}
// IsSinglePlayer returns a bool for whether the game is a single-player game
func (g *GameClient) IsSinglePlayer() bool {
return g.connectionType == d2clientconnectiontype.Local

View File

@ -1,4 +1,4 @@
package d2netpacket
package d2netpacket //nolint:dupl // ServerClosed and Ping just happen to be very similar packets
import (
"encoding/json"
@ -30,3 +30,13 @@ func CreatePingPacket() (NetPacket, error) {
PacketData: b,
}, nil
}
// UnmarshalPing unmarshals the given data to a PingPacket struct
func UnmarshalPing(packet []byte) (PingPacket, error) {
var p PingPacket
if err := json.Unmarshal(packet, &p); err != nil {
return p, err
}
return p, nil
}

View File

@ -1,4 +1,4 @@
package d2netpacket
package d2netpacket //nolint:dupl // ServerClosed and Ping just happen to be very similar packets
import (
"encoding/json"

View File

@ -33,12 +33,9 @@ func (t TCPClientConnection) GetUniqueID() string {
// SendPacketToClient marshals and sends (writes) NetPackets
func (t *TCPClientConnection) SendPacketToClient(p d2netpacket.NetPacket) error {
packet, err := json.Marshal(p)
if err != nil {
return err
}
encoder := json.NewEncoder(t.tcpConnection)
_, err = t.tcpConnection.Write(packet)
err := encoder.Encode(p)
if err != nil {
return err
}

View File

@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"io"
"net"
"sync"
"time"
@ -16,6 +17,7 @@ import (
"github.com/OpenDiablo2/OpenDiablo2/d2core/d2hero"
"github.com/OpenDiablo2/OpenDiablo2/d2core/d2map/d2mapengine"
"github.com/OpenDiablo2/OpenDiablo2/d2core/d2map/d2mapgen"
"github.com/OpenDiablo2/OpenDiablo2/d2networking/d2client/d2clientconnectiontype"
"github.com/OpenDiablo2/OpenDiablo2/d2networking/d2netpacket"
"github.com/OpenDiablo2/OpenDiablo2/d2networking/d2netpacket/d2netpackettype"
"github.com/OpenDiablo2/OpenDiablo2/d2networking/d2server/d2tcpclientconnection"
@ -50,12 +52,19 @@ type GameServer struct {
scriptEngine *d2script.ScriptEngine
seed int64
maxConnections int
packetManagerChan chan []byte
packetManagerChan chan ReceivedPacket
heroStateFactory *d2hero.HeroStateFactory
*d2util.Logger
}
// ReceivedPacket encapsulates the data necessary for the packet manager goroutine to process data from clients.
// The packet manager needs to know who sent the data, in addition to the data itself.
type ReceivedPacket struct {
Client ClientConnection
Packet d2netpacket.NetPacket
}
// NewGameServer builds a new GameServer that can be started
//
// ctx: required context item
@ -84,7 +93,7 @@ func NewGameServer(asset *d2asset.AssetManager,
connections: make(map[string]ClientConnection),
networkServer: networkServer,
maxConnections: maxConnections[0],
packetManagerChan: make(chan []byte),
packetManagerChan: make(chan ReceivedPacket),
mapEngines: make([]*d2mapengine.MapEngine, 0),
scriptEngine: d2script.CreateScriptEngine(),
seed: time.Now().UnixNano(),
@ -142,7 +151,13 @@ func (g *GameServer) Start() error {
for {
c, err := g.listener.Accept()
if err != nil {
g.Errorf("Unable to accept connection: %s", err)
select {
case <-g.ctx.Done():
// this error was just a result of the server closing, don't worry about it
default:
g.Errorf("Unable to accept connection: %s", err)
}
return
}
@ -157,6 +172,7 @@ func (g *GameServer) Start() error {
func (g *GameServer) Stop() {
g.Lock()
g.cancel()
g.connections = make(map[string]ClientConnection)
if err := g.listener.Close(); err != nil {
g.Errorf("failed to close the listener %s, err: %v\n", g.listener.Addr(), err)
@ -173,45 +189,9 @@ func (g *GameServer) packetManager() {
case <-g.ctx.Done():
return
case p := <-g.packetManagerChan:
ipt, err := d2netpacket.InspectPacketType(p)
err := g.OnPacketReceived(p.Client, p.Packet)
if err != nil {
g.Errorf("InspectPacketType: %v", err)
}
switch ipt {
case d2netpackettype.PlayerConnectionRequest:
player, err := d2netpacket.UnmarshalNetPacket(p)
if err != nil {
g.Errorf("Unable to unmarshal PlayerConnectionRequestPacket: %s\n", err)
}
g.sendPacketToClients(player)
case d2netpackettype.MovePlayer:
move, err := d2netpacket.UnmarshalNetPacket(p)
if err != nil {
g.Error(err.Error())
continue
}
g.sendPacketToClients(move)
case d2netpackettype.CastSkill:
castSkill, err := d2netpacket.UnmarshalNetPacket(p)
if err != nil {
g.Error(err.Error())
continue
}
g.sendPacketToClients(castSkill)
case d2netpackettype.SpawnItem:
item, err := d2netpacket.UnmarshalNetPacket(p)
if err != nil {
g.Error(err.Error())
continue
}
g.sendPacketToClients(item)
case d2netpackettype.ServerClosed:
g.Stop()
g.Errorf("failed to handle packet received from client %s: %v", p.Client.GetUniqueID(), err)
}
}
}
@ -228,9 +208,10 @@ func (g *GameServer) sendPacketToClients(packet d2netpacket.NetPacket) {
// handleConnection accepts an individual connection and starts pooling for new packets. It is recommended this is called
// via Go Routine. Context should be a property of the GameServer Struct.
func (g *GameServer) handleConnection(conn net.Conn) {
var connected int
var packet d2netpacket.NetPacket
var (
connected int
client ClientConnection
)
g.Infof("Accepting connection: %s\n", conn.RemoteAddr().String())
@ -243,10 +224,18 @@ func (g *GameServer) handleConnection(conn net.Conn) {
decoder := json.NewDecoder(conn)
for {
var packet d2netpacket.NetPacket
err := decoder.Decode(&packet)
if err != nil {
g.Error(err.Error())
return // exit this connection as we could not read the first packet
switch err {
case io.EOF:
break // the other side closed the connection
default:
g.Error(err.Error())
}
return // allow the connection to close
}
// If this is the first packet we are seeing from this specific connection we first need to see if the client
@ -257,25 +246,7 @@ func (g *GameServer) handleConnection(conn net.Conn) {
g.Infof("Closing connection with %s: did not receive new player connection request...", conn.RemoteAddr().String())
}
if err := g.registerConnection(packet.PacketData, conn); err != nil {
switch err {
case errServerFull: // Server is currently full and not accepting new connections.
sf, serverFullErr := d2netpacket.CreateServerFullPacket()
if serverFullErr != nil {
g.Errorf("ServerFullPacket: %v", serverFullErr)
}
msf, marshalServerFullErr := d2netpacket.MarshalPacket(sf)
if marshalServerFullErr != nil {
g.Errorf("MarshalPacket: %v", marshalServerFullErr)
}
_, errServerFullPacket := conn.Write(msf)
g.Warningf("%v", errServerFullPacket)
case errPlayerAlreadyExists: // Player is already registered and did not disconnection correctly.
g.Errorf("%v", err)
}
if client, err = g.registerConnection(packet.PacketData, conn); err != nil {
return
}
@ -286,7 +257,10 @@ func (g *GameServer) handleConnection(conn net.Conn) {
case <-g.ctx.Done():
return
default:
g.packetManagerChan <- packet.PacketData
g.packetManagerChan <- ReceivedPacket{
Client: client,
Packet: packet,
}
}
}
}
@ -296,12 +270,28 @@ func (g *GameServer) handleConnection(conn net.Conn) {
// Errors:
// - errServerFull
// - errPlayerAlreadyExists
func (g *GameServer) registerConnection(b []byte, conn net.Conn) error {
func (g *GameServer) registerConnection(b []byte, conn net.Conn) (ClientConnection, error) {
var client ClientConnection
g.Lock()
defer g.Unlock()
// check to see if the server is full
if len(g.connections) >= g.maxConnections {
return errServerFull
sf, serverFullErr := d2netpacket.CreateServerFullPacket()
if serverFullErr != nil {
g.Errorf("ServerFullPacket: %v", serverFullErr)
}
msf, marshalServerFullErr := d2netpacket.MarshalPacket(sf)
if marshalServerFullErr != nil {
g.Errorf("MarshalPacket: %v", marshalServerFullErr)
}
_, errServerFullPacket := conn.Write(msf)
g.Warningf("%v", errServerFullPacket)
return client, errServerFull
}
// if it is not full, unmarshal the playerConnectionRequest
@ -312,29 +302,17 @@ func (g *GameServer) registerConnection(b []byte, conn net.Conn) error {
// check to see if the player is already registered
if _, ok := g.connections[packet.ID]; ok {
return errPlayerAlreadyExists
g.Errorf("%v", errPlayerAlreadyExists)
return client, errPlayerAlreadyExists
}
// Client a new TCP Client Connection and add it to the connections map
client := d2tcpclientconnection.CreateTCPClientConnection(conn, packet.ID)
client = d2tcpclientconnection.CreateTCPClientConnection(conn, packet.ID)
client.SetPlayerState(packet.PlayerState)
g.Infof("Client connected with an id of %s", client.GetUniqueID())
g.connections[client.GetUniqueID()] = client
// Temporary position hack --------------------------------------------
// https://github.com/OpenDiablo2/OpenDiablo2/issues/829
sx, sy := g.mapEngines[0].GetStartPosition()
clientPlayerState := client.GetPlayerState()
clientPlayerState.X = sx
clientPlayerState.Y = sy
// ---------
g.OnClientConnected(client)
// This really should be deferred however to much time will be spend holding a lock when we attempt to send a packet
g.Unlock()
g.handleClientConnection(client, sx, sy)
return nil
return client, nil
}
// OnClientConnected initializes the given ClientConnection. It sends the
@ -447,12 +425,27 @@ func (g *GameServer) handleClientConnection(client ClientConnection, x, y float6
// OnClientDisconnected removes the given client from the list
// of client connections.
// If this client was the host, disconnects all clients and kills GameServer.
func (g *GameServer) OnClientDisconnected(client ClientConnection) {
g.Infof("Client disconnected with an id of %s", client.GetUniqueID())
delete(g.connections, client.GetUniqueID())
if client.GetConnectionType() == d2clientconnectiontype.Local {
g.Info("Host disconnected, game server shuting down")
serverClosed, err := d2netpacket.CreateServerClosedPacket()
if err != nil {
g.Errorf("failed to generate ServerClosed packet after host disconnected: %s", err)
} else {
g.sendPacketToClients(serverClosed)
}
g.Stop()
}
}
// OnPacketReceived is called by the local client to 'send' a packet to the server.
// OnPacketReceived is called when a packet has been received from a remote client,
// and by the local client to 'send' a packet to the server,
// nolint:gocyclo // switch statement on packet type makes sense, no need to change
func (g *GameServer) OnPacketReceived(client ClientConnection, packet d2netpacket.NetPacket) error {
if g == nil {
@ -490,8 +483,13 @@ func (g *GameServer) OnPacketReceived(client ClientConnection, packet d2netpacke
if err != nil {
g.Errorf("GameServer: error saving saving Player: %s", err)
}
case d2netpackettype.PlayerConnectionRequest:
break // prevent log message. these are handled by handleConnection
case d2netpackettype.PlayerDisconnectionNotification:
g.sendPacketToClients(packet)
g.OnClientDisconnected(client)
default:
g.Warningf("GameServer: received unknown packet %T", packet)
g.Warningf("GameServer: received unknown packet %s", packet.PacketType)
}
return nil