From 5516d7b045aebd3b3a829734eab52242c2b8902a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=96=E7=95=8C?= Date: Fri, 13 Mar 2026 21:42:05 +0800 Subject: [PATCH] ccm,ocm: fix passive usage update for WebSocket connections WebSocket 101 upgrade responses do not include utilization headers (confirmed via codex CLI source). Rate limit data is delivered exclusively through in-band events (codex.rate_limits and error events with status 429). Previously, updateStateFromHeaders unconditionally bumped lastUpdated even when no utilization headers were found, which suppressed polling and left credential utilization permanently stale during WebSocket sessions. - Only bump lastUpdated when actual utilization data is parsed - Parse in-band codex.rate_limits events to update credential state - Detect in-band 429 error events to markRateLimited - Fix WebSocket 429 retry to update old credential state before retry --- service/ccm/credential_external.go | 9 +- service/ccm/credential_state.go | 9 +- service/ocm/credential_external.go | 9 +- service/ocm/credential_state.go | 9 +- service/ocm/service_websocket.go | 151 ++++++++++++++++++++++------- 5 files changed, 147 insertions(+), 40 deletions(-) diff --git a/service/ccm/credential_external.go b/service/ccm/credential_external.go index a0350a9fd..8a550719b 100644 --- a/service/ccm/credential_external.go +++ b/service/ccm/credential_external.go @@ -368,27 +368,34 @@ func (c *externalCredential) updateStateFromHeaders(headers http.Header) { isFirstUpdate := c.state.lastUpdated.IsZero() oldFiveHour := c.state.fiveHourUtilization oldWeekly := c.state.weeklyUtilization + hadData := false if value, exists := parseOptionalAnthropicResetHeader(headers, "anthropic-ratelimit-unified-5h-reset"); exists { + hadData = true c.state.fiveHourReset = value } if utilization := headers.Get("anthropic-ratelimit-unified-5h-utilization"); utilization != "" { value, err := strconv.ParseFloat(utilization, 64) if err == nil { + hadData = true c.state.fiveHourUtilization = value * 100 } } if value, exists := parseOptionalAnthropicResetHeader(headers, "anthropic-ratelimit-unified-7d-reset"); exists { + hadData = true c.state.weeklyReset = value } if utilization := headers.Get("anthropic-ratelimit-unified-7d-utilization"); utilization != "" { value, err := strconv.ParseFloat(utilization, 64) if err == nil { + hadData = true c.state.weeklyUtilization = value * 100 } } - c.state.lastUpdated = time.Now() + if hadData { + c.state.lastUpdated = time.Now() + } if isFirstUpdate || int(c.state.fiveHourUtilization*100) != int(oldFiveHour*100) || int(c.state.weeklyUtilization*100) != int(oldWeekly*100) { c.logger.Debug("usage update for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%") } diff --git a/service/ccm/credential_state.go b/service/ccm/credential_state.go index 788058547..35fb52dce 100644 --- a/service/ccm/credential_state.go +++ b/service/ccm/credential_state.go @@ -337,9 +337,11 @@ func (c *defaultCredential) updateStateFromHeaders(headers http.Header) { isFirstUpdate := c.state.lastUpdated.IsZero() oldFiveHour := c.state.fiveHourUtilization oldWeekly := c.state.weeklyUtilization + hadData := false fiveHourResetChanged := false if value, exists := parseOptionalAnthropicResetHeader(headers, "anthropic-ratelimit-unified-5h-reset"); exists { + hadData = true if value.After(c.state.fiveHourReset) { fiveHourResetChanged = true c.state.fiveHourReset = value @@ -348,6 +350,7 @@ func (c *defaultCredential) updateStateFromHeaders(headers http.Header) { if utilization := headers.Get("anthropic-ratelimit-unified-5h-utilization"); utilization != "" { value, err := strconv.ParseFloat(utilization, 64) if err == nil { + hadData = true newValue := math.Ceil(value * 100) if newValue >= c.state.fiveHourUtilization || fiveHourResetChanged { c.state.fiveHourUtilization = newValue @@ -357,6 +360,7 @@ func (c *defaultCredential) updateStateFromHeaders(headers http.Header) { weeklyResetChanged := false if value, exists := parseOptionalAnthropicResetHeader(headers, "anthropic-ratelimit-unified-7d-reset"); exists { + hadData = true if value.After(c.state.weeklyReset) { weeklyResetChanged = true c.state.weeklyReset = value @@ -365,13 +369,16 @@ func (c *defaultCredential) updateStateFromHeaders(headers http.Header) { if utilization := headers.Get("anthropic-ratelimit-unified-7d-utilization"); utilization != "" { value, err := strconv.ParseFloat(utilization, 64) if err == nil { + hadData = true newValue := math.Ceil(value * 100) if newValue >= c.state.weeklyUtilization || weeklyResetChanged { c.state.weeklyUtilization = newValue } } } - c.state.lastUpdated = time.Now() + if hadData { + c.state.lastUpdated = time.Now() + } if isFirstUpdate || int(c.state.fiveHourUtilization*100) != int(oldFiveHour*100) || int(c.state.weeklyUtilization*100) != int(oldWeekly*100) { c.logger.Debug("usage update for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%") } diff --git a/service/ocm/credential_external.go b/service/ocm/credential_external.go index 5c42350d7..edc369edd 100644 --- a/service/ocm/credential_external.go +++ b/service/ocm/credential_external.go @@ -390,6 +390,7 @@ func (c *externalCredential) updateStateFromHeaders(headers http.Header) { isFirstUpdate := c.state.lastUpdated.IsZero() oldFiveHour := c.state.fiveHourUtilization oldWeekly := c.state.weeklyUtilization + hadData := false activeLimitIdentifier := normalizeRateLimitIdentifier(headers.Get("x-codex-active-limit")) if activeLimitIdentifier == "" { @@ -400,6 +401,7 @@ func (c *externalCredential) updateStateFromHeaders(headers http.Header) { if fiveHourResetAt != "" { value, err := strconv.ParseInt(fiveHourResetAt, 10, 64) if err == nil { + hadData = true c.state.fiveHourReset = time.Unix(value, 0) } } @@ -407,6 +409,7 @@ func (c *externalCredential) updateStateFromHeaders(headers http.Header) { if fiveHourPercent != "" { value, err := strconv.ParseFloat(fiveHourPercent, 64) if err == nil { + hadData = true c.state.fiveHourUtilization = value } } @@ -415,6 +418,7 @@ func (c *externalCredential) updateStateFromHeaders(headers http.Header) { if weeklyResetAt != "" { value, err := strconv.ParseInt(weeklyResetAt, 10, 64) if err == nil { + hadData = true c.state.weeklyReset = time.Unix(value, 0) } } @@ -422,10 +426,13 @@ func (c *externalCredential) updateStateFromHeaders(headers http.Header) { if weeklyPercent != "" { value, err := strconv.ParseFloat(weeklyPercent, 64) if err == nil { + hadData = true c.state.weeklyUtilization = value } } - c.state.lastUpdated = time.Now() + if hadData { + c.state.lastUpdated = time.Now() + } if isFirstUpdate || int(c.state.fiveHourUtilization*100) != int(oldFiveHour*100) || int(c.state.weeklyUtilization*100) != int(oldWeekly*100) { c.logger.Debug("usage update for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%") } diff --git a/service/ocm/credential_state.go b/service/ocm/credential_state.go index 81019336b..2de63b960 100644 --- a/service/ocm/credential_state.go +++ b/service/ocm/credential_state.go @@ -339,6 +339,7 @@ func (c *defaultCredential) updateStateFromHeaders(headers http.Header) { isFirstUpdate := c.state.lastUpdated.IsZero() oldFiveHour := c.state.fiveHourUtilization oldWeekly := c.state.weeklyUtilization + hadData := false activeLimitIdentifier := normalizeRateLimitIdentifier(headers.Get("x-codex-active-limit")) if activeLimitIdentifier == "" { @@ -350,6 +351,7 @@ func (c *defaultCredential) updateStateFromHeaders(headers http.Header) { if fiveHourResetAt != "" { value, err := strconv.ParseInt(fiveHourResetAt, 10, 64) if err == nil { + hadData = true newReset := time.Unix(value, 0) if newReset.After(c.state.fiveHourReset) { fiveHourResetChanged = true @@ -361,6 +363,7 @@ func (c *defaultCredential) updateStateFromHeaders(headers http.Header) { if fiveHourPercent != "" { value, err := strconv.ParseFloat(fiveHourPercent, 64) if err == nil { + hadData = true if value >= c.state.fiveHourUtilization || fiveHourResetChanged { c.state.fiveHourUtilization = value } @@ -372,6 +375,7 @@ func (c *defaultCredential) updateStateFromHeaders(headers http.Header) { if weeklyResetAt != "" { value, err := strconv.ParseInt(weeklyResetAt, 10, 64) if err == nil { + hadData = true newReset := time.Unix(value, 0) if newReset.After(c.state.weeklyReset) { weeklyResetChanged = true @@ -383,12 +387,15 @@ func (c *defaultCredential) updateStateFromHeaders(headers http.Header) { if weeklyPercent != "" { value, err := strconv.ParseFloat(weeklyPercent, 64) if err == nil { + hadData = true if value >= c.state.weeklyUtilization || weeklyResetChanged { c.state.weeklyUtilization = value } } } - c.state.lastUpdated = time.Now() + if hadData { + c.state.lastUpdated = time.Now() + } if isFirstUpdate || int(c.state.fiveHourUtilization*100) != int(oldFiveHour*100) || int(c.state.weeklyUtilization*100) != int(oldWeekly*100) { c.logger.Debug("usage update for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%") } diff --git a/service/ocm/service_websocket.go b/service/ocm/service_websocket.go index 7aa68499c..d3f2535c0 100644 --- a/service/ocm/service_websocket.go +++ b/service/ocm/service_websocket.go @@ -9,6 +9,7 @@ import ( "net" "net/http" "net/textproto" + "strconv" "strings" "sync" "time" @@ -174,8 +175,8 @@ func (s *Service) handleWebSocket( if statusCode == http.StatusTooManyRequests { resetAt := parseOCMRateLimitResetFromHeaders(upstreamResponseHeaders) nextCredential := provider.onRateLimited(sessionID, selectedCredential, resetAt, credentialFilter) + selectedCredential.updateStateFromHeaders(upstreamResponseHeaders) if nextCredential == nil { - selectedCredential.updateStateFromHeaders(upstreamResponseHeaders) writeCredentialUnavailableError(w, r, provider, selectedCredential, credentialFilter, "all credentials rate-limited") return } @@ -298,44 +299,27 @@ func (s *Service) proxyWebSocketUpstreamToClient(upstreamReadWriter io.ReadWrite return } - if opCode == ws.OpText && usageTracker != nil { - select { - case model := <-modelChannel: - requestModel = model - default: - } - + if opCode == ws.OpText { var event struct { - Type string `json:"type"` + Type string `json:"type"` + StatusCode int `json:"status_code"` } - if json.Unmarshal(data, &event) == nil && event.Type == "response.completed" { - var streamEvent responses.ResponseStreamEventUnion - if json.Unmarshal(data, &streamEvent) == nil { - completedEvent := streamEvent.AsResponseCompleted() - responseModel := string(completedEvent.Response.Model) - serviceTier := string(completedEvent.Response.ServiceTier) - inputTokens := completedEvent.Response.Usage.InputTokens - outputTokens := completedEvent.Response.Usage.OutputTokens - cachedTokens := completedEvent.Response.Usage.InputTokensDetails.CachedTokens - - if inputTokens > 0 || outputTokens > 0 { - if responseModel == "" { - responseModel = requestModel - } - if responseModel != "" { - contextWindow := detectContextWindow(responseModel, serviceTier, inputTokens) - usageTracker.AddUsageWithCycleHint( - responseModel, - contextWindow, - inputTokens, - outputTokens, - cachedTokens, - serviceTier, - username, - time.Now(), - weeklyCycleHint, - ) + if json.Unmarshal(data, &event) == nil { + switch event.Type { + case "codex.rate_limits": + s.handleWebSocketRateLimitsEvent(data, selectedCredential) + case "error": + if event.StatusCode == http.StatusTooManyRequests { + s.handleWebSocketErrorRateLimited(data, selectedCredential) + } + case "response.completed": + if usageTracker != nil { + select { + case model := <-modelChannel: + requestModel = model + default: } + s.handleWebSocketResponseCompleted(data, usageTracker, requestModel, username, weeklyCycleHint) } } } @@ -350,3 +334,98 @@ func (s *Service) proxyWebSocketUpstreamToClient(upstreamReadWriter io.ReadWrite } } } + +func (s *Service) handleWebSocketRateLimitsEvent(data []byte, selectedCredential credential) { + var rateLimitsEvent struct { + RateLimits struct { + Primary *struct { + UsedPercent float64 `json:"used_percent"` + ResetAt int64 `json:"reset_at"` + } `json:"primary"` + Secondary *struct { + UsedPercent float64 `json:"used_percent"` + ResetAt int64 `json:"reset_at"` + } `json:"secondary"` + } `json:"rate_limits"` + LimitName string `json:"limit_name"` + MeteredLimitName string `json:"metered_limit_name"` + } + err := json.Unmarshal(data, &rateLimitsEvent) + if err != nil { + return + } + identifier := rateLimitsEvent.MeteredLimitName + if identifier == "" { + identifier = rateLimitsEvent.LimitName + } + if identifier == "" { + identifier = "codex" + } + identifier = normalizeRateLimitIdentifier(identifier) + + headers := make(http.Header) + headers.Set("x-codex-active-limit", identifier) + if w := rateLimitsEvent.RateLimits.Primary; w != nil { + headers.Set("x-"+identifier+"-primary-used-percent", strconv.FormatFloat(w.UsedPercent, 'f', -1, 64)) + if w.ResetAt > 0 { + headers.Set("x-"+identifier+"-primary-reset-at", strconv.FormatInt(w.ResetAt, 10)) + } + } + if w := rateLimitsEvent.RateLimits.Secondary; w != nil { + headers.Set("x-"+identifier+"-secondary-used-percent", strconv.FormatFloat(w.UsedPercent, 'f', -1, 64)) + if w.ResetAt > 0 { + headers.Set("x-"+identifier+"-secondary-reset-at", strconv.FormatInt(w.ResetAt, 10)) + } + } + selectedCredential.updateStateFromHeaders(headers) +} + +func (s *Service) handleWebSocketErrorRateLimited(data []byte, selectedCredential credential) { + var errorEvent struct { + Headers map[string]string `json:"headers"` + } + err := json.Unmarshal(data, &errorEvent) + if err != nil { + return + } + headers := make(http.Header) + for key, value := range errorEvent.Headers { + headers.Set(key, value) + } + selectedCredential.updateStateFromHeaders(headers) + resetAt := parseOCMRateLimitResetFromHeaders(headers) + selectedCredential.markRateLimited(resetAt) +} + +func (s *Service) handleWebSocketResponseCompleted(data []byte, usageTracker *AggregatedUsage, requestModel string, username string, weeklyCycleHint *WeeklyCycleHint) { + var streamEvent responses.ResponseStreamEventUnion + if json.Unmarshal(data, &streamEvent) != nil { + return + } + completedEvent := streamEvent.AsResponseCompleted() + responseModel := string(completedEvent.Response.Model) + serviceTier := string(completedEvent.Response.ServiceTier) + inputTokens := completedEvent.Response.Usage.InputTokens + outputTokens := completedEvent.Response.Usage.OutputTokens + cachedTokens := completedEvent.Response.Usage.InputTokensDetails.CachedTokens + + if inputTokens > 0 || outputTokens > 0 { + if responseModel == "" { + responseModel = requestModel + } + if responseModel != "" { + contextWindow := detectContextWindow(responseModel, serviceTier, inputTokens) + usageTracker.AddUsageWithCycleHint( + responseModel, + contextWindow, + inputTokens, + outputTokens, + cachedTokens, + serviceTier, + username, + time.Now(), + weeklyCycleHint, + ) + } + } +}