mas_storage/queue/worker.rs
1// Copyright 2024 New Vector Ltd.
2//
3// SPDX-License-Identifier: AGPL-3.0-only
4// Please see LICENSE in the repository root for full details.
5
6//! Repository to interact with workers in the job queue
7
8use async_trait::async_trait;
9use chrono::Duration;
10use rand_core::RngCore;
11use ulid::Ulid;
12
13use crate::{Clock, repository_impl};
14
15/// A worker is an entity which can execute jobs.
16pub struct Worker {
17 /// The ID of the worker.
18 pub id: Ulid,
19}
20
21/// A [`QueueWorkerRepository`] is used to schedule jobs to be executed by a
22/// worker.
23#[async_trait]
24pub trait QueueWorkerRepository: Send + Sync {
25 /// The error type returned by the repository.
26 type Error;
27
28 /// Register a new worker.
29 ///
30 /// Returns a reference to the worker.
31 ///
32 /// # Errors
33 ///
34 /// Returns an error if the underlying repository fails.
35 async fn register(
36 &mut self,
37 rng: &mut (dyn RngCore + Send),
38 clock: &dyn Clock,
39 ) -> Result<Worker, Self::Error>;
40
41 /// Send a heartbeat for the given worker.
42 ///
43 /// # Errors
44 ///
45 /// Returns an error if the underlying repository fails or if the worker was
46 /// shutdown.
47 async fn heartbeat(&mut self, clock: &dyn Clock, worker: &Worker) -> Result<(), Self::Error>;
48
49 /// Mark the given worker as shutdown.
50 ///
51 /// # Errors
52 ///
53 /// Returns an error if the underlying repository fails.
54 async fn shutdown(&mut self, clock: &dyn Clock, worker: &Worker) -> Result<(), Self::Error>;
55
56 /// Find dead workers and shut them down.
57 ///
58 /// # Errors
59 ///
60 /// Returns an error if the underlying repository fails.
61 async fn shutdown_dead_workers(
62 &mut self,
63 clock: &dyn Clock,
64 threshold: Duration,
65 ) -> Result<(), Self::Error>;
66
67 /// Remove the leader lease if it is expired, sending a notification to
68 /// trigger a new leader election.
69 ///
70 /// # Errors
71 ///
72 /// Returns an error if the underlying repository fails.
73 async fn remove_leader_lease_if_expired(
74 &mut self,
75 clock: &dyn Clock,
76 ) -> Result<(), Self::Error>;
77
78 /// Try to get the leader lease, renewing it if we already have it
79 ///
80 /// Returns `true` if we got the leader lease, `false` if we didn't
81 ///
82 /// # Errors
83 ///
84 /// Returns an error if the underlying repository fails.
85 async fn try_get_leader_lease(
86 &mut self,
87 clock: &dyn Clock,
88 worker: &Worker,
89 ) -> Result<bool, Self::Error>;
90}
91
92repository_impl!(QueueWorkerRepository:
93 async fn register(
94 &mut self,
95 rng: &mut (dyn RngCore + Send),
96 clock: &dyn Clock,
97 ) -> Result<Worker, Self::Error>;
98
99 async fn heartbeat(
100 &mut self,
101 clock: &dyn Clock,
102 worker: &Worker,
103 ) -> Result<(), Self::Error>;
104
105 async fn shutdown(
106 &mut self,
107 clock: &dyn Clock,
108 worker: &Worker,
109 ) -> Result<(), Self::Error>;
110
111 async fn shutdown_dead_workers(
112 &mut self,
113 clock: &dyn Clock,
114 threshold: Duration,
115 ) -> Result<(), Self::Error>;
116
117 async fn remove_leader_lease_if_expired(
118 &mut self,
119 clock: &dyn Clock,
120 ) -> Result<(), Self::Error>;
121
122 async fn try_get_leader_lease(
123 &mut self,
124 clock: &dyn Clock,
125 worker: &Worker,
126 ) -> Result<bool, Self::Error>;
127);