ccm,ocm: mark credentials unusable on usage poll failure and trigger poll on upstream error

This commit is contained in:
世界
2026-03-14 14:05:10 +08:00
parent ee65b375cb
commit 1628272507
6 changed files with 108 additions and 50 deletions

View File

@@ -239,6 +239,10 @@ func (c *externalCredential) isUsable() bool {
return false
}
c.stateMutex.RLock()
if c.state.consecutivePollFailures > 0 {
c.stateMutex.RUnlock()
return false
}
if c.state.hardRateLimited {
if time.Now().Before(c.state.rateLimitResetAt) {
c.stateMutex.RUnlock()
@@ -402,6 +406,7 @@ func (c *externalCredential) updateStateFromHeaders(headers http.Header) {
}
}
if hadData {
c.state.consecutivePollFailures = 0
c.state.lastUpdated = time.Now()
}
if isFirstUpdate || int(c.state.fiveHourUtilization*100) != int(oldFiveHour*100) || int(c.state.weeklyUtilization*100) != int(oldWeekly*100) {
@@ -419,7 +424,7 @@ func (c *externalCredential) updateStateFromHeaders(headers http.Header) {
}
func (c *externalCredential) checkTransitionLocked() bool {
unusable := c.state.hardRateLimited || c.state.fiveHourUtilization >= 100 || c.state.weeklyUtilization >= 100
unusable := c.state.hardRateLimited || c.state.fiveHourUtilization >= 100 || c.state.weeklyUtilization >= 100 || c.state.consecutivePollFailures > 0
if unusable && !c.interrupted {
c.interrupted = true
return true
@@ -479,19 +484,24 @@ func (c *externalCredential) pollUsage(ctx context.Context) {
})
if err != nil {
c.logger.Error("poll usage for ", c.tag, ": ", err)
c.stateMutex.Lock()
c.state.consecutivePollFailures++
c.stateMutex.Unlock()
c.incrementPollFailures()
return
}
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
body, _ := io.ReadAll(response.Body)
c.stateMutex.Lock()
c.state.consecutivePollFailures++
c.stateMutex.Unlock()
c.logger.Debug("poll usage for ", c.tag, ": status ", response.StatusCode, " ", string(body))
// 404 means the remote does not have a status endpoint yet;
// usage will be updated passively from response headers.
if response.StatusCode == http.StatusNotFound {
c.stateMutex.Lock()
c.state.consecutivePollFailures = 0
c.checkTransitionLocked()
c.stateMutex.Unlock()
} else {
c.incrementPollFailures()
}
return
}
@@ -501,10 +511,8 @@ func (c *externalCredential) pollUsage(ctx context.Context) {
}
err = json.NewDecoder(response.Body).Decode(&statusResponse)
if err != nil {
c.stateMutex.Lock()
c.state.consecutivePollFailures++
c.stateMutex.Unlock()
c.logger.Debug("poll usage for ", c.tag, ": decode: ", err)
c.incrementPollFailures()
return
}
@@ -551,10 +559,17 @@ func (c *externalCredential) pollBackoff(baseInterval time.Duration) time.Durati
if failures <= 0 {
return baseInterval
}
if failures > 4 {
failures = 4
return failedPollRetryInterval
}
func (c *externalCredential) incrementPollFailures() {
c.stateMutex.Lock()
c.state.consecutivePollFailures++
shouldInterrupt := c.checkTransitionLocked()
c.stateMutex.Unlock()
if shouldInterrupt {
c.interruptConnections()
}
return baseInterval * time.Duration(1<<failures)
}
func (c *externalCredential) usageTrackerOrNil() *AggregatedUsage {

View File

@@ -26,7 +26,10 @@ import (
"github.com/sagernet/sing/common/ntp"
)
const defaultPollInterval = 60 * time.Minute
const (
// defaultPollInterval is 60 minutes.
// NOTE(review): presumably the base interval between usage polls — confirm at the call sites.
defaultPollInterval = 60 * time.Minute
// failedPollRetryInterval is one minute. pollBackoff returns it in place of
// the base interval whenever consecutivePollFailures is greater than zero.
failedPollRetryInterval = time.Minute
)
const (
httpRetryMaxAttempts = 3
@@ -391,6 +394,7 @@ func (c *defaultCredential) updateStateFromHeaders(headers http.Header) {
}
}
if hadData {
c.state.consecutivePollFailures = 0
c.state.lastUpdated = time.Now()
}
if isFirstUpdate || int(c.state.fiveHourUtilization*100) != int(oldFiveHour*100) || int(c.state.weeklyUtilization*100) != int(oldWeekly*100) {
@@ -427,6 +431,10 @@ func (c *defaultCredential) isUsable() bool {
c.stateMutex.RUnlock()
return false
}
if c.state.consecutivePollFailures > 0 {
c.stateMutex.RUnlock()
return false
}
if c.state.hardRateLimited {
if time.Now().Before(c.state.rateLimitResetAt) {
c.stateMutex.RUnlock()
@@ -459,7 +467,7 @@ func (c *defaultCredential) checkReservesLocked() bool {
// checkTransitionLocked detects usable→unusable transition.
// Must be called with stateMutex write lock held.
func (c *defaultCredential) checkTransitionLocked() bool {
unusable := c.state.unavailable || c.state.hardRateLimited || !c.checkReservesLocked()
unusable := c.state.unavailable || c.state.hardRateLimited || !c.checkReservesLocked() || c.state.consecutivePollFailures > 0
if unusable && !c.interrupted {
c.interrupted = true
return true
@@ -534,6 +542,16 @@ func (c *defaultCredential) markUsagePollAttempted() {
c.state.lastUpdated = time.Now()
}
// incrementPollFailures records one more failed usage poll for this credential.
// It increments state.consecutivePollFailures under the stateMutex write lock
// and asks checkTransitionLocked whether the credential just transitioned from
// usable to unusable. If it did, interruptConnections is called once the lock
// has been released.
func (c *defaultCredential) incrementPollFailures() {
c.stateMutex.Lock()
c.state.consecutivePollFailures++
// checkTransitionLocked must be called with the write lock held, as it is here.
shouldInterrupt := c.checkTransitionLocked()
c.stateMutex.Unlock()
// NOTE(review): the interrupt runs after Unlock, presumably so that
// interruptConnections never executes while stateMutex is held — confirm.
if shouldInterrupt {
c.interruptConnections()
}
}
func (c *defaultCredential) pollBackoff(baseInterval time.Duration) time.Duration {
c.stateMutex.RLock()
failures := c.state.consecutivePollFailures
@@ -541,10 +559,7 @@ func (c *defaultCredential) pollBackoff(baseInterval time.Duration) time.Duratio
if failures <= 0 {
return baseInterval
}
if failures > 4 {
failures = 4
}
return baseInterval * time.Duration(1<<failures)
return failedPollRetryInterval
}
func (c *defaultCredential) earliestReset() time.Time {
@@ -578,6 +593,7 @@ func (c *defaultCredential) pollUsage(ctx context.Context) {
accessToken, err := c.getAccessToken()
if err != nil {
c.logger.Error("poll usage for ", c.tag, ": get token: ", err)
c.incrementPollFailures()
return
}
@@ -599,6 +615,7 @@ func (c *defaultCredential) pollUsage(ctx context.Context) {
})
if err != nil {
c.logger.Error("poll usage for ", c.tag, ": ", err)
c.incrementPollFailures()
return
}
defer response.Body.Close()
@@ -608,10 +625,8 @@ func (c *defaultCredential) pollUsage(ctx context.Context) {
c.logger.Warn("poll usage for ", c.tag, ": rate limited")
}
body, _ := io.ReadAll(response.Body)
c.stateMutex.Lock()
c.state.consecutivePollFailures++
c.stateMutex.Unlock()
c.logger.Debug("poll usage for ", c.tag, ": status ", response.StatusCode, " ", string(body))
c.incrementPollFailures()
return
}
@@ -627,10 +642,8 @@ func (c *defaultCredential) pollUsage(ctx context.Context) {
}
err = json.NewDecoder(response.Body).Decode(&usageResponse)
if err != nil {
c.stateMutex.Lock()
c.state.consecutivePollFailures++
c.stateMutex.Unlock()
c.logger.Debug("poll usage for ", c.tag, ": decode: ", err)
c.incrementPollFailures()
return
}

View File

@@ -526,6 +526,7 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if response.StatusCode != http.StatusOK && response.StatusCode != http.StatusTooManyRequests {
body, _ := io.ReadAll(response.Body)
s.logger.Error("upstream error from ", selectedCredential.tagName(), ": status ", response.StatusCode, " ", string(body))
go selectedCredential.pollUsage(s.ctx)
writeJSONError(w, r, http.StatusInternalServerError, "api_error",
"proxy request (status "+strconv.Itoa(response.StatusCode)+"): "+string(body))
return

View File

@@ -262,6 +262,10 @@ func (c *externalCredential) isUsable() bool {
return false
}
c.stateMutex.RLock()
if c.state.consecutivePollFailures > 0 {
c.stateMutex.RUnlock()
return false
}
if c.state.hardRateLimited {
if time.Now().Before(c.state.rateLimitResetAt) {
c.stateMutex.RUnlock()
@@ -439,6 +443,7 @@ func (c *externalCredential) updateStateFromHeaders(headers http.Header) {
}
}
if hadData {
c.state.consecutivePollFailures = 0
c.state.lastUpdated = time.Now()
}
if isFirstUpdate || int(c.state.fiveHourUtilization*100) != int(oldFiveHour*100) || int(c.state.weeklyUtilization*100) != int(oldWeekly*100) {
@@ -456,7 +461,7 @@ func (c *externalCredential) updateStateFromHeaders(headers http.Header) {
}
func (c *externalCredential) checkTransitionLocked() bool {
unusable := c.state.hardRateLimited || c.state.fiveHourUtilization >= 100 || c.state.weeklyUtilization >= 100
unusable := c.state.hardRateLimited || c.state.fiveHourUtilization >= 100 || c.state.weeklyUtilization >= 100 || c.state.consecutivePollFailures > 0
if unusable && !c.interrupted {
c.interrupted = true
return true
@@ -516,19 +521,24 @@ func (c *externalCredential) pollUsage(ctx context.Context) {
})
if err != nil {
c.logger.Error("poll usage for ", c.tag, ": ", err)
c.stateMutex.Lock()
c.state.consecutivePollFailures++
c.stateMutex.Unlock()
c.incrementPollFailures()
return
}
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
body, _ := io.ReadAll(response.Body)
c.stateMutex.Lock()
c.state.consecutivePollFailures++
c.stateMutex.Unlock()
c.logger.Debug("poll usage for ", c.tag, ": status ", response.StatusCode, " ", string(body))
// 404 means the remote does not have a status endpoint yet;
// usage will be updated passively from response headers.
if response.StatusCode == http.StatusNotFound {
c.stateMutex.Lock()
c.state.consecutivePollFailures = 0
c.checkTransitionLocked()
c.stateMutex.Unlock()
} else {
c.incrementPollFailures()
}
return
}
@@ -538,10 +548,8 @@ func (c *externalCredential) pollUsage(ctx context.Context) {
}
err = json.NewDecoder(response.Body).Decode(&statusResponse)
if err != nil {
c.stateMutex.Lock()
c.state.consecutivePollFailures++
c.stateMutex.Unlock()
c.logger.Debug("poll usage for ", c.tag, ": decode: ", err)
c.incrementPollFailures()
return
}
@@ -588,10 +596,17 @@ func (c *externalCredential) pollBackoff(baseInterval time.Duration) time.Durati
if failures <= 0 {
return baseInterval
}
if failures > 4 {
failures = 4
return failedPollRetryInterval
}
func (c *externalCredential) incrementPollFailures() {
c.stateMutex.Lock()
c.state.consecutivePollFailures++
shouldInterrupt := c.checkTransitionLocked()
c.stateMutex.Unlock()
if shouldInterrupt {
c.interruptConnections()
}
return baseInterval * time.Duration(1<<failures)
}
func (c *externalCredential) usageTrackerOrNil() *AggregatedUsage {

View File

@@ -26,7 +26,10 @@ import (
"github.com/sagernet/sing/common/ntp"
)
const defaultPollInterval = 60 * time.Minute
const (
// defaultPollInterval is 60 minutes.
// NOTE(review): presumably the base interval between usage polls — confirm at the call sites.
defaultPollInterval = 60 * time.Minute
// failedPollRetryInterval is one minute. pollBackoff returns it in place of
// the base interval whenever consecutivePollFailures is greater than zero.
failedPollRetryInterval = time.Minute
)
const (
httpRetryMaxAttempts = 3
@@ -408,6 +411,7 @@ func (c *defaultCredential) updateStateFromHeaders(headers http.Header) {
}
}
if hadData {
c.state.consecutivePollFailures = 0
c.state.lastUpdated = time.Now()
}
if isFirstUpdate || int(c.state.fiveHourUtilization*100) != int(oldFiveHour*100) || int(c.state.weeklyUtilization*100) != int(oldWeekly*100) {
@@ -444,6 +448,10 @@ func (c *defaultCredential) isUsable() bool {
c.stateMutex.RUnlock()
return false
}
if c.state.consecutivePollFailures > 0 {
c.stateMutex.RUnlock()
return false
}
if c.state.hardRateLimited {
if time.Now().Before(c.state.rateLimitResetAt) {
c.stateMutex.RUnlock()
@@ -476,7 +484,7 @@ func (c *defaultCredential) checkReservesLocked() bool {
// checkTransitionLocked detects usable→unusable transition.
// Must be called with stateMutex write lock held.
func (c *defaultCredential) checkTransitionLocked() bool {
unusable := c.state.unavailable || c.state.hardRateLimited || !c.checkReservesLocked()
unusable := c.state.unavailable || c.state.hardRateLimited || !c.checkReservesLocked() || c.state.consecutivePollFailures > 0
if unusable && !c.interrupted {
c.interrupted = true
return true
@@ -551,6 +559,16 @@ func (c *defaultCredential) markUsagePollAttempted() {
c.state.lastUpdated = time.Now()
}
// incrementPollFailures records one more failed usage poll for this credential.
// It increments state.consecutivePollFailures under the stateMutex write lock
// and asks checkTransitionLocked whether the credential just transitioned from
// usable to unusable. If it did, interruptConnections is called once the lock
// has been released.
func (c *defaultCredential) incrementPollFailures() {
c.stateMutex.Lock()
c.state.consecutivePollFailures++
// checkTransitionLocked must be called with the write lock held, as it is here.
shouldInterrupt := c.checkTransitionLocked()
c.stateMutex.Unlock()
// NOTE(review): the interrupt runs after Unlock, presumably so that
// interruptConnections never executes while stateMutex is held — confirm.
if shouldInterrupt {
c.interruptConnections()
}
}
func (c *defaultCredential) pollBackoff(baseInterval time.Duration) time.Duration {
c.stateMutex.RLock()
failures := c.state.consecutivePollFailures
@@ -558,10 +576,7 @@ func (c *defaultCredential) pollBackoff(baseInterval time.Duration) time.Duratio
if failures <= 0 {
return baseInterval
}
if failures > 4 {
failures = 4
}
return baseInterval * time.Duration(1<<failures)
return failedPollRetryInterval
}
func (c *defaultCredential) earliestReset() time.Time {
@@ -598,6 +613,7 @@ func (c *defaultCredential) pollUsage(ctx context.Context) {
accessToken, err := c.getAccessToken()
if err != nil {
c.logger.Error("poll usage for ", c.tag, ": get token: ", err)
c.incrementPollFailures()
return
}
@@ -627,6 +643,7 @@ func (c *defaultCredential) pollUsage(ctx context.Context) {
})
if err != nil {
c.logger.Error("poll usage for ", c.tag, ": ", err)
c.incrementPollFailures()
return
}
defer response.Body.Close()
@@ -636,10 +653,8 @@ func (c *defaultCredential) pollUsage(ctx context.Context) {
c.logger.Warn("poll usage for ", c.tag, ": rate limited")
}
body, _ := io.ReadAll(response.Body)
c.stateMutex.Lock()
c.state.consecutivePollFailures++
c.stateMutex.Unlock()
c.logger.Debug("poll usage for ", c.tag, ": status ", response.StatusCode, " ", string(body))
c.incrementPollFailures()
return
}
@@ -656,10 +671,8 @@ func (c *defaultCredential) pollUsage(ctx context.Context) {
}
err = json.NewDecoder(response.Body).Decode(&usageResponse)
if err != nil {
c.stateMutex.Lock()
c.state.consecutivePollFailures++
c.stateMutex.Unlock()
c.logger.Debug("poll usage for ", c.tag, ": decode: ", err)
c.incrementPollFailures()
return
}

View File

@@ -572,6 +572,7 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if response.StatusCode != http.StatusOK && response.StatusCode != http.StatusTooManyRequests {
body, _ := io.ReadAll(response.Body)
s.logger.Error("upstream error from ", selectedCredential.tagName(), ": status ", response.StatusCode, " ", string(body))
go selectedCredential.pollUsage(s.ctx)
writeJSONError(w, r, http.StatusInternalServerError, "api_error",
"proxy request (status "+strconv.Itoa(response.StatusCode)+"): "+string(body))
return