ccm,ocm: fix session race, track fallback sessions, skip warmup logging

Fix data race in selectCredential where concurrent goroutines could
overwrite each other's session entries by adding compare-and-delete
and store-if-absent patterns with retry loop. Track sessions for
fallback strategy so isNew is reported correctly. Skip logging and
usage tracking for websocket warmup requests (generate: false).
This commit is contained in:
世界
2026-03-16 22:10:10 +08:00
parent 2dd093a32e
commit f3c3022094
3 changed files with 138 additions and 98 deletions

View File

@@ -130,63 +130,82 @@ func newBalancerProvider(credentials []Credential, strategy string, pollInterval
}
func (p *balancerProvider) selectCredential(sessionID string, selection credentialSelection) (Credential, bool, error) {
if p.strategy == C.BalancerStrategyFallback {
selectionScope := selection.scopeOrDefault()
for {
if p.strategy == C.BalancerStrategyFallback {
best := p.pickCredential(selection.filter)
if best == nil {
return nil, false, allCredentialsUnavailableError(p.credentials)
}
return best, p.storeSessionIfAbsent(sessionID, sessionEntry{createdAt: time.Now()}), nil
}
if sessionID != "" {
p.sessionAccess.RLock()
entry, exists := p.sessions[sessionID]
p.sessionAccess.RUnlock()
if exists {
if entry.selectionScope == selectionScope {
for _, credential := range p.credentials {
if credential.tagName() == entry.tag && selection.allows(credential) && credential.isUsable() {
if p.rebalanceThreshold > 0 && (p.strategy == "" || p.strategy == C.BalancerStrategyLeastUsed) {
better := p.pickLeastUsed(selection.filter)
if better != nil && better.tagName() != credential.tagName() {
effectiveThreshold := p.rebalanceThreshold / credential.planWeight()
delta := credential.weeklyUtilization() - better.weeklyUtilization()
if delta > effectiveThreshold {
p.logger.Info("rebalancing away from ", credential.tagName(),
": utilization delta ", delta, "% exceeds effective threshold ",
effectiveThreshold, "% (weight ", credential.planWeight(), ")")
p.rebalanceCredential(credential.tagName(), selectionScope)
break
}
}
}
return credential, false, nil
}
}
}
p.sessionAccess.Lock()
currentEntry, stillExists := p.sessions[sessionID]
if stillExists && currentEntry == entry {
delete(p.sessions, sessionID)
p.sessionAccess.Unlock()
} else {
p.sessionAccess.Unlock()
continue
}
}
}
best := p.pickCredential(selection.filter)
if best == nil {
return nil, false, allCredentialsUnavailableError(p.credentials)
}
return best, false, nil
}
selectionScope := selection.scopeOrDefault()
if sessionID != "" {
p.sessionAccess.RLock()
entry, exists := p.sessions[sessionID]
p.sessionAccess.RUnlock()
if exists {
if entry.selectionScope == selectionScope {
for _, credential := range p.credentials {
if credential.tagName() == entry.tag && selection.allows(credential) && credential.isUsable() {
if p.rebalanceThreshold > 0 && (p.strategy == "" || p.strategy == C.BalancerStrategyLeastUsed) {
better := p.pickLeastUsed(selection.filter)
if better != nil && better.tagName() != credential.tagName() {
effectiveThreshold := p.rebalanceThreshold / credential.planWeight()
delta := credential.weeklyUtilization() - better.weeklyUtilization()
if delta > effectiveThreshold {
p.logger.Info("rebalancing away from ", credential.tagName(),
": utilization delta ", delta, "% exceeds effective threshold ",
effectiveThreshold, "% (weight ", credential.planWeight(), ")")
p.rebalanceCredential(credential.tagName(), selectionScope)
break
}
}
}
return credential, false, nil
}
}
}
p.sessionAccess.Lock()
delete(p.sessions, sessionID)
p.sessionAccess.Unlock()
}
}
best := p.pickCredential(selection.filter)
if best == nil {
return nil, false, allCredentialsUnavailableError(p.credentials)
}
isNew := sessionID != ""
if isNew {
p.sessionAccess.Lock()
p.sessions[sessionID] = sessionEntry{
if p.storeSessionIfAbsent(sessionID, sessionEntry{
tag: best.tagName(),
selectionScope: selectionScope,
createdAt: time.Now(),
}) {
return best, true, nil
}
if sessionID == "" {
return best, false, nil
}
p.sessionAccess.Unlock()
}
return best, isNew, nil
}
func (p *balancerProvider) storeSessionIfAbsent(sessionID string, entry sessionEntry) bool {
if sessionID == "" {
return false
}
p.sessionAccess.Lock()
defer p.sessionAccess.Unlock()
if _, exists := p.sessions[sessionID]; exists {
return false
}
p.sessions[sessionID] = entry
return true
}
func (p *balancerProvider) rebalanceCredential(tag string, selectionScope credentialSelectionScope) {

View File

@@ -134,63 +134,82 @@ func newBalancerProvider(credentials []Credential, strategy string, pollInterval
}
func (p *balancerProvider) selectCredential(sessionID string, selection credentialSelection) (Credential, bool, error) {
if p.strategy == C.BalancerStrategyFallback {
selectionScope := selection.scopeOrDefault()
for {
if p.strategy == C.BalancerStrategyFallback {
best := p.pickCredential(selection.filter)
if best == nil {
return nil, false, allRateLimitedError(p.credentials)
}
return best, p.storeSessionIfAbsent(sessionID, sessionEntry{createdAt: time.Now()}), nil
}
if sessionID != "" {
p.sessionAccess.RLock()
entry, exists := p.sessions[sessionID]
p.sessionAccess.RUnlock()
if exists {
if entry.selectionScope == selectionScope {
for _, credential := range p.credentials {
if credential.tagName() == entry.tag && compositeCredentialSelectable(credential) && selection.allows(credential) && credential.isUsable() {
if p.rebalanceThreshold > 0 && (p.strategy == "" || p.strategy == C.BalancerStrategyLeastUsed) {
better := p.pickLeastUsed(selection.filter)
if better != nil && better.tagName() != credential.tagName() {
effectiveThreshold := p.rebalanceThreshold / credential.planWeight()
delta := credential.weeklyUtilization() - better.weeklyUtilization()
if delta > effectiveThreshold {
p.logger.Info("rebalancing away from ", credential.tagName(),
": utilization delta ", delta, "% exceeds effective threshold ",
effectiveThreshold, "% (weight ", credential.planWeight(), ")")
p.rebalanceCredential(credential.tagName(), selectionScope)
break
}
}
}
return credential, false, nil
}
}
}
p.sessionAccess.Lock()
currentEntry, stillExists := p.sessions[sessionID]
if stillExists && currentEntry == entry {
delete(p.sessions, sessionID)
p.sessionAccess.Unlock()
} else {
p.sessionAccess.Unlock()
continue
}
}
}
best := p.pickCredential(selection.filter)
if best == nil {
return nil, false, allRateLimitedError(p.credentials)
}
return best, false, nil
}
selectionScope := selection.scopeOrDefault()
if sessionID != "" {
p.sessionAccess.RLock()
entry, exists := p.sessions[sessionID]
p.sessionAccess.RUnlock()
if exists {
if entry.selectionScope == selectionScope {
for _, credential := range p.credentials {
if credential.tagName() == entry.tag && compositeCredentialSelectable(credential) && selection.allows(credential) && credential.isUsable() {
if p.rebalanceThreshold > 0 && (p.strategy == "" || p.strategy == C.BalancerStrategyLeastUsed) {
better := p.pickLeastUsed(selection.filter)
if better != nil && better.tagName() != credential.tagName() {
effectiveThreshold := p.rebalanceThreshold / credential.planWeight()
delta := credential.weeklyUtilization() - better.weeklyUtilization()
if delta > effectiveThreshold {
p.logger.Info("rebalancing away from ", credential.tagName(),
": utilization delta ", delta, "% exceeds effective threshold ",
effectiveThreshold, "% (weight ", credential.planWeight(), ")")
p.rebalanceCredential(credential.tagName(), selectionScope)
break
}
}
}
return credential, false, nil
}
}
}
p.sessionAccess.Lock()
delete(p.sessions, sessionID)
p.sessionAccess.Unlock()
}
}
best := p.pickCredential(selection.filter)
if best == nil {
return nil, false, allRateLimitedError(p.credentials)
}
isNew := sessionID != ""
if isNew {
p.sessionAccess.Lock()
p.sessions[sessionID] = sessionEntry{
if p.storeSessionIfAbsent(sessionID, sessionEntry{
tag: best.tagName(),
selectionScope: selectionScope,
createdAt: time.Now(),
}) {
return best, true, nil
}
if sessionID == "" {
return best, false, nil
}
p.sessionAccess.Unlock()
}
return best, isNew, nil
}
func (p *balancerProvider) storeSessionIfAbsent(sessionID string, entry sessionEntry) bool {
if sessionID == "" {
return false
}
p.sessionAccess.Lock()
defer p.sessionAccess.Unlock()
if _, exists := p.sessions[sessionID]; exists {
return false
}
p.sessions[sessionID] = entry
return true
}
func (p *balancerProvider) rebalanceCredential(tag string, selectionScope credentialSelectionScope) {

View File

@@ -325,9 +325,11 @@ func (s *Service) proxyWebSocketClientToUpstream(ctx context.Context, clientConn
Type string `json:"type"`
Model string `json:"model"`
ServiceTier string `json:"service_tier"`
Generate *bool `json:"generate"`
}
if json.Unmarshal(data, &request) == nil && request.Type == "response.create" && request.Model != "" {
if isNew && !logged {
isWarmup := request.Generate != nil && !*request.Generate
if !isWarmup && isNew && !logged {
logged = true
logParts := []any{"assigned credential ", selectedCredential.tagName()}
if sessionID != "" {
@@ -342,7 +344,7 @@ func (s *Service) proxyWebSocketClientToUpstream(ctx context.Context, clientConn
}
s.logger.DebugContext(ctx, logParts...)
}
if selectedCredential.usageTrackerOrNil() != nil {
if !isWarmup && selectedCredential.usageTrackerOrNil() != nil {
select {
case modelChannel <- request.Model:
default: