fix(ccm,ocm): restart status stream when receiver gets reverse session

statusStreamLoop started on start() before any reverse session existed,
got a non-retryable error, and exited permanently. Restart it when
setReverseSession transitions receiver credentials to available.
This commit is contained in:
世界
2026-03-17 21:54:09 +08:00
parent b3429ef1f3
commit 6b8838d323
2 changed files with 30 additions and 0 deletions

View File

@@ -884,6 +884,8 @@ func (c *externalCredential) getReverseSession() *yamux.Session {
func (c *externalCredential) setReverseSession(session *yamux.Session) bool {
var emitStatus bool
var restartStatusStream bool
var triggerUsageRefresh bool
c.reverseAccess.Lock()
if c.closed {
c.reverseAccess.Unlock()
@@ -894,10 +896,23 @@ func (c *externalCredential) setReverseSession(session *yamux.Session) bool {
c.reverseSession = session
isAvailable := c.baseURL == reverseProxyBaseURL && c.reverseSession != nil && !c.reverseSession.IsClosed()
emitStatus = wasAvailable != isAvailable
if isAvailable && !wasAvailable {
c.reverseCancel()
c.reverseContext, c.reverseCancel = context.WithCancel(context.Background())
restartStatusStream = true
triggerUsageRefresh = true
}
c.reverseAccess.Unlock()
if old != nil {
old.Close()
}
if restartStatusStream {
c.logger.Debug("poll usage for ", c.tag, ": reverse session ready, restarting status stream")
go c.statusStreamLoop()
}
if triggerUsageRefresh {
go c.pollUsage(c.getReverseContext())
}
if emitStatus {
c.emitStatusUpdate()
}

View File

@@ -954,6 +954,8 @@ func (c *externalCredential) getReverseSession() *yamux.Session {
func (c *externalCredential) setReverseSession(session *yamux.Session) bool {
var emitStatus bool
var restartStatusStream bool
var triggerUsageRefresh bool
c.reverseAccess.Lock()
if c.closed {
c.reverseAccess.Unlock()
@@ -964,10 +966,23 @@ func (c *externalCredential) setReverseSession(session *yamux.Session) bool {
c.reverseSession = session
isAvailable := c.baseURL == reverseProxyBaseURL && c.reverseSession != nil && !c.reverseSession.IsClosed()
emitStatus = wasAvailable != isAvailable
if isAvailable && !wasAvailable {
c.reverseCancel()
c.reverseContext, c.reverseCancel = context.WithCancel(context.Background())
restartStatusStream = true
triggerUsageRefresh = true
}
c.reverseAccess.Unlock()
if old != nil {
old.Close()
}
if restartStatusStream {
c.logger.Debug("poll usage for ", c.tag, ": reverse session ready, restarting status stream")
go c.statusStreamLoop()
}
if triggerUsageRefresh {
go c.pollUsage(c.getReverseContext())
}
if emitStatus {
c.emitStatusUpdate()
}