Asynchrone Aufgabenverarbeitung mit Celery in Django und Flask
Olivia Novak
Dev Intern · Leapcell

Einleitung
In der Welt der Webentwicklung ist das Benutzererlebnis von größter Bedeutung. Eine langsam ladende Seite oder eine nicht reagierende Anwendung kann schnell zu Frustration und Abbruch durch den Benutzer führen. Oft müssen Anwendungen Aufgaben ausführen, die von Natur aus zeitaufwändig sind – denken Sie an Bildverarbeitung, das Senden von Massen-E-Mails, die Generierung komplexer Berichte oder die Interaktion mit externen APIs. Wenn diese Operationen synchron als Teil einer Webanfrage ausgeführt werden, können sie den Hauptthread der Anwendung blockieren, was zu erheblichen Verzögerungen und einem schlechten Benutzererlebnis führt. Hier kommen asynchrone Aufgabenwarteschlangen ins Spiel, die es ermöglichen, diese ressourcenintensiven Aufgaben aus dem Hauptanforderungs-Antwortzyklus auszulagern. Dieser Artikel untersucht, wie Celery, eine leistungsstarke verteilte Aufgabenwarteschlange, in gängige Python-Web-Frameworks wie Django und Flask integriert werden kann, um solche mühsamen Operationen elegant zu handhaben und so die Reaktionsfähigkeit der Anwendung aufrechtzuerhalten und die Gesamtleistung zu verbessern.
Kernkonzepte für asynchrone Operationen
Bevor wir uns mit den Integrationsdetails befassen, wollen wir ein gemeinsames Verständnis der Schlüsselkomponenten und Konzepte aufbauen, die bei der Erstellung asynchroner Aufgabensysteme mit Celery beteiligt sind.
Celery: Im Kern ist Celery eine asynchrone Aufgaben- oder Jobwarteschlange, die auf verteilter Nachrichtenübermittlung basiert. Sie ist darauf ausgelegt, große Mengen von Nachrichten zu verarbeiten und Operationen mit einer Echtzeit-Komponente (nahezu Echtzeit) bereitzustellen.
Broker: Celery benötigt einen Message Broker zum Senden und Empfangen von Nachrichten. Der Broker fungiert als Vermittler, speichert die auszuführenden Aufgaben und liefert sie an die Worker. Beliebte Optionen sind RabbitMQ, Redis und Amazon SQS.
Worker: Ein Celery Worker ist ein separater Prozess, der kontinuierlich den Message Broker auf neue Aufgaben überwacht. Sobald eine Aufgabe abgerufen wurde, führt der Worker sie aus und speichert optional das Ergebnis. Sie können mehrere Worker ausführen, um Ihre Aufgabenverarbeitungskapazitäten zu skalieren.
Task: In Celery ist eine Aufgabe eine aufrufbare Python-Funktion, die asynchron von einem Worker ausgeführt wird. Diese Funktionen werden normalerweise in Ihrer Anwendung definiert und mit einem Dekoramateur versehen, um als Celery-Aufgaben erkannt zu werden.
Result Backend (Optional): Nachdem eine Aufgabe abgeschlossen ist, möchten Sie möglicherweise ihr Ergebnis oder ihren Status abrufen. Ein Result Backend speichert diese Informationen. Gängige Optionen sind Redis, SQLAlchemy, Django ORM und Memcached.
Implementierung von Celery in Django
Die Integration von Celery in ein Django-Projekt umfasst einige einfache Schritte: Konfiguration, Definition von Aufgaben und Ausführung der notwendigen Komponenten.
1. Installation
Installieren Sie zuerst Celery und einen ausgewählten Broker. Für dieses Beispiel verwenden wir Redis als Broker und Result Backend.
pip install celery redis
2. Django Projekt-Setup
Erstellen Sie eine Celery
-Instanz in Ihrem Django-Projekt. Typischerweise wird dies in einer Datei proj_name/celery.py
vorgenommen.
# proj_name/celery.py import os from celery import Celery # Setzen Sie das Standardmodul für Django-Einstellungen für das 'celery'-Programm. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj_name.settings') app = Celery('proj_name') # Die Verwendung einer Zeichenkette hier bedeutet, dass der Worker das Konfigurationsobjekt nicht in Kindprozesse serialisieren muss. # - namespace='CELERY' bedeutet, dass alle Celery-bezogenen Konfigurationsschlüssel einen Präfix `CELERY_` haben sollten. app.config_from_object('django.conf:settings', namespace='CELERY') # Task-Module aus allen registrierten Django-App-Konfigurationen laden. app.autodiscover_tasks() @app.task(bind=True, ignore_result=True) def debug_task(self): print(f'Request: {self.request!r}')
Importieren Sie dann diese Celery-App in die __init__.py
Ihres Projekts, um sicherzustellen, dass sie beim Start von Django geladen wird.
# proj_name/__init__.py from .celery import app as celery_app
3. Celery-Konfiguration in settings.py
Fügen Sie der settings.py
Ihres Django-Projekts Konfigurationen für Celery hinzu.
# proj_name/settings.py # ... bestehende Einstellungen ... CELERY_BROKER_URL = 'redis://localhost:6379/0' CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' CELERY_ACCEPT_CONTENT = ['json'] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_TIMEZONE = 'Asia/Shanghai' # Oder Ihre gewünschte Zeitzone
4. Definieren einer Aufgabe
Erstellen Sie eine Datei tasks.py
innerhalb einer Ihrer Django-Apps (z.B. my_app/tasks.py
), um asynchrone Aufgaben zu definieren.
# my_app/tasks.py import time from celery import shared_task @shared_task def send_marketing_emails(user_ids): print(f"Starting to send emails to {len(user_ids)} users...") for user_id in user_ids: # Simulieren eines zeitaufwändigen E-Mail-Versandprozesses time.sleep(2) print(f"Email sent to user {user_id}") return f"Finished sending emails to {len(user_ids)} users." @shared_task def generate_report(report_id): print(f"Generating report {report_id}...") time.sleep(10) # Lange Berichtserstellung simulieren print(f"Report {report_id} generated successfully!") return {"status": "completed", "report_id": report_id}
5. Aufrufen von Aufgaben
Sie können diese Aufgaben von Ihren Django-Views oder anderen Teilen Ihrer Anwendung aus aufrufen.
# my_app/views.py from django.http import HttpResponse from .tasks import send_marketing_emails, generate_report def email_sending_view(request): user_ids_to_email = [1, 2, 3, 4, 5] # .delay() ist eine Abkürzung für .apply_async() send_marketing_emails.delay(user_ids_to_email) return HttpResponse("Email sending initiated. Check logs for progress.") def report_generation_view(request): report_task = generate_report.delay("monthly_sales") # Sie können die Aufgaben-ID abrufen, um ihren Status später zu überprüfen return HttpResponse(f"Report generation initiated. Task ID: {report_task.id}") def check_report_status(request, task_id): from celery.result import AsyncResult result = AsyncResult(task_id) return HttpResponse(f"Task ID: {task_id}, Status: {result.status}, Result: {result.result}")
6. Ausführen von Celery Worker und Broker
Sie benötigen einen laufenden Redis-Server (oder Ihren ausgewählten Broker) und einen Celery-Worker.
Starten Sie Redis:
redis-server
Starten Sie den Celery Worker von Ihrem Django-Projektstammverzeichnis aus:
celery -A proj_name worker -l info
Das -A proj_name
gibt Ihr Django-Projekt an. -l info
setzt die Protokollierungsstufe.
Integration von Celery mit Flask
Der Prozess für Flask ist ziemlich ähnlich, wobei der Schwerpunkt auf dem Anwendungskontext und der Konfiguration liegt.
1. Installation
pip install celery redis Flask
2. Flask Anwendungs-Setup
Erstellen Sie eine celery_app.py
oder integrieren Sie sie direkt in Ihre app.py
. Ein gängiges Muster ist die Erstellung einer Celery Application Factory.
# app.py from flask import Flask, jsonify from celery import Celery import time def make_celery(app): celery = Celery( app.import_name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_RESULT_BACKEND'] ) celery.conf.update(app.config) class ContextTask(celery.Task): def __call__(self, *args, **kwargs): with app.app_context(): return self.run(*args, **kwargs) celery.Task = ContextTask return celery flask_app = Flask(__name__) flask_app.config.update( CELERY_BROKER_URL='redis://localhost:6379/0', CELERY_RESULT_BACKEND='redis://localhost:6379/0', CELERY_ACCEPT_CONTENT = ['json'], CELERY_TASK_SERIALIZER = 'json', CELERY_RESULT_SERIALIZER = 'json', CELERY_TIMEZONE = 'Asia/Shanghai' ) celery_app = make_celery(flask_app) # Aufgaben definieren @celery_app.task def long_running_task(x, y): print(f"Starting long running task with {x} and {y}...") time.sleep(5) result = x + y print(f"Task completed. Result: {result}") return result @celery_app.task(bind=True) def background_download(self, url): print(f"Downloading from {url}...") time.sleep(7) # Download-Fortschritt simulieren for i in range(1, 11): self.update_state(state='PROGRESS', meta={'current': i * 10, 'total': 100}) time.sleep(0.5) print(f"Finished downloading from {url}") return {"status": "success", "url": url} # Flask-Routen @flask_app.route('/') def index(): return "Hello, Flask with Celery!" @flask_app.route('/start_task/<int:num1>/<int:num2>') def start_task(num1, num2): task = long_running_task.delay(num1, num2) return jsonify({"message": "Task started", "task_id": task.id}) @flask_app.route('/download/<path:url>') def start_download(url): task = background_download.delay(url) return jsonify({"message": "Download initiated", "task_id": task.id}) @flask_app.route('/check_task/<task_id>') def check_task(task_id): from celery.result import AsyncResult result = AsyncResult(task_id, app=celery_app) if result.state == 'PENDING': response = {'state': result.state, 'status': 'Pending...'} elif result.state == 'PROGRESS': response = {'state': result.state, 'status': 'Running...', 'data': result.info.get('current', 0)} elif result.state == 'SUCCESS': response = {'state': result.state, 'status': 'Completed', 'result': result.result} else: response = {'state': result.state, 'status': str(result.info)} # Etwas ist schief gelaufen return jsonify(response) if __name__ == '__main__': flask_app.run(debug=True)
3. Ausführen von Celery Worker und Broker
Starten Sie Redis:
redis-server
Starten Sie den Celery Worker, typischerweise vom Verzeichnis aus, das Ihre app.py
enthält:
celery -A app.celery_app worker -l info
Hier verweist -A app.celery_app
auf die Celery-App-Instanz namens celery_app
in app.py
.
Anwendungsszenarien
Celery ist unglaublich vielseitig und kann in einer Vielzahl von Szenarien angewendet werden, in denen asynchrone Verarbeitung von Vorteil ist:
- E-Mail-Versand: Senden von Willkommens-E-Mails, Newslettern, Links zum Zurücksetzen des Passworts oder Benachrichtigungen.
- Bild-/Videoverarbeitung: Miniaturansichtserstellung, Größenänderung, Wasserzeichen, Videokodierung.
- Berichtsgenerierung: Erstellung komplexer PDFs, CSVs oder Excel-Berichte, deren Erstellung lange dauert.
- Datenimporte/-exporte: Verarbeitung großer Datei-Uploads oder Generierung großer Datendumps.
- API-Integrationen: Anfragen an Drittanbieter-APIs, die langsam oder ratenlimitiert sein können.
- Geplante Aufgaben: Verwendung von Celery Beat, um Aufgaben in regelmäßigen Abständen auszuführen (z.B. tägliche Datensicherungen, wöchentliche Statistikzusammenstellungen).
Durch das Auslagern dieser Aufgaben konzentriert sich Ihr Webserver darauf, Benutzeranfragen schnell zu beantworten, was zu einer viel reibungsloseren und angenehmeren Benutzererfahrung führt.
Fazit
Die Integration von Celery in Ihre Django- oder Flask-Anwendungen ist ein leistungsstarkes Muster für die asynchrone Verarbeitung zeitaufwändiger Operationen. Durch die Trennung dieser Aufgaben vom Haupt-Anforderungs-Antwort-Zyklus verbessern Sie die Reaktionsfähigkeit, Skalierbarkeit und das Gesamterlebnis Ihrer Anwendung erheblich. Die Einrichtung ist unkompliziert und die Vorteile in Bezug auf Anwendungsleistung und Zuverlässigkeit sind beträchtlich, was sie zu einem unverzichtbaren Werkzeug für die robuste Webentwicklung macht. Das Auslagern mühsamer Aufgaben ist der Schlüssel zur Aufrechterhaltung einer flüssigen und reaktionsschnellen Benutzeroberfläche.