-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
compressable:Zstd comp support #4657
base: master
Are you sure you want to change the base?
compressable:Zstd comp support #4657
Conversation
2b8c9d1
to
4143998
Compare
6bf74ae
to
0b79317
Compare
Thanks! I will see this this weekend! |
@daipom did u get chance to look at this? |
Sorry, I haven't made time for this. 😢 |
Its fine i guess, |
@Athishpranav2003 I have confirmed the entire direction! Sorry I haven't made time to see the detailed implementation, such as It looks basically good as The core logic would be the following.
It is complicated, but the compression of Buffer and Chunk and the flush process of output plugins are closely related.
|
@daipom thanks for the review. Yah, this will kick a lot more changes in the overall events pipeline. |
@daipom can you please check the change in the Compression module as well. Once if we merge this we can proceed with the other development around this |
Sure! I will review the
To judge if we can merge this only with the support of the module and It would be necessary to update |
Sure @daipom If it's needed then I can split the PR into 2 where this will only focus on |
@daipom how to proceed in this PR? |
Sorry, please give me a few more days 😢 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Athishpranav2003
Sorry for my late response 😢
I have commented on some points, but basically it looks good! Thanks!
About our future policies, I think it is better not to separate the PRs, as these changes need to be considered in combination with updating Forward-Protocol.
How about supporting the out_forward
by implementing Output/Buffer/Chunk logic in this PR?
My concern is whether we can support Buffer/Chunk zstd compression.
To support it, the compressed data must be able to be concatenated.
gzip allows it.
It appears that we can concat zstd files without any problems, but I wonder if this is an officially supported specification of zstd and zstd-ruby
gem.
$ echo "Hello world" > 1
$ echo "Hello Fluentd" > 2
$ zstd -f 1
$ zstd -f 2
$ cat 1.zst 2.zst > 3.zst
$ zstd -d 3.zst
$ cat 3
Hello world
Hello Fluentd
lib/fluent/event.rb
Outdated
@@ -203,7 +203,7 @@ class MessagePackEventStream < EventStream | |||
# https://github.com/msgpack/msgpack-ruby/issues/119 | |||
|
|||
# Keep cached_unpacker argument for existing plugins | |||
def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil) | |||
def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil, compress: nil) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil, compress: nil) | |
def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil) |
lib/fluent/event.rb
Outdated
def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil, compress: :gzip) | ||
super |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil, compress: :gzip) | |
super | |
def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil, compress: :gzip) | |
super(data, cached_unpacker, size, unpacked_times: unpacked_times, unpacked_records: unpacked_records) |
lib/fluent/plugin/compressable.rb
Outdated
output_io = kwargs[:output_io] | ||
writer = nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line would be unnecessary.
writer = nil |
lib/fluent/plugin/compressable.rb
Outdated
end | ||
end | ||
|
||
private | ||
|
||
def string_decompress(compressed_data) | ||
def string_decompress(compressed_data, type = :gzip) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about splitting the logic for simplicity?
def string_decompress(compressed_data, type = :gzip)
if type == :gzip
string_decompress_gzip(compressed_data)
elsif type == :zstd
string_decompress_zstd(compressed_data)
else
raise ArgumentError, "Unknown compression type: #{type}"
end
end
lib/fluent/plugin/compressable.rb
Outdated
end | ||
break if io.eof? | ||
end | ||
|
||
out | ||
end | ||
|
||
def io_decompress(input, output) | ||
def io_decompress(input, output, type = :gzip) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
lib/fluent/plugin/in_forward.rb
Outdated
@@ -309,8 +309,7 @@ def on_message(msg, chunk_size, conn) | |||
# PackedForward | |||
option = msg[2] | |||
size = (option && option['size']) || 0 | |||
es_class = (option && option['compressed'] == 'gzip') ? Fluent::CompressedMessagePackEventStream : Fluent::MessagePackEventStream | |||
es = es_class.new(entries, nil, size.to_i) | |||
es = (option && option['compressed']!=nil && option['compressed']!="text") ? Fluent::CompressedMessagePackEventStream.new(entries, nil, size.to_i, compress: option['compressed'].to_sym) : Fluent::MessagePackEventStream.new(entries, nil, size.to_i) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about this for simplicity?
option = msg[2] || {}
size = option['size'] || 0
if option['compressed'] == 'gzip' || option['compressed'] == 'zstd'
es = Fluent::CompressedMessagePackEventStream.new(entries, nil, size.to_i, compress: option['compressed'].to_sym)
else
es = Fluent::MessagePackEventStream.new(entries, nil, size.to_i)
end
0b79317
to
dbbd2f6
Compare
@daipom i addressed your comments but couldnt test it locally due to some issue. |
Signed-off-by: Athish Pranav D <athishanna@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did not this code work?
option = msg[2] | ||
size = (option && option['size']) || 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
option = msg[2] | |
size = (option && option['size']) || 0 | |
option = msg[2] || {} | |
size = option['size'] || 0 |
es_class = (option && option['compressed'] == 'gzip') ? Fluent::CompressedMessagePackEventStream : Fluent::MessagePackEventStream | ||
es = es_class.new(entries, nil, size.to_i) | ||
|
||
if option['compressed'] == 'gzip' && option['compressed'] != 'zstd' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if option['compressed'] == 'gzip' && option['compressed'] != 'zstd' | |
if option['compressed'] == 'gzip' || option['compressed'] == 'zstd' |
I guess there is a small careless I will check after 18th |
Hmm, these changes would be necessary, but looks like the error in your environment is caused by another reason.
|
OK! I'm sorry my response was so slow, and it took so long. |
Which issue(s) this PR fixes:
Fixes #4162
What this PR does / why we need it:
Adds new compression method support to handle messages
Docs Changes:
TODO
Release Note:
N/A