In a recent post, I announced the very non-exciting feature of having custom-built email subscriptions for this blog. Writing the subscription flow was easy, but developing the automation to, first, periodically scrape the blog’s RSS feed and, second, schedule email notifications to readers based on the items in the feed was really time-consuming and tricky to implement.

You would say: “How is that hard? Just set up a cron job that fetches the RSS feed and sends emails!” Yeah, yeah, of course, that can work. But when you add in other requirements, this approach is not so simple. Note a few things: you need to determine which posts in the feed are new and which aren’t, which means you have to keep track of all past-seen posts; my email gateway has daily limits on how many emails I can send, so I need to schedule the submissions across multiple days; and if the submission of one email fails, I want to retry sending that one email only. Add to that the fact that I run not one but multiple sites with their own subscription features, and these processes—in particular, the email quota controls—all have to somehow agree.

The more you think about this problem, the more you realize that having some sort of queue to track which emails have to be sent and which haven’t been sent yet could be very useful. And once you are there, you also realize that such a queuing system can support all kinds of background operations that happen in a web service, not just email submissions, and I have been needing that ability for a while now.

So let’s get into specifics. Here are the conceptual tasks I envisioned within the context of EndTRACKER (terrible name, I know), along with the names they actually got in the code, to convert an RSS feed into email notifications:

  • ProcessFeeds: Loads all feeds from the database and enqueues new ScrapeFeed and ProcessFeed tasks for every feed. This is the entry point to the system, and this task is enqueued by a timer once an hour to start the processing flow.

  • ScrapeFeed(feed): Loads the details for feed, downloads its registered RSS feed, and computes the delta against previously-known items. Newly-discovered feed items are inserted into a database and marked as “not yet notified”.

  • ProcessFeed(feed): Loads all “not yet notified” posts for feed from the database plus the list of verified email subscribers for the site that owns the feed. It then schedules one SendEmailItem task for every post/subscriber pair and finally marks each feed item as notified.

  • SendEmailItem(feed, item, subscriber): Loads the feed’s item details from the database, composes an email for the subscriber by formatting arbitrary HTML into text, and sends the result to them. This is where sanity checks happen, such as ensuring that we have enough email submission quota left and that the subscriber hasn’t unsubscribed, and thus this task must be retried later if it fails.
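
To make these descriptors more tangible, here is roughly how they can be captured as a single JSON-serializable type. This is only a sketch with made-up payload fields, not the actual EndTRACKER definitions:

use serde::{Deserialize, Serialize};
use uuid::Uuid;

/// One entry in the task queue.  Hypothetical sketch: the real descriptors
/// and their payloads live in the service code, not in III-IV.
#[derive(Debug, Deserialize, Serialize)]
enum Task {
    /// Enqueues ScrapeFeed and ProcessFeed tasks for every known feed.
    ProcessFeeds,

    /// Downloads one feed and records newly-discovered items.
    ScrapeFeed { feed_id: Uuid },

    /// Schedules one SendEmailItem task per pending item and subscriber.
    ProcessFeed { feed_id: Uuid },

    /// Formats one feed item as an email and sends it to one subscriber.
    SendEmailItem { feed_id: Uuid, item_id: Uuid, subscriber: String },
}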

With these task definitions in mind, it was time to go to the drawing board and design a queuing system to support them.


Now, the question you surely have is: “Why bother? There are plenty of queuing services out there!” And my answer is: “Because, why not?” I wanted to have some fun designing and implementing this feature, and I don’t fancy the idea of marrying a specific cloud provider by relying on their cloud-native services: so far, the only thing I depend on for my web services is a hosted PostgreSQL instance and a bunch of Azure Functions deployments, and I could trivially move these to a VM in my home server if I had to.

Anyhow. What is a “persistent task queue” anyway, other than a mouthful? Let’s look at the words: it is a queue, so it is an ordered record of “things”; it is for tasks, so it needs to offer task-specific execution logic and tracking to ensure at-most-once semantics and the like; and it is persistent, so the tasks and their statuses need to be stored somewhere.

Key ideas

As with any design, we have to start by enumerating the specific requirements I had for the solution, because these put the various design choices in context. They are:

  • Service-agnostic. I run a few web services, all of which share common logic via the III-IV framework. The queue implementation must live in this open-source framework. The framework must not know anything about the task specifics, so task descriptors must be defined by the services and they need to be persisted in some generic way (JSON serialization).

  • Client and worker separation. Clients enqueuing tasks must not know anything about how to run them. Only the worker processes need to contain the code that processes the tasks. This means there ought to be two separate APIs, and there can be one or more clients and/or workers running at any given time.

  • Azure Functions deployment. The queue must run within existing services, not as a new standalone deployment. This is a restrictive requirement because my existing services are run by a serverless runtime, so the queue operations cannot rely on long-running processes.

  • At-most-once execution. Tasks have side-effects so they must run at most once. It is OK if some are lost as long as it’s a rare occurrence, but it is not acceptable to run the same task twice. The fact that we run in a serverless environment helps with this because Azure Functions enforces a maximum runtime for processes, which we will take advantage of.

  • Ability to retry tasks that fail. Some tasks will fail for expected reasons, such as when we have run out of outbound email quota for the day, and these must be postponed and retried at a later time. But some failures are not retriable, so whether a task needs to be retried or not has to be decided on an error-by-error basis.

  • Quarantining of problematic tasks. Tasks that fail repeatedly or that cause the queue worker to crash in unexpected ways need to be moved out of the way after a few execution attempts so that they do not cause future trouble.

  • Database-agnostic. As is the case for all of III-IV-based services, the choice of the database must be independent from the queue logic and both PostgreSQL (for production) and SQLite (for lightning-fast tests) must be supported. However, the database must be ACID because it will be used for synchronization.

With these, you can start envisioning what the queue looks like. Beware that I’m no expert in queuing systems, so this design and implementation are possibly flawed or incomplete in various ways… but they do the job for now. Let’s dive into the specifics.

Azure Functions integration

The first consumer of the task queue was going to be EndTRACKER and, right now, EndTRACKER runs as an Azure Functions serverless service: I upload a tiny Rust binary that exposes an HTTP interface and the cloud runtime takes care of spinning up short-lived containers whenever the configured HTTP routes are accessed. The only magic involved is making the Rust HTTP server expose itself via the TCP/IP port given to it in the FUNCTIONS_CUSTOMHANDLER_PORT environment variable—and, after that, all communication between the Azure Functions runtime and the Rust binary happens over HTTP.

This is great, but… I had to answer a few questions before even attempting to write the queue service in this environment.

The first question was: is it even possible to build the queue runtime in a serverless environment? A worker is, in theory, a long-lived process that polls the queue every few seconds or minutes to detect work to do and then executes it, and long-lived processes are precisely what a serverless runtime does not give you. The answer is still yes though: imagine exposing a /queue-loop HTTP endpoint and having a timer that calls this endpoint periodically to trigger the queue’s processing loop. As long as the loop can execute at least one task within the maximum allowed container run time, the queue will make forward progress.

The second question was: can we integrate this timer into Azure Functions without needing a separate cron job that pokes /queue-loop every few minutes? The answer also seemed to be yes: Azure Functions endpoints can be exposed via different triggers, and while HTTP calls are the most common kind, others, such as timers, exist too. If we define a trigger like the following in a functions/queue-loop.json file:

{
  "bindings": [
    {
      "type": "timerTrigger",
      "direction": "in",
      "name": "req",
      "schedule": "0 */10 * * * *"
    }
  ]
}

then the Azure Functions runtime will call this queue-loop function every 10 minutes: the schedule stanza above is a six-field NCRONTAB expression (seconds come first), so it fires at second 0 of every 10th minute.

But then the third question was: this is not an HTTP trigger… can it be processed at all from a custom Rust binary, or are the officially supported SDKs relying on some other communication mechanism with the runtime engine to react to non-HTTP triggers?

Fortunately, the answer is also yes. It took me a bit of fiddling, but in the end I found that the runtime will invoke the /queue-loop POST HTTP handler (outside of the default /api namespace for user-supplied handlers). However, after I got this hooked up, I noticed that the Azure Functions runtime claimed that my handler failed (even when it did return a 200 OK code) and kept re-invoking it every few seconds—as if previous calls had been lost. This took some more effort to figure out, and I had to peek into the C# SDK code to find the answer: the endpoint needs to return a valid JSON payload, not an empty document. After figuring this out, changing the endpoint to return an empty {} dictionary allowed the runtime to interact with my handler just fine.
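
For reference, here is a rough sketch of the shape such a handler ends up taking with axum (which is what my services use). The real /queue-loop handler comes from the queue’s rest module, shown later, and does more than return an empty dictionary:

use axum::{Json, Router, routing::post};
use serde_json::{Value, json};

// Minimal sketch of a timer-triggered handler.  Two details matter: it is
// registered at the root (not under the /api namespace), and it returns a
// JSON body (an empty dictionary is enough) so that the Azure Functions
// runtime considers the invocation successful instead of retrying it.
async fn queue_loop_handler() -> Json<Value> {
    // ... wake up the queue worker here ...
    Json(json!({}))
}

fn queue_loop_router() -> Router {
    Router::new().route("/queue-loop", post(queue_loop_handler))
}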

I was cleared to proceed with the original idea.

The client interface

Defining the right interface for tasks, especially the internal interface to process them, took many iterations. As mentioned earlier, clients must not know anything about how tasks are executed, so there have to be two separate APIs: one that is public for clients, and one that is internal to the workers.

In the end, I settled on the following client operations. Note that this is missing a ton of details, so don’t take the Rust code too literally here:

enum TaskResult {
    /// The task succeeded with an optional diagnostic/status message.
    Done(Option<String>),
    
    /// The task failed hard with the given error message.
    Failed(String),
    
    /// The task was abandoned after N failed retries with the given error message.
    Abandoned(String),
}

impl<DB, Task> Client<DB, Task>
where
    DB: Database,
    Task: Serialize,
{
    /// Creates a new client that uses the `db` database for task persistence.
    fn new(db: DB) -> Self { ... }

    /// Enqueues a new `task` and returns the identifier assigned to it.
    async fn enqueue(&mut self, task: &Task) -> Result<Uuid> { ... }

    /// Checks if the task `id` has finished execution by querying the database.
    async fn poll(&mut self, id: Uuid) -> Result<Option<TaskResult>> { ... }
    
    /// Waits for the task `id` to finish execution by polling it every `period`.
    async fn wait(&mut self, id: Uuid, period: Duration) -> Result<TaskResult> { ... }
}

We have a Client that is connected to a database of a generic type at construction time. This client provides a mechanism to enqueue a JSON-serializable task via an enqueue method, which returns the identifier of the enqueued task, and also offers methods to poll for the task’s status as it runs and to wait until the task completes.

All operations issued by the client happen by talking to the database. To avoid a potential hot spot in inserting tasks, tasks cannot be identified by a globally-unique counter because, otherwise, multiple concurrent clients would need to synchronize on that datum. This is why tasks use UUIDs as identifiers. (I suppose using the default “row id” of the database could have also worked here just fine, but I’m already using UUIDs for many other things, so that’s what I picked.)
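
As a usage sketch (again, not literal code), enqueuing a task and waiting for it would look something like this, reusing the hypothetical Task enum from before:

use std::time::Duration;
use uuid::Uuid;

// Hypothetical usage of the client API: enqueue one task and block until it
// finishes, polling the database every five seconds.
async fn scrape_one_feed<DB: Database>(db: DB, feed_id: Uuid) -> Result<()> {
    let mut client = Client::new(db);
    let id = client.enqueue(&Task::ScrapeFeed { feed_id }).await?;
    match client.wait(id, Duration::from_secs(5)).await? {
        TaskResult::Done(msg) => println!("Task {} done: {:?}", id, msg),
        TaskResult::Failed(e) | TaskResult::Abandoned(e) => {
            eprintln!("Task {} failed: {}", id, e);
        }
    }
    Ok(())
}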

The above is the essence of the Client, but it has other methods. Take a look at the client module for more details.

The worker service: not a microservice

The worker is exposed as a Worker<Task> type which looks like this. Pardon my over-simplified Rust:

impl<Task> Worker<Task>
where
    Task: Deserialize,
{
    /// Creates a new worker process that uses the `db` database to extract
    /// tasks and persist task state, uses `opts` for configuration, and
    /// relies on `exec` for the task execution logic.
    fn new<DB, Exec, ExecFut>(db: DB, opts: WorkerOptions, exec: Exec) -> Self
    where
        DB: Database,
        Exec: Fn(Task) -> ExecFut,
        ExecFut: Future<Output = ExecResult> { ... }

    /// Tells the worker to look for runnable tasks in the database and to
    /// run them.
    async fn notify(&mut self) -> Result<()> { ... }
}

The worker’s new constructor spawns a Tokio background task that loops infinitely, polling tasks from the database db and using the exec closure to execute each runnable task. The user-configurable options provided in opts tune the behavior of the loop, and all of these options can be provided via environment variables.
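
I will not enumerate the real options here, but conceptually they are knobs like the following (hypothetical field names; the actual ones live in the worker module):

use std::time::Duration;

/// Hypothetical sketch of the tuning knobs behind WorkerOptions.
struct WorkerOptions {
    /// How long a task may stay in the "running" state before the worker
    /// that claimed it is presumed dead and the task becomes retriable.
    max_runtime: Duration,

    /// How many failed attempts a task gets before it is abandoned.
    max_retries: u16,
}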

But the infinite loop does nothing on its own. The worker loop starts idle and it only runs tasks whenever the notify method is called. There is an async channel between the Tokio background task and the Worker instance, which notify uses to awaken the loop. This is where the previously-described HTTP API handler /queue-loop comes into play: this handler wraps a Worker and simply invokes notify on it to trigger the processing loop.
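
Here is a rough sketch, and only a sketch, of one way this wiring can be expressed with a Tokio channel; the real Worker also owns the database, the options, and the exec closure:

use tokio::sync::mpsc;

// Rough sketch of the notification plumbing between the Worker handle and
// its background loop.
struct Worker {
    wakeups: mpsc::Sender<()>,
}

impl Worker {
    fn new() -> Self {
        let (tx, mut rx) = mpsc::channel::<()>(1);
        tokio::spawn(async move {
            // Stay idle until a notification arrives; then look for runnable
            // tasks and execute them before going back to sleep.
            while rx.recv().await.is_some() {
                // run_runnable_tasks(...).await;
            }
        });
        Worker { wakeups: tx }
    }

    /// Awakens the background loop.  Fails only if the loop has gone away.
    async fn notify(&mut self) -> Result<(), mpsc::error::SendError<()>> {
        self.wakeups.send(()).await
    }
}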

As for the exec suppliers, these closures are intended to be stateless in memory, which simplifies their design. Remember that tasks can be executed by different workers, more than once if they have to be retried, and that the worker processes can die at any time… so keeping state in memory makes no sense. It is possible to maintain state in memory in a very convoluted way, which I had to do for unit-testing purposes, but that difficulty is tolerable given the reasons above.

The most interesting thing about the exec closure is its return type, which looks like the following:

enum ExecError {
    /// The task hard-failed with the given error message.
    Failed(String),
    
    /// The task asked to be retried after a certain delay with a diagnostic
    /// message.
    RetryAfterDelay(Duration, String),

    /// The task asked to be retried after a specific timestamp with a
    /// diagnostic message.
    RetryAfterTimestamp(OffsetDateTime, String),
}

type ExecResult = Result<Option<String>, ExecError>;

There are blanket conversions from database-layer errors (DbError) and business logic errors (DriverError) into ExecError that classify these error types into either fatal failures or retriable failures. This allows the Rust try operator ? to do the right thing by default.
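
To illustrate, here is a hypothetical exec supplier for the tasks sketched earlier. The email_quota_left and send_email helpers are made up; the point is that an expected failure (quota exhaustion) asks for a retry while other errors flow through ? and the blanket conversions:

use std::time::Duration;

// Hypothetical executor.  email_quota_left() and send_email() are made-up
// helpers whose errors convert into ExecError, so `?` does the right thing.
async fn exec_task(task: Task) -> ExecResult {
    match task {
        Task::SendEmailItem { feed_id, item_id, subscriber } => {
            if email_quota_left().await? == 0 {
                // Expected, transient failure: ask to be retried tomorrow.
                return Err(ExecError::RetryAfterDelay(
                    Duration::from_secs(24 * 60 * 60),
                    "Out of daily email quota".to_owned(),
                ));
            }
            send_email(feed_id, item_id, &subscriber).await?;
            Ok(Some(format!("Notified {}", subscriber)))
        }
        _ => Err(ExecError::Failed("Task not handled in this sketch".to_owned())),
    }
}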

As in the client section, the above is just a sketch of the Worker. Take a look at the worker module for the real details.

Safe retries

One key requirement we haven’t covered yet is how to achieve at-most-once execution guarantees. For this, we need to ensure that only one worker can pick up a task at any given time.

This is easy, right? We can implement a write-ahead journal for task processing, like this:

  1. Load a runnable (not-yet-running) task from the queue.
  2. Atomically mark the task as running (thanks, ACID transactions!).
  3. Run the task processing logic.
  4. If the task fails, atomically mark it as failed so that it can be retried later after a configurable delay.
  5. If the task succeeds, atomically mark it as done so that it is never considered as runnable again.

This ensures that only one worker will ever pick up a task so we are protected against multiple concurrent executions. All good, right? Well… but what happens if the worker dies at any point after marking the task as running, for whatever reason? We will leave the task “running” so it won’t ever be considered as runnable again, which means it’ll never complete!

Thankfully, this is where the serverless runtime provided by Azure Functions comes in very handy. We can rely on the fact that the runtime enforces a maximum runtime limit for any request and use that fact to detect “lost” tasks. In other words: if we find a task in the running state that has been running for longer than the maximum allowed runtime (5 minutes by default), then we can conclude that the worker was lost and that the task has to be retried.
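
Sketching the claim step as SQL makes this clearer. The following is hypothetical, PostgreSQL-flavored, and uses a made-up schema (the real implementation goes through the database-agnostic layer), but it shows how a single atomic statement can both pick a runnable task and reclaim a lost one:

// Hypothetical "claim one task" statement.  It treats tasks that have been
// "running" for longer than the maximum allowed runtime as lost and makes
// them claimable again.
const CLAIM_ONE_TASK: &str = "
    UPDATE tasks
        SET status = 'running', updated = NOW()
        WHERE id = (
            SELECT id FROM tasks
                WHERE status = 'runnable'
                    OR (status = 'running'
                        AND updated < NOW() - INTERVAL '5 minutes')
                ORDER BY created
                LIMIT 1
                FOR UPDATE SKIP LOCKED
        )
        RETURNING id, json_payload
";

The FOR UPDATE SKIP LOCKED clause keeps two concurrent workers from grabbing the same row; SQLite has no equivalent, so a SQLite backend would have to rely on a transaction around a separate SELECT and UPDATE instead.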

Which finally brings us to quarantining. A worker can die due to external reasons (exceeding its maximum runtime, hardware failure…) or it can die because the exec handler for the task crashed. It’s the latter case that’s worrying because we could have an ill-defined task that causes a repeated crash every time a worker picks it up. Such cases are rather common in large queuing systems and can cause slowdowns in task processing or a complete DoS of the service. This is why it’s important to discard a task if it has been retried more than N times, which is what the Abandoned task state represents.

Putting it all together

Two requirements of the design were to separate the queue logic from the consumer services and to not need extra services to be running. This posed an interesting problem.

The queue processing loop (aka the Worker) has to run in-process with the actual service. Now, while I do like a microservice-oriented design because it keeps responsibilities clear across components, deploying a bunch of services comes at a high maintenance cost—one that I am not willing to pay for my side projects. This is why, in EndTRACKER, I have opted for a microservice-like design with a monolithic deployment. The way this works is that the code in EndTRACKER is split into various REST services, and these services all get combined into one single HTTP router from main.rs. The result is a single binary that is trivial to deploy, while leaving an easy way out of the monolith if the need to scale any of the internal services ever arises.

In practical terms, this means that EndTRACKER now has an in-process batch “microservice” that simply spawns the generic Worker and exposes the /queue-loop endpoint through the HTTP router, like this:

// Spawn the worker, connecting it to system services and the `run_task`
// execution logic.
let worker = iii_iv_queue::driver::Worker::new(
    worker_db, clock, worker_opts, move |t| { run_task(...) });
let worker = Arc::from(Mutex::from(worker));

// Instantiate the service-independent queue worker from III-IV.
// This is the `/queue-loop` handler, essentially.
let queue_router = iii_iv_queue::rest::worker_cron_app(worker);

// Create the HTTP router for the microservice, bundling the generic
// III-IV endpoints with ours.
let router = Router::new()
    .route("/hourly", axum::routing::post(hourly::cron_post_handler))
    .merge(queue_router);

Verbose… but not too complicated.

But wait, what is that /hourly endpoint that just showed up? That’s another Azure Functions timer trigger that gets called every hour. This is where the service injects the ProcessFeeds master task into the queue to trigger the whole processing flow. With that task injected into the queue, the /queue-loop then takes over every 10 minutes to discover and process all dependent tasks as they show up.
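
Stripped of routing glue and error handling, the hourly handler essentially boils down to this, once more assuming the hypothetical Task enum from before:

// What the hourly trigger amounts to: inject the master task and let the
// /queue-loop timer discover and fan out the rest of the flow.
async fn enqueue_hourly_work<DB: Database>(client: &mut Client<DB, Task>) -> Result<()> {
    let _id = client.enqueue(&Task::ProcessFeeds).await?;
    Ok(())
}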

Future use cases

Before concluding, let me briefly mention a couple of additional use cases that the queue could serve. It’s because these use cases exist that I pursued the queue solution instead of simply deploying rss2email as a cron job on my home server.

The first use case is going back to this long-standing to-do in the EndTRACKER codebase:

// Ideally we should get full visibility into all history, but that
// can be very costly and we can tolerate some error here.  To fix
// this properly, though, we'd need some offline data preprocessing,
// which would benefit this whole function anyway.

This comment is referring to the logic that computes the historical page views graphs on demand. Right now, historical queries are limited to just one month because of this, but with deferred task processing, you can imagine having a daily task that calculates which data points are missing in a timeseries and spawns tasks to generate those precomputed data points… which should be trivial to implement at this point. Furthermore, having this ability to compute summarized timeseries means I could change the service to fully discard request data after processing, further reducing the risk of keeping any private data in the database.

The other use case would arise if I start accepting “customers” in EndTRACKER. If that were the case, one feature I’d want to expose is an “export your data as a SQLite database” button so that you wouldn’t feel trapped in the platform. Implementing such a data dump cannot happen as part of a server request, so it would also benefit from happening via a deferred task.

Finally, that’s all for today. Remember that III-IV is open source, so this queuing system is as well. Maybe you can now find use cases for this little framework yourself!