1use std::fmt::{Debug, Formatter};
13use std::marker::PhantomData;
14
15use proc_macro2::Span;
16use quote::quote;
17use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
18use stageleft::{QuotedWithContextWithProps, quote_type};
19
20use super::dynamic::LocationId;
21use super::{Location, MemberId};
22use crate::compile::builder::FlowState;
23use crate::location::dynamic::ClusterConsistency;
24use crate::location::member_id::TaglessMemberId;
25use crate::location::{LocationKey, TopLevel};
26use crate::staging_util::{Invariant, get_this_crate};
27
/// Type-level marker for the consistency level carried by a [`Cluster`].
///
/// Each implementor maps itself to one runtime [`ClusterConsistency`] value.
pub trait Consistency {
    /// Returns the [`ClusterConsistency`] value that this marker type stands for.
    fn consistency() -> ClusterConsistency;
}
34
/// Marker type that maps to [`ClusterConsistency::NoConsistency`].
///
/// It is an empty enum, so it has no values and is only used as a type parameter.
/// It is the default `Con` parameter of [`Cluster`].
pub enum NoConsistency {}
impl Consistency for NoConsistency {
    /// Always yields [`ClusterConsistency::NoConsistency`].
    fn consistency() -> ClusterConsistency {
        ClusterConsistency::NoConsistency
    }
}
43
/// Marker type that maps to [`ClusterConsistency::EventualConsistency`].
///
/// It is an empty enum, so it has no values and is only used as a type parameter.
pub enum EventualConsistency {}
impl Consistency for EventualConsistency {
    /// Always yields [`ClusterConsistency::EventualConsistency`].
    fn consistency() -> ClusterConsistency {
        ClusterConsistency::EventualConsistency
    }
}
52
53pub struct Cluster<'a, ClusterTag, Con: Consistency = NoConsistency> {
63 pub(crate) key: LocationKey,
64 pub(crate) flow_state: FlowState,
65 pub(crate) _phantom: Invariant<'a, (ClusterTag, Con)>,
66}
67
68impl<C, Con: Consistency> Debug for Cluster<'_, C, Con> {
69 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
70 write!(f, "Cluster({})", self.key)
71 }
72}
73
74impl<C, Con: Consistency> Eq for Cluster<'_, C, Con> {}
75impl<C, Con: Consistency> PartialEq for Cluster<'_, C, Con> {
76 fn eq(&self, other: &Self) -> bool {
77 self.key == other.key && FlowState::ptr_eq(&self.flow_state, &other.flow_state)
78 }
79}
80
81impl<C, Con: Consistency> Clone for Cluster<'_, C, Con> {
82 fn clone(&self) -> Self {
83 Cluster {
84 key: self.key,
85 flow_state: self.flow_state.clone(),
86 _phantom: PhantomData,
87 }
88 }
89}
90
91impl<'a, C, Con: Consistency> super::dynamic::DynLocation for Cluster<'a, C, Con> {
92 fn dyn_id(&self) -> LocationId {
93 LocationId::Cluster(self.key)
94 }
95
96 fn flow_state(&self) -> &FlowState {
97 &self.flow_state
98 }
99
100 fn is_top_level() -> bool {
101 true
102 }
103
104 fn multiversioned(&self) -> bool {
105 false }
107
108 fn cluster_consistency() -> Option<ClusterConsistency> {
109 Some(Con::consistency())
110 }
111}
112
113impl<'a, C, Con: Consistency> Location<'a> for Cluster<'a, C, Con> {
114 type Root = Cluster<'a, C, Con>;
115
116 type DropConsistency = Cluster<'a, C, NoConsistency>;
117
118 fn consistency() -> Option<ClusterConsistency> {
119 Some(Con::consistency())
120 }
121
122 fn root(&self) -> Self::Root {
123 self.clone()
124 }
125
126 fn drop_consistency(&self) -> Self::DropConsistency {
127 Cluster {
128 key: self.key,
129 flow_state: self.flow_state.clone(),
130 _phantom: PhantomData,
131 }
132 }
133
134 fn from_drop_consistency(l2: Self::DropConsistency) -> Self {
135 Cluster {
136 key: l2.key,
137 flow_state: l2.flow_state,
138 _phantom: PhantomData,
139 }
140 }
141}
142
143impl<'a, C, Con: Consistency> TopLevel<'a> for Cluster<'a, C, Con> {}
144
#[cfg(feature = "sim")]
impl<'a, C> Cluster<'a, C> {
    /// Creates a simulator input for this cluster (only built with the `sim` feature).
    ///
    /// An [`External`](crate::location::External) location with key
    /// `LocationKey::FIRST`, sharing this cluster's flow state, is used as the source
    /// for `source_external_bincode`.
    ///
    /// Returns a pair of:
    /// * a `SimClusterSender` wrapping the port id of that external source, and
    /// * the unbounded, totally ordered, exactly-once stream located on this cluster.
    #[expect(clippy::type_complexity, reason = "stream markers")]
    pub fn sim_input<T>(
        &self,
    ) -> (
        crate::sim::SimClusterSender<
            T,
            crate::live_collections::stream::TotalOrder,
            crate::live_collections::stream::ExactlyOnce,
        >,
        crate::live_collections::Stream<
            T,
            Self,
            crate::live_collections::boundedness::Unbounded,
            crate::live_collections::stream::TotalOrder,
            crate::live_collections::stream::ExactlyOnce,
        >,
    )
    where
        T: serde::Serialize + serde::de::DeserializeOwned,
    {
        use crate::location::{External, Location};

        // The external endpoint shares this cluster's flow state.
        let external_location: External<'a, ()> = External {
            key: LocationKey::FIRST,
            flow_state: self.flow_state.clone(),
            _phantom: PhantomData,
        };

