From 756fa71a6b8b825082c6f4fac95e5d44cf107519 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Tue, 16 Dec 2025 17:30:10 -0500 Subject: [PATCH 1/4] Always send shutdown_worker RPC, fix WorkerStatus state when shutting down --- crates/sdk-core/src/pollers/poll_buffer.rs | 6 +-- crates/sdk-core/src/worker/client.rs | 4 +- crates/sdk-core/src/worker/mod.rs | 51 +++++++++++----------- 3 files changed, 29 insertions(+), 32 deletions(-) diff --git a/crates/sdk-core/src/pollers/poll_buffer.rs b/crates/sdk-core/src/pollers/poll_buffer.rs index 025cf9efa..80843e175 100644 --- a/crates/sdk-core/src/pollers/poll_buffer.rs +++ b/crates/sdk-core/src/pollers/poll_buffer.rs @@ -960,13 +960,11 @@ mod tests { // longer. assert!( elapsed >= Duration::from_millis(200), - "Should wait at least the interrupt period. Elapsed: {:?}", - elapsed + "Should wait at least the interrupt period. Elapsed: {elapsed:?}", ); assert!( elapsed < Duration::from_secs(1), - "Should not wait too long. Elapsed: {:?}", - elapsed + "Should not wait too long. 
Elapsed: {elapsed:?}", ); // Clean up diff --git a/crates/sdk-core/src/worker/client.rs b/crates/sdk-core/src/worker/client.rs index 3702f59dc..b344ce282 100644 --- a/crates/sdk-core/src/worker/client.rs +++ b/crates/sdk-core/src/worker/client.rs @@ -27,8 +27,7 @@ use temporalio_common::{ }, deployment, enums::v1::{ - TaskQueueKind, VersioningBehavior, WorkerStatus, WorkerVersioningMode, - WorkflowTaskFailedCause, + TaskQueueKind, VersioningBehavior, WorkerVersioningMode, WorkflowTaskFailedCause, }, failure::v1::Failure, nexus, @@ -696,7 +695,6 @@ impl WorkerClient for WorkerClientBag { ) -> Result { let mut final_heartbeat = final_heartbeat; if let Some(w) = final_heartbeat.as_mut() { - w.status = WorkerStatus::Shutdown.into(); self.set_heartbeat_client_fields(w); } let mut request = ShutdownWorkerRequest { diff --git a/crates/sdk-core/src/worker/mod.rs b/crates/sdk-core/src/worker/mod.rs index a001a9f09..6a3bc523d 100644 --- a/crates/sdk-core/src/worker/mod.rs +++ b/crates/sdk-core/src/worker/mod.rs @@ -260,9 +260,6 @@ impl WorkerTrait for Worker { ); } self.shutdown_token.cancel(); - { - *self.status.write() = WorkerStatus::ShuttingDown; - } // First, unregister worker from the client if !self.client_worker_registrator.shared_namespace_worker { let _res = self @@ -737,31 +734,35 @@ impl Worker { /// completed async fn shutdown(&self) { self.initiate_shutdown(); - if let Some(workflows) = &self.workflows - && let Some(name) = workflows.get_sticky_queue_name() { - let heartbeat = self - .client_worker_registrator - .heartbeat_manager - .as_ref() - .map(|hm| hm.heartbeat_callback.clone()()); - - // This is a best effort call and we can still shutdown the worker if it fails - match self.client.shutdown_worker(name, heartbeat).await { - Err(err) - if !matches!( - err.code(), - tonic::Code::Unimplemented | tonic::Code::Unavailable - ) => - { - warn!( - "shutdown_worker rpc errored during worker shutdown: {:?}", - err - ); - } - _ => {} + *self.status.write() = 
WorkerStatus::ShuttingDown; + } + let heartbeat = self + .client_worker_registrator + .heartbeat_manager + .as_ref() + .map(|hm| hm.heartbeat_callback.clone()()); + let sticky_name = self + .workflows + .as_ref() + .and_then(|wf| wf.get_sticky_queue_name()) + .unwrap_or_default(); + // This is a best effort call and we can still shutdown the worker if it fails + match self.client.shutdown_worker(sticky_name, heartbeat).await { + Err(err) + if !matches!( + err.code(), + tonic::Code::Unimplemented | tonic::Code::Unavailable + ) => + { + warn!( + "shutdown_worker rpc errored during worker shutdown: {:?}", + err + ); } + _ => {} } + // We need to wait for all local activities to finish so no more workflow task heartbeats // will be generated if let Some(la_mgr) = &self.local_act_mgr { From 9d47601c47cc7cd7c3e22a8d67b27e0a54b8ba5e Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Wed, 17 Dec 2025 14:21:30 -0500 Subject: [PATCH 2/4] Split unregister worker into unregister_slot_provider and finalize_unregister --- crates/client/src/worker/mod.rs | 112 +++++++++++++++++---- crates/sdk-core/src/core_tests/workers.rs | 20 ++-- crates/sdk-core/src/worker/client/mocks.rs | 4 +- crates/sdk-core/src/worker/mod.rs | 16 ++- 4 files changed, 115 insertions(+), 37 deletions(-) diff --git a/crates/client/src/worker/mod.rs b/crates/client/src/worker/mod.rs index 741d79ad1..71a0679d5 100644 --- a/crates/client/src/worker/mod.rs +++ b/crates/client/src/worker/mod.rs @@ -213,26 +213,48 @@ impl ClientWorkerSetImpl { Ok(()) } - fn unregister( + /// Slot provider should be unregistered at the beginning of worker shutdown, in order to disable + /// eager workflow start. 
+ fn unregister_slot_provider(&mut self, worker_instance_key: Uuid) -> Result<(), anyhow::Error> { + let worker = self.all_workers.get(&worker_instance_key).ok_or_else(|| { + anyhow::anyhow!("Worker not in all_workers during slot provider unregister") + })?; + + let slot_key = SlotKey::new( + worker.namespace().to_string(), + worker.task_queue().to_string(), + ); + if let Some(slot_vec) = self.slot_providers.get_mut(&slot_key) { + slot_vec.retain(|info| info.worker_id != worker_instance_key); + if slot_vec.is_empty() { + self.slot_providers.remove(&slot_key); + } + } + Ok(()) + } + + fn finalize_unregister( &mut self, worker_instance_key: Uuid, ) -> Result, anyhow::Error> { let worker = self .all_workers .remove(&worker_instance_key) - .ok_or_else(|| { - anyhow::anyhow!("Worker with worker_instance_key {worker_instance_key} not found") - })?; + .ok_or_else(|| anyhow::anyhow!("Worker not found in all_workers"))?; + // Worker should already be removed from slot_providers let slot_key = SlotKey::new( worker.namespace().to_string(), worker.task_queue().to_string(), ); - - if let Some(slot_vec) = self.slot_providers.get_mut(&slot_key) { - slot_vec.retain(|info| info.worker_id != worker_instance_key); - if slot_vec.is_empty() { - self.slot_providers.remove(&slot_key); + if let Some(slot_vec) = self.slot_providers.get(&slot_key) { + if slot_vec + .iter() + .any(|info| info.worker_id == worker_instance_key) + { + return Err(anyhow::anyhow!( + "Worker still in slot_providers during finalize" + )); } } @@ -323,12 +345,24 @@ impl ClientWorkerSet { .register(worker, skip_client_worker_set_check) } - /// Unregisters a local worker, typically when that worker starts shutdown. - pub fn unregister_worker( + /// Disables Eager Workflow Start for this worker. This must be called before + /// `finalize_unregister`, otherwise `finalize_unregister` will return an err. 
+ pub fn unregister_slot_provider(&self, worker_instance_key: Uuid) -> Result<(), anyhow::Error> { + self.worker_manager + .write() + .unregister_slot_provider(worker_instance_key) + } + + /// Finalizes unregistering of worker from client. This must be called at the end of worker + /// shutdown in order to finalize shutdown for worker heartbeat properly. Must call after + /// `unregister_slot_provider`, otherwise an err will be returned. + pub fn finalize_unregister( &self, worker_instance_key: Uuid, ) -> Result, anyhow::Error> { - self.worker_manager.write().unregister(worker_instance_key) + self.worker_manager + .write() + .finalize_unregister(worker_instance_key) } /// Returns the worker grouping key, which is unique for each worker. @@ -666,9 +700,12 @@ mod tests { assert_eq!(3, manager.num_providers()); let count = worker_keys.iter().fold(0, |count, key| { - manager.unregister_worker(*key).unwrap(); + manager.unregister_slot_provider(*key).unwrap(); + manager.finalize_unregister(*key).unwrap(); // expect error since worker is already unregistered - let result = manager.unregister_worker(*key); + let result = manager.unregister_slot_provider(*key); + assert!(result.is_err()); + let result = manager.finalize_unregister(*key); assert!(result.is_err()); count + 1 }); @@ -896,7 +933,10 @@ mod tests { assert_eq!(providers[1].build_id, Some("build-1".to_string())); } - manager.unregister_worker(worker2_instance_key).unwrap(); + manager + .unregister_slot_provider(worker2_instance_key) + .unwrap(); + manager.finalize_unregister(worker2_instance_key).unwrap(); { let impl_ref = manager.worker_manager.read(); @@ -1016,7 +1056,10 @@ mod tests { drop(impl_ref); // Unregister first worker - manager.unregister_worker(worker_instance_key1).unwrap(); + manager + .unregister_slot_provider(worker_instance_key1) + .unwrap(); + manager.finalize_unregister(worker_instance_key1).unwrap(); // After unregistering first worker: 1 slot provider, 1 heartbeat worker, shared worker still 
exists assert_eq!(1, manager.num_providers()); @@ -1036,7 +1079,10 @@ mod tests { drop(impl_ref); // Unregister second worker - manager.unregister_worker(worker_instance_key2).unwrap(); + manager + .unregister_slot_provider(worker_instance_key2) + .unwrap(); + manager.finalize_unregister(worker_instance_key2).unwrap(); // After unregistering last worker: 0 slot providers, 0 heartbeat workers, shared worker is removed assert_eq!(0, manager.num_providers()); @@ -1447,8 +1493,11 @@ mod tests { ); manager - .unregister_worker(wf_worker_key) - .expect("should unregister workflow worker"); + .unregister_slot_provider(wf_worker_key) + .expect("should unregister slot provider for workflow worker"); + manager + .finalize_unregister(wf_worker_key) + .expect("should finalize unregister for workflow worker"); // Activity worker should still be registered assert_eq!(1, manager.num_providers()); @@ -1460,9 +1509,30 @@ mod tests { ); manager - .unregister_worker(act_worker_key) - .expect("should unregister activity worker"); + .unregister_slot_provider(act_worker_key) + .expect("should unregister slot provider for activity worker"); + manager + .finalize_unregister(act_worker_key) + .expect("should finalize unregister for activity worker"); assert_eq!(0, manager.num_providers()); } + + #[test] + fn worker_unregister_order() { + let manager = ClientWorkerSet::new(); + let worker = new_mock_provider_with_heartbeat( + "namespace1".to_string(), + "queue1".to_string(), + true, + None, + ); + let worker_instance_key = worker.worker_instance_key(); + manager.register_worker(Arc::new(worker), false).unwrap(); + + let res = manager.finalize_unregister(worker_instance_key); + assert!(res.is_err()); + let err_string = res.err().map(|e| e.to_string()).unwrap(); + assert!(err_string.contains("Worker still in slot_providers during finalize")); + } } diff --git a/crates/sdk-core/src/core_tests/workers.rs b/crates/sdk-core/src/core_tests/workers.rs index 480586159..9a065a3a3 100644 --- 
a/crates/sdk-core/src/core_tests/workers.rs +++ b/crates/sdk-core/src/core_tests/workers.rs @@ -333,19 +333,15 @@ async fn worker_shutdown_api(#[case] use_cache: bool, #[case] api_success: bool) .returning(|| ("test-core".to_string(), "0.0.0".to_string())); mock.expect_identity() .returning(|| "test-identity".to_string()); - if use_cache { - if api_success { - mock.expect_shutdown_worker() - .times(1) - .returning(|_, _| Ok(ShutdownWorkerResponse {})); - } else { - // worker.shutdown() should succeed even if shutdown_worker fails - mock.expect_shutdown_worker() - .times(1) - .returning(|_, _| Err(tonic::Status::unavailable("fake shutdown error"))); - } + if api_success { + mock.expect_shutdown_worker() + .times(1) + .returning(|_, _| Ok(ShutdownWorkerResponse {})); } else { - mock.expect_shutdown_worker().times(0); + // worker.shutdown() should succeed even if shutdown_worker fails + mock.expect_shutdown_worker() + .times(1) + .returning(|_, _| Err(tonic::Status::unavailable("fake shutdown error"))); } let t = canned_histories::single_timer("1"); diff --git a/crates/sdk-core/src/worker/client/mocks.rs b/crates/sdk-core/src/worker/client/mocks.rs index f1726e7e3..16705a1e9 100644 --- a/crates/sdk-core/src/worker/client/mocks.rs +++ b/crates/sdk-core/src/worker/client/mocks.rs @@ -1,5 +1,5 @@ use super::*; -use futures_util::Future; +use futures_util::{Future, FutureExt}; use std::sync::{Arc, LazyLock}; use temporalio_client::worker::ClientWorkerSet; @@ -53,6 +53,8 @@ pub(crate) fn mock_manual_worker_client() -> MockManualWorkerClient { r.expect_workers() .returning(|| DEFAULT_WORKERS_REGISTRY.clone()); r.expect_is_mock().returning(|| true); + r.expect_shutdown_worker() + .returning(|_, _| async { Ok(ShutdownWorkerResponse {}) }.boxed()); r.expect_sdk_name_and_version() .returning(|| ("test-core".to_string(), "0.0.0".to_string())); r.expect_identity() diff --git a/crates/sdk-core/src/worker/mod.rs b/crates/sdk-core/src/worker/mod.rs index 6a3bc523d..72fc174f4 100644 
--- a/crates/sdk-core/src/worker/mod.rs +++ b/crates/sdk-core/src/worker/mod.rs @@ -260,12 +260,12 @@ impl WorkerTrait for Worker { ); } self.shutdown_token.cancel(); - // First, unregister worker from the client + // First, disable Eager Workflow Start if !self.client_worker_registrator.shared_namespace_worker { let _res = self .client .workers() - .unregister_worker(self.worker_instance_key); + .unregister_slot_provider(self.worker_instance_key); } // Push a BumpStream message to the workflow activation queue. This ensures that @@ -347,10 +347,13 @@ impl Worker { CT: Into, { // Unregister worker from current client, register in new client at the end + self.client + .workers() + .unregister_slot_provider(self.worker_instance_key)?; let client_worker = self .client .workers() - .unregister_worker(self.worker_instance_key)?; + .finalize_unregister(self.worker_instance_key)?; let new_worker_client = super::init_worker_client( self.config.namespace.clone(), @@ -798,6 +801,13 @@ impl Worker { if let Some(b) = self.at_task_mgr { b.shutdown().await; } + // Only after the worker is fully shut down do we remove the heartbeat callback + // from SharedNamespaceWorker, allowing for accurate worker shutdown + // from the server's point of view + let _res = self + .client + .workers() + .finalize_unregister(self.worker_instance_key); } pub(crate) fn shutdown_token(&self) -> CancellationToken { From 61591f804701c81f885d337270b7b2925566e56a Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Wed, 17 Dec 2025 16:12:43 -0500 Subject: [PATCH 3/4] check for unregistration order without mutating state, prevent leak --- crates/client/src/worker/mod.rs | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/crates/client/src/worker/mod.rs b/crates/client/src/worker/mod.rs index 71a0679d5..8ae1f549d 100644 --- a/crates/client/src/worker/mod.rs +++ b/crates/client/src/worker/mod.rs @@ -237,6 +237,20 @@ impl ClientWorkerSetImpl { &mut self, worker_instance_key: Uuid, ) -> Result, anyhow::Error> { 
+ if let Some(worker) = self.all_workers.get(&worker_instance_key) + && let Some(slot_vec) = self.slot_providers.get(&SlotKey::new( + worker.namespace().to_string(), + worker.task_queue().to_string(), + )) + && slot_vec + .iter() + .any(|info| info.worker_id == worker_instance_key) + { + return Err(anyhow::anyhow!( + "Worker still in slot_providers during finalize" + )); + } + let worker = self .all_workers .remove(&worker_instance_key) @@ -1534,5 +1548,12 @@ mod tests { assert!(res.is_err()); let err_string = res.err().map(|e| e.to_string()).unwrap(); assert!(err_string.contains("Worker still in slot_providers during finalize")); + + // previous incorrect call to finalize_unregister should not cause any state leaks when + // properly removed later + manager + .unregister_slot_provider(worker_instance_key) + .unwrap(); + manager.finalize_unregister(worker_instance_key).unwrap(); } } From 19af7ab432ef8a717a1f5fbb99d0b6f225c3f166 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Fri, 19 Dec 2025 09:18:12 -0500 Subject: [PATCH 4/4] Remove duplicate check, fix test --- crates/client/src/worker/mod.rs | 16 ---------------- .../tests/integ_tests/worker_heartbeat_tests.rs | 2 +- 2 files changed, 1 insertion(+), 17 deletions(-) diff --git a/crates/client/src/worker/mod.rs b/crates/client/src/worker/mod.rs index 8ae1f549d..571e950cc 100644 --- a/crates/client/src/worker/mod.rs +++ b/crates/client/src/worker/mod.rs @@ -256,22 +256,6 @@ impl ClientWorkerSetImpl { .remove(&worker_instance_key) .ok_or_else(|| anyhow::anyhow!("Worker not found in all_workers"))?; - // Worker should already be removed from slot_providers - let slot_key = SlotKey::new( - worker.namespace().to_string(), - worker.task_queue().to_string(), - ); - if let Some(slot_vec) = self.slot_providers.get(&slot_key) { - if slot_vec - .iter() - .any(|info| info.worker_id == worker_instance_key) - { - return Err(anyhow::anyhow!( - "Worker still in slot_providers during finalize" - )); - } - } - if let Some(w) = 
self.shared_worker.get_mut(worker.namespace()) { let (callback, is_empty) = w.unregister_callback(worker.worker_instance_key()); if callback.is_some() && is_empty { diff --git a/crates/sdk-core/tests/integ_tests/worker_heartbeat_tests.rs b/crates/sdk-core/tests/integ_tests/worker_heartbeat_tests.rs index b32162952..68bda6461 100644 --- a/crates/sdk-core/tests/integ_tests/worker_heartbeat_tests.rs +++ b/crates/sdk-core/tests/integ_tests/worker_heartbeat_tests.rs @@ -463,7 +463,7 @@ fn after_shutdown_checks( ); assert_eq!(heartbeat.sdk_name, "temporal-core"); assert_eq!(heartbeat.sdk_version, "0.1.0"); - assert_eq!(heartbeat.status, WorkerStatus::Shutdown as i32); + assert_eq!(heartbeat.status, WorkerStatus::ShuttingDown as i32); assert_eq!(start_time.load().unwrap(), heartbeat.start_time.unwrap()); assert_ne!(