Honor cloudflare warp active flow limits

This commit is contained in:
世界
2026-03-24 12:33:44 +08:00
parent 25a94ac5b6
commit 854718992f
6 changed files with 165 additions and 4 deletions

View File

@@ -133,6 +133,11 @@ func (m *DatagramV2Muxer) RegisterSession(
m.sessionAccess.Unlock()
return nil
}
limit := m.inbound.maxActiveFlows()
if !m.inbound.flowLimiter.Acquire(limit) {
m.sessionAccess.Unlock()
return E.New("too many active flows")
}
session := newUDPSession(sessionID, destination, closeAfterIdle, m)
m.sessions[sessionID] = session
@@ -140,7 +145,7 @@ func (m *DatagramV2Muxer) RegisterSession(
m.logger.Info("registered V2 UDP session ", sessionID, " to ", destination)
go m.serveSession(ctx, session)
go m.serveSession(ctx, session, limit)
return nil
}
@@ -159,8 +164,9 @@ func (m *DatagramV2Muxer) UnregisterSession(sessionID uuid.UUID) {
}
}
func (m *DatagramV2Muxer) serveSession(ctx context.Context, session *udpSession) {
func (m *DatagramV2Muxer) serveSession(ctx context.Context, session *udpSession, limit uint64) {
defer m.UnregisterSession(session.id)
defer m.inbound.flowLimiter.Release(limit)
metadata := adapter.InboundContext{
Inbound: m.inbound.Tag(),

View File

@@ -154,6 +154,12 @@ func (m *DatagramV3Muxer) handleRegistration(ctx context.Context, data []byte) {
}
return
}
limit := m.inbound.maxActiveFlows()
if !m.inbound.flowLimiter.Acquire(limit) {
m.sessionAccess.Unlock()
m.sendRegistrationResponse(requestID, v3ResponseTooManyActiveFlows, "")
return
}
session := newV3Session(requestID, destination, closeAfterIdle, m)
m.sessions[requestID] = session
@@ -167,7 +173,7 @@ func (m *DatagramV3Muxer) handleRegistration(ctx context.Context, data []byte) {
session.writeToOrigin(data[offset:])
}
go m.serveV3Session(ctx, session)
go m.serveV3Session(ctx, session, limit)
}
func (m *DatagramV3Muxer) handlePayload(data []byte) {
@@ -222,8 +228,9 @@ func (m *DatagramV3Muxer) unregisterSession(requestID RequestID) {
}
}
func (m *DatagramV3Muxer) serveV3Session(ctx context.Context, session *v3Session) {
func (m *DatagramV3Muxer) serveV3Session(ctx context.Context, session *v3Session, limit uint64) {
defer m.unregisterSession(session.id)
defer m.inbound.flowLimiter.Release(limit)
metadata := adapter.InboundContext{
Inbound: m.inbound.Tag(),

View File

@@ -185,6 +185,14 @@ func parseHTTPDestination(dest string) M.Socksaddr {
func (i *Inbound) handleTCPStream(ctx context.Context, stream io.ReadWriteCloser, respWriter ConnectResponseWriter, metadata adapter.InboundContext) {
metadata.Network = N.NetworkTCP
i.logger.InfoContext(ctx, "inbound TCP connection to ", metadata.Destination)
limit := i.maxActiveFlows()
if !i.flowLimiter.Acquire(limit) {
err := E.New("too many active flows")
i.logger.ErrorContext(ctx, err)
respWriter.WriteResponse(err, nil)
return
}
defer i.flowLimiter.Release(limit)
err := respWriter.WriteResponse(nil, nil)
if err != nil {

View File

@@ -0,0 +1,34 @@
//go:build with_cloudflare_tunnel
package cloudflare
import "sync"
type FlowLimiter struct {
access sync.Mutex
active uint64
}
func (l *FlowLimiter) Acquire(limit uint64) bool {
if limit == 0 {
return true
}
l.access.Lock()
defer l.access.Unlock()
if l.active >= limit {
return false
}
l.active++
return true
}
func (l *FlowLimiter) Release(limit uint64) {
if limit == 0 {
return
}
l.access.Lock()
defer l.access.Unlock()
if l.active > 0 {
l.active--
}
}

View File

@@ -0,0 +1,100 @@
//go:build with_cloudflare_tunnel
package cloudflare
import (
"context"
"encoding/binary"
"net"
"testing"
"github.com/sagernet/sing-box/adapter"
"github.com/sagernet/sing-box/adapter/inbound"
C "github.com/sagernet/sing-box/constant"
"github.com/sagernet/sing-box/log"
"github.com/sagernet/sing-box/option"
"github.com/google/uuid"
)
func newLimitedInbound(t *testing.T, limit uint64) *Inbound {
t.Helper()
logFactory, err := log.New(log.Options{Options: option.LogOptions{Level: "debug"}})
if err != nil {
t.Fatal(err)
}
configManager, err := NewConfigManager(option.CloudflareTunnelInboundOptions{})
if err != nil {
t.Fatal(err)
}
config := configManager.Snapshot()
config.WarpRouting.MaxActiveFlows = limit
configManager.activeConfig = config
return &Inbound{
Adapter: inbound.NewAdapter(C.TypeCloudflareTunnel, "test"),
router: &testRouter{},
logger: logFactory.NewLogger("test"),
configManager: configManager,
flowLimiter: &FlowLimiter{},
}
}
func TestHandleTCPStreamRespectsMaxActiveFlows(t *testing.T) {
inboundInstance := newLimitedInbound(t, 1)
if !inboundInstance.flowLimiter.Acquire(1) {
t.Fatal("failed to pre-acquire limiter")
}
stream, peer := net.Pipe()
defer stream.Close()
defer peer.Close()
respWriter := &fakeConnectResponseWriter{}
inboundInstance.handleTCPStream(context.Background(), stream, respWriter, adapter.InboundContext{})
if respWriter.err == nil {
t.Fatal("expected too many active flows error")
}
}
func TestDatagramV2RegisterSessionRespectsMaxActiveFlows(t *testing.T) {
inboundInstance := newLimitedInbound(t, 1)
if !inboundInstance.flowLimiter.Acquire(1) {
t.Fatal("failed to pre-acquire limiter")
}
muxer := NewDatagramV2Muxer(inboundInstance, &captureDatagramSender{}, inboundInstance.logger)
err := muxer.RegisterSession(context.Background(), uuidTest(1), net.IPv4(1, 1, 1, 1), 53, 0)
if err == nil {
t.Fatal("expected too many active flows error")
}
}
func TestDatagramV3RegistrationTooManyActiveFlows(t *testing.T) {
inboundInstance := newLimitedInbound(t, 1)
if !inboundInstance.flowLimiter.Acquire(1) {
t.Fatal("failed to pre-acquire limiter")
}
sender := &captureDatagramSender{}
muxer := NewDatagramV3Muxer(inboundInstance, sender, inboundInstance.logger)
requestID := RequestID{}
requestID[15] = 1
payload := make([]byte, 1+1+2+2+16+4)
payload[0] = 0
binary.BigEndian.PutUint16(payload[1:3], 53)
binary.BigEndian.PutUint16(payload[3:5], 30)
copy(payload[5:21], requestID[:])
copy(payload[21:25], []byte{1, 1, 1, 1})
muxer.handleRegistration(context.Background(), payload)
if len(sender.sent) != 1 {
t.Fatalf("expected one registration response, got %d", len(sender.sent))
}
if sender.sent[0][0] != byte(DatagramV3TypeRegistrationResponse) || sender.sent[0][1] != v3ResponseTooManyActiveFlows {
t.Fatalf("unexpected v3 response: %v", sender.sent[0])
}
}
func uuidTest(last byte) uuid.UUID {
var value uuid.UUID
value[15] = last
return value
}

View File

@@ -44,6 +44,7 @@ type Inbound struct {
datagramVersion string
gracePeriod time.Duration
configManager *ConfigManager
flowLimiter *FlowLimiter
connectionAccess sync.Mutex
connections []io.Closer
@@ -119,6 +120,7 @@ func NewInbound(ctx context.Context, router adapter.Router, logger log.ContextLo
datagramVersion: datagramVersion,
gracePeriod: gracePeriod,
configManager: configManager,
flowLimiter: &FlowLimiter{},
datagramV2Muxers: make(map[DatagramSender]*DatagramV2Muxer),
datagramV3Muxers: make(map[DatagramSender]*DatagramV3Muxer),
}, nil
@@ -174,6 +176,10 @@ func (i *Inbound) ApplyConfig(version int32, config []byte) ConfigUpdateResult {
return result
}
func (i *Inbound) maxActiveFlows() uint64 {
return i.configManager.Snapshot().WarpRouting.MaxActiveFlows
}
func (i *Inbound) Close() error {
i.cancel()
i.done.Wait()