ccm,ocm: fix passive usage update for WebSocket connections

WebSocket 101 upgrade responses do not include utilization headers
(confirmed via codex CLI source). Rate limit data is delivered
exclusively through in-band events (codex.rate_limits and error
events with status 429).

Previously, updateStateFromHeaders unconditionally bumped lastUpdated
even when no utilization headers were found, which suppressed polling
and left credential utilization permanently stale during WebSocket
sessions.

- Only bump lastUpdated when actual utilization data is parsed
- Parse in-band codex.rate_limits events to update credential state
- Detect in-band 429 error events to markRateLimited
- Fix WebSocket 429 retry to update old credential state before retry
This commit is contained in:
世界
2026-03-13 21:42:05 +08:00
parent c639c27cdb
commit 5516d7b045
5 changed files with 147 additions and 40 deletions

View File

@@ -368,27 +368,34 @@ func (c *externalCredential) updateStateFromHeaders(headers http.Header) {
isFirstUpdate := c.state.lastUpdated.IsZero()
oldFiveHour := c.state.fiveHourUtilization
oldWeekly := c.state.weeklyUtilization
hadData := false
if value, exists := parseOptionalAnthropicResetHeader(headers, "anthropic-ratelimit-unified-5h-reset"); exists {
hadData = true
c.state.fiveHourReset = value
}
if utilization := headers.Get("anthropic-ratelimit-unified-5h-utilization"); utilization != "" {
value, err := strconv.ParseFloat(utilization, 64)
if err == nil {
hadData = true
c.state.fiveHourUtilization = value * 100
}
}
if value, exists := parseOptionalAnthropicResetHeader(headers, "anthropic-ratelimit-unified-7d-reset"); exists {
hadData = true
c.state.weeklyReset = value
}
if utilization := headers.Get("anthropic-ratelimit-unified-7d-utilization"); utilization != "" {
value, err := strconv.ParseFloat(utilization, 64)
if err == nil {
hadData = true
c.state.weeklyUtilization = value * 100
}
}
c.state.lastUpdated = time.Now()
if hadData {
c.state.lastUpdated = time.Now()
}
if isFirstUpdate || int(c.state.fiveHourUtilization*100) != int(oldFiveHour*100) || int(c.state.weeklyUtilization*100) != int(oldWeekly*100) {
c.logger.Debug("usage update for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%")
}

View File

@@ -337,9 +337,11 @@ func (c *defaultCredential) updateStateFromHeaders(headers http.Header) {
isFirstUpdate := c.state.lastUpdated.IsZero()
oldFiveHour := c.state.fiveHourUtilization
oldWeekly := c.state.weeklyUtilization
hadData := false
fiveHourResetChanged := false
if value, exists := parseOptionalAnthropicResetHeader(headers, "anthropic-ratelimit-unified-5h-reset"); exists {
hadData = true
if value.After(c.state.fiveHourReset) {
fiveHourResetChanged = true
c.state.fiveHourReset = value
@@ -348,6 +350,7 @@ func (c *defaultCredential) updateStateFromHeaders(headers http.Header) {
if utilization := headers.Get("anthropic-ratelimit-unified-5h-utilization"); utilization != "" {
value, err := strconv.ParseFloat(utilization, 64)
if err == nil {
hadData = true
newValue := math.Ceil(value * 100)
if newValue >= c.state.fiveHourUtilization || fiveHourResetChanged {
c.state.fiveHourUtilization = newValue
@@ -357,6 +360,7 @@ func (c *defaultCredential) updateStateFromHeaders(headers http.Header) {
weeklyResetChanged := false
if value, exists := parseOptionalAnthropicResetHeader(headers, "anthropic-ratelimit-unified-7d-reset"); exists {
hadData = true
if value.After(c.state.weeklyReset) {
weeklyResetChanged = true
c.state.weeklyReset = value
@@ -365,13 +369,16 @@ func (c *defaultCredential) updateStateFromHeaders(headers http.Header) {
if utilization := headers.Get("anthropic-ratelimit-unified-7d-utilization"); utilization != "" {
value, err := strconv.ParseFloat(utilization, 64)
if err == nil {
hadData = true
newValue := math.Ceil(value * 100)
if newValue >= c.state.weeklyUtilization || weeklyResetChanged {
c.state.weeklyUtilization = newValue
}
}
}
c.state.lastUpdated = time.Now()
if hadData {
c.state.lastUpdated = time.Now()
}
if isFirstUpdate || int(c.state.fiveHourUtilization*100) != int(oldFiveHour*100) || int(c.state.weeklyUtilization*100) != int(oldWeekly*100) {
c.logger.Debug("usage update for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%")
}

View File

@@ -390,6 +390,7 @@ func (c *externalCredential) updateStateFromHeaders(headers http.Header) {
isFirstUpdate := c.state.lastUpdated.IsZero()
oldFiveHour := c.state.fiveHourUtilization
oldWeekly := c.state.weeklyUtilization
hadData := false
activeLimitIdentifier := normalizeRateLimitIdentifier(headers.Get("x-codex-active-limit"))
if activeLimitIdentifier == "" {
@@ -400,6 +401,7 @@ func (c *externalCredential) updateStateFromHeaders(headers http.Header) {
if fiveHourResetAt != "" {
value, err := strconv.ParseInt(fiveHourResetAt, 10, 64)
if err == nil {
hadData = true
c.state.fiveHourReset = time.Unix(value, 0)
}
}
@@ -407,6 +409,7 @@ func (c *externalCredential) updateStateFromHeaders(headers http.Header) {
if fiveHourPercent != "" {
value, err := strconv.ParseFloat(fiveHourPercent, 64)
if err == nil {
hadData = true
c.state.fiveHourUtilization = value
}
}
@@ -415,6 +418,7 @@ func (c *externalCredential) updateStateFromHeaders(headers http.Header) {
if weeklyResetAt != "" {
value, err := strconv.ParseInt(weeklyResetAt, 10, 64)
if err == nil {
hadData = true
c.state.weeklyReset = time.Unix(value, 0)
}
}
@@ -422,10 +426,13 @@ func (c *externalCredential) updateStateFromHeaders(headers http.Header) {
if weeklyPercent != "" {
value, err := strconv.ParseFloat(weeklyPercent, 64)
if err == nil {
hadData = true
c.state.weeklyUtilization = value
}
}
c.state.lastUpdated = time.Now()
if hadData {
c.state.lastUpdated = time.Now()
}
if isFirstUpdate || int(c.state.fiveHourUtilization*100) != int(oldFiveHour*100) || int(c.state.weeklyUtilization*100) != int(oldWeekly*100) {
c.logger.Debug("usage update for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%")
}

View File

@@ -339,6 +339,7 @@ func (c *defaultCredential) updateStateFromHeaders(headers http.Header) {
isFirstUpdate := c.state.lastUpdated.IsZero()
oldFiveHour := c.state.fiveHourUtilization
oldWeekly := c.state.weeklyUtilization
hadData := false
activeLimitIdentifier := normalizeRateLimitIdentifier(headers.Get("x-codex-active-limit"))
if activeLimitIdentifier == "" {
@@ -350,6 +351,7 @@ func (c *defaultCredential) updateStateFromHeaders(headers http.Header) {
if fiveHourResetAt != "" {
value, err := strconv.ParseInt(fiveHourResetAt, 10, 64)
if err == nil {
hadData = true
newReset := time.Unix(value, 0)
if newReset.After(c.state.fiveHourReset) {
fiveHourResetChanged = true
@@ -361,6 +363,7 @@ func (c *defaultCredential) updateStateFromHeaders(headers http.Header) {
if fiveHourPercent != "" {
value, err := strconv.ParseFloat(fiveHourPercent, 64)
if err == nil {
hadData = true
if value >= c.state.fiveHourUtilization || fiveHourResetChanged {
c.state.fiveHourUtilization = value
}
@@ -372,6 +375,7 @@ func (c *defaultCredential) updateStateFromHeaders(headers http.Header) {
if weeklyResetAt != "" {
value, err := strconv.ParseInt(weeklyResetAt, 10, 64)
if err == nil {
hadData = true
newReset := time.Unix(value, 0)
if newReset.After(c.state.weeklyReset) {
weeklyResetChanged = true
@@ -383,12 +387,15 @@ func (c *defaultCredential) updateStateFromHeaders(headers http.Header) {
if weeklyPercent != "" {
value, err := strconv.ParseFloat(weeklyPercent, 64)
if err == nil {
hadData = true
if value >= c.state.weeklyUtilization || weeklyResetChanged {
c.state.weeklyUtilization = value
}
}
}
c.state.lastUpdated = time.Now()
if hadData {
c.state.lastUpdated = time.Now()
}
if isFirstUpdate || int(c.state.fiveHourUtilization*100) != int(oldFiveHour*100) || int(c.state.weeklyUtilization*100) != int(oldWeekly*100) {
c.logger.Debug("usage update for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%")
}

View File

@@ -9,6 +9,7 @@ import (
"net"
"net/http"
"net/textproto"
"strconv"
"strings"
"sync"
"time"
@@ -174,8 +175,8 @@ func (s *Service) handleWebSocket(
if statusCode == http.StatusTooManyRequests {
resetAt := parseOCMRateLimitResetFromHeaders(upstreamResponseHeaders)
nextCredential := provider.onRateLimited(sessionID, selectedCredential, resetAt, credentialFilter)
selectedCredential.updateStateFromHeaders(upstreamResponseHeaders)
if nextCredential == nil {
selectedCredential.updateStateFromHeaders(upstreamResponseHeaders)
writeCredentialUnavailableError(w, r, provider, selectedCredential, credentialFilter, "all credentials rate-limited")
return
}
@@ -298,44 +299,27 @@ func (s *Service) proxyWebSocketUpstreamToClient(upstreamReadWriter io.ReadWrite
return
}
if opCode == ws.OpText && usageTracker != nil {
select {
case model := <-modelChannel:
requestModel = model
default:
}
if opCode == ws.OpText {
var event struct {
Type string `json:"type"`
Type string `json:"type"`
StatusCode int `json:"status_code"`
}
if json.Unmarshal(data, &event) == nil && event.Type == "response.completed" {
var streamEvent responses.ResponseStreamEventUnion
if json.Unmarshal(data, &streamEvent) == nil {
completedEvent := streamEvent.AsResponseCompleted()
responseModel := string(completedEvent.Response.Model)
serviceTier := string(completedEvent.Response.ServiceTier)
inputTokens := completedEvent.Response.Usage.InputTokens
outputTokens := completedEvent.Response.Usage.OutputTokens
cachedTokens := completedEvent.Response.Usage.InputTokensDetails.CachedTokens
if inputTokens > 0 || outputTokens > 0 {
if responseModel == "" {
responseModel = requestModel
}
if responseModel != "" {
contextWindow := detectContextWindow(responseModel, serviceTier, inputTokens)
usageTracker.AddUsageWithCycleHint(
responseModel,
contextWindow,
inputTokens,
outputTokens,
cachedTokens,
serviceTier,
username,
time.Now(),
weeklyCycleHint,
)
if json.Unmarshal(data, &event) == nil {
switch event.Type {
case "codex.rate_limits":
s.handleWebSocketRateLimitsEvent(data, selectedCredential)
case "error":
if event.StatusCode == http.StatusTooManyRequests {
s.handleWebSocketErrorRateLimited(data, selectedCredential)
}
case "response.completed":
if usageTracker != nil {
select {
case model := <-modelChannel:
requestModel = model
default:
}
s.handleWebSocketResponseCompleted(data, usageTracker, requestModel, username, weeklyCycleHint)
}
}
}
@@ -350,3 +334,98 @@ func (s *Service) proxyWebSocketUpstreamToClient(upstreamReadWriter io.ReadWrite
}
}
}
func (s *Service) handleWebSocketRateLimitsEvent(data []byte, selectedCredential credential) {
var rateLimitsEvent struct {
RateLimits struct {
Primary *struct {
UsedPercent float64 `json:"used_percent"`
ResetAt int64 `json:"reset_at"`
} `json:"primary"`
Secondary *struct {
UsedPercent float64 `json:"used_percent"`
ResetAt int64 `json:"reset_at"`
} `json:"secondary"`
} `json:"rate_limits"`
LimitName string `json:"limit_name"`
MeteredLimitName string `json:"metered_limit_name"`
}
err := json.Unmarshal(data, &rateLimitsEvent)
if err != nil {
return
}
identifier := rateLimitsEvent.MeteredLimitName
if identifier == "" {
identifier = rateLimitsEvent.LimitName
}
if identifier == "" {
identifier = "codex"
}
identifier = normalizeRateLimitIdentifier(identifier)
headers := make(http.Header)
headers.Set("x-codex-active-limit", identifier)
if w := rateLimitsEvent.RateLimits.Primary; w != nil {
headers.Set("x-"+identifier+"-primary-used-percent", strconv.FormatFloat(w.UsedPercent, 'f', -1, 64))
if w.ResetAt > 0 {
headers.Set("x-"+identifier+"-primary-reset-at", strconv.FormatInt(w.ResetAt, 10))
}
}
if w := rateLimitsEvent.RateLimits.Secondary; w != nil {
headers.Set("x-"+identifier+"-secondary-used-percent", strconv.FormatFloat(w.UsedPercent, 'f', -1, 64))
if w.ResetAt > 0 {
headers.Set("x-"+identifier+"-secondary-reset-at", strconv.FormatInt(w.ResetAt, 10))
}
}
selectedCredential.updateStateFromHeaders(headers)
}
func (s *Service) handleWebSocketErrorRateLimited(data []byte, selectedCredential credential) {
var errorEvent struct {
Headers map[string]string `json:"headers"`
}
err := json.Unmarshal(data, &errorEvent)
if err != nil {
return
}
headers := make(http.Header)
for key, value := range errorEvent.Headers {
headers.Set(key, value)
}
selectedCredential.updateStateFromHeaders(headers)
resetAt := parseOCMRateLimitResetFromHeaders(headers)
selectedCredential.markRateLimited(resetAt)
}
func (s *Service) handleWebSocketResponseCompleted(data []byte, usageTracker *AggregatedUsage, requestModel string, username string, weeklyCycleHint *WeeklyCycleHint) {
var streamEvent responses.ResponseStreamEventUnion
if json.Unmarshal(data, &streamEvent) != nil {
return
}
completedEvent := streamEvent.AsResponseCompleted()
responseModel := string(completedEvent.Response.Model)
serviceTier := string(completedEvent.Response.ServiceTier)
inputTokens := completedEvent.Response.Usage.InputTokens
outputTokens := completedEvent.Response.Usage.OutputTokens
cachedTokens := completedEvent.Response.Usage.InputTokensDetails.CachedTokens
if inputTokens > 0 || outputTokens > 0 {
if responseModel == "" {
responseModel = requestModel
}
if responseModel != "" {
contextWindow := detectContextWindow(responseModel, serviceTier, inputTokens)
usageTracker.AddUsageWithCycleHint(
responseModel,
contextWindow,
inputTokens,
outputTokens,
cachedTokens,
serviceTier,
username,
time.Now(),
weeklyCycleHint,
)
}
}
}