Skip to content

Commit 5b9ed34

Browse files
committed
fix(nodes): sum client traffic across nodes instead of overwriting
A client shared across multiple nodes has a single email-keyed client_traffics row, but each node reports its cumulative up/down. setRemoteTrafficLocked overwrote the row with one node's cumulative, so non-owning nodes hit the create branch and OnConflict-DoNothing, silently dropping their traffic and under-counting the client. Make the shared row a pure accumulator (like the local path): a new node_client_traffics(node_id, email) baseline table stores each node's last cumulative; the node path converts cumulative to a per-node delta (clamped to the post-reset value on a negative delta) and does up = up + delta. First observation seeds the baseline and adds 0 so upgrades and newly-shared clients are not double-counted. Create-vs-accumulate now keys off global email existence. Baselines are cleaned in DelClientStat, the node sweeps, and NodeService.Delete.
1 parent 588ea86 commit 5b9ed34

6 files changed

Lines changed: 305 additions & 25 deletions

File tree

database/db.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ func initModels() error {
7272
&model.ClientInbound{},
7373
&model.ClientGroup{},
7474
&model.InboundFallback{},
75+
&model.NodeClientTraffic{},
7576
}
7677
for _, mdl := range models {
7778
if err := db.AutoMigrate(mdl); err != nil {

database/migrate_data.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ func migrationModels() []any {
3636
&model.ClientRecord{},
3737
&model.ClientInbound{},
3838
&model.InboundFallback{},
39+
&model.NodeClientTraffic{},
3940
}
4041
}
4142

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package model
2+
3+
type NodeClientTraffic struct {
4+
Id int `json:"id" gorm:"primaryKey;autoIncrement"`
5+
NodeId int `json:"nodeId" gorm:"uniqueIndex:idx_node_email,priority:1;not null"`
6+
Email string `json:"email" gorm:"uniqueIndex:idx_node_email,priority:2;not null"`
7+
Up int64 `json:"up"`
8+
Down int64 `json:"down"`
9+
}

web/service/inbound.go

Lines changed: 82 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1251,6 +1251,18 @@ const resetGracePeriodMs int64 = 30000
12511251
// long after a real disconnect.
12521252
const onlineGracePeriodMs int64 = 20000
12531253

1254+
type nodeTrafficCounter struct {
1255+
Up int64
1256+
Down int64
1257+
}
1258+
1259+
func (s *InboundService) upsertNodeBaseline(tx *gorm.DB, nodeID int, email string, up, down int64) error {
1260+
return tx.Clauses(clause.OnConflict{
1261+
Columns: []clause.Column{{Name: "node_id"}, {Name: "email"}},
1262+
DoUpdates: clause.AssignmentColumns([]string{"up", "down"}),
1263+
}).Create(&model.NodeClientTraffic{NodeId: nodeID, Email: email, Up: up, Down: down}).Error
1264+
}
1265+
12541266
func (s *InboundService) SetRemoteTraffic(nodeID int, snap *runtime.TrafficSnapshot) (bool, error) {
12551267
var structuralChange bool
12561268
err := submitTrafficWrite(func() error {
@@ -1313,6 +1325,26 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
13131325
centralCSByEmail[centralClientStats[i].Email] = &centralClientStats[i]
13141326
}
13151327

1328+
nodeBaselines := make(map[string]nodeTrafficCounter)
1329+
var baselineRows []model.NodeClientTraffic
1330+
if err := db.Model(&model.NodeClientTraffic{}).
1331+
Where("node_id = ?", nodeID).
1332+
Find(&baselineRows).Error; err != nil {
1333+
return false, err
1334+
}
1335+
for i := range baselineRows {
1336+
nodeBaselines[baselineRows[i].Email] = nodeTrafficCounter{Up: baselineRows[i].Up, Down: baselineRows[i].Down}
1337+
}
1338+
1339+
var existingEmailsList []string
1340+
if err := db.Model(xray.ClientTraffic{}).Pluck("email", &existingEmailsList).Error; err != nil {
1341+
return false, err
1342+
}
1343+
existingEmails := make(map[string]struct{}, len(existingEmailsList))
1344+
for _, e := range existingEmailsList {
1345+
existingEmails[e] = struct{}{}
1346+
}
1347+
13161348
var defaultUserId int
13171349
if len(central) > 0 {
13181350
defaultUserId = central[0].UserId
@@ -1458,6 +1490,18 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
14581490
if _, kept := snapTags[c.Tag]; kept {
14591491
continue
14601492
}
1493+
var goneEmails []string
1494+
if err := tx.Model(xray.ClientTraffic{}).
1495+
Where("inbound_id = ?", c.Id).
1496+
Pluck("email", &goneEmails).Error; err != nil {
1497+
return false, err
1498+
}
1499+
if len(goneEmails) > 0 {
1500+
if err := tx.Where("node_id = ? AND email IN ?", nodeID, goneEmails).
1501+
Delete(&model.NodeClientTraffic{}).Error; err != nil {
1502+
return false, err
1503+
}
1504+
}
14611505
if err := tx.Where("inbound_id = ?", c.Id).
14621506
Delete(&xray.ClientTraffic{}).Error; err != nil {
14631507
return false, err
@@ -1481,17 +1525,22 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
14811525
if !ok {
14821526
continue
14831527
}
1484-
inGrace := c.LastTrafficResetTime > 0 && now-c.LastTrafficResetTime < resetGracePeriodMs
1485-
14861528
snapEmails := make(map[string]struct{}, len(snapIb.ClientStats))
14871529
for _, cs := range snapIb.ClientStats {
14881530
snapEmails[cs.Email] = struct{}{}
14891531

1490-
existing := centralCS[csKey{c.Id, cs.Email}]
1491-
if existing == nil {
1492-
existing = centralCSByEmail[cs.Email]
1532+
base, seen := nodeBaselines[cs.Email]
1533+
var deltaUp, deltaDown int64
1534+
if seen {
1535+
if deltaUp = cs.Up - base.Up; deltaUp < 0 {
1536+
deltaUp = cs.Up
1537+
}
1538+
if deltaDown = cs.Down - base.Down; deltaDown < 0 {
1539+
deltaDown = cs.Down
1540+
}
14931541
}
1494-
if existing == nil {
1542+
1543+
if _, rowExists := existingEmails[cs.Email]; !rowExists {
14951544
row := &xray.ClientTraffic{
14961545
InboundId: c.Id,
14971546
Email: cs.Email,
@@ -1509,42 +1558,40 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
15091558
}
15101559
centralCS[csKey{c.Id, cs.Email}] = row
15111560
centralCSByEmail[cs.Email] = row
1561+
existingEmails[cs.Email] = struct{}{}
15121562
structuralChange = true
1563+
if err := s.upsertNodeBaseline(tx, nodeID, cs.Email, cs.Up, cs.Down); err != nil {
1564+
return false, err
1565+
}
1566+
nodeBaselines[cs.Email] = nodeTrafficCounter{Up: cs.Up, Down: cs.Down}
15131567
continue
15141568
}
15151569

1516-
if existing.Enable != cs.Enable ||
1517-
existing.Total != cs.Total ||
1518-
existing.ExpiryTime != cs.ExpiryTime ||
1519-
existing.Reset != cs.Reset {
1570+
if existing := centralCSByEmail[cs.Email]; existing != nil &&
1571+
(existing.Enable != cs.Enable ||
1572+
existing.Total != cs.Total ||
1573+
existing.ExpiryTime != cs.ExpiryTime ||
1574+
existing.Reset != cs.Reset) {
15201575
structuralChange = true
15211576
}
15221577

1523-
if inGrace && cs.Up+cs.Down > 0 {
1524-
if err := tx.Exec(
1525-
`UPDATE client_traffics
1526-
SET enable = ?, total = ?, expiry_time = ?, reset = ?
1527-
WHERE email = ?`,
1528-
cs.Enable, cs.Total, cs.ExpiryTime, cs.Reset, cs.Email,
1529-
).Error; err != nil {
1530-
return false, err
1531-
}
1532-
continue
1533-
}
1534-
15351578
if err := tx.Exec(
15361579
fmt.Sprintf(
15371580
`UPDATE client_traffics
1538-
SET up = ?, down = ?, enable = ?, total = ?, expiry_time = ?, reset = ?,
1581+
SET up = up + ?, down = down + ?, enable = ?, total = ?, expiry_time = ?, reset = ?,
15391582
last_online = %s
15401583
WHERE email = ?`,
15411584
database.GreatestExpr("last_online", "?"),
15421585
),
1543-
cs.Up, cs.Down, cs.Enable, cs.Total, cs.ExpiryTime, cs.Reset,
1586+
deltaUp, deltaDown, cs.Enable, cs.Total, cs.ExpiryTime, cs.Reset,
15441587
cs.LastOnline, cs.Email,
15451588
).Error; err != nil {
15461589
return false, err
15471590
}
1591+
if err := s.upsertNodeBaseline(tx, nodeID, cs.Email, cs.Up, cs.Down); err != nil {
1592+
return false, err
1593+
}
1594+
nodeBaselines[cs.Email] = nodeTrafficCounter{Up: cs.Up, Down: cs.Down}
15481595
}
15491596

15501597
for k, existing := range centralCS {
@@ -1554,6 +1601,10 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
15541601
if _, kept := snapEmails[k.email]; kept {
15551602
continue
15561603
}
1604+
if err := tx.Where("node_id = ? AND email = ?", nodeID, existing.Email).
1605+
Delete(&model.NodeClientTraffic{}).Error; err != nil {
1606+
return false, err
1607+
}
15571608
if err := tx.Where("inbound_id = ? AND email = ?", c.Id, existing.Email).
15581609
Delete(&xray.ClientTraffic{}).Error; err != nil {
15591610
return false, err
@@ -1671,6 +1722,9 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
16711722
if err := tx.Where("email = ?", email).Delete(&xray.ClientTraffic{}).Error; err != nil {
16721723
logger.Warningf("setRemoteTraffic: delete ClientTraffic %q failed: %v", email, err)
16731724
}
1725+
if err := tx.Where("email = ?", email).Delete(&model.NodeClientTraffic{}).Error; err != nil {
1726+
logger.Warningf("setRemoteTraffic: delete NodeClientTraffic %q failed: %v", email, err)
1727+
}
16741728
structuralChange = true
16751729
}
16761730
}
@@ -2329,7 +2383,10 @@ func (s *InboundService) UpdateClientIPs(tx *gorm.DB, oldEmail string, newEmail
23292383
}
23302384

23312385
func (s *InboundService) DelClientStat(tx *gorm.DB, email string) error {
2332-
return tx.Where("email = ?", email).Delete(xray.ClientTraffic{}).Error
2386+
if err := tx.Where("email = ?", email).Delete(xray.ClientTraffic{}).Error; err != nil {
2387+
return err
2388+
}
2389+
return tx.Where("email = ?", email).Delete(&model.NodeClientTraffic{}).Error
23332390
}
23342391

23352392
func (s *InboundService) DelClientIPs(tx *gorm.DB, email string) error {

web/service/node.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,9 @@ func (s *NodeService) Delete(id int) error {
233233
if err := db.Where("id = ?", id).Delete(model.Node{}).Error; err != nil {
234234
return err
235235
}
236+
if err := db.Where("node_id = ?", id).Delete(&model.NodeClientTraffic{}).Error; err != nil {
237+
return err
238+
}
236239
if mgr := runtime.GetManager(); mgr != nil {
237240
mgr.InvalidateNode(id)
238241
}

0 commit comments

Comments
 (0)