Building Resilient Task Queues in FastAPI with ARQ Retries

By David Muraya • October 1, 2025

In a previous article, we explored how to manage background tasks in FastAPI using ARQ and Redis. While that setup is great for offloading work, real-world applications must handle failure. Network connections drop, external APIs return errors, and services temporarily go offline. A robust system doesn't just run tasks; it anticipates and recovers from these failures.

This guide covers ARQ's retry mechanisms: how interrupted jobs are retried automatically, how to implement custom retry logic with backoff delays, and how to build resilient task queues, a key component of any production-ready FastAPI application.

ARQ's Automatic Retry on Cancellation

One of ARQ's most important reliability features is its default behavior during shutdowns or cancellations. If an ARQ worker is stopped while a job is running (for example, during a deployment or by pressing Ctrl+C), it doesn't discard the job. Instead, it cancels the running task and keeps the job in the queue so it can be run again later.

When the worker is restarted, it picks up the cancelled job from the queue and runs it from the beginning.

You can see this in the worker's log output:

➤  arq worker.WorkerSettings
12:42:38: Starting worker...
12:42:38:  10.23s → job_id:the_task() delayed=10.23s
12:42:40: shutdown on SIGINT ◆ 0 jobs complete ◆ 0 failed ◆ 1 ongoing to cancel
12:42:40:   1.16s ↻ job_id:the_task cancelled, will be run again

When the worker starts again:

➤  arq worker.WorkerSettings
12:42:50: Starting worker...
12:42:50:  21.78s → job_id:the_task() try=2 delayed=21.78s
12:42:55:   5.00s ← job_id:the_task ●

Notice the try=2 in the log. ARQ automatically tracks retry attempts. This default behavior ensures that transient interruptions don't cause jobs to be lost, providing a strong foundation for a reliable system.

Important: Design for Idempotency

This automatic requeueing behavior means that ARQ guarantees at-least-once delivery. A job will run at least one time, but it might run more than once if it's interrupted.

Because of this, your tasks must be idempotent. An idempotent operation is one that can be applied multiple times without changing the result beyond the initial application. For example, setting a user's status to active is idempotent, but adding a $10 charge to their account is not.

To make non-idempotent operations safe, you can:

  • Use database transactions to ensure atomicity.
  • Check for a final state before acting (e.g., "if user is not yet subscribed, subscribe them").
  • Use an idempotency key to track whether a specific operation has already been completed.

Manual Retries with the `Retry` Exception

For failures that happen within your task logic, such as a failed API call, you can manually trigger a retry by raising the Retry exception (defined in arq.worker and importable directly from arq). This tells the worker to stop the current execution and requeue the job.

You can also specify a defer period to tell the worker how long to wait before trying again. This is essential for implementing backoff strategies to avoid overwhelming a struggling service.

Example: Retrying a Failing API Call

Let's create a task that downloads content from a URL using the popular httpx library. If the request fails or the API returns a 4xx or 5xx status code, we'll retry the job with a delay that grows by a fixed amount on each attempt. This is known as a linear backoff.

# in tasks.py
from httpx import AsyncClient, HTTPError
from arq import Retry

# This function will be registered as an ARQ task
async def download_content(ctx, url: str):
    """
    Downloads content from a URL, with retries on failure.
    """
    session: AsyncClient = ctx['session']
    # ARQ sets ctx["job_try"] on every run (1 for the first attempt).
    job_try = ctx.get("job_try", 1)
    # "max_tries" is only in ctx because startup() below puts it there;
    # the fallback should match WorkerSettings.max_tries.
    max_tries = ctx.get("max_tries", 5)

    print(f"Attempt {job_try}/{max_tries}: Downloading {url}...")

    try:
        response = await session.get(url)
        response.raise_for_status()  # Raises an exception for 4xx/5xx responses
    except HTTPError as e:
        # HTTPError covers both transport failures and bad status codes
        print(f"Download failed: {e}")
        # If we have retries left, raise Retry with a delay
        if job_try < max_tries:
            # Delays will be 5s, 10s, 15s, 20s
            defer_by = job_try * 5
            print(f"Retrying in {defer_by} seconds...")
            raise Retry(defer=defer_by) from e
        else:
            # If no retries left, let the exception bubble up to fail the job
            print("Max retries reached. Job will fail.")
            raise

    print("Download successful.")
    return len(response.text)

# --- Worker Configuration ---
# in worker.py
from httpx import AsyncClient
from tasks import download_content

async def startup(ctx):
    """
    Creates an httpx.AsyncClient instance for the worker to use.
    """
    ctx['session'] = AsyncClient()
    # Make max_tries visible to tasks through ctx
    ctx['max_tries'] = WorkerSettings.max_tries

async def shutdown(ctx):
    """
    Closes the httpx.AsyncClient instance.
    """
    await ctx['session'].aclose()

class WorkerSettings:
    functions = [download_content]
    on_startup = startup
    on_shutdown = shutdown
    max_tries = 5  # Set the default max_tries for all jobs

In this example:

  1. We use the ctx dictionary, which ARQ provides to every job, to get the current job_try number.
  2. If the API call fails, we check if we have retries left (job_try < max_tries). For robust monitoring, this is where you would also implement structured logging to record the failure.
  3. If so, we raise Retry, passing a defer value that increases with each attempt.
  4. If we've reached the maximum number of tries, we re-raise the original exception, which causes the job to be marked as permanently failed.
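
The linear schedule above can be swapped for an exponential one, where the delay doubles on each attempt up to a ceiling. The helper below is a minimal sketch of that idea; `backoff_delay` and its `base`, `cap`, and `jitter` parameters are hypothetical names chosen for illustration, not part of ARQ.

```python
import random


def backoff_delay(job_try: int, base: float = 5.0, cap: float = 300.0, jitter: float = 0.0) -> float:
    """Return the delay in seconds before the next attempt.

    job_try is 1 for the first attempt, matching ctx["job_try"].
    """
    # Double the delay on each attempt, but never exceed the cap.
    delay = min(cap, base * 2 ** (job_try - 1))
    # Optional jitter (a fraction of the delay) spreads retries out so that
    # many failing jobs do not all retry at the same moment.
    return delay + random.uniform(0, jitter * delay)


print([backoff_delay(n) for n in range(1, 5)])  # [5.0, 10.0, 20.0, 40.0]
```

Inside a task, the computed value would take the place of the linear delay, for example `raise Retry(defer=backoff_delay(job_try)) from e`. A non-zero `jitter` is worth considering when many jobs hit the same struggling service.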

Handling Retries in Reusable Functions

Sometimes, you may have a function that you want to use both as an ARQ task and as a regular async function elsewhere in your code. Outside an ARQ worker nothing catches Retry, so raising it there surfaces as an unhandled exception.

To solve this, make ctx an optional argument: ARQ always passes a populated dictionary, while direct callers pass None. Checking ctx then tells you whether the function is running as an ARQ job.

import httpx
from arq import Retry

async def call_external_service(ctx: dict | None, payload: dict):
    try:
        # httpx is used instead of requests so the call does not block the event loop
        async with httpx.AsyncClient() as client:
            response = await client.post("https://api.example.com/data", json=payload)
        response.raise_for_status()
        return response.json()
    except httpx.ConnectError as e:
        # This branch runs ONLY if the function is executed by an ARQ worker
        if ctx:
            job_try = ctx.get("job_try", 1)
            max_tries = ctx.get("max_tries", 5)
            if job_try < max_tries:
                # Retry with a fixed 20-second delay
                raise Retry(defer=20) from e

        # Called directly, or no retries left: raise a standard exception
        raise Exception("Connection Error: The service is currently unavailable. Please try again later.") from e

This pattern makes your functions more versatile. When run by ARQ, the function uses the retry system. When called directly, it fails fast by raising a standard exception, allowing the calling code to handle the error immediately.
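
If several functions share this check, the decision can be pulled into a small helper. The sketch below is one way to do that; `retries_left` is a hypothetical name, and it relies only on the `job_try` key that the examples above read from ctx.

```python
from typing import Optional


def retries_left(ctx: Optional[dict], max_tries: int = 5) -> bool:
    """Return True only when running as an ARQ job that has attempts remaining."""
    if not ctx:
        # Called directly (ctx is None or empty): never ask ARQ to retry.
        return False
    return ctx.get("job_try", 1) < max_tries


print(retries_left(None))            # False: direct call
print(retries_left({"job_try": 1}))  # True: first attempt in a worker
print(retries_left({"job_try": 5}))  # False: final attempt in a worker
```

A task would then raise Retry only when `retries_left(ctx)` is true and fall through to a regular exception otherwise.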

Final Thoughts

Building a reliable system requires planning for failure. ARQ provides simple yet powerful tools to make your background tasks resilient to transient errors and interruptions. By combining automatic requeueing with manual Retry logic and backoff strategies, you can ensure that your important jobs eventually get done, even when things go wrong. This approach is fundamental to creating production-ready applications with FastAPI.

About the Author

David Muraya is a Solutions Architect specializing in Python, FastAPI, and Cloud Infrastructure. He is passionate about building scalable, production-ready applications and sharing his knowledge with the developer community. You can connect with him on LinkedIn.
