SQLAlchemy 2.0:これまでで最も強力なPython ORMである理由
Min-jun Kim
Dev Intern · Leapcell

SQLAlchemyチュートリアル
SQLAlchemyはPythonエコシステムで最も人気のあるオブジェクトリレーショナルマッパー(ORM)です。エレガントな設計を持ち、基盤となるCoreと高レベルの伝統的なORMの2つの部分に分かれています。Pythonや他の言語のほとんどのORMでは、優れた階層設計が実装されていません。例えば、DjangoのORMでは、データベース接続とORM自体が完全に混在しています。

なぜCoreが必要なのか?
Coreレイヤーは主にクライアント接続プールを実装しています。最新のWebアプリケーションのコアとして、リレーショナルデータベースの同時接続能力はしばしば強くありません。多数の短い接続を使用することは一般的に推奨されず、ほとんどの場合、接続プールが必要です。接続プールには大まかに2種類あります。
- サーバーサイド接続プール:短い接続ごとに再利用のための長い接続を割り当てる専門の接続プールミドルウェア。
- クライアントサイド接続プール:一般的にサードパーティライブラリとしてコードに導入されます。
SQLAlchemyの接続プールはクライアントサイド接続プールに属します。この接続プールでは、SQLAlchemyは一定数の長い接続を維持します。connectが呼び出されると、実際にはプールから接続を取得し、closeが呼び出されると、実際には接続をプールに戻します。
接続の作成
SQLAlchemyでは、create_engineを使用して接続(プール)を作成します。create_engineのパラメータはデータベースのURLです。
from sqlalchemy import create_engine # MySQL接続例 engine = create_engine( "mysql://user:password@localhost:3306/dbname", echo=True, # echoをTrueに設定すると、実際に実行されたSQLが出力され、デバッグに便利です future=True, # SQLAlchemy 2.0 APIを使用します。これは後方互換性があります pool_size=5, # 接続プールのサイズはデフォルトで5です。0に設定すると、接続に制限はありません pool_recycle=3600 # データベースの自動切断の制限時間を設定します ) # インメモリSQLiteデータベースを作成します。check_same_thread=Falseを追加する必要があります。そうしないと、マルチスレッド環境で使用できません engine = create_engine("sqlite:///:memory:", echo=True, future=True, connect_args={"check_same_thread": False}) # MySQLへの別の接続方法 # pip install mysqlclient engine = create_engine('mysql+mysqldb://user:password@localhost/foo?charset=utf8mb4')
Coreレイヤー -- SQLの直接使用
CRUD
from sqlalchemy import text with engine.connect() as conn: result = conn.execute(text("select * from users")) print(result.all()) # 結果は反復可能で、各行の結果はRowオブジェクトです for row in result: # rowオブジェクトは3つのアクセス方法をサポートします print(row.x, row.y) print(row[0], row[1]) print(row["x"], row["y"]) # パラメータを渡す、`:var`を使用して渡します result = conn.execute( text("SELECT x, y FROM some_table WHERE y > :y"), {"y": 2} ) # パラメータを事前にコンパイルすることもできます stmt = text("SELECT x, y FROM some_table WHERE y > :y ORDER BY x, y").bindparams(y=6) # insertsを実行する場合、複数の行を直接insertできます conn.execute( text("INSERT INTO some_table (x, y) VALUES (:x, :y)"), [{"x": 11, "y": 12}, {"x": 13, "y": 14}] )
トランザクションとコミット
SQLAlchemyはコミットの2つの方法を提供します。1つは手動のcommit、もう1つは半自動のcommitです。公式ドキュメントではengine.begin()の使用を推奨しています。行ごとにコミットする完全に自動のautocommitメソッドもありますが、これは非推奨です。
# "commit as you go"は手動コミットが必要です with engine.connect() as conn: conn.execute(text("CREATE TABLE some_table (x int, y int)")) conn.execute( text("INSERT INTO some_table (x, y) VALUES (:x, :y)"), [{"x": 1, "y": 1}, {"x": 2, "y": 4}] ) conn.commit() # ここでのコミットに注意 # "begin once"半自動コミット with engine.begin() as conn: conn.execute( text("INSERT INTO some_table (x, y) VALUES (:x, :y)"), [{"x": 6, "y": 8}, {"x": 9, "y": 10}] )
ORM
セッション
Sessionはスレッドセーフではありません。しかし、一般的にWebフレームワークは各リクエストの開始時にsessionを取得するため、問題はありません。
from sqlalchemy.orm import Session with Session(engine) as session: session.add(foo) session.commit() # sessionmakerを使用してファクトリ関数を作成することもできます。 # これにより、毎回パラメータを入力する必要がなくなります from sqlalchemy.orm import sessionmaker new_session = sessionmaker(engine) with new_session() as session: ...
Declarative API
__tablename__を使用してデータベーステーブル名を指定します。Mappedとネイティブ型を使用して各フィールドを宣言します。Integer、Stringなどを使用してフィールド型を指定します。indexパラメータを使用してインデックスを指定します。uniqueパラメータを使用して一意インデックスを指定します。__table_args__を使用して、複合インデックスなどのその他の属性を指定します。
from datetime import datetime from sqlalchemy import Integer, String, func, UniqueConstraint, Text, DateTime from sqlalchemy.orm import relationship, mapped_column, Mapped from sqlalchemy.orm import DeclarativeBase class Base(DeclarativeBase): pass class User(Base): __tablename__ = "users" # タプルである必要があります、リストではありません __table_args__ = (UniqueConstraint("name", "time_created"),) id: Mapped[int] = mapped_column(Integer, primary_key=True) name: Mapped[str] = mapped_column(String(30), index=True) fullname: Mapped[str] = mapped_column(String, unique=True) # 特に大きなフィールドの場合、deferredを使用することもできます。これにより、このフィールドはデフォルトでロードされなくなります description: Mapped[str] = mapped_column(Text, deferred=True) # デフォルト値、関数が渡されることに注意してください、現在の時間ではありません time_created: Mapped[datetime] = mapped_column(DateTime(Timezone=True), default=datetime.now) # またはサーバーのデフォルト値を使用しますが、テーブル作成時に設定する必要があり、テーブルスキーマの一部になります time_created: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now()) time_updated: Mapped[datetime] = mapped_column(DateTime(timezone=True), onupdate=func.now()) class Address(Base): __tablename__ = "address" id: Mapped[int] = mapped_column(Integer, primary_key=True) email_address: Mapped[str] = mapped_column(String, nullable=False) # create_allを呼び出してすべてのモデルを作成します Base.metadata.create_all(engine) # 1つのモデルのみを作成する必要がある場合 User.__table__.create(engine)
外部キー (Foreign Keys)
relationshipを使用してモデル間の関連関係を指定します。
双方向マッピングの1対多リレーションシップ
from sqlalchemy import create_engine, Integer, String, ForeignKey from sqlalchemy.orm import DeclarativeBase, relationship, Session, Mapped, mapped_column class Group(Base): __tablename__ = 'groups' id: Mapped[int] = mapped_column(Integer, primary_key=True) name: Mapped[str] = mapped_column(String) # 対応する複数のユーザー。ここではモデル名をパラメータとして使用します members = relationship('User') class User(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String) # group_idはデータベース内の実際の外部キー名であり、2番目のフィールドForeignKeyは対応するIDを指定するために使用されます group_id = Column(Integer, ForeignKey('groups.id')) # モデル内の対応するgroupフィールド。対応するモデル内のどのフィールドと重複するかを宣言する必要があります group = relationship('Group', overlaps="members")
多対多マッピング、アソシエーションテーブルが必要です
# アソシエーションテーブル class UserPermissions(Base): __tablename__ = 'user_permissions' id: Mapped[int] = mapped_column(Integer, primary_key=True) # 外部キーを使用して外部キーを指定します user_id: Mapped[int] = mapped_column(Integer, ForeignKey('users.id')) permission_id: Mapped[str] = mapped_column(String, ForeignKey('permissions.id')) class User(Base): __tablename__ = 'users' id: Mapped[int] = mapped_column(Integer, primary_key=True) name: Mapped[str] = Column(String) # secondaryを使用してアソシエーションテーブルを指定します。また、overlapsを使用してモデル内の対応するフィールドを指定します permissions = relationship('Permission', secondary="user_permissions", overlaps="users") class Permission(Base): __tablename__ = 'permissions' id: Mapped[int] = mapped_column(Integer, primary_key=True) name: Mapped[str] = Column(String) # 上記と同じ users = relationship('User', secondary="user_permissions", overlaps="permissions") user1 = User(name='user1', group_id=1) user2 = User(name='user2') group1 = Group(name='group1') group2 = Group(name='group2', members=[user2]) permission1 = Permission(name="open_file") permission2 = Permission(name="save_file") user1.permissions.append(permission1) db.add_all([user1, user2, group1, group2, permission1, permission2]) db.commit() print(user1.permissions[0].id)
ほとんどの他のチュートリアルでは、backrefを使用して対応するモデルの属性を生成しています。ここでは、対応するモデルでアクセス可能な属性を明示的に宣言する方が好ましいです。
CRUD
1.x APIとは異なり、2.0 APIではqueryは使用されなくなり、代わりにselectを使用してデータをクエリします。
from sqlalchemy import select # whereのパラメータは`==`で構成される式です。利点は、コードを記述する際にスペルミスが検出されることです stmt = select(User).where(User.name == "john").order_by(User.id) # filter_byは**kwargsをパラメータとして使用します stmt = select(User).filter_by(name="some_user") # order_byはUser.id.desc()を使用して逆順ソートを表すこともできます result = session.execute(stmt) # 一般的に、オブジェクト全体を選択する場合、scalarsメソッドを使用する必要があります。そうしないと、1つのオブジェクトを含むタプルが返されます for user in result.scalars(): print(user.name) # モデルの単一属性をクエリする場合、scalarsを使用する必要はありません result = session.execute(select(User.name)) for row in result: print(row.name) # idでクエリするためのショートカットもあります user = session.get(User, pk=1) # データを更新するには、updateステートメントを使用する必要があります from sqlalchemy import update # synchronize_sessionには、false、「fetch」、「evaluate」の3つのオプションがあり、デフォルトはevaluateです # falseは、Python内のオブジェクトをまったく更新しないことを意味します # fetchは、データベースからオブジェクトを再ロードすることを意味します # evaluateは、データベースを更新しながら、Python内のオブジェクトに対しても可能な限り同じ操作を試みることを意味します stmt = update(User).where(User.name == "john").values(name="John").execution_options(synchronize_session="fetch") session.execute(stmt) # または、属性に直接値を割り当てます user.name = "John" session.commit() # ここで競合状態(競態条件)を導入する可能性のある場所があります # 間違い!2つのプロセスがこの値を同時に更新した場合、1つの値のみが更新される可能性があります。 # 両方とも正しいと考える値を割り当てますが、実際には正しい値は1 + 1 + 1 = 3です。 # 対応するSQL:Update users set visit_count = 2 where user.id = 1 user.visit_count += 1 # 正しいアプローチ:大文字のUに注意してください。つまり、モデルの属性を使用します。生成されるSQLは、SQLサーバー側で1を加算します # 対応するSQL:Update users set visit_count = visit_count + 1 where user.id = 1 user.visit_count = User.visit_count + 1 # オブジェクトを追加するには、session.addメソッドを直接使用します session.add(user) # またはadd_all session.add_all([user1, user2, group1]) # 挿入されたIDを取得したい場合、もちろんコミット後に読み取ることもできます session.flush() # flushはコミットではありません。トランザクションはコミットされていません。これは繰り返し読み取りに関連しており、データベースの分離レベルに関連しています。 print(user.id) # 削除するには、session.deleteを使用します session.delete(user)
関連モデルのロード
N件のレコードのリストを読み取った後、各アイテムの特定の値に対して1つずつデータベースにアクセスすると、N+1クエリが生成されます。これはデータベースで最も一般的な間違いであるN+1問題です。
デフォルトでは、外部キーに関連付けられたモデルはクエリでロードされません。selectinloadオプションを使用して外部キーをロードし、N+1問題を回避できます。
# 外部キーはロードされていません session.execute(select(User)).scalars().all() # 外部キーがロードされています session.execute(select(User).options(selectinload(User.groups))).scalars().all()
selectinloadの原則は、select inサブクエリを使用することです。selectinloadに加えて、従来のjoinedloadも使用でき、その原則は最も一般的なjoin tableです。
# joinedloadを使用して外部キーをロードします。注意:uniqueメソッドを使用する必要があります。これは2.0で指定されています。 session.execute(select(User).options(joinedload(User.groups))).unique().scalars().all()
2.0では、joinedloadよりもselectinloadを使用することがより推奨されます。一般的に、selectinloadの方が優れており、uniqueを使用する必要はありません。
外部キーの書き込み
SQLAlchemyでは、配列のように外部キーを直接操作できます。
user.permissions.append(open_permission) # 追加 user.permissions.remove(save_permission) # 削除 # すべての外部キーをクリアします user.permissions.clear() user.permissions = []
JSONフィールドの特別な処理
ほとんどのデータベースは現在JSONフィールドをサポートしています。SQLAlchemyでは、JSONオブジェクトをフィールドから直接読み取ったり、JSONオブジェクトを書き込んだりできます。ただし、このJSONオブジェクトを直接updateしてデータベースに書き戻そうとしないでください。これは信頼性がありません。必ずコピー、読み取り、書き込みを行い、その後再割り当てしてください。
import copy article = session.get(Article, 1) tags = copy.copy(article.tags) tags.append("iOS") article.tags = tags session.commit()
バッチ挿入
大量のデータを挿入する必要がある場合、1つずつ挿入する方法を使用すると、データベースとのやり取りに多くの時間が費やされ、効率が非常に低くなります。MySQLなどのほとんどのデータベースはinsert ... values (...), (...) ...バッチ挿入APIを提供しており、これもSQLAlchemyでうまく活用できます。
# session.bulk_save_objects(...)を使用して複数のオブジェクトを直接挿入します from sqlalchemy.orm import Session s = Session() objects = [ User(name="u1"), User(name="u2"), User(name="u3") ] s.bulk_save_objects(objects) s.commit() # bulk_insert_mappingsを使用してオブジェクト作成のオーバーヘッドを節約し、辞書を直接挿入します users = [ {"name": "u1"}, {"name": "u2"}, {"name": "u3"}, ] s.bulk_insert_mappings(User, users) s.commit() # bulk_update_mappingsを使用してオブジェクトをバッチで更新できます。辞書のidはwhere条件として使用され、 # 他のすべてのフィールドは更新に使用されます session.bulk_update_mappings(User, users)
DeclarativeBase
Pythonネイティブ型システムを完全に採用します
from sqlalchemy.orm import DeclarativeBase class Base(DeclarativeBase): pass from sqlalchemy.orm import mapped_column, MappedColumn id: Mapped[int] = mapped_column(Integer, primary_key=True) fullname: Mapped[Optional[str]]
Asyncio
1つのAsyncSessionを1つのタスクにつき。AsyncSessionオブジェクトは、進行中の単一のステートフルなデータベーストランザクションを表す、変更可能でステートフルなオブジェクトです。asyncioを並列タスクに使用する場合、asyncio.gather()のようなAPIを使用すると、個々のタスクごとに個別のAsyncSessionを使用する必要があります。
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession engine = create_async_engine(url, echo=True) session = async_sessionmaker(engine) # オブジェクトの作成 async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) # データ挿入 async with session() as db: db.add(...) await db.commit() # データクエリ async with session() as db: stmt = select(A) row = await db.execute(stmt) for obj in row.scalars(): print(obj.id) await engine.dispose()
マルチプロセス環境での使用
Pythonのグローバルインタープリタロック(GIL)のため、マルチコアプロセッサを利用するにはマルチプロセスを使用する必要があります。マルチプロセス環境では、リソースを共有できません。SQLAlchemyに対応するのは、接続プールを共有できないということです。この問題は手動で解決する必要があります。
一般的に、複数のプロセス間で同じSessionを共有しようとしないのが最善です。各プロセスの初期化時にSessionを作成するのが最善です。
値が設定されている場合にのみWhere条件を追加する
URLでは、ユーザーが指定したオプションに応じて、対応する結果を返す必要があることがよくあります。
query = select(User) if username is not None: query = query.where(User.username == username) if password is not None: query = query.where(User.password == password)
Leapcell:Webホスティング、非同期タスク、Redis向けの次世代サーバーレスプラットフォーム
最後に、Pythonサービスをデプロイするのに最も適したプラットフォーム、**Leapcell**をお勧めします。

1. 多言語サポート
- JavaScript、Python、Go、またはRustで開発します。
2. 無制限のプロジェクトを無料でデプロイ
- 使用量のみ支払い。リクエスト数による課金はありません。
3. 比類のないコスト効率
- 使用量に応じた従量課金、アイドル料金なし。
- 例:25ドルで、平均応答時間60msで694万リクエストをサポート。
4. 合理化された開発者エクスペリエンス
- 簡単なセットアップのための直感的なUI。
- 完全自動化されたCI/CDパイプラインとGitOps統合。
- 説明責任のある洞察のためのリアルタイムメトリクスとロギング。
5. 簡単なスケーラビリティと高性能
- 高い並列処理を容易に処理するための自動スケーリング。
- 運用オーバーヘッドゼロ — 構築に集中するだけです。

Leapcell Twitter: https://x.com/LeapcellHQ

