mirror of
https://github.com/v2fly/v2ray-core.git
synced 2024-09-12 23:18:36 -04:00
rename 'this'
This commit is contained in:
parent
d00f8eef56
commit
f95c322677
@ -26,31 +26,31 @@ func NewDefaultDispatcher(space app.Space) *DefaultDispatcher {
|
||||
}
|
||||
|
||||
// Private: Used by app.Space only.
|
||||
func (this *DefaultDispatcher) Initialize(space app.Space) error {
|
||||
func (v *DefaultDispatcher) Initialize(space app.Space) error {
|
||||
if !space.HasApp(proxyman.APP_ID_OUTBOUND_MANAGER) {
|
||||
return errors.New("DefaultDispatcher: OutboundHandlerManager is not found in the space.")
|
||||
}
|
||||
this.ohm = space.GetApp(proxyman.APP_ID_OUTBOUND_MANAGER).(proxyman.OutboundHandlerManager)
|
||||
v.ohm = space.GetApp(proxyman.APP_ID_OUTBOUND_MANAGER).(proxyman.OutboundHandlerManager)
|
||||
|
||||
if space.HasApp(router.APP_ID) {
|
||||
this.router = space.GetApp(router.APP_ID).(*router.Router)
|
||||
v.router = space.GetApp(router.APP_ID).(*router.Router)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *DefaultDispatcher) Release() {
|
||||
func (v *DefaultDispatcher) Release() {
|
||||
|
||||
}
|
||||
|
||||
func (this *DefaultDispatcher) DispatchToOutbound(session *proxy.SessionInfo) ray.InboundRay {
|
||||
func (v *DefaultDispatcher) DispatchToOutbound(session *proxy.SessionInfo) ray.InboundRay {
|
||||
direct := ray.NewRay()
|
||||
dispatcher := this.ohm.GetDefaultHandler()
|
||||
dispatcher := v.ohm.GetDefaultHandler()
|
||||
destination := session.Destination
|
||||
|
||||
if this.router != nil {
|
||||
if tag, err := this.router.TakeDetour(session); err == nil {
|
||||
if handler := this.ohm.GetHandler(tag); handler != nil {
|
||||
if v.router != nil {
|
||||
if tag, err := v.router.TakeDetour(session); err == nil {
|
||||
if handler := v.ohm.GetHandler(tag); handler != nil {
|
||||
log.Info("DefaultDispatcher: Taking detour [", tag, "] for [", destination, "].")
|
||||
dispatcher = handler
|
||||
} else {
|
||||
@ -64,14 +64,14 @@ func (this *DefaultDispatcher) DispatchToOutbound(session *proxy.SessionInfo) ra
|
||||
if session.Inbound != nil && session.Inbound.AllowPassiveConnection {
|
||||
go dispatcher.Dispatch(destination, alloc.NewLocalBuffer(32).Clear(), direct)
|
||||
} else {
|
||||
go this.FilterPacketAndDispatch(destination, direct, dispatcher)
|
||||
go v.FilterPacketAndDispatch(destination, direct, dispatcher)
|
||||
}
|
||||
|
||||
return direct
|
||||
}
|
||||
|
||||
// Private: Visible for testing.
|
||||
func (this *DefaultDispatcher) FilterPacketAndDispatch(destination v2net.Destination, link ray.OutboundRay, dispatcher proxy.OutboundHandler) {
|
||||
func (v *DefaultDispatcher) FilterPacketAndDispatch(destination v2net.Destination, link ray.OutboundRay, dispatcher proxy.OutboundHandler) {
|
||||
payload, err := link.OutboundInput().Read()
|
||||
if err != nil {
|
||||
log.Info("DefaultDispatcher: No payload towards ", destination, ", stopping now.")
|
||||
|
@ -30,10 +30,10 @@ func NewTestPacketDispatcher(handler func(destination v2net.Destination, traffic
|
||||
}
|
||||
}
|
||||
|
||||
func (this *TestPacketDispatcher) DispatchToOutbound(session *proxy.SessionInfo) ray.InboundRay {
|
||||
func (v *TestPacketDispatcher) DispatchToOutbound(session *proxy.SessionInfo) ray.InboundRay {
|
||||
traffic := ray.NewRay()
|
||||
this.Destination <- session.Destination
|
||||
go this.Handler(session.Destination, traffic)
|
||||
v.Destination <- session.Destination
|
||||
go v.Handler(session.Destination, traffic)
|
||||
|
||||
return traffic
|
||||
}
|
||||
|
@ -58,50 +58,50 @@ func NewUDPNameServer(address v2net.Destination, dispatcher dispatcher.PacketDis
|
||||
}
|
||||
|
||||
// Private: Visible for testing.
|
||||
func (this *UDPNameServer) Cleanup() {
|
||||
func (v *UDPNameServer) Cleanup() {
|
||||
expiredRequests := make([]uint16, 0, 16)
|
||||
now := time.Now()
|
||||
this.Lock()
|
||||
for id, r := range this.requests {
|
||||
v.Lock()
|
||||
for id, r := range v.requests {
|
||||
if r.expire.Before(now) {
|
||||
expiredRequests = append(expiredRequests, id)
|
||||
close(r.response)
|
||||
}
|
||||
}
|
||||
for _, id := range expiredRequests {
|
||||
delete(this.requests, id)
|
||||
delete(v.requests, id)
|
||||
}
|
||||
this.Unlock()
|
||||
v.Unlock()
|
||||
expiredRequests = nil
|
||||
}
|
||||
|
||||
// Private: Visible for testing.
|
||||
func (this *UDPNameServer) AssignUnusedID(response chan<- *ARecord) uint16 {
|
||||
func (v *UDPNameServer) AssignUnusedID(response chan<- *ARecord) uint16 {
|
||||
var id uint16
|
||||
this.Lock()
|
||||
if len(this.requests) > CleanupThreshold && this.nextCleanup.Before(time.Now()) {
|
||||
this.nextCleanup = time.Now().Add(CleanupInterval)
|
||||
go this.Cleanup()
|
||||
v.Lock()
|
||||
if len(v.requests) > CleanupThreshold && v.nextCleanup.Before(time.Now()) {
|
||||
v.nextCleanup = time.Now().Add(CleanupInterval)
|
||||
go v.Cleanup()
|
||||
}
|
||||
|
||||
for {
|
||||
id = uint16(dice.Roll(65536))
|
||||
if _, found := this.requests[id]; found {
|
||||
if _, found := v.requests[id]; found {
|
||||
continue
|
||||
}
|
||||
log.Debug("DNS: Add pending request id ", id)
|
||||
this.requests[id] = &PendingRequest{
|
||||
v.requests[id] = &PendingRequest{
|
||||
expire: time.Now().Add(time.Second * 8),
|
||||
response: response,
|
||||
}
|
||||
break
|
||||
}
|
||||
this.Unlock()
|
||||
v.Unlock()
|
||||
return id
|
||||
}
|
||||
|
||||
// Private: Visible for testing.
|
||||
func (this *UDPNameServer) HandleResponse(dest v2net.Destination, payload *alloc.Buffer) {
|
||||
func (v *UDPNameServer) HandleResponse(dest v2net.Destination, payload *alloc.Buffer) {
|
||||
msg := new(dns.Msg)
|
||||
err := msg.Unpack(payload.Value)
|
||||
if err != nil {
|
||||
@ -115,14 +115,14 @@ func (this *UDPNameServer) HandleResponse(dest v2net.Destination, payload *alloc
|
||||
ttl := DefaultTTL
|
||||
log.Debug("DNS: Handling response for id ", id, " content: ", msg.String())
|
||||
|
||||
this.Lock()
|
||||
request, found := this.requests[id]
|
||||
v.Lock()
|
||||
request, found := v.requests[id]
|
||||
if !found {
|
||||
this.Unlock()
|
||||
v.Unlock()
|
||||
return
|
||||
}
|
||||
delete(this.requests, id)
|
||||
this.Unlock()
|
||||
delete(v.requests, id)
|
||||
v.Unlock()
|
||||
|
||||
for _, rr := range msg.Answer {
|
||||
switch rr := rr.(type) {
|
||||
@ -144,7 +144,7 @@ func (this *UDPNameServer) HandleResponse(dest v2net.Destination, payload *alloc
|
||||
close(request.response)
|
||||
}
|
||||
|
||||
func (this *UDPNameServer) BuildQueryA(domain string, id uint16) *alloc.Buffer {
|
||||
func (v *UDPNameServer) BuildQueryA(domain string, id uint16) *alloc.Buffer {
|
||||
buffer := alloc.NewBuffer()
|
||||
msg := new(dns.Msg)
|
||||
msg.Id = id
|
||||
@ -162,24 +162,24 @@ func (this *UDPNameServer) BuildQueryA(domain string, id uint16) *alloc.Buffer {
|
||||
return buffer
|
||||
}
|
||||
|
||||
func (this *UDPNameServer) DispatchQuery(payload *alloc.Buffer) {
|
||||
this.udpServer.Dispatch(&proxy.SessionInfo{Source: pseudoDestination, Destination: this.address}, payload, this.HandleResponse)
|
||||
func (v *UDPNameServer) DispatchQuery(payload *alloc.Buffer) {
|
||||
v.udpServer.Dispatch(&proxy.SessionInfo{Source: pseudoDestination, Destination: v.address}, payload, v.HandleResponse)
|
||||
}
|
||||
|
||||
func (this *UDPNameServer) QueryA(domain string) <-chan *ARecord {
|
||||
func (v *UDPNameServer) QueryA(domain string) <-chan *ARecord {
|
||||
response := make(chan *ARecord, 1)
|
||||
id := this.AssignUnusedID(response)
|
||||
id := v.AssignUnusedID(response)
|
||||
|
||||
this.DispatchQuery(this.BuildQueryA(domain, id))
|
||||
v.DispatchQuery(v.BuildQueryA(domain, id))
|
||||
|
||||
go func() {
|
||||
for i := 0; i < 2; i++ {
|
||||
time.Sleep(time.Second)
|
||||
this.Lock()
|
||||
_, found := this.requests[id]
|
||||
this.Unlock()
|
||||
v.Lock()
|
||||
_, found := v.requests[id]
|
||||
v.Unlock()
|
||||
if found {
|
||||
this.DispatchQuery(this.BuildQueryA(domain, id))
|
||||
v.DispatchQuery(v.BuildQueryA(domain, id))
|
||||
} else {
|
||||
break
|
||||
}
|
||||
@ -192,7 +192,7 @@ func (this *UDPNameServer) QueryA(domain string) <-chan *ARecord {
|
||||
type LocalNameServer struct {
|
||||
}
|
||||
|
||||
func (this *LocalNameServer) QueryA(domain string) <-chan *ARecord {
|
||||
func (v *LocalNameServer) QueryA(domain string) <-chan *ARecord {
|
||||
response := make(chan *ARecord, 1)
|
||||
|
||||
go func() {
|
||||
|
@ -65,44 +65,44 @@ func NewCacheServer(space app.Space, config *Config) *CacheServer {
|
||||
return server
|
||||
}
|
||||
|
||||
func (this *CacheServer) Release() {
|
||||
func (v *CacheServer) Release() {
|
||||
|
||||
}
|
||||
|
||||
// Private: Visible for testing.
|
||||
func (this *CacheServer) GetCached(domain string) []net.IP {
|
||||
this.RLock()
|
||||
defer this.RUnlock()
|
||||
func (v *CacheServer) GetCached(domain string) []net.IP {
|
||||
v.RLock()
|
||||
defer v.RUnlock()
|
||||
|
||||
if record, found := this.records[domain]; found && record.A.Expire.After(time.Now()) {
|
||||
if record, found := v.records[domain]; found && record.A.Expire.After(time.Now()) {
|
||||
return record.A.IPs
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *CacheServer) Get(domain string) []net.IP {
|
||||
if ip, found := this.hosts[domain]; found {
|
||||
func (v *CacheServer) Get(domain string) []net.IP {
|
||||
if ip, found := v.hosts[domain]; found {
|
||||
return []net.IP{ip}
|
||||
}
|
||||
|
||||
domain = dns.Fqdn(domain)
|
||||
ips := this.GetCached(domain)
|
||||
ips := v.GetCached(domain)
|
||||
if ips != nil {
|
||||
return ips
|
||||
}
|
||||
|
||||
for _, server := range this.servers {
|
||||
for _, server := range v.servers {
|
||||
response := server.QueryA(domain)
|
||||
select {
|
||||
case a, open := <-response:
|
||||
if !open || a == nil {
|
||||
continue
|
||||
}
|
||||
this.Lock()
|
||||
this.records[domain] = &DomainRecord{
|
||||
v.Lock()
|
||||
v.records[domain] = &DomainRecord{
|
||||
A: a,
|
||||
}
|
||||
this.Unlock()
|
||||
v.Unlock()
|
||||
log.Debug("DNS: Returning ", len(a.IPs), " IPs for domain ", domain)
|
||||
return a.IPs
|
||||
case <-time.After(QueryTimeout):
|
||||
@ -115,12 +115,12 @@ func (this *CacheServer) Get(domain string) []net.IP {
|
||||
|
||||
type CacheServerFactory struct{}
|
||||
|
||||
func (this CacheServerFactory) Create(space app.Space, config interface{}) (app.Application, error) {
|
||||
func (v CacheServerFactory) Create(space app.Space, config interface{}) (app.Application, error) {
|
||||
server := NewCacheServer(space, config.(*Config))
|
||||
return server, nil
|
||||
}
|
||||
|
||||
func (this CacheServerFactory) AppId() app.ID {
|
||||
func (v CacheServerFactory) AppId() app.ID {
|
||||
return APP_ID
|
||||
}
|
||||
|
||||
|
@ -35,12 +35,12 @@ func NewOutboundProxy(space app.Space) *OutboundProxy {
|
||||
return proxy
|
||||
}
|
||||
|
||||
func (this *OutboundProxy) RegisterDialer() {
|
||||
internet.ProxyDialer = this.Dial
|
||||
func (v *OutboundProxy) RegisterDialer() {
|
||||
internet.ProxyDialer = v.Dial
|
||||
}
|
||||
|
||||
func (this *OutboundProxy) Dial(src v2net.Address, dest v2net.Destination, options internet.DialerOptions) (internet.Connection, error) {
|
||||
handler := this.outboundManager.GetHandler(options.Proxy.Tag)
|
||||
func (v *OutboundProxy) Dial(src v2net.Address, dest v2net.Destination, options internet.DialerOptions) (internet.Connection, error) {
|
||||
handler := v.outboundManager.GetHandler(options.Proxy.Tag)
|
||||
if handler == nil {
|
||||
log.Warning("Proxy: Failed to get outbound handler with tag: ", options.Proxy.Tag)
|
||||
return internet.Dial(src, dest, internet.DialerOptions{
|
||||
@ -53,7 +53,7 @@ func (this *OutboundProxy) Dial(src v2net.Address, dest v2net.Destination, optio
|
||||
return NewProxyConnection(src, dest, stream), nil
|
||||
}
|
||||
|
||||
func (this *OutboundProxy) Release() {
|
||||
func (v *OutboundProxy) Release() {
|
||||
|
||||
}
|
||||
|
||||
@ -83,53 +83,53 @@ func NewProxyConnection(src v2net.Address, dest v2net.Destination, stream ray.Ra
|
||||
}
|
||||
}
|
||||
|
||||
func (this *ProxyConnection) Read(b []byte) (int, error) {
|
||||
if this.closed {
|
||||
func (v *ProxyConnection) Read(b []byte) (int, error) {
|
||||
if v.closed {
|
||||
return 0, io.EOF
|
||||
}
|
||||
return this.reader.Read(b)
|
||||
return v.reader.Read(b)
|
||||
}
|
||||
|
||||
func (this *ProxyConnection) Write(b []byte) (int, error) {
|
||||
if this.closed {
|
||||
func (v *ProxyConnection) Write(b []byte) (int, error) {
|
||||
if v.closed {
|
||||
return 0, io.ErrClosedPipe
|
||||
}
|
||||
return this.writer.Write(b)
|
||||
return v.writer.Write(b)
|
||||
}
|
||||
|
||||
func (this *ProxyConnection) Close() error {
|
||||
this.closed = true
|
||||
this.stream.InboundInput().Close()
|
||||
this.stream.InboundOutput().Release()
|
||||
this.reader.Release()
|
||||
this.writer.Release()
|
||||
func (v *ProxyConnection) Close() error {
|
||||
v.closed = true
|
||||
v.stream.InboundInput().Close()
|
||||
v.stream.InboundOutput().Release()
|
||||
v.reader.Release()
|
||||
v.writer.Release()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *ProxyConnection) LocalAddr() net.Addr {
|
||||
return this.localAddr
|
||||
func (v *ProxyConnection) LocalAddr() net.Addr {
|
||||
return v.localAddr
|
||||
}
|
||||
|
||||
func (this *ProxyConnection) RemoteAddr() net.Addr {
|
||||
return this.remoteAddr
|
||||
func (v *ProxyConnection) RemoteAddr() net.Addr {
|
||||
return v.remoteAddr
|
||||
}
|
||||
|
||||
func (this *ProxyConnection) SetDeadline(t time.Time) error {
|
||||
func (v *ProxyConnection) SetDeadline(t time.Time) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *ProxyConnection) SetReadDeadline(t time.Time) error {
|
||||
func (v *ProxyConnection) SetReadDeadline(t time.Time) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *ProxyConnection) SetWriteDeadline(t time.Time) error {
|
||||
func (v *ProxyConnection) SetWriteDeadline(t time.Time) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *ProxyConnection) Reusable() bool {
|
||||
func (v *ProxyConnection) Reusable() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (this *ProxyConnection) SetReusable(bool) {
|
||||
func (v *ProxyConnection) SetReusable(bool) {
|
||||
|
||||
}
|
||||
|
@ -33,37 +33,37 @@ func NewDefaultOutboundHandlerManager() *DefaultOutboundHandlerManager {
|
||||
}
|
||||
}
|
||||
|
||||
func (this *DefaultOutboundHandlerManager) Release() {
|
||||
func (v *DefaultOutboundHandlerManager) Release() {
|
||||
|
||||
}
|
||||
|
||||
func (this *DefaultOutboundHandlerManager) GetDefaultHandler() proxy.OutboundHandler {
|
||||
this.RLock()
|
||||
defer this.RUnlock()
|
||||
if this.defaultHandler == nil {
|
||||
func (v *DefaultOutboundHandlerManager) GetDefaultHandler() proxy.OutboundHandler {
|
||||
v.RLock()
|
||||
defer v.RUnlock()
|
||||
if v.defaultHandler == nil {
|
||||
return nil
|
||||
}
|
||||
return this.defaultHandler
|
||||
return v.defaultHandler
|
||||
}
|
||||
|
||||
func (this *DefaultOutboundHandlerManager) SetDefaultHandler(handler proxy.OutboundHandler) {
|
||||
this.Lock()
|
||||
defer this.Unlock()
|
||||
this.defaultHandler = handler
|
||||
func (v *DefaultOutboundHandlerManager) SetDefaultHandler(handler proxy.OutboundHandler) {
|
||||
v.Lock()
|
||||
defer v.Unlock()
|
||||
v.defaultHandler = handler
|
||||
}
|
||||
|
||||
func (this *DefaultOutboundHandlerManager) GetHandler(tag string) proxy.OutboundHandler {
|
||||
this.RLock()
|
||||
defer this.RUnlock()
|
||||
if handler, found := this.taggedHandler[tag]; found {
|
||||
func (v *DefaultOutboundHandlerManager) GetHandler(tag string) proxy.OutboundHandler {
|
||||
v.RLock()
|
||||
defer v.RUnlock()
|
||||
if handler, found := v.taggedHandler[tag]; found {
|
||||
return handler
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *DefaultOutboundHandlerManager) SetHandler(tag string, handler proxy.OutboundHandler) {
|
||||
this.Lock()
|
||||
defer this.Unlock()
|
||||
func (v *DefaultOutboundHandlerManager) SetHandler(tag string, handler proxy.OutboundHandler) {
|
||||
v.Lock()
|
||||
defer v.Unlock()
|
||||
|
||||
this.taggedHandler[tag] = handler
|
||||
v.taggedHandler[tag] = handler
|
||||
}
|
||||
|
@ -20,13 +20,13 @@ func NewConditionChan() *ConditionChan {
|
||||
return &condChan
|
||||
}
|
||||
|
||||
func (this *ConditionChan) Add(cond Condition) *ConditionChan {
|
||||
*this = append(*this, cond)
|
||||
return this
|
||||
func (v *ConditionChan) Add(cond Condition) *ConditionChan {
|
||||
*v = append(*v, cond)
|
||||
return v
|
||||
}
|
||||
|
||||
func (this *ConditionChan) Apply(session *proxy.SessionInfo) bool {
|
||||
for _, cond := range *this {
|
||||
func (v *ConditionChan) Apply(session *proxy.SessionInfo) bool {
|
||||
for _, cond := range *v {
|
||||
if !cond.Apply(session) {
|
||||
return false
|
||||
}
|
||||
@ -34,8 +34,8 @@ func (this *ConditionChan) Apply(session *proxy.SessionInfo) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (this *ConditionChan) Len() int {
|
||||
return len(*this)
|
||||
func (v *ConditionChan) Len() int {
|
||||
return len(*v)
|
||||
}
|
||||
|
||||
type AnyCondition []Condition
|
||||
@ -45,13 +45,13 @@ func NewAnyCondition() *AnyCondition {
|
||||
return &anyCond
|
||||
}
|
||||
|
||||
func (this *AnyCondition) Add(cond Condition) *AnyCondition {
|
||||
*this = append(*this, cond)
|
||||
return this
|
||||
func (v *AnyCondition) Add(cond Condition) *AnyCondition {
|
||||
*v = append(*v, cond)
|
||||
return v
|
||||
}
|
||||
|
||||
func (this *AnyCondition) Apply(session *proxy.SessionInfo) bool {
|
||||
for _, cond := range *this {
|
||||
func (v *AnyCondition) Apply(session *proxy.SessionInfo) bool {
|
||||
for _, cond := range *v {
|
||||
if cond.Apply(session) {
|
||||
return true
|
||||
}
|
||||
@ -59,8 +59,8 @@ func (this *AnyCondition) Apply(session *proxy.SessionInfo) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (this *AnyCondition) Len() int {
|
||||
return len(*this)
|
||||
func (v *AnyCondition) Len() int {
|
||||
return len(*v)
|
||||
}
|
||||
|
||||
type PlainDomainMatcher struct {
|
||||
@ -73,13 +73,13 @@ func NewPlainDomainMatcher(pattern string) *PlainDomainMatcher {
|
||||
}
|
||||
}
|
||||
|
||||
func (this *PlainDomainMatcher) Apply(session *proxy.SessionInfo) bool {
|
||||
func (v *PlainDomainMatcher) Apply(session *proxy.SessionInfo) bool {
|
||||
dest := session.Destination
|
||||
if !dest.Address.Family().IsDomain() {
|
||||
return false
|
||||
}
|
||||
domain := dest.Address.Domain()
|
||||
return strings.Contains(domain, this.pattern)
|
||||
return strings.Contains(domain, v.pattern)
|
||||
}
|
||||
|
||||
type RegexpDomainMatcher struct {
|
||||
@ -96,13 +96,13 @@ func NewRegexpDomainMatcher(pattern string) (*RegexpDomainMatcher, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (this *RegexpDomainMatcher) Apply(session *proxy.SessionInfo) bool {
|
||||
func (v *RegexpDomainMatcher) Apply(session *proxy.SessionInfo) bool {
|
||||
dest := session.Destination
|
||||
if !dest.Address.Family().IsDomain() {
|
||||
return false
|
||||
}
|
||||
domain := dest.Address.Domain()
|
||||
return this.pattern.MatchString(strings.ToLower(domain))
|
||||
return v.pattern.MatchString(strings.ToLower(domain))
|
||||
}
|
||||
|
||||
type CIDRMatcher struct {
|
||||
@ -121,15 +121,15 @@ func NewCIDRMatcher(ip []byte, mask uint32, onSource bool) (*CIDRMatcher, error)
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (this *CIDRMatcher) Apply(session *proxy.SessionInfo) bool {
|
||||
func (v *CIDRMatcher) Apply(session *proxy.SessionInfo) bool {
|
||||
dest := session.Destination
|
||||
if this.onSource {
|
||||
if v.onSource {
|
||||
dest = session.Source
|
||||
}
|
||||
if !dest.Address.Family().Either(v2net.AddressFamilyIPv4, v2net.AddressFamilyIPv6) {
|
||||
return false
|
||||
}
|
||||
return this.cidr.Contains(dest.Address.IP())
|
||||
return v.cidr.Contains(dest.Address.IP())
|
||||
}
|
||||
|
||||
type IPv4Matcher struct {
|
||||
@ -144,15 +144,15 @@ func NewIPv4Matcher(ipnet *v2net.IPNet, onSource bool) *IPv4Matcher {
|
||||
}
|
||||
}
|
||||
|
||||
func (this *IPv4Matcher) Apply(session *proxy.SessionInfo) bool {
|
||||
func (v *IPv4Matcher) Apply(session *proxy.SessionInfo) bool {
|
||||
dest := session.Destination
|
||||
if this.onSource {
|
||||
if v.onSource {
|
||||
dest = session.Source
|
||||
}
|
||||
if !dest.Address.Family().Either(v2net.AddressFamilyIPv4) {
|
||||
return false
|
||||
}
|
||||
return this.ipv4net.Contains(dest.Address.IP())
|
||||
return v.ipv4net.Contains(dest.Address.IP())
|
||||
}
|
||||
|
||||
type PortMatcher struct {
|
||||
@ -165,8 +165,8 @@ func NewPortMatcher(portRange v2net.PortRange) *PortMatcher {
|
||||
}
|
||||
}
|
||||
|
||||
func (this *PortMatcher) Apply(session *proxy.SessionInfo) bool {
|
||||
return this.port.Contains(session.Destination.Port)
|
||||
func (v *PortMatcher) Apply(session *proxy.SessionInfo) bool {
|
||||
return v.port.Contains(session.Destination.Port)
|
||||
}
|
||||
|
||||
type NetworkMatcher struct {
|
||||
@ -179,8 +179,8 @@ func NewNetworkMatcher(network *v2net.NetworkList) *NetworkMatcher {
|
||||
}
|
||||
}
|
||||
|
||||
func (this *NetworkMatcher) Apply(session *proxy.SessionInfo) bool {
|
||||
return this.network.HasNetwork(session.Destination.Network)
|
||||
func (v *NetworkMatcher) Apply(session *proxy.SessionInfo) bool {
|
||||
return v.network.HasNetwork(session.Destination.Network)
|
||||
}
|
||||
|
||||
type UserMatcher struct {
|
||||
@ -193,11 +193,11 @@ func NewUserMatcher(users []string) *UserMatcher {
|
||||
}
|
||||
}
|
||||
|
||||
func (this *UserMatcher) Apply(session *proxy.SessionInfo) bool {
|
||||
func (v *UserMatcher) Apply(session *proxy.SessionInfo) bool {
|
||||
if session.User == nil {
|
||||
return false
|
||||
}
|
||||
for _, u := range this.user {
|
||||
for _, u := range v.user {
|
||||
if u == session.User.Email {
|
||||
return true
|
||||
}
|
||||
@ -215,12 +215,12 @@ func NewInboundTagMatcher(tags []string) *InboundTagMatcher {
|
||||
}
|
||||
}
|
||||
|
||||
func (this *InboundTagMatcher) Apply(session *proxy.SessionInfo) bool {
|
||||
func (v *InboundTagMatcher) Apply(session *proxy.SessionInfo) bool {
|
||||
if session.Inbound == nil || len(session.Inbound.Tag) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
for _, t := range this.tags {
|
||||
for _, t := range v.tags {
|
||||
if t == session.Inbound.Tag {
|
||||
return true
|
||||
}
|
||||
|
@ -13,16 +13,16 @@ type Rule struct {
|
||||
Condition Condition
|
||||
}
|
||||
|
||||
func (this *Rule) Apply(session *proxy.SessionInfo) bool {
|
||||
return this.Condition.Apply(session)
|
||||
func (v *Rule) Apply(session *proxy.SessionInfo) bool {
|
||||
return v.Condition.Apply(session)
|
||||
}
|
||||
|
||||
func (this *RoutingRule) BuildCondition() (Condition, error) {
|
||||
func (v *RoutingRule) BuildCondition() (Condition, error) {
|
||||
conds := NewConditionChan()
|
||||
|
||||
if len(this.Domain) > 0 {
|
||||
if len(v.Domain) > 0 {
|
||||
anyCond := NewAnyCondition()
|
||||
for _, domain := range this.Domain {
|
||||
for _, domain := range v.Domain {
|
||||
if domain.Type == Domain_Plain {
|
||||
anyCond.Add(NewPlainDomainMatcher(domain.Value))
|
||||
} else {
|
||||
@ -36,12 +36,12 @@ func (this *RoutingRule) BuildCondition() (Condition, error) {
|
||||
conds.Add(anyCond)
|
||||
}
|
||||
|
||||
if len(this.Cidr) > 0 {
|
||||
if len(v.Cidr) > 0 {
|
||||
ipv4Net := v2net.NewIPNet()
|
||||
ipv6Cond := NewAnyCondition()
|
||||
hasIpv6 := false
|
||||
|
||||
for _, ip := range this.Cidr {
|
||||
for _, ip := range v.Cidr {
|
||||
switch len(ip.Ip) {
|
||||
case net.IPv4len:
|
||||
ipv4Net.AddIP(ip.Ip, byte(ip.Prefix))
|
||||
@ -69,20 +69,20 @@ func (this *RoutingRule) BuildCondition() (Condition, error) {
|
||||
}
|
||||
}
|
||||
|
||||
if this.PortRange != nil {
|
||||
conds.Add(NewPortMatcher(*this.PortRange))
|
||||
if v.PortRange != nil {
|
||||
conds.Add(NewPortMatcher(*v.PortRange))
|
||||
}
|
||||
|
||||
if this.NetworkList != nil {
|
||||
conds.Add(NewNetworkMatcher(this.NetworkList))
|
||||
if v.NetworkList != nil {
|
||||
conds.Add(NewNetworkMatcher(v.NetworkList))
|
||||
}
|
||||
|
||||
if len(this.SourceCidr) > 0 {
|
||||
if len(v.SourceCidr) > 0 {
|
||||
ipv4Net := v2net.NewIPNet()
|
||||
ipv6Cond := NewAnyCondition()
|
||||
hasIpv6 := false
|
||||
|
||||
for _, ip := range this.SourceCidr {
|
||||
for _, ip := range v.SourceCidr {
|
||||
switch len(ip.Ip) {
|
||||
case net.IPv4len:
|
||||
ipv4Net.AddIP(ip.Ip, byte(ip.Prefix))
|
||||
@ -110,12 +110,12 @@ func (this *RoutingRule) BuildCondition() (Condition, error) {
|
||||
}
|
||||
}
|
||||
|
||||
if len(this.UserEmail) > 0 {
|
||||
conds.Add(NewUserMatcher(this.UserEmail))
|
||||
if len(v.UserEmail) > 0 {
|
||||
conds.Add(NewUserMatcher(v.UserEmail))
|
||||
}
|
||||
|
||||
if len(this.InboundTag) > 0 {
|
||||
conds.Add(NewInboundTagMatcher(this.InboundTag))
|
||||
if len(v.InboundTag) > 0 {
|
||||
conds.Add(NewInboundTagMatcher(v.InboundTag))
|
||||
}
|
||||
|
||||
if conds.Len() == 0 {
|
||||
|
@ -53,13 +53,13 @@ func NewRouter(config *Config, space app.Space) *Router {
|
||||
return r
|
||||
}
|
||||
|
||||
func (this *Router) Release() {
|
||||
func (v *Router) Release() {
|
||||
|
||||
}
|
||||
|
||||
// Private: Visible for testing.
|
||||
func (this *Router) ResolveIP(dest v2net.Destination) []v2net.Destination {
|
||||
ips := this.dnsServer.Get(dest.Address.Domain())
|
||||
func (v *Router) ResolveIP(dest v2net.Destination) []v2net.Destination {
|
||||
ips := v.dnsServer.Get(dest.Address.Domain())
|
||||
if len(ips) == 0 {
|
||||
return nil
|
||||
}
|
||||
@ -74,20 +74,20 @@ func (this *Router) ResolveIP(dest v2net.Destination) []v2net.Destination {
|
||||
return dests
|
||||
}
|
||||
|
||||
func (this *Router) takeDetourWithoutCache(session *proxy.SessionInfo) (string, error) {
|
||||
for _, rule := range this.rules {
|
||||
func (v *Router) takeDetourWithoutCache(session *proxy.SessionInfo) (string, error) {
|
||||
for _, rule := range v.rules {
|
||||
if rule.Apply(session) {
|
||||
return rule.Tag, nil
|
||||
}
|
||||
}
|
||||
dest := session.Destination
|
||||
if this.domainStrategy == Config_IpIfNonMatch && dest.Address.Family().IsDomain() {
|
||||
if v.domainStrategy == Config_IpIfNonMatch && dest.Address.Family().IsDomain() {
|
||||
log.Info("Router: Looking up IP for ", dest)
|
||||
ipDests := this.ResolveIP(dest)
|
||||
ipDests := v.ResolveIP(dest)
|
||||
if ipDests != nil {
|
||||
for _, ipDest := range ipDests {
|
||||
log.Info("Router: Trying IP ", ipDest)
|
||||
for _, rule := range this.rules {
|
||||
for _, rule := range v.rules {
|
||||
if rule.Apply(&proxy.SessionInfo{
|
||||
Source: session.Source,
|
||||
Destination: ipDest,
|
||||
@ -103,12 +103,12 @@ func (this *Router) takeDetourWithoutCache(session *proxy.SessionInfo) (string,
|
||||
return "", ErrNoRuleApplicable
|
||||
}
|
||||
|
||||
func (this *Router) TakeDetour(session *proxy.SessionInfo) (string, error) {
|
||||
func (v *Router) TakeDetour(session *proxy.SessionInfo) (string, error) {
|
||||
//destStr := dest.String()
|
||||
//found, tag, err := this.cache.Get(destStr)
|
||||
//found, tag, err := v.cache.Get(destStr)
|
||||
//if !found {
|
||||
tag, err := this.takeDetourWithoutCache(session)
|
||||
//this.cache.Set(destStr, tag, err)
|
||||
tag, err := v.takeDetourWithoutCache(session)
|
||||
//v.cache.Set(destStr, tag, err)
|
||||
return tag, err
|
||||
//}
|
||||
//return tag, err
|
||||
|
@ -11,12 +11,12 @@ type RoutingEntry struct {
|
||||
expire time.Time
|
||||
}
|
||||
|
||||
func (this *RoutingEntry) Extend() {
|
||||
this.expire = time.Now().Add(time.Hour)
|
||||
func (v *RoutingEntry) Extend() {
|
||||
v.expire = time.Now().Add(time.Hour)
|
||||
}
|
||||
|
||||
func (this *RoutingEntry) Expired() bool {
|
||||
return this.expire.Before(time.Now())
|
||||
func (v *RoutingEntry) Expired() bool {
|
||||
return v.expire.Before(time.Now())
|
||||
}
|
||||
|
||||
type RoutingTable struct {
|
||||
@ -30,38 +30,38 @@ func NewRoutingTable() *RoutingTable {
|
||||
}
|
||||
}
|
||||
|
||||
func (this *RoutingTable) Cleanup() {
|
||||
this.Lock()
|
||||
defer this.Unlock()
|
||||
func (v *RoutingTable) Cleanup() {
|
||||
v.Lock()
|
||||
defer v.Unlock()
|
||||
|
||||
for key, value := range this.table {
|
||||
for key, value := range v.table {
|
||||
if value.Expired() {
|
||||
delete(this.table, key)
|
||||
delete(v.table, key)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (this *RoutingTable) Set(destination string, tag string, err error) {
|
||||
this.Lock()
|
||||
defer this.Unlock()
|
||||
func (v *RoutingTable) Set(destination string, tag string, err error) {
|
||||
v.Lock()
|
||||
defer v.Unlock()
|
||||
|
||||
entry := &RoutingEntry{
|
||||
tag: tag,
|
||||
err: err,
|
||||
}
|
||||
entry.Extend()
|
||||
this.table[destination] = entry
|
||||
v.table[destination] = entry
|
||||
|
||||
if len(this.table) > 1000 {
|
||||
go this.Cleanup()
|
||||
if len(v.table) > 1000 {
|
||||
go v.Cleanup()
|
||||
}
|
||||
}
|
||||
|
||||
func (this *RoutingTable) Get(destination string) (bool, string, error) {
|
||||
this.RLock()
|
||||
defer this.RUnlock()
|
||||
func (v *RoutingTable) Get(destination string) (bool, string, error) {
|
||||
v.RLock()
|
||||
defer v.RUnlock()
|
||||
|
||||
entry, found := this.table[destination]
|
||||
entry, found := v.table[destination]
|
||||
if !found {
|
||||
return false, "", nil
|
||||
}
|
||||
|
26
app/space.go
26
app/space.go
@ -60,12 +60,12 @@ func NewSpace() Space {
|
||||
}
|
||||
}
|
||||
|
||||
func (this *spaceImpl) InitializeApplication(f ApplicationInitializer) {
|
||||
this.appInit = append(this.appInit, f)
|
||||
func (v *spaceImpl) InitializeApplication(f ApplicationInitializer) {
|
||||
v.appInit = append(v.appInit, f)
|
||||
}
|
||||
|
||||
func (this *spaceImpl) Initialize() error {
|
||||
for _, f := range this.appInit {
|
||||
func (v *spaceImpl) Initialize() error {
|
||||
for _, f := range v.appInit {
|
||||
err := f()
|
||||
if err != nil {
|
||||
return err
|
||||
@ -74,32 +74,32 @@ func (this *spaceImpl) Initialize() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *spaceImpl) HasApp(id ID) bool {
|
||||
_, found := this.cache[id]
|
||||
func (v *spaceImpl) HasApp(id ID) bool {
|
||||
_, found := v.cache[id]
|
||||
return found
|
||||
}
|
||||
|
||||
func (this *spaceImpl) GetApp(id ID) Application {
|
||||
obj, found := this.cache[id]
|
||||
func (v *spaceImpl) GetApp(id ID) Application {
|
||||
obj, found := v.cache[id]
|
||||
if !found {
|
||||
return nil
|
||||
}
|
||||
return obj
|
||||
}
|
||||
|
||||
func (this *spaceImpl) BindApp(id ID, application Application) {
|
||||
this.cache[id] = application
|
||||
func (v *spaceImpl) BindApp(id ID, application Application) {
|
||||
v.cache[id] = application
|
||||
}
|
||||
|
||||
func (this *spaceImpl) BindFromConfig(name string, config interface{}) error {
|
||||
func (v *spaceImpl) BindFromConfig(name string, config interface{}) error {
|
||||
factory, found := applicationFactoryCache[name]
|
||||
if !found {
|
||||
return errors.New("Space: app not registered: " + name)
|
||||
}
|
||||
app, err := factory.Create(this, config)
|
||||
app, err := factory.Create(v, config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
this.BindApp(factory.AppId(), app)
|
||||
v.BindApp(factory.AppId(), app)
|
||||
return nil
|
||||
}
|
||||
|
@ -19,20 +19,20 @@ func NewCryptionReader(stream cipher.Stream, reader io.Reader) *CryptionReader {
|
||||
}
|
||||
}
|
||||
|
||||
func (this *CryptionReader) Read(data []byte) (int, error) {
|
||||
if this.reader == nil {
|
||||
func (v *CryptionReader) Read(data []byte) (int, error) {
|
||||
if v.reader == nil {
|
||||
return 0, common.ErrObjectReleased
|
||||
}
|
||||
nBytes, err := this.reader.Read(data)
|
||||
nBytes, err := v.reader.Read(data)
|
||||
if nBytes > 0 {
|
||||
this.stream.XORKeyStream(data[:nBytes], data[:nBytes])
|
||||
v.stream.XORKeyStream(data[:nBytes], data[:nBytes])
|
||||
}
|
||||
return nBytes, err
|
||||
}
|
||||
|
||||
func (this *CryptionReader) Release() {
|
||||
this.reader = nil
|
||||
this.stream = nil
|
||||
func (v *CryptionReader) Release() {
|
||||
v.reader = nil
|
||||
v.stream = nil
|
||||
}
|
||||
|
||||
type CryptionWriter struct {
|
||||
@ -47,15 +47,15 @@ func NewCryptionWriter(stream cipher.Stream, writer io.Writer) *CryptionWriter {
|
||||
}
|
||||
}
|
||||
|
||||
func (this *CryptionWriter) Write(data []byte) (int, error) {
|
||||
if this.writer == nil {
|
||||
func (v *CryptionWriter) Write(data []byte) (int, error) {
|
||||
if v.writer == nil {
|
||||
return 0, common.ErrObjectReleased
|
||||
}
|
||||
this.stream.XORKeyStream(data, data)
|
||||
return this.writer.Write(data)
|
||||
v.stream.XORKeyStream(data, data)
|
||||
return v.writer.Write(data)
|
||||
}
|
||||
|
||||
func (this *CryptionWriter) Release() {
|
||||
this.writer = nil
|
||||
this.stream = nil
|
||||
func (v *CryptionWriter) Release() {
|
||||
v.writer = nil
|
||||
v.stream = nil
|
||||
}
|
||||
|
@ -22,47 +22,47 @@ func NewBufferedReader(rawReader io.Reader) *BufferedReader {
|
||||
}
|
||||
}
|
||||
|
||||
func (this *BufferedReader) Release() {
|
||||
this.Lock()
|
||||
defer this.Unlock()
|
||||
func (v *BufferedReader) Release() {
|
||||
v.Lock()
|
||||
defer v.Unlock()
|
||||
|
||||
this.buffer.Release()
|
||||
this.buffer = nil
|
||||
this.reader = nil
|
||||
v.buffer.Release()
|
||||
v.buffer = nil
|
||||
v.reader = nil
|
||||
}
|
||||
|
||||
func (this *BufferedReader) Cached() bool {
|
||||
return this.cached
|
||||
func (v *BufferedReader) Cached() bool {
|
||||
return v.cached
|
||||
}
|
||||
|
||||
func (this *BufferedReader) SetCached(cached bool) {
|
||||
this.cached = cached
|
||||
func (v *BufferedReader) SetCached(cached bool) {
|
||||
v.cached = cached
|
||||
}
|
||||
|
||||
func (this *BufferedReader) Read(b []byte) (int, error) {
|
||||
this.Lock()
|
||||
defer this.Unlock()
|
||||
func (v *BufferedReader) Read(b []byte) (int, error) {
|
||||
v.Lock()
|
||||
defer v.Unlock()
|
||||
|
||||
if this.reader == nil {
|
||||
if v.reader == nil {
|
||||
return 0, io.EOF
|
||||
}
|
||||
|
||||
if !this.cached {
|
||||
if !this.buffer.IsEmpty() {
|
||||
return this.buffer.Read(b)
|
||||
if !v.cached {
|
||||
if !v.buffer.IsEmpty() {
|
||||
return v.buffer.Read(b)
|
||||
}
|
||||
return this.reader.Read(b)
|
||||
return v.reader.Read(b)
|
||||
}
|
||||
if this.buffer.IsEmpty() {
|
||||
_, err := this.buffer.FillFrom(this.reader)
|
||||
if v.buffer.IsEmpty() {
|
||||
_, err := v.buffer.FillFrom(v.reader)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
|
||||
if this.buffer.IsEmpty() {
|
||||
if v.buffer.IsEmpty() {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
return this.buffer.Read(b)
|
||||
return v.buffer.Read(b)
|
||||
}
|
||||
|
@ -21,17 +21,17 @@ func NewBufferedWriter(rawWriter io.Writer) *BufferedWriter {
|
||||
}
|
||||
}
|
||||
|
||||
func (this *BufferedWriter) ReadFrom(reader io.Reader) (int64, error) {
|
||||
this.Lock()
|
||||
defer this.Unlock()
|
||||
func (v *BufferedWriter) ReadFrom(reader io.Reader) (int64, error) {
|
||||
v.Lock()
|
||||
defer v.Unlock()
|
||||
|
||||
if this.writer == nil {
|
||||
if v.writer == nil {
|
||||
return 0, io.ErrClosedPipe
|
||||
}
|
||||
|
||||
totalBytes := int64(0)
|
||||
for {
|
||||
nBytes, err := this.buffer.FillFrom(reader)
|
||||
nBytes, err := v.buffer.FillFrom(reader)
|
||||
totalBytes += int64(nBytes)
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
@ -39,69 +39,69 @@ func (this *BufferedWriter) ReadFrom(reader io.Reader) (int64, error) {
|
||||
}
|
||||
return totalBytes, err
|
||||
}
|
||||
this.FlushWithoutLock()
|
||||
v.FlushWithoutLock()
|
||||
}
|
||||
}
|
||||
|
||||
func (this *BufferedWriter) Write(b []byte) (int, error) {
|
||||
this.Lock()
|
||||
defer this.Unlock()
|
||||
func (v *BufferedWriter) Write(b []byte) (int, error) {
|
||||
v.Lock()
|
||||
defer v.Unlock()
|
||||
|
||||
if this.writer == nil {
|
||||
if v.writer == nil {
|
||||
return 0, io.ErrClosedPipe
|
||||
}
|
||||
|
||||
if !this.cached {
|
||||
return this.writer.Write(b)
|
||||
if !v.cached {
|
||||
return v.writer.Write(b)
|
||||
}
|
||||
nBytes, _ := this.buffer.Write(b)
|
||||
if this.buffer.IsFull() {
|
||||
this.FlushWithoutLock()
|
||||
nBytes, _ := v.buffer.Write(b)
|
||||
if v.buffer.IsFull() {
|
||||
v.FlushWithoutLock()
|
||||
}
|
||||
return nBytes, nil
|
||||
}
|
||||
|
||||
func (this *BufferedWriter) Flush() error {
|
||||
this.Lock()
|
||||
defer this.Unlock()
|
||||
func (v *BufferedWriter) Flush() error {
|
||||
v.Lock()
|
||||
defer v.Unlock()
|
||||
|
||||
if this.writer == nil {
|
||||
if v.writer == nil {
|
||||
return io.ErrClosedPipe
|
||||
}
|
||||
|
||||
return this.FlushWithoutLock()
|
||||
return v.FlushWithoutLock()
|
||||
}
|
||||
|
||||
func (this *BufferedWriter) FlushWithoutLock() error {
|
||||
defer this.buffer.Clear()
|
||||
for !this.buffer.IsEmpty() {
|
||||
nBytes, err := this.writer.Write(this.buffer.Value)
|
||||
func (v *BufferedWriter) FlushWithoutLock() error {
|
||||
defer v.buffer.Clear()
|
||||
for !v.buffer.IsEmpty() {
|
||||
nBytes, err := v.writer.Write(v.buffer.Value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
this.buffer.SliceFrom(nBytes)
|
||||
v.buffer.SliceFrom(nBytes)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *BufferedWriter) Cached() bool {
|
||||
return this.cached
|
||||
func (v *BufferedWriter) Cached() bool {
|
||||
return v.cached
|
||||
}
|
||||
|
||||
func (this *BufferedWriter) SetCached(cached bool) {
|
||||
this.cached = cached
|
||||
if !cached && !this.buffer.IsEmpty() {
|
||||
this.Flush()
|
||||
func (v *BufferedWriter) SetCached(cached bool) {
|
||||
v.cached = cached
|
||||
if !cached && !v.buffer.IsEmpty() {
|
||||
v.Flush()
|
||||
}
|
||||
}
|
||||
|
||||
func (this *BufferedWriter) Release() {
|
||||
this.Flush()
|
||||
func (v *BufferedWriter) Release() {
|
||||
v.Flush()
|
||||
|
||||
this.Lock()
|
||||
defer this.Unlock()
|
||||
v.Lock()
|
||||
defer v.Unlock()
|
||||
|
||||
this.buffer.Release()
|
||||
this.buffer = nil
|
||||
this.writer = nil
|
||||
v.buffer.Release()
|
||||
v.buffer = nil
|
||||
v.writer = nil
|
||||
}
|
||||
|
@ -18,10 +18,10 @@ func NewChainWriter(writer Writer) *ChainWriter {
|
||||
}
|
||||
}
|
||||
|
||||
func (this *ChainWriter) Write(payload []byte) (int, error) {
|
||||
this.Lock()
|
||||
defer this.Unlock()
|
||||
if this.writer == nil {
|
||||
func (v *ChainWriter) Write(payload []byte) (int, error) {
|
||||
v.Lock()
|
||||
defer v.Unlock()
|
||||
if v.writer == nil {
|
||||
return 0, io.ErrClosedPipe
|
||||
}
|
||||
|
||||
@ -39,7 +39,7 @@ func (this *ChainWriter) Write(payload []byte) (int, error) {
|
||||
bytesWritten += size
|
||||
size = 0
|
||||
}
|
||||
err := this.writer.Write(buffer)
|
||||
err := v.writer.Write(buffer)
|
||||
if err != nil {
|
||||
return bytesWritten, err
|
||||
}
|
||||
@ -48,9 +48,9 @@ func (this *ChainWriter) Write(payload []byte) (int, error) {
|
||||
return bytesWritten, nil
|
||||
}
|
||||
|
||||
func (this *ChainWriter) Release() {
|
||||
this.Lock()
|
||||
this.writer.Release()
|
||||
this.writer = nil
|
||||
this.Unlock()
|
||||
func (v *ChainWriter) Release() {
|
||||
v.Lock()
|
||||
v.writer.Release()
|
||||
v.writer = nil
|
||||
v.Unlock()
|
||||
}
|
||||
|
@ -21,42 +21,42 @@ func NewChanReader(stream Reader) *ChanReader {
|
||||
}
|
||||
|
||||
// Private: Visible for testing.
|
||||
func (this *ChanReader) Fill() {
|
||||
b, err := this.stream.Read()
|
||||
this.current = b
|
||||
func (v *ChanReader) Fill() {
|
||||
b, err := v.stream.Read()
|
||||
v.current = b
|
||||
if err != nil {
|
||||
this.eof = true
|
||||
this.current = nil
|
||||
v.eof = true
|
||||
v.current = nil
|
||||
}
|
||||
}
|
||||
|
||||
func (this *ChanReader) Read(b []byte) (int, error) {
|
||||
if this.eof {
|
||||
func (v *ChanReader) Read(b []byte) (int, error) {
|
||||
if v.eof {
|
||||
return 0, io.EOF
|
||||
}
|
||||
|
||||
this.Lock()
|
||||
defer this.Unlock()
|
||||
if this.current == nil {
|
||||
this.Fill()
|
||||
if this.eof {
|
||||
v.Lock()
|
||||
defer v.Unlock()
|
||||
if v.current == nil {
|
||||
v.Fill()
|
||||
if v.eof {
|
||||
return 0, io.EOF
|
||||
}
|
||||
}
|
||||
nBytes, err := this.current.Read(b)
|
||||
if this.current.IsEmpty() {
|
||||
this.current.Release()
|
||||
this.current = nil
|
||||
nBytes, err := v.current.Read(b)
|
||||
if v.current.IsEmpty() {
|
||||
v.current.Release()
|
||||
v.current = nil
|
||||
}
|
||||
return nBytes, err
|
||||
}
|
||||
|
||||
func (this *ChanReader) Release() {
|
||||
this.Lock()
|
||||
defer this.Unlock()
|
||||
func (v *ChanReader) Release() {
|
||||
v.Lock()
|
||||
defer v.Unlock()
|
||||
|
||||
this.eof = true
|
||||
this.current.Release()
|
||||
this.current = nil
|
||||
this.stream = nil
|
||||
v.eof = true
|
||||
v.current.Release()
|
||||
v.current = nil
|
||||
v.stream = nil
|
||||
}
|
||||
|