Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ ignore:
- memberlist-core/src/state/tests.rs
- memberlist-core/src/base/tests.rs
- memberlist-core/src/delegate/mock.rs
- memberlist-core/src/transport/unimplemented.rs
- memberlist-core/src/error.rs
- memberlist-net/src/tests.rs
- memberlist-net/src/tests
Expand Down
64 changes: 64 additions & 0 deletions .github/workflows/proto.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
name: proto

on:
push:
branches:
- main
paths-ignore:
- 'README.md'
- 'COPYRIGHT'
- 'LICENSE*'
- '**.md'
- '**.txt'
- 'art'
pull_request:
paths-ignore:
- 'README.md'
- 'COPYRIGHT'
- 'LICENSE*'
- '**.md'
- '**.txt'
- 'art'
workflow_dispatch:
schedule: [cron: "0 1 */7 * *"]

jobs:
test:
name: ${{ matrix.os }} - ${{ matrix.runtime }} - ${{ matrix.stream_layer }}
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os:
- ubuntu-latest
# - macos-latest,
# - windows-latest
runtime: [tokio, async-std, smol]
stream_layer: [tls, tcp]
steps:
- uses: actions/checkout@v4

- name: Install Rust
run: |
rustup update stable && rustup default stable
rustup component add clippy
rustup component add rustfmt

- name: Run Proto Tests (With Rayon)
run: |
cargo test --no-default-features --features "std,metrics,encryption,all-compression,all-checksum,quickcheck,rayon" -- --test-threads=1
working-directory: memberlist-proto

- name: Run Proto Tests (Without Rayon)
run: |
cargo test --no-default-features --features "std,metrics,encryption,all-compression,all-checksum,quickcheck" -- --test-threads=1
working-directory: memberlist-proto

- name: Cache Cargo registry
uses: actions/cache@v4
with:
path: |
~/.cargo/registry
~/.cargo/git
target
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}-${{ matrix.runtime }}-${{ matrix.stream_layer }}
4 changes: 2 additions & 2 deletions examples/toydb/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use memberlist::{
agnostic::tokio::TokioRuntime,
bytes::Bytes,
delegate::{CompositeDelegate, NodeDelegate},
net::{MaybeResolvedAddress, NetTransportOptions, Node, stream_layer::tcp::Tcp},
proto::{HostAddr, Meta, NodeId},
net::{NetTransportOptions, Node, stream_layer::tcp::Tcp},
proto::{HostAddr, MaybeResolvedAddress, Meta, NodeId},
tokio::TokioTcpMemberlist,
transport::resolver::dns::DnsResolver,
};
Expand Down
3 changes: 3 additions & 0 deletions memberlist-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ tokio = ["agnostic-io/tokio"]
# see memberlist-wasm for more examples about how to use those tests fn
test = ["tracing-subscriber"]

arbitrary = ["dep:arbitrary", "memberlist-proto/arbitrary"]
quickcheck = ["dep:quickcheck", "memberlist-proto/quickcheck"]

[target.'cfg(target_family = "wasm")'.dependencies]
getrandom.workspace = true
once_cell = "1.17"
Expand Down
65 changes: 24 additions & 41 deletions memberlist-core/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@
delegate::{Delegate, VoidDelegate},
error::Error,
network::META_MAX_SIZE,
proto::{Alive, Dead, Message, Meta, NodeState, Ping, SmallVec},
proto::{Alive, Dead, MaybeResolvedAddress, Message, Meta, NodeState, Ping, SmallVec},
state::AckMessage,
transport::{AddressResolver, CheapClone, MaybeResolvedAddress, Node, Transport},
transport::{AddressResolver, CheapClone, Node, Transport},
};

impl<T, D> Memberlist<T, D>
where
D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
T: Transport,
{
/// Returns the local node ID.
Expand All @@ -38,13 +38,13 @@

/// Returns a [`Node`] with the local id and the advertise address of local node.
#[inline]
pub fn advertise_node(&self) -> Node<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress> {
pub fn advertise_node(&self) -> Node<T::Id, T::ResolvedAddress> {
Node::new(self.inner.id.clone(), self.inner.advertise.clone())
}

/// Returns the advertise address of local node.
#[inline]
pub fn advertise_address(&self) -> &<T::Resolver as AddressResolver>::ResolvedAddress {
pub fn advertise_address(&self) -> &T::ResolvedAddress {
&self.inner.advertise
}

Expand Down Expand Up @@ -74,9 +74,7 @@

/// Returns the local node instance state.
#[inline]
pub async fn local_state(
&self,
) -> Option<Arc<NodeState<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>>> {
pub async fn local_state(&self) -> Option<Arc<NodeState<T::Id, T::ResolvedAddress>>> {
let nodes = self.inner.nodes.read().await;
nodes
.node_map
Expand All @@ -85,10 +83,7 @@
}

/// Returns the node state of the given id. (if any).
pub async fn by_id(
&self,
id: &T::Id,
) -> Option<Arc<NodeState<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>>> {
pub async fn by_id(&self, id: &T::Id) -> Option<Arc<NodeState<T::Id, T::ResolvedAddress>>> {
let members = self.inner.nodes.read().await;

members
Expand All @@ -99,9 +94,7 @@

/// Returns a list of all known nodes.
#[inline]
pub async fn members(
&self,
) -> SmallVec<Arc<NodeState<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>>> {
pub async fn members(&self) -> SmallVec<Arc<NodeState<T::Id, T::ResolvedAddress>>> {
self
.inner
.nodes
Expand All @@ -120,9 +113,7 @@
}

/// Returns a list of all known nodes that are online.
pub async fn online_members(
&self,
) -> SmallVec<Arc<NodeState<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>>> {
pub async fn online_members(&self) -> SmallVec<Arc<NodeState<T::Id, T::ResolvedAddress>>> {
self
.inner
.nodes
Expand Down Expand Up @@ -151,8 +142,8 @@
/// Returns a list of all known nodes that match the given predicate.
pub async fn members_by(
&self,
mut f: impl FnMut(&NodeState<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>) -> bool,
) -> SmallVec<Arc<NodeState<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>>> {
mut f: impl FnMut(&NodeState<T::Id, T::ResolvedAddress>) -> bool,
) -> SmallVec<Arc<NodeState<T::Id, T::ResolvedAddress>>> {
self
.inner
.nodes
Expand All @@ -168,7 +159,7 @@
/// Returns the number of members match the given predicate.
pub async fn num_members_by(
&self,
mut f: impl FnMut(&NodeState<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>) -> bool,
mut f: impl FnMut(&NodeState<T::Id, T::ResolvedAddress>) -> bool,
) -> usize {
self
.inner
Expand All @@ -184,7 +175,7 @@
/// Returns a list of map result on all known members that match the given predicate.
pub async fn members_map_by<O>(
&self,
mut f: impl FnMut(&NodeState<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>) -> Option<O>,
mut f: impl FnMut(&NodeState<T::Id, T::ResolvedAddress>) -> Option<O>,
) -> SmallVec<O> {
self
.inner
Expand All @@ -207,15 +198,14 @@
pub async fn new(
transport_options: T::Options,
opts: Options,
) -> Result<Self, Error<T, VoidDelegate<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>>>
{
) -> Result<Self, Error<T, VoidDelegate<T::Id, T::ResolvedAddress>>> {
Self::create(None, transport_options, opts).await
}
}

impl<T, D> Memberlist<T, D>
where
D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
T: Transport,
{
/// Create a new memberlist with the given transport, delegate and options.
Expand Down Expand Up @@ -325,8 +315,8 @@
/// Returns the node if successfully joined, or an error if the node could not be reached.
pub async fn join(
&self,
node: Node<T::Id, MaybeResolvedAddress<T>>,
) -> Result<Node<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>, Error<T, D>> {
node: Node<T::Id, MaybeResolvedAddress<T::Address, T::ResolvedAddress>>,
) -> Result<Node<T::Id, T::ResolvedAddress>, Error<T, D>> {
if self.has_left() || self.has_shutdown() {
return Err(Error::NotRunning);
}
Expand Down Expand Up @@ -355,7 +345,7 @@
/// On error, returns a list of nodes are successfully joined with resolved addresses and the error.
pub async fn join_many(
&self,
existing: impl Iterator<Item = Node<T::Id, MaybeResolvedAddress<T>>>,
existing: impl Iterator<Item = Node<T::Id, MaybeResolvedAddress<T::Address, T::ResolvedAddress>>>,
) -> Result<
SmallVec<Node<T::Id, T::ResolvedAddress>>,
(SmallVec<Node<T::Id, T::ResolvedAddress>>, Error<T, D>),
Expand All @@ -382,7 +372,7 @@
"memberlist: failed to resolve address {}",
addr,
);
return Err((Node::new(id, MaybeResolvedAddress::<T>::unresolved(addr)), Error::<T, D>::transport(e)))
return Err((Node::new(id, MaybeResolvedAddress::<T::Address, T::ResolvedAddress>::unresolved(addr)), Error::<T, D>::transport(e)))

Check warning on line 375 in memberlist-core/src/api.rs

View check run for this annotation

Codecov / codecov/patch

memberlist-core/src/api.rs#L375

Added line #L375 was not covered by tests
}
}
}
Expand Down Expand Up @@ -493,11 +483,7 @@
///
/// See also [`send_reliable`](Memberlist::send_reliable).
#[inline]
pub async fn send(
&self,
to: &<T::Resolver as AddressResolver>::ResolvedAddress,
msg: Bytes,
) -> Result<(), Error<T, D>> {
pub async fn send(&self, to: &T::ResolvedAddress, msg: Bytes) -> Result<(), Error<T, D>> {
self.send_many(to, std::iter::once(msg)).await
}

Expand All @@ -508,7 +494,7 @@
#[inline]
pub async fn send_many(
&self,
to: &<T::Resolver as AddressResolver>::ResolvedAddress,
to: &T::ResolvedAddress,
msgs: impl Iterator<Item = Bytes>,
) -> Result<(), Error<T, D>> {
if self.has_left() || self.has_shutdown() {
Expand All @@ -535,7 +521,7 @@
#[inline]
pub async fn send_reliable(
&self,
to: &<T::Resolver as AddressResolver>::ResolvedAddress,
to: &T::ResolvedAddress,
msg: Bytes,
) -> Result<(), Error<T, D>> {
self.send_many_reliable(to, std::iter::once(msg)).await
Expand All @@ -548,7 +534,7 @@
#[inline]
pub async fn send_many_reliable(
&self,
to: &<T::Resolver as AddressResolver>::ResolvedAddress,
to: &T::ResolvedAddress,
msgs: impl Iterator<Item = Bytes>,
) -> Result<(), Error<T, D>> {
if self.has_left() || self.has_shutdown() {
Expand All @@ -560,10 +546,7 @@
}

/// Initiates a ping to the node with the specified node.
pub async fn ping(
&self,
node: Node<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
) -> Result<Duration, Error<T, D>> {
pub async fn ping(&self, node: Node<T::Id, T::ResolvedAddress>) -> Result<Duration, Error<T, D>> {
// Prepare a ping message and setup an ack handler.
let self_addr = self.get_advertise();
let ping = Ping::new(
Expand Down
Loading
Loading