diff --git a/service/ccm/credential_external.go b/service/ccm/credential_external.go index a0350a9fd..8a550719b 100644 --- a/service/ccm/credential_external.go +++ b/service/ccm/credential_external.go @@ -368,27 +368,34 @@ func (c *externalCredential) updateStateFromHeaders(headers http.Header) { isFirstUpdate := c.state.lastUpdated.IsZero() oldFiveHour := c.state.fiveHourUtilization oldWeekly := c.state.weeklyUtilization + hadData := false if value, exists := parseOptionalAnthropicResetHeader(headers, "anthropic-ratelimit-unified-5h-reset"); exists { + hadData = true c.state.fiveHourReset = value } if utilization := headers.Get("anthropic-ratelimit-unified-5h-utilization"); utilization != "" { value, err := strconv.ParseFloat(utilization, 64) if err == nil { + hadData = true c.state.fiveHourUtilization = value * 100 } } if value, exists := parseOptionalAnthropicResetHeader(headers, "anthropic-ratelimit-unified-7d-reset"); exists { + hadData = true c.state.weeklyReset = value } if utilization := headers.Get("anthropic-ratelimit-unified-7d-utilization"); utilization != "" { value, err := strconv.ParseFloat(utilization, 64) if err == nil { + hadData = true c.state.weeklyUtilization = value * 100 } } - c.state.lastUpdated = time.Now() + if hadData { + c.state.lastUpdated = time.Now() + } if isFirstUpdate || int(c.state.fiveHourUtilization*100) != int(oldFiveHour*100) || int(c.state.weeklyUtilization*100) != int(oldWeekly*100) { c.logger.Debug("usage update for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%") } diff --git a/service/ccm/credential_state.go b/service/ccm/credential_state.go index 788058547..35fb52dce 100644 --- a/service/ccm/credential_state.go +++ b/service/ccm/credential_state.go @@ -337,9 +337,11 @@ func (c *defaultCredential) updateStateFromHeaders(headers http.Header) { isFirstUpdate := c.state.lastUpdated.IsZero() oldFiveHour := c.state.fiveHourUtilization oldWeekly := c.state.weeklyUtilization + hadData := false fiveHourResetChanged := false if value, exists := parseOptionalAnthropicResetHeader(headers, "anthropic-ratelimit-unified-5h-reset"); exists { + hadData = true if value.After(c.state.fiveHourReset) { fiveHourResetChanged = true c.state.fiveHourReset = value @@ -348,6 +350,7 @@ func (c *defaultCredential) updateStateFromHeaders(headers http.Header) { if utilization := headers.Get("anthropic-ratelimit-unified-5h-utilization"); utilization != "" { value, err := strconv.ParseFloat(utilization, 64) if err == nil { + hadData = true newValue := math.Ceil(value * 100) if newValue >= c.state.fiveHourUtilization || fiveHourResetChanged { c.state.fiveHourUtilization = newValue @@ -357,6 +360,7 @@ func (c *defaultCredential) updateStateFromHeaders(headers http.Header) { weeklyResetChanged := false if value, exists := parseOptionalAnthropicResetHeader(headers, "anthropic-ratelimit-unified-7d-reset"); exists { + hadData = true if value.After(c.state.weeklyReset) { weeklyResetChanged = true c.state.weeklyReset = value @@ -365,13 +369,16 @@ func (c *defaultCredential) updateStateFromHeaders(headers http.Header) { if utilization := headers.Get("anthropic-ratelimit-unified-7d-utilization"); utilization != "" { value, err := strconv.ParseFloat(utilization, 64) if err == nil { + hadData = true newValue := math.Ceil(value * 100) if newValue >= c.state.weeklyUtilization || weeklyResetChanged { c.state.weeklyUtilization = newValue } } } - c.state.lastUpdated = time.Now() + if hadData { + c.state.lastUpdated = time.Now() + } if isFirstUpdate || int(c.state.fiveHourUtilization*100) != int(oldFiveHour*100) || int(c.state.weeklyUtilization*100) != int(oldWeekly*100) { c.logger.Debug("usage update for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%") } diff --git a/service/ocm/credential_external.go b/service/ocm/credential_external.go index 5c42350d7..edc369edd 100644 --- a/service/ocm/credential_external.go +++ b/service/ocm/credential_external.go @@ -390,6 +390,7 @@ func (c *externalCredential) updateStateFromHeaders(headers http.Header) { isFirstUpdate := c.state.lastUpdated.IsZero() oldFiveHour := c.state.fiveHourUtilization oldWeekly := c.state.weeklyUtilization + hadData := false activeLimitIdentifier := normalizeRateLimitIdentifier(headers.Get("x-codex-active-limit")) if activeLimitIdentifier == "" { @@ -400,6 +401,7 @@ func (c *externalCredential) updateStateFromHeaders(headers http.Header) { if fiveHourResetAt != "" { value, err := strconv.ParseInt(fiveHourResetAt, 10, 64) if err == nil { + hadData = true c.state.fiveHourReset = time.Unix(value, 0) } } @@ -407,6 +409,7 @@ func (c *externalCredential) updateStateFromHeaders(headers http.Header) { if fiveHourPercent != "" { value, err := strconv.ParseFloat(fiveHourPercent, 64) if err == nil { + hadData = true c.state.fiveHourUtilization = value } } @@ -415,6 +418,7 @@ func (c *externalCredential) updateStateFromHeaders(headers http.Header) { if weeklyResetAt != "" { value, err := strconv.ParseInt(weeklyResetAt, 10, 64) if err == nil { + hadData = true c.state.weeklyReset = time.Unix(value, 0) } } @@ -422,10 +426,13 @@ func (c *externalCredential) updateStateFromHeaders(headers http.Header) { if weeklyPercent != "" { value, err := strconv.ParseFloat(weeklyPercent, 64) if err == nil { + hadData = true c.state.weeklyUtilization = value } } - c.state.lastUpdated = time.Now() + if hadData { + c.state.lastUpdated = time.Now() + } if isFirstUpdate || int(c.state.fiveHourUtilization*100) != int(oldFiveHour*100) || int(c.state.weeklyUtilization*100) != int(oldWeekly*100) { c.logger.Debug("usage update for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%") } diff --git a/service/ocm/credential_state.go b/service/ocm/credential_state.go index 81019336b..2de63b960 100644 --- a/service/ocm/credential_state.go +++ b/service/ocm/credential_state.go @@ -339,6 +339,7 @@ func (c *defaultCredential) updateStateFromHeaders(headers http.Header) { isFirstUpdate := c.state.lastUpdated.IsZero() oldFiveHour := c.state.fiveHourUtilization oldWeekly := c.state.weeklyUtilization + hadData := false activeLimitIdentifier := normalizeRateLimitIdentifier(headers.Get("x-codex-active-limit")) if activeLimitIdentifier == "" { @@ -350,6 +351,7 @@ func (c *defaultCredential) updateStateFromHeaders(headers http.Header) { if fiveHourResetAt != "" { value, err := strconv.ParseInt(fiveHourResetAt, 10, 64) if err == nil { + hadData = true newReset := time.Unix(value, 0) if newReset.After(c.state.fiveHourReset) { fiveHourResetChanged = true @@ -361,6 +363,7 @@ func (c *defaultCredential) updateStateFromHeaders(headers http.Header) { if fiveHourPercent != "" { value, err := strconv.ParseFloat(fiveHourPercent, 64) if err == nil { + hadData = true if value >= c.state.fiveHourUtilization || fiveHourResetChanged { c.state.fiveHourUtilization = value } @@ -372,6 +375,7 @@ func (c *defaultCredential) updateStateFromHeaders(headers http.Header) { if weeklyResetAt != "" { value, err := strconv.ParseInt(weeklyResetAt, 10, 64) if err == nil { + hadData = true newReset := time.Unix(value, 0) if newReset.After(c.state.weeklyReset) { weeklyResetChanged = true @@ -383,12 +387,15 @@ func (c *defaultCredential) updateStateFromHeaders(headers http.Header) { if weeklyPercent != "" { value, err := strconv.ParseFloat(weeklyPercent, 64) if err == nil { + hadData = true if value >= c.state.weeklyUtilization || weeklyResetChanged { c.state.weeklyUtilization = value } } } - c.state.lastUpdated = time.Now() + if hadData { + c.state.lastUpdated = time.Now() + } if isFirstUpdate || int(c.state.fiveHourUtilization*100) != int(oldFiveHour*100) || int(c.state.weeklyUtilization*100) != int(oldWeekly*100) { c.logger.Debug("usage update for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%") } diff --git a/service/ocm/service_websocket.go b/service/ocm/service_websocket.go index 7aa68499c..d3f2535c0 100644 --- a/service/ocm/service_websocket.go +++ b/service/ocm/service_websocket.go @@ -9,6 +9,7 @@ import ( "net" "net/http" "net/textproto" + "strconv" "strings" "sync" "time" @@ -174,8 +175,8 @@ func (s *Service) handleWebSocket( if statusCode == http.StatusTooManyRequests { resetAt := parseOCMRateLimitResetFromHeaders(upstreamResponseHeaders) nextCredential := provider.onRateLimited(sessionID, selectedCredential, resetAt, credentialFilter) + selectedCredential.updateStateFromHeaders(upstreamResponseHeaders) if nextCredential == nil { - selectedCredential.updateStateFromHeaders(upstreamResponseHeaders) writeCredentialUnavailableError(w, r, provider, selectedCredential, credentialFilter, "all credentials rate-limited") return } @@ -298,44 +299,27 @@ func (s *Service) proxyWebSocketUpstreamToClient(upstreamReadWriter io.ReadWrite return } - if opCode == ws.OpText && usageTracker != nil { - select { - case model := <-modelChannel: - requestModel = model - default: - } - + if opCode == ws.OpText { var event struct { - Type string `json:"type"` + Type string `json:"type"` + StatusCode int `json:"status_code"` } - if json.Unmarshal(data, &event) == nil && event.Type == "response.completed" { - var streamEvent responses.ResponseStreamEventUnion - if json.Unmarshal(data, &streamEvent) == nil { - completedEvent := streamEvent.AsResponseCompleted() - responseModel := string(completedEvent.Response.Model) - serviceTier := string(completedEvent.Response.ServiceTier) - inputTokens := completedEvent.Response.Usage.InputTokens - outputTokens := completedEvent.Response.Usage.OutputTokens - cachedTokens := completedEvent.Response.Usage.InputTokensDetails.CachedTokens - - if inputTokens > 0 || outputTokens > 0 { - if responseModel == "" { - responseModel = requestModel - } - if responseModel != "" { - contextWindow := detectContextWindow(responseModel, serviceTier, inputTokens) - usageTracker.AddUsageWithCycleHint( - responseModel, - contextWindow, - inputTokens, - outputTokens, - cachedTokens, - serviceTier, - username, - time.Now(), - weeklyCycleHint, - ) + if json.Unmarshal(data, &event) == nil { + switch event.Type { + case "codex.rate_limits": + s.handleWebSocketRateLimitsEvent(data, selectedCredential) + case "error": + if event.StatusCode == http.StatusTooManyRequests { + s.handleWebSocketErrorRateLimited(data, selectedCredential) + } + case "response.completed": + if usageTracker != nil { + select { + case model := <-modelChannel: + requestModel = model + default: } + s.handleWebSocketResponseCompleted(data, usageTracker, requestModel, username, weeklyCycleHint) } } } @@ -350,3 +334,98 @@ func (s *Service) proxyWebSocketUpstreamToClient(upstreamReadWriter io.ReadWrite } } } + +func (s *Service) handleWebSocketRateLimitsEvent(data []byte, selectedCredential credential) { + var rateLimitsEvent struct { + RateLimits struct { + Primary *struct { + UsedPercent float64 `json:"used_percent"` + ResetAt int64 `json:"reset_at"` + } `json:"primary"` + Secondary *struct { + UsedPercent float64 `json:"used_percent"` + ResetAt int64 `json:"reset_at"` + } `json:"secondary"` + } `json:"rate_limits"` + LimitName string `json:"limit_name"` + MeteredLimitName string `json:"metered_limit_name"` + } + err := json.Unmarshal(data, &rateLimitsEvent) + if err != nil { + return + } + identifier := rateLimitsEvent.MeteredLimitName + if identifier == "" { + identifier = rateLimitsEvent.LimitName + } + if identifier == "" { + identifier = "codex" + } + identifier = normalizeRateLimitIdentifier(identifier) + + headers := make(http.Header) + headers.Set("x-codex-active-limit", identifier) + if w := rateLimitsEvent.RateLimits.Primary; w != nil { + headers.Set("x-"+identifier+"-primary-used-percent", strconv.FormatFloat(w.UsedPercent, 'f', -1, 64)) + if w.ResetAt > 0 { + headers.Set("x-"+identifier+"-primary-reset-at", strconv.FormatInt(w.ResetAt, 10)) + } + } + if w := rateLimitsEvent.RateLimits.Secondary; w != nil { + headers.Set("x-"+identifier+"-secondary-used-percent", strconv.FormatFloat(w.UsedPercent, 'f', -1, 64)) + if w.ResetAt > 0 { + headers.Set("x-"+identifier+"-secondary-reset-at", strconv.FormatInt(w.ResetAt, 10)) + } + } + selectedCredential.updateStateFromHeaders(headers) +} + +func (s *Service) handleWebSocketErrorRateLimited(data []byte, selectedCredential credential) { + var errorEvent struct { + Headers map[string]string `json:"headers"` + } + err := json.Unmarshal(data, &errorEvent) + if err != nil { + return + } + headers := make(http.Header) + for key, value := range errorEvent.Headers { + headers.Set(key, value) + } + selectedCredential.updateStateFromHeaders(headers) + resetAt := parseOCMRateLimitResetFromHeaders(headers) + selectedCredential.markRateLimited(resetAt) +} + +func (s *Service) handleWebSocketResponseCompleted(data []byte, usageTracker *AggregatedUsage, requestModel string, username string, weeklyCycleHint *WeeklyCycleHint) { + var streamEvent responses.ResponseStreamEventUnion + if json.Unmarshal(data, &streamEvent) != nil { + return + } + completedEvent := streamEvent.AsResponseCompleted() + responseModel := string(completedEvent.Response.Model) + serviceTier := string(completedEvent.Response.ServiceTier) + inputTokens := completedEvent.Response.Usage.InputTokens + outputTokens := completedEvent.Response.Usage.OutputTokens + cachedTokens := completedEvent.Response.Usage.InputTokensDetails.CachedTokens + + if inputTokens > 0 || outputTokens > 0 { + if responseModel == "" { + responseModel = requestModel + } + if responseModel != "" { + contextWindow := detectContextWindow(responseModel, serviceTier, inputTokens) + usageTracker.AddUsageWithCycleHint( + responseModel, + contextWindow, + inputTokens, + outputTokens, + cachedTokens, + serviceTier, + username, + time.Now(), + weeklyCycleHint, + ) + } + } +}