From e00c471f3ab87f0fcf32686ef7b9297fb695238f Mon Sep 17 00:00:00 2001 From: jc3wish <> Date: Sat, 26 Apr 2025 12:52:15 +0800 Subject: [PATCH] =?UTF-8?q?It=20supports=20flattening=20transaction=20logs?= =?UTF-8?q?=20into=20multiple=20oplog=20entries=20for=20output,=20which=20?= =?UTF-8?q?can=20be=20disabled=20by=20setting=20the=20DisableFlatTransacti?= =?UTF-8?q?onData=20parameter.=20=E6=94=AF=E6=8C=81=E5=B0=86=E4=BA=8B?= =?UTF-8?q?=E5=8A=A1=E6=97=A5=E5=BF=97=E5=B9=B3=E9=93=BA=E6=88=90=E5=A4=9A?= =?UTF-8?q?=E6=9D=A1oplog=E8=BF=9B=E8=A1=8C=E8=BE=93=E5=87=BA=EF=BC=8C?= =?UTF-8?q?=E5=8F=AF=E4=BB=A5=E9=80=9A=E8=BF=87=E8=AE=BE=E7=BD=AEDisableFl?= =?UTF-8?q?atTransactionData=E5=8F=82=E6=95=B0=E8=BF=9B=E8=A1=8C=E5=85=B3?= =?UTF-8?q?=E9=97=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- gtm.go | 133 +++++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 105 insertions(+), 28 deletions(-) diff --git a/gtm.go b/gtm.go index 9f83b5b..98dc810 100644 --- a/gtm.go +++ b/gtm.go @@ -71,31 +71,32 @@ const ( ) type Options struct { - After TimestampGenerator - Token ResumeTokenGenenerator - Filter OpFilter - NamespaceFilter OpFilter - OpLogDisabled bool - OpLogDatabaseName string - OpLogCollectionName string - ChannelSize int - BufferSize int - BufferDuration time.Duration - Ordering OrderingGuarantee - WorkerCount int - MaxAwaitTime time.Duration - UpdateDataAsDelta bool - ChangeStreamNs []string - DirectReadNs []string - DirectReadFilter OpFilter - DirectReadSplitMax int32 - DirectReadConcur int - DirectReadNoTimeout bool - DirectReadBounded bool - Unmarshal DataUnmarshaller - Pipe PipelineBuilder - PipeAllowDisk bool - Log *log.Logger + After TimestampGenerator + Token ResumeTokenGenenerator + Filter OpFilter + NamespaceFilter OpFilter + OpLogDisabled bool + OpLogDatabaseName string + OpLogCollectionName string + ChannelSize int + BufferSize int + BufferDuration time.Duration + Ordering OrderingGuarantee +
WorkerCount int + MaxAwaitTime time.Duration + UpdateDataAsDelta bool + ChangeStreamNs []string + DirectReadNs []string + DirectReadFilter OpFilter + DirectReadSplitMax int32 + DirectReadConcur int + DirectReadNoTimeout bool + DirectReadBounded bool + Unmarshal DataUnmarshaller + Pipe PipelineBuilder + PipeAllowDisk bool + DisableFlatTransactionData bool // When false (the zero value), transaction applyOps entries are flattened into one op per document change; set true to handle them like any other command entry. + Log *log.Logger } type OpResumeToken struct { @@ -714,6 +715,17 @@ func (op *Op) IsDropDatabase() (string, bool) { return "", false } +func (op *Op) IsTransactionApplyOps() bool { + if op.IsCommand() && op.Namespace == "admin.$cmd" { + if op.Data != nil { + if _, ok := op.Data["applyOps"]; ok { // NOTE(review): partialTxn/prepare flags are not inspected, so every applyOps entry qualifies - confirm the intended behavior for multi-entry and prepared transactions. + return true + } + } + } + return false +} + func (op *Op) IsCommand() bool { return op.Operation == "c" } @@ -774,13 +786,26 @@ func (buf *OpBuf) Flush(client *mongo.Client, ctx *OpCtx, o *Options) { } ns := make(map[string][]interface{}) byId := make(map[interface{}][]*Op) - for _, op := range buf.Entries { + opList := make([]*Op, 0) + var appendOpList = func(op *Op) { // keeps op for emission; updates without a full document are also queued for the fetch below + opList = append(opList, op) if op.IsUpdate() && op.Doc == nil { idKey := fmt.Sprintf("%s.%v", op.Namespace, op.Id) ns[op.Namespace] = append(ns[op.Namespace], op.Id) byId[idKey] = append(byId[idKey], op) } } + for _, op := range buf.Entries { + if !o.DisableFlatTransactionData && op.IsTransactionApplyOps() { + for _, newOp := range op.GetTransactionApplyOpsList() { + if newOp.matchesNsFilter(o) { // ParseLogEntry let the applyOps entry through unfiltered, so each flattened op is namespace-filtered here + appendOpList(newOp) + } + } + } else { + appendOpList(op) + } + } retry: for n, opIds := range ns { var parts = strings.SplitN(n, ".", 2) @@ -798,7 +823,6 @@ retry: op.processData(doc, o) } } - } } if err = cursor.Close(context.Background()); err != nil { @@ -809,7 +833,7 @@ retry: break retry } } - for _, op := range buf.Entries { + for _, op := range opList { // opList holds flattened transaction ops in place of their applyOps entries unless DisableFlatTransactionData is set if op.matchesFilter(o) { ctx.OpC <- op } @@ -817,6 +841,53 @@ retry: buf.Entries = nil } +func (op *Op) GetTransactionApplyOpsList() (ops []*Op) { + if !op.IsCommand() || op.Data == nil { + return + }
+ if applyOps, ok := op.Data["applyOps"].([]interface{}); ok { // NOTE(review): an applyOps value or entry decoded as a named type (e.g. mongo-driver primitive.A / primitive.D) fails these assertions and is skipped silently, dropping the transaction's changes - confirm the decoder yields []interface{} / map[string]interface{} + for _, applyOp := range applyOps { + if opData, ok := applyOp.(map[string]interface{}); ok { + namespace, _ := opData["ns"].(string) + operation, _ := opData["op"].(string) + var data map[string]interface{} + var id interface{} + var b bool + var doc interface{} + if data, b = opData["o"].(map[string]interface{}); !b { // entries without an "o" document are skipped + continue + } + switch operation { + case "i", "d": + id = data["_id"] + doc = data + case "u": + if update, ok := opData["o2"].(map[string]interface{}); ok { // the target _id comes from o2; Doc stays nil so OpBuf.Flush fetches the current document + id = update["_id"] + } else { + continue + } + default: // other entry kinds (e.g. nested commands) are not emitted when flattening + continue + } + newOp := &Op{ + Id: id, + Operation: operation, + Namespace: namespace, + Data: data, + Timestamp: op.Timestamp, + Source: op.Source, + Doc: doc, + UpdateDescription: nil, + ResumeToken: op.ResumeToken, + } + ops = append(ops, newOp) + } + } + } + return +} + func (op *Op) shouldParse() bool { return op.IsInsert() || op.IsDelete() || op.IsUpdate() || op.IsCommand() } @@ -915,6 +986,12 @@ func (op *Op) ParseLogEntry(entry *OpLog, o *Options) (include bool, err error) rawField = entry.Doc op.processData(rawField, o) } + // An applyOps command on admin.$cmd (how a transaction is recorded in the oplog) is returned unfiltered here; + // OpBuf.Flush later flattens it into per-document ops and applies the namespace filter to each of them. + if !o.DisableFlatTransactionData && op.IsTransactionApplyOps() { + include = true + return + } if op.matchesNsFilter(o) { if op.IsInsert() || op.IsDelete() || op.IsUpdate() { if op.IsUpdate() {