Verwaltung von Hintergrundaufgaben und lang laufenden Operationen in FastAPI
Grace Collins
Solutions Engineer · Leapcell

Einleitung
In der Welt moderner Webanwendungen ist die Benutzererfahrung von größter Bedeutung. Eine langsame oder nicht reagierende Benutzeroberfläche kann schnell zu Frustration und Abbruch durch den Benutzer führen. Oft müssen Anwendungen rechenintensive Operationen ausführen, externe Dienste einbeziehen oder einfach eine beträchtliche Zeit zum Abschluss benötigen. Werden diese Aufgaben synchron innerhalb des Haupt-Request-Response-Zyklus ausgeführt, blockieren sie den Server, was zu Verzögerungen und potenziellen Timeouts für den Benutzer führt. Hier wird das Konzept der Hintergrundaufgaben entscheidend. Indem diese lang laufenden Operationen ausgelagert werden, um sie asynchron zu verarbeiten, können wir sicherstellen, dass unsere API schnell, reaktionsfähig und für sofortige Benutzerinteraktionen verfügbar bleibt. Dieser Artikel untersucht, wie FastAPI, ein modernes, schnelles (hochperformantes) Web-Framework zum Erstellen von APIs mit Python 3.7+, Entwickler befähigt, solche Aufgaben effektiv zu verwalten, von einfachen "fire-and-forget"-Operationen bis hin zu komplexen, lang laufenden Prozessen, die dedizierte Warteschlangen erfordern.
Kernkonzepte erklärt
Bevor wir uns mit den Implementierungsdetails befassen, definieren wir einige Schlüsselbegriffe, die für unsere Diskussion zentral sein werden:
- Synchrone Operation: Eine Aufgabe, die abgeschlossen werden muss, bevor die nächste Aufgabe beginnen kann. Im Kontext eines Webservers blockiert ein Request Handler, der eine synchrone, lang laufende Operation ausführt, den Server daran, andere Anfragen zu verarbeiten, bis diese Operation abgeschlossen ist.
- Asynchrone Operation: Eine Aufgabe, die unabhängig im Hintergrund ausgeführt werden kann und es dem Hauptprogramm ermöglicht, andere Aufgaben fortzusetzen, ohne auf ihren Abschluss zu warten. Webserver, die Asynchronität nutzen, können mehrere Anfragen gleichzeitig bearbeiten, auch wenn einige davon auf I/O-Operationen warten.
- Hintergrundaufgabe: Ein spezifischer Typ asynchroner Operation, der im Kontext einer Webanfrage initiiert wird, aber nach dem Senden der HTTP-Antwort an den Client verarbeitet wird. Dies sind typischerweise "fire-and-forget"-Aufgaben, bei denen der Client nicht auf den Abschluss der Aufgabe warten muss.
- Lang laufende Aufgabe: Eine Aufgabe, die eine beträchtliche Zeit (Sekunden, Minuten oder sogar Stunden) zum Abschluss benötigt. Diese Aufgaben beinhalten oft komplexe Berechnungen, Datenverarbeitung, Dateikonvertierungen oder die Integration mit externen APIs. Obwohl Hintergrundaufgaben lang laufen können, sind nicht alle lang laufenden Aufgaben einfache "fire-and-forget"-Aufgaben; einige erfordern Fortschrittsverfolgung, Fehlerbehandlung und potenzielle Wiederholungsversuche.
- Aufgabenwarteschlange: Ein System, das für die Verwaltung und Ausführung von Aufgaben im asynchronen Modus entwickelt wurde. Wenn eine lang laufende Aufgabe initiiert wird, wird sie in eine Warteschlange gestellt. Ein separater Worker-Prozess holt dann Aufgaben aus dieser Warteschlange und führt sie aus, oft mit Funktionen wie Wiederholungsversuchen, Zeitplanung und Ergebnisverwaltung. Beliebte Beispiele sind Celery und RQ (Redis Queue).
- Worker: Ein dedizierter Prozess oder eine Reihe von Prozessen, der für das Abrufen von Aufgaben aus einer Aufgabenwarteschlange und deren Ausführung zuständig ist. Worker arbeiten unabhängig vom Haupt-API-Server.
Umgang mit Aufgaben in FastAPI
FastAPI bietet integrierte Unterstützung für einfache Hintergrundaufgaben mithilfe seiner BackgroundTasks
-Abhängigkeit. Für komplexere und wirklich lang laufende Operationen ist die Integration mit externen asynchronen Aufgabenwarteschlangen der Industriestandard.
Hintergrundaufgaben von FastAPI für einfache Auslagerung
Die BackgroundTasks
-Funktion von FastAPI ist ideal für leichte, nicht kritische Aufgaben, die ausgeführt werden müssen, nachdem die HTTP-Antwort gesendet wurde. Dies kann das Senden von E-Mail-Benachrichtigungen, das Protokollieren von Aktivitäten oder das Aktualisieren von Caches umfassen. Das wichtigste Merkmal hierbei ist, dass der Client nicht auf den Abschluss dieser Aufgaben wartet und deren Fehler die unmittelbare Benutzererfahrung nicht direkt beeinträchtigen.
Wir veranschaulichen dies anhand eines Beispiels, bei dem wir die Aktivität eines Benutzers protokollieren, nachdem er erfolgreich auf eine Ressource zugegriffen hat.
from fastapi import FastAPI, BackgroundTasks, Depends import uvicorn import time app = FastAPI() def write_log(message: str): with open("app.log", mode="a") as log: log.write(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {message}\n") @app.post("/send_email/{email}") async def send_email_background(email: str, background_tasks: BackgroundTasks): background_tasks.add_task(write_log, f"Sending email to {email}") # Simulate email sending process (could be another background task or actual async operation) # email_service.send_async(email, "Welcome!", "Thanks for signing up!") return {"message": "Email sending initiated. Check logs for details."} @app.get("/items/{item_id}") async def read_item(item_id: int, background_tasks: BackgroundTasks): background_tasks.add_task(write_log, f"Accessed item {item_id}") return {"item_id": item_id, "message": "Item accessed successfully."} if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8000)
In diesem Beispiel:
- Wir definieren eine einfache
write_log
-Funktion, die eine Aufgabe simuliert. - In unseren API-Endpunkten (
/send_email/{email}
und/items/{item_id}
) deklarieren wirbackground_tasks: BackgroundTasks
als Abhängigkeit. - Anschließend verwenden wir
background_tasks.add_task(function_name, *args, **kwargs)
, um unserewrite_log
-Funktion zu planen. - FastAPI stellt sicher, dass
write_log
ausgeführt wird, nachdem die HTTP-Antwort für/send_email/{email}
oder/items/{item_id}
an den Client gesendet wurde. Der Client erhält eine sofortige Antwort, während die Protokollierung hinter den Kulissen geschieht.
Prinzip: BackgroundTasks
arbeitet, indem es einen aufrufbaren Code so plant, dass er innerhalb derselben Ereignisschleife ausgeführt wird, nachdem die Antwort gestreamt wurde. Dies ist effizient für kleine, schnelle Aufgaben, aber es ist entscheidend, seine Grenzen zu verstehen. Wenn die Hintergrundaufgabe selbst zu lange dauert, kann sie dennoch die Ressourcen der Hauptereignisschleife beanspruchen und potenziell die Reaktionsfähigkeit anderer Anfragen beeinträchtigen, wenn auch nach der anfänglichen Antwort. Sie bietet auch keine Persistenz oder Wiederholungsmechanismen im Falle eines Fehlers oder eines Server-Neustarts.
Umgang mit wirklich lang laufenden Aufgaben mit externen Aufgabenwarteschlangen
Für Aufgaben, die CPU-gebunden, I/O-gebunden mit externen Diensten, fehleranfällig oder einfach zu lang sind, um sie innerhalb des Haupt-FastAPI-Prozesses auszuführen (auch nach der Antwort), werden spezielle asynchrone Aufgabenwarteschlangen wie Celery oder RQ unerlässlich. Diese Systeme entkoppeln die Aufgabenausführung vom Webserver und bieten Robustheit, Skalierbarkeit und Beobachtbarkeit.
Betrachten wir ein Beispiel, bei dem ein Benutzer ein großes Bild hochlädt und wir es verarbeiten müssen (Größe ändern, Filter anwenden, in Cloud-Speicher hochladen), was mehrere Sekunden oder sogar Minuten dauern kann.
Architektur:
- FastAPI-Anwendung: Empfängt die Benutzeranfrage und leitet die Aufgabendetails an eine Aufgabenwarteschlange (z.B. Redis) weiter.
- Redis (oder RabbitMQ): Fungiert als Message Broker und speichert Aufgaben in einer Warteschlange.
- Worker-Prozess (Celery oder RQ Worker): Ein separater, persistenter Prozess, der kontinuierlich die Warteschlange überwacht, Aufgaben abholt und ausführt. Nach Abschluss kann er eine Datenbank aktualisieren oder den Benutzer benachrichtigen.
Hier verwenden wir Celery
mit Redis
als Broker, da dies eine beliebte und robuste Wahl ist.
1. Einrichtung von Celery und Redis:
- Notwendige Pakete installieren:
pip install "celery[redis]"
- Stellen Sie sicher, dass der Redis-Server läuft. Sie können ihn normalerweise mit
redis-server
starten.
2. Erstellen einer Celery-Anwendung (worker.py
):
# worker.py from celery import Celery import time # Konfigurieren Sie Celery # broker_url ist, wo Celery Aufgaben abruft # result_backend ist, wo Celery Aufgabenergebnisse speichert (optional, aber gut für lang laufende Aufgaben) celery_app = Celery( 'my_app', broker='redis://localhost:6379/0', result_backend='redis://localhost:6379/0' ) # Definieren Sie eine Celery-Aufgabe @celery_app.task def process_image(image_id: str, operations: list): print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Starting image processing for {image_id} with operations: {operations}") time.sleep(10) # Simulieren einer lang laufenden Bildverarbeitung print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Finished image processing for {image_id}") return {"image_id": image_id, "status": "processed", "applied_operations": operations} @celery_app.task def send_marketing_email(recipient_email: str, subject: str, body: str): print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Sending email to {recipient_email}...") time.sleep(5) # Simulieren des E-Mail-Versands print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Email sent to {recipient_email}") return {"email": recipient_email, "status": "sent", "subject": subject}
3. Integration von Celery mit FastAPI (main.py
):
# main.py from fastapi import FastAPI, BackgroundTasks, HTTPException from pydantic import BaseModel from worker import process_image, send_marketing_email # Importieren Sie unsere Celery-Aufgaben import uvicorn app = FastAPI() class ImageProcessingRequest(BaseModel): image_id: str operations: list class EmailRequest(BaseModel): recipient_email: str subject: str body: str @app.post("/process_image/", status_code=202) # Verwenden Sie 202 Accepted für Aufgaben, die später abgeschlossen werden async def schedule_image_processing(request: ImageProcessingRequest): # Aufgabe an Celery übergeben task = process_image.delay(request.image_id, request.operations) return {"message": "Image processing task scheduled", "task_id": task.id} @app.post("/send_marketing_email/", status_code=202) async def schedule_marketing_email(request: EmailRequest): task = send_marketing_email.apply_async( args=[request.recipient_email, request.subject, request.body], countdown=60 # Planen Sie die Ausführung nach 60 Sekunden ) return {"message": "Marketing email task scheduled", "task_id": task.id} @app.get("/task_status/{task_id}") async def get_task_status(task_id: str): task = celery_app.AsyncResult(task_id) if task.state == 'PENDING': response = {"status": task.state, "info": "Task is waiting to be processed or is in progress."} elif task.state != 'FAILURE': response = {"status": task.state, "result": task.result} else: # Aufgabe fehlgeschlagen response = {"status": task.state, "info": str(task.info)} # Dies speichert die Ausnahme return response # Beispiel zur Demonstration der nativen BackgroundTasks von FastAPI (wie zuvor) def write_log(message: str): with open("app.log", mode="a") as log: log.write(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] FastAPI BgTask: {message}\n") @app.get("/fastapi_bgtask_example") async def fastapi_bgtask_example(background_tasks: BackgroundTasks): background_tasks.add_task(write_log, "This is a simple native FastAPI background task.") return {"message": "FastAPI background task initiated."} if __name__ == "__main__": # Zum Ausführen: # 1. Redis starten: redis-server # 2. Celery worker starten: celery -A worker worker --loglevel=info # 3. FastAPI-Anwendung starten: python main.py uvicorn.run(app, host="0.0.0.0", port=8000)
4. Ausführen des Systems:
- Redis-Server starten:
redis-server
(in einem separaten Terminal) - Celery Worker starten:
celery -A worker worker --loglevel=info
(in einem anderen separaten Terminal,worker
bezieht sich aufworker.py
) - FastAPI-Anwendung starten:
python main.py
(in einem dritten Terminal)
Wenn Sie nun eine Anfrage an /process_image/
oder /send_marketing_email/
von Ihrer FastAPI-App senden, gibt FastAPI sofort einen 202 Accepted
-Status zusammen mit einer task_id
zurück. Die eigentliche Bildverarbeitung oder der E-Mail-Versand wird dann vom Celery Worker im Hintergrund gehandhabt, vollständig entkoppelt vom Request-Response-Zyklus des Webservers. Sie können /task_status/{task_id}
abfragen, um den Fortschritt oder das Ergebnis der Aufgabe zu überprüfen.
Vergleich und Anwendungsfälle:
-
BackgroundTasks
(FastAPI nativ):- Vorteile: Einfach zu implementieren, keine externen Abhängigkeiten (außer FastAPI selbst), gut für schnelle, nicht kritische Folgeaktionen.
- Nachteile: Läuft innerhalb derselben Ereignisschleife des FastAPI-Prozesses, keine Persistenz (wenn der Server neu startet, gehen laufende Aufgaben verloren), kein Wiederholungsmechanismus, keine Fortschrittsverfolgung oder Ergebnisverwaltung, eingeschränkte Skalierbarkeit für schwere Aufgaben.
- Anwendungsfälle: Schnelle Willkommens-E-Mail nach Benutzerregistrierung senden (wenn die Serverlast gering ist), einen Zähler aktualisieren, API-Zugriffe protokollieren, einen kleinen Cache löschen.
-
Externe Aufgabenwarteschlangen (z.B. Celery, RQ):
- Vorteile: Echte Entkopplung der Aufgabenausführung, robuste Handhabung lang laufender und fehleranfälliger Aufgaben, Persistenz (Aufgaben können Server-Neustarts überleben), automatische Wiederholungsversuche, Steuerungslogik für gleichzeitige Ausführung, Planungsfunktionen, Fortschrittsverfolgung, Ergebnisverwaltung, Skalierbarkeit (bei Bedarf mehr Worker hochfahren).
- Nachteile: Erhöht die Komplexität durch externe Abhängigkeiten (Message Broker, Worker-Prozesse), anfangs steilere Lernkurve.
- Anwendungsfälle: Bild-/Videoverarbeitung, große Datenexporte, Generieren von Berichten, Senden von Massen-E-Mails/Benachrichtigungen, lang laufende API-Aufrufe an externe Dienste, komplexe Finanzberechnungen, jede Aufgabe, die erhebliche Verarbeitungszeit oder Fehlerbehebung erfordert.
Fazit
Die effektive Verwaltung von Hintergrund- und lang laufenden Aufgaben ist ein Eckpfeiler beim Aufbau leistungsstarker und skalierbarer Webanwendungen. Die BackgroundTasks
von FastAPI bieten eine einfache Lösung für einfache "fire-and-forget"-Operationen, die die sofortige Antwort nicht blockieren. Für alles, was komplexer, robuster ist oder Persistenz und Fehlerbehandlung erfordert, ist die Integration mit dedizierten asynchronen Aufgabenwarteschlangen wie Celery oder RQ der empfohlene Ansatz. Durch die strategische Nutzung dieser Werkzeuge können Entwickler sicherstellen, dass ihre FastAPI-Anwendungen agil, reaktionsfähig bleiben und anspruchsvolle Arbeitslasten bewältigen können, ohne die Benutzererfahrung zu beeinträchtigen.