mirror of
https://github.com/SagerNet/sing-box.git
synced 2026-04-13 20:28:32 +10:00
ccm,ocm: add limit options and fix aggregated utilization scaling
Add limit_5h and limit_weekly options as alternatives to reserve_5h and reserve_weekly for capping credential utilization. The two are mutually exclusive per window. Fix computeAggregatedUtilization to scale per-credential utilization relative to each credential's cap before averaging, so external users see correct available capacity regardless of per-credential caps. Fix pickLeastUsed to compare remaining capacity (cap - utilization) instead of raw utilization, ensuring fair comparison across credentials with different caps.
This commit is contained in:
@@ -86,6 +86,8 @@ type CCMDefaultCredentialOptions struct {
|
||||
Detour string `json:"detour,omitempty"`
|
||||
Reserve5h uint8 `json:"reserve_5h"`
|
||||
ReserveWeekly uint8 `json:"reserve_weekly"`
|
||||
Limit5h uint8 `json:"limit_5h,omitempty"`
|
||||
LimitWeekly uint8 `json:"limit_weekly,omitempty"`
|
||||
}
|
||||
|
||||
type CCMBalancerCredentialOptions struct {
|
||||
|
||||
@@ -86,6 +86,8 @@ type OCMDefaultCredentialOptions struct {
|
||||
Detour string `json:"detour,omitempty"`
|
||||
Reserve5h uint8 `json:"reserve_5h"`
|
||||
ReserveWeekly uint8 `json:"reserve_weekly"`
|
||||
Limit5h uint8 `json:"limit_5h,omitempty"`
|
||||
LimitWeekly uint8 `json:"limit_weekly,omitempty"`
|
||||
}
|
||||
|
||||
type OCMBalancerCredentialOptions struct {
|
||||
|
||||
@@ -271,6 +271,14 @@ func (c *externalCredential) weeklyUtilization() float64 {
|
||||
return c.state.weeklyUtilization
|
||||
}
|
||||
|
||||
func (c *externalCredential) fiveHourCap() float64 {
|
||||
return 100
|
||||
}
|
||||
|
||||
func (c *externalCredential) weeklyCap() float64 {
|
||||
return 100
|
||||
}
|
||||
|
||||
func (c *externalCredential) markRateLimited(resetAt time.Time) {
|
||||
c.logger.Warn("rate limited for ", c.tag, ", reset in ", log.FormatDuration(time.Until(resetAt)))
|
||||
c.stateMutex.Lock()
|
||||
@@ -397,7 +405,11 @@ func (c *externalCredential) updateStateFromHeaders(headers http.Header) {
|
||||
c.state.lastUpdated = time.Now()
|
||||
}
|
||||
if isFirstUpdate || int(c.state.fiveHourUtilization*100) != int(oldFiveHour*100) || int(c.state.weeklyUtilization*100) != int(oldWeekly*100) {
|
||||
c.logger.Debug("usage update for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%")
|
||||
resetSuffix := ""
|
||||
if !c.state.weeklyReset.IsZero() {
|
||||
resetSuffix = ", resets=" + log.FormatDuration(time.Until(c.state.weeklyReset))
|
||||
}
|
||||
c.logger.Debug("usage update for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%", resetSuffix)
|
||||
}
|
||||
shouldInterrupt := c.checkTransitionLocked()
|
||||
c.stateMutex.Unlock()
|
||||
@@ -507,7 +519,11 @@ func (c *externalCredential) pollUsage(ctx context.Context) {
|
||||
c.state.hardRateLimited = false
|
||||
}
|
||||
if isFirstUpdate || int(c.state.fiveHourUtilization*100) != int(oldFiveHour*100) || int(c.state.weeklyUtilization*100) != int(oldWeekly*100) {
|
||||
c.logger.Debug("poll usage for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%")
|
||||
resetSuffix := ""
|
||||
if !c.state.weeklyReset.IsZero() {
|
||||
resetSuffix = ", resets=" + log.FormatDuration(time.Until(c.state.weeklyReset))
|
||||
}
|
||||
c.logger.Debug("poll usage for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%", resetSuffix)
|
||||
}
|
||||
shouldInterrupt := c.checkTransitionLocked()
|
||||
c.stateMutex.Unlock()
|
||||
|
||||
@@ -89,6 +89,8 @@ type defaultCredential struct {
|
||||
watcherAccess sync.Mutex
|
||||
reserve5h uint8
|
||||
reserveWeekly uint8
|
||||
cap5h float64
|
||||
capWeekly float64
|
||||
usageTracker *AggregatedUsage
|
||||
httpClient *http.Client
|
||||
logger log.ContextLogger
|
||||
@@ -129,6 +131,8 @@ type credential interface {
|
||||
isExternal() bool
|
||||
fiveHourUtilization() float64
|
||||
weeklyUtilization() float64
|
||||
fiveHourCap() float64
|
||||
weeklyCap() float64
|
||||
markRateLimited(resetAt time.Time)
|
||||
earliestReset() time.Time
|
||||
unavailableError() error
|
||||
@@ -180,6 +184,18 @@ func newDefaultCredential(ctx context.Context, tag string, options option.CCMDef
|
||||
if reserveWeekly == 0 {
|
||||
reserveWeekly = 10
|
||||
}
|
||||
var cap5h float64
|
||||
if options.Limit5h > 0 {
|
||||
cap5h = float64(options.Limit5h)
|
||||
} else {
|
||||
cap5h = float64(100 - reserve5h)
|
||||
}
|
||||
var capWeekly float64
|
||||
if options.LimitWeekly > 0 {
|
||||
capWeekly = float64(options.LimitWeekly)
|
||||
} else {
|
||||
capWeekly = float64(100 - reserveWeekly)
|
||||
}
|
||||
requestContext, cancelRequests := context.WithCancel(context.Background())
|
||||
credential := &defaultCredential{
|
||||
tag: tag,
|
||||
@@ -187,6 +203,8 @@ func newDefaultCredential(ctx context.Context, tag string, options option.CCMDef
|
||||
credentialPath: options.CredentialPath,
|
||||
reserve5h: reserve5h,
|
||||
reserveWeekly: reserveWeekly,
|
||||
cap5h: cap5h,
|
||||
capWeekly: capWeekly,
|
||||
httpClient: httpClient,
|
||||
logger: logger,
|
||||
requestContext: requestContext,
|
||||
@@ -380,7 +398,11 @@ func (c *defaultCredential) updateStateFromHeaders(headers http.Header) {
|
||||
c.state.lastUpdated = time.Now()
|
||||
}
|
||||
if isFirstUpdate || int(c.state.fiveHourUtilization*100) != int(oldFiveHour*100) || int(c.state.weeklyUtilization*100) != int(oldWeekly*100) {
|
||||
c.logger.Debug("usage update for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%")
|
||||
resetSuffix := ""
|
||||
if !c.state.weeklyReset.IsZero() {
|
||||
resetSuffix = ", resets=" + log.FormatDuration(time.Until(c.state.weeklyReset))
|
||||
}
|
||||
c.logger.Debug("usage update for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%", resetSuffix)
|
||||
}
|
||||
shouldInterrupt := c.checkTransitionLocked()
|
||||
c.stateMutex.Unlock()
|
||||
@@ -429,10 +451,10 @@ func (c *defaultCredential) isUsable() bool {
|
||||
}
|
||||
|
||||
func (c *defaultCredential) checkReservesLocked() bool {
|
||||
if c.state.fiveHourUtilization >= float64(100-c.reserve5h) {
|
||||
if c.state.fiveHourUtilization >= c.cap5h {
|
||||
return false
|
||||
}
|
||||
if c.state.weeklyUtilization >= float64(100-c.reserveWeekly) {
|
||||
if c.state.weeklyUtilization >= c.capWeekly {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
@@ -633,7 +655,11 @@ func (c *defaultCredential) pollUsage(ctx context.Context) {
|
||||
c.state.hardRateLimited = false
|
||||
}
|
||||
if isFirstUpdate || int(c.state.fiveHourUtilization*100) != int(oldFiveHour*100) || int(c.state.weeklyUtilization*100) != int(oldWeekly*100) {
|
||||
c.logger.Debug("poll usage for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%")
|
||||
resetSuffix := ""
|
||||
if !c.state.weeklyReset.IsZero() {
|
||||
resetSuffix = ", resets=" + log.FormatDuration(time.Until(c.state.weeklyReset))
|
||||
}
|
||||
c.logger.Debug("poll usage for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%", resetSuffix)
|
||||
}
|
||||
shouldInterrupt := c.checkTransitionLocked()
|
||||
c.stateMutex.Unlock()
|
||||
@@ -672,6 +698,14 @@ func (c *defaultCredential) fiveHourUtilization() float64 {
|
||||
return c.state.fiveHourUtilization
|
||||
}
|
||||
|
||||
func (c *defaultCredential) fiveHourCap() float64 {
|
||||
return c.cap5h
|
||||
}
|
||||
|
||||
func (c *defaultCredential) weeklyCap() float64 {
|
||||
return c.capWeekly
|
||||
}
|
||||
|
||||
func (c *defaultCredential) usageTrackerOrNil() *AggregatedUsage {
|
||||
return c.usageTracker
|
||||
}
|
||||
@@ -736,10 +770,12 @@ type credentialProvider interface {
|
||||
|
||||
// singleCredentialProvider wraps a single credential (legacy or single default).
|
||||
type singleCredentialProvider struct {
|
||||
cred credential
|
||||
cred credential
|
||||
sessionAccess sync.RWMutex
|
||||
sessions map[string]time.Time
|
||||
}
|
||||
|
||||
func (p *singleCredentialProvider) selectCredential(_ string, filter func(credential) bool) (credential, bool, error) {
|
||||
func (p *singleCredentialProvider) selectCredential(sessionID string, filter func(credential) bool) (credential, bool, error) {
|
||||
if filter != nil && !filter(p.cred) {
|
||||
return nil, false, E.New("credential ", p.cred.tagName(), " is filtered out")
|
||||
}
|
||||
@@ -749,7 +785,20 @@ func (p *singleCredentialProvider) selectCredential(_ string, filter func(creden
|
||||
if !p.cred.isUsable() {
|
||||
return nil, false, E.New("credential ", p.cred.tagName(), " is rate-limited")
|
||||
}
|
||||
return p.cred, false, nil
|
||||
var isNew bool
|
||||
if sessionID != "" {
|
||||
p.sessionAccess.Lock()
|
||||
if p.sessions == nil {
|
||||
p.sessions = make(map[string]time.Time)
|
||||
}
|
||||
_, exists := p.sessions[sessionID]
|
||||
if !exists {
|
||||
p.sessions[sessionID] = time.Now()
|
||||
isNew = true
|
||||
}
|
||||
p.sessionAccess.Unlock()
|
||||
}
|
||||
return p.cred, isNew, nil
|
||||
}
|
||||
|
||||
func (p *singleCredentialProvider) onRateLimited(_ string, cred credential, resetAt time.Time, _ func(credential) bool) credential {
|
||||
@@ -758,6 +807,15 @@ func (p *singleCredentialProvider) onRateLimited(_ string, cred credential, rese
|
||||
}
|
||||
|
||||
func (p *singleCredentialProvider) pollIfStale(ctx context.Context) {
|
||||
now := time.Now()
|
||||
p.sessionAccess.Lock()
|
||||
for id, createdAt := range p.sessions {
|
||||
if now.Sub(createdAt) > sessionExpiry {
|
||||
delete(p.sessions, id)
|
||||
}
|
||||
}
|
||||
p.sessionAccess.Unlock()
|
||||
|
||||
if time.Since(p.cred.lastUpdatedTime()) > p.cred.pollBackoff(defaultPollInterval) {
|
||||
p.cred.pollUsage(ctx)
|
||||
}
|
||||
@@ -861,7 +919,7 @@ func (p *balancerProvider) pickCredential(filter func(credential) bool) credenti
|
||||
|
||||
func (p *balancerProvider) pickLeastUsed(filter func(credential) bool) credential {
|
||||
var best credential
|
||||
bestUtilization := float64(101)
|
||||
bestRemaining := float64(-1)
|
||||
for _, cred := range p.credentials {
|
||||
if filter != nil && !filter(cred) {
|
||||
continue
|
||||
@@ -869,9 +927,9 @@ func (p *balancerProvider) pickLeastUsed(filter func(credential) bool) credentia
|
||||
if !cred.isUsable() {
|
||||
continue
|
||||
}
|
||||
utilization := cred.weeklyUtilization()
|
||||
if utilization < bestUtilization {
|
||||
bestUtilization = utilization
|
||||
remaining := cred.weeklyCap() - cred.weeklyUtilization()
|
||||
if remaining > bestRemaining {
|
||||
bestRemaining = remaining
|
||||
best = cred
|
||||
}
|
||||
}
|
||||
@@ -1140,6 +1198,18 @@ func validateCCMOptions(options option.CCMServiceOptions) error {
|
||||
if cred.DefaultOptions.ReserveWeekly > 99 {
|
||||
return E.New("credential ", cred.Tag, ": reserve_weekly must be at most 99")
|
||||
}
|
||||
if cred.DefaultOptions.Limit5h > 100 {
|
||||
return E.New("credential ", cred.Tag, ": limit_5h must be at most 100")
|
||||
}
|
||||
if cred.DefaultOptions.LimitWeekly > 100 {
|
||||
return E.New("credential ", cred.Tag, ": limit_weekly must be at most 100")
|
||||
}
|
||||
if cred.DefaultOptions.Reserve5h > 0 && cred.DefaultOptions.Limit5h > 0 {
|
||||
return E.New("credential ", cred.Tag, ": reserve_5h and limit_5h are mutually exclusive")
|
||||
}
|
||||
if cred.DefaultOptions.ReserveWeekly > 0 && cred.DefaultOptions.LimitWeekly > 0 {
|
||||
return E.New("credential ", cred.Tag, ": reserve_weekly and limit_weekly are mutually exclusive")
|
||||
}
|
||||
}
|
||||
if cred.Type == "external" {
|
||||
if cred.ExternalOptions.Token == "" {
|
||||
|
||||
@@ -306,6 +306,15 @@ func isExtendedContextRequest(betaHeader string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func isFastModeRequest(betaHeader string) bool {
|
||||
for _, feature := range strings.Split(betaHeader, ",") {
|
||||
if strings.HasPrefix(strings.TrimSpace(feature), "fast-mode") {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func detectContextWindow(betaHeader string, totalInputTokens int64) int {
|
||||
if totalInputTokens > premiumContextThreshold {
|
||||
if isExtendedContextRequest(betaHeader) {
|
||||
@@ -414,6 +423,14 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
if isFastModeRequest(anthropicBetaHeader) {
|
||||
if _, isSingle := provider.(*singleCredentialProvider); !isSingle {
|
||||
writeJSONError(w, r, http.StatusBadRequest, "invalid_request_error",
|
||||
"fast mode requests will consume Extra usage, please use a default credential directly")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
var credentialFilter func(credential) bool
|
||||
if userConfig != nil && !userConfig.AllowExternalUsage {
|
||||
credentialFilter = func(c credential) bool { return !c.isExternal() }
|
||||
@@ -424,13 +441,23 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
writeNonRetryableCredentialError(w, r, unavailableCredentialMessage(provider, err.Error()))
|
||||
return
|
||||
}
|
||||
var logParts []any
|
||||
if isNew {
|
||||
if username != "" {
|
||||
s.logger.Debug("assigned credential ", selectedCredential.tagName(), " for session ", sessionID, " by user ", username)
|
||||
} else {
|
||||
s.logger.Debug("assigned credential ", selectedCredential.tagName(), " for session ", sessionID)
|
||||
}
|
||||
logParts = append(logParts, "assigned credential ")
|
||||
} else {
|
||||
logParts = append(logParts, "credential ")
|
||||
}
|
||||
logParts = append(logParts, selectedCredential.tagName())
|
||||
if sessionID != "" {
|
||||
logParts = append(logParts, " for session ", sessionID)
|
||||
}
|
||||
if isNew && username != "" {
|
||||
logParts = append(logParts, " by user ", username)
|
||||
}
|
||||
if requestModel != "" {
|
||||
logParts = append(logParts, ", model=", requestModel)
|
||||
}
|
||||
s.logger.Debug(logParts...)
|
||||
|
||||
if isExtendedContextRequest(anthropicBetaHeader) && selectedCredential.isExternal() {
|
||||
writeJSONError(w, r, http.StatusBadRequest, "invalid_request_error",
|
||||
@@ -771,8 +798,16 @@ func (s *Service) computeAggregatedUtilization(provider credentialProvider, user
|
||||
if !userConfig.AllowExternalUsage && cred.isExternal() {
|
||||
continue
|
||||
}
|
||||
totalFiveHour += cred.fiveHourUtilization()
|
||||
totalWeekly += cred.weeklyUtilization()
|
||||
scaledFiveHour := cred.fiveHourUtilization() / cred.fiveHourCap() * 100
|
||||
if scaledFiveHour > 100 {
|
||||
scaledFiveHour = 100
|
||||
}
|
||||
scaledWeekly := cred.weeklyUtilization() / cred.weeklyCap() * 100
|
||||
if scaledWeekly > 100 {
|
||||
scaledWeekly = 100
|
||||
}
|
||||
totalFiveHour += scaledFiveHour
|
||||
totalWeekly += scaledWeekly
|
||||
count++
|
||||
}
|
||||
if count == 0 {
|
||||
|
||||
@@ -293,6 +293,14 @@ func (c *externalCredential) weeklyUtilization() float64 {
|
||||
return c.state.weeklyUtilization
|
||||
}
|
||||
|
||||
func (c *externalCredential) fiveHourCap() float64 {
|
||||
return 100
|
||||
}
|
||||
|
||||
func (c *externalCredential) weeklyCap() float64 {
|
||||
return 100
|
||||
}
|
||||
|
||||
func (c *externalCredential) markRateLimited(resetAt time.Time) {
|
||||
c.logger.Warn("rate limited for ", c.tag, ", reset in ", log.FormatDuration(time.Until(resetAt)))
|
||||
c.stateMutex.Lock()
|
||||
@@ -434,7 +442,11 @@ func (c *externalCredential) updateStateFromHeaders(headers http.Header) {
|
||||
c.state.lastUpdated = time.Now()
|
||||
}
|
||||
if isFirstUpdate || int(c.state.fiveHourUtilization*100) != int(oldFiveHour*100) || int(c.state.weeklyUtilization*100) != int(oldWeekly*100) {
|
||||
c.logger.Debug("usage update for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%")
|
||||
resetSuffix := ""
|
||||
if !c.state.weeklyReset.IsZero() {
|
||||
resetSuffix = ", resets=" + log.FormatDuration(time.Until(c.state.weeklyReset))
|
||||
}
|
||||
c.logger.Debug("usage update for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%", resetSuffix)
|
||||
}
|
||||
shouldInterrupt := c.checkTransitionLocked()
|
||||
c.stateMutex.Unlock()
|
||||
@@ -544,7 +556,11 @@ func (c *externalCredential) pollUsage(ctx context.Context) {
|
||||
c.state.hardRateLimited = false
|
||||
}
|
||||
if isFirstUpdate || int(c.state.fiveHourUtilization*100) != int(oldFiveHour*100) || int(c.state.weeklyUtilization*100) != int(oldWeekly*100) {
|
||||
c.logger.Debug("poll usage for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%")
|
||||
resetSuffix := ""
|
||||
if !c.state.weeklyReset.IsZero() {
|
||||
resetSuffix = ", resets=" + log.FormatDuration(time.Until(c.state.weeklyReset))
|
||||
}
|
||||
c.logger.Debug("poll usage for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%", resetSuffix)
|
||||
}
|
||||
shouldInterrupt := c.checkTransitionLocked()
|
||||
c.stateMutex.Unlock()
|
||||
|
||||
@@ -89,6 +89,8 @@ type defaultCredential struct {
|
||||
watcherAccess sync.Mutex
|
||||
reserve5h uint8
|
||||
reserveWeekly uint8
|
||||
cap5h float64
|
||||
capWeekly float64
|
||||
usageTracker *AggregatedUsage
|
||||
dialer N.Dialer
|
||||
httpClient *http.Client
|
||||
@@ -130,6 +132,8 @@ type credential interface {
|
||||
isExternal() bool
|
||||
fiveHourUtilization() float64
|
||||
weeklyUtilization() float64
|
||||
fiveHourCap() float64
|
||||
weeklyCap() float64
|
||||
markRateLimited(resetAt time.Time)
|
||||
earliestReset() time.Time
|
||||
unavailableError() error
|
||||
@@ -188,6 +192,18 @@ func newDefaultCredential(ctx context.Context, tag string, options option.OCMDef
|
||||
if reserveWeekly == 0 {
|
||||
reserveWeekly = 10
|
||||
}
|
||||
var cap5h float64
|
||||
if options.Limit5h > 0 {
|
||||
cap5h = float64(options.Limit5h)
|
||||
} else {
|
||||
cap5h = float64(100 - reserve5h)
|
||||
}
|
||||
var capWeekly float64
|
||||
if options.LimitWeekly > 0 {
|
||||
capWeekly = float64(options.LimitWeekly)
|
||||
} else {
|
||||
capWeekly = float64(100 - reserveWeekly)
|
||||
}
|
||||
requestContext, cancelRequests := context.WithCancel(context.Background())
|
||||
credential := &defaultCredential{
|
||||
tag: tag,
|
||||
@@ -195,6 +211,8 @@ func newDefaultCredential(ctx context.Context, tag string, options option.OCMDef
|
||||
credentialPath: options.CredentialPath,
|
||||
reserve5h: reserve5h,
|
||||
reserveWeekly: reserveWeekly,
|
||||
cap5h: cap5h,
|
||||
capWeekly: capWeekly,
|
||||
dialer: credentialDialer,
|
||||
httpClient: httpClient,
|
||||
logger: logger,
|
||||
@@ -397,7 +415,11 @@ func (c *defaultCredential) updateStateFromHeaders(headers http.Header) {
|
||||
c.state.lastUpdated = time.Now()
|
||||
}
|
||||
if isFirstUpdate || int(c.state.fiveHourUtilization*100) != int(oldFiveHour*100) || int(c.state.weeklyUtilization*100) != int(oldWeekly*100) {
|
||||
c.logger.Debug("usage update for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%")
|
||||
resetSuffix := ""
|
||||
if !c.state.weeklyReset.IsZero() {
|
||||
resetSuffix = ", resets=" + log.FormatDuration(time.Until(c.state.weeklyReset))
|
||||
}
|
||||
c.logger.Debug("usage update for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%", resetSuffix)
|
||||
}
|
||||
shouldInterrupt := c.checkTransitionLocked()
|
||||
c.stateMutex.Unlock()
|
||||
@@ -446,10 +468,10 @@ func (c *defaultCredential) isUsable() bool {
|
||||
}
|
||||
|
||||
func (c *defaultCredential) checkReservesLocked() bool {
|
||||
if c.state.fiveHourUtilization >= float64(100-c.reserve5h) {
|
||||
if c.state.fiveHourUtilization >= c.cap5h {
|
||||
return false
|
||||
}
|
||||
if c.state.weeklyUtilization >= float64(100-c.reserveWeekly) {
|
||||
if c.state.weeklyUtilization >= c.capWeekly {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
@@ -671,7 +693,11 @@ func (c *defaultCredential) pollUsage(ctx context.Context) {
|
||||
c.state.hardRateLimited = false
|
||||
}
|
||||
if isFirstUpdate || int(c.state.fiveHourUtilization*100) != int(oldFiveHour*100) || int(c.state.weeklyUtilization*100) != int(oldWeekly*100) {
|
||||
c.logger.Debug("poll usage for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%")
|
||||
resetSuffix := ""
|
||||
if !c.state.weeklyReset.IsZero() {
|
||||
resetSuffix = ", resets=" + log.FormatDuration(time.Until(c.state.weeklyReset))
|
||||
}
|
||||
c.logger.Debug("poll usage for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%", resetSuffix)
|
||||
}
|
||||
shouldInterrupt := c.checkTransitionLocked()
|
||||
c.stateMutex.Unlock()
|
||||
@@ -714,6 +740,14 @@ func (c *defaultCredential) fiveHourUtilization() float64 {
|
||||
return c.state.fiveHourUtilization
|
||||
}
|
||||
|
||||
func (c *defaultCredential) fiveHourCap() float64 {
|
||||
return c.cap5h
|
||||
}
|
||||
|
||||
func (c *defaultCredential) weeklyCap() float64 {
|
||||
return c.capWeekly
|
||||
}
|
||||
|
||||
func (c *defaultCredential) usageTrackerOrNil() *AggregatedUsage {
|
||||
return c.usageTracker
|
||||
}
|
||||
@@ -796,10 +830,12 @@ type credentialProvider interface {
|
||||
}
|
||||
|
||||
type singleCredentialProvider struct {
|
||||
cred credential
|
||||
cred credential
|
||||
sessionAccess sync.RWMutex
|
||||
sessions map[string]time.Time
|
||||
}
|
||||
|
||||
func (p *singleCredentialProvider) selectCredential(_ string, filter func(credential) bool) (credential, bool, error) {
|
||||
func (p *singleCredentialProvider) selectCredential(sessionID string, filter func(credential) bool) (credential, bool, error) {
|
||||
if filter != nil && !filter(p.cred) {
|
||||
return nil, false, E.New("credential ", p.cred.tagName(), " is filtered out")
|
||||
}
|
||||
@@ -809,7 +845,20 @@ func (p *singleCredentialProvider) selectCredential(_ string, filter func(creden
|
||||
if !p.cred.isUsable() {
|
||||
return nil, false, E.New("credential ", p.cred.tagName(), " is rate-limited")
|
||||
}
|
||||
return p.cred, false, nil
|
||||
var isNew bool
|
||||
if sessionID != "" {
|
||||
p.sessionAccess.Lock()
|
||||
if p.sessions == nil {
|
||||
p.sessions = make(map[string]time.Time)
|
||||
}
|
||||
_, exists := p.sessions[sessionID]
|
||||
if !exists {
|
||||
p.sessions[sessionID] = time.Now()
|
||||
isNew = true
|
||||
}
|
||||
p.sessionAccess.Unlock()
|
||||
}
|
||||
return p.cred, isNew, nil
|
||||
}
|
||||
|
||||
func (p *singleCredentialProvider) onRateLimited(_ string, cred credential, resetAt time.Time, _ func(credential) bool) credential {
|
||||
@@ -818,6 +867,15 @@ func (p *singleCredentialProvider) onRateLimited(_ string, cred credential, rese
|
||||
}
|
||||
|
||||
func (p *singleCredentialProvider) pollIfStale(ctx context.Context) {
|
||||
now := time.Now()
|
||||
p.sessionAccess.Lock()
|
||||
for id, createdAt := range p.sessions {
|
||||
if now.Sub(createdAt) > sessionExpiry {
|
||||
delete(p.sessions, id)
|
||||
}
|
||||
}
|
||||
p.sessionAccess.Unlock()
|
||||
|
||||
if time.Since(p.cred.lastUpdatedTime()) > p.cred.pollBackoff(defaultPollInterval) {
|
||||
p.cred.pollUsage(ctx)
|
||||
}
|
||||
@@ -924,7 +982,7 @@ func (p *balancerProvider) pickCredential(filter func(credential) bool) credenti
|
||||
|
||||
func (p *balancerProvider) pickLeastUsed(filter func(credential) bool) credential {
|
||||
var best credential
|
||||
bestUtilization := float64(101)
|
||||
bestRemaining := float64(-1)
|
||||
for _, cred := range p.credentials {
|
||||
if filter != nil && !filter(cred) {
|
||||
continue
|
||||
@@ -935,9 +993,9 @@ func (p *balancerProvider) pickLeastUsed(filter func(credential) bool) credentia
|
||||
if !cred.isUsable() {
|
||||
continue
|
||||
}
|
||||
utilization := cred.weeklyUtilization()
|
||||
if utilization < bestUtilization {
|
||||
bestUtilization = utilization
|
||||
remaining := cred.weeklyCap() - cred.weeklyUtilization()
|
||||
if remaining > bestRemaining {
|
||||
bestRemaining = remaining
|
||||
best = cred
|
||||
}
|
||||
}
|
||||
@@ -1207,6 +1265,18 @@ func validateOCMOptions(options option.OCMServiceOptions) error {
|
||||
if cred.DefaultOptions.ReserveWeekly > 99 {
|
||||
return E.New("credential ", cred.Tag, ": reserve_weekly must be at most 99")
|
||||
}
|
||||
if cred.DefaultOptions.Limit5h > 100 {
|
||||
return E.New("credential ", cred.Tag, ": limit_5h must be at most 100")
|
||||
}
|
||||
if cred.DefaultOptions.LimitWeekly > 100 {
|
||||
return E.New("credential ", cred.Tag, ": limit_weekly must be at most 100")
|
||||
}
|
||||
if cred.DefaultOptions.Reserve5h > 0 && cred.DefaultOptions.Limit5h > 0 {
|
||||
return E.New("credential ", cred.Tag, ": reserve_5h and limit_5h are mutually exclusive")
|
||||
}
|
||||
if cred.DefaultOptions.ReserveWeekly > 0 && cred.DefaultOptions.LimitWeekly > 0 {
|
||||
return E.New("credential ", cred.Tag, ": reserve_weekly and limit_weekly are mutually exclusive")
|
||||
}
|
||||
}
|
||||
if cred.Type == "external" {
|
||||
if cred.ExternalOptions.Token == "" {
|
||||
|
||||
@@ -435,13 +435,6 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
writeNonRetryableCredentialError(w, unavailableCredentialMessage(provider, err.Error()))
|
||||
return
|
||||
}
|
||||
if isNew {
|
||||
if username != "" {
|
||||
s.logger.Debug("assigned credential ", selectedCredential.tagName(), " for session ", sessionID, " by user ", username)
|
||||
} else {
|
||||
s.logger.Debug("assigned credential ", selectedCredential.tagName(), " for session ", sessionID)
|
||||
}
|
||||
}
|
||||
|
||||
if strings.EqualFold(r.Header.Get("Upgrade"), "websocket") && strings.HasPrefix(path, "/v1/responses") {
|
||||
s.handleWebSocket(w, r, path, username, sessionID, userConfig, provider, selectedCredential, credentialFilter)
|
||||
@@ -465,6 +458,7 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
// Read body for model extraction and retry buffer when JSON replay is useful.
|
||||
var bodyBytes []byte
|
||||
var requestModel string
|
||||
var requestServiceTier string
|
||||
if r.Body != nil && (shouldTrackUsage || canRetryRequest) {
|
||||
mediaType, _, parseErr := mime.ParseMediaType(r.Header.Get("Content-Type"))
|
||||
isJSONRequest := parseErr == nil && (mediaType == "application/json" || strings.HasSuffix(mediaType, "+json"))
|
||||
@@ -476,15 +470,38 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
var request struct {
|
||||
Model string `json:"model"`
|
||||
Model string `json:"model"`
|
||||
ServiceTier string `json:"service_tier"`
|
||||
}
|
||||
if json.Unmarshal(bodyBytes, &request) == nil {
|
||||
requestModel = request.Model
|
||||
requestServiceTier = request.ServiceTier
|
||||
}
|
||||
r.Body = io.NopCloser(bytes.NewReader(bodyBytes))
|
||||
}
|
||||
}
|
||||
|
||||
var logParts []any
|
||||
if isNew {
|
||||
logParts = append(logParts, "assigned credential ")
|
||||
} else {
|
||||
logParts = append(logParts, "credential ")
|
||||
}
|
||||
logParts = append(logParts, selectedCredential.tagName())
|
||||
if sessionID != "" {
|
||||
logParts = append(logParts, " for session ", sessionID)
|
||||
}
|
||||
if isNew && username != "" {
|
||||
logParts = append(logParts, " by user ", username)
|
||||
}
|
||||
if requestModel != "" {
|
||||
logParts = append(logParts, ", model=", requestModel)
|
||||
}
|
||||
if requestServiceTier == "priority" {
|
||||
logParts = append(logParts, ", fast")
|
||||
}
|
||||
s.logger.Debug(logParts...)
|
||||
|
||||
requestContext := selectedCredential.wrapRequestContext(r.Context())
|
||||
defer func() {
|
||||
requestContext.cancelRequest()
|
||||
@@ -841,8 +858,16 @@ func (s *Service) computeAggregatedUtilization(provider credentialProvider, user
|
||||
if !userConfig.AllowExternalUsage && cred.isExternal() {
|
||||
continue
|
||||
}
|
||||
totalFiveHour += cred.fiveHourUtilization()
|
||||
totalWeekly += cred.weeklyUtilization()
|
||||
scaledFiveHour := cred.fiveHourUtilization() / cred.fiveHourCap() * 100
|
||||
if scaledFiveHour > 100 {
|
||||
scaledFiveHour = 100
|
||||
}
|
||||
scaledWeekly := cred.weeklyUtilization() / cred.weeklyCap() * 100
|
||||
if scaledWeekly > 100 {
|
||||
scaledWeekly = 100
|
||||
}
|
||||
totalFiveHour += scaledFiveHour
|
||||
totalWeekly += scaledWeekly
|
||||
count++
|
||||
}
|
||||
if count == 0 {
|
||||
|
||||
Reference in New Issue
Block a user