Skip to content

Conversation

@yuezato
Copy link
Member

@yuezato yuezato commented Feb 26, 2019

背景

createまたはjoin時にエラーが発生するissue #65 を解決するためのPR

技術的背景

fibersの内部実装に触れるので、興味がない場合は飛ばしてください

frugalos_config::cluster::create関数

pub fn create<P: AsRef<Path>>(logger: &Logger, mut local: Server, data_dir: P) -> Result<()> {
info!(
logger,
"[START] create: {}",
dump!(local, data_dir.as_ref())
);
// 既にクラスタに参加済みではないかをチェック
track!(assert_to_be_newbie(&logger, &data_dir))?;
// 自分だけを含むRaftクラスタを作成
local.seqno = 0;
let node = server_to_frugalos_raft_node(&local);
let mut executor = track!(ThreadPoolExecutor::new().map_err(Error::from))?;
let rpc_service = RpcServiceBuilder::new()
.logger(logger.clone())
.finish(executor.handle());
let mut rpc_server_builder = RpcServerBuilder::new(node.addr);
let raft_service = frugalos_raft::Service::new(logger.clone(), &mut rpc_server_builder);
let rpc_server = rpc_server_builder.finish(executor.handle());
let (device, rlog) = track!(make_rlog(
logger.clone(),
&data_dir,
&local,
rpc_service.handle(),
executor.handle(),
raft_service.handle(),
vec![local.clone()],
))?;
executor.spawn(rpc_server.map_err(move |e| panic!("Error: {}", e)));
executor.spawn(raft_service.map_err(move |e| panic!("Error: {}", e)));
executor.spawn(rpc_service.map_err(move |e| panic!("Error: {}", e)));
// クラスタ構成に自サーバを登録
let monitor = executor.spawn_monitor(CreateCluster::new(logger.clone(), rlog, local.clone()));
let result = track!(executor.run_fiber(monitor).map_err(Error::from))?;
track!(result.map_err(Error::from))?;
// ディスクの同期を待機 (不要かも)
device.stop(Deadline::Immediate);
let result = track!(executor.run_future(device).map_err(Error::from))?;
track!(result.map_err(Error::from))?;
// ローカルにも情報を保存
track!(save_local_server_info(data_dir, local))?;
info!(logger, "[FINISH] create");
Ok(())
}

に問題があり、この関数を抜けるタイミングでthread '<unnamed>' panicked at 'Error: Other (cause; Monitor target aborted)が発生する。

より具体的に述べるために、以下の部分に注目する:

    let mut executor = track!(ThreadPoolExecutor::new().map_err(Error::from))?;
    ...
    let mut rpc_server_builder = RpcServerBuilder::new(node.addr);
    ...
    let rpc_server = rpc_server_builder.finish(executor.handle());
    ...
    executor.spawn(rpc_server.map_err(move |e| panic!("Error: {}", e)));

この関数を抜けるタイミングで、以下の流れが生じる(ことがある):

  1. executorのdrop処理を進行中に、executorの抱える(fibersレベルでの)pollersメンバ
    https://github.com/dwango/fibers-rs/blob/f104bfab6bc73cbdf159529f810eacd39fda8f52/src/executor/thread_pool.rs#L48-L55
    がdropされる
  2. rpc_serverの抱えるmonitorを使おうとする(spawn済みなのでrpc_server以下は別スレッドで実行されていることに注意する)
    https://github.com/dwango/fibers-rs/blob/f104bfab6bc73cbdf159529f810eacd39fda8f52/src/net/tcp.rs#L150-L153
  3. このmonitorに対して情報を送るmonitoredは、1でdrop済みのpollersメンバが抱えているため既に削除されているため、Monitor target abortedエラーが生じる。

解決策

// クラスタ構成に自サーバを登録
let monitor = executor.spawn_monitor(CreateCluster::new(logger.clone(), rlog, local.clone()));
let result = track!(executor.run_fiber(monitor).map_err(Error::from))?;
track!(result.map_err(Error::from))?;

この部分の処理以降ではrpc_serverの機能は一切用いないため、ここで呼び出しているrun_fiberが終了するタイミングでrpc_serverをストップさせたい。

