1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2024-06-10 18:00:43 +00:00

report transportation error to observer

This commit is contained in:
Shelikhoo 2021-03-07 12:14:58 +00:00
parent a42ef81ac0
commit a553ccc4c4
No known key found for this signature in database
GPG Key ID: C4D5E79D22B25316
4 changed files with 60 additions and 5 deletions

View File

@ -0,0 +1,26 @@
package observatory
import "github.com/v2fly/v2ray-core/v4/common/errors"
type errorCollector struct {
errors *errors.Error
}
func (e *errorCollector) SubmitError(err error) {
if e.errors == nil {
e.errors = newError("underlying connection error").Base(err)
return
}
e.errors = e.errors.Base(newError("underlying connection error").Base(err))
}
func newErrorCollector() *errorCollector {
return &errorCollector{}
}
func (e *errorCollector) UnderlyingError() error {
if e.errors == nil {
return newError("failed to produce report")
}
return e.errors
}

View File

@ -8,6 +8,7 @@ import (
core "github.com/v2fly/v2ray-core/v4"
"github.com/v2fly/v2ray-core/v4/common"
v2net "github.com/v2fly/v2ray-core/v4/common/net"
"github.com/v2fly/v2ray-core/v4/common/session"
"github.com/v2fly/v2ray-core/v4/common/signal/done"
"github.com/v2fly/v2ray-core/v4/common/task"
"github.com/v2fly/v2ray-core/v4/features/extension"
@ -81,6 +82,8 @@ func (o *Observer) updateStatus(outbounds []string) {
}
func (o *Observer) probe(outbound string) ProbeResult {
errorCollectorForRequest := newErrorCollector()
httpTransport := http.Transport{
Proxy: func(*http.Request) (*url.URL, error) {
return nil, nil
@ -93,7 +96,8 @@ func (o *Observer) probe(outbound string) ProbeResult {
if err != nil {
return newError("cannot understand address").Base(err)
}
conn, err := tagged.Dialer(o.ctx, dest, outbound)
trackedCtx := session.TrackedConnectionError(o.ctx, errorCollectorForRequest)
conn, err := tagged.Dialer(trackedCtx, dest, outbound)
if err != nil {
return newError("cannot dial remote address", dest).Base(err)
}
@ -130,8 +134,13 @@ func (o *Observer) probe(outbound string) ProbeResult {
return nil
})
if err != nil {
newError("the outbound ", outbound, "is dead:").Base(err).AtInfo().WriteToLog()
return ProbeResult{Alive: false, LastErrorReason: err.Error()}
fullerr := newError("underlying connection failed").Base(errorCollectorForRequest.UnderlyingError())
fullerr = newError("with outbound handler report").Base(fullerr)
fullerr = newError("GET request failed:", err).Base(fullerr)
fullerr = newError("the outbound ", outbound, "is dead:").Base(fullerr)
fullerr = fullerr.AtInfo()
fullerr.WriteToLog()
return ProbeResult{Alive: false, LastErrorReason: fullerr.Error()}
}
newError("the outbound ", outbound, "is alive:", GETTime.Seconds()).AtInfo().WriteToLog()
return ProbeResult{Alive: true, Delay: GETTime.Milliseconds()}

View File

@ -134,13 +134,17 @@ func (h *Handler) Tag() string {
func (h *Handler) Dispatch(ctx context.Context, link *transport.Link) {
if h.mux != nil && (h.mux.Enabled || session.MuxPreferedFromContext(ctx)) {
if err := h.mux.Dispatch(ctx, link); err != nil {
newError("failed to process mux outbound traffic").Base(err).WriteToLog(session.ExportIDToError(ctx))
err := newError("failed to process mux outbound traffic").Base(err)
session.SubmitOutboundErrorToOriginator(ctx, err)
err.WriteToLog(session.ExportIDToError(ctx))
common.Interrupt(link.Writer)
}
} else {
if err := h.proxy.Process(ctx, link, h); err != nil {
// Ensure outbound ray is properly closed.
newError("failed to process outbound traffic").Base(err).WriteToLog(session.ExportIDToError(ctx))
err := newError("failed to process outbound traffic").Base(err)
session.SubmitOutboundErrorToOriginator(ctx, err)
err.WriteToLog(session.ExportIDToError(ctx))
common.Interrupt(link.Writer)
} else {
common.Must(common.Close(link.Writer))

View File

@ -13,6 +13,7 @@ const (
contentSessionKey
muxPreferedSessionKey
sockoptSessionKey
trackedConnectionErrorKey
)
// ContextWithID returns a new context with the given ID.
@ -116,3 +117,18 @@ func SetForcedOutboundTagToContext(ctx context.Context, tag string) context.Cont
ContentFromContext(ctx).SetAttribute("forcedOutboundTag", tag)
return ctx
}
type TrackedRequestErrorFeedback interface {
SubmitError(err error)
}
func SubmitOutboundErrorToOriginator(ctx context.Context, err error) {
if errorTracker := ctx.Value(trackedConnectionErrorKey); errorTracker != nil {
errorTracker := errorTracker.(TrackedRequestErrorFeedback)
errorTracker.SubmitError(err)
}
}
func TrackedConnectionError(ctx context.Context, tracker TrackedRequestErrorFeedback) context.Context {
return context.WithValue(ctx, trackedConnectionErrorKey, tracker)
}