fix(ccm,ocm): remove upstream rate limit header forwarding, compute locally

Strip all upstream rate limit headers and compute unified-status,
representative-claim, reset times, and surpassed-threshold from
aggregated utilization data. Never expose per-account overage or
fallback information. Remove per-credential unified state storage,
snapshot aggregation, and WebSocket synthetic rate limit events.
This commit is contained in:
世界
2026-03-26 23:42:14 +08:00
parent cd5007ffbb
commit d9c298af1e
13 changed files with 212 additions and 960 deletions

View File

@@ -63,13 +63,6 @@ type credentialState struct {
availabilityReason availabilityReason
availabilityResetAt time.Time
lastKnownDataAt time.Time
unifiedStatus unifiedRateLimitStatus
unifiedResetAt time.Time
representativeClaim string
unifiedFallbackAvailable bool
overageStatus string
overageResetAt time.Time
overageDisabledReason string
accountUUID string
accountType string
rateLimitTier string
@@ -125,7 +118,6 @@ type Credential interface {
markRateLimited(resetAt time.Time)
markUpstreamRejected()
availabilityStatus() availabilityStatus
unifiedRateLimitState() unifiedRateLimitInfo
earliestReset() time.Time
unavailableError() error
@@ -252,17 +244,6 @@ func (s credentialState) currentAvailability() availabilityStatus {
}
}
func (s credentialState) currentUnifiedRateLimit() unifiedRateLimitInfo {
return unifiedRateLimitInfo{
Status: s.unifiedStatus,
ResetAt: s.unifiedResetAt,
RepresentativeClaim: s.representativeClaim,
FallbackAvailable: s.unifiedFallbackAvailable,
OverageStatus: s.overageStatus,
OverageResetAt: s.overageResetAt,
OverageDisabledReason: s.overageDisabledReason,
}.normalized()
}
func parseRateLimitResetFromHeaders(headers http.Header) time.Time {
claim := headers.Get("anthropic-ratelimit-unified-representative-claim")

View File

@@ -629,19 +629,6 @@ func (c *defaultCredential) updateStateFromHeaders(headers http.Header) {
c.state.lastUpdated = time.Now()
c.state.noteSnapshotData()
}
if unifiedStatus := unifiedRateLimitStatus(headers.Get("anthropic-ratelimit-unified-status")); unifiedStatus != "" {
c.state.unifiedStatus = unifiedStatus
}
if value, exists := parseOptionalAnthropicResetHeader(headers, "anthropic-ratelimit-unified-reset"); exists {
c.state.unifiedResetAt = value
}
c.state.representativeClaim = headers.Get("anthropic-ratelimit-unified-representative-claim")
c.state.unifiedFallbackAvailable = headers.Get("anthropic-ratelimit-unified-fallback") == "available"
c.state.overageStatus = headers.Get("anthropic-ratelimit-unified-overage-status")
if value, exists := parseOptionalAnthropicResetHeader(headers, "anthropic-ratelimit-unified-overage-reset"); exists {
c.state.overageResetAt = value
}
c.state.overageDisabledReason = headers.Get("anthropic-ratelimit-unified-overage-disabled-reason")
if isFirstUpdate || int(c.state.fiveHourUtilization*100) != int(oldFiveHour*100) || int(c.state.weeklyUtilization*100) != int(oldWeekly*100) {
resetSuffix := ""
if !c.state.weeklyReset.IsZero() {
@@ -666,8 +653,6 @@ func (c *defaultCredential) markRateLimited(resetAt time.Time) {
c.state.hardRateLimited = true
c.state.rateLimitResetAt = resetAt
c.state.setAvailability(availabilityStateRateLimited, availabilityReasonHardRateLimit, resetAt)
c.state.unifiedStatus = unifiedRateLimitStatusRejected
c.state.unifiedResetAt = resetAt
shouldInterrupt := c.checkTransitionLocked()
c.stateAccess.Unlock()
if shouldInterrupt {
@@ -800,12 +785,6 @@ func (c *defaultCredential) availabilityStatus() availabilityStatus {
return c.state.currentAvailability()
}
func (c *defaultCredential) unifiedRateLimitState() unifiedRateLimitInfo {
c.stateAccess.RLock()
defer c.stateAccess.RUnlock()
return c.state.currentUnifiedRateLimit()
}
func (c *defaultCredential) unavailableError() error {
c.stateAccess.RLock()
defer c.stateAccess.RUnlock()

View File

@@ -337,8 +337,6 @@ func (c *externalCredential) markRateLimited(resetAt time.Time) {
c.state.hardRateLimited = true
c.state.rateLimitResetAt = resetAt
c.state.setAvailability(availabilityStateRateLimited, availabilityReasonHardRateLimit, resetAt)
c.state.unifiedStatus = unifiedRateLimitStatusRejected
c.state.unifiedResetAt = resetAt
shouldInterrupt := c.checkTransitionLocked()
c.stateAccess.Unlock()
if shouldInterrupt {
@@ -492,19 +490,6 @@ func (c *externalCredential) updateStateFromHeaders(headers http.Header) {
c.state.lastUpdated = time.Now()
c.state.noteSnapshotData()
}
if unifiedStatus := unifiedRateLimitStatus(headers.Get("anthropic-ratelimit-unified-status")); unifiedStatus != "" {
c.state.unifiedStatus = unifiedStatus
}
if value, exists := parseOptionalAnthropicResetHeader(headers, "anthropic-ratelimit-unified-reset"); exists {
c.state.unifiedResetAt = value
}
c.state.representativeClaim = headers.Get("anthropic-ratelimit-unified-representative-claim")
c.state.unifiedFallbackAvailable = headers.Get("anthropic-ratelimit-unified-fallback") == "available"
c.state.overageStatus = headers.Get("anthropic-ratelimit-unified-overage-status")
if value, exists := parseOptionalAnthropicResetHeader(headers, "anthropic-ratelimit-unified-overage-reset"); exists {
c.state.overageResetAt = value
}
c.state.overageDisabledReason = headers.Get("anthropic-ratelimit-unified-overage-disabled-reason")
if isFirstUpdate || int(c.state.fiveHourUtilization*100) != int(oldFiveHour*100) || int(c.state.weeklyUtilization*100) != int(oldWeekly*100) {
resetSuffix := ""
if !c.state.weeklyReset.IsZero() {
@@ -662,11 +647,6 @@ func (c *externalCredential) pollUsage() {
c.state.upstreamRejectedUntil = time.Time{}
c.state.fiveHourUtilization = statusResponse.FiveHourUtilization
c.state.weeklyUtilization = statusResponse.WeeklyUtilization
c.state.unifiedStatus = unifiedRateLimitStatus(statusResponse.UnifiedStatus)
c.state.representativeClaim = statusResponse.RepresentativeClaim
c.state.unifiedFallbackAvailable = statusResponse.FallbackAvailable
c.state.overageStatus = statusResponse.OverageStatus
c.state.overageDisabledReason = statusResponse.OverageDisabledReason
if statusResponse.PlanWeight > 0 {
c.state.remotePlanWeight = statusResponse.PlanWeight
}
@@ -676,30 +656,6 @@ func (c *externalCredential) pollUsage() {
if statusResponse.WeeklyReset > 0 {
c.state.weeklyReset = time.Unix(statusResponse.WeeklyReset, 0)
}
if statusResponse.UnifiedReset > 0 {
c.state.unifiedResetAt = time.Unix(statusResponse.UnifiedReset, 0)
}
if statusResponse.OverageReset > 0 {
c.state.overageResetAt = time.Unix(statusResponse.OverageReset, 0)
}
if statusResponse.Availability != nil {
switch availabilityState(statusResponse.Availability.State) {
case availabilityStateRateLimited:
c.state.hardRateLimited = true
if statusResponse.Availability.ResetAt > 0 {
c.state.rateLimitResetAt = time.Unix(statusResponse.Availability.ResetAt, 0)
}
case availabilityStateTemporarilyBlocked:
resetAt := time.Time{}
if statusResponse.Availability.ResetAt > 0 {
resetAt = time.Unix(statusResponse.Availability.ResetAt, 0)
}
c.state.setAvailability(availabilityStateTemporarilyBlocked, availabilityReason(statusResponse.Availability.Reason), resetAt)
if availabilityReason(statusResponse.Availability.Reason) == availabilityReasonUpstreamRejected && !resetAt.IsZero() {
c.state.upstreamRejectedUntil = resetAt
}
}
}
if c.state.hardRateLimited && time.Now().After(c.state.rateLimitResetAt) {
c.state.hardRateLimited = false
}
@@ -800,11 +756,6 @@ func (c *externalCredential) connectStatusStream(ctx context.Context) (statusStr
c.state.upstreamRejectedUntil = time.Time{}
c.state.fiveHourUtilization = statusResponse.FiveHourUtilization
c.state.weeklyUtilization = statusResponse.WeeklyUtilization
c.state.unifiedStatus = unifiedRateLimitStatus(statusResponse.UnifiedStatus)
c.state.representativeClaim = statusResponse.RepresentativeClaim
c.state.unifiedFallbackAvailable = statusResponse.FallbackAvailable
c.state.overageStatus = statusResponse.OverageStatus
c.state.overageDisabledReason = statusResponse.OverageDisabledReason
if statusResponse.PlanWeight > 0 {
c.state.remotePlanWeight = statusResponse.PlanWeight
}
@@ -814,30 +765,6 @@ func (c *externalCredential) connectStatusStream(ctx context.Context) (statusStr
if statusResponse.WeeklyReset > 0 {
c.state.weeklyReset = time.Unix(statusResponse.WeeklyReset, 0)
}
if statusResponse.UnifiedReset > 0 {
c.state.unifiedResetAt = time.Unix(statusResponse.UnifiedReset, 0)
}
if statusResponse.OverageReset > 0 {
c.state.overageResetAt = time.Unix(statusResponse.OverageReset, 0)
}
if statusResponse.Availability != nil {
switch availabilityState(statusResponse.Availability.State) {
case availabilityStateRateLimited:
c.state.hardRateLimited = true
if statusResponse.Availability.ResetAt > 0 {
c.state.rateLimitResetAt = time.Unix(statusResponse.Availability.ResetAt, 0)
}
case availabilityStateTemporarilyBlocked:
resetAt := time.Time{}
if statusResponse.Availability.ResetAt > 0 {
resetAt = time.Unix(statusResponse.Availability.ResetAt, 0)
}
c.state.setAvailability(availabilityStateTemporarilyBlocked, availabilityReason(statusResponse.Availability.Reason), resetAt)
if availabilityReason(statusResponse.Availability.Reason) == availabilityReasonUpstreamRejected && !resetAt.IsZero() {
c.state.upstreamRejectedUntil = resetAt
}
}
}
if c.state.hardRateLimited && time.Now().After(c.state.rateLimitResetAt) {
c.state.hardRateLimited = false
}
@@ -921,11 +848,6 @@ func (c *externalCredential) availabilityStatus() availabilityStatus {
return c.state.currentAvailability()
}
func (c *externalCredential) unifiedRateLimitState() unifiedRateLimitInfo {
c.stateAccess.RLock()
defer c.stateAccess.RUnlock()
return c.state.currentUnifiedRateLimit()
}
func (c *externalCredential) markUsageStreamUpdated() {
c.stateAccess.Lock()

View File

@@ -29,12 +29,6 @@ type availabilityStatus struct {
ResetAt time.Time
}
type availabilityPayload struct {
State string `json:"state"`
Reason string `json:"reason,omitempty"`
ResetAt int64 `json:"reset_at,omitempty"`
}
func (s availabilityStatus) normalized() availabilityStatus {
if s.State == "" {
s.State = availabilityStateUnknown
@@ -45,48 +39,6 @@ func (s availabilityStatus) normalized() availabilityStatus {
return s
}
func (s availabilityStatus) toPayload() *availabilityPayload {
s = s.normalized()
if s.State == "" {
return nil
}
payload := &availabilityPayload{
State: string(s.State),
}
if s.Reason != "" && s.Reason != availabilityReasonUnknown {
payload.Reason = string(s.Reason)
}
if !s.ResetAt.IsZero() {
payload.ResetAt = s.ResetAt.Unix()
}
return payload
}
type unifiedRateLimitStatus string
const (
unifiedRateLimitStatusAllowed unifiedRateLimitStatus = "allowed"
unifiedRateLimitStatusAllowedWarning unifiedRateLimitStatus = "allowed_warning"
unifiedRateLimitStatusRejected unifiedRateLimitStatus = "rejected"
)
type unifiedRateLimitInfo struct {
Status unifiedRateLimitStatus
ResetAt time.Time
RepresentativeClaim string
FallbackAvailable bool
OverageStatus string
OverageResetAt time.Time
OverageDisabledReason string
}
func (s unifiedRateLimitInfo) normalized() unifiedRateLimitInfo {
if s.Status == "" {
s.Status = unifiedRateLimitStatusAllowed
}
return s
}
func claudeWindowProgress(resetAt time.Time, windowSeconds float64, now time.Time) float64 {
if resetAt.IsZero() || windowSeconds <= 0 {
return 0

View File

@@ -13,19 +13,11 @@ import (
)
type statusPayload struct {
FiveHourUtilization float64 `json:"five_hour_utilization"`
FiveHourReset int64 `json:"five_hour_reset"`
WeeklyUtilization float64 `json:"weekly_utilization"`
WeeklyReset int64 `json:"weekly_reset"`
PlanWeight float64 `json:"plan_weight"`
UnifiedStatus string `json:"unified_status,omitempty"`
UnifiedReset int64 `json:"unified_reset,omitempty"`
RepresentativeClaim string `json:"representative_claim,omitempty"`
FallbackAvailable bool `json:"fallback_available,omitempty"`
OverageStatus string `json:"overage_status,omitempty"`
OverageReset int64 `json:"overage_reset,omitempty"`
OverageDisabledReason string `json:"overage_disabled_reason,omitempty"`
Availability *availabilityPayload `json:"availability,omitempty"`
FiveHourUtilization float64 `json:"five_hour_utilization"`
FiveHourReset int64 `json:"five_hour_reset"`
WeeklyUtilization float64 `json:"weekly_utilization"`
WeeklyReset int64 `json:"weekly_reset"`
PlanWeight float64 `json:"plan_weight"`
}
type aggregatedStatus struct {
@@ -34,7 +26,6 @@ type aggregatedStatus struct {
totalWeight float64
fiveHourReset time.Time
weeklyReset time.Time
unifiedRateLimit unifiedRateLimitInfo
availability availabilityStatus
}
@@ -50,27 +41,17 @@ func (s aggregatedStatus) equal(other aggregatedStatus) bool {
}
func (s aggregatedStatus) toPayload() statusPayload {
unified := s.unifiedRateLimit.normalized()
return statusPayload{
FiveHourUtilization: s.fiveHourUtilization,
FiveHourReset: resetToEpoch(s.fiveHourReset),
WeeklyUtilization: s.weeklyUtilization,
WeeklyReset: resetToEpoch(s.weeklyReset),
PlanWeight: s.totalWeight,
UnifiedStatus: string(unified.Status),
UnifiedReset: resetToEpoch(unified.ResetAt),
RepresentativeClaim: unified.RepresentativeClaim,
FallbackAvailable: unified.FallbackAvailable,
OverageStatus: unified.OverageStatus,
OverageReset: resetToEpoch(unified.OverageResetAt),
OverageDisabledReason: unified.OverageDisabledReason,
Availability: s.availability.toPayload(),
FiveHourUtilization: s.fiveHourUtilization,
FiveHourReset: resetToEpoch(s.fiveHourReset),
WeeklyUtilization: s.weeklyUtilization,
WeeklyReset: resetToEpoch(s.weeklyReset),
PlanWeight: s.totalWeight,
}
}
type aggregateInput struct {
availability availabilityStatus
unified unifiedRateLimitInfo
}
func aggregateAvailability(inputs []aggregateInput) availabilityStatus {
@@ -133,7 +114,9 @@ func aggregateAvailability(inputs []aggregateInput) availabilityStatus {
}
}
func chooseRepresentativeClaim(status unifiedRateLimitStatus, fiveHourUtilization float64, fiveHourReset time.Time, weeklyUtilization float64, weeklyReset time.Time, now time.Time) string {
func chooseRepresentativeClaim(fiveHourUtilization float64, fiveHourReset time.Time, weeklyUtilization float64, weeklyReset time.Time, now time.Time) string {
fiveHourWarning := claudeFiveHourWarning(fiveHourUtilization, fiveHourReset, now)
weeklyWarning := claudeWeeklyWarning(weeklyUtilization, weeklyReset, now)
type claimCandidate struct {
name string
priority int
@@ -142,15 +125,15 @@ func chooseRepresentativeClaim(status unifiedRateLimitStatus, fiveHourUtilizatio
candidateFor := func(name string, utilization float64, warning bool) claimCandidate {
priority := 0
switch {
case status == unifiedRateLimitStatusRejected && utilization >= 100:
case utilization >= 100:
priority = 2
case warning:
priority = 1
}
return claimCandidate{name: name, priority: priority, utilization: utilization}
}
five := candidateFor("5h", fiveHourUtilization, claudeFiveHourWarning(fiveHourUtilization, fiveHourReset, now))
weekly := candidateFor("7d", weeklyUtilization, claudeWeeklyWarning(weeklyUtilization, weeklyReset, now))
five := candidateFor("5h", fiveHourUtilization, fiveHourWarning)
weekly := candidateFor("7d", weeklyUtilization, weeklyWarning)
switch {
case five.priority > weekly.priority:
return five.name
@@ -169,53 +152,6 @@ func chooseRepresentativeClaim(status unifiedRateLimitStatus, fiveHourUtilizatio
}
}
func aggregateUnifiedRateLimit(inputs []aggregateInput, fiveHourUtilization float64, fiveHourReset time.Time, weeklyUtilization float64, weeklyReset time.Time, availability availabilityStatus) unifiedRateLimitInfo {
now := time.Now()
info := unifiedRateLimitInfo{}
usableCount := 0
for _, input := range inputs {
if input.availability.State == availabilityStateUsable {
usableCount++
}
if input.unified.OverageStatus != "" && info.OverageStatus == "" {
info.OverageStatus = input.unified.OverageStatus
info.OverageResetAt = input.unified.OverageResetAt
info.OverageDisabledReason = input.unified.OverageDisabledReason
}
if input.unified.Status == unifiedRateLimitStatusRejected {
info.Status = unifiedRateLimitStatusRejected
if !input.unified.ResetAt.IsZero() && (info.ResetAt.IsZero() || input.unified.ResetAt.Before(info.ResetAt)) {
info.ResetAt = input.unified.ResetAt
info.RepresentativeClaim = input.unified.RepresentativeClaim
}
}
}
if info.Status == "" {
switch {
case availability.State == availabilityStateRateLimited || fiveHourUtilization >= 100 || weeklyUtilization >= 100:
info.Status = unifiedRateLimitStatusRejected
info.ResetAt = availability.ResetAt
case claudeFiveHourWarning(fiveHourUtilization, fiveHourReset, now) || claudeWeeklyWarning(weeklyUtilization, weeklyReset, now):
info.Status = unifiedRateLimitStatusAllowedWarning
default:
info.Status = unifiedRateLimitStatusAllowed
}
}
info.FallbackAvailable = usableCount > 0 && len(inputs) > 1
if info.RepresentativeClaim == "" {
info.RepresentativeClaim = chooseRepresentativeClaim(info.Status, fiveHourUtilization, fiveHourReset, weeklyUtilization, weeklyReset, now)
}
if info.ResetAt.IsZero() {
switch info.RepresentativeClaim {
case "7d":
info.ResetAt = weeklyReset
default:
info.ResetAt = fiveHourReset
}
}
return info.normalized()
}
func (s *Service) handleStatusEndpoint(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
writeJSONError(w, r, http.StatusMethodNotAllowed, "invalid_request_error", "method not allowed")
@@ -350,7 +286,6 @@ func (s *Service) computeAggregatedUtilization(provider credentialProvider, user
}
visibleInputs = append(visibleInputs, aggregateInput{
availability: credential.availabilityStatus(),
unified: credential.unifiedRateLimitState(),
})
if !credential.hasSnapshotData() {
continue
@@ -393,7 +328,6 @@ func (s *Service) computeAggregatedUtilization(provider credentialProvider, user
result.fiveHourUtilization = 100
result.weeklyUtilization = 100
}
result.unifiedRateLimit = aggregateUnifiedRateLimit(visibleInputs, result.fiveHourUtilization, result.fiveHourReset, result.weeklyUtilization, result.weeklyReset, availability)
return result
}
result := aggregatedStatus{
@@ -410,66 +344,55 @@ func (s *Service) computeAggregatedUtilization(provider credentialProvider, user
avgHours := totalWeightedHoursUntilWeeklyReset / totalWeeklyResetWeight
result.weeklyReset = now.Add(time.Duration(avgHours * float64(time.Hour)))
}
result.unifiedRateLimit = aggregateUnifiedRateLimit(visibleInputs, result.fiveHourUtilization, result.fiveHourReset, result.weeklyUtilization, result.weeklyReset, availability)
return result
}
func (s *Service) rewriteResponseHeaders(headers http.Header, provider credentialProvider, userConfig *option.CCMUser) {
for key := range headers {
if strings.HasPrefix(strings.ToLower(key), "anthropic-ratelimit-unified-") {
headers.Del(key)
}
}
status := s.computeAggregatedUtilization(provider, userConfig)
now := time.Now()
headers.Set("anthropic-ratelimit-unified-5h-utilization", strconv.FormatFloat(status.fiveHourUtilization/100, 'f', 6, 64))
headers.Set("anthropic-ratelimit-unified-7d-utilization", strconv.FormatFloat(status.weeklyUtilization/100, 'f', 6, 64))
if !status.fiveHourReset.IsZero() {
headers.Set("anthropic-ratelimit-unified-5h-reset", strconv.FormatInt(status.fiveHourReset.Unix(), 10))
} else {
headers.Del("anthropic-ratelimit-unified-5h-reset")
}
if !status.weeklyReset.IsZero() {
headers.Set("anthropic-ratelimit-unified-7d-reset", strconv.FormatInt(status.weeklyReset.Unix(), 10))
} else {
headers.Del("anthropic-ratelimit-unified-7d-reset")
}
if status.totalWeight > 0 {
headers.Set("X-CCM-Plan-Weight", strconv.FormatFloat(status.totalWeight, 'f', -1, 64))
}
headers.Set("anthropic-ratelimit-unified-status", string(status.unifiedRateLimit.normalized().Status))
if !status.unifiedRateLimit.ResetAt.IsZero() {
headers.Set("anthropic-ratelimit-unified-reset", strconv.FormatInt(status.unifiedRateLimit.ResetAt.Unix(), 10))
} else {
headers.Del("anthropic-ratelimit-unified-reset")
fiveHourWarning := claudeFiveHourWarning(status.fiveHourUtilization, status.fiveHourReset, now)
weeklyWarning := claudeWeeklyWarning(status.weeklyUtilization, status.weeklyReset, now)
switch {
case status.fiveHourUtilization >= 100 || status.weeklyUtilization >= 100 ||
status.availability.State == availabilityStateRateLimited:
headers.Set("anthropic-ratelimit-unified-status", "rejected")
case fiveHourWarning || weeklyWarning:
headers.Set("anthropic-ratelimit-unified-status", "allowed_warning")
default:
headers.Set("anthropic-ratelimit-unified-status", "allowed")
}
if status.unifiedRateLimit.RepresentativeClaim != "" {
headers.Set("anthropic-ratelimit-unified-representative-claim", status.unifiedRateLimit.RepresentativeClaim)
} else {
headers.Del("anthropic-ratelimit-unified-representative-claim")
claim := chooseRepresentativeClaim(status.fiveHourUtilization, status.fiveHourReset, status.weeklyUtilization, status.weeklyReset, now)
headers.Set("anthropic-ratelimit-unified-representative-claim", claim)
switch claim {
case "7d":
if !status.weeklyReset.IsZero() {
headers.Set("anthropic-ratelimit-unified-reset", strconv.FormatInt(status.weeklyReset.Unix(), 10))
}
default:
if !status.fiveHourReset.IsZero() {
headers.Set("anthropic-ratelimit-unified-reset", strconv.FormatInt(status.fiveHourReset.Unix(), 10))
}
}
if status.unifiedRateLimit.FallbackAvailable {
headers.Set("anthropic-ratelimit-unified-fallback", "available")
} else {
headers.Del("anthropic-ratelimit-unified-fallback")
}
if status.unifiedRateLimit.OverageStatus != "" {
headers.Set("anthropic-ratelimit-unified-overage-status", status.unifiedRateLimit.OverageStatus)
} else {
headers.Del("anthropic-ratelimit-unified-overage-status")
}
if !status.unifiedRateLimit.OverageResetAt.IsZero() {
headers.Set("anthropic-ratelimit-unified-overage-reset", strconv.FormatInt(status.unifiedRateLimit.OverageResetAt.Unix(), 10))
} else {
headers.Del("anthropic-ratelimit-unified-overage-reset")
}
if status.unifiedRateLimit.OverageDisabledReason != "" {
headers.Set("anthropic-ratelimit-unified-overage-disabled-reason", status.unifiedRateLimit.OverageDisabledReason)
} else {
headers.Del("anthropic-ratelimit-unified-overage-disabled-reason")
}
if claudeFiveHourWarning(status.fiveHourUtilization, status.fiveHourReset, time.Now()) || status.fiveHourUtilization >= 100 {
if fiveHourWarning || status.fiveHourUtilization >= 100 {
headers.Set("anthropic-ratelimit-unified-5h-surpassed-threshold", "true")
} else {
headers.Del("anthropic-ratelimit-unified-5h-surpassed-threshold")
}
if claudeWeeklyWarning(status.weeklyUtilization, status.weeklyReset, time.Now()) || status.weeklyUtilization >= 100 {
if weeklyWarning || status.weeklyUtilization >= 100 {
headers.Set("anthropic-ratelimit-unified-7d-surpassed-threshold", "true")
} else {
headers.Del("anthropic-ratelimit-unified-7d-surpassed-threshold")
}
}

View File

@@ -24,28 +24,26 @@ type testCredential struct {
fiveReset time.Time
weeklyReset time.Time
availability availabilityStatus
unified unifiedRateLimitInfo
}
func (c *testCredential) tagName() string { return c.tag }
func (c *testCredential) isAvailable() bool { return c.available }
func (c *testCredential) isUsable() bool { return c.usable }
func (c *testCredential) isExternal() bool { return c.external }
func (c *testCredential) hasSnapshotData() bool { return c.hasData }
func (c *testCredential) fiveHourUtilization() float64 { return c.fiveHour }
func (c *testCredential) weeklyUtilization() float64 { return c.weekly }
func (c *testCredential) fiveHourCap() float64 { return c.fiveHourCapV }
func (c *testCredential) weeklyCap() float64 { return c.weeklyCapV }
func (c *testCredential) planWeight() float64 { return c.weight }
func (c *testCredential) fiveHourResetTime() time.Time { return c.fiveReset }
func (c *testCredential) weeklyResetTime() time.Time { return c.weeklyReset }
func (c *testCredential) markRateLimited(time.Time) {}
func (c *testCredential) markUpstreamRejected() {}
func (c *testCredential) availabilityStatus() availabilityStatus { return c.availability }
func (c *testCredential) unifiedRateLimitState() unifiedRateLimitInfo { return c.unified }
func (c *testCredential) earliestReset() time.Time { return c.fiveReset }
func (c *testCredential) unavailableError() error { return nil }
func (c *testCredential) getAccessToken() (string, error) { return "", nil }
func (c *testCredential) tagName() string { return c.tag }
func (c *testCredential) isAvailable() bool { return c.available }
func (c *testCredential) isUsable() bool { return c.usable }
func (c *testCredential) isExternal() bool { return c.external }
func (c *testCredential) hasSnapshotData() bool { return c.hasData }
func (c *testCredential) fiveHourUtilization() float64 { return c.fiveHour }
func (c *testCredential) weeklyUtilization() float64 { return c.weekly }
func (c *testCredential) fiveHourCap() float64 { return c.fiveHourCapV }
func (c *testCredential) weeklyCap() float64 { return c.weeklyCapV }
func (c *testCredential) planWeight() float64 { return c.weight }
func (c *testCredential) fiveHourResetTime() time.Time { return c.fiveReset }
func (c *testCredential) weeklyResetTime() time.Time { return c.weeklyReset }
func (c *testCredential) markRateLimited(time.Time) {}
func (c *testCredential) markUpstreamRejected() {}
func (c *testCredential) availabilityStatus() availabilityStatus { return c.availability }
func (c *testCredential) earliestReset() time.Time { return c.fiveReset }
func (c *testCredential) unavailableError() error { return nil }
func (c *testCredential) getAccessToken() (string, error) { return "", nil }
func (c *testCredential) buildProxyRequest(context.Context, *http.Request, []byte, http.Header) (*http.Request, error) {
return nil, nil
}
@@ -98,22 +96,18 @@ func TestComputeAggregatedUtilizationPreservesSnapshotForRateLimitedCredential(t
fiveReset: reset,
weeklyReset: reset.Add(2 * time.Hour),
availability: availabilityStatus{State: availabilityStateRateLimited, Reason: availabilityReasonHardRateLimit, ResetAt: reset},
unified: unifiedRateLimitInfo{Status: unifiedRateLimitStatusRejected, ResetAt: reset, RepresentativeClaim: "5h"},
},
}}, nil)
if status.fiveHourUtilization != 42 || status.weeklyUtilization != 18 {
t.Fatalf("expected preserved utilization, got 5h=%v weekly=%v", status.fiveHourUtilization, status.weeklyUtilization)
}
if status.unifiedRateLimit.Status != unifiedRateLimitStatusRejected {
t.Fatalf("expected rejected unified status, got %q", status.unifiedRateLimit.Status)
}
if status.availability.State != availabilityStateRateLimited {
t.Fatalf("expected rate-limited availability, got %#v", status.availability)
}
}
func TestRewriteResponseHeadersIncludesUnifiedHeaders(t *testing.T) {
func TestRewriteResponseHeadersComputesUnifiedStatus(t *testing.T) {
t.Parallel()
reset := time.Now().Add(80 * time.Minute)
@@ -147,6 +141,73 @@ func TestRewriteResponseHeadersIncludesUnifiedHeaders(t *testing.T) {
}
}
func TestRewriteResponseHeadersStripsUpstreamHeaders(t *testing.T) {
t.Parallel()
service := &Service{}
headers := make(http.Header)
headers.Set("anthropic-ratelimit-unified-overage-status", "rejected")
headers.Set("anthropic-ratelimit-unified-overage-disabled-reason", "org_level_disabled")
headers.Set("anthropic-ratelimit-unified-fallback", "available")
service.rewriteResponseHeaders(headers, &testProvider{credentials: []Credential{
&testCredential{
tag: "a",
available: true,
usable: true,
hasData: true,
fiveHour: 10,
weekly: 5,
fiveHourCapV: 100,
weeklyCapV: 100,
weight: 1,
fiveReset: time.Now().Add(3 * time.Hour),
weeklyReset: time.Now().Add(5 * 24 * time.Hour),
availability: availabilityStatus{State: availabilityStateUsable},
},
}}, nil)
if headers.Get("anthropic-ratelimit-unified-overage-status") != "" {
t.Fatalf("expected overage-status stripped, got %q", headers.Get("anthropic-ratelimit-unified-overage-status"))
}
if headers.Get("anthropic-ratelimit-unified-overage-disabled-reason") != "" {
t.Fatalf("expected overage-disabled-reason stripped, got %q", headers.Get("anthropic-ratelimit-unified-overage-disabled-reason"))
}
if headers.Get("anthropic-ratelimit-unified-fallback") != "" {
t.Fatalf("expected fallback stripped, got %q", headers.Get("anthropic-ratelimit-unified-fallback"))
}
if headers.Get("anthropic-ratelimit-unified-status") != "allowed" {
t.Fatalf("expected allowed status, got %q", headers.Get("anthropic-ratelimit-unified-status"))
}
}
func TestRewriteResponseHeadersRejectedOnHardRateLimit(t *testing.T) {
t.Parallel()
reset := time.Now().Add(10 * time.Minute)
service := &Service{}
headers := make(http.Header)
service.rewriteResponseHeaders(headers, &testProvider{credentials: []Credential{
&testCredential{
tag: "a",
available: true,
usable: false,
hasData: true,
fiveHour: 50,
weekly: 20,
fiveHourCapV: 100,
weeklyCapV: 100,
weight: 1,
fiveReset: reset,
weeklyReset: time.Now().Add(5 * 24 * time.Hour),
availability: availabilityStatus{State: availabilityStateRateLimited, Reason: availabilityReasonHardRateLimit, ResetAt: reset},
},
}}, nil)
if headers.Get("anthropic-ratelimit-unified-status") != "rejected" {
t.Fatalf("expected rejected (hard rate limited), got %q", headers.Get("anthropic-ratelimit-unified-status"))
}
}
func TestWriteCredentialUnavailableErrorReturns429ForRateLimitedCredentials(t *testing.T) {
t.Parallel()

View File

@@ -120,8 +120,6 @@ type Credential interface {
markUpstreamRejected()
markTemporarilyBlocked(reason availabilityReason, resetAt time.Time)
availabilityStatus() availabilityStatus
rateLimitSnapshots() []rateLimitSnapshot
activeLimitID() string
earliestReset() time.Time
unavailableError() error

View File

@@ -544,26 +544,6 @@ func (c *defaultCredential) availabilityStatus() availabilityStatus {
return c.state.currentAvailability()
}
func (c *defaultCredential) rateLimitSnapshots() []rateLimitSnapshot {
c.stateAccess.RLock()
defer c.stateAccess.RUnlock()
if len(c.state.rateLimitSnapshots) == 0 {
return nil
}
snapshots := make([]rateLimitSnapshot, 0, len(c.state.rateLimitSnapshots))
for _, snapshot := range c.state.rateLimitSnapshots {
snapshots = append(snapshots, cloneRateLimitSnapshot(snapshot))
}
sortRateLimitSnapshots(snapshots)
return snapshots
}
func (c *defaultCredential) activeLimitID() string {
c.stateAccess.RLock()
defer c.stateAccess.RUnlock()
return c.state.activeLimitID
}
func (c *defaultCredential) unavailableError() error {
c.stateAccess.RLock()
defer c.stateAccess.RUnlock()

View File

@@ -702,38 +702,16 @@ func (c *externalCredential) pollUsage() {
oldWeekly := c.state.weeklyUtilization
c.state.consecutivePollFailures = 0
c.state.upstreamRejectedUntil = time.Time{}
if len(statusResponse.Limits) > 0 {
applyRateLimitSnapshotsLocked(&c.state, statusResponse.Limits, statusResponse.ActiveLimit, statusResponse.PlanWeight, c.state.accountType)
} else {
c.state.fiveHourUtilization = statusResponse.FiveHourUtilization
c.state.weeklyUtilization = statusResponse.WeeklyUtilization
if statusResponse.FiveHourReset > 0 {
c.state.fiveHourReset = time.Unix(statusResponse.FiveHourReset, 0)
}
if statusResponse.WeeklyReset > 0 {
c.state.weeklyReset = time.Unix(statusResponse.WeeklyReset, 0)
}
if statusResponse.PlanWeight > 0 {
c.state.remotePlanWeight = statusResponse.PlanWeight
}
c.state.fiveHourUtilization = statusResponse.FiveHourUtilization
c.state.weeklyUtilization = statusResponse.WeeklyUtilization
if statusResponse.FiveHourReset > 0 {
c.state.fiveHourReset = time.Unix(statusResponse.FiveHourReset, 0)
}
if statusResponse.Availability != nil {
switch availabilityState(statusResponse.Availability.State) {
case availabilityStateRateLimited:
c.state.hardRateLimited = true
if statusResponse.Availability.ResetAt > 0 {
c.state.rateLimitResetAt = time.Unix(statusResponse.Availability.ResetAt, 0)
}
case availabilityStateTemporarilyBlocked:
resetAt := time.Time{}
if statusResponse.Availability.ResetAt > 0 {
resetAt = time.Unix(statusResponse.Availability.ResetAt, 0)
}
c.state.setAvailability(availabilityStateTemporarilyBlocked, availabilityReason(statusResponse.Availability.Reason), resetAt)
if availabilityReason(statusResponse.Availability.Reason) == availabilityReasonUpstreamRejected && !resetAt.IsZero() {
c.state.upstreamRejectedUntil = resetAt
}
}
if statusResponse.WeeklyReset > 0 {
c.state.weeklyReset = time.Unix(statusResponse.WeeklyReset, 0)
}
if statusResponse.PlanWeight > 0 {
c.state.remotePlanWeight = statusResponse.PlanWeight
}
if c.state.hardRateLimited && time.Now().After(c.state.rateLimitResetAt) {
c.state.hardRateLimited = false
@@ -833,38 +811,16 @@ func (c *externalCredential) connectStatusStream(ctx context.Context) (statusStr
oldWeekly := c.state.weeklyUtilization
c.state.consecutivePollFailures = 0
c.state.upstreamRejectedUntil = time.Time{}
if len(statusResponse.Limits) > 0 {
applyRateLimitSnapshotsLocked(&c.state, statusResponse.Limits, statusResponse.ActiveLimit, statusResponse.PlanWeight, c.state.accountType)
} else {
c.state.fiveHourUtilization = statusResponse.FiveHourUtilization
c.state.weeklyUtilization = statusResponse.WeeklyUtilization
if statusResponse.FiveHourReset > 0 {
c.state.fiveHourReset = time.Unix(statusResponse.FiveHourReset, 0)
}
if statusResponse.WeeklyReset > 0 {
c.state.weeklyReset = time.Unix(statusResponse.WeeklyReset, 0)
}
if statusResponse.PlanWeight > 0 {
c.state.remotePlanWeight = statusResponse.PlanWeight
}
c.state.fiveHourUtilization = statusResponse.FiveHourUtilization
c.state.weeklyUtilization = statusResponse.WeeklyUtilization
if statusResponse.FiveHourReset > 0 {
c.state.fiveHourReset = time.Unix(statusResponse.FiveHourReset, 0)
}
if statusResponse.Availability != nil {
switch availabilityState(statusResponse.Availability.State) {
case availabilityStateRateLimited:
c.state.hardRateLimited = true
if statusResponse.Availability.ResetAt > 0 {
c.state.rateLimitResetAt = time.Unix(statusResponse.Availability.ResetAt, 0)
}
case availabilityStateTemporarilyBlocked:
resetAt := time.Time{}
if statusResponse.Availability.ResetAt > 0 {
resetAt = time.Unix(statusResponse.Availability.ResetAt, 0)
}
c.state.setAvailability(availabilityStateTemporarilyBlocked, availabilityReason(statusResponse.Availability.Reason), resetAt)
if availabilityReason(statusResponse.Availability.Reason) == availabilityReasonUpstreamRejected && !resetAt.IsZero() {
c.state.upstreamRejectedUntil = resetAt
}
}
if statusResponse.WeeklyReset > 0 {
c.state.weeklyReset = time.Unix(statusResponse.WeeklyReset, 0)
}
if statusResponse.PlanWeight > 0 {
c.state.remotePlanWeight = statusResponse.PlanWeight
}
if c.state.hardRateLimited && time.Now().After(c.state.rateLimitResetAt) {
c.state.hardRateLimited = false
@@ -949,25 +905,6 @@ func (c *externalCredential) availabilityStatus() availabilityStatus {
return c.state.currentAvailability()
}
func (c *externalCredential) rateLimitSnapshots() []rateLimitSnapshot {
c.stateAccess.RLock()
defer c.stateAccess.RUnlock()
if len(c.state.rateLimitSnapshots) == 0 {
return nil
}
snapshots := make([]rateLimitSnapshot, 0, len(c.state.rateLimitSnapshots))
for _, snapshot := range c.state.rateLimitSnapshots {
snapshots = append(snapshots, cloneRateLimitSnapshot(snapshot))
}
sortRateLimitSnapshots(snapshots)
return snapshots
}
func (c *externalCredential) activeLimitID() string {
c.stateAccess.RLock()
defer c.stateAccess.RUnlock()
return c.state.activeLimitID
}
func (c *externalCredential) markUsageStreamUpdated() {
c.stateAccess.Lock()

View File

@@ -35,12 +35,6 @@ type availabilityStatus struct {
ResetAt time.Time
}
type availabilityPayload struct {
State string `json:"state"`
Reason string `json:"reason,omitempty"`
ResetAt int64 `json:"reset_at,omitempty"`
}
func (s availabilityStatus) normalized() availabilityStatus {
if s.State == "" {
s.State = availabilityStateUnknown
@@ -51,20 +45,6 @@ func (s availabilityStatus) normalized() availabilityStatus {
return s
}
func (s availabilityStatus) toPayload() *availabilityPayload {
s = s.normalized()
payload := &availabilityPayload{
State: string(s.State),
}
if s.Reason != "" && s.Reason != availabilityReasonUnknown {
payload.Reason = string(s.Reason)
}
if !s.ResetAt.IsZero() {
payload.ResetAt = s.ResetAt.Unix()
}
return payload
}
type creditsSnapshot struct {
HasCredits bool `json:"has_credits"`
Unlimited bool `json:"unlimited"`

View File

@@ -5,7 +5,6 @@ import (
"encoding/json"
"net/http"
"reflect"
"slices"
"strconv"
"strings"
"time"
@@ -14,14 +13,11 @@ import (
)
type statusPayload struct {
FiveHourUtilization float64 `json:"five_hour_utilization"`
FiveHourReset int64 `json:"five_hour_reset"`
WeeklyUtilization float64 `json:"weekly_utilization"`
WeeklyReset int64 `json:"weekly_reset"`
PlanWeight float64 `json:"plan_weight"`
ActiveLimit string `json:"active_limit,omitempty"`
Limits []rateLimitSnapshot `json:"limits,omitempty"`
Availability *availabilityPayload `json:"availability,omitempty"`
FiveHourUtilization float64 `json:"five_hour_utilization"`
FiveHourReset int64 `json:"five_hour_reset"`
WeeklyUtilization float64 `json:"weekly_utilization"`
WeeklyReset int64 `json:"weekly_reset"`
PlanWeight float64 `json:"plan_weight"`
}
type aggregatedStatus struct {
@@ -30,8 +26,6 @@ type aggregatedStatus struct {
totalWeight float64
fiveHourReset time.Time
weeklyReset time.Time
activeLimitID string
limits []rateLimitSnapshot
availability availabilityStatus
}
@@ -53,24 +47,13 @@ func (s aggregatedStatus) toPayload() statusPayload {
WeeklyUtilization: s.weeklyUtilization,
WeeklyReset: resetToEpoch(s.weeklyReset),
PlanWeight: s.totalWeight,
ActiveLimit: s.activeLimitID,
Limits: slices.Clone(s.limits),
Availability: s.availability.toPayload(),
}
}
type aggregateInput struct {
weight float64
snapshots []rateLimitSnapshot
activeLimit string
availability availabilityStatus
}
type snapshotContribution struct {
weight float64
snapshot rateLimitSnapshot
}
func aggregateAvailability(inputs []aggregateInput) availabilityStatus {
if len(inputs) == 0 {
return availabilityStatus{
@@ -139,167 +122,6 @@ func aggregateAvailability(inputs []aggregateInput) availabilityStatus {
}
}
func aggregateRateLimitWindow(contributions []snapshotContribution, selector func(rateLimitSnapshot) *rateLimitWindow) *rateLimitWindow {
var totalWeight float64
var totalRemaining float64
var totalWindowMinutes float64
var totalResetHours float64
var resetWeight float64
now := time.Now()
for _, contribution := range contributions {
window := selector(contribution.snapshot)
if window == nil {
continue
}
totalWeight += contribution.weight
totalRemaining += (100 - window.UsedPercent) * contribution.weight
if window.WindowMinutes > 0 {
totalWindowMinutes += float64(window.WindowMinutes) * contribution.weight
}
if window.ResetAt > 0 {
resetTime := time.Unix(window.ResetAt, 0)
hours := resetTime.Sub(now).Hours()
if hours > 0 {
totalResetHours += hours * contribution.weight
resetWeight += contribution.weight
}
}
}
if totalWeight == 0 {
return nil
}
window := &rateLimitWindow{
UsedPercent: 100 - totalRemaining/totalWeight,
}
if totalWindowMinutes > 0 {
window.WindowMinutes = int64(totalWindowMinutes / totalWeight)
}
if resetWeight > 0 {
window.ResetAt = now.Add(time.Duration(totalResetHours / resetWeight * float64(time.Hour))).Unix()
}
return window
}
func aggregateCredits(contributions []snapshotContribution) *creditsSnapshot {
var hasCredits bool
var unlimited bool
var balanceTotal float64
var hasBalance bool
for _, contribution := range contributions {
if contribution.snapshot.Credits == nil {
continue
}
hasCredits = hasCredits || contribution.snapshot.Credits.HasCredits
unlimited = unlimited || contribution.snapshot.Credits.Unlimited
if balance := strings.TrimSpace(contribution.snapshot.Credits.Balance); balance != "" {
value, err := strconv.ParseFloat(balance, 64)
if err == nil {
balanceTotal += value
hasBalance = true
}
}
}
if !hasCredits && !unlimited && !hasBalance {
return nil
}
credits := &creditsSnapshot{
HasCredits: hasCredits,
Unlimited: unlimited,
}
if hasBalance && !unlimited {
credits.Balance = strconv.FormatFloat(balanceTotal, 'f', -1, 64)
}
return credits
}
func aggregateSnapshots(inputs []aggregateInput) []rateLimitSnapshot {
grouped := make(map[string][]snapshotContribution)
for _, input := range inputs {
for _, snapshot := range input.snapshots {
limitID := snapshot.LimitID
if limitID == "" {
limitID = "codex"
}
grouped[limitID] = append(grouped[limitID], snapshotContribution{
weight: input.weight,
snapshot: snapshot,
})
}
}
if len(grouped) == 0 {
return nil
}
aggregated := make([]rateLimitSnapshot, 0, len(grouped))
for limitID, contributions := range grouped {
snapshot := defaultRateLimitSnapshot(limitID)
var bestPlanWeight float64
for _, contribution := range contributions {
if contribution.snapshot.LimitName != "" && snapshot.LimitName == "" {
snapshot.LimitName = contribution.snapshot.LimitName
}
if contribution.snapshot.PlanType != "" && contribution.weight >= bestPlanWeight {
bestPlanWeight = contribution.weight
snapshot.PlanType = contribution.snapshot.PlanType
}
}
snapshot.Primary = aggregateRateLimitWindow(contributions, func(snapshot rateLimitSnapshot) *rateLimitWindow {
return snapshot.Primary
})
snapshot.Secondary = aggregateRateLimitWindow(contributions, func(snapshot rateLimitSnapshot) *rateLimitWindow {
return snapshot.Secondary
})
snapshot.Credits = aggregateCredits(contributions)
if snapshot.Primary == nil && snapshot.Secondary == nil && snapshot.Credits == nil {
continue
}
aggregated = append(aggregated, snapshot)
}
sortRateLimitSnapshots(aggregated)
return aggregated
}
func selectActiveLimitID(inputs []aggregateInput, snapshots []rateLimitSnapshot) string {
if len(snapshots) == 0 {
return ""
}
weights := make(map[string]float64)
for _, input := range inputs {
if input.activeLimit == "" {
continue
}
weights[normalizeStoredLimitID(input.activeLimit)] += input.weight
}
var (
bestID string
bestWeight float64
)
for limitID, weight := range weights {
if weight > bestWeight {
bestID = limitID
bestWeight = weight
}
}
if bestID != "" {
return bestID
}
for _, snapshot := range snapshots {
if snapshot.LimitID == "codex" {
return "codex"
}
}
return snapshots[0].LimitID
}
func findSnapshotByLimitID(snapshots []rateLimitSnapshot, limitID string) *rateLimitSnapshot {
for _, snapshot := range snapshots {
if snapshot.LimitID == limitID {
snapshotCopy := snapshot
return &snapshotCopy
}
}
return nil
}
func (s *Service) handleStatusEndpoint(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
writeJSONError(w, r, http.StatusMethodNotAllowed, "invalid_request_error", "method not allowed")
@@ -420,7 +242,10 @@ func (s *Service) handleStatusStream(w http.ResponseWriter, r *http.Request, pro
func (s *Service) computeAggregatedUtilization(provider credentialProvider, userConfig *option.OCMUser) aggregatedStatus {
inputs := make([]aggregateInput, 0, len(provider.allCredentials()))
var totalWeight float64
var totalWeightedRemaining5h, totalWeightedRemainingWeekly, totalWeight float64
now := time.Now()
var totalWeightedHoursUntil5hReset, total5hResetWeight float64
var totalWeightedHoursUntilWeeklyReset, totalWeeklyResetWeight float64
var hasSnapshotData bool
for _, credential := range provider.allCredentials() {
if userConfig != nil && userConfig.ExternalCredential != "" && credential.tagName() == userConfig.ExternalCredential {
@@ -429,61 +254,70 @@ func (s *Service) computeAggregatedUtilization(provider credentialProvider, user
if userConfig != nil && !userConfig.AllowExternalUsage && credential.isExternal() {
continue
}
input := aggregateInput{
weight: credential.planWeight(),
snapshots: credential.rateLimitSnapshots(),
activeLimit: credential.activeLimitID(),
inputs = append(inputs, aggregateInput{
availability: credential.availabilityStatus(),
})
if !credential.hasSnapshotData() {
continue
}
inputs = append(inputs, input)
if credential.hasSnapshotData() {
hasSnapshotData = true
hasSnapshotData = true
weight := credential.planWeight()
remaining5h := credential.fiveHourCap() - credential.fiveHourUtilization()
if remaining5h < 0 {
remaining5h = 0
}
remainingWeekly := credential.weeklyCap() - credential.weeklyUtilization()
if remainingWeekly < 0 {
remainingWeekly = 0
}
totalWeightedRemaining5h += remaining5h * weight
totalWeightedRemainingWeekly += remainingWeekly * weight
totalWeight += weight
fiveHourReset := credential.fiveHourResetTime()
if !fiveHourReset.IsZero() {
hours := fiveHourReset.Sub(now).Hours()
if hours > 0 {
totalWeightedHoursUntil5hReset += hours * weight
total5hResetWeight += weight
}
}
weeklyReset := credential.weeklyResetTime()
if !weeklyReset.IsZero() {
hours := weeklyReset.Sub(now).Hours()
if hours > 0 {
totalWeightedHoursUntilWeeklyReset += hours * weight
totalWeeklyResetWeight += weight
}
}
totalWeight += input.weight
}
limits := aggregateSnapshots(inputs)
availability := aggregateAvailability(inputs)
if totalWeight == 0 {
result := aggregatedStatus{availability: availability}
if !hasSnapshotData {
result.fiveHourUtilization = 100
result.weeklyUtilization = 100
}
return result
}
result := aggregatedStatus{
totalWeight: totalWeight,
availability: aggregateAvailability(inputs),
limits: limits,
activeLimitID: selectActiveLimitID(inputs, limits),
fiveHourUtilization: 100 - totalWeightedRemaining5h/totalWeight,
weeklyUtilization: 100 - totalWeightedRemainingWeekly/totalWeight,
totalWeight: totalWeight,
availability: availability,
}
if legacy := findSnapshotByLimitID(result.limits, "codex"); legacy != nil {
if legacy.Primary != nil {
result.fiveHourUtilization = legacy.Primary.UsedPercent
if legacy.Primary.ResetAt > 0 {
result.fiveHourReset = time.Unix(legacy.Primary.ResetAt, 0)
}
}
if legacy.Secondary != nil {
result.weeklyUtilization = legacy.Secondary.UsedPercent
if legacy.Secondary.ResetAt > 0 {
result.weeklyReset = time.Unix(legacy.Secondary.ResetAt, 0)
}
}
} else if legacy := findSnapshotByLimitID(result.limits, result.activeLimitID); legacy != nil {
if legacy.Primary != nil {
result.fiveHourUtilization = legacy.Primary.UsedPercent
if legacy.Primary.ResetAt > 0 {
result.fiveHourReset = time.Unix(legacy.Primary.ResetAt, 0)
}
}
if legacy.Secondary != nil {
result.weeklyUtilization = legacy.Secondary.UsedPercent
if legacy.Secondary.ResetAt > 0 {
result.weeklyReset = time.Unix(legacy.Secondary.ResetAt, 0)
}
}
if total5hResetWeight > 0 {
avgHours := totalWeightedHoursUntil5hReset / total5hResetWeight
result.fiveHourReset = now.Add(time.Duration(avgHours * float64(time.Hour)))
}
if len(result.limits) == 0 && !hasSnapshotData {
result.fiveHourUtilization = 100
result.weeklyUtilization = 100
if totalWeeklyResetWeight > 0 {
avgHours := totalWeightedHoursUntilWeeklyReset / totalWeeklyResetWeight
result.weeklyReset = now.Add(time.Duration(avgHours * float64(time.Hour)))
}
return result
}
func (s *Service) rewriteResponseHeaders(headers http.Header, provider credentialProvider, userConfig *option.OCMUser) {
status := s.computeAggregatedUtilization(provider, userConfig)
for key := range headers {
lowerKey := strings.ToLower(key)
if lowerKey == "x-codex-active-limit" ||
@@ -498,51 +332,16 @@ func (s *Service) rewriteResponseHeaders(headers http.Header, provider credentia
headers.Del(key)
}
}
headers.Set("x-codex-active-limit", headerLimitID(status.activeLimitID))
status := s.computeAggregatedUtilization(provider, userConfig)
headers.Set("x-codex-primary-used-percent", strconv.FormatFloat(status.fiveHourUtilization, 'f', 2, 64))
headers.Set("x-codex-secondary-used-percent", strconv.FormatFloat(status.weeklyUtilization, 'f', 2, 64))
if !status.fiveHourReset.IsZero() {
headers.Set("x-codex-primary-reset-at", strconv.FormatInt(status.fiveHourReset.Unix(), 10))
} else {
headers.Del("x-codex-primary-reset-at")
}
if !status.weeklyReset.IsZero() {
headers.Set("x-codex-secondary-reset-at", strconv.FormatInt(status.weeklyReset.Unix(), 10))
} else {
headers.Del("x-codex-secondary-reset-at")
}
if status.totalWeight > 0 {
headers.Set("X-OCM-Plan-Weight", strconv.FormatFloat(status.totalWeight, 'f', -1, 64))
}
for _, snapshot := range status.limits {
prefix := "x-" + headerLimitID(snapshot.LimitID)
if snapshot.Primary != nil {
headers.Set(prefix+"-primary-used-percent", strconv.FormatFloat(snapshot.Primary.UsedPercent, 'f', 2, 64))
if snapshot.Primary.WindowMinutes > 0 {
headers.Set(prefix+"-primary-window-minutes", strconv.FormatInt(snapshot.Primary.WindowMinutes, 10))
}
if snapshot.Primary.ResetAt > 0 {
headers.Set(prefix+"-primary-reset-at", strconv.FormatInt(snapshot.Primary.ResetAt, 10))
}
}
if snapshot.Secondary != nil {
headers.Set(prefix+"-secondary-used-percent", strconv.FormatFloat(snapshot.Secondary.UsedPercent, 'f', 2, 64))
if snapshot.Secondary.WindowMinutes > 0 {
headers.Set(prefix+"-secondary-window-minutes", strconv.FormatInt(snapshot.Secondary.WindowMinutes, 10))
}
if snapshot.Secondary.ResetAt > 0 {
headers.Set(prefix+"-secondary-reset-at", strconv.FormatInt(snapshot.Secondary.ResetAt, 10))
}
}
if snapshot.LimitName != "" {
headers.Set(prefix+"-limit-name", snapshot.LimitName)
}
if snapshot.LimitID == "codex" && snapshot.Credits != nil {
headers.Set("x-codex-credits-has-credits", strconv.FormatBool(snapshot.Credits.HasCredits))
headers.Set("x-codex-credits-unlimited", strconv.FormatBool(snapshot.Credits.Unlimited))
if snapshot.Credits.Balance != "" {
headers.Set("x-codex-credits-balance", snapshot.Credits.Balance)
}
}
}
}

View File

@@ -26,8 +26,6 @@ type testCredential struct {
fiveReset time.Time
weeklyReset time.Time
availability availabilityStatus
activeLimit string
snapshots []rateLimitSnapshot
}
func (c *testCredential) tagName() string { return c.tag }
@@ -48,10 +46,6 @@ func (c *testCredential) markTemporarilyBlocked(reason availabilityReason, reset
c.availability = availabilityStatus{State: availabilityStateTemporarilyBlocked, Reason: reason, ResetAt: resetAt}
}
func (c *testCredential) availabilityStatus() availabilityStatus { return c.availability }
func (c *testCredential) rateLimitSnapshots() []rateLimitSnapshot {
return slicesCloneSnapshots(c.snapshots)
}
func (c *testCredential) activeLimitID() string { return c.activeLimit }
func (c *testCredential) earliestReset() time.Time { return c.fiveReset }
func (c *testCredential) unavailableError() error { return nil }
func (c *testCredential) getAccessToken() (string, error) { return "", nil }
@@ -75,17 +69,6 @@ func (c *testCredential) ocmIsAPIKeyMode() bool
func (c *testCredential) ocmGetAccountID() string { return "" }
func (c *testCredential) ocmGetBaseURL() string { return "" }
func slicesCloneSnapshots(snapshots []rateLimitSnapshot) []rateLimitSnapshot {
if len(snapshots) == 0 {
return nil
}
cloned := make([]rateLimitSnapshot, 0, len(snapshots))
for _, snapshot := range snapshots {
cloned = append(cloned, cloneRateLimitSnapshot(snapshot))
}
return cloned
}
type testProvider struct {
credentials []Credential
}
@@ -104,78 +87,6 @@ func (p *testProvider) pollCredentialIfStale(Credential) {}
func (p *testProvider) allCredentials() []Credential { return p.credentials }
func (p *testProvider) close() {}
func TestComputeAggregatedUtilizationPreservesStoredSnapshots(t *testing.T) {
t.Parallel()
service := &Service{}
status := service.computeAggregatedUtilization(&testProvider{credentials: []Credential{
&testCredential{
tag: "a",
available: true,
usable: false,
hasData: true,
weight: 1,
activeLimit: "codex",
availability: availabilityStatus{State: availabilityStateRateLimited, Reason: availabilityReasonHardRateLimit, ResetAt: time.Now().Add(time.Minute)},
snapshots: []rateLimitSnapshot{
{
LimitID: "codex",
Primary: &rateLimitWindow{UsedPercent: 44, WindowMinutes: 300, ResetAt: time.Now().Add(time.Hour).Unix()},
Secondary: &rateLimitWindow{UsedPercent: 12, WindowMinutes: 10080, ResetAt: time.Now().Add(24 * time.Hour).Unix()},
},
},
},
}}, nil)
if status.fiveHourUtilization != 44 || status.weeklyUtilization != 12 {
t.Fatalf("expected stored snapshot utilization, got 5h=%v weekly=%v", status.fiveHourUtilization, status.weeklyUtilization)
}
if status.availability.State != availabilityStateRateLimited {
t.Fatalf("expected rate-limited availability, got %#v", status.availability)
}
}
func TestRewriteResponseHeadersIncludesAdditionalLimitFamiliesAndCredits(t *testing.T) {
t.Parallel()
service := &Service{}
headers := make(http.Header)
service.rewriteResponseHeaders(headers, &testProvider{credentials: []Credential{
&testCredential{
tag: "a",
available: true,
usable: true,
hasData: true,
weight: 1,
activeLimit: "codex_other",
availability: availabilityStatus{State: availabilityStateUsable},
snapshots: []rateLimitSnapshot{
{
LimitID: "codex",
Primary: &rateLimitWindow{UsedPercent: 20, WindowMinutes: 300, ResetAt: time.Now().Add(time.Hour).Unix()},
Secondary: &rateLimitWindow{UsedPercent: 40, WindowMinutes: 10080, ResetAt: time.Now().Add(24 * time.Hour).Unix()},
Credits: &creditsSnapshot{HasCredits: true, Unlimited: false, Balance: "12"},
},
{
LimitID: "codex_other",
LimitName: "codex-other",
Primary: &rateLimitWindow{UsedPercent: 60, WindowMinutes: 60, ResetAt: time.Now().Add(30 * time.Minute).Unix()},
},
},
},
}}, nil)
if headers.Get("x-codex-active-limit") != "codex-other" {
t.Fatalf("expected active limit header, got %q", headers.Get("x-codex-active-limit"))
}
if headers.Get("x-codex-other-primary-used-percent") == "" {
t.Fatal("expected additional rate-limit family header")
}
if headers.Get("x-codex-credits-balance") != "12" {
t.Fatalf("expected credits balance header, got %q", headers.Get("x-codex-credits-balance"))
}
}
func TestHandleWebSocketErrorEventConnectionLimitDoesNotUseRateLimitPath(t *testing.T) {
t.Parallel()
@@ -201,7 +112,6 @@ func TestWriteCredentialUnavailableErrorReturns429ForRateLimitedCredentials(t *t
hasData: true,
weight: 1,
availability: availabilityStatus{State: availabilityStateRateLimited, Reason: availabilityReasonHardRateLimit, ResetAt: time.Now().Add(time.Minute)},
snapshots: []rateLimitSnapshot{{LimitID: "codex", Primary: &rateLimitWindow{UsedPercent: 80}}},
},
}}

View File

@@ -333,7 +333,7 @@ func (s *Service) handleWebSocket(
var firstRealRequestOnce sync.Once
var waitGroup sync.WaitGroup
waitGroup.Add(3)
waitGroup.Add(2)
go func() {
defer waitGroup.Done()
defer session.Close()
@@ -344,11 +344,6 @@ func (s *Service) handleWebSocket(
defer session.Close()
s.proxyWebSocketUpstreamToClient(ctx, upstreamReadWriter, clientConn, &clientWriteAccess, selectedCredential, modelChannel, username, weeklyCycleHint)
}()
go func() {
defer waitGroup.Done()
defer session.Close()
s.pushWebSocketAggregatedStatus(ctx, clientConn, &clientWriteAccess, session.closed, firstRealRequest, provider, userConfig)
}()
waitGroup.Wait()
}
@@ -552,171 +547,6 @@ func (s *Service) handleWebSocketErrorEvent(data []byte, selectedCredential Cred
selectedCredential.markRateLimited(resetAt)
}
func writeWebSocketAggregatedStatus(clientConn net.Conn, clientWriteAccess *sync.Mutex, status aggregatedStatus) error {
clientWriteAccess.Lock()
defer clientWriteAccess.Unlock()
for _, data := range buildSyntheticRateLimitsEvents(status) {
if err := wsutil.WriteServerMessage(clientConn, ws.OpText, data); err != nil {
return err
}
}
return nil
}
func (s *Service) pushWebSocketAggregatedStatus(ctx context.Context, clientConn net.Conn, clientWriteAccess *sync.Mutex, sessionClosed <-chan struct{}, firstRealRequest <-chan struct{}, provider credentialProvider, userConfig *option.OCMUser) {
subscription, done, err := s.statusObserver.Subscribe()
if err != nil {
return
}
defer s.statusObserver.UnSubscribe(subscription)
var last aggregatedStatus
hasLast := false
for {
select {
case <-ctx.Done():
return
case <-done:
return
case <-sessionClosed:
return
case <-firstRealRequest:
current := s.computeAggregatedUtilization(provider, userConfig)
err = writeWebSocketAggregatedStatus(clientConn, clientWriteAccess, current)
if err != nil {
return
}
last = current
hasLast = true
firstRealRequest = nil
case <-subscription:
for {
select {
case <-subscription:
default:
goto drained
}
}
drained:
if !hasLast {
continue
}
current := s.computeAggregatedUtilization(provider, userConfig)
if current.equal(last) {
continue
}
last = current
err = writeWebSocketAggregatedStatus(clientConn, clientWriteAccess, current)
if err != nil {
return
}
}
}
}
func buildSyntheticRateLimitsEvents(status aggregatedStatus) [][]byte {
type rateLimitWindow struct {
UsedPercent float64 `json:"used_percent"`
WindowMinutes int64 `json:"window_minutes,omitempty"`
ResetAt int64 `json:"reset_at,omitempty"`
}
type creditsEvent struct {
HasCredits bool `json:"has_credits"`
Unlimited bool `json:"unlimited"`
Balance string `json:"balance,omitempty"`
}
type eventPayload struct {
Type string `json:"type"`
RateLimits struct {
Primary *rateLimitWindow `json:"primary,omitempty"`
Secondary *rateLimitWindow `json:"secondary,omitempty"`
} `json:"rate_limits"`
MeteredLimitName string `json:"metered_limit_name,omitempty"`
LimitName string `json:"limit_name,omitempty"`
Credits *creditsEvent `json:"credits,omitempty"`
PlanWeight float64 `json:"plan_weight,omitempty"`
}
buildEvent := func(snapshot rateLimitSnapshot, primary *rateLimitWindow, secondary *rateLimitWindow) []byte {
event := eventPayload{
Type: "codex.rate_limits",
MeteredLimitName: snapshot.LimitID,
LimitName: snapshot.LimitName,
PlanWeight: status.totalWeight,
}
if event.MeteredLimitName == "" {
event.MeteredLimitName = "codex"
}
if event.LimitName == "" {
event.LimitName = strings.ReplaceAll(event.MeteredLimitName, "_", "-")
}
event.RateLimits.Primary = primary
event.RateLimits.Secondary = secondary
if snapshot.Credits != nil {
event.Credits = &creditsEvent{
HasCredits: snapshot.Credits.HasCredits,
Unlimited: snapshot.Credits.Unlimited,
Balance: snapshot.Credits.Balance,
}
}
data, _ := json.Marshal(event)
return data
}
defaultPrimary := &rateLimitWindow{
UsedPercent: status.fiveHourUtilization,
ResetAt: resetToEpoch(status.fiveHourReset),
}
defaultSecondary := &rateLimitWindow{
UsedPercent: status.weeklyUtilization,
ResetAt: resetToEpoch(status.weeklyReset),
}
events := make([][]byte, 0, 1+len(status.limits))
if snapshot := findSnapshotByLimitID(status.limits, "codex"); snapshot != nil {
primary := defaultPrimary
if snapshot.Primary != nil {
primary = &rateLimitWindow{
UsedPercent: snapshot.Primary.UsedPercent,
WindowMinutes: snapshot.Primary.WindowMinutes,
ResetAt: snapshot.Primary.ResetAt,
}
}
secondary := defaultSecondary
if snapshot.Secondary != nil {
secondary = &rateLimitWindow{
UsedPercent: snapshot.Secondary.UsedPercent,
WindowMinutes: snapshot.Secondary.WindowMinutes,
ResetAt: snapshot.Secondary.ResetAt,
}
}
events = append(events, buildEvent(*snapshot, primary, secondary))
} else {
events = append(events, buildEvent(rateLimitSnapshot{LimitID: "codex", LimitName: "codex"}, defaultPrimary, defaultSecondary))
}
for _, snapshot := range status.limits {
if snapshot.LimitID == "codex" {
continue
}
var primary *rateLimitWindow
if snapshot.Primary != nil {
primary = &rateLimitWindow{
UsedPercent: snapshot.Primary.UsedPercent,
WindowMinutes: snapshot.Primary.WindowMinutes,
ResetAt: snapshot.Primary.ResetAt,
}
}
var secondary *rateLimitWindow
if snapshot.Secondary != nil {
secondary = &rateLimitWindow{
UsedPercent: snapshot.Secondary.UsedPercent,
WindowMinutes: snapshot.Secondary.WindowMinutes,
ResetAt: snapshot.Secondary.ResetAt,
}
}
events = append(events, buildEvent(snapshot, primary, secondary))
}
return events
}
func (s *Service) handleWebSocketResponseCompleted(data []byte, usageTracker *AggregatedUsage, requestModel string, username string, weeklyCycleHint *WeeklyCycleHint) {
var streamEvent responses.ResponseStreamEventUnion
if json.Unmarshal(data, &streamEvent) != nil {