1const INGRESS: &str = "sel://example.org/data-pipelines/ingress";
2const EVEN: &str = "sel://example.org/data-pipelines/even";
3
4/// Generates data to be transformed by the pipeline
5#[entrypoint]
6async fn generator(ctx: Context) -> Result<()> {
7 let switchboard: Switchboard = ctx.require().await;
8 let atlas: Atlas = ctx.require().await;
9
10 let mut publisher = Publisher::<i32>::create(&switchboard).await?;
11 atlas
12 .insert(Uri::parse(INGRESS).unwrap(), publisher.endpoint_id() as u64)
13 .await?;
14
15 // Emit a value every 0.5 seconds.
16 let mut state: i32 = -1;
17 loop {
18 state = state.wrapping_mul(1_103_515_245).wrapping_add(12_345);
19 publisher.send(state as i32).await?;
20 time::sleep(Duration::from_millis(500)).await?;
21 }
22}
23
24/// Doubles the input value
25#[entrypoint]
26async fn double(ctx: Context) -> Result<()> {
27 let switchboard: Switchboard = ctx.require().await;
28 let atlas: Atlas = ctx.require().await;
29
30 let ingress = Subscriber::<i32>::create(&switchboard).await?;
31 SubscriberAtlasExt::connect(&ingress, &atlas, &switchboard, INGRESS).await?;
32
33 let egress = Publisher::<i32>::create(&switchboard).await?;
34 PublisherAtlasExt::matching(&egress, &atlas, &switchboard, EVEN).await?;
35
36 ingress.map_ok(|item| item * 2).forward(egress).await?;
37
38 Ok(())
39}
40
41/// Adds 5u32 to the input value
42#[entrypoint]
43async fn add_five(ctx: Context) -> Result<()> {
44 let switchboard: Switchboard = ctx.require().await;
45 let atlas: Atlas = ctx.require().await;
46
47 let ingress = Subscriber::<i32>::create(&switchboard).await?;
48 SubscriberAtlasExt::connect(&ingress, &atlas, &switchboard, INGRESS).await?;
49
50 let egress = Publisher::<i32>::create(&switchboard).await?;
51 PublisherAtlasExt::matching(&egress, &atlas, &switchboard, EVEN).await?;
52
53 ingress.map_ok(|item| item + 5).forward(egress).await?;
54
55 Ok(())
56}
57
58/// Filters by even numbers
59#[entrypoint]
60async fn even(ctx: Context) -> Result<()> {
61 let switchboard: Switchboard = ctx.require().await;
62 let atlas: Atlas = ctx.require().await;
63
64 let ingress = Subscriber::<i32>::create(&switchboard).await?;
65 atlas
66 .insert(Uri::parse(EVEN).unwrap(), ingress.endpoint_id() as u64)
67 .await?;
68
69 ingress
70 .filter_map(|item| ready(item.ok()))
71 .filter(|item| ready(item % 2 == 0))
72 .for_each(|item| {
73 info!("Found even number: {item}");
74 ready(())
75 })
76 .await;
77
78 Ok(())
79}