Skip to main content

hydro_lang/location/
cluster.rs

//! Definitions for clusters, which represent a group of identical processes.
//!
//! A [`Cluster`] is a multi-node location in the Hydro distributed programming model.
//! Unlike a [`super::Process`], which maps to a single machine, a cluster represents
//! a dynamically-sized set of machines that all run the same code. Each member of the
//! cluster is assigned a unique [`super::MemberId`] that can be used to address it.
//!
//! Clusters are useful for parallelism, replication, and sharding patterns. Data can
//! be broadcast to all members, sent to a specific member by ID, or scattered across
//! members.
11
12use std::fmt::{Debug, Formatter};
13use std::marker::PhantomData;
14
15use proc_macro2::Span;
16use quote::quote;
17use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
18use stageleft::{QuotedWithContextWithProps, quote_type};
19
20use super::dynamic::LocationId;
21use super::{Location, MemberId};
22use crate::compile::builder::FlowState;
23use crate::location::dynamic::ClusterConsistency;
24use crate::location::member_id::TaglessMemberId;
25use crate::location::{LocationKey, TopLevel};
26use crate::staging_util::{Invariant, get_this_crate};
27
28/// A marker trait for levels of consistency that can be guaranteed for a live collection placed
29/// across members of a cluster.
30pub trait Consistency {
31    /// Gets the runtime enum variant associated with this consistency level.
32    fn consistency() -> ClusterConsistency;
33}
34
35/// No consistency is guaranteed across cluster members, which means that the live collection
36/// may take on arbitrarily different values across members.
37pub enum NoConsistency {}
38impl Consistency for NoConsistency {
39    fn consistency() -> ClusterConsistency {
40        ClusterConsistency::NoConsistency
41    }
42}
43
44/// Eventual consistency is guaranteed across cluster members, which means that at steady-state
45/// the live collection will always resolve to the same value across all members of the cluster.
46pub enum EventualConsistency {}
47impl Consistency for EventualConsistency {
48    fn consistency() -> ClusterConsistency {
49        ClusterConsistency::EventualConsistency
50    }
51}
52
53/// A multi-node location representing a group of identical processes.
54///
55/// Each member of the cluster runs the same dataflow program and is assigned a
56/// unique [`MemberId`] that can be used to address it. The number of members
57/// is determined at deployment time rather than at compile time.
58///
59/// The `ClusterTag` type parameter is a phantom tag used to distinguish between
60/// different clusters in the type system, preventing accidental mixing of
61/// member IDs across clusters.
62pub struct Cluster<'a, ClusterTag, Con: Consistency = NoConsistency> {
63    pub(crate) key: LocationKey,
64    pub(crate) flow_state: FlowState,
65    pub(crate) _phantom: Invariant<'a, (ClusterTag, Con)>,
66}
67
68impl<C, Con: Consistency> Debug for Cluster<'_, C, Con> {
69    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
70        write!(f, "Cluster({})", self.key)
71    }
72}
73
74impl<C, Con: Consistency> Eq for Cluster<'_, C, Con> {}
75impl<C, Con: Consistency> PartialEq for Cluster<'_, C, Con> {
76    fn eq(&self, other: &Self) -> bool {
77        self.key == other.key && FlowState::ptr_eq(&self.flow_state, &other.flow_state)
78    }
79}
80
81impl<C, Con: Consistency> Clone for Cluster<'_, C, Con> {
82    fn clone(&self) -> Self {
83        Cluster {
84            key: self.key,
85            flow_state: self.flow_state.clone(),
86            _phantom: PhantomData,
87        }
88    }
89}
90
91impl<'a, C, Con: Consistency> super::dynamic::DynLocation for Cluster<'a, C, Con> {
92    fn dyn_id(&self) -> LocationId {
93        LocationId::Cluster(self.key)
94    }
95
96    fn flow_state(&self) -> &FlowState {
97        &self.flow_state
98    }
99
100    fn is_top_level() -> bool {
101        true
102    }
103
104    fn multiversioned(&self) -> bool {
105        false // TODO(shadaj): enable multiversioning support for clusters
106    }
107
108    fn cluster_consistency() -> Option<ClusterConsistency> {
109        Some(Con::consistency())
110    }
111}
112
113impl<'a, C, Con: Consistency> Location<'a> for Cluster<'a, C, Con> {
114    type Root = Cluster<'a, C, Con>;
115
116    type DropConsistency = Cluster<'a, C, NoConsistency>;
117
118    fn consistency() -> Option<ClusterConsistency> {
119        Some(Con::consistency())
120    }
121
122    fn root(&self) -> Self::Root {
123        self.clone()
124    }
125
126    fn drop_consistency(&self) -> Self::DropConsistency {
127        Cluster {
128            key: self.key,
129            flow_state: self.flow_state.clone(),
130            _phantom: PhantomData,
131        }
132    }
133
134    fn from_drop_consistency(l2: Self::DropConsistency) -> Self {
135        Cluster {
136            key: l2.key,
137            flow_state: l2.flow_state,
138            _phantom: PhantomData,
139        }
140    }
141}
142
143impl<'a, C, Con: Consistency> TopLevel<'a> for Cluster<'a, C, Con> {}
144
#[cfg(feature = "sim")]
impl<'a, C> Cluster<'a, C> {
    /// Creates a simulation-only input port on this cluster, intended for tests.
    ///
    /// The returned pair holds a `SimClusterSender`, through which a test injects
    /// `(member_id, T)` messages addressed to individual members, and the `Stream<T>` that
    /// each member receives.
    #[expect(clippy::type_complexity, reason = "stream markers")]
    pub fn sim_input<T>(
        &self,
    ) -> (
        crate::sim::SimClusterSender<
            T,
            crate::live_collections::stream::TotalOrder,
            crate::live_collections::stream::ExactlyOnce,
        >,
        crate::live_collections::Stream<
            T,
            Self,
            crate::live_collections::boundedness::Unbounded,
            crate::live_collections::stream::TotalOrder,
            crate::live_collections::stream::ExactlyOnce,
        >,
    )
    where
        T: serde::Serialize + serde::de::DeserializeOwned,
    {
        // Stand-in external location that acts as the origin of the simulated messages.
        // NOTE(review): always keyed `LocationKey::FIRST`; presumably the simulator ignores the
        // external key — confirm.
        let sim_source = crate::location::External::<'a, ()> {
            key: LocationKey::FIRST,
            flow_state: self.flow_state.clone(),
            _phantom: PhantomData,
        };

