@@ -1251,6 +1251,18 @@ const resetGracePeriodMs int64 = 30000
12511251// long after a real disconnect.
12521252const onlineGracePeriodMs int64 = 20000
12531253
1254+ type nodeTrafficCounter struct {
1255+ Up int64
1256+ Down int64
1257+ }
1258+
1259+ func (s * InboundService ) upsertNodeBaseline (tx * gorm.DB , nodeID int , email string , up , down int64 ) error {
1260+ return tx .Clauses (clause.OnConflict {
1261+ Columns : []clause.Column {{Name : "node_id" }, {Name : "email" }},
1262+ DoUpdates : clause .AssignmentColumns ([]string {"up" , "down" }),
1263+ }).Create (& model.NodeClientTraffic {NodeId : nodeID , Email : email , Up : up , Down : down }).Error
1264+ }
1265+
12541266func (s * InboundService ) SetRemoteTraffic (nodeID int , snap * runtime.TrafficSnapshot ) (bool , error ) {
12551267 var structuralChange bool
12561268 err := submitTrafficWrite (func () error {
@@ -1313,6 +1325,26 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
13131325 centralCSByEmail [centralClientStats [i ].Email ] = & centralClientStats [i ]
13141326 }
13151327
1328+ nodeBaselines := make (map [string ]nodeTrafficCounter )
1329+ var baselineRows []model.NodeClientTraffic
1330+ if err := db .Model (& model.NodeClientTraffic {}).
1331+ Where ("node_id = ?" , nodeID ).
1332+ Find (& baselineRows ).Error ; err != nil {
1333+ return false , err
1334+ }
1335+ for i := range baselineRows {
1336+ nodeBaselines [baselineRows [i ].Email ] = nodeTrafficCounter {Up : baselineRows [i ].Up , Down : baselineRows [i ].Down }
1337+ }
1338+
1339+ var existingEmailsList []string
1340+ if err := db .Model (xray.ClientTraffic {}).Pluck ("email" , & existingEmailsList ).Error ; err != nil {
1341+ return false , err
1342+ }
1343+ existingEmails := make (map [string ]struct {}, len (existingEmailsList ))
1344+ for _ , e := range existingEmailsList {
1345+ existingEmails [e ] = struct {}{}
1346+ }
1347+
13161348 var defaultUserId int
13171349 if len (central ) > 0 {
13181350 defaultUserId = central [0 ].UserId
@@ -1458,6 +1490,18 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
14581490 if _ , kept := snapTags [c .Tag ]; kept {
14591491 continue
14601492 }
1493+ var goneEmails []string
1494+ if err := tx .Model (xray.ClientTraffic {}).
1495+ Where ("inbound_id = ?" , c .Id ).
1496+ Pluck ("email" , & goneEmails ).Error ; err != nil {
1497+ return false , err
1498+ }
1499+ if len (goneEmails ) > 0 {
1500+ if err := tx .Where ("node_id = ? AND email IN ?" , nodeID , goneEmails ).
1501+ Delete (& model.NodeClientTraffic {}).Error ; err != nil {
1502+ return false , err
1503+ }
1504+ }
14611505 if err := tx .Where ("inbound_id = ?" , c .Id ).
14621506 Delete (& xray.ClientTraffic {}).Error ; err != nil {
14631507 return false , err
@@ -1481,17 +1525,22 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
14811525 if ! ok {
14821526 continue
14831527 }
1484- inGrace := c .LastTrafficResetTime > 0 && now - c .LastTrafficResetTime < resetGracePeriodMs
1485-
14861528 snapEmails := make (map [string ]struct {}, len (snapIb .ClientStats ))
14871529 for _ , cs := range snapIb .ClientStats {
14881530 snapEmails [cs .Email ] = struct {}{}
14891531
1490- existing := centralCS [csKey {c .Id , cs .Email }]
1491- if existing == nil {
1492- existing = centralCSByEmail [cs .Email ]
1532+ base , seen := nodeBaselines [cs .Email ]
1533+ var deltaUp , deltaDown int64
1534+ if seen {
1535+ if deltaUp = cs .Up - base .Up ; deltaUp < 0 {
1536+ deltaUp = cs .Up
1537+ }
1538+ if deltaDown = cs .Down - base .Down ; deltaDown < 0 {
1539+ deltaDown = cs .Down
1540+ }
14931541 }
1494- if existing == nil {
1542+
1543+ if _ , rowExists := existingEmails [cs .Email ]; ! rowExists {
14951544 row := & xray.ClientTraffic {
14961545 InboundId : c .Id ,
14971546 Email : cs .Email ,
@@ -1509,42 +1558,40 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
15091558 }
15101559 centralCS [csKey {c .Id , cs .Email }] = row
15111560 centralCSByEmail [cs .Email ] = row
1561+ existingEmails [cs .Email ] = struct {}{}
15121562 structuralChange = true
1563+ if err := s .upsertNodeBaseline (tx , nodeID , cs .Email , cs .Up , cs .Down ); err != nil {
1564+ return false , err
1565+ }
1566+ nodeBaselines [cs .Email ] = nodeTrafficCounter {Up : cs .Up , Down : cs .Down }
15131567 continue
15141568 }
15151569
1516- if existing .Enable != cs .Enable ||
1517- existing .Total != cs .Total ||
1518- existing .ExpiryTime != cs .ExpiryTime ||
1519- existing .Reset != cs .Reset {
1570+ if existing := centralCSByEmail [cs .Email ]; existing != nil &&
1571+ (existing .Enable != cs .Enable ||
1572+ existing .Total != cs .Total ||
1573+ existing .ExpiryTime != cs .ExpiryTime ||
1574+ existing .Reset != cs .Reset ) {
15201575 structuralChange = true
15211576 }
15221577
1523- if inGrace && cs .Up + cs .Down > 0 {
1524- if err := tx .Exec (
1525- `UPDATE client_traffics
1526- SET enable = ?, total = ?, expiry_time = ?, reset = ?
1527- WHERE email = ?` ,
1528- cs .Enable , cs .Total , cs .ExpiryTime , cs .Reset , cs .Email ,
1529- ).Error ; err != nil {
1530- return false , err
1531- }
1532- continue
1533- }
1534-
15351578 if err := tx .Exec (
15361579 fmt .Sprintf (
15371580 `UPDATE client_traffics
1538- SET up = ?, down = ?, enable = ?, total = ?, expiry_time = ?, reset = ?,
1581+ SET up = up + ?, down = down + ?, enable = ?, total = ?, expiry_time = ?, reset = ?,
15391582 last_online = %s
15401583 WHERE email = ?` ,
15411584 database .GreatestExpr ("last_online" , "?" ),
15421585 ),
1543- cs . Up , cs . Down , cs .Enable , cs .Total , cs .ExpiryTime , cs .Reset ,
1586+ deltaUp , deltaDown , cs .Enable , cs .Total , cs .ExpiryTime , cs .Reset ,
15441587 cs .LastOnline , cs .Email ,
15451588 ).Error ; err != nil {
15461589 return false , err
15471590 }
1591+ if err := s .upsertNodeBaseline (tx , nodeID , cs .Email , cs .Up , cs .Down ); err != nil {
1592+ return false , err
1593+ }
1594+ nodeBaselines [cs .Email ] = nodeTrafficCounter {Up : cs .Up , Down : cs .Down }
15481595 }
15491596
15501597 for k , existing := range centralCS {
@@ -1554,6 +1601,10 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
15541601 if _ , kept := snapEmails [k .email ]; kept {
15551602 continue
15561603 }
1604+ if err := tx .Where ("node_id = ? AND email = ?" , nodeID , existing .Email ).
1605+ Delete (& model.NodeClientTraffic {}).Error ; err != nil {
1606+ return false , err
1607+ }
15571608 if err := tx .Where ("inbound_id = ? AND email = ?" , c .Id , existing .Email ).
15581609 Delete (& xray.ClientTraffic {}).Error ; err != nil {
15591610 return false , err
@@ -1671,6 +1722,9 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
16711722 if err := tx .Where ("email = ?" , email ).Delete (& xray.ClientTraffic {}).Error ; err != nil {
16721723 logger .Warningf ("setRemoteTraffic: delete ClientTraffic %q failed: %v" , email , err )
16731724 }
1725+ if err := tx .Where ("email = ?" , email ).Delete (& model.NodeClientTraffic {}).Error ; err != nil {
1726+ logger .Warningf ("setRemoteTraffic: delete NodeClientTraffic %q failed: %v" , email , err )
1727+ }
16741728 structuralChange = true
16751729 }
16761730 }
@@ -2329,7 +2383,10 @@ func (s *InboundService) UpdateClientIPs(tx *gorm.DB, oldEmail string, newEmail
23292383}
23302384
23312385func (s * InboundService ) DelClientStat (tx * gorm.DB , email string ) error {
2332- return tx .Where ("email = ?" , email ).Delete (xray.ClientTraffic {}).Error
2386+ if err := tx .Where ("email = ?" , email ).Delete (xray.ClientTraffic {}).Error ; err != nil {
2387+ return err
2388+ }
2389+ return tx .Where ("email = ?" , email ).Delete (& model.NodeClientTraffic {}).Error
23332390}
23342391
23352392func (s * InboundService ) DelClientIPs (tx * gorm.DB , email string ) error {
0 commit comments