Skip to content

Commit

Permalink
Add path tag to logparser containing path of logfile (#3098)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored Aug 7, 2017
1 parent 07cda89 commit d9ddf7b
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 13 deletions.
29 changes: 20 additions & 9 deletions plugins/inputs/logparser/logparser.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,18 @@ type LogParser interface {
Compile() error
}

type logEntry struct {
path string
line string
}

// LogParserPlugin is the primary struct to implement the interface for logparser plugin
type LogParserPlugin struct {
Files []string
FromBeginning bool

tailers map[string]*tail.Tail
lines chan string
lines chan logEntry
done chan struct{}
wg sync.WaitGroup
acc telegraf.Accumulator
Expand Down Expand Up @@ -112,7 +117,7 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
defer l.Unlock()

l.acc = acc
l.lines = make(chan string, 1000)
l.lines = make(chan logEntry, 1000)
l.done = make(chan struct{})
l.tailers = make(map[string]*tail.Tail)

Expand Down Expand Up @@ -214,9 +219,14 @@ func (l *LogParserPlugin) receiver(tailer *tail.Tail) {
// Fix up files with Windows line endings.
text := strings.TrimRight(line.Text, "\r")

entry := logEntry{
path: tailer.Filename,
line: text,
}

select {
case <-l.done:
case l.lines <- text:
case l.lines <- entry:
}
}
}
Expand All @@ -229,22 +239,23 @@ func (l *LogParserPlugin) parser() {

var m telegraf.Metric
var err error
var line string
var entry logEntry
for {
select {
case <-l.done:
return
case line = <-l.lines:
if line == "" || line == "\n" {
case entry = <-l.lines:
if entry.line == "" || entry.line == "\n" {
continue
}
}

for _, parser := range l.parsers {
m, err = parser.ParseLine(line)
m, err = parser.ParseLine(entry.line)
if err == nil {
tags := m.Tags()
tags["path"] = entry.path
if m != nil {
l.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
l.acc.AddFields(m.Name(), m.Fields(), tags, m.Time())
}
} else {
log.Println("E! Error parsing log line: " + err.Error())
Expand Down
19 changes: 15 additions & 4 deletions plugins/inputs/logparser/logparser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,20 @@ func TestGrokParseLogFiles(t *testing.T) {
"response_time": int64(5432),
"myint": int64(101),
},
map[string]string{"response_code": "200"})
map[string]string{
"response_code": "200",
"path": thisdir + "grok/testdata/test_a.log",
})

acc.AssertContainsTaggedFields(t, "logparser_grok",
map[string]interface{}{
"myfloat": 1.25,
"mystring": "mystring",
"nomodifier": "nomodifier",
},
map[string]string{})
map[string]string{
"path": thisdir + "grok/testdata/test_b.log",
})
}

func TestGrokParseLogFilesAppearLater(t *testing.T) {
Expand Down Expand Up @@ -115,7 +120,10 @@ func TestGrokParseLogFilesAppearLater(t *testing.T) {
"response_time": int64(5432),
"myint": int64(101),
},
map[string]string{"response_code": "200"})
map[string]string{
"response_code": "200",
"path": emptydir + "/test_a.log",
})
}

// Test that test_a.log line gets parsed even though we don't have the correct
Expand Down Expand Up @@ -148,7 +156,10 @@ func TestGrokParseLogFilesOneBad(t *testing.T) {
"response_time": int64(5432),
"myint": int64(101),
},
map[string]string{"response_code": "200"})
map[string]string{
"response_code": "200",
"path": thisdir + "grok/testdata/test_a.log",
})
}

func getCurrentDir() string {
Expand Down

0 comments on commit d9ddf7b

Please sign in to comment.