        let (external, stream) = self.source_external_bincode(&external_location);
        let sender = crate::sim::SimClusterSender(external.port_id, PhantomData);

        (sender, stream)
    }
}
187
/// Free variable that, when quoted, expands to the identifier
/// `__hydro_lang_cluster_ids_<key>` and has the staged type `&'a [TaglessMemberId]`.
pub struct ClusterIds<'a> {
    /// Key of the cluster; it is formatted into the generated identifier.
    pub key: LocationKey,
    /// Ties the value to the `'a` lifetime without storing a reference.
    pub _phantom: PhantomData<&'a ()>,
}
198
199impl<'a> Clone for ClusterIds<'a> {
200 fn clone(&self) -> Self {
201 Self {
202 key: self.key,
203 _phantom: Default::default(),
204 }
205 }
206}
207
208impl<'a, Ctx> FreeVariableWithContextWithProps<Ctx, ()> for ClusterIds<'a> {
209 type O = &'a [TaglessMemberId];
210
211 fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
212 where
213 Self: Sized,
214 {
215 let ident = syn::Ident::new(
216 &format!("__hydro_lang_cluster_ids_{}", self.key),
217 Span::call_site(),
218 );
219
220 (
221 QuoteTokens {
222 prelude: None,
223 expr: Some(quote! { #ident }),
224 },
225 (),
226 )
227 }
228}
229
230impl<'a, Ctx> QuotedWithContextWithProps<'a, &'a [TaglessMemberId], Ctx, ()> for ClusterIds<'a> {}
231
/// Implemented by location types that are clusters, exposing their type-level tag.
pub trait IsCluster {
    /// The tag type of the cluster; used as the parameter of `MemberId` when
    /// quoting [`CLUSTER_SELF_ID`].
    type Tag;
}
237
238impl<C> IsCluster for Cluster<'_, C> {
239 type Tag = C;
240}
241
242pub static CLUSTER_SELF_ID: ClusterSelfId = ClusterSelfId { _private: &() };
245
/// Type of the [`CLUSTER_SELF_ID`] free variable.
///
/// The only field is private, so values cannot be constructed outside this module.
#[derive(Clone, Copy)]
pub struct ClusterSelfId<'a> {
    /// Private unit reference; it prevents outside construction and carries `'a`.
    _private: &'a (),
}
254
255impl<'a, L> FreeVariableWithContextWithProps<L, ()> for ClusterSelfId<'a>
256where
257 L: Location<'a>,
258 <L as Location<'a>>::Root: IsCluster,
259{
260 type O = MemberId<<<L as Location<'a>>::Root as IsCluster>::Tag>;
261
262 fn to_tokens(self, ctx: &L) -> (QuoteTokens, ())
263 where
264 Self: Sized,
265 {
266 let LocationId::Cluster(cluster_id) = ctx.root().id() else {
267 unreachable!()
268 };
269
270 let ident = syn::Ident::new(
271 &format!("__hydro_lang_cluster_self_id_{}", cluster_id),
272 Span::call_site(),
273 );
274 let root = get_this_crate();
275 let c_type: syn::Type = quote_type::<<<L as Location<'a>>::Root as IsCluster>::Tag>();
276
277 (
278 QuoteTokens {
279 prelude: None,
280 expr: Some(
281 quote! { #root::__staged::location::MemberId::<#c_type>::from_tagless((#ident).clone()) },
282 ),
283 },
284 (),
285 )
286 }
287}
288
289impl<'a, L>
290 QuotedWithContextWithProps<'a, MemberId<<<L as Location<'a>>::Root as IsCluster>::Tag>, L, ()>
291 for ClusterSelfId<'a>
292where
293 L: Location<'a>,
294 <L as Location<'a>>::Root: IsCluster,
295{
296}
297
/// Simulator-backed tests for cluster locations; every test needs the `sim` feature.
#[cfg(test)]
mod tests {
    #[cfg(feature = "sim")]
    use stageleft::q;

    #[cfg(feature = "sim")]
    use super::CLUSTER_SELF_ID;
    #[cfg(feature = "sim")]
    use crate::location::{Location, MemberId, MembershipEvent};
    #[cfg(feature = "sim")]
    use crate::networking::TCP;
    #[cfg(feature = "sim")]
    use crate::nondet::nondet;
    #[cfg(feature = "sim")]
    use crate::prelude::FlowBuilder;

    /// Every member of two clusters (sizes 3 and 4) sends its own id to a single
    /// process; the process must see ids `0..3` and `0..4`, in any order.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_cluster_self_id() {
        let mut flow = FlowBuilder::new();
        let cluster1 = flow.cluster::<()>();
        let cluster2 = flow.cluster::<()>();

        let node = flow.process::<()>();

        let out_recv = cluster1
            .source_iter(q!(vec![CLUSTER_SELF_ID]))
            .send(&node, TCP.fail_stop().bincode())
            .values()
            .merge_unordered(
                cluster2
                    .source_iter(q!(vec![CLUSTER_SELF_ID]))
                    .send(&node, TCP.fail_stop().bincode())
                    .values(),
            )
            .sim_output();

        flow.sim()
            .with_cluster_size(&cluster1, 3)
            .with_cluster_size(&cluster2, 4)
            .exhaustive(async || {
                out_recv
                    .assert_yields_only_unordered([0, 1, 2, 0, 1, 2, 3].map(MemberId::from_raw_id))
                    .await
            });
    }

    /// Each of two cluster members batches `[1, 2, 3]` inside a tick, counts the
    /// batch and sends the counts to a process; summed per member they must equal 3.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_cluster_with_tick() {
        use std::collections::HashMap;

        let mut flow = FlowBuilder::new();
        let cluster = flow.cluster::<()>();
        let node = flow.process::<()>();

        let out_recv = cluster
            .source_iter(q!(vec![1, 2, 3]))
            .batch(&cluster.tick(), nondet!())
            .count()
            .all_ticks()
            .send(&node, TCP.fail_stop().bincode())
            .entries()
            .map(q!(|(id, v)| (id, v)))
            .sim_output();

        let count = flow
            .sim()
            .with_cluster_size(&cluster, 2)
            .exhaustive(async || {
                // Sum the per-tick counts for each member.
                let grouped = out_recv.collect_sorted::<Vec<_>>().await.into_iter().fold(
                    HashMap::new(),
                    |mut acc: HashMap<MemberId<()>, usize>, (id, v)| {
                        *acc.entry(id).or_default() += v;
                        acc
                    },
                );

                // `assert_eq!` prints both sides on failure, unlike `assert!(a == b)`.
                assert_eq!(grouped.len(), 2);
                for (_id, v) in grouped {
                    assert_eq!(v, 3);
                }
            });

        // NOTE(review): 106 is the pinned return value of `exhaustive` — presumably
        // the number of executions the simulator explored; confirm.
        assert_eq!(count, 106);
    }

    /// A process observing the membership stream of a two-member cluster must see
    /// exactly one `Joined` event per member, in any order.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_cluster_membership() {
        let mut flow = FlowBuilder::new();
        let cluster = flow.cluster::<()>();
        let node = flow.process::<()>();

        let out_recv = node
            .source_cluster_membership_stream(&cluster, nondet!())
            .entries()
            .map(q!(|(id, v)| (id, v)))
            .sim_output();

        flow.sim()
            .with_cluster_size(&cluster, 2)
            .exhaustive(async || {
                out_recv
                    .assert_yields_only_unordered(vec![
                        (MemberId::from_raw_id(0), MembershipEvent::Joined),
                        (MemberId::from_raw_id(1), MembershipEvent::Joined),
                    ])
                    .await;
            });
    }
}