そのために、frugalos_configに、与えられたfutureをストップ可能なfutureに変形するためのmoduleを加えた:
https://github.com/frugalos/frugalos/blob/9fc3ea66dd9925b79ae72bde2e057361b7aef1e5/frugalos_config/src/cancelable_future.rs

以下のようにcancel可能なfutureにした上で、停止したいタイミングで明示的に停止シグナルを送る:

    let (cancelable_rpc_server, mut rpc_server_cancelizer) = Cancelable::new(rpc_server);
    ...
    executor.spawn(cancelable_rpc_server.map_err(move |e| panic!("Error: {}", e)));
    ...
    track!(rpc_server_cancelizer.send_signal())?;

@yuezato
Copy link
Member Author

yuezato commented Feb 26, 2019

コメント: 既存のcommitで解決するが、新たにこのPRを準備したのはfibers_rpcのServer構造体を触ろうとするとこのstopの意味を相当明確に定めなければならなくなってしまうため(Serverの停止には一般的に設計が幾つかあると思われるため)、futureのレベルの話で止めようと思ったからということがあります。
#65 (comment)

@yuezato yuezato changed the title Futureをcancelするための補助的なmoduleを追加し、frugalos_config::cluster::createで用いている [WIP] Futureをcancelするための補助的なmoduleを追加し、frugalos_config::cluster::createで用いている Feb 27, 2019
@yuezato
Copy link
Member Author

yuezato commented Feb 27, 2019

RpcServiceもcancelの必要ありかどうか確認

結論からいうと、ある。その理由としては、joinでもabortするため。

pub fn join<P: AsRef<Path>>(
logger: &Logger,
local: &Server,
data_dir: P,
contact_server: SocketAddr,
) -> Result<()> {
info!(
logger,
"[START] join: {}",
dump!(local, data_dir.as_ref(), contact_server)
);
// 既にクラスタに参加済みではないかをチェック
track!(assert_to_be_newbie(&logger, &data_dir))?;
// 既存クラスタへの追加を依頼する
let mut executor = track!(ThreadPoolExecutor::new().map_err(Error::from))?;
let rpc_service = RpcServiceBuilder::new()
.logger(logger.clone())
.finish(executor.handle());
let client = Client::new(contact_server, rpc_service.handle());
executor.spawn(rpc_service.map_err(|e| panic!("{}", e)));
let monitor = executor.spawn_monitor(client.put_server(local.clone()));
let result = track!(executor.run_fiber(monitor).map_err(Error::from))?;
let joined = track!(result.map_err(Error::from))?;
info!(
logger,
"This server is joined to the cluster: {}",
dump!(joined)
);
// ローカルにも情報を保存
track!(save_local_server_info(data_dir, joined))?;
info!(logger, "[FINISH] join");
Ok(())
}

確認結果

joinでabortするのはこのケース

./target/debug/frugalos join --contact-server 127.0.0.1:14278 --data-dir child1 --addr 127.0.0.1:14279 --id child1
Feb 27 16:39:34.999 INFO [START] join: local=Server { id: "child1", seqno: 0, host: V4(127.0.0.1), port: 14279 }; data_dir.as_ref()="child1"; contact_server=V4(127.0.0.1:14278); , server: child1@127.0.0.1:14279, module: frugalos_config::cluster:181
Feb 27 16:39:35.006 INFO Creates data directry: "child1", server: child1@127.0.0.1:14279, module: frugalos_config::cluster:113
Feb 27 16:39:35.008 INFO New client-side RPC channel is created, server: 127.0.0.1:14278, server: child1@127.0.0.1:14279, module: fibers_rpc::client_service:143
Feb 27 16:39:35.012 INFO TCP connected: stream=TcpStream { local_addr:V4(127.0.0.1:50688), peer_addr:V4(127.0.0.1:14278), .. }, buffered_messages=1, server: 127.0.0.1:14278, server: child1@127.0.0.1:14279, module: fibers_rpc::client_side_channel:148
Feb 27 16:39:35.017 INFO This server is joined to the cluster: joined=Server { id: "child1", seqno: 1, host: V4(127.0.0.1), port: 14279 }; , server: child1@127.0.0.1:14279, module: frugalos_config::cluster:201
Feb 27 16:39:35.018 INFO [FINISH] join, server: child1@127.0.0.1:14279, module: frugalos_config::cluster:210
Feb 27 16:39:35.019 ERRO A client-side RPC channel aborted: Other (cause; Broken timer)
HISTORY:
  [0] at /Users/yuuya_uezato/.cargo/registry/src/github.com-1ecc6299db9ec823/fibers_rpc-0.2.18/src/client_side_channel.rs:338
  [1] at /Users/yuuya_uezato/.cargo/registry/src/github.com-1ecc6299db9ec823/fibers_rpc-0.2.18/src/client_side_channel.rs:347
  [2] at /Users/yuuya_uezato/.cargo/registry/src/github.com-1ecc6299db9ec823/fibers_rpc-0.2.18/src/client_side_channel.rs:202
  [3] at /Users/yuuya_uezato/.cargo/registry/src/github.com-1ecc6299db9ec823/fibers_rpc-0.2.18/src/client_service.rs:280
, server: 127.0.0.1:14278, server: child1@127.0.0.1:14279, module: fibers_rpc::client_service:158

原因

次のエラーメッセージの通り、timerが問題になる:

Feb 27 16:39:35.019 ERRO A client-side RPC channel aborted: Other (cause; Broken timer)
  [0] at /Users/yuuya_uezato/.cargo/registry/src/github.com-1ecc6299db9ec823/fibers_rpc-0.2.18/src/client_side_channel.rs:338
  [1] at /Users/yuuya_uezato/.cargo/registry/src/github.com-1ecc6299db9ec823/fibers_rpc-0.2.18/src/client_side_channel.rs:347

どのtimerかというと、次のKeepAlive構造体の持つfuture: Timeout:
https://github.com/sile/fibers_rpc/blob/89b04e19ba309ae3d2a095e9d9e07a3963ec3f79/src/client_side_channel.rs#L318-L323

#[derive(Debug)]
struct KeepAlive {
    future: Timeout,
    timeout: Duration,
    extend_period: bool,
}

このTimeoutは次で定義されている:
https://github.com/dwango/fibers-rs/blob/f104bfab6bc73cbdf159529f810eacd39fda8f52/src/time.rs#L53-L58

    #[derive(Debug)]
    pub struct Timeout {
        start: time::Instant,
        duration: time::Duration,
        inner: Option<poll::poller::Timeout>,
    }

このTimeout構造体のpoll呼び出しがエラーになっていることがログから判明しているが、
pollは実際にはOption<poll::poller::Timeout> を呼び出している。
poll::poller::Timeoutは次で定義されている:
https://github.com/dwango/fibers-rs/blob/f104bfab6bc73cbdf159529f810eacd39fda8f52/src/io/poll/poller.rs#L309-L324

#[derive(Debug)]
pub struct Timeout {
    cancel: Option<CancelTimeout>,
    rx: oneshot::Receiver<()>,
}
impl Future for Timeout {
    type Item = ();
    type Error = RecvError;
    fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
        let result = self.rx.poll();
        if result != Ok(futures::Async::NotReady) {
            self.cancel = None;
        }
        result
    }
}

このTimeout構造体は、oneshot::Receiver<()> を用いてシグナルハンドリングしている。
このため、Receiverの対になるsenderが先にdropするとrx.poll()でエラーになる。
joinメソッドのケースにおいては、

     let mut executor = track!(ThreadPoolExecutor::new().map_err(Error::from))?; 

executor内部にsenderが存在しているため、joinメソッドから抜けるタイミングで先にexecutorがdropし、spawnされたfutureの内部でシグナルを受信しようとするパスでエラーとなる。

最小の状況確認コード片

上の話をまとめたものが次のコード片である:
https://gist.github.com/yuezato/b75d35f59b064bdc8697561ba347bc6d

対応

    let rpc_service = RpcServiceBuilder::new()
        .logger(logger.clone())
        .finish(executor.handle());
    let rpc_service_handle = rpc_service.handle();
    let (cancelable_rpc_service, mut rpc_service_cancelizer) = Cancelable::new(rpc_service.map_err(Error::from));

rpc_serviceも明示的にcancelするようにした。

@yuezato yuezato changed the title [WIP] Futureをcancelするための補助的なmoduleを追加し、frugalos_config::cluster::createで用いている Futureをcancelするための補助的なmoduleを追加 frugalos_configで用いる Feb 27, 2019
@koba-e964 koba-e964 changed the base branch from master to develop August 28, 2019 08:19
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants