From 6b8838d323a17c7821b9043ec585ba9aed310f4b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=96=E7=95=8C?= Date: Tue, 17 Mar 2026 21:54:09 +0800 Subject: [PATCH] fix(ccm,ocm): restart status stream when receiver gets reverse session statusStreamLoop started on start() before any reverse session existed, got a non-retryable error, and exited permanently. Restart it when setReverseSession transitions receiver credentials to available. --- service/ccm/credential_external.go | 15 +++++++++++++++ service/ocm/credential_external.go | 15 +++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/service/ccm/credential_external.go b/service/ccm/credential_external.go index ba42ad64e..98332e41d 100644 --- a/service/ccm/credential_external.go +++ b/service/ccm/credential_external.go @@ -884,6 +884,8 @@ func (c *externalCredential) getReverseSession() *yamux.Session { func (c *externalCredential) setReverseSession(session *yamux.Session) bool { var emitStatus bool + var restartStatusStream bool + var triggerUsageRefresh bool c.reverseAccess.Lock() if c.closed { c.reverseAccess.Unlock() @@ -894,10 +896,23 @@ func (c *externalCredential) setReverseSession(session *yamux.Session) bool { c.reverseSession = session isAvailable := c.baseURL == reverseProxyBaseURL && c.reverseSession != nil && !c.reverseSession.IsClosed() emitStatus = wasAvailable != isAvailable + if isAvailable && !wasAvailable { + c.reverseCancel() + c.reverseContext, c.reverseCancel = context.WithCancel(context.Background()) + restartStatusStream = true + triggerUsageRefresh = true + } c.reverseAccess.Unlock() if old != nil { old.Close() } + if restartStatusStream { + c.logger.Debug("poll usage for ", c.tag, ": reverse session ready, restarting status stream") + go c.statusStreamLoop() + } + if triggerUsageRefresh { + go c.pollUsage(c.getReverseContext()) + } if emitStatus { c.emitStatusUpdate() } diff --git a/service/ocm/credential_external.go b/service/ocm/credential_external.go index b06171d03..47b6c0d5e 100644 --- a/service/ocm/credential_external.go +++ b/service/ocm/credential_external.go @@ -954,6 +954,8 @@ func (c *externalCredential) getReverseSession() *yamux.Session { func (c *externalCredential) setReverseSession(session *yamux.Session) bool { var emitStatus bool + var restartStatusStream bool + var triggerUsageRefresh bool c.reverseAccess.Lock() if c.closed { c.reverseAccess.Unlock() @@ -964,10 +966,23 @@ func (c *externalCredential) setReverseSession(session *yamux.Session) bool { c.reverseSession = session isAvailable := c.baseURL == reverseProxyBaseURL && c.reverseSession != nil && !c.reverseSession.IsClosed() emitStatus = wasAvailable != isAvailable + if isAvailable && !wasAvailable { + c.reverseCancel() + c.reverseContext, c.reverseCancel = context.WithCancel(context.Background()) + restartStatusStream = true + triggerUsageRefresh = true + } c.reverseAccess.Unlock() if old != nil { old.Close() } + if restartStatusStream { + c.logger.Debug("poll usage for ", c.tag, ": reverse session ready, restarting status stream") + go c.statusStreamLoop() + } + if triggerUsageRefresh { + go c.pollUsage(c.getReverseContext()) + } if emitStatus { c.emitStatusUpdate() }