Multiple Subscribe for the same topics #768
Hello Eden, any chance you can provide some feedback for this one please?
Sorry for missing this one. A subscription set is atomic, that is, it replaces any previous subscription set. I'm not sure why you want to explicitly handle _ALL_BROKERS_DOWN; librdkafka takes care of all that under the hood and will resume consumption when the brokers come back up. Another thing that I might have gotten wrong is that you seem to stop polling when you receive _ALL_BROKERS_DOWN and only resume after setting up the subscription again. If you want to stop receiving messages (which seems unnecessary if the broker is down anyway) you can use the _pause() interface, but keep on calling poll() every now and then to serve events. This is probably what the event times look like in your scenario:
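A minimal sketch of the pause-but-keep-polling approach described above, using the plain C librdkafka API; the wrapper function and the `cluster_is_down` flag are illustrative assumptions, not taken from the reporter's application:

```c
#include <librdkafka/rdkafka.h>

/* Pause message delivery while the cluster is down, but keep calling poll
 * so librdkafka can still serve rebalance, error and stats events. */
static void wait_for_cluster(rd_kafka_t *rk, volatile int *cluster_is_down) {
        rd_kafka_topic_partition_list_t *assignment = NULL;

        if (rd_kafka_assignment(rk, &assignment) != RD_KAFKA_RESP_ERR_NO_ERROR ||
            !assignment)
                return;

        rd_kafka_pause_partitions(rk, assignment);   /* stop message delivery */

        while (*cluster_is_down) {
                /* Events are still served even though partitions are paused. */
                rd_kafka_message_t *msg = rd_kafka_consumer_poll(rk, 100);
                if (msg)
                        rd_kafka_message_destroy(msg);
        }

        rd_kafka_resume_partitions(rk, assignment);  /* resume where we left off */
        rd_kafka_topic_partition_list_destroy(assignment);
}
```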
Splendid, I see what you are saying. I was aware of the atomicity of the subscription set. So it is either that: I really hope that it is not point two, as my application updates the subscription dynamically based on user input. I hope that if the user decides to update the subscription while the cluster happens to be down, this won't stop any more messages from being served :) as I have no way to fix that state. Bottom line, I will test whether not stopping polling helps the matter. For now, I can only confirm that not updating the subscription helps.
Yup:
Let me know how it goes.
Oh, and to clarify: you actually make a great point, and it really does not make much sense to stop polling for messages from cluster D if I keep the connection to it. I was stopping so as to avoid duplicate messages during failover to the second cluster. But as I will get those duplicates during failback anyway, it does not make much sense to stop polling.
Hello Eden, I have altered the logic to not stop polling on the connection to cluster D. So far it looks like there is always a matching subsequent PartitionsAssigned after PartitionsRevoked. Though I'm not sure how many times I should test to guarantee it works, as I cannot reproduce the problem reliably. I hope this is the last of it, and I am closing this issue. Thank you for the tip on the silent contract.
Looks like it did not fix it after all. What I am seeing now is that if I: Now neither PartitionsAssigned nor PartitionsRevoked gets called. Consequently C is not getting any messages. What do you recommend?
In step 4, why are you calling subscribe() again? You don't need to do that; it will recover automatically.
The consumer API I expose gives the user the ability to decide which topics and from which offsets to consume during cluster_back_up events. So when C detects that D is back up, it executes a callback to retrieve the new subscription set together with offsets.
Okay, fair enough. Can you reproduce this with
Sure, will try that. The logging I had for this running process (it is still running now) shows that there was a PartitionsRevoked event 17 hours ago and there is no matching PartitionsAssigned after it.
While I am trying to reproduce, I notice that the logs show _ALL_BROKERS_DOWN.4/4 brokers are down - but I only have 3. Have you seen that before, or is it not a cause for concern? Not sure how long it will take me to reproduce - working on this. But another thing I thought about: could it be that the call to subscribe somehow clashes with the PartitionsRevoked execution? Obviously PartitionsRevoked and PartitionsAssigned run on a different thread than my rd_kafka_subscribe call. Should I change the code to guarantee that subscribe and PartitionsRevoked run serially?
There's an internal broker that it counts too, neat, huh? ;)
Here is the log for a successful reproduction: Failed over between DC2:9092 --> DC1:9092
I've highlighted in bold where failover (in this case failback from DC2 to DC1) happens. At this point I: Further, you should see a highlighted OnPartitionsRevoked at 2016-09-06T10:31:09.4110537Z 5.
Do you have the full logs for consumer-1 (DC1)?
Here are the entries for just DC1.BROKER1:
tate query-coord: coordinator (DC1.BROKER1:9092/1) is unavailable: retrying later
Thanks, that is not the complete log though :/. Any chance you have the complete consumer-1 log? I think I have found the issue but I would need the log to verify it: The offending log message that verifies this is the problem should look something like this:
There are no entries with GroupCoordinator :| How about the entries with coord? Or should I just enable debug all and try again?
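For reference, librdkafka's debug output is enabled through the `debug` configuration property before the consumer is created. The exact debug contexts requested earlier are elided in this thread; a later log excerpt mentions cgrp and topic, so the sketch below uses those as an assumption (pass "all" to enable everything):

```c
#include <librdkafka/rdkafka.h>
#include <stdio.h>

/* Sketch: enable cgrp/topic debug logging on a consumer configuration. */
static void enable_debug(rd_kafka_conf_t *conf) {
        char errstr[512];

        if (rd_kafka_conf_set(conf, "debug", "cgrp,topic",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
                fprintf(stderr, "failed to set debug property: %s\n", errstr);
}
```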
Can you try the latest fix on master?
Woah! Of course, one sec.
While I test, can you please let me know if rd_kafka_assign is as heavyweight as rd_kafka_subscribe? Basically, in my API, I wonder whether the optimisation of calling rd_kafka_assign instead of rd_kafka_subscribe is worth it if I can detect that the set of topics to subscribe to is the same.
You typically only need to use subscribe() if you want to take part in a balanced consumer group. With just assign() you don't have to wait for the consumer group protocol stuff (rebalances, etc.), but will start consuming right away.
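A minimal sketch of the assign() path in the plain C API; the topic name is borrowed from a log excerpt later in this thread, while the partition numbers and starting offsets are illustrative assumptions:

```c
#include <librdkafka/rdkafka.h>
#include <stdio.h>

/* Sketch: skip the group rebalance protocol and start consuming a known
 * topic/partition set right away with assign(). */
static void assign_fixed_set(rd_kafka_t *rk) {
        rd_kafka_topic_partition_list_t *tps = rd_kafka_topic_partition_list_new(2);

        /* Offsets could also come from the application's own offset store. */
        rd_kafka_topic_partition_list_add(tps, "SmallObjectTopicToTest", 0)->offset =
                RD_KAFKA_OFFSET_STORED;
        rd_kafka_topic_partition_list_add(tps, "SmallObjectTopicToTest", 1)->offset =
                RD_KAFKA_OFFSET_STORED;

        rd_kafka_resp_err_t err = rd_kafka_assign(rk, tps);
        if (err)
                fprintf(stderr, "assign failed: %s\n", rd_kafka_err2str(err));

        rd_kafka_topic_partition_list_destroy(tps);
}
```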
Thank you. I will take that as meaning that - for the same set of topics as a previous subscription - assign is less heavyweight and preferable to calling subscribe.
Still happens. Here are some log entries with this new build. Log level: cgrp.topic: 2016-09-06T15:54:29.8919078Z 3.ClientLog: 0.DC1:9092.rdkafka#consumer-1|7|CGRPOP|Group "42e94ace-0706-413d-a0a8-3704a2ed867e06/09/2016 15:52:09" received op SUBSCRIBE in sta
I am not getting any PartitionsAssigned or PartitionsRevoked events anymore once DC1 is back up. Do you think my best bet is to just create a new connection to DC1 on failover?
Interesting, can I get the full log from the above run?
The consumer getting stuck in wait-coord is a serious issue so I really want to fix this, very thankful for your help!
Working on this now. Until I produce the logs, FYI, I notice that the revoke/assign pair always happens if I subscribe to the same topic over and over in a loop. This is without taking down the cluster, and I understand why it is expected. Just wondering if doing the subscribe while the cluster is becoming available again introduces some race.
Also, while trying to reproduce this, I notice that the producer that tries to produce to the same cluster that I keep taking down cannot produce any more messages and is stuck in:
Ok, just as I thought I had reproduced it, the consumer started getting messages 5 minutes later... If you are curious, these are the logs for the 5 minutes: Message from: SmallObjectTopicToTest 51172
I think what I am seeing now is what I described earlier when I re-opened the issue. Hence the above log is what you are looking for. I have managed to reproduce this again, and for 4 minutes I was not getting any new messages. So to sum up:
Well, this looks pretty catastrophic. I left my test application running over the weekend, and today I notice that the consumer stopped getting any new messages. Again there was a PartitionsRevoked without any subsequent PartitionsAssigned. But the real concern is that this happened without the cluster being down! The consumer worked for 2.5 days until today. It has now gone 5 hours without receiving any new messages.
Do you have any logs, client or broker side?
My plan now is to just always create a new connection if I get a PartitionsRevoked.
This is the only log I have consumer side. 3-broker cluster. The last message received was at 14:01: 12/09/2016 12:28:48 : OnError: DC1.BROKER1:9092,DC1.BROKER2:9092,DC1.BROKER3. _TRANSPORT. DC1.BROKER3:9092/3: Connection closed.
The frequent connection closings are due to the broker's idle connection reaper. Also, around the time your last message was received there seemed to be a network error of some sort since it could no longer resolve the host:
Given those scenarios, do you expect the consumer connection to eventually start working again? I have started up a parallel consumer and it works. The one that has been running for days is still not getting any new messages.
Can you see if the broken client still has open connections to the broker and if it is sending data? (
TCP 10.70.80.24:52922 DC1.BROKER1:9092 ESTABLISHED
However, Network Monitor is not showing any data being passed/received.
The consumer should recover after a disconnect.
Ok, after waiting some more, I see some traffic to all three brokers. Still, the consumer is not getting any new messages. As I need to release soon, what do you think of the plan of creating a new connection every time I get a PartitionsRevoked? Seems like the best bet until this is fixed.
Yes, that seems like a reasonable workaround at this time.
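A rough sketch of that workaround in the plain C API, closing the stuck consumer and building a fresh one; the `build_consumer_conf()` helper is a hypothetical placeholder for whatever configuration the application already builds at startup:

```c
#include <librdkafka/rdkafka.h>
#include <stdio.h>

/* Hypothetical helper assumed to exist in the application: rebuilds the same
 * configuration (brokers, group.id, callbacks) used for the original consumer. */
extern rd_kafka_conf_t *build_consumer_conf(void);

/* Sketch: tear down the stuck consumer and create a new one with the same
 * subscription, as a workaround for the missing re-assignment. */
static rd_kafka_t *recreate_consumer(rd_kafka_t *old_rk,
                                     const rd_kafka_topic_partition_list_t *topics) {
        char errstr[512];

        rd_kafka_consumer_close(old_rk);  /* leave the group cleanly */
        rd_kafka_destroy(old_rk);

        rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_CONSUMER, build_consumer_conf(),
                                      errstr, sizeof(errstr));
        if (!rk) {
                fprintf(stderr, "consumer creation failed: %s\n", errstr);
                return NULL;
        }

        rd_kafka_poll_set_consumer(rk);   /* route events to consumer_poll() */
        rd_kafka_subscribe(rk, topics);   /* same topic list as before */
        return rk;
}
```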
I attached a debugger and see these log entries from the log callback:
After I discovered the breakpoint actions feature in VS, I can send you some more logs from the same callback: rdkafka#consumer-5|7|METADATA|DC1.BROKER2:9092/2: Request metadata for locally known topics: sparse periodic refresh
Sorry for dropping this one, what is the current status?
Hi Magnus, happy new year! We are still on 0.9.1, so maybe it is fixed already with 0.9.2. My code recreates the client as soon as partitions get revoked. Until we switch to the new version I can't tell you if this is fixed. Maybe closer to March.
You too! Okay, I'll close this for now; we can reopen it if you still see the same issue on 0.9.2 or 0.9.3, which will be available before March.
Hello Eden,
Is it correct to allow calling rd_kafka_subscribe multiple times for the same topic list?
The reason I ask is that I have a test case where a consumer fails to resume consuming messages after the whole cluster has been down. Scenario:
1. The only consumer C in group.id G consumes messages from cluster D.
2. I switch off cluster D completely.
3. My consumer app detects that D is down (via _ALL_BROKERS_DOWN) and stops polling for messages.
4. I switch cluster D on again.
5. My consumer app detects that D is up. It then calls rd_kafka_subscribe on the same topics as before and resumes polling for messages.
After step 5 I get a partitions-revoked event, and there is nothing I can do to make consumer C carry on getting new messages. I have to tear it down and create a new one.
HOWEVER, if I do not call rd_kafka_subscribe before resuming the poll, I cannot reproduce this issue.
So is the problem that I am calling rd_kafka_subscribe again? Should I guard my API to not allow calling rd_kafka_subscribe for the same topics over and over? Or is it that, if you want to call rd_kafka_subscribe for the same topics, you first need to call rd_kafka_unsubscribe?
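For context, this is roughly what the subscribe path looks like in the plain C API, including the rebalance callback that produces the revoked/assigned events discussed above; the wrapper function and topic-list handling are illustrative, not taken from the reporter's application:

```c
#include <librdkafka/rdkafka.h>

/* Standard rebalance callback (registered via rd_kafka_conf_set_rebalance_cb):
 * forward the new assignment to librdkafka, or drop the current one on revoke. */
static void rebalance_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err,
                         rd_kafka_topic_partition_list_t *partitions, void *opaque) {
        (void)opaque;
        if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
                rd_kafka_assign(rk, partitions);   /* "PartitionsAssigned" */
        else
                rd_kafka_assign(rk, NULL);         /* "PartitionsRevoked" */
}

/* Sketch: (re)issue a subscription for a list of topic names. Calling this
 * again with the same topics simply replaces the previous subscription set. */
static void subscribe_topics(rd_kafka_t *rk, const char **topics, int topic_cnt) {
        rd_kafka_topic_partition_list_t *list =
                rd_kafka_topic_partition_list_new(topic_cnt);
        int i;

        for (i = 0; i < topic_cnt; i++)
                rd_kafka_topic_partition_list_add(list, topics[i],
                                                  RD_KAFKA_PARTITION_UA);

        rd_kafka_subscribe(rk, list);
        rd_kafka_topic_partition_list_destroy(list);
}
```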