http listener refactor #1915
Conversation
1bb9945 to db2579f
t.acc = acc
t.pool = NewPool(100)
Magic number, but it needs to be something
This can probably be lowered, or made configurable. As it stands, the current code will allocate 100MB+ of RAM for the HTTP buffers, and there is no way for the user to specify otherwise. When it comes to allocating machine resources, I think that should be a configurable option. Especially since our pool isn't leaky, and once it's full, the memory will never drop back down or be released to the OS.
c2889ea to 9400799
write_timeout = "10s"

## Maximum allowed http request body size in bytes.
## 0 means to use the default of 1,000,000,000 bytes (1 gigabyte)
It appears the default above is 512 MB, not 1 GB.
A few suggestions here and there.

toolarge and badrequest have enough in common that they should probably be replaced with a function similar to http.Error, taking the ResponseWriter, a status code, and a message.
@@ -0,0 +1,34 @@
package http_listener

type pool struct {
This can just be type pool chan []byte, otherwise you're allocating a struct whose only field is a reference type.
Either that, or also include the buffer size as a field in the struct. That would make it configurable at the type level, even if that doesn't get exposed as a user-facing config option.
	buffers chan []byte
}

func NewPool(n int) *pool {
I would replace this with return make(pool, n); I don't think it needs to pre-allocate every slot.
case p.buffers <- b:
default:
	// the pool is full, so drop this buffer
	b = nil
This line does nothing. b will be out of scope once this function returns, which will be immediately after this block, anyway.
func (t *HttpListener) serveWrite(res http.ResponseWriter, req *http.Request) {
	// Check that the content length is not too large for us to handle.
	if req.ContentLength > t.MaxBodySize {
		toolarge(res)
style nit: tooLarge and badRequest (below)
}

var return400 bool
var buf []byte
This should be a get() from the pool, and the same buffer should be used throughout the processing of a single request.
	return
}

if err == io.ErrUnexpectedEOF || err == io.EOF || n == 0 {
io.ReadFull only returns io.EOF if no bytes were read, so the n == 0 check can be removed.
t.pool.put(buf)
newlinei := findnewline(body)
log.Printf("E! http_listener received a single line of %d bytes, maximum is %d bytes",
	MAX_LINE_SIZE+newlinei, MAX_LINE_SIZE)
Use len(buf) here.
}
// rotate the bit remaining after the last newline to the front of the buffer
bufstart = len(buf) - i
copy(buf[:bufstart], buf[i:])
copy(buf, buf[i:]) would also work here.
func findnewline(r io.Reader) int {
	counter := 0
	// read until the next newline:
	var tmp [1]byte
Reading one byte at a time (especially if it results in a syscall) is very inefficient. This might be better captured with some state in the main parsing loop: var badLineBytes int, and if that's greater than zero, scan for a newline in the next chunk, log the error, and drop those bytes.
I know this is a WIP, but I've tried running it and I get run-away memory growth.
This change also doesn't preserve the timestamping of batches for points that don't provide a timestamp.
@johnrengelman what size of batches are you writing in? approximate write load? approximate number of writers?
We're batching at 1000 points coming out of the host machines; there are 200-300 machines writing. According to the ELB statistics, we're seeing something like 20 MB/s and 26 req/s.

I tried patching this change by removing the […] I'll report back once I run on some dedicated hardware.

Alternatively, do you have any suggestions for a different implementation?
@johnrengelman At the moment I wouldn't recommend going down that path. The telegraf write path has been designed to make metrics easy to modify and make it simple to write output plugins for widely different protocols and services. The tradeoff has been high write throughput and performance. When running on SSDs, I would say that telegraf performs around half as well as InfluxDB. Significant work would need to be undertaken to optimize the telegraf write path to the level that InfluxDB is at.

If your InfluxDB instance is not keeping up with 26 req/s and 200-300 machines, then it sounds like it might be running on some very lean hardware. Unfortunately switching to telegraf as your http listener probably isn't going to help, and probably will only make your issues worse.

I don't think relay will help much, as that project only relays the http writes and doesn't do batching. Maybe one of us should start an influxdb-batcher project ;-)
That's an option I was discussing earlier today. In theory, since the line […]

I think our issue on the influx side was overloading it with too many http […]
ea70f6a to 3ed3b51
A few minor things, and some logic issues in the processing code.
@@ -11,6 +13,8 @@ type Buffer struct {
	drops int
	// total metrics added
	total int

	sync.Mutex
This should probably be a named variable, mu, so that it can't be locked outside of this package.
This should still be changed
// MAX_LINE_SIZE is the maximum size, in bytes, that can be allocated for
// a single InfluxDB point.
// 64 KB
DEFAULT_MAX_LINE_SIZE = 64 * 1000
A power of two here might make the runtime happier, since for larger values it will use memory pages directly (generally 4K), so 64 * 1024 would line up nicely.
var hangingBytes bool
buf := t.pool.get()
defer func() { t.pool.put(buf) }()
bufstart := 0
style: should be bufStart or just start
	len(buf))
hangingBytes = true
return400 = true
continue
This should probably also reset start to zero.
return400 = true
}
// rotate the bit remaining after the last newline to the front of the buffer
bufstart = len(buf) - i
This should always just skip the newline, something like this:
i++ // start copying after the newline
start = len(buf) - i
if start > 0 {
copy(buf, buf[i:])
}
3cbcbc7 to e666485
// MAX_LINE_SIZE is the maximum size, in bytes, that can be allocated for
// a single InfluxDB point.
// 64 KB
DEFAULT_MAX_LINE_SIZE = 64 * 1024
)

type HttpListener struct {
This should be HTTPListener
func (h *HttpListener) Start(acc telegraf.Accumulator) error {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.parser = influx.InfluxParser{}
This line looks unnecessary, unless I'm missing something. Is there some unexported state that needs to be reset if this HttpListener is reused?
Nope, you're right that it's unnecessary. I'll remove that line.
var return400 bool
var hangingBytes bool
buf := h.pool.get()
defer func() { h.pool.put(buf) }()
defer h.pool.put(buf) will work, unless you want to delay evaluation of the captured variables, which is probably unnecessary.
A few minor things, and the one question about "resetting" the parser field. If there is a reason to force a zeroing of the field, add a comment.
9993aac to 6ce127c
6ce127c to e3ccd6a
In this commit:
- chunks out the http request body to avoid making very large allocations.
- establishes a limit for the maximum http request body size that the listener will accept.
- utilizes a pool of byte buffers to reduce GC pressure.
e3ccd6a to c849b58