ccm,ocm: reduce status emission noise, simplify emit-guard pattern

Guard updateStateFromHeaders emission with value-change detection to
avoid unnecessary computeAggregatedUtilization scans on every proxied
response. Replace statusAggregateStateLocked two-value return with
comparable statusSnapshot struct. Define statusPayload type for the
status wire format, replacing anonymous structs and map literals.
This commit is contained in:
世界
2026-03-17 16:10:59 +08:00
parent f84832a369
commit 4a6a211775
7 changed files with 66 additions and 56 deletions

View File

@@ -152,11 +152,16 @@ func (c *defaultCredential) emitStatusUpdate() {
}
}
func (c *defaultCredential) statusAggregateStateLocked() (bool, float64) {
type statusSnapshot struct {
available bool
weight float64
}
func (c *defaultCredential) statusSnapshotLocked() statusSnapshot {
if c.state.unavailable {
return false, 0
return statusSnapshot{}
}
return true, ccmPlanWeight(c.state.accountType, c.state.rateLimitTier)
return statusSnapshot{true, ccmPlanWeight(c.state.accountType, c.state.rateLimitTier)}
}
func (c *defaultCredential) getAccessToken() (string, error) {
@@ -206,15 +211,14 @@ func (c *defaultCredential) getAccessToken() (string, error) {
if latestErr == nil && !credentialsEqual(latestCredentials, baseCredentials) {
c.credentials = latestCredentials
c.stateAccess.Lock()
wasAvailable, oldWeight := c.statusAggregateStateLocked()
before := c.statusSnapshotLocked()
c.state.unavailable = false
c.state.lastCredentialLoadAttempt = time.Now()
c.state.lastCredentialLoadError = ""
c.state.accountType = latestCredentials.SubscriptionType
c.state.rateLimitTier = latestCredentials.RateLimitTier
c.checkTransitionLocked()
isAvailable, newWeight := c.statusAggregateStateLocked()
shouldEmit := wasAvailable != isAvailable || oldWeight != newWeight
shouldEmit := before != c.statusSnapshotLocked()
c.stateAccess.Unlock()
if shouldEmit {
c.emitStatusUpdate()
@@ -227,15 +231,14 @@ func (c *defaultCredential) getAccessToken() (string, error) {
c.credentials = newCredentials
c.stateAccess.Lock()
wasAvailable, oldWeight := c.statusAggregateStateLocked()
before := c.statusSnapshotLocked()
c.state.unavailable = false
c.state.lastCredentialLoadAttempt = time.Now()
c.state.lastCredentialLoadError = ""
c.state.accountType = newCredentials.SubscriptionType
c.state.rateLimitTier = newCredentials.RateLimitTier
c.checkTransitionLocked()
isAvailable, newWeight := c.statusAggregateStateLocked()
shouldEmit := wasAvailable != isAvailable || oldWeight != newWeight
shouldEmit := before != c.statusSnapshotLocked()
c.stateAccess.Unlock()
if shouldEmit {
c.emitStatusUpdate()
@@ -304,12 +307,13 @@ func (c *defaultCredential) updateStateFromHeaders(headers http.Header) {
}
c.logger.Debug("usage update for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%", resetSuffix)
}
shouldEmit := hadData && (c.state.fiveHourUtilization != oldFiveHour || c.state.weeklyUtilization != oldWeekly)
shouldInterrupt := c.checkTransitionLocked()
c.stateAccess.Unlock()
if shouldInterrupt {
c.interruptConnections()
}
if hadData {
if shouldEmit {
c.emitStatusUpdate()
}
}
@@ -673,7 +677,7 @@ func (c *defaultCredential) fetchProfile(ctx context.Context, httpClient *http.C
rateLimitTier := profileResponse.Organization.RateLimitTier
c.stateAccess.Lock()
wasAvailable, oldWeight := c.statusAggregateStateLocked()
before := c.statusSnapshotLocked()
if accountType != "" && c.state.accountType == "" {
c.state.accountType = accountType
}
@@ -681,8 +685,7 @@ func (c *defaultCredential) fetchProfile(ctx context.Context, httpClient *http.C
c.state.rateLimitTier = rateLimitTier
}
resolvedAccountType := c.state.accountType
isAvailable, newWeight := c.statusAggregateStateLocked()
shouldEmit := wasAvailable != isAvailable || oldWeight != newWeight
shouldEmit := before != c.statusSnapshotLocked()
c.stateAccess.Unlock()
if shouldEmit {
c.emitStatusUpdate()

View File

@@ -436,6 +436,7 @@ func (c *externalCredential) updateStateFromHeaders(headers http.Header) {
isFirstUpdate := c.state.lastUpdated.IsZero()
oldFiveHour := c.state.fiveHourUtilization
oldWeekly := c.state.weeklyUtilization
oldPlanWeight := c.state.remotePlanWeight
hadData := false
if value, exists := parseOptionalAnthropicResetHeader(headers, "anthropic-ratelimit-unified-5h-reset"); exists {
@@ -478,12 +479,13 @@ func (c *externalCredential) updateStateFromHeaders(headers http.Header) {
}
c.logger.Debug("usage update for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%", resetSuffix)
}
shouldEmit := hadData && (c.state.fiveHourUtilization != oldFiveHour || c.state.weeklyUtilization != oldWeekly || c.state.remotePlanWeight != oldPlanWeight)
shouldInterrupt := c.checkTransitionLocked()
c.stateAccess.Unlock()
if shouldInterrupt {
c.interruptConnections()
}
if hadData {
if shouldEmit {
c.emitStatusUpdate()
}
}
@@ -679,11 +681,7 @@ func (c *externalCredential) connectStatusStream(ctx context.Context) (statusStr
previousLastUpdated := c.lastUpdatedTime()
var firstFrameUpdatedAt time.Time
for {
var statusResponse struct {
FiveHourUtilization float64 `json:"five_hour_utilization"`
WeeklyUtilization float64 `json:"weekly_utilization"`
PlanWeight float64 `json:"plan_weight"`
}
var statusResponse statusPayload
err = decoder.Decode(&statusResponse)
if err != nil {
result.duration = time.Since(startTime)

View File

@@ -111,14 +111,13 @@ func (c *defaultCredential) reloadCredentials(force bool) error {
c.access.Unlock()
c.stateAccess.Lock()
wasAvailable, oldWeight := c.statusAggregateStateLocked()
before := c.statusSnapshotLocked()
c.state.unavailable = false
c.state.lastCredentialLoadError = ""
c.state.accountType = credentials.SubscriptionType
c.state.rateLimitTier = credentials.RateLimitTier
c.checkTransitionLocked()
isAvailable, newWeight := c.statusAggregateStateLocked()
shouldEmit := wasAvailable != isAvailable || oldWeight != newWeight
shouldEmit := before != c.statusSnapshotLocked()
c.stateAccess.Unlock()
if shouldEmit {
c.emitStatusUpdate()
@@ -134,14 +133,13 @@ func (c *defaultCredential) markCredentialsUnavailable(err error) error {
c.access.Unlock()
c.stateAccess.Lock()
wasAvailable, oldWeight := c.statusAggregateStateLocked()
before := c.statusSnapshotLocked()
c.state.unavailable = true
c.state.lastCredentialLoadError = err.Error()
c.state.accountType = ""
c.state.rateLimitTier = ""
shouldInterrupt := c.checkTransitionLocked()
isAvailable, newWeight := c.statusAggregateStateLocked()
shouldEmit := wasAvailable != isAvailable || oldWeight != newWeight
shouldEmit := before != c.statusSnapshotLocked()
c.stateAccess.Unlock()
if shouldInterrupt && hadCredentials {

View File

@@ -10,6 +10,12 @@ import (
"github.com/sagernet/sing-box/option"
)
type statusPayload struct {
FiveHourUtilization float64 `json:"five_hour_utilization"`
WeeklyUtilization float64 `json:"weekly_utilization"`
PlanWeight float64 `json:"plan_weight"`
}
func (s *Service) handleStatusEndpoint(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
writeJSONError(w, r, http.StatusMethodNotAllowed, "invalid_request_error", "method not allowed")
@@ -66,10 +72,10 @@ func (s *Service) handleStatusEndpoint(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(map[string]float64{
"five_hour_utilization": avgFiveHour,
"weekly_utilization": avgWeekly,
"plan_weight": totalWeight,
json.NewEncoder(w).Encode(statusPayload{
FiveHourUtilization: avgFiveHour,
WeeklyUtilization: avgWeekly,
PlanWeight: totalWeight,
})
}
@@ -95,10 +101,10 @@ func (s *Service) handleStatusStream(w http.ResponseWriter, r *http.Request, pro
lastFiveHour, lastWeekly, lastWeight := s.computeAggregatedUtilization(provider, userConfig)
buf := &bytes.Buffer{}
json.NewEncoder(buf).Encode(map[string]float64{
"five_hour_utilization": lastFiveHour,
"weekly_utilization": lastWeekly,
"plan_weight": lastWeight,
json.NewEncoder(buf).Encode(statusPayload{
FiveHourUtilization: lastFiveHour,
WeeklyUtilization: lastWeekly,
PlanWeight: lastWeight,
})
_, writeErr := w.Write(buf.Bytes())
if writeErr != nil {
@@ -129,10 +135,10 @@ func (s *Service) handleStatusStream(w http.ResponseWriter, r *http.Request, pro
lastWeekly = weekly
lastWeight = weight
buf.Reset()
json.NewEncoder(buf).Encode(map[string]float64{
"five_hour_utilization": fiveHour,
"weekly_utilization": weekly,
"plan_weight": weight,
json.NewEncoder(buf).Encode(statusPayload{
FiveHourUtilization: fiveHour,
WeeklyUtilization: weekly,
PlanWeight: weight,
})
_, writeErr = w.Write(buf.Bytes())
if writeErr != nil {

View File

@@ -347,12 +347,13 @@ func (c *defaultCredential) updateStateFromHeaders(headers http.Header) {
}
c.logger.Debug("usage update for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%", resetSuffix)
}
shouldEmit := hadData && (c.state.fiveHourUtilization != oldFiveHour || c.state.weeklyUtilization != oldWeekly)
shouldInterrupt := c.checkTransitionLocked()
c.stateAccess.Unlock()
if shouldInterrupt {
c.interruptConnections()
}
if hadData {
if shouldEmit {
c.emitStatusUpdate()
}
}

View File

@@ -460,6 +460,7 @@ func (c *externalCredential) updateStateFromHeaders(headers http.Header) {
isFirstUpdate := c.state.lastUpdated.IsZero()
oldFiveHour := c.state.fiveHourUtilization
oldWeekly := c.state.weeklyUtilization
oldPlanWeight := c.state.remotePlanWeight
hadData := false
activeLimitIdentifier := normalizeRateLimitIdentifier(headers.Get("x-codex-active-limit"))
@@ -517,12 +518,13 @@ func (c *externalCredential) updateStateFromHeaders(headers http.Header) {
}
c.logger.Debug("usage update for ", c.tag, ": 5h=", c.state.fiveHourUtilization, "%, weekly=", c.state.weeklyUtilization, "%", resetSuffix)
}
shouldEmit := hadData && (c.state.fiveHourUtilization != oldFiveHour || c.state.weeklyUtilization != oldWeekly || c.state.remotePlanWeight != oldPlanWeight)
shouldInterrupt := c.checkTransitionLocked()
c.stateAccess.Unlock()
if shouldInterrupt {
c.interruptConnections()
}
if hadData {
if shouldEmit {
c.emitStatusUpdate()
}
}
@@ -721,11 +723,7 @@ func (c *externalCredential) connectStatusStream(ctx context.Context) (statusStr
previousLastUpdated := c.lastUpdatedTime()
var firstFrameUpdatedAt time.Time
for {
var statusResponse struct {
FiveHourUtilization float64 `json:"five_hour_utilization"`
WeeklyUtilization float64 `json:"weekly_utilization"`
PlanWeight float64 `json:"plan_weight"`
}
var statusResponse statusPayload
err = decoder.Decode(&statusResponse)
if err != nil {
result.duration = time.Since(startTime)

View File

@@ -10,6 +10,12 @@ import (
"github.com/sagernet/sing-box/option"
)
type statusPayload struct {
FiveHourUtilization float64 `json:"five_hour_utilization"`
WeeklyUtilization float64 `json:"weekly_utilization"`
PlanWeight float64 `json:"plan_weight"`
}
func (s *Service) handleStatusEndpoint(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
writeJSONError(w, r, http.StatusMethodNotAllowed, "invalid_request_error", "method not allowed")
@@ -66,10 +72,10 @@ func (s *Service) handleStatusEndpoint(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(map[string]float64{
"five_hour_utilization": avgFiveHour,
"weekly_utilization": avgWeekly,
"plan_weight": totalWeight,
json.NewEncoder(w).Encode(statusPayload{
FiveHourUtilization: avgFiveHour,
WeeklyUtilization: avgWeekly,
PlanWeight: totalWeight,
})
}
@@ -95,10 +101,10 @@ func (s *Service) handleStatusStream(w http.ResponseWriter, r *http.Request, pro
lastFiveHour, lastWeekly, lastWeight := s.computeAggregatedUtilization(provider, userConfig)
buf := &bytes.Buffer{}
json.NewEncoder(buf).Encode(map[string]float64{
"five_hour_utilization": lastFiveHour,
"weekly_utilization": lastWeekly,
"plan_weight": lastWeight,
json.NewEncoder(buf).Encode(statusPayload{
FiveHourUtilization: lastFiveHour,
WeeklyUtilization: lastWeekly,
PlanWeight: lastWeight,
})
_, writeErr := w.Write(buf.Bytes())
if writeErr != nil {
@@ -129,10 +135,10 @@ func (s *Service) handleStatusStream(w http.ResponseWriter, r *http.Request, pro
lastWeekly = weekly
lastWeight = weight
buf.Reset()
json.NewEncoder(buf).Encode(map[string]float64{
"five_hour_utilization": fiveHour,
"weekly_utilization": weekly,
"plan_weight": weight,
json.NewEncoder(buf).Encode(statusPayload{
FiveHourUtilization: fiveHour,
WeeklyUtilization: weekly,
PlanWeight: weight,
})
_, writeErr = w.Write(buf.Bytes())
if writeErr != nil {