diff --git a/option/ccm.go b/option/ccm.go index ae80cc64b..dd55a4ba4 100644 --- a/option/ccm.go +++ b/option/ccm.go @@ -86,6 +86,8 @@ type CCMDefaultCredentialOptions struct { Detour string `json:"detour,omitempty"` Reserve5h uint8 `json:"reserve_5h"` ReserveWeekly uint8 `json:"reserve_weekly"` + Limit5h uint8 `json:"limit_5h,omitempty"` + LimitWeekly uint8 `json:"limit_weekly,omitempty"` } type CCMBalancerCredentialOptions struct { diff --git a/option/ocm.go b/option/ocm.go index 20cafee12..e508abae7 100644 --- a/option/ocm.go +++ b/option/ocm.go @@ -86,6 +86,8 @@ type OCMDefaultCredentialOptions struct { Detour string `json:"detour,omitempty"` Reserve5h uint8 `json:"reserve_5h"` ReserveWeekly uint8 `json:"reserve_weekly"` + Limit5h uint8 `json:"limit_5h,omitempty"` + LimitWeekly uint8 `json:"limit_weekly,omitempty"` } type OCMBalancerCredentialOptions struct { diff --git a/service/ccm/credential_external.go b/service/ccm/credential_external.go index 8a550719b..74ce6617e 100644 --- a/service/ccm/credential_external.go +++ b/service/ccm/credential_external.go @@ -271,6 +271,14 @@ func (c *externalCredential) weeklyUtilization() float64 { return c.state.weeklyUtilization } +func (c *externalCredential) fiveHourCap() float64 { + return 100 +} + +func (c *externalCredential) weeklyCap() float64 { + return 100 +} + func (c *externalCredential) markRateLimited(resetAt time.Time) { c.logger.Warn("rate limited for ", c.tag, ", reset in ", log.FormatDuration(time.Until(resetAt))) c.stateMutex.Lock() @@ -397,7 +405,11 @@ func (c *externalCredential) updateStateFromHeaders(headers http.Header) { c.state.lastUpdated = time.Now() } if isFirstUpdate || int(c.state.fiveHourUtilization*100) != int(oldFiveHour*100) || int(c.state.weeklyUtilization*100) != int(oldWeekly*100) { - c.logger.Debug("usage update for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%") + resetSuffix := "" + if !c.state.weeklyReset.IsZero() { + resetSuffix = ", resets=" + log.FormatDuration(time.Until(c.state.weeklyReset)) + } + c.logger.Debug("usage update for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%", resetSuffix) } shouldInterrupt := c.checkTransitionLocked() c.stateMutex.Unlock() @@ -507,7 +519,11 @@ func (c *externalCredential) pollUsage(ctx context.Context) { c.state.hardRateLimited = false } if isFirstUpdate || int(c.state.fiveHourUtilization*100) != int(oldFiveHour*100) || int(c.state.weeklyUtilization*100) != int(oldWeekly*100) { - c.logger.Debug("poll usage for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%") + resetSuffix := "" + if !c.state.weeklyReset.IsZero() { + resetSuffix = ", resets=" + log.FormatDuration(time.Until(c.state.weeklyReset)) + } + c.logger.Debug("poll usage for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%", resetSuffix) } shouldInterrupt := c.checkTransitionLocked() c.stateMutex.Unlock() diff --git a/service/ccm/credential_state.go b/service/ccm/credential_state.go index 35fb52dce..d681c222b 100644 --- a/service/ccm/credential_state.go +++ b/service/ccm/credential_state.go @@ -89,6 +89,8 @@ type defaultCredential struct { watcherAccess sync.Mutex reserve5h uint8 reserveWeekly uint8 + cap5h float64 + capWeekly float64 usageTracker *AggregatedUsage httpClient *http.Client logger log.ContextLogger @@ -129,6 +131,8 @@ type credential interface { isExternal() bool fiveHourUtilization() float64 weeklyUtilization() float64 + fiveHourCap() float64 + weeklyCap() float64 markRateLimited(resetAt time.Time) earliestReset() time.Time unavailableError() error @@ -180,6 +184,18 @@ func newDefaultCredential(ctx context.Context, tag string, options option.CCMDef if reserveWeekly == 0 { reserveWeekly = 10 } + var cap5h float64 + if options.Limit5h > 0 { + cap5h = float64(options.Limit5h) + } else { + cap5h = float64(100 - reserve5h) + } + var capWeekly float64 + if options.LimitWeekly > 0 { + capWeekly = float64(options.LimitWeekly) + } else { + capWeekly = float64(100 - reserveWeekly) + } requestContext, cancelRequests := context.WithCancel(context.Background()) credential := &defaultCredential{ tag: tag, @@ -187,6 +203,8 @@ func newDefaultCredential(ctx context.Context, tag string, options option.CCMDef credentialPath: options.CredentialPath, reserve5h: reserve5h, reserveWeekly: reserveWeekly, + cap5h: cap5h, + capWeekly: capWeekly, httpClient: httpClient, logger: logger, requestContext: requestContext, @@ -380,7 +398,11 @@ func (c *defaultCredential) updateStateFromHeaders(headers http.Header) { c.state.lastUpdated = time.Now() } if isFirstUpdate || int(c.state.fiveHourUtilization*100) != int(oldFiveHour*100) || int(c.state.weeklyUtilization*100) != int(oldWeekly*100) { - c.logger.Debug("usage update for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%") + resetSuffix := "" + if !c.state.weeklyReset.IsZero() { + resetSuffix = ", resets=" + log.FormatDuration(time.Until(c.state.weeklyReset)) + } + c.logger.Debug("usage update for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%", resetSuffix) } shouldInterrupt := c.checkTransitionLocked() c.stateMutex.Unlock() @@ -429,10 +451,10 @@ func (c *defaultCredential) isUsable() bool { } func (c *defaultCredential) checkReservesLocked() bool { - if c.state.fiveHourUtilization >= float64(100-c.reserve5h) { + if c.state.fiveHourUtilization >= c.cap5h { return false } - if c.state.weeklyUtilization >= float64(100-c.reserveWeekly) { + if c.state.weeklyUtilization >= c.capWeekly { return false } return true @@ -633,7 +655,11 @@ func (c *defaultCredential) pollUsage(ctx context.Context) { c.state.hardRateLimited = false } if isFirstUpdate || int(c.state.fiveHourUtilization*100) != int(oldFiveHour*100) || int(c.state.weeklyUtilization*100) != int(oldWeekly*100) { - c.logger.Debug("poll usage for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%") + resetSuffix := "" + if !c.state.weeklyReset.IsZero() { + resetSuffix = ", resets=" + log.FormatDuration(time.Until(c.state.weeklyReset)) + } + c.logger.Debug("poll usage for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%", resetSuffix) } shouldInterrupt := c.checkTransitionLocked() c.stateMutex.Unlock() @@ -672,6 +698,14 @@ func (c *defaultCredential) fiveHourUtilization() float64 { return c.state.fiveHourUtilization } +func (c *defaultCredential) fiveHourCap() float64 { + return c.cap5h +} + +func (c *defaultCredential) weeklyCap() float64 { + return c.capWeekly +} + func (c *defaultCredential) usageTrackerOrNil() *AggregatedUsage { return c.usageTracker } @@ -736,10 +770,12 @@ type credentialProvider interface { // singleCredentialProvider wraps a single credential (legacy or single default). type singleCredentialProvider struct { - cred credential + cred credential + sessionAccess sync.RWMutex + sessions map[string]time.Time } -func (p *singleCredentialProvider) selectCredential(_ string, filter func(credential) bool) (credential, bool, error) { +func (p *singleCredentialProvider) selectCredential(sessionID string, filter func(credential) bool) (credential, bool, error) { if filter != nil && !filter(p.cred) { return nil, false, E.New("credential ", p.cred.tagName(), " is filtered out") } @@ -749,7 +785,20 @@ func (p *singleCredentialProvider) selectCredential(_ string, filter func(creden if !p.cred.isUsable() { return nil, false, E.New("credential ", p.cred.tagName(), " is rate-limited") } - return p.cred, false, nil + var isNew bool + if sessionID != "" { + p.sessionAccess.Lock() + if p.sessions == nil { + p.sessions = make(map[string]time.Time) + } + _, exists := p.sessions[sessionID] + if !exists { + p.sessions[sessionID] = time.Now() + isNew = true + } + p.sessionAccess.Unlock() + } + return p.cred, isNew, nil } func (p *singleCredentialProvider) onRateLimited(_ string, cred credential, resetAt time.Time, _ func(credential) bool) credential { @@ -758,6 +807,15 @@ func (p *singleCredentialProvider) onRateLimited(_ string, cred credential, rese } func (p *singleCredentialProvider) pollIfStale(ctx context.Context) { + now := time.Now() + p.sessionAccess.Lock() + for id, createdAt := range p.sessions { + if now.Sub(createdAt) > sessionExpiry { + delete(p.sessions, id) + } + } + p.sessionAccess.Unlock() + if time.Since(p.cred.lastUpdatedTime()) > p.cred.pollBackoff(defaultPollInterval) { p.cred.pollUsage(ctx) } @@ -861,7 +919,7 @@ func (p *balancerProvider) pickCredential(filter func(credential) bool) credenti func (p *balancerProvider) pickLeastUsed(filter func(credential) bool) credential { var best credential - bestUtilization := float64(101) + bestRemaining := float64(-1) for _, cred := range p.credentials { if filter != nil && !filter(cred) { continue @@ -869,9 +927,9 @@ func (p *balancerProvider) pickLeastUsed(filter func(credential) bool) credentia if !cred.isUsable() { continue } - utilization := cred.weeklyUtilization() - if utilization < bestUtilization { - bestUtilization = utilization + remaining := cred.weeklyCap() - cred.weeklyUtilization() + if remaining > bestRemaining { + bestRemaining = remaining best = cred } } @@ -1140,6 +1198,18 @@ func validateCCMOptions(options option.CCMServiceOptions) error { if cred.DefaultOptions.ReserveWeekly > 99 { return E.New("credential ", cred.Tag, ": reserve_weekly must be at most 99") } + if cred.DefaultOptions.Limit5h > 100 { + return E.New("credential ", cred.Tag, ": limit_5h must be at most 100") + } + if cred.DefaultOptions.LimitWeekly > 100 { + return E.New("credential ", cred.Tag, ": limit_weekly must be at most 100") + } + if cred.DefaultOptions.Reserve5h > 0 && cred.DefaultOptions.Limit5h > 0 { + return E.New("credential ", cred.Tag, ": reserve_5h and limit_5h are mutually exclusive") + } + if cred.DefaultOptions.ReserveWeekly > 0 && cred.DefaultOptions.LimitWeekly > 0 { + return E.New("credential ", cred.Tag, ": reserve_weekly and limit_weekly are mutually exclusive") + } } if cred.Type == "external" { if cred.ExternalOptions.Token == "" { diff --git a/service/ccm/service.go b/service/ccm/service.go index 2e7685e71..6f1500fb8 100644 --- a/service/ccm/service.go +++ b/service/ccm/service.go @@ -306,6 +306,15 @@ func isExtendedContextRequest(betaHeader string) bool { return false } +func isFastModeRequest(betaHeader string) bool { + for _, feature := range strings.Split(betaHeader, ",") { + if strings.HasPrefix(strings.TrimSpace(feature), "fast-mode") { + return true + } + } + return false +} + func detectContextWindow(betaHeader string, totalInputTokens int64) int { if totalInputTokens > premiumContextThreshold { if isExtendedContextRequest(betaHeader) { @@ -414,6 +423,14 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } + if isFastModeRequest(anthropicBetaHeader) { + if _, isSingle := provider.(*singleCredentialProvider); !isSingle { + writeJSONError(w, r, http.StatusBadRequest, "invalid_request_error", + "fast mode requests will consume Extra usage, please use a default credential directly") + return + } + } + var credentialFilter func(credential) bool if userConfig != nil && !userConfig.AllowExternalUsage { credentialFilter = func(c credential) bool { return !c.isExternal() } @@ -424,13 +441,23 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { writeNonRetryableCredentialError(w, r, unavailableCredentialMessage(provider, err.Error())) return } + var logParts []any if isNew { - if username != "" { - s.logger.Debug("assigned credential ", selectedCredential.tagName(), " for session ", sessionID, " by user ", username) - } else { - s.logger.Debug("assigned credential ", selectedCredential.tagName(), " for session ", sessionID) - } + logParts = append(logParts, "assigned credential ") + } else { + logParts = append(logParts, "credential ") } + logParts = append(logParts, selectedCredential.tagName()) + if sessionID != "" { + logParts = append(logParts, " for session ", sessionID) + } + if isNew && username != "" { + logParts = append(logParts, " by user ", username) + } + if requestModel != "" { + logParts = append(logParts, ", model=", requestModel) + } + s.logger.Debug(logParts...) if isExtendedContextRequest(anthropicBetaHeader) && selectedCredential.isExternal() { writeJSONError(w, r, http.StatusBadRequest, "invalid_request_error", @@ -771,8 +798,16 @@ func (s *Service) computeAggregatedUtilization(provider credentialProvider, user if !userConfig.AllowExternalUsage && cred.isExternal() { continue } - totalFiveHour += cred.fiveHourUtilization() - totalWeekly += cred.weeklyUtilization() + scaledFiveHour := cred.fiveHourUtilization() / cred.fiveHourCap() * 100 + if scaledFiveHour > 100 { + scaledFiveHour = 100 + } + scaledWeekly := cred.weeklyUtilization() / cred.weeklyCap() * 100 + if scaledWeekly > 100 { + scaledWeekly = 100 + } + totalFiveHour += scaledFiveHour + totalWeekly += scaledWeekly count++ } if count == 0 { diff --git a/service/ocm/credential_external.go b/service/ocm/credential_external.go index edc369edd..2864c684e 100644 --- a/service/ocm/credential_external.go +++ b/service/ocm/credential_external.go @@ -293,6 +293,14 @@ func (c *externalCredential) weeklyUtilization() float64 { return c.state.weeklyUtilization } +func (c *externalCredential) fiveHourCap() float64 { + return 100 +} + +func (c *externalCredential) weeklyCap() float64 { + return 100 +} + func (c *externalCredential) markRateLimited(resetAt time.Time) { c.logger.Warn("rate limited for ", c.tag, ", reset in ", log.FormatDuration(time.Until(resetAt))) c.stateMutex.Lock() @@ -434,7 +442,11 @@ func (c *externalCredential) updateStateFromHeaders(headers http.Header) { c.state.lastUpdated = time.Now() } if isFirstUpdate || int(c.state.fiveHourUtilization*100) != int(oldFiveHour*100) || int(c.state.weeklyUtilization*100) != int(oldWeekly*100) { - c.logger.Debug("usage update for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%") + resetSuffix := "" + if !c.state.weeklyReset.IsZero() { + resetSuffix = ", resets=" + log.FormatDuration(time.Until(c.state.weeklyReset)) + } + c.logger.Debug("usage update for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%", resetSuffix) } shouldInterrupt := c.checkTransitionLocked() c.stateMutex.Unlock() @@ -544,7 +556,11 @@ func (c *externalCredential) pollUsage(ctx context.Context) { c.state.hardRateLimited = false } if isFirstUpdate || int(c.state.fiveHourUtilization*100) != int(oldFiveHour*100) || int(c.state.weeklyUtilization*100) != int(oldWeekly*100) { - c.logger.Debug("poll usage for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%") + resetSuffix := "" + if !c.state.weeklyReset.IsZero() { + resetSuffix = ", resets=" + log.FormatDuration(time.Until(c.state.weeklyReset)) + } + c.logger.Debug("poll usage for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%", resetSuffix) } shouldInterrupt := c.checkTransitionLocked() c.stateMutex.Unlock() diff --git a/service/ocm/credential_state.go b/service/ocm/credential_state.go index 2de63b960..b3564d9b0 100644 --- a/service/ocm/credential_state.go +++ b/service/ocm/credential_state.go @@ -89,6 +89,8 @@ type defaultCredential struct { watcherAccess sync.Mutex reserve5h uint8 reserveWeekly uint8 + cap5h float64 + capWeekly float64 usageTracker *AggregatedUsage dialer N.Dialer httpClient *http.Client @@ -130,6 +132,8 @@ type credential interface { isExternal() bool fiveHourUtilization() float64 weeklyUtilization() float64 + fiveHourCap() float64 + weeklyCap() float64 markRateLimited(resetAt time.Time) earliestReset() time.Time unavailableError() error @@ -188,6 +192,18 @@ func newDefaultCredential(ctx context.Context, tag string, options option.OCMDef if reserveWeekly == 0 { reserveWeekly = 10 } + var cap5h float64 + if options.Limit5h > 0 { + cap5h = float64(options.Limit5h) + } else { + cap5h = float64(100 - reserve5h) + } + var capWeekly float64 + if options.LimitWeekly > 0 { + capWeekly = float64(options.LimitWeekly) + } else { + capWeekly = float64(100 - reserveWeekly) + } requestContext, cancelRequests := context.WithCancel(context.Background()) credential := &defaultCredential{ tag: tag, @@ -195,6 +211,8 @@ func newDefaultCredential(ctx context.Context, tag string, options option.OCMDef credentialPath: options.CredentialPath, reserve5h: reserve5h, reserveWeekly: reserveWeekly, + cap5h: cap5h, + capWeekly: capWeekly, dialer: credentialDialer, httpClient: httpClient, logger: logger, @@ -397,7 +415,11 @@ func (c *defaultCredential) updateStateFromHeaders(headers http.Header) { c.state.lastUpdated = time.Now() } if isFirstUpdate || int(c.state.fiveHourUtilization*100) != int(oldFiveHour*100) || int(c.state.weeklyUtilization*100) != int(oldWeekly*100) { - c.logger.Debug("usage update for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%") + resetSuffix := "" + if !c.state.weeklyReset.IsZero() { + resetSuffix = ", resets=" + log.FormatDuration(time.Until(c.state.weeklyReset)) + } + c.logger.Debug("usage update for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%", resetSuffix) } shouldInterrupt := c.checkTransitionLocked() c.stateMutex.Unlock() @@ -446,10 +468,10 @@ func (c *defaultCredential) isUsable() bool { } func (c *defaultCredential) checkReservesLocked() bool { - if c.state.fiveHourUtilization >= float64(100-c.reserve5h) { + if c.state.fiveHourUtilization >= c.cap5h { return false } - if c.state.weeklyUtilization >= float64(100-c.reserveWeekly) { + if c.state.weeklyUtilization >= c.capWeekly { return false } return true @@ -671,7 +693,11 @@ func (c *defaultCredential) pollUsage(ctx context.Context) { c.state.hardRateLimited = false } if isFirstUpdate || int(c.state.fiveHourUtilization*100) != int(oldFiveHour*100) || int(c.state.weeklyUtilization*100) != int(oldWeekly*100) { - c.logger.Debug("poll usage for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%") + resetSuffix := "" + if !c.state.weeklyReset.IsZero() { + resetSuffix = ", resets=" + log.FormatDuration(time.Until(c.state.weeklyReset)) + } + c.logger.Debug("poll usage for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%", resetSuffix) } shouldInterrupt := c.checkTransitionLocked() c.stateMutex.Unlock() @@ -714,6 +740,14 @@ func (c *defaultCredential) fiveHourUtilization() float64 { return c.state.fiveHourUtilization } +func (c *defaultCredential) fiveHourCap() float64 { + return c.cap5h +} + +func (c *defaultCredential) weeklyCap() float64 { + return c.capWeekly +} + func (c *defaultCredential) usageTrackerOrNil() *AggregatedUsage { return c.usageTracker } @@ -796,10 +830,12 @@ type credentialProvider interface { } type singleCredentialProvider struct { - cred credential + cred credential + sessionAccess sync.RWMutex + sessions map[string]time.Time } -func (p *singleCredentialProvider) selectCredential(_ string, filter func(credential) bool) (credential, bool, error) { +func (p *singleCredentialProvider) selectCredential(sessionID string, filter func(credential) bool) (credential, bool, error) { if filter != nil && !filter(p.cred) { return nil, false, E.New("credential ", p.cred.tagName(), " is filtered out") } @@ -809,7 +845,20 @@ func (p *singleCredentialProvider) selectCredential(_ string, filter func(creden if !p.cred.isUsable() { return nil, false, E.New("credential ", p.cred.tagName(), " is rate-limited") } - return p.cred, false, nil + var isNew bool + if sessionID != "" { + p.sessionAccess.Lock() + if p.sessions == nil { + p.sessions = make(map[string]time.Time) + } + _, exists := p.sessions[sessionID] + if !exists { + p.sessions[sessionID] = time.Now() + isNew = true + } + p.sessionAccess.Unlock() + } + return p.cred, isNew, nil } func (p *singleCredentialProvider) onRateLimited(_ string, cred credential, resetAt time.Time, _ func(credential) bool) credential { @@ -818,6 +867,15 @@ func (p *singleCredentialProvider) onRateLimited(_ string, cred credential, rese } func (p *singleCredentialProvider) pollIfStale(ctx context.Context) { + now := time.Now() + p.sessionAccess.Lock() + for id, createdAt := range p.sessions { + if now.Sub(createdAt) > sessionExpiry { + delete(p.sessions, id) + } + } + p.sessionAccess.Unlock() + if time.Since(p.cred.lastUpdatedTime()) > p.cred.pollBackoff(defaultPollInterval) { p.cred.pollUsage(ctx) } @@ -924,7 +982,7 @@ func (p *balancerProvider) pickCredential(filter func(credential) bool) credenti func (p *balancerProvider) pickLeastUsed(filter func(credential) bool) credential { var best credential - bestUtilization := float64(101) + bestRemaining := float64(-1) for _, cred := range p.credentials { if filter != nil && !filter(cred) { continue @@ -935,9 +993,9 @@ func (p *balancerProvider) pickLeastUsed(filter func(credential) bool) credentia if !cred.isUsable() { continue } - utilization := cred.weeklyUtilization() - if utilization < bestUtilization { - bestUtilization = utilization + remaining := cred.weeklyCap() - cred.weeklyUtilization() + if remaining > bestRemaining { + bestRemaining = remaining best = cred } } @@ -1207,6 +1265,18 @@ func validateOCMOptions(options option.OCMServiceOptions) error { if cred.DefaultOptions.ReserveWeekly > 99 { return E.New("credential ", cred.Tag, ": reserve_weekly must be at most 99") } + if cred.DefaultOptions.Limit5h > 100 { + return E.New("credential ", cred.Tag, ": limit_5h must be at most 100") + } + if cred.DefaultOptions.LimitWeekly > 100 { + return E.New("credential ", cred.Tag, ": limit_weekly must be at most 100") + } + if cred.DefaultOptions.Reserve5h > 0 && cred.DefaultOptions.Limit5h > 0 { + return E.New("credential ", cred.Tag, ": reserve_5h and limit_5h are mutually exclusive") + } + if cred.DefaultOptions.ReserveWeekly > 0 && cred.DefaultOptions.LimitWeekly > 0 { + return E.New("credential ", cred.Tag, ": reserve_weekly and limit_weekly are mutually exclusive") + } } if cred.Type == "external" { if cred.ExternalOptions.Token == "" { diff --git a/service/ocm/service.go b/service/ocm/service.go index 3868725ff..751d03f2b 100644 --- a/service/ocm/service.go +++ b/service/ocm/service.go @@ -435,13 +435,6 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { writeNonRetryableCredentialError(w, unavailableCredentialMessage(provider, err.Error())) return } - if isNew { - if username != "" { - s.logger.Debug("assigned credential ", selectedCredential.tagName(), " for session ", sessionID, " by user ", username) - } else { - s.logger.Debug("assigned credential ", selectedCredential.tagName(), " for session ", sessionID) - } - } if strings.EqualFold(r.Header.Get("Upgrade"), "websocket") && strings.HasPrefix(path, "/v1/responses") { s.handleWebSocket(w, r, path, username, sessionID, userConfig, provider, selectedCredential, credentialFilter) @@ -465,6 +458,7 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Read body for model extraction and retry buffer when JSON replay is useful. var bodyBytes []byte var requestModel string + var requestServiceTier string if r.Body != nil && (shouldTrackUsage || canRetryRequest) { mediaType, _, parseErr := mime.ParseMediaType(r.Header.Get("Content-Type")) isJSONRequest := parseErr == nil && (mediaType == "application/json" || strings.HasSuffix(mediaType, "+json")) @@ -476,15 +470,38 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } var request struct { - Model string `json:"model"` + Model string `json:"model"` + ServiceTier string `json:"service_tier"` } if json.Unmarshal(bodyBytes, &request) == nil { requestModel = request.Model + requestServiceTier = request.ServiceTier } r.Body = io.NopCloser(bytes.NewReader(bodyBytes)) } } + var logParts []any + if isNew { + logParts = append(logParts, "assigned credential ") + } else { + logParts = append(logParts, "credential ") + } + logParts = append(logParts, selectedCredential.tagName()) + if sessionID != "" { + logParts = append(logParts, " for session ", sessionID) + } + if isNew && username != "" { + logParts = append(logParts, " by user ", username) + } + if requestModel != "" { + logParts = append(logParts, ", model=", requestModel) + } + if requestServiceTier == "priority" { + logParts = append(logParts, ", fast") + } + s.logger.Debug(logParts...) + requestContext := selectedCredential.wrapRequestContext(r.Context()) defer func() { requestContext.cancelRequest() @@ -841,8 +858,16 @@ func (s *Service) computeAggregatedUtilization(provider credentialProvider, user if !userConfig.AllowExternalUsage && cred.isExternal() { continue } - totalFiveHour += cred.fiveHourUtilization() - totalWeekly += cred.weeklyUtilization() + scaledFiveHour := cred.fiveHourUtilization() / cred.fiveHourCap() * 100 + if scaledFiveHour > 100 { + scaledFiveHour = 100 + } + scaledWeekly := cred.weeklyUtilization() / cred.weeklyCap() * 100 + if scaledWeekly > 100 { + scaledWeekly = 100 + } + totalFiveHour += scaledFiveHour + totalWeekly += scaledWeekly count++ } if count == 0 {