        let (external_port, received) = self.source_external_bincode(&sim_source);
        let sender = crate::sim::SimClusterSender(external_port.port_id, PhantomData);

        (sender, received)
    }
}
187
188/// A free variable that resolves to the list of member IDs in a cluster at runtime.
189///
190/// When spliced into a quoted snippet, this provides access to the set of
191/// [`TaglessMemberId`]s that belong to the cluster.
192pub struct ClusterIds<'a> {
193    /// The location key identifying which cluster this refers to.
194    pub key: LocationKey,
195    /// Phantom data binding the lifetime.
196    pub _phantom: PhantomData<&'a ()>,
197}
198
199impl<'a> Clone for ClusterIds<'a> {
200    fn clone(&self) -> Self {
201        Self {
202            key: self.key,
203            _phantom: Default::default(),
204        }
205    }
206}
207
208impl<'a, Ctx> FreeVariableWithContextWithProps<Ctx, ()> for ClusterIds<'a> {
209    type O = &'a [TaglessMemberId];
210
211    fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
212    where
213        Self: Sized,
214    {
215        let ident = syn::Ident::new(
216            &format!("__hydro_lang_cluster_ids_{}", self.key),
217            Span::call_site(),
218        );
219
220        (
221            QuoteTokens {
222                prelude: None,
223                expr: Some(quote! { #ident }),
224            },
225            (),
226        )
227    }
228}
229
230impl<'a, Ctx> QuotedWithContextWithProps<'a, &'a [TaglessMemberId], Ctx, ()> for ClusterIds<'a> {}
231
/// Marker trait for [`Cluster`] locations that exposes the cluster's tag type.
pub trait IsCluster {
    /// Phantom tag type that tells this cluster apart from other clusters.
    type Tag;
}
237
238impl<C> IsCluster for Cluster<'_, C> {
239    type Tag = C;
240}
241
242/// A free variable representing the cluster's own ID. When spliced in
243/// a quoted snippet that will run on a cluster, this turns into a [`MemberId`].
244pub static CLUSTER_SELF_ID: ClusterSelfId = ClusterSelfId { _private: &() };
245
/// Type of the [`CLUSTER_SELF_ID`] free variable.
///
/// It only exists at staging time: when spliced into a quoted snippet that runs on a
/// [`Cluster`], it is replaced by the [`MemberId`] of the executing member.
#[derive(Copy, Clone)]
pub struct ClusterSelfId<'a> {
    // Private field: values cannot be constructed outside this module.
    _private: &'a (),
}
254
255impl<'a, L> FreeVariableWithContextWithProps<L, ()> for ClusterSelfId<'a>
256where
257    L: Location<'a>,
258    <L as Location<'a>>::Root: IsCluster,
259{
260    type O = MemberId<<<L as Location<'a>>::Root as IsCluster>::Tag>;
261
262    fn to_tokens(self, ctx: &L) -> (QuoteTokens, ())
263    where
264        Self: Sized,
265    {
266        let LocationId::Cluster(cluster_id) = ctx.root().id() else {
267            unreachable!()
268        };
269
270        let ident = syn::Ident::new(
271            &format!("__hydro_lang_cluster_self_id_{}", cluster_id),
272            Span::call_site(),
273        );
274        let root = get_this_crate();
275        let c_type: syn::Type = quote_type::<<<L as Location<'a>>::Root as IsCluster>::Tag>();
276
277        (
278            QuoteTokens {
279                prelude: None,
280                expr: Some(
281                    quote! { #root::__staged::location::MemberId::<#c_type>::from_tagless((#ident).clone()) },
282                ),
283            },
284            (),
285        )
286    }
287}
288
289impl<'a, L>
290    QuotedWithContextWithProps<'a, MemberId<<<L as Location<'a>>::Root as IsCluster>::Tag>, L, ()>
291    for ClusterSelfId<'a>
292where
293    L: Location<'a>,
294    <L as Location<'a>>::Root: IsCluster,
295{
296}
297
#[cfg(test)]
mod tests {
    #[cfg(feature = "sim")]
    use stageleft::q;

    #[cfg(feature = "sim")]
    use super::CLUSTER_SELF_ID;
    #[cfg(feature = "sim")]
    use crate::location::{Location, MemberId, MembershipEvent};
    #[cfg(feature = "sim")]
    use crate::networking::TCP;
    #[cfg(feature = "sim")]
    use crate::nondet::nondet;
    #[cfg(feature = "sim")]
    use crate::prelude::FlowBuilder;

    /// Every member of two differently-sized clusters sends its own `CLUSTER_SELF_ID` to a
    /// process, which must receive exactly raw IDs 0..3 (first cluster) and 0..4 (second).
    #[cfg(feature = "sim")]
    #[test]
    fn sim_cluster_self_id() {
        let mut flow = FlowBuilder::new();
        let cluster1 = flow.cluster::<()>();
        let cluster2 = flow.cluster::<()>();

        let node = flow.process::<()>();

        let out_recv = cluster1
            .source_iter(q!(vec![CLUSTER_SELF_ID]))
            .send(&node, TCP.fail_stop().bincode())
            .values()
            .merge_unordered(
                cluster2
                    .source_iter(q!(vec![CLUSTER_SELF_ID]))
                    .send(&node, TCP.fail_stop().bincode())
                    .values(),
            )
            .sim_output();

        flow.sim()
            .with_cluster_size(&cluster1, 3)
            .with_cluster_size(&cluster2, 4)
            .exhaustive(async || {
                out_recv
                    .assert_yields_only_unordered([0, 1, 2, 0, 1, 2, 3].map(MemberId::from_raw_id))
                    .await
            });
    }

    /// Each member of a 2-member cluster batches `[1, 2, 3]` into ticks and reports per-tick
    /// counts. Summed per member, the counts must total 3, and the number reported by
    /// `exhaustive` is pinned at 106.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_cluster_with_tick() {
        use std::collections::HashMap;

        let mut flow = FlowBuilder::new();
        let cluster = flow.cluster::<()>();
        let node = flow.process::<()>();

        let out_recv = cluster
            .source_iter(q!(vec![1, 2, 3]))
            .batch(&cluster.tick(), nondet!(/** test */))
            .count()
            .all_ticks()
            .send(&node, TCP.fail_stop().bincode())
            .entries()
            .map(q!(|(id, v)| (id, v)))
            .sim_output();

        let count = flow
            .sim()
            .with_cluster_size(&cluster, 2)
            .exhaustive(async || {
                let grouped = out_recv.collect_sorted::<Vec<_>>().await.into_iter().fold(
                    HashMap::new(),
                    |mut acc: HashMap<MemberId<()>, usize>, (id, v)| {
                        *acc.entry(id).or_default() += v;
                        acc
                    },
                );

                // `assert_eq!` reports the mismatching values on failure, unlike `assert!(a == b)`.
                assert_eq!(grouped.len(), 2);
                for v in grouped.into_values() {
                    assert_eq!(v, 3);
                }
            });

        // not a square because we simulate all interleavings of ticks across 2 cluster members
        // eventually, we should be able to identify that the members are independent (because
        // there are no dataflow cycles) and avoid simulating redundant interleavings
        assert_eq!(count, 106);
    }

    /// A process subscribed to a 2-member cluster's membership stream must observe a `Joined`
    /// event for each of the two members.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_cluster_membership() {
        let mut flow = FlowBuilder::new();
        let cluster = flow.cluster::<()>();
        let node = flow.process::<()>();

        let out_recv = node
            .source_cluster_membership_stream(&cluster, nondet!(/** test */))
            .entries()
            .map(q!(|(id, v)| (id, v)))
            .sim_output();

        flow.sim()
            .with_cluster_size(&cluster, 2)
            .exhaustive(async || {
                out_recv
                    .assert_yields_only_unordered(vec![
                        (MemberId::from_raw_id(0), MembershipEvent::Joined),
                        (MemberId::from_raw_id(1), MembershipEvent::Joined),
                    ])
                    .await;
            });
    }
}