mas_storage/queue/
worker.rs

1// Copyright 2024 New Vector Ltd.
2//
3// SPDX-License-Identifier: AGPL-3.0-only
4// Please see LICENSE in the repository root for full details.
5
6//! Repository to interact with workers in the job queue
7
8use async_trait::async_trait;
9use chrono::Duration;
10use rand_core::RngCore;
11use ulid::Ulid;
12
13use crate::{Clock, repository_impl};
14
15/// A worker is an entity which can execute jobs.
16pub struct Worker {
17    /// The ID of the worker.
18    pub id: Ulid,
19}
20
/// A [`QueueWorkerRepository`] is used to manage the workers of the job
/// queue: registering them, recording their heartbeats, shutting them down
/// and handling the leader lease.
#[async_trait]
pub trait QueueWorkerRepository: Send + Sync {
    /// The error type returned by the repository.
    type Error;

    /// Register a new worker.
    ///
    /// Returns the newly registered [`Worker`].
    ///
    /// # Parameters
    ///
    /// * `rng`: The random number generator to use
    /// * `clock`: The clock to use
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying repository fails.
    async fn register(
        &mut self,
        rng: &mut (dyn RngCore + Send),
        clock: &dyn Clock,
    ) -> Result<Worker, Self::Error>;

    /// Send a heartbeat for the given worker.
    ///
    /// # Parameters
    ///
    /// * `clock`: The clock to use
    /// * `worker`: The worker to send the heartbeat for
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying repository fails or if the worker was
    /// shutdown.
    async fn heartbeat(&mut self, clock: &dyn Clock, worker: &Worker) -> Result<(), Self::Error>;

    /// Mark the given worker as shutdown.
    ///
    /// # Parameters
    ///
    /// * `clock`: The clock to use
    /// * `worker`: The worker to mark as shutdown
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying repository fails.
    async fn shutdown(&mut self, clock: &dyn Clock, worker: &Worker) -> Result<(), Self::Error>;

    /// Find dead workers and shut them down.
    ///
    /// # Parameters
    ///
    /// * `clock`: The clock to use
    /// * `threshold`: The threshold used to decide which workers are dead;
    ///   presumably the maximum time allowed since a worker's last heartbeat
    ///   (to be confirmed against the implementation)
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying repository fails.
    async fn shutdown_dead_workers(
        &mut self,
        clock: &dyn Clock,
        threshold: Duration,
    ) -> Result<(), Self::Error>;

    /// Remove the leader lease if it is expired, sending a notification to
    /// trigger a new leader election.
    ///
    /// # Parameters
    ///
    /// * `clock`: The clock to use
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying repository fails.
    async fn remove_leader_lease_if_expired(
        &mut self,
        clock: &dyn Clock,
    ) -> Result<(), Self::Error>;

    /// Try to get the leader lease, renewing it if we already have it
    ///
    /// Returns `true` if we got the leader lease, `false` if we didn't
    ///
    /// # Parameters
    ///
    /// * `clock`: The clock to use
    /// * `worker`: The worker trying to get (or renew) the leader lease
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying repository fails.
    async fn try_get_leader_lease(
        &mut self,
        clock: &dyn Clock,
        worker: &Worker,
    ) -> Result<bool, Self::Error>;
}
91
92repository_impl!(QueueWorkerRepository:
93    async fn register(
94        &mut self,
95        rng: &mut (dyn RngCore + Send),
96        clock: &dyn Clock,
97    ) -> Result<Worker, Self::Error>;
98
99    async fn heartbeat(
100        &mut self,
101        clock: &dyn Clock,
102        worker: &Worker,
103    ) -> Result<(), Self::Error>;
104
105    async fn shutdown(
106        &mut self,
107        clock: &dyn Clock,
108        worker: &Worker,
109    ) -> Result<(), Self::Error>;
110
111    async fn shutdown_dead_workers(
112        &mut self,
113        clock: &dyn Clock,
114        threshold: Duration,
115    ) -> Result<(), Self::Error>;
116
117    async fn remove_leader_lease_if_expired(
118        &mut self,
119        clock: &dyn Clock,
120    ) -> Result<(), Self::Error>;
121
122    async fn try_get_leader_lease(
123        &mut self,
124        clock: &dyn Clock,
125        worker: &Worker,
126    ) -> Result<bool, Self::Error>;
127);