ccm,ocm: remove dead code, fix timer leaks, eliminate redundant lookups
- Remove unused onBecameUnusable field from CCM credential structs (OCM wires it for WebSocket interruption; CCM has no equivalent) - Replace time.After with time.NewTimer in doHTTPWithRetry and connectorLoop to avoid timer leaks on context cancellation - Pass already-resolved provider to rewriteResponseHeadersForExternalUser instead of re-resolving via credentialForUser - Hoist reverseYamuxConfig to package-level var (immutable, no need to allocate on every call)
This commit is contained in:
@@ -26,10 +26,12 @@ func doHTTPWithRetry(ctx context.Context, client *http.Client, buildRequest func
|
||||
for attempt := range httpRetryMaxAttempts {
|
||||
if attempt > 0 {
|
||||
delay := httpRetryInitialDelay * time.Duration(1<<(attempt-1))
|
||||
timer := time.NewTimer(delay)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
timer.Stop()
|
||||
return nil, lastError
|
||||
case <-time.After(delay):
|
||||
case <-timer.C:
|
||||
}
|
||||
}
|
||||
request, err := buildRequest()
|
||||
|
||||
@@ -44,8 +44,7 @@ type defaultCredential struct {
|
||||
watcherRetryAt time.Time
|
||||
|
||||
// Connection interruption
|
||||
onBecameUnusable func()
|
||||
interrupted bool
|
||||
interrupted bool
|
||||
requestContext context.Context
|
||||
cancelRequests context.CancelFunc
|
||||
requestAccess sync.Mutex
|
||||
@@ -353,9 +352,6 @@ func (c *defaultCredential) interruptConnections() {
|
||||
c.cancelRequests()
|
||||
c.requestContext, c.cancelRequests = context.WithCancel(context.Background())
|
||||
c.requestAccess.Unlock()
|
||||
if c.onBecameUnusable != nil {
|
||||
c.onBecameUnusable()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *defaultCredential) wrapRequestContext(parent context.Context) *credentialRequestContext {
|
||||
|
||||
@@ -40,8 +40,7 @@ type externalCredential struct {
|
||||
usageTracker *AggregatedUsage
|
||||
logger log.ContextLogger
|
||||
|
||||
onBecameUnusable func()
|
||||
interrupted bool
|
||||
interrupted bool
|
||||
requestContext context.Context
|
||||
cancelRequests context.CancelFunc
|
||||
requestAccess sync.Mutex
|
||||
@@ -494,9 +493,6 @@ func (c *externalCredential) interruptConnections() {
|
||||
c.cancelRequests()
|
||||
c.requestContext, c.cancelRequests = context.WithCancel(context.Background())
|
||||
c.requestAccess.Unlock()
|
||||
if c.onBecameUnusable != nil {
|
||||
c.onBecameUnusable()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *externalCredential) doPollUsageRequest(ctx context.Context) (*http.Response, error) {
|
||||
|
||||
@@ -17,14 +17,14 @@ import (
|
||||
"github.com/hashicorp/yamux"
|
||||
)
|
||||
|
||||
func reverseYamuxConfig() *yamux.Config {
|
||||
var defaultYamuxConfig = func() *yamux.Config {
|
||||
config := yamux.DefaultConfig()
|
||||
config.KeepAliveInterval = 15 * time.Second
|
||||
config.ConnectionWriteTimeout = 10 * time.Second
|
||||
config.MaxStreamWindowSize = 512 * 1024
|
||||
config.LogOutput = io.Discard
|
||||
return config
|
||||
}
|
||||
}()
|
||||
|
||||
type bufferedConn struct {
|
||||
reader *bufio.Reader
|
||||
@@ -108,7 +108,7 @@ func (s *Service) handleReverseConnect(ctx context.Context, w http.ResponseWrite
|
||||
return
|
||||
}
|
||||
|
||||
session, err := yamux.Client(conn, reverseYamuxConfig())
|
||||
session, err := yamux.Client(conn, defaultYamuxConfig)
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
s.logger.ErrorContext(ctx, "reverse connect: create yamux client for ", receiverCredential.tagName(), ": ", err)
|
||||
@@ -161,9 +161,11 @@ func (c *externalCredential) connectorLoop() {
|
||||
consecutiveFailures++
|
||||
backoff := connectorBackoff(consecutiveFailures)
|
||||
c.logger.Warn("reverse connection for ", c.tag, " lost: ", err, ", reconnecting in ", backoff)
|
||||
timer := time.NewTimer(backoff)
|
||||
select {
|
||||
case <-time.After(backoff):
|
||||
case <-timer.C:
|
||||
case <-ctx.Done():
|
||||
timer.Stop()
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -236,7 +238,7 @@ func (c *externalCredential) connectorConnect(ctx context.Context) (time.Duratio
|
||||
}
|
||||
}
|
||||
|
||||
session, err := yamux.Server(&bufferedConn{reader: reader, Conn: conn}, reverseYamuxConfig())
|
||||
session, err := yamux.Server(&bufferedConn{reader: reader, Conn: conn}, defaultYamuxConfig)
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
return 0, E.Cause(err, "create yamux server")
|
||||
|
||||
@@ -313,7 +313,7 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// Rewrite response headers for external users
|
||||
if userConfig != nil && userConfig.ExternalCredential != "" {
|
||||
s.rewriteResponseHeadersForExternalUser(response.Header, userConfig)
|
||||
s.rewriteResponseHeadersForExternalUser(response.Header, provider, userConfig)
|
||||
}
|
||||
|
||||
for key, values := range response.Header {
|
||||
|
||||
@@ -100,12 +100,7 @@ func (s *Service) computeAggregatedUtilization(provider credentialProvider, user
|
||||
totalWeight
|
||||
}
|
||||
|
||||
func (s *Service) rewriteResponseHeadersForExternalUser(headers http.Header, userConfig *option.CCMUser) {
|
||||
provider, err := credentialForUser(s.userConfigMap, s.providers, userConfig.Name)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Service) rewriteResponseHeadersForExternalUser(headers http.Header, provider credentialProvider, userConfig *option.CCMUser) {
|
||||
avgFiveHour, avgWeekly, totalWeight := s.computeAggregatedUtilization(provider, userConfig)
|
||||
|
||||
headers.Set("anthropic-ratelimit-unified-5h-utilization", strconv.FormatFloat(avgFiveHour/100, 'f', 6, 64))
|
||||
|
||||
@@ -29,10 +29,12 @@ func doHTTPWithRetry(ctx context.Context, client *http.Client, buildRequest func
|
||||
for attempt := range httpRetryMaxAttempts {
|
||||
if attempt > 0 {
|
||||
delay := httpRetryInitialDelay * time.Duration(1<<(attempt-1))
|
||||
timer := time.NewTimer(delay)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
timer.Stop()
|
||||
return nil, lastError
|
||||
case <-time.After(delay):
|
||||
case <-timer.C:
|
||||
}
|
||||
}
|
||||
request, err := buildRequest()
|
||||
|
||||
@@ -17,14 +17,14 @@ import (
|
||||
"github.com/hashicorp/yamux"
|
||||
)
|
||||
|
||||
func reverseYamuxConfig() *yamux.Config {
|
||||
var defaultYamuxConfig = func() *yamux.Config {
|
||||
config := yamux.DefaultConfig()
|
||||
config.KeepAliveInterval = 15 * time.Second
|
||||
config.ConnectionWriteTimeout = 10 * time.Second
|
||||
config.MaxStreamWindowSize = 512 * 1024
|
||||
config.LogOutput = io.Discard
|
||||
return config
|
||||
}
|
||||
}()
|
||||
|
||||
type bufferedConn struct {
|
||||
reader *bufio.Reader
|
||||
@@ -108,7 +108,7 @@ func (s *Service) handleReverseConnect(ctx context.Context, w http.ResponseWrite
|
||||
return
|
||||
}
|
||||
|
||||
session, err := yamux.Client(conn, reverseYamuxConfig())
|
||||
session, err := yamux.Client(conn, defaultYamuxConfig)
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
s.logger.ErrorContext(ctx, "reverse connect: create yamux client for ", receiverCredential.tagName(), ": ", err)
|
||||
@@ -161,9 +161,11 @@ func (c *externalCredential) connectorLoop() {
|
||||
consecutiveFailures++
|
||||
backoff := connectorBackoff(consecutiveFailures)
|
||||
c.logger.Warn("reverse connection for ", c.tag, " lost: ", err, ", reconnecting in ", backoff)
|
||||
timer := time.NewTimer(backoff)
|
||||
select {
|
||||
case <-time.After(backoff):
|
||||
case <-timer.C:
|
||||
case <-ctx.Done():
|
||||
timer.Stop()
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -236,7 +238,7 @@ func (c *externalCredential) connectorConnect(ctx context.Context) (time.Duratio
|
||||
}
|
||||
}
|
||||
|
||||
session, err := yamux.Server(&bufferedConn{reader: reader, Conn: conn}, reverseYamuxConfig())
|
||||
session, err := yamux.Server(&bufferedConn{reader: reader, Conn: conn}, defaultYamuxConfig)
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
return 0, E.Cause(err, "create yamux server")
|
||||
|
||||
@@ -293,7 +293,7 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// Rewrite response headers for external users
|
||||
if userConfig != nil && userConfig.ExternalCredential != "" {
|
||||
s.rewriteResponseHeadersForExternalUser(response.Header, userConfig)
|
||||
s.rewriteResponseHeadersForExternalUser(response.Header, provider, userConfig)
|
||||
}
|
||||
|
||||
for key, values := range response.Header {
|
||||
|
||||
@@ -100,12 +100,7 @@ func (s *Service) computeAggregatedUtilization(provider credentialProvider, user
|
||||
totalWeight
|
||||
}
|
||||
|
||||
func (s *Service) rewriteResponseHeadersForExternalUser(headers http.Header, userConfig *option.OCMUser) {
|
||||
provider, err := credentialForUser(s.userConfigMap, s.providers, userConfig.Name)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Service) rewriteResponseHeadersForExternalUser(headers http.Header, provider credentialProvider, userConfig *option.OCMUser) {
|
||||
avgFiveHour, avgWeekly, totalWeight := s.computeAggregatedUtilization(provider, userConfig)
|
||||
|
||||
activeLimitIdentifier := normalizeRateLimitIdentifier(headers.Get("x-codex-active-limit"))
|
||||
|
||||
@@ -253,7 +253,7 @@ func (s *Service) handleWebSocket(
|
||||
}
|
||||
}
|
||||
if userConfig != nil && userConfig.ExternalCredential != "" {
|
||||
s.rewriteResponseHeadersForExternalUser(clientResponseHeaders, userConfig)
|
||||
s.rewriteResponseHeadersForExternalUser(clientResponseHeaders, provider, userConfig)
|
||||
}
|
||||
|
||||
clientUpgrader := ws.HTTPUpgrader{
|
||||
|
||||
Reference in New Issue
Block a user