diff --git a/service/ccm/credential.go b/service/ccm/credential.go index e3788c43f..48b97b95b 100644 --- a/service/ccm/credential.go +++ b/service/ccm/credential.go @@ -63,13 +63,6 @@ type credentialState struct { availabilityReason availabilityReason availabilityResetAt time.Time lastKnownDataAt time.Time - unifiedStatus unifiedRateLimitStatus - unifiedResetAt time.Time - representativeClaim string - unifiedFallbackAvailable bool - overageStatus string - overageResetAt time.Time - overageDisabledReason string accountUUID string accountType string rateLimitTier string @@ -125,7 +118,6 @@ type Credential interface { markRateLimited(resetAt time.Time) markUpstreamRejected() availabilityStatus() availabilityStatus - unifiedRateLimitState() unifiedRateLimitInfo earliestReset() time.Time unavailableError() error @@ -252,17 +244,6 @@ func (s credentialState) currentAvailability() availabilityStatus { } } -func (s credentialState) currentUnifiedRateLimit() unifiedRateLimitInfo { - return unifiedRateLimitInfo{ - Status: s.unifiedStatus, - ResetAt: s.unifiedResetAt, - RepresentativeClaim: s.representativeClaim, - FallbackAvailable: s.unifiedFallbackAvailable, - OverageStatus: s.overageStatus, - OverageResetAt: s.overageResetAt, - OverageDisabledReason: s.overageDisabledReason, - }.normalized() -} func parseRateLimitResetFromHeaders(headers http.Header) time.Time { claim := headers.Get("anthropic-ratelimit-unified-representative-claim") diff --git a/service/ccm/credential_default.go b/service/ccm/credential_default.go index 60a02c55b..bf8404f88 100644 --- a/service/ccm/credential_default.go +++ b/service/ccm/credential_default.go @@ -629,19 +629,6 @@ func (c *defaultCredential) updateStateFromHeaders(headers http.Header) { c.state.lastUpdated = time.Now() c.state.noteSnapshotData() } - if unifiedStatus := unifiedRateLimitStatus(headers.Get("anthropic-ratelimit-unified-status")); unifiedStatus != "" { - c.state.unifiedStatus = unifiedStatus - } - if value, exists := parseOptionalAnthropicResetHeader(headers, "anthropic-ratelimit-unified-reset"); exists { - c.state.unifiedResetAt = value - } - c.state.representativeClaim = headers.Get("anthropic-ratelimit-unified-representative-claim") - c.state.unifiedFallbackAvailable = headers.Get("anthropic-ratelimit-unified-fallback") == "available" - c.state.overageStatus = headers.Get("anthropic-ratelimit-unified-overage-status") - if value, exists := parseOptionalAnthropicResetHeader(headers, "anthropic-ratelimit-unified-overage-reset"); exists { - c.state.overageResetAt = value - } - c.state.overageDisabledReason = headers.Get("anthropic-ratelimit-unified-overage-disabled-reason") if isFirstUpdate || int(c.state.fiveHourUtilization*100) != int(oldFiveHour*100) || int(c.state.weeklyUtilization*100) != int(oldWeekly*100) { resetSuffix := "" if !c.state.weeklyReset.IsZero() { @@ -666,8 +653,6 @@ func (c *defaultCredential) markRateLimited(resetAt time.Time) { c.state.hardRateLimited = true c.state.rateLimitResetAt = resetAt c.state.setAvailability(availabilityStateRateLimited, availabilityReasonHardRateLimit, resetAt) - c.state.unifiedStatus = unifiedRateLimitStatusRejected - c.state.unifiedResetAt = resetAt shouldInterrupt := c.checkTransitionLocked() c.stateAccess.Unlock() if shouldInterrupt { @@ -800,12 +785,6 @@ func (c *defaultCredential) availabilityStatus() availabilityStatus { return c.state.currentAvailability() } -func (c *defaultCredential) unifiedRateLimitState() unifiedRateLimitInfo { - c.stateAccess.RLock() - defer c.stateAccess.RUnlock() - return c.state.currentUnifiedRateLimit() -} - func (c *defaultCredential) unavailableError() error { c.stateAccess.RLock() defer c.stateAccess.RUnlock() diff --git a/service/ccm/credential_external.go b/service/ccm/credential_external.go index 11ddc8dad..f57ead158 100644 --- a/service/ccm/credential_external.go +++ b/service/ccm/credential_external.go @@ -337,8 +337,6 @@ func (c *externalCredential) markRateLimited(resetAt time.Time) { c.state.hardRateLimited = true c.state.rateLimitResetAt = resetAt c.state.setAvailability(availabilityStateRateLimited, availabilityReasonHardRateLimit, resetAt) - c.state.unifiedStatus = unifiedRateLimitStatusRejected - c.state.unifiedResetAt = resetAt shouldInterrupt := c.checkTransitionLocked() c.stateAccess.Unlock() if shouldInterrupt { @@ -492,19 +490,6 @@ func (c *externalCredential) updateStateFromHeaders(headers http.Header) { c.state.lastUpdated = time.Now() c.state.noteSnapshotData() } - if unifiedStatus := unifiedRateLimitStatus(headers.Get("anthropic-ratelimit-unified-status")); unifiedStatus != "" { - c.state.unifiedStatus = unifiedStatus - } - if value, exists := parseOptionalAnthropicResetHeader(headers, "anthropic-ratelimit-unified-reset"); exists { - c.state.unifiedResetAt = value - } - c.state.representativeClaim = headers.Get("anthropic-ratelimit-unified-representative-claim") - c.state.unifiedFallbackAvailable = headers.Get("anthropic-ratelimit-unified-fallback") == "available" - c.state.overageStatus = headers.Get("anthropic-ratelimit-unified-overage-status") - if value, exists := parseOptionalAnthropicResetHeader(headers, "anthropic-ratelimit-unified-overage-reset"); exists { - c.state.overageResetAt = value - } - c.state.overageDisabledReason = headers.Get("anthropic-ratelimit-unified-overage-disabled-reason") if isFirstUpdate || int(c.state.fiveHourUtilization*100) != int(oldFiveHour*100) || int(c.state.weeklyUtilization*100) != int(oldWeekly*100) { resetSuffix := "" if !c.state.weeklyReset.IsZero() { @@ -662,11 +647,6 @@ func (c *externalCredential) pollUsage() { c.state.upstreamRejectedUntil = time.Time{} c.state.fiveHourUtilization = statusResponse.FiveHourUtilization c.state.weeklyUtilization = statusResponse.WeeklyUtilization - c.state.unifiedStatus = unifiedRateLimitStatus(statusResponse.UnifiedStatus) - c.state.representativeClaim = statusResponse.RepresentativeClaim - c.state.unifiedFallbackAvailable = statusResponse.FallbackAvailable - c.state.overageStatus = statusResponse.OverageStatus - c.state.overageDisabledReason = statusResponse.OverageDisabledReason if statusResponse.PlanWeight > 0 { c.state.remotePlanWeight = statusResponse.PlanWeight } @@ -676,30 +656,6 @@ func (c *externalCredential) pollUsage() { if statusResponse.WeeklyReset > 0 { c.state.weeklyReset = time.Unix(statusResponse.WeeklyReset, 0) } - if statusResponse.UnifiedReset > 0 { - c.state.unifiedResetAt = time.Unix(statusResponse.UnifiedReset, 0) - } - if statusResponse.OverageReset > 0 { - c.state.overageResetAt = time.Unix(statusResponse.OverageReset, 0) - } - if statusResponse.Availability != nil { - switch availabilityState(statusResponse.Availability.State) { - case availabilityStateRateLimited: - c.state.hardRateLimited = true - if statusResponse.Availability.ResetAt > 0 { - c.state.rateLimitResetAt = time.Unix(statusResponse.Availability.ResetAt, 0) - } - case availabilityStateTemporarilyBlocked: - resetAt := time.Time{} - if statusResponse.Availability.ResetAt > 0 { - resetAt = time.Unix(statusResponse.Availability.ResetAt, 0) - } - c.state.setAvailability(availabilityStateTemporarilyBlocked, availabilityReason(statusResponse.Availability.Reason), resetAt) - if availabilityReason(statusResponse.Availability.Reason) == availabilityReasonUpstreamRejected && !resetAt.IsZero() { - c.state.upstreamRejectedUntil = resetAt - } - } - } if c.state.hardRateLimited && time.Now().After(c.state.rateLimitResetAt) { c.state.hardRateLimited = false } @@ -800,11 +756,6 @@ func (c *externalCredential) connectStatusStream(ctx context.Context) (statusStr c.state.upstreamRejectedUntil = time.Time{} c.state.fiveHourUtilization = statusResponse.FiveHourUtilization c.state.weeklyUtilization = statusResponse.WeeklyUtilization - c.state.unifiedStatus = unifiedRateLimitStatus(statusResponse.UnifiedStatus) - c.state.representativeClaim = statusResponse.RepresentativeClaim - c.state.unifiedFallbackAvailable = statusResponse.FallbackAvailable - c.state.overageStatus = statusResponse.OverageStatus - c.state.overageDisabledReason = statusResponse.OverageDisabledReason if statusResponse.PlanWeight > 0 { c.state.remotePlanWeight = statusResponse.PlanWeight } @@ -814,30 +765,6 @@ func (c *externalCredential) connectStatusStream(ctx context.Context) (statusStr if statusResponse.WeeklyReset > 0 { c.state.weeklyReset = time.Unix(statusResponse.WeeklyReset, 0) } - if statusResponse.UnifiedReset > 0 { - c.state.unifiedResetAt = time.Unix(statusResponse.UnifiedReset, 0) - } - if statusResponse.OverageReset > 0 { - c.state.overageResetAt = time.Unix(statusResponse.OverageReset, 0) - } - if statusResponse.Availability != nil { - switch availabilityState(statusResponse.Availability.State) { - case availabilityStateRateLimited: - c.state.hardRateLimited = true - if statusResponse.Availability.ResetAt > 0 { - c.state.rateLimitResetAt = time.Unix(statusResponse.Availability.ResetAt, 0) - } - case availabilityStateTemporarilyBlocked: - resetAt := time.Time{} - if statusResponse.Availability.ResetAt > 0 { - resetAt = time.Unix(statusResponse.Availability.ResetAt, 0) - } - c.state.setAvailability(availabilityStateTemporarilyBlocked, availabilityReason(statusResponse.Availability.Reason), resetAt) - if availabilityReason(statusResponse.Availability.Reason) == availabilityReasonUpstreamRejected && !resetAt.IsZero() { - c.state.upstreamRejectedUntil = resetAt - } - } - } if c.state.hardRateLimited && time.Now().After(c.state.rateLimitResetAt) { c.state.hardRateLimited = false } @@ -921,11 +848,6 @@ func (c *externalCredential) availabilityStatus() availabilityStatus { return c.state.currentAvailability() } -func (c *externalCredential) unifiedRateLimitState() unifiedRateLimitInfo { - c.stateAccess.RLock() - defer c.stateAccess.RUnlock() - return c.state.currentUnifiedRateLimit() -} func (c *externalCredential) markUsageStreamUpdated() { c.stateAccess.Lock() diff --git a/service/ccm/rate_limit_state.go b/service/ccm/rate_limit_state.go index ab584419f..696fe8142 100644 --- a/service/ccm/rate_limit_state.go +++ b/service/ccm/rate_limit_state.go @@ -29,12 +29,6 @@ type availabilityStatus struct { ResetAt time.Time } -type availabilityPayload struct { - State string `json:"state"` - Reason string `json:"reason,omitempty"` - ResetAt int64 `json:"reset_at,omitempty"` -} - func (s availabilityStatus) normalized() availabilityStatus { if s.State == "" { s.State = availabilityStateUnknown @@ -45,48 +39,6 @@ func (s availabilityStatus) normalized() availabilityStatus { return s } -func (s availabilityStatus) toPayload() *availabilityPayload { - s = s.normalized() - if s.State == "" { - return nil - } - payload := &availabilityPayload{ - State: string(s.State), - } - if s.Reason != "" && s.Reason != availabilityReasonUnknown { - payload.Reason = string(s.Reason) - } - if !s.ResetAt.IsZero() { - payload.ResetAt = s.ResetAt.Unix() - } - return payload -} - -type unifiedRateLimitStatus string - -const ( - unifiedRateLimitStatusAllowed unifiedRateLimitStatus = "allowed" - unifiedRateLimitStatusAllowedWarning unifiedRateLimitStatus = "allowed_warning" - unifiedRateLimitStatusRejected unifiedRateLimitStatus = "rejected" -) - -type unifiedRateLimitInfo struct { - Status unifiedRateLimitStatus - ResetAt time.Time - RepresentativeClaim string - FallbackAvailable bool - OverageStatus string - OverageResetAt time.Time - OverageDisabledReason string -} - -func (s unifiedRateLimitInfo) normalized() unifiedRateLimitInfo { - if s.Status == "" { - s.Status = unifiedRateLimitStatusAllowed - } - return s -} - func claudeWindowProgress(resetAt time.Time, windowSeconds float64, now time.Time) float64 { if resetAt.IsZero() || windowSeconds <= 0 { return 0 diff --git a/service/ccm/service_status.go b/service/ccm/service_status.go index 11ae3fd3a..f5bd9bc63 100644 --- a/service/ccm/service_status.go +++ b/service/ccm/service_status.go @@ -13,19 +13,11 @@ import ( ) type statusPayload struct { - FiveHourUtilization float64 `json:"five_hour_utilization"` - FiveHourReset int64 `json:"five_hour_reset"` - WeeklyUtilization float64 `json:"weekly_utilization"` - WeeklyReset int64 `json:"weekly_reset"` - PlanWeight float64 `json:"plan_weight"` - UnifiedStatus string `json:"unified_status,omitempty"` - UnifiedReset int64 `json:"unified_reset,omitempty"` - RepresentativeClaim string `json:"representative_claim,omitempty"` - FallbackAvailable bool `json:"fallback_available,omitempty"` - OverageStatus string `json:"overage_status,omitempty"` - OverageReset int64 `json:"overage_reset,omitempty"` - OverageDisabledReason string `json:"overage_disabled_reason,omitempty"` - Availability *availabilityPayload `json:"availability,omitempty"` + FiveHourUtilization float64 `json:"five_hour_utilization"` + FiveHourReset int64 `json:"five_hour_reset"` + WeeklyUtilization float64 `json:"weekly_utilization"` + WeeklyReset int64 `json:"weekly_reset"` + PlanWeight float64 `json:"plan_weight"` } type aggregatedStatus struct { @@ -34,7 +26,6 @@ type aggregatedStatus struct { totalWeight float64 fiveHourReset time.Time weeklyReset time.Time - unifiedRateLimit unifiedRateLimitInfo availability availabilityStatus } @@ -50,27 +41,17 @@ func (s aggregatedStatus) equal(other aggregatedStatus) bool { } func (s aggregatedStatus) toPayload() statusPayload { - unified := s.unifiedRateLimit.normalized() return statusPayload{ - FiveHourUtilization: s.fiveHourUtilization, - FiveHourReset: resetToEpoch(s.fiveHourReset), - WeeklyUtilization: s.weeklyUtilization, - WeeklyReset: resetToEpoch(s.weeklyReset), - PlanWeight: s.totalWeight, - UnifiedStatus: string(unified.Status), - UnifiedReset: resetToEpoch(unified.ResetAt), - RepresentativeClaim: unified.RepresentativeClaim, - FallbackAvailable: unified.FallbackAvailable, - OverageStatus: unified.OverageStatus, - OverageReset: resetToEpoch(unified.OverageResetAt), - OverageDisabledReason: unified.OverageDisabledReason, - Availability: s.availability.toPayload(), + FiveHourUtilization: s.fiveHourUtilization, + FiveHourReset: resetToEpoch(s.fiveHourReset), + WeeklyUtilization: s.weeklyUtilization, + WeeklyReset: resetToEpoch(s.weeklyReset), + PlanWeight: s.totalWeight, } } type aggregateInput struct { availability availabilityStatus - unified unifiedRateLimitInfo } func aggregateAvailability(inputs []aggregateInput) availabilityStatus { @@ -133,7 +114,9 @@ func aggregateAvailability(inputs []aggregateInput) availabilityStatus { } } -func chooseRepresentativeClaim(status unifiedRateLimitStatus, fiveHourUtilization float64, fiveHourReset time.Time, weeklyUtilization float64, weeklyReset time.Time, now time.Time) string { +func chooseRepresentativeClaim(fiveHourUtilization float64, fiveHourReset time.Time, weeklyUtilization float64, weeklyReset time.Time, now time.Time) string { + fiveHourWarning := claudeFiveHourWarning(fiveHourUtilization, fiveHourReset, now) + weeklyWarning := claudeWeeklyWarning(weeklyUtilization, weeklyReset, now) type claimCandidate struct { name string priority int @@ -142,15 +125,15 @@ func chooseRepresentativeClaim(status unifiedRateLimitStatus, fiveHourUtilizatio candidateFor := func(name string, utilization float64, warning bool) claimCandidate { priority := 0 switch { - case status == unifiedRateLimitStatusRejected && utilization >= 100: + case utilization >= 100: priority = 2 case warning: priority = 1 } return claimCandidate{name: name, priority: priority, utilization: utilization} } - five := candidateFor("5h", fiveHourUtilization, claudeFiveHourWarning(fiveHourUtilization, fiveHourReset, now)) - weekly := candidateFor("7d", weeklyUtilization, claudeWeeklyWarning(weeklyUtilization, weeklyReset, now)) + five := candidateFor("5h", fiveHourUtilization, fiveHourWarning) + weekly := candidateFor("7d", weeklyUtilization, weeklyWarning) switch { case five.priority > weekly.priority: return five.name @@ -169,53 +152,6 @@ func chooseRepresentativeClaim(status unifiedRateLimitStatus, fiveHourUtilizatio } } -func aggregateUnifiedRateLimit(inputs []aggregateInput, fiveHourUtilization float64, fiveHourReset time.Time, weeklyUtilization float64, weeklyReset time.Time, availability availabilityStatus) unifiedRateLimitInfo { - now := time.Now() - info := unifiedRateLimitInfo{} - usableCount := 0 - for _, input := range inputs { - if input.availability.State == availabilityStateUsable { - usableCount++ - } - if input.unified.OverageStatus != "" && info.OverageStatus == "" { - info.OverageStatus = input.unified.OverageStatus - info.OverageResetAt = input.unified.OverageResetAt - info.OverageDisabledReason = input.unified.OverageDisabledReason - } - if input.unified.Status == unifiedRateLimitStatusRejected { - info.Status = unifiedRateLimitStatusRejected - if !input.unified.ResetAt.IsZero() && (info.ResetAt.IsZero() || input.unified.ResetAt.Before(info.ResetAt)) { - info.ResetAt = input.unified.ResetAt - info.RepresentativeClaim = input.unified.RepresentativeClaim - } - } - } - if info.Status == "" { - switch { - case availability.State == availabilityStateRateLimited || fiveHourUtilization >= 100 || weeklyUtilization >= 100: - info.Status = unifiedRateLimitStatusRejected - info.ResetAt = availability.ResetAt - case claudeFiveHourWarning(fiveHourUtilization, fiveHourReset, now) || claudeWeeklyWarning(weeklyUtilization, weeklyReset, now): - info.Status = unifiedRateLimitStatusAllowedWarning - default: - info.Status = unifiedRateLimitStatusAllowed - } - } - info.FallbackAvailable = usableCount > 0 && len(inputs) > 1 - if info.RepresentativeClaim == "" { - info.RepresentativeClaim = chooseRepresentativeClaim(info.Status, fiveHourUtilization, fiveHourReset, weeklyUtilization, weeklyReset, now) - } - if info.ResetAt.IsZero() { - switch info.RepresentativeClaim { - case "7d": - info.ResetAt = weeklyReset - default: - info.ResetAt = fiveHourReset - } - } - return info.normalized() -} - func (s *Service) handleStatusEndpoint(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { writeJSONError(w, r, http.StatusMethodNotAllowed, "invalid_request_error", "method not allowed") @@ -350,7 +286,6 @@ func (s *Service) computeAggregatedUtilization(provider credentialProvider, user } visibleInputs = append(visibleInputs, aggregateInput{ availability: credential.availabilityStatus(), - unified: credential.unifiedRateLimitState(), }) if !credential.hasSnapshotData() { continue @@ -393,7 +328,6 @@ func (s *Service) computeAggregatedUtilization(provider credentialProvider, user result.fiveHourUtilization = 100 result.weeklyUtilization = 100 } - result.unifiedRateLimit = aggregateUnifiedRateLimit(visibleInputs, result.fiveHourUtilization, result.fiveHourReset, result.weeklyUtilization, result.weeklyReset, availability) return result } result := aggregatedStatus{ @@ -410,66 +344,55 @@ func (s *Service) computeAggregatedUtilization(provider credentialProvider, user avgHours := totalWeightedHoursUntilWeeklyReset / totalWeeklyResetWeight result.weeklyReset = now.Add(time.Duration(avgHours * float64(time.Hour))) } - result.unifiedRateLimit = aggregateUnifiedRateLimit(visibleInputs, result.fiveHourUtilization, result.fiveHourReset, result.weeklyUtilization, result.weeklyReset, availability) return result } func (s *Service) rewriteResponseHeaders(headers http.Header, provider credentialProvider, userConfig *option.CCMUser) { + for key := range headers { + if strings.HasPrefix(strings.ToLower(key), "anthropic-ratelimit-unified-") { + headers.Del(key) + } + } status := s.computeAggregatedUtilization(provider, userConfig) + now := time.Now() headers.Set("anthropic-ratelimit-unified-5h-utilization", strconv.FormatFloat(status.fiveHourUtilization/100, 'f', 6, 64)) headers.Set("anthropic-ratelimit-unified-7d-utilization", strconv.FormatFloat(status.weeklyUtilization/100, 'f', 6, 64)) if !status.fiveHourReset.IsZero() { headers.Set("anthropic-ratelimit-unified-5h-reset", strconv.FormatInt(status.fiveHourReset.Unix(), 10)) - } else { - headers.Del("anthropic-ratelimit-unified-5h-reset") } if !status.weeklyReset.IsZero() { headers.Set("anthropic-ratelimit-unified-7d-reset", strconv.FormatInt(status.weeklyReset.Unix(), 10)) - } else { - headers.Del("anthropic-ratelimit-unified-7d-reset") } if status.totalWeight > 0 { headers.Set("X-CCM-Plan-Weight", strconv.FormatFloat(status.totalWeight, 'f', -1, 64)) } - headers.Set("anthropic-ratelimit-unified-status", string(status.unifiedRateLimit.normalized().Status)) - if !status.unifiedRateLimit.ResetAt.IsZero() { - headers.Set("anthropic-ratelimit-unified-reset", strconv.FormatInt(status.unifiedRateLimit.ResetAt.Unix(), 10)) - } else { - headers.Del("anthropic-ratelimit-unified-reset") + fiveHourWarning := claudeFiveHourWarning(status.fiveHourUtilization, status.fiveHourReset, now) + weeklyWarning := claudeWeeklyWarning(status.weeklyUtilization, status.weeklyReset, now) + switch { + case status.fiveHourUtilization >= 100 || status.weeklyUtilization >= 100 || + status.availability.State == availabilityStateRateLimited: + headers.Set("anthropic-ratelimit-unified-status", "rejected") + case fiveHourWarning || weeklyWarning: + headers.Set("anthropic-ratelimit-unified-status", "allowed_warning") + default: + headers.Set("anthropic-ratelimit-unified-status", "allowed") } - if status.unifiedRateLimit.RepresentativeClaim != "" { - headers.Set("anthropic-ratelimit-unified-representative-claim", status.unifiedRateLimit.RepresentativeClaim) - } else { - headers.Del("anthropic-ratelimit-unified-representative-claim") + claim := chooseRepresentativeClaim(status.fiveHourUtilization, status.fiveHourReset, status.weeklyUtilization, status.weeklyReset, now) + headers.Set("anthropic-ratelimit-unified-representative-claim", claim) + switch claim { + case "7d": + if !status.weeklyReset.IsZero() { + headers.Set("anthropic-ratelimit-unified-reset", strconv.FormatInt(status.weeklyReset.Unix(), 10)) + } + default: + if !status.fiveHourReset.IsZero() { + headers.Set("anthropic-ratelimit-unified-reset", strconv.FormatInt(status.fiveHourReset.Unix(), 10)) + } } - if status.unifiedRateLimit.FallbackAvailable { - headers.Set("anthropic-ratelimit-unified-fallback", "available") - } else { - headers.Del("anthropic-ratelimit-unified-fallback") - } - if status.unifiedRateLimit.OverageStatus != "" { - headers.Set("anthropic-ratelimit-unified-overage-status", status.unifiedRateLimit.OverageStatus) - } else { - headers.Del("anthropic-ratelimit-unified-overage-status") - } - if !status.unifiedRateLimit.OverageResetAt.IsZero() { - headers.Set("anthropic-ratelimit-unified-overage-reset", strconv.FormatInt(status.unifiedRateLimit.OverageResetAt.Unix(), 10)) - } else { - headers.Del("anthropic-ratelimit-unified-overage-reset") - } - if status.unifiedRateLimit.OverageDisabledReason != "" { - headers.Set("anthropic-ratelimit-unified-overage-disabled-reason", status.unifiedRateLimit.OverageDisabledReason) - } else { - headers.Del("anthropic-ratelimit-unified-overage-disabled-reason") - } - if claudeFiveHourWarning(status.fiveHourUtilization, status.fiveHourReset, time.Now()) || status.fiveHourUtilization >= 100 { + if fiveHourWarning || status.fiveHourUtilization >= 100 { headers.Set("anthropic-ratelimit-unified-5h-surpassed-threshold", "true") - } else { - headers.Del("anthropic-ratelimit-unified-5h-surpassed-threshold") } - if claudeWeeklyWarning(status.weeklyUtilization, status.weeklyReset, time.Now()) || status.weeklyUtilization >= 100 { + if weeklyWarning || status.weeklyUtilization >= 100 { headers.Set("anthropic-ratelimit-unified-7d-surpassed-threshold", "true") - } else { - headers.Del("anthropic-ratelimit-unified-7d-surpassed-threshold") } } diff --git a/service/ccm/service_status_test.go b/service/ccm/service_status_test.go index 9aef16de3..c2dbea1a2 100644 --- a/service/ccm/service_status_test.go +++ b/service/ccm/service_status_test.go @@ -24,28 +24,26 @@ type testCredential struct { fiveReset time.Time weeklyReset time.Time availability availabilityStatus - unified unifiedRateLimitInfo } -func (c *testCredential) tagName() string { return c.tag } -func (c *testCredential) isAvailable() bool { return c.available } -func (c *testCredential) isUsable() bool { return c.usable } -func (c *testCredential) isExternal() bool { return c.external } -func (c *testCredential) hasSnapshotData() bool { return c.hasData } -func (c *testCredential) fiveHourUtilization() float64 { return c.fiveHour } -func (c *testCredential) weeklyUtilization() float64 { return c.weekly } -func (c *testCredential) fiveHourCap() float64 { return c.fiveHourCapV } -func (c *testCredential) weeklyCap() float64 { return c.weeklyCapV } -func (c *testCredential) planWeight() float64 { return c.weight } -func (c *testCredential) fiveHourResetTime() time.Time { return c.fiveReset } -func (c *testCredential) weeklyResetTime() time.Time { return c.weeklyReset } -func (c *testCredential) markRateLimited(time.Time) {} -func (c *testCredential) markUpstreamRejected() {} -func (c *testCredential) availabilityStatus() availabilityStatus { return c.availability } -func (c *testCredential) unifiedRateLimitState() unifiedRateLimitInfo { return c.unified } -func (c *testCredential) earliestReset() time.Time { return c.fiveReset } -func (c *testCredential) unavailableError() error { return nil } -func (c *testCredential) getAccessToken() (string, error) { return "", nil } +func (c *testCredential) tagName() string { return c.tag } +func (c *testCredential) isAvailable() bool { return c.available } +func (c *testCredential) isUsable() bool { return c.usable } +func (c *testCredential) isExternal() bool { return c.external } +func (c *testCredential) hasSnapshotData() bool { return c.hasData } +func (c *testCredential) fiveHourUtilization() float64 { return c.fiveHour } +func (c *testCredential) weeklyUtilization() float64 { return c.weekly } +func (c *testCredential) fiveHourCap() float64 { return c.fiveHourCapV } +func (c *testCredential) weeklyCap() float64 { return c.weeklyCapV } +func (c *testCredential) planWeight() float64 { return c.weight } +func (c *testCredential) fiveHourResetTime() time.Time { return c.fiveReset } +func (c *testCredential) weeklyResetTime() time.Time { return c.weeklyReset } +func (c *testCredential) markRateLimited(time.Time) {} +func (c *testCredential) markUpstreamRejected() {} +func (c *testCredential) availabilityStatus() availabilityStatus { return c.availability } +func (c *testCredential) earliestReset() time.Time { return c.fiveReset } +func (c *testCredential) unavailableError() error { return nil } +func (c *testCredential) getAccessToken() (string, error) { return "", nil } func (c *testCredential) buildProxyRequest(context.Context, *http.Request, []byte, http.Header) (*http.Request, error) { return nil, nil } @@ -98,22 +96,18 @@ func TestComputeAggregatedUtilizationPreservesSnapshotForRateLimitedCredential(t fiveReset: reset, weeklyReset: reset.Add(2 * time.Hour), availability: availabilityStatus{State: availabilityStateRateLimited, Reason: availabilityReasonHardRateLimit, ResetAt: reset}, - unified: unifiedRateLimitInfo{Status: unifiedRateLimitStatusRejected, ResetAt: reset, RepresentativeClaim: "5h"}, }, }}, nil) if status.fiveHourUtilization != 42 || status.weeklyUtilization != 18 { t.Fatalf("expected preserved utilization, got 5h=%v weekly=%v", status.fiveHourUtilization, status.weeklyUtilization) } - if status.unifiedRateLimit.Status != unifiedRateLimitStatusRejected { - t.Fatalf("expected rejected unified status, got %q", status.unifiedRateLimit.Status) - } if status.availability.State != availabilityStateRateLimited { t.Fatalf("expected rate-limited availability, got %#v", status.availability) } } -func TestRewriteResponseHeadersIncludesUnifiedHeaders(t *testing.T) { +func TestRewriteResponseHeadersComputesUnifiedStatus(t *testing.T) { t.Parallel() reset := time.Now().Add(80 * time.Minute) @@ -147,6 +141,73 @@ func TestRewriteResponseHeadersIncludesUnifiedHeaders(t *testing.T) { } } +func TestRewriteResponseHeadersStripsUpstreamHeaders(t *testing.T) { + t.Parallel() + + service := &Service{} + headers := make(http.Header) + headers.Set("anthropic-ratelimit-unified-overage-status", "rejected") + headers.Set("anthropic-ratelimit-unified-overage-disabled-reason", "org_level_disabled") + headers.Set("anthropic-ratelimit-unified-fallback", "available") + service.rewriteResponseHeaders(headers, &testProvider{credentials: []Credential{ + &testCredential{ + tag: "a", + available: true, + usable: true, + hasData: true, + fiveHour: 10, + weekly: 5, + fiveHourCapV: 100, + weeklyCapV: 100, + weight: 1, + fiveReset: time.Now().Add(3 * time.Hour), + weeklyReset: time.Now().Add(5 * 24 * time.Hour), + availability: availabilityStatus{State: availabilityStateUsable}, + }, + }}, nil) + + if headers.Get("anthropic-ratelimit-unified-overage-status") != "" { + t.Fatalf("expected overage-status stripped, got %q", headers.Get("anthropic-ratelimit-unified-overage-status")) + } + if headers.Get("anthropic-ratelimit-unified-overage-disabled-reason") != "" { + t.Fatalf("expected overage-disabled-reason stripped, got %q", headers.Get("anthropic-ratelimit-unified-overage-disabled-reason")) + } + if headers.Get("anthropic-ratelimit-unified-fallback") != "" { + t.Fatalf("expected fallback stripped, got %q", headers.Get("anthropic-ratelimit-unified-fallback")) + } + if headers.Get("anthropic-ratelimit-unified-status") != "allowed" { + t.Fatalf("expected allowed status, got %q", headers.Get("anthropic-ratelimit-unified-status")) + } +} + +func TestRewriteResponseHeadersRejectedOnHardRateLimit(t *testing.T) { + t.Parallel() + + reset := time.Now().Add(10 * time.Minute) + service := &Service{} + headers := make(http.Header) + service.rewriteResponseHeaders(headers, &testProvider{credentials: []Credential{ + &testCredential{ + tag: "a", + available: true, + usable: false, + hasData: true, + fiveHour: 50, + weekly: 20, + fiveHourCapV: 100, + weeklyCapV: 100, + weight: 1, + fiveReset: reset, + weeklyReset: time.Now().Add(5 * 24 * time.Hour), + availability: availabilityStatus{State: availabilityStateRateLimited, Reason: availabilityReasonHardRateLimit, ResetAt: reset}, + }, + }}, nil) + + if headers.Get("anthropic-ratelimit-unified-status") != "rejected" { + t.Fatalf("expected rejected (hard rate limited), got %q", headers.Get("anthropic-ratelimit-unified-status")) + } +} + func TestWriteCredentialUnavailableErrorReturns429ForRateLimitedCredentials(t *testing.T) { t.Parallel() diff --git a/service/ocm/credential.go b/service/ocm/credential.go index 2e2589366..6070f1a8e 100644 --- a/service/ocm/credential.go +++ b/service/ocm/credential.go @@ -120,8 +120,6 @@ type Credential interface { markUpstreamRejected() markTemporarilyBlocked(reason availabilityReason, resetAt time.Time) availabilityStatus() availabilityStatus - rateLimitSnapshots() []rateLimitSnapshot - activeLimitID() string earliestReset() time.Time unavailableError() error diff --git a/service/ocm/credential_default.go b/service/ocm/credential_default.go index 977a545e6..56ffbba20 100644 --- a/service/ocm/credential_default.go +++ b/service/ocm/credential_default.go @@ -544,26 +544,6 @@ func (c *defaultCredential) availabilityStatus() availabilityStatus { return c.state.currentAvailability() } -func (c *defaultCredential) rateLimitSnapshots() []rateLimitSnapshot { - c.stateAccess.RLock() - defer c.stateAccess.RUnlock() - if len(c.state.rateLimitSnapshots) == 0 { - return nil - } - snapshots := make([]rateLimitSnapshot, 0, len(c.state.rateLimitSnapshots)) - for _, snapshot := range c.state.rateLimitSnapshots { - snapshots = append(snapshots, cloneRateLimitSnapshot(snapshot)) - } - sortRateLimitSnapshots(snapshots) - return snapshots -} - -func (c *defaultCredential) activeLimitID() string { - c.stateAccess.RLock() - defer c.stateAccess.RUnlock() - return c.state.activeLimitID -} - func (c *defaultCredential) unavailableError() error { c.stateAccess.RLock() defer c.stateAccess.RUnlock() diff --git a/service/ocm/credential_external.go b/service/ocm/credential_external.go index 67b9b2a1b..222a22e94 100644 --- a/service/ocm/credential_external.go +++ b/service/ocm/credential_external.go @@ -702,38 +702,16 @@ func (c *externalCredential) pollUsage() { oldWeekly := c.state.weeklyUtilization c.state.consecutivePollFailures = 0 c.state.upstreamRejectedUntil = time.Time{} - if len(statusResponse.Limits) > 0 { - applyRateLimitSnapshotsLocked(&c.state, statusResponse.Limits, statusResponse.ActiveLimit, statusResponse.PlanWeight, c.state.accountType) - } else { - c.state.fiveHourUtilization = statusResponse.FiveHourUtilization - c.state.weeklyUtilization = statusResponse.WeeklyUtilization - if statusResponse.FiveHourReset > 0 { - c.state.fiveHourReset = time.Unix(statusResponse.FiveHourReset, 0) - } - if statusResponse.WeeklyReset > 0 { - c.state.weeklyReset = time.Unix(statusResponse.WeeklyReset, 0) - } - if statusResponse.PlanWeight > 0 { - c.state.remotePlanWeight = statusResponse.PlanWeight - } + c.state.fiveHourUtilization = statusResponse.FiveHourUtilization + c.state.weeklyUtilization = statusResponse.WeeklyUtilization + if statusResponse.FiveHourReset > 0 { + c.state.fiveHourReset = time.Unix(statusResponse.FiveHourReset, 0) } - if statusResponse.Availability != nil { - switch availabilityState(statusResponse.Availability.State) { - case availabilityStateRateLimited: - c.state.hardRateLimited = true - if statusResponse.Availability.ResetAt > 0 { - c.state.rateLimitResetAt = time.Unix(statusResponse.Availability.ResetAt, 0) - } - case availabilityStateTemporarilyBlocked: - resetAt := time.Time{} - if statusResponse.Availability.ResetAt > 0 { - resetAt = time.Unix(statusResponse.Availability.ResetAt, 0) - } - c.state.setAvailability(availabilityStateTemporarilyBlocked, availabilityReason(statusResponse.Availability.Reason), resetAt) - if availabilityReason(statusResponse.Availability.Reason) == availabilityReasonUpstreamRejected && !resetAt.IsZero() { - c.state.upstreamRejectedUntil = resetAt - } - } + if statusResponse.WeeklyReset > 0 { + c.state.weeklyReset = time.Unix(statusResponse.WeeklyReset, 0) + } + if statusResponse.PlanWeight > 0 { + c.state.remotePlanWeight = statusResponse.PlanWeight } if c.state.hardRateLimited && time.Now().After(c.state.rateLimitResetAt) { c.state.hardRateLimited = false @@ -833,38 +811,16 @@ func (c *externalCredential) connectStatusStream(ctx context.Context) (statusStr oldWeekly := c.state.weeklyUtilization c.state.consecutivePollFailures = 0 c.state.upstreamRejectedUntil = time.Time{} - if len(statusResponse.Limits) > 0 { - applyRateLimitSnapshotsLocked(&c.state, statusResponse.Limits, statusResponse.ActiveLimit, statusResponse.PlanWeight, c.state.accountType) - } else { - c.state.fiveHourUtilization = statusResponse.FiveHourUtilization - c.state.weeklyUtilization = statusResponse.WeeklyUtilization - if statusResponse.FiveHourReset > 0 { - c.state.fiveHourReset = time.Unix(statusResponse.FiveHourReset, 0) - } - if statusResponse.WeeklyReset > 0 { - c.state.weeklyReset = time.Unix(statusResponse.WeeklyReset, 0) - } - if statusResponse.PlanWeight > 0 { - c.state.remotePlanWeight = statusResponse.PlanWeight - } + c.state.fiveHourUtilization = statusResponse.FiveHourUtilization + c.state.weeklyUtilization = statusResponse.WeeklyUtilization + if statusResponse.FiveHourReset > 0 { + c.state.fiveHourReset = time.Unix(statusResponse.FiveHourReset, 0) } - if statusResponse.Availability != nil { - switch availabilityState(statusResponse.Availability.State) { - case availabilityStateRateLimited: - c.state.hardRateLimited = true - if statusResponse.Availability.ResetAt > 0 { - c.state.rateLimitResetAt = time.Unix(statusResponse.Availability.ResetAt, 0) - } - case availabilityStateTemporarilyBlocked: - resetAt := time.Time{} - if statusResponse.Availability.ResetAt > 0 { - resetAt = time.Unix(statusResponse.Availability.ResetAt, 0) - } - c.state.setAvailability(availabilityStateTemporarilyBlocked, availabilityReason(statusResponse.Availability.Reason), resetAt) - if availabilityReason(statusResponse.Availability.Reason) == availabilityReasonUpstreamRejected && !resetAt.IsZero() { - c.state.upstreamRejectedUntil = resetAt - } - } + if statusResponse.WeeklyReset > 0 { + c.state.weeklyReset = time.Unix(statusResponse.WeeklyReset, 0) + } + if statusResponse.PlanWeight > 0 { + c.state.remotePlanWeight = statusResponse.PlanWeight } if c.state.hardRateLimited && time.Now().After(c.state.rateLimitResetAt) { c.state.hardRateLimited = false @@ -949,25 +905,6 @@ func (c *externalCredential) availabilityStatus() availabilityStatus { return c.state.currentAvailability() } -func (c *externalCredential) rateLimitSnapshots() []rateLimitSnapshot { - c.stateAccess.RLock() - defer c.stateAccess.RUnlock() - if len(c.state.rateLimitSnapshots) == 0 { - return nil - } - snapshots := make([]rateLimitSnapshot, 0, len(c.state.rateLimitSnapshots)) - for _, snapshot := range c.state.rateLimitSnapshots { - snapshots = append(snapshots, cloneRateLimitSnapshot(snapshot)) - } - sortRateLimitSnapshots(snapshots) - return snapshots -} - -func (c *externalCredential) activeLimitID() string { - c.stateAccess.RLock() - defer c.stateAccess.RUnlock() - return c.state.activeLimitID -} func (c *externalCredential) markUsageStreamUpdated() { c.stateAccess.Lock() diff --git a/service/ocm/rate_limit_state.go b/service/ocm/rate_limit_state.go index f0e4f34b1..82a01f5a7 100644 --- a/service/ocm/rate_limit_state.go +++ b/service/ocm/rate_limit_state.go @@ -35,12 +35,6 @@ type availabilityStatus struct { ResetAt time.Time } -type availabilityPayload struct { - State string `json:"state"` - Reason string `json:"reason,omitempty"` - ResetAt int64 `json:"reset_at,omitempty"` -} - func (s availabilityStatus) normalized() availabilityStatus { if s.State == "" { s.State = availabilityStateUnknown @@ -51,20 +45,6 @@ func (s availabilityStatus) normalized() availabilityStatus { return s } -func (s availabilityStatus) toPayload() *availabilityPayload { - s = s.normalized() - payload := &availabilityPayload{ - State: string(s.State), - } - if s.Reason != "" && s.Reason != availabilityReasonUnknown { - payload.Reason = string(s.Reason) - } - if !s.ResetAt.IsZero() { - payload.ResetAt = s.ResetAt.Unix() - } - return payload -} - type creditsSnapshot struct { HasCredits bool `json:"has_credits"` Unlimited bool `json:"unlimited"` diff --git a/service/ocm/service_status.go b/service/ocm/service_status.go index 92959e5e8..c6897b92f 100644 --- a/service/ocm/service_status.go +++ b/service/ocm/service_status.go @@ -5,7 +5,6 @@ import ( "encoding/json" "net/http" "reflect" - "slices" "strconv" "strings" "time" @@ -14,14 +13,11 @@ import ( ) type statusPayload struct { - FiveHourUtilization float64 `json:"five_hour_utilization"` - FiveHourReset int64 `json:"five_hour_reset"` - WeeklyUtilization float64 `json:"weekly_utilization"` - WeeklyReset int64 `json:"weekly_reset"` - PlanWeight float64 `json:"plan_weight"` - ActiveLimit string `json:"active_limit,omitempty"` - Limits []rateLimitSnapshot `json:"limits,omitempty"` - Availability *availabilityPayload `json:"availability,omitempty"` + FiveHourUtilization float64 `json:"five_hour_utilization"` + FiveHourReset int64 `json:"five_hour_reset"` + WeeklyUtilization float64 `json:"weekly_utilization"` + WeeklyReset int64 `json:"weekly_reset"` + PlanWeight float64 `json:"plan_weight"` } type aggregatedStatus struct { @@ -30,8 +26,6 @@ type aggregatedStatus struct { totalWeight float64 fiveHourReset time.Time weeklyReset time.Time - activeLimitID string - limits []rateLimitSnapshot availability availabilityStatus } @@ -53,24 +47,13 @@ func (s aggregatedStatus) toPayload() statusPayload { WeeklyUtilization: s.weeklyUtilization, WeeklyReset: resetToEpoch(s.weeklyReset), PlanWeight: s.totalWeight, - ActiveLimit: s.activeLimitID, - Limits: slices.Clone(s.limits), - Availability: s.availability.toPayload(), } } type aggregateInput struct { - weight float64 - snapshots []rateLimitSnapshot - activeLimit string availability availabilityStatus } -type snapshotContribution struct { - weight float64 - snapshot rateLimitSnapshot -} - func aggregateAvailability(inputs []aggregateInput) availabilityStatus { if len(inputs) == 0 { return availabilityStatus{ @@ -139,167 +122,6 @@ func aggregateAvailability(inputs []aggregateInput) availabilityStatus { } } -func aggregateRateLimitWindow(contributions []snapshotContribution, selector func(rateLimitSnapshot) *rateLimitWindow) *rateLimitWindow { - var totalWeight float64 - var totalRemaining float64 - var totalWindowMinutes float64 - var totalResetHours float64 - var resetWeight float64 - now := time.Now() - for _, contribution := range contributions { - window := selector(contribution.snapshot) - if window == nil { - continue - } - totalWeight += contribution.weight - totalRemaining += (100 - window.UsedPercent) * contribution.weight - if window.WindowMinutes > 0 { - totalWindowMinutes += float64(window.WindowMinutes) * contribution.weight - } - if window.ResetAt > 0 { - resetTime := time.Unix(window.ResetAt, 0) - hours := resetTime.Sub(now).Hours() - if hours > 0 { - totalResetHours += hours * contribution.weight - resetWeight += contribution.weight - } - } - } - if totalWeight == 0 { - return nil - } - window := &rateLimitWindow{ - UsedPercent: 100 - totalRemaining/totalWeight, - } - if totalWindowMinutes > 0 { - window.WindowMinutes = int64(totalWindowMinutes / totalWeight) - } - if resetWeight > 0 { - window.ResetAt = now.Add(time.Duration(totalResetHours / resetWeight * float64(time.Hour))).Unix() - } - return window -} - -func aggregateCredits(contributions []snapshotContribution) *creditsSnapshot { - var hasCredits bool - var unlimited bool - var balanceTotal float64 - var hasBalance bool - for _, contribution := range contributions { - if contribution.snapshot.Credits == nil { - continue - } - hasCredits = hasCredits || contribution.snapshot.Credits.HasCredits - unlimited = unlimited || contribution.snapshot.Credits.Unlimited - if balance := strings.TrimSpace(contribution.snapshot.Credits.Balance); balance != "" { - value, err := strconv.ParseFloat(balance, 64) - if err == nil { - balanceTotal += value - hasBalance = true - } - } - } - if !hasCredits && !unlimited && !hasBalance { - return nil - } - credits := &creditsSnapshot{ - HasCredits: hasCredits, - Unlimited: unlimited, - } - if hasBalance && !unlimited { - credits.Balance = strconv.FormatFloat(balanceTotal, 'f', -1, 64) - } - return credits -} - -func aggregateSnapshots(inputs []aggregateInput) []rateLimitSnapshot { - grouped := make(map[string][]snapshotContribution) - for _, input := range inputs { - for _, snapshot := range input.snapshots { - limitID := snapshot.LimitID - if limitID == "" { - limitID = "codex" - } - grouped[limitID] = append(grouped[limitID], snapshotContribution{ - weight: input.weight, - snapshot: snapshot, - }) - } - } - if len(grouped) == 0 { - return nil - } - aggregated := make([]rateLimitSnapshot, 0, len(grouped)) - for limitID, contributions := range grouped { - snapshot := defaultRateLimitSnapshot(limitID) - var bestPlanWeight float64 - for _, contribution := range contributions { - if contribution.snapshot.LimitName != "" && snapshot.LimitName == "" { - snapshot.LimitName = contribution.snapshot.LimitName - } - if contribution.snapshot.PlanType != "" && contribution.weight >= bestPlanWeight { - bestPlanWeight = contribution.weight - snapshot.PlanType = contribution.snapshot.PlanType - } - } - snapshot.Primary = aggregateRateLimitWindow(contributions, func(snapshot rateLimitSnapshot) *rateLimitWindow { - return snapshot.Primary - }) - snapshot.Secondary = aggregateRateLimitWindow(contributions, func(snapshot rateLimitSnapshot) *rateLimitWindow { - return snapshot.Secondary - }) - snapshot.Credits = aggregateCredits(contributions) - if snapshot.Primary == nil && snapshot.Secondary == nil && snapshot.Credits == nil { - continue - } - aggregated = append(aggregated, snapshot) - } - sortRateLimitSnapshots(aggregated) - return aggregated -} - -func selectActiveLimitID(inputs []aggregateInput, snapshots []rateLimitSnapshot) string { - if len(snapshots) == 0 { - return "" - } - weights := make(map[string]float64) - for _, input := range inputs { - if input.activeLimit == "" { - continue - } - weights[normalizeStoredLimitID(input.activeLimit)] += input.weight - } - var ( - bestID string - bestWeight float64 - ) - for limitID, weight := range weights { - if weight > bestWeight { - bestID = limitID - bestWeight = weight - } - } - if bestID != "" { - return bestID - } - for _, snapshot := range snapshots { - if snapshot.LimitID == "codex" { - return "codex" - } - } - return snapshots[0].LimitID -} - -func findSnapshotByLimitID(snapshots []rateLimitSnapshot, limitID string) *rateLimitSnapshot { - for _, snapshot := range snapshots { - if snapshot.LimitID == limitID { - snapshotCopy := snapshot - return &snapshotCopy - } - } - return nil -} - func (s *Service) handleStatusEndpoint(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { writeJSONError(w, r, http.StatusMethodNotAllowed, "invalid_request_error", "method not allowed") @@ -420,7 +242,10 @@ func (s *Service) handleStatusStream(w http.ResponseWriter, r *http.Request, pro func (s *Service) computeAggregatedUtilization(provider credentialProvider, userConfig *option.OCMUser) aggregatedStatus { inputs := make([]aggregateInput, 0, len(provider.allCredentials())) - var totalWeight float64 + var totalWeightedRemaining5h, totalWeightedRemainingWeekly, totalWeight float64 + now := time.Now() + var totalWeightedHoursUntil5hReset, total5hResetWeight float64 + var totalWeightedHoursUntilWeeklyReset, totalWeeklyResetWeight float64 var hasSnapshotData bool for _, credential := range provider.allCredentials() { if userConfig != nil && userConfig.ExternalCredential != "" && credential.tagName() == userConfig.ExternalCredential { @@ -429,61 +254,70 @@ func (s *Service) computeAggregatedUtilization(provider credentialProvider, user if userConfig != nil && !userConfig.AllowExternalUsage && credential.isExternal() { continue } - input := aggregateInput{ - weight: credential.planWeight(), - snapshots: credential.rateLimitSnapshots(), - activeLimit: credential.activeLimitID(), + inputs = append(inputs, aggregateInput{ availability: credential.availabilityStatus(), + }) + if !credential.hasSnapshotData() { + continue } - inputs = append(inputs, input) - if credential.hasSnapshotData() { - hasSnapshotData = true + hasSnapshotData = true + weight := credential.planWeight() + remaining5h := credential.fiveHourCap() - credential.fiveHourUtilization() + if remaining5h < 0 { + remaining5h = 0 + } + remainingWeekly := credential.weeklyCap() - credential.weeklyUtilization() + if remainingWeekly < 0 { + remainingWeekly = 0 + } + totalWeightedRemaining5h += remaining5h * weight + totalWeightedRemainingWeekly += remainingWeekly * weight + totalWeight += weight + + fiveHourReset := credential.fiveHourResetTime() + if !fiveHourReset.IsZero() { + hours := fiveHourReset.Sub(now).Hours() + if hours > 0 { + totalWeightedHoursUntil5hReset += hours * weight + total5hResetWeight += weight + } + } + weeklyReset := credential.weeklyResetTime() + if !weeklyReset.IsZero() { + hours := weeklyReset.Sub(now).Hours() + if hours > 0 { + totalWeightedHoursUntilWeeklyReset += hours * weight + totalWeeklyResetWeight += weight + } } - totalWeight += input.weight } - limits := aggregateSnapshots(inputs) + availability := aggregateAvailability(inputs) + if totalWeight == 0 { + result := aggregatedStatus{availability: availability} + if !hasSnapshotData { + result.fiveHourUtilization = 100 + result.weeklyUtilization = 100 + } + return result + } result := aggregatedStatus{ - totalWeight: totalWeight, - availability: aggregateAvailability(inputs), - limits: limits, - activeLimitID: selectActiveLimitID(inputs, limits), + fiveHourUtilization: 100 - totalWeightedRemaining5h/totalWeight, + weeklyUtilization: 100 - totalWeightedRemainingWeekly/totalWeight, + totalWeight: totalWeight, + availability: availability, } - if legacy := findSnapshotByLimitID(result.limits, "codex"); legacy != nil { - if legacy.Primary != nil { - result.fiveHourUtilization = legacy.Primary.UsedPercent - if legacy.Primary.ResetAt > 0 { - result.fiveHourReset = time.Unix(legacy.Primary.ResetAt, 0) - } - } - if legacy.Secondary != nil { - result.weeklyUtilization = legacy.Secondary.UsedPercent - if legacy.Secondary.ResetAt > 0 { - result.weeklyReset = time.Unix(legacy.Secondary.ResetAt, 0) - } - } - } else if legacy := findSnapshotByLimitID(result.limits, result.activeLimitID); legacy != nil { - if legacy.Primary != nil { - result.fiveHourUtilization = legacy.Primary.UsedPercent - if legacy.Primary.ResetAt > 0 { - result.fiveHourReset = time.Unix(legacy.Primary.ResetAt, 0) - } - } - if legacy.Secondary != nil { - result.weeklyUtilization = legacy.Secondary.UsedPercent - if legacy.Secondary.ResetAt > 0 { - result.weeklyReset = time.Unix(legacy.Secondary.ResetAt, 0) - } - } + if total5hResetWeight > 0 { + avgHours := totalWeightedHoursUntil5hReset / total5hResetWeight + result.fiveHourReset = now.Add(time.Duration(avgHours * float64(time.Hour))) } - if len(result.limits) == 0 && !hasSnapshotData { - result.fiveHourUtilization = 100 - result.weeklyUtilization = 100 + if totalWeeklyResetWeight > 0 { + avgHours := totalWeightedHoursUntilWeeklyReset / totalWeeklyResetWeight + result.weeklyReset = now.Add(time.Duration(avgHours * float64(time.Hour))) } return result } func (s *Service) rewriteResponseHeaders(headers http.Header, provider credentialProvider, userConfig *option.OCMUser) { - status := s.computeAggregatedUtilization(provider, userConfig) for key := range headers { lowerKey := strings.ToLower(key) if lowerKey == "x-codex-active-limit" || @@ -498,51 +332,16 @@ func (s *Service) rewriteResponseHeaders(headers http.Header, provider credentia headers.Del(key) } } - headers.Set("x-codex-active-limit", headerLimitID(status.activeLimitID)) + status := s.computeAggregatedUtilization(provider, userConfig) headers.Set("x-codex-primary-used-percent", strconv.FormatFloat(status.fiveHourUtilization, 'f', 2, 64)) headers.Set("x-codex-secondary-used-percent", strconv.FormatFloat(status.weeklyUtilization, 'f', 2, 64)) if !status.fiveHourReset.IsZero() { headers.Set("x-codex-primary-reset-at", strconv.FormatInt(status.fiveHourReset.Unix(), 10)) - } else { - headers.Del("x-codex-primary-reset-at") } if !status.weeklyReset.IsZero() { headers.Set("x-codex-secondary-reset-at", strconv.FormatInt(status.weeklyReset.Unix(), 10)) - } else { - headers.Del("x-codex-secondary-reset-at") } if status.totalWeight > 0 { headers.Set("X-OCM-Plan-Weight", strconv.FormatFloat(status.totalWeight, 'f', -1, 64)) } - for _, snapshot := range status.limits { - prefix := "x-" + headerLimitID(snapshot.LimitID) - if snapshot.Primary != nil { - headers.Set(prefix+"-primary-used-percent", strconv.FormatFloat(snapshot.Primary.UsedPercent, 'f', 2, 64)) - if snapshot.Primary.WindowMinutes > 0 { - headers.Set(prefix+"-primary-window-minutes", strconv.FormatInt(snapshot.Primary.WindowMinutes, 10)) - } - if snapshot.Primary.ResetAt > 0 { - headers.Set(prefix+"-primary-reset-at", strconv.FormatInt(snapshot.Primary.ResetAt, 10)) - } - } - if snapshot.Secondary != nil { - headers.Set(prefix+"-secondary-used-percent", strconv.FormatFloat(snapshot.Secondary.UsedPercent, 'f', 2, 64)) - if snapshot.Secondary.WindowMinutes > 0 { - headers.Set(prefix+"-secondary-window-minutes", strconv.FormatInt(snapshot.Secondary.WindowMinutes, 10)) - } - if snapshot.Secondary.ResetAt > 0 { - headers.Set(prefix+"-secondary-reset-at", strconv.FormatInt(snapshot.Secondary.ResetAt, 10)) - } - } - if snapshot.LimitName != "" { - headers.Set(prefix+"-limit-name", snapshot.LimitName) - } - if snapshot.LimitID == "codex" && snapshot.Credits != nil { - headers.Set("x-codex-credits-has-credits", strconv.FormatBool(snapshot.Credits.HasCredits)) - headers.Set("x-codex-credits-unlimited", strconv.FormatBool(snapshot.Credits.Unlimited)) - if snapshot.Credits.Balance != "" { - headers.Set("x-codex-credits-balance", snapshot.Credits.Balance) - } - } - } } diff --git a/service/ocm/service_status_test.go b/service/ocm/service_status_test.go index ba7e9324a..c3187d539 100644 --- a/service/ocm/service_status_test.go +++ b/service/ocm/service_status_test.go @@ -26,8 +26,6 @@ type testCredential struct { fiveReset time.Time weeklyReset time.Time availability availabilityStatus - activeLimit string - snapshots []rateLimitSnapshot } func (c *testCredential) tagName() string { return c.tag } @@ -48,10 +46,6 @@ func (c *testCredential) markTemporarilyBlocked(reason availabilityReason, reset c.availability = availabilityStatus{State: availabilityStateTemporarilyBlocked, Reason: reason, ResetAt: resetAt} } func (c *testCredential) availabilityStatus() availabilityStatus { return c.availability } -func (c *testCredential) rateLimitSnapshots() []rateLimitSnapshot { - return slicesCloneSnapshots(c.snapshots) -} -func (c *testCredential) activeLimitID() string { return c.activeLimit } func (c *testCredential) earliestReset() time.Time { return c.fiveReset } func (c *testCredential) unavailableError() error { return nil } func (c *testCredential) getAccessToken() (string, error) { return "", nil } @@ -75,17 +69,6 @@ func (c *testCredential) ocmIsAPIKeyMode() bool func (c *testCredential) ocmGetAccountID() string { return "" } func (c *testCredential) ocmGetBaseURL() string { return "" } -func slicesCloneSnapshots(snapshots []rateLimitSnapshot) []rateLimitSnapshot { - if len(snapshots) == 0 { - return nil - } - cloned := make([]rateLimitSnapshot, 0, len(snapshots)) - for _, snapshot := range snapshots { - cloned = append(cloned, cloneRateLimitSnapshot(snapshot)) - } - return cloned -} - type testProvider struct { credentials []Credential } @@ -104,78 +87,6 @@ func (p *testProvider) pollCredentialIfStale(Credential) {} func (p *testProvider) allCredentials() []Credential { return p.credentials } func (p *testProvider) close() {} -func TestComputeAggregatedUtilizationPreservesStoredSnapshots(t *testing.T) { - t.Parallel() - - service := &Service{} - status := service.computeAggregatedUtilization(&testProvider{credentials: []Credential{ - &testCredential{ - tag: "a", - available: true, - usable: false, - hasData: true, - weight: 1, - activeLimit: "codex", - availability: availabilityStatus{State: availabilityStateRateLimited, Reason: availabilityReasonHardRateLimit, ResetAt: time.Now().Add(time.Minute)}, - snapshots: []rateLimitSnapshot{ - { - LimitID: "codex", - Primary: &rateLimitWindow{UsedPercent: 44, WindowMinutes: 300, ResetAt: time.Now().Add(time.Hour).Unix()}, - Secondary: &rateLimitWindow{UsedPercent: 12, WindowMinutes: 10080, ResetAt: time.Now().Add(24 * time.Hour).Unix()}, - }, - }, - }, - }}, nil) - - if status.fiveHourUtilization != 44 || status.weeklyUtilization != 12 { - t.Fatalf("expected stored snapshot utilization, got 5h=%v weekly=%v", status.fiveHourUtilization, status.weeklyUtilization) - } - if status.availability.State != availabilityStateRateLimited { - t.Fatalf("expected rate-limited availability, got %#v", status.availability) - } -} - -func TestRewriteResponseHeadersIncludesAdditionalLimitFamiliesAndCredits(t *testing.T) { - t.Parallel() - - service := &Service{} - headers := make(http.Header) - service.rewriteResponseHeaders(headers, &testProvider{credentials: []Credential{ - &testCredential{ - tag: "a", - available: true, - usable: true, - hasData: true, - weight: 1, - activeLimit: "codex_other", - availability: availabilityStatus{State: availabilityStateUsable}, - snapshots: []rateLimitSnapshot{ - { - LimitID: "codex", - Primary: &rateLimitWindow{UsedPercent: 20, WindowMinutes: 300, ResetAt: time.Now().Add(time.Hour).Unix()}, - Secondary: &rateLimitWindow{UsedPercent: 40, WindowMinutes: 10080, ResetAt: time.Now().Add(24 * time.Hour).Unix()}, - Credits: &creditsSnapshot{HasCredits: true, Unlimited: false, Balance: "12"}, - }, - { - LimitID: "codex_other", - LimitName: "codex-other", - Primary: &rateLimitWindow{UsedPercent: 60, WindowMinutes: 60, ResetAt: time.Now().Add(30 * time.Minute).Unix()}, - }, - }, - }, - }}, nil) - - if headers.Get("x-codex-active-limit") != "codex-other" { - t.Fatalf("expected active limit header, got %q", headers.Get("x-codex-active-limit")) - } - if headers.Get("x-codex-other-primary-used-percent") == "" { - t.Fatal("expected additional rate-limit family header") - } - if headers.Get("x-codex-credits-balance") != "12" { - t.Fatalf("expected credits balance header, got %q", headers.Get("x-codex-credits-balance")) - } -} - func TestHandleWebSocketErrorEventConnectionLimitDoesNotUseRateLimitPath(t *testing.T) { t.Parallel() @@ -201,7 +112,6 @@ func TestWriteCredentialUnavailableErrorReturns429ForRateLimitedCredentials(t *t hasData: true, weight: 1, availability: availabilityStatus{State: availabilityStateRateLimited, Reason: availabilityReasonHardRateLimit, ResetAt: time.Now().Add(time.Minute)}, - snapshots: []rateLimitSnapshot{{LimitID: "codex", Primary: &rateLimitWindow{UsedPercent: 80}}}, }, }} diff --git a/service/ocm/service_websocket.go b/service/ocm/service_websocket.go index 4c552cbf5..6691a49af 100644 --- a/service/ocm/service_websocket.go +++ b/service/ocm/service_websocket.go @@ -333,7 +333,7 @@ func (s *Service) handleWebSocket( var firstRealRequestOnce sync.Once var waitGroup sync.WaitGroup - waitGroup.Add(3) + waitGroup.Add(2) go func() { defer waitGroup.Done() defer session.Close() @@ -344,11 +344,6 @@ func (s *Service) handleWebSocket( defer session.Close() s.proxyWebSocketUpstreamToClient(ctx, upstreamReadWriter, clientConn, &clientWriteAccess, selectedCredential, modelChannel, username, weeklyCycleHint) }() - go func() { - defer waitGroup.Done() - defer session.Close() - s.pushWebSocketAggregatedStatus(ctx, clientConn, &clientWriteAccess, session.closed, firstRealRequest, provider, userConfig) - }() waitGroup.Wait() } @@ -552,171 +547,6 @@ func (s *Service) handleWebSocketErrorEvent(data []byte, selectedCredential Cred selectedCredential.markRateLimited(resetAt) } -func writeWebSocketAggregatedStatus(clientConn net.Conn, clientWriteAccess *sync.Mutex, status aggregatedStatus) error { - clientWriteAccess.Lock() - defer clientWriteAccess.Unlock() - for _, data := range buildSyntheticRateLimitsEvents(status) { - if err := wsutil.WriteServerMessage(clientConn, ws.OpText, data); err != nil { - return err - } - } - return nil -} - -func (s *Service) pushWebSocketAggregatedStatus(ctx context.Context, clientConn net.Conn, clientWriteAccess *sync.Mutex, sessionClosed <-chan struct{}, firstRealRequest <-chan struct{}, provider credentialProvider, userConfig *option.OCMUser) { - subscription, done, err := s.statusObserver.Subscribe() - if err != nil { - return - } - defer s.statusObserver.UnSubscribe(subscription) - - var last aggregatedStatus - hasLast := false - - for { - select { - case <-ctx.Done(): - return - case <-done: - return - case <-sessionClosed: - return - case <-firstRealRequest: - current := s.computeAggregatedUtilization(provider, userConfig) - err = writeWebSocketAggregatedStatus(clientConn, clientWriteAccess, current) - if err != nil { - return - } - last = current - hasLast = true - firstRealRequest = nil - case <-subscription: - for { - select { - case <-subscription: - default: - goto drained - } - } - drained: - if !hasLast { - continue - } - current := s.computeAggregatedUtilization(provider, userConfig) - if current.equal(last) { - continue - } - last = current - err = writeWebSocketAggregatedStatus(clientConn, clientWriteAccess, current) - if err != nil { - return - } - } - } -} - -func buildSyntheticRateLimitsEvents(status aggregatedStatus) [][]byte { - type rateLimitWindow struct { - UsedPercent float64 `json:"used_percent"` - WindowMinutes int64 `json:"window_minutes,omitempty"` - ResetAt int64 `json:"reset_at,omitempty"` - } - type creditsEvent struct { - HasCredits bool `json:"has_credits"` - Unlimited bool `json:"unlimited"` - Balance string `json:"balance,omitempty"` - } - type eventPayload struct { - Type string `json:"type"` - RateLimits struct { - Primary *rateLimitWindow `json:"primary,omitempty"` - Secondary *rateLimitWindow `json:"secondary,omitempty"` - } `json:"rate_limits"` - MeteredLimitName string `json:"metered_limit_name,omitempty"` - LimitName string `json:"limit_name,omitempty"` - Credits *creditsEvent `json:"credits,omitempty"` - PlanWeight float64 `json:"plan_weight,omitempty"` - } - buildEvent := func(snapshot rateLimitSnapshot, primary *rateLimitWindow, secondary *rateLimitWindow) []byte { - event := eventPayload{ - Type: "codex.rate_limits", - MeteredLimitName: snapshot.LimitID, - LimitName: snapshot.LimitName, - PlanWeight: status.totalWeight, - } - if event.MeteredLimitName == "" { - event.MeteredLimitName = "codex" - } - if event.LimitName == "" { - event.LimitName = strings.ReplaceAll(event.MeteredLimitName, "_", "-") - } - event.RateLimits.Primary = primary - event.RateLimits.Secondary = secondary - if snapshot.Credits != nil { - event.Credits = &creditsEvent{ - HasCredits: snapshot.Credits.HasCredits, - Unlimited: snapshot.Credits.Unlimited, - Balance: snapshot.Credits.Balance, - } - } - data, _ := json.Marshal(event) - return data - } - defaultPrimary := &rateLimitWindow{ - UsedPercent: status.fiveHourUtilization, - ResetAt: resetToEpoch(status.fiveHourReset), - } - defaultSecondary := &rateLimitWindow{ - UsedPercent: status.weeklyUtilization, - ResetAt: resetToEpoch(status.weeklyReset), - } - events := make([][]byte, 0, 1+len(status.limits)) - if snapshot := findSnapshotByLimitID(status.limits, "codex"); snapshot != nil { - primary := defaultPrimary - if snapshot.Primary != nil { - primary = &rateLimitWindow{ - UsedPercent: snapshot.Primary.UsedPercent, - WindowMinutes: snapshot.Primary.WindowMinutes, - ResetAt: snapshot.Primary.ResetAt, - } - } - secondary := defaultSecondary - if snapshot.Secondary != nil { - secondary = &rateLimitWindow{ - UsedPercent: snapshot.Secondary.UsedPercent, - WindowMinutes: snapshot.Secondary.WindowMinutes, - ResetAt: snapshot.Secondary.ResetAt, - } - } - events = append(events, buildEvent(*snapshot, primary, secondary)) - } else { - events = append(events, buildEvent(rateLimitSnapshot{LimitID: "codex", LimitName: "codex"}, defaultPrimary, defaultSecondary)) - } - for _, snapshot := range status.limits { - if snapshot.LimitID == "codex" { - continue - } - var primary *rateLimitWindow - if snapshot.Primary != nil { - primary = &rateLimitWindow{ - UsedPercent: snapshot.Primary.UsedPercent, - WindowMinutes: snapshot.Primary.WindowMinutes, - ResetAt: snapshot.Primary.ResetAt, - } - } - var secondary *rateLimitWindow - if snapshot.Secondary != nil { - secondary = &rateLimitWindow{ - UsedPercent: snapshot.Secondary.UsedPercent, - WindowMinutes: snapshot.Secondary.WindowMinutes, - ResetAt: snapshot.Secondary.ResetAt, - } - } - events = append(events, buildEvent(snapshot, primary, secondary)) - } - return events -} - func (s *Service) handleWebSocketResponseCompleted(data []byte, usageTracker *AggregatedUsage, requestModel string, username string, weeklyCycleHint *WeeklyCycleHint) { var streamEvent responses.ResponseStreamEventUnion if json.Unmarshal(data, &streamEvent) != nil {