What to do / not do when connections to brokers are down ? #785

Closed

vin-d opened this issue Sep 9, 2016 · 19 comments
@vin-d
Contributor

vin-d commented Sep 9, 2016

Description

Hi, I have a question about the usage of librdkafka (C++ wrapper) when the brokers go down.
(I don't know how to set the question label here.)

  • I have a high-level consumer that consumes, then processes asynchronously.
  • I commit offsets regularly, once the messages have been completely processed.

I'm trying to find out what to do when the connection to my Kafka brokers is down.
I simulate that with:
sudo iptables -A INPUT -s 127.0.0.1 -p tcp --destination-port 9092 -j DROP and
sudo iptables -D INPUT -s 127.0.0.1 -p tcp --destination-port 9092 -j DROP

Should my program be aware that the brokers are down, so that I don't ask librdkafka to commit offsets (or read the topic partition offsets of my consumer group) while the brokers are down?

Second question: how can I know whether the connection to the brokers is up or down? With that information I would simply stop calling consume(), commitSync() or committed() on my KafkaConsumer until I know at least one broker is up again.
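
To make the second question concrete, here is a minimal sketch of the kind of check I'm imagining, using an RdKafka::EventCb to watch for broker-level errors (I'm assuming ERR__ALL_BROKERS_DOWN is the right thing to watch for; please correct me if not):

#include <atomic>
#include <librdkafka/rdkafkacpp.h>

// Sketch only: track broker availability from the event callback and
// skip commit/committed calls while everything looks down.
class BrokerStateCb : public RdKafka::EventCb {
 public:
  std::atomic<bool> all_brokers_down{false};

  void event_cb(RdKafka::Event &event) override {
    if (event.type() != RdKafka::Event::EVENT_ERROR)
      return;
    if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
      all_brokers_down = true;  // would stop calling commitSync()/committed() here
    // I don't know which event tells me a broker is back up again,
    // so I don't know when to clear the flag -- hence the question.
  }
};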

Thank you.

Checklist

Please provide the following information:

  • librdkafka version (release number or git tag): e36cd1e
    ::edit:: also reproduced on 8d96733
  • Apache Kafka version: 0.10
  • librdkafka client configuration:
  • Operating system: Ubuntu 16.04
  • Using the legacy Consumer: no
  • Using the high-level KafkaConsumer: yes
  • Provide logs (with debug=.. as necessary) from librdkafka
  • Provide broker log excerpts
  • Critical issue
@vin-d
Contributor Author

vin-d commented Sep 9, 2016

I got a segmentation fault in librdkafka; I made it happen by blocking and then re-enabling the connection to the brokers.

Could it have anything to do with a call I shouldn't have made?
Or would that be a bug?

The segfault is in rdkafka_partition.c at line 2483:
if (RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset)) {

while gdb reports:

Thread 5 "KafkaForwarder" received signal SIGSEGV, Segmentation fault.
[Switching to Thread 0x7ffff3a66700 (LWP 22930)]
rd_kafka_topic_partition_list_set_offsets (rk=0x7fffec000c30, rktparlist=rktparlist@entry=0x7fffed3981e0, from_rktp=from_rktp@entry=0, def_value=def_value@entry=-1001, is_commit=is_commit@entry=0) at rdkafka_partition.c:2483
2483 if (RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset)) {

Here's the stack trace:
http://pix.toile-libre.org/upload/original/1473439536.jpg

Any idea what happens in such a context?

@edenhill
Contributor

Do you still have that core file?
Can you print offsets and rktpar?
E.g.:

gdb> p *offsets
gdb> p i
gdb> p *rktpar

Thanks

edenhill added the bug label Sep 12, 2016
@vin-d
Contributor Author

vin-d commented Sep 13, 2016

No, I don't.
If I remember correctly what was going on here: I was doing the following somewhere in my loop:

std::vector<RdKafka::TopicPartition *> vTopPart;
consumer->assignment(vTopPart);
consumer->committed(vTopPart, 2000);

I've noticed that, when playing with iptables to mess with the connection, the assignment() call sometimes returns an invalid TopicPartition (with an empty topic name and an invalid partition number, a large negative value).

So now I'm filtering vTopPart to remove any TopicPartition with an empty topic name, and I haven't experienced this anymore.
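
The filtering is roughly this (a sketch, not my exact code):

#include <string>
#include <vector>
#include <librdkafka/rdkafkacpp.h>

// Sketch: drop any TopicPartition whose topic name came back empty.
static void drop_invalid(std::vector<RdKafka::TopicPartition *> &parts) {
  std::vector<RdKafka::TopicPartition *> kept;
  for (RdKafka::TopicPartition *tp : parts) {
    if (tp->topic().empty()) {
      delete tp;  // the caller owns the objects returned by assignment()
      continue;
    }
    kept.push_back(tp);
  }
  parts.swap(kept);
}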

@edenhill
Contributor

You need to check the return values of both assignment() and committed(); it is possible that they return an error when the client has not fully joined the group.
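
Roughly along these lines (a sketch, not exact code):

#include <iostream>
#include <vector>
#include <librdkafka/rdkafkacpp.h>

// Sketch: check both return codes and bail out if the consumer
// is not (yet) fully joined to the group.
static void check_committed(RdKafka::KafkaConsumer *consumer) {
  std::vector<RdKafka::TopicPartition *> vTopPart;

  RdKafka::ErrorCode err = consumer->assignment(vTopPart);
  if (err != RdKafka::ERR_NO_ERROR) {
    std::cerr << "assignment() failed: " << RdKafka::err2str(err) << std::endl;
    return;  // don't call committed() on a possibly bogus list
  }

  err = consumer->committed(vTopPart, 2000 /* timeout_ms */);
  if (err != RdKafka::ERR_NO_ERROR)
    std::cerr << "committed() failed: " << RdKafka::err2str(err) << std::endl;

  for (RdKafka::TopicPartition *tp : vTopPart)
    delete tp;  // assignment() allocates these; the caller frees them
}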

@vin-d
Contributor Author

vin-d commented Sep 13, 2016

Forget my previous comment; I don't think it's related, since the problem just happened again.

from function rd_kafka_handle_OffsetFetch

(gdb) p *offsets
$1 = {cnt = 1098016100, size = 1949200496, elems = 0x4e7c20312d6b7361}

from function rd_kafka_topic_partition_list_set_offsets

(gdb) p *rktparlist
$3 = {cnt = 1098016100, size = 1949200496, elems = 0x4e7c20312d6b7361}
(gdb) p i
$5 = 0
(gdb) p *rktpar
Cannot access memory at address 0x4e7c20312d6b7361

Here is the gdb core dump
https://drive.google.com/file/d/0B9qWE-U2505lelo1a3ZGSzFEWkk/view?usp=sharing

@vin-d
Contributor Author

vin-d commented Sep 13, 2016

As for the ErrorCode returned by assignment() and committed(): I do check them all now.

@edenhill
Contributor

Thanks. I can't use the core file without the corresponding binary, though.
Could you run bt full in gdb and paste the output?

@vin-d
Contributor Author

vin-d commented Sep 13, 2016

Here is the result of bt full:
https://drive.google.com/file/d/0B9qWE-U2505lOXNqTFR4QjVQRjA/view?usp=sharing

I can also give you the binary if you need to browse the dump.

@vin-d
Contributor Author

vin-d commented Sep 13, 2016

I don't know if the context helps you understand the issue, but in my stress test I start 3 instances of my Kafka client, then block/unblock the connection with this shell script:
https://drive.google.com/file/d/0B9qWE-U2505lM25qSnRhYjlQam8/view?usp=sharing

When the seg fault happens, it happens on all three separate instances at the same time.

@vin-d
Contributor Author

vin-d commented Sep 14, 2016

I've finally decided to dig into the librdkafka code a bit and traced my offset fetch requests.
In https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_cgrp.c, in case RD_KAFKA_OP_OFFSET_FETCH, the TopicPartition list is always good.

Then in rd_kafka_OffsetFetchRequest (in https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_request.c) I see (and I read it elsewhere in the code too) that the list of topic partitions must be sorted. In that context, isn't there a problem with the variable last_topic being NULL and const?
Shouldn't it be updated at the end of the for loop? As it is, every topic partition is treated as a new topic (however critical that may be).
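
To illustrate what I mean (this is not the librdkafka code, just the general pattern I would expect when writing out a sorted partition list grouped by topic):

#include <iostream>
#include <string>
#include <vector>

// Illustration only: the "previous topic" marker has to be updated at the
// end of each iteration, otherwise every element starts a new topic group.
struct Part { std::string topic; int partition; };

static void write_grouped(const std::vector<Part> &sorted_parts) {
  const std::string *last_topic = nullptr;
  for (const Part &p : sorted_parts) {
    if (!last_topic || *last_topic != p.topic)
      std::cout << "new topic group: " << p.topic << "\n";
    std::cout << "  partition " << p.partition << "\n";
    last_topic = &p.topic;  // this is the update that seems to be missing
  }
}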

@edenhill
Contributor

Thanks for your effort in finding the root cause of the issue. I haven't had time to look closer yet, but will do so next week.

edenhill added this to the 0.9.2 milestone Sep 27, 2016
edenhill added a commit that referenced this issue Sep 29, 2016
@edenhill
Contributor

This issue is now fixed on master; please try to verify the fix in your environment.
Thank you.

@vin-d
Contributor Author

vin-d commented Oct 11, 2016

Cool!
I'll try to test that this week.
Thank you.


@vin-d
Contributor Author

vin-d commented Oct 14, 2016

So I just did a long stress test and haven't seen any SIGSEGV anymore, so I'm comfortable assuming this is fixed.

One note though: during my test I experienced another issue (maybe related to what was fixed, or maybe it was hidden before, since the segfault would crash the program first).

After 20 minutes of randomly blocking/unblocking my Kafka server with iptables, I got the following sequence of events while Kafka was unavailable:

  • Because of a connection timeout to Kafka, librdkafka sends me a rebalance revoke callback.
  • iptables unblocks the Kafka port.
  • Normally I should then get a rebalance assign callback from librdkafka, but it never happens.
  • I let my program run for 20 minutes, but the assign callback was never triggered.

Since I have registered an event_cb, here are the logs of the events I received around the time the revoke was triggered: http://pastebin.com/w1wJmHPf
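
For reference, the rebalance callback I'm talking about follows the usual pattern (a sketch, not my exact code):

#include <iostream>
#include <vector>
#include <librdkafka/rdkafkacpp.h>

// Sketch of the usual rebalance callback: assign on ASSIGN_PARTITIONS,
// unassign on REVOKE_PARTITIONS. It is the assign side that never fires
// again after the broker comes back.
class MyRebalanceCb : public RdKafka::RebalanceCb {
 public:
  void rebalance_cb(RdKafka::KafkaConsumer *consumer,
                    RdKafka::ErrorCode err,
                    std::vector<RdKafka::TopicPartition *> &partitions) override {
    if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
      std::cerr << "KAFKA triggered a rebalance event = Assign partitions\n";
      consumer->assign(partitions);
    } else if (err == RdKafka::ERR__REVOKE_PARTITIONS) {
      std::cerr << "KAFKA triggered a rebalance event = Revoke partitions\n";
      consumer->unassign();
    }
  }
};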

@edenhill
Contributor

The error/event log won't show when the client reconnects to the brokers, so it is unfortunately not enough to troubleshoot this issue.
Can you reproduce with debug=cgrp,broker,topic enabled?

Thanks
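
In case it helps, the debug property goes on the global configuration before the consumer is created, roughly like this (a sketch):

#include <iostream>
#include <string>
#include <librdkafka/rdkafkacpp.h>

// Sketch: enable consumer-group/broker/topic debug output on the
// global configuration before creating the KafkaConsumer.
static RdKafka::Conf *make_debug_conf() {
  RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
  std::string errstr;
  if (conf->set("debug", "cgrp,broker,topic", errstr) != RdKafka::Conf::CONF_OK)
    std::cerr << "failed to set debug: " << errstr << std::endl;
  // ... set brokers, group.id, callbacks, etc., then:
  // RdKafka::KafkaConsumer *consumer = RdKafka::KafkaConsumer::create(conf, errstr);
  return conf;
}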

@vin-d
Contributor Author

vin-d commented Oct 19, 2016

Sorry for the delay.
Here is my program output (no reassign bug.tar.gz) with logging from the event_cb.

Here is my test script:

sleep 2
iptables -A INPUT -s 127.0.0.1 -p tcp --destination-port 9092 -j DROP
sleep 175
iptables -D INPUT -s 127.0.0.1 -p tcp --destination-port 9092 -j DROP

Here is the test scenario with the results explained:

  1. Start my program, which subscribes to topic KFtest-out (10:18:23Z).
  • The rebalance assign callback gets called and the program consumes data.
  2. Start my test script (around 10:18:35Z).
  • One minute later librdkafka triggers the rebalance revoke callback.
  3. The test script ends and librdkafka triggers the rebalance assign callback (10:21:45Z).
    [Up to here everything is good; this is the behaviour we want. Now let's run the same test again.]

  4. Start my test script again (I guess around 10:27:19Z).

  • One minute later librdkafka triggers the rebalance revoke callback.
  5. At 10:29:23Z librdkafka outputs the following on stderr:

    %3|1476872963.671|FAIL|rdkafka#producer-2| 127.0.0.1:9092/bootstrap: 1 request(s) timed out: disconnect

    %3|1476872963.671|ERROR|rdkafka#producer-2| 127.0.0.1:9092/bootstrap: 1 request(s) timed out: disconnect

(at 10:29:25Z)
%4|1476872965.871|METADATA|rdkafka#producer-2| 127.0.0.1:9092/bootstrap: Metadata request failed: Local: Timed out in queue (62207ms)

Note that we did not see this output during the first run of the test.

  6. (around 10:30:30Z) The test script ends.

(at 10:31:30Z)
%3|1476873091.062|FAIL|rdkafka#producer-2| 127.0.0.1:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection timed out

%3|1476873091.062|ERROR|rdkafka#producer-2| 127.0.0.1:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection timed out

(at 10:33:39Z)
%3|1476873219.318|FAIL|rdkafka#producer-2| 127.0.0.1:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection timed out

%3|1476873219.318|ERROR|rdkafka#producer-2| 127.0.0.1:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection timed out

  7. Then there are connection events at 10:33:39Z.
  • Then we get endless events of

Group "Kafka-consumer-group" received op GET_ASSIGNMENT (v0) in state wait-coord (join state init, v11)

and never get a rebalance assign.

Search for the word "triggered" if you want to see where the 4 rebalance callbacks are triggered in the log.

@edenhill
Contributor

I'm not sure why nothing seems to happen after it reconnects to broker 0 at 10:33:39.
Can you enable debug=cgrp,topic,broker,protocol,metadata and provide logs?

Regarding the GET_ASSIGNMENT events:
GET_ASSIGNMENT is the internal representation of your application calling assignment().

@vin-d
Contributor Author

vin-d commented Oct 20, 2016

Here is my program output (no reassign bug2.txt.tar.gz) using debug=cgrp,topic,broker,protocol,metadata.

At 06:51:16Z I started my test script (this is not in the txt file, which begins at 06:52:28Z).
Around 06:54:12Z the test script ends.
At 06:51:16Z I log: KAFKA triggered a rebalance event = Local: Assign partitions. {KFtest-out:0}

At 06:55:50Z I started the script again.
At 06:56:51Z I log: KAFKA triggered a rebalance event = Local: Revoke partitions. {KFtest-out:0}
Around 06:58:45Z the test script ends.
From 06:59:01Z on I get the log:
Group "Kafka-consumer-group" received op GET_ASSIGNMENT (v0) in state wait-coord (join state init, v11)

@vin-d
Contributor Author

vin-d commented Oct 20, 2016

In the following logs, isn't there a reset missing in the state machine when these events occur?

%3|1476951526.420|FAIL|rdkafka#producer-2| 127.0.0.1:9092/bootstrap: Receive failed: Disconnected
%3|1476951526.420|ERROR|rdkafka#producer-2| 127.0.0.1:9092/bootstrap: Receive failed: Disconnected

Perhaps the state should go from wait-coord back to an earlier state like query-coord?
Does that make any sense?

2016-10-20 08:18:46Z KafKaForwarderApp.task-1 |Fatal| : 
------------------------------------------------------------
 event.type()= 2
 event.str()= vincent-UX305UA:9092/0: Received MetadataResponse (v0, 1446 bytes, CorrId 23523, rtt 0.08ms)
 event.err()= 0. detail: Success
 event.severity()= 7
 event.fac()= RECV
 event.throttle_time()= 0
------------------------------------------------------------
2016-10-20 08:18:46Z KafKaForwarderApp.task-1 |Fatal| : 
------------------------------------------------------------
 event.type()= 2
 event.str()= vincent-UX305UA:9092/0: ===== Received metadata =====
 event.err()= 0. detail: Success
 event.severity()= 7
 event.fac()= METADATA
 event.throttle_time()= 0
------------------------------------------------------------
2016-10-20 08:18:46Z KafKaForwarderApp.task-1 |Fatal| : 
------------------------------------------------------------
 event.type()= 2
 event.str()= vincent-UX305UA:9092/0: 1 brokers, 3 topics
 event.err()= 0. detail: Success
 event.severity()= 7
 event.fac()= METADATA
 event.throttle_time()= 0
------------------------------------------------------------
2016-10-20 08:18:46Z KafKaForwarderApp.task-1 |Fatal| : 
------------------------------------------------------------
 event.type()= 2
 event.str()= vincent-UX305UA:9092/0:   Broker #0/1: vincent-UX305UA:9092 NodeId 0
 event.err()= 0. detail: Success
 event.severity()= 7
 event.fac()= METADATA
 event.throttle_time()= 0
------------------------------------------------------------
2016-10-20 08:18:46Z KafKaForwarderApp.task-1 |Fatal| : 
------------------------------------------------------------
 event.type()= 2
 event.str()= vincent-UX305UA:9092/0:   Topic #0/3: KFtest-out with 1 partitions
 event.err()= 0. detail: Success
 event.severity()= 7
 event.fac()= METADATA
 event.throttle_time()= 0
------------------------------------------------------------
2016-10-20 08:18:46Z KafKaForwarderApp.task-1 |Fatal| : 
------------------------------------------------------------
 event.type()= 2
 event.str()= vincent-UX305UA:9092/0:   Topic KFtest-out partition 0 Leader 0
 event.err()= 0. detail: Success
 event.severity()= 7
 event.fac()= METADATA
 event.throttle_time()= 0
------------------------------------------------------------
2016-10-20 08:18:46Z KafKaForwarderApp.task-1 |Fatal| : 
------------------------------------------------------------
 event.type()= 2
 event.str()= vincent-UX305UA:9092/0:   Topic #1/3: __consumer_offsets with 50 partitions
 event.err()= 0. detail: Success
 event.severity()= 7
 event.fac()= METADATA
 event.throttle_time()= 0
------------------------------------------------------------
2016-10-20 08:18:46Z KafKaForwarderApp.task-1 |Fatal| : 
------------------------------------------------------------
 event.type()= 2
 event.str()= vincent-UX305UA:9092/0:   Topic #2/3: KFtest-in with 1 partitions
 event.err()= 0. detail: Success
 event.severity()= 7
 event.fac()= METADATA
 event.throttle_time()= 0
------------------------------------------------------------
%3|1476951526.420|FAIL|rdkafka#producer-2| 127.0.0.1:9092/bootstrap: Receive failed: Disconnected
%3|1476951526.420|ERROR|rdkafka#producer-2| 127.0.0.1:9092/bootstrap: Receive failed: Disconnected
2016-10-20 08:18:46Z KafKaForwarderApp |Information| : statistics: 
    task-1:hourly_duplicate_counter = 500461

2016-10-20 08:18:47Z KafKaForwarderApp.task-1 |Fatal| : 
------------------------------------------------------------
 event.type()= 2
 event.str()= Group "Kafka-consumer-group" received op GET_ASSIGNMENT (v0) in state wait-coord (join state init, v11)
 event.err()= 0. detail: Success
 event.severity()= 7
 event.fac()= CGRPOP
 event.throttle_time()= 0
------------------------------------------------------------
2016-10-20 08:18:49Z KafKaForwarderApp.task-1 |Fatal| : 
------------------------------------------------------------
 event.type()= 2
 event.str()= Group "Kafka-consumer-group" received op GET_ASSIGNMENT (v0) in state wait-coord (join state init, v11)
 event.err()= 0. detail: Success
 event.severity()= 7
 event.fac()= CGRPOP
 event.throttle_time()= 0
------------------------------------------------------------
