Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 105 additions & 28 deletions gtm.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,31 +71,32 @@ const (
)

type Options struct {
After TimestampGenerator
Token ResumeTokenGenenerator
Filter OpFilter
NamespaceFilter OpFilter
OpLogDisabled bool
OpLogDatabaseName string
OpLogCollectionName string
ChannelSize int
BufferSize int
BufferDuration time.Duration
Ordering OrderingGuarantee
WorkerCount int
MaxAwaitTime time.Duration
UpdateDataAsDelta bool
ChangeStreamNs []string
DirectReadNs []string
DirectReadFilter OpFilter
DirectReadSplitMax int32
DirectReadConcur int
DirectReadNoTimeout bool
DirectReadBounded bool
Unmarshal DataUnmarshaller
Pipe PipelineBuilder
PipeAllowDisk bool
Log *log.Logger
After TimestampGenerator
Token ResumeTokenGenenerator
Filter OpFilter
NamespaceFilter OpFilter
OpLogDisabled bool
OpLogDatabaseName string
OpLogCollectionName string
ChannelSize int
BufferSize int
BufferDuration time.Duration
Ordering OrderingGuarantee
WorkerCount int
MaxAwaitTime time.Duration
UpdateDataAsDelta bool
ChangeStreamNs []string
DirectReadNs []string
DirectReadFilter OpFilter
DirectReadSplitMax int32
DirectReadConcur int
DirectReadNoTimeout bool
DirectReadBounded bool
Unmarshal DataUnmarshaller
Pipe PipelineBuilder
PipeAllowDisk bool
DisableFlatTransactionData bool
Log *log.Logger
}

type OpResumeToken struct {
Expand Down Expand Up @@ -714,6 +715,17 @@ func (op *Op) IsDropDatabase() (string, bool) {
return "", false
}

func (op *Op) IsTransactionApplyOps() bool {
if op.IsCommand() && op.Namespace == "admin.$cmd" {
if op.Data != nil {
if _, ok := op.Data["applyOps"]; ok {
return true
}
}
}
return false
}

func (op *Op) IsCommand() bool {
return op.Operation == "c"
}
Expand Down Expand Up @@ -774,13 +786,26 @@ func (buf *OpBuf) Flush(client *mongo.Client, ctx *OpCtx, o *Options) {
}
ns := make(map[string][]interface{})
byId := make(map[interface{}][]*Op)
for _, op := range buf.Entries {
opList := make([]*Op, 0)
var appendOpList = func(op *Op) {
opList = append(opList, op)
if op.IsUpdate() && op.Doc == nil {
idKey := fmt.Sprintf("%s.%v", op.Namespace, op.Id)
ns[op.Namespace] = append(ns[op.Namespace], op.Id)
byId[idKey] = append(byId[idKey], op)
}
}
for _, op := range buf.Entries {
if !o.DisableFlatTransactionData && op.IsTransactionApplyOps() {
for _, newOp := range op.GetTransactionApplyOpsList() {
if newOp.matchesNsFilter(o) {
appendOpList(newOp)
}
}
} else {
appendOpList(op)
}
}
retry:
for n, opIds := range ns {
var parts = strings.SplitN(n, ".", 2)
Expand All @@ -798,7 +823,6 @@ retry:
op.processData(doc, o)
}
}

}
}
if err = cursor.Close(context.Background()); err != nil {
Expand All @@ -809,14 +833,61 @@ retry:
break retry
}
}
for _, op := range buf.Entries {
for _, op := range opList {
if op.matchesFilter(o) {
ctx.OpC <- op
}
}
buf.Entries = nil
}

func (op *Op) GetTransactionApplyOpsList() (ops []*Op) {
if !op.IsCommand() || op.Data == nil {
return
}
if applyOps, ok := op.Data["applyOps"].([]interface{}); ok {
for _, applyOp := range applyOps {
if opData, ok := applyOp.(map[string]interface{}); ok {
namespace, _ := opData["ns"].(string)
operation, _ := opData["op"].(string)
var data map[string]interface{}
var id interface{}
var b bool
var doc interface{}
if data, b = opData["o"].(map[string]interface{}); !b {
continue
}
switch operation {
case "i", "d":
id = data["_id"]
doc = data
case "u":
if update, ok := opData["o2"].(map[string]interface{}); ok {
id = update["_id"]
} else {
continue
}
default:
continue
}
newOp := &Op{
Id: id,
Operation: operation,
Namespace: namespace,
Data: data,
Timestamp: op.Timestamp,
Source: op.Source,
Doc: doc,
UpdateDescription: nil,
ResumeToken: op.ResumeToken,
}
ops = append(ops, newOp)
}
}
}
return
}

func (op *Op) shouldParse() bool {
return op.IsInsert() || op.IsDelete() || op.IsUpdate() || op.IsCommand()
}
Expand Down Expand Up @@ -915,6 +986,12 @@ func (op *Op) ParseLogEntry(entry *OpLog, o *Options) (include bool, err error)
rawField = entry.Doc
op.processData(rawField, o)
}
// The transaction oplog is a special log format,
// which needs to be unpacked here and then flattened into multiple entries in FetchDocuments before being filtered and matched.
if !o.DisableFlatTransactionData && op.IsTransactionApplyOps() {
include = true
return
}
if op.matchesNsFilter(o) {
if op.IsInsert() || op.IsDelete() || op.IsUpdate() {
if op.IsUpdate() {
Expand Down