In a previous article, we explored how to manage background tasks in FastAPI using ARQ and Redis. While that setup is great for offloading work, real-world applications must handle failure. Network connections drop, external APIs return errors, and services temporarily go offline. A robust system doesn't just run tasks; it anticipates and recovers from these failures.
This guide dives into ARQ's powerful retry mechanisms. We'll cover how to automatically handle interruptions, implement custom retry logic with exponential backoff, and build resilient, production-grade task queues, a key component of any production-ready FastAPI application.
One of ARQ's most critical features for reliability is its default behavior during shutdowns or cancellations. If an ARQ worker is stopped while a job is running (for example, during a deployment or when you press Ctrl+C), it doesn't just discard the job. Instead, it gracefully cancels the task and immediately requeues it to be run again later.
When the worker is restarted, it picks up the cancelled job from the queue and runs it from the beginning.
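For reference, here is a minimal sketch of a worker that reproduces this behavior. The task name `the_task` simply mirrors the job name in the logs below, and the 5-second sleep stands in for real long-running work:

```python
# worker.py - a minimal sketch; `the_task` mirrors the job name in the logs below
import asyncio


async def the_task(ctx):
    # Simulate long-running work so the job can be interrupted mid-run
    await asyncio.sleep(5)


class WorkerSettings:
    functions = [the_task]
```

Start it with `arq worker.WorkerSettings`, enqueue `the_task`, and press Ctrl+C while the job is still running.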
You can see this in the worker's log output:
```
➤ arq worker.WorkerSettings
12:42:38: Starting worker...
12:42:38:  10.23s → job_id:the_task() delayed=10.23s
12:42:40: shutdown on SIGINT ◆ 0 jobs complete ◆ 0 failed ◆ 1 ongoing to cancel
12:42:40:   1.16s ↻ job_id:the_task cancelled, will be run again
```
When the worker starts again:
```
➤ arq worker.WorkerSettings
12:42:50: Starting worker...
12:42:50:  21.78s → job_id:the_task() try=2 delayed=21.78s
12:42:55:   5.00s ← job_id:the_task ●
```
Notice the `try=2` in the log. ARQ automatically tracks retry attempts. This default behavior ensures that transient interruptions don't cause jobs to be lost, providing a strong foundation for a reliable system.
This automatic requeueing behavior means that ARQ guarantees at-least-once delivery. A job will run at least one time, but it might run more than once if it's interrupted.
Because of this, your tasks must be idempotent. An idempotent operation is one that can be applied multiple times without changing the result beyond the initial application. For example, setting a user's status to `active` is idempotent, but adding a $10 charge to their account is not.
To make non-idempotent operations safe, you can:

- Use an idempotency key: record a unique identifier for each operation and skip the work if that identifier has already been processed.
- Check the current state before acting, for example, verify that a charge does not already exist before creating it.
- Enforce uniqueness at the database level (such as a unique constraint on an operation ID) so duplicate writes are rejected.
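As one illustration, here is a minimal sketch of the idempotency-key approach. The key format and the `apply_charge` helper are hypothetical, but `ctx['redis']` is the connection ARQ places in every job's context:

```python
async def charge_user(ctx, user_id: int, charge_id: str, amount: int):
    redis = ctx['redis']  # ARQ provides its Redis connection in the job context
    # SET with nx=True only succeeds if the key doesn't exist yet,
    # so a requeued duplicate of this job becomes a harmless no-op
    first_run = await redis.set(f"charge:{charge_id}", "done", nx=True)
    if not first_run:
        print(f"Charge {charge_id} already applied, skipping.")
        return
    await apply_charge(user_id, amount)  # hypothetical billing call
```

In production you would also need to handle the case where the job fails after the key is set but before the charge is applied, for example by setting the key only once the charge succeeds, inside a transaction.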
For failures that happen within your task logic, like a failed API call, you can manually trigger a retry by raising the `arq.worker.Retry` exception. This tells the worker to stop the current execution and requeue the job.
You can also specify a `defer` period to tell the worker how long to wait before trying again. This is essential for implementing backoff strategies to avoid overwhelming a struggling service.
Let's create a task that downloads content from a URL using the popular `httpx` library. If the API returns a non-200 status code, we'll retry the job with an increasing delay. This is known as linear backoff.
```python
# in tasks.py
from httpx import AsyncClient
from arq import Retry


# This function will be registered as an ARQ task
async def download_content(ctx, url: str):
    """
    Downloads content from a URL, with retries on failure.
    """
    session: AsyncClient = ctx['session']
    job_try = ctx.get("job_try", 1)
    max_tries = ctx.get("max_tries", 5)

    print(f"Attempt {job_try}/{max_tries}: Downloading {url}...")

    try:
        response = await session.get(url)
        response.raise_for_status()  # Raises an exception for 4xx/5xx responses
    except Exception as e:
        print(f"Download failed: {e}")
        # If we have retries left, raise Retry with a delay
        if job_try < max_tries:
            # Delays will be 5s, 10s, 15s, 20s
            defer_by = job_try * 5
            print(f"Retrying in {defer_by} seconds...")
            raise Retry(defer=defer_by) from e
        else:
            # If no retries left, let the exception bubble up to fail the job
            print("Max retries reached. Job will fail.")
            raise

    print("Download successful.")
    return len(response.text)


# --- Worker Configuration ---
# in worker.py
from httpx import AsyncClient

from tasks import download_content


async def startup(ctx):
    """
    Creates an httpx.AsyncClient instance for the worker to use.
    """
    ctx['session'] = AsyncClient()


async def shutdown(ctx):
    """
    Closes the httpx.AsyncClient instance.
    """
    await ctx['session'].aclose()


class WorkerSettings:
    functions = [download_content]
    on_startup = startup
    on_shutdown = shutdown
    max_tries = 5  # Set the default max_tries for all jobs
```
In this example:

- We use the `ctx` dictionary, which ARQ provides to every job, to get the current `job_try` number.
- We check whether any retries remain (`job_try < max_tries`). For robust monitoring, this is where you would also implement structured logging to record the failure.
- We raise `Retry`, passing a `defer` value that increases with each attempt.
Sometimes, you may have a function that you want to use both as an ARQ task and as a regular async function elsewhere in your code. If you raise `Retry` outside of an ARQ worker context, it will cause an unhandled exception.
To solve this, you can check for the existence of the `ctx` dictionary to determine whether the function is running as an ARQ job.
```python
import requests
from arq import Retry


async def call_external_service(ctx: dict | None, payload: dict):
    try:
        # ... logic to call the service ...
        response = requests.post("https://api.example.com/data", json=payload)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.ConnectionError as e:
        # This logic runs ONLY if the function is executed by an ARQ worker
        if ctx:
            job_try = ctx.get("job_try", 1)
            max_tries = ctx.get("max_tries", 5)
            if job_try < max_tries:
                # Retry with a fixed 20-second delay
                raise Retry(defer=20) from e
        # Raise a standard exception if no retries are left, or when called directly
        raise Exception(
            "Connection Error: The service is currently unavailable. Please try again later."
        ) from e
    # rest of the function logic...
```
This pattern makes your functions more versatile. When run by ARQ, the function leverages the retry system; when called directly, it fails fast by raising a standard exception, allowing the calling code to handle the error immediately.
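To make the dual use concrete, here is a minimal sketch of both call styles. The payload and Redis settings are placeholders, and it assumes `call_external_service` lives in your tasks module:

```python
from arq import create_pool
from arq.connections import RedisSettings

from tasks import call_external_service  # assuming it lives in tasks.py


async def main():
    # 1. Enqueued as an ARQ job: the worker supplies ctx, so failures raise Retry
    pool = await create_pool(RedisSettings())
    await pool.enqueue_job("call_external_service", {"event": "signup"})

    # 2. Called directly: ctx is None, so it fails fast with a standard exception
    result = await call_external_service(None, {"event": "signup"})
```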
Building a reliable system requires planning for failure. ARQ provides simple yet powerful tools to make your background tasks resilient to transient errors and interruptions. By combining automatic requeueing with manual `Retry` logic and backoff strategies, you can ensure that your important jobs eventually get done, even when things go wrong. This approach is fundamental to creating production-ready applications with FastAPI.
David Muraya is a Solutions Architect specializing in Python, FastAPI, and Cloud Infrastructure. He is passionate about building scalable, production-ready applications and sharing his knowledge with the developer community. You can connect with him on LinkedIn.
Have a project in mind? Send me an email at hello@davidmuraya.com and let's bring your ideas to life. I am always available for exciting discussions.