Backend

FastAPI実践ガイド - データベース・テスト・デプロイ完全マスター

kitahara-dev2025/11/0716 min read
FastAPI実践ガイド - データベース・テスト・デプロイ完全マスター

FastAPI実践ガイド - データベース・テスト・デプロイ完全マスター

FastAPIで本番環境に耐えうるアプリケーションを構築するための実践的なテクニックを解説します。データベース統合、テスト、Docker化、CI/CD、デプロイまで、実務で必要となるすべてのトピックをカバーします。

📋 目次

  1. データベース統合(SQLAlchemy)
  2. Alembicでのマイグレーション管理
  3. テスティング(pytest)
  4. Dockerコンテナ化
  5. Docker Composeでの複数サービス管理
  6. CI/CDパイプライン
  7. 本番環境へのデプロイ
  8. パフォーマンス最適化
  9. ロギングとモニタリング
  10. エラートラッキングとAPM

1. データベース統合(SQLAlchemy)

🔧 SQLAlchemyのセットアップ

FastAPIとSQLAlchemyの統合により、強力なORM機能を活用できます。

# requirements.txt
sqlalchemy==2.0.23
psycopg2-binary==2.9.9
asyncpg==0.29.0

# database.py
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# PostgreSQL接続URL
SQLALCHEMY_DATABASE_URL = "postgresql://user:password@localhost/dbname"

# エンジン作成
engine = create_engine(
    SQLALCHEMY_DATABASE_URL,
    pool_size=20,
    max_overflow=0,
    pool_pre_ping=True,
    echo=True  # 開発時はSQLログを出力
)

# セッション作成
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# ベースクラス
Base = declarative_base()

# 依存性注入用のDB取得関数
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

📦 モデル定義

# models.py
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String, DateTime, Text
from sqlalchemy.orm import relationship
from datetime import datetime
from database import Base

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True, nullable=False)
    username = Column(String, unique=True, index=True, nullable=False)
    hashed_password = Column(String, nullable=False)
    is_active = Column(Boolean, default=True)
    is_superuser = Column(Boolean, default=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # リレーションシップ
    posts = relationship("Post", back_populates="owner", cascade="all, delete-orphan")

class Post(Base):
    __tablename__ = "posts"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String, index=True, nullable=False)
    content = Column(Text, nullable=False)
    published = Column(Boolean, default=False)
    owner_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # リレーションシップ
    owner = relationship("User", back_populates="posts")

🔄 CRUDオペレーション

# crud.py
from sqlalchemy.orm import Session
from typing import List, Optional
import models, schemas

class CRUDUser:
    def get(self, db: Session, user_id: int) -> Optional[models.User]:
        return db.query(models.User).filter(models.User.id == user_id).first()

    def get_by_email(self, db: Session, email: str) -> Optional[models.User]:
        return db.query(models.User).filter(models.User.email == email).first()

    def get_multi(self, db: Session, skip: int = 0, limit: int = 100) -> List[models.User]:
        return db.query(models.User).offset(skip).limit(limit).all()

    def create(self, db: Session, obj_in: schemas.UserCreate) -> models.User:
        db_obj = models.User(
            email=obj_in.email,
            username=obj_in.username,
            hashed_password=get_password_hash(obj_in.password)
        )
        db.add(db_obj)
        db.commit()
        db.refresh(db_obj)
        return db_obj

    def update(
        self, db: Session, db_obj: models.User, obj_in: schemas.UserUpdate
    ) -> models.User:
        update_data = obj_in.model_dump(exclude_unset=True)
        for field, value in update_data.items():
            setattr(db_obj, field, value)
        db.add(db_obj)
        db.commit()
        db.refresh(db_obj)
        return db_obj

    def delete(self, db: Session, user_id: int) -> models.User:
        obj = db.query(models.User).get(user_id)
        db.delete(obj)
        db.commit()
        return obj

user_crud = CRUDUser()

🚀 エンドポイントでの使用

# main.py
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
import crud, models, schemas
from database import engine, get_db

# テーブル作成
models.Base.metadata.create_all(bind=engine)

app = FastAPI()

@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.user_crud.get_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.user_crud.create(db=db, obj_in=user)

@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.user_crud.get_multi(db, skip=skip, limit=limit)
    return users

@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.user_crud.get(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user

2. Alembicでのマイグレーション管理

📦 Alembicのセットアップ

# Alembicインストール
pip install alembic

# 初期化
alembic init alembic

# alembic.iniを編集してDB URLを設定
# sqlalchemy.url = postgresql://user:password@localhost/dbname

⚙️ alembic/env.pyの設定

# alembic/env.py
from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool
from alembic import context
import os
import sys

# プロジェクトルートをPythonパスに追加
sys.path.insert(0, os.path.realpath(os.path.join(os.path.dirname(__file__), '..')))

from database import Base
from models import User, Post  # すべてのモデルをインポート

# Alembicの設定
config = context.config

# ロギング設定
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# メタデータを設定
target_metadata = Base.metadata

def run_migrations_offline() -> None:
    """オフラインモードでのマイグレーション実行"""
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()

def run_migrations_online() -> None:
    """オンラインモードでのマイグレーション実行"""
    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata
        )

        with context.begin_transaction():
            context.run_migrations()

if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

🔄 マイグレーションの作成と適用

# 初回マイグレーション作成
alembic revision --autogenerate -m "Initial migration"

# マイグレーション適用
alembic upgrade head

# ロールバック
alembic downgrade -1

# 履歴確認
alembic history

# 現在のリビジョン確認
alembic current

✏️ カスタムマイグレーション例

# alembic/versions/xxx_add_user_bio.py
"""add user bio

Revision ID: xxx
Revises: yyy
Create Date: 2025-11-07 10:00:00.000000

"""
from alembic import op
import sqlalchemy as sa

# revision identifiers
revision = 'xxx'
down_revision = 'yyy'
branch_labels = None
depends_on = None

def upgrade() -> None:
    # カラム追加
    op.add_column('users', sa.Column('bio', sa.Text(), nullable=True))

    # インデックス追加
    op.create_index('ix_users_username', 'users', ['username'])

def downgrade() -> None:
    # ロールバック処理
    op.drop_index('ix_users_username', 'users')
    op.drop_column('users', 'bio')

3. テスティング(pytest)

📦 テスト環境のセットアップ

# requirements-dev.txt
pytest==7.4.3
pytest-asyncio==0.21.1
pytest-cov==4.1.0
httpx==0.25.2

# conftest.py
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from database import Base, get_db
from main import app

# テスト用DB(SQLite in-memory)
SQLALCHEMY_TEST_DATABASE_URL = "sqlite:///./test.db"

engine = create_engine(
    SQLALCHEMY_TEST_DATABASE_URL,
    connect_args={"check_same_thread": False}
)
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

@pytest.fixture(scope="function")
def db():
    """テスト用DBセッション"""
    Base.metadata.create_all(bind=engine)
    db = TestingSessionLocal()
    try:
        yield db
    finally:
        db.close()
        Base.metadata.drop_all(bind=engine)

@pytest.fixture(scope="function")
def client(db):
    """テスト用クライアント"""
    def override_get_db():
        try:
            yield db
        finally:
            db.close()

    app.dependency_overrides[get_db] = override_get_db
    yield TestClient(app)
    app.dependency_overrides.clear()

🧪 ユニットテスト

# tests/test_users.py
import pytest
from fastapi import status

def test_create_user(client):
    """ユーザー作成のテスト"""
    response = client.post(
        "/users/",
        json={
            "email": "test@example.com",
            "username": "testuser",
            "password": "testpass123"
        }
    )
    assert response.status_code == status.HTTP_200_OK
    data = response.json()
    assert data["email"] == "test@example.com"
    assert data["username"] == "testuser"
    assert "id" in data

def test_create_user_duplicate_email(client):
    """重複メールアドレスのテスト"""
    user_data = {
        "email": "duplicate@example.com",
        "username": "user1",
        "password": "pass123"
    }

    # 1回目は成功
    response = client.post("/users/", json=user_data)
    assert response.status_code == status.HTTP_200_OK

    # 2回目は失敗
    user_data["username"] = "user2"  # ユーザー名は変更
    response = client.post("/users/", json=user_data)
    assert response.status_code == status.HTTP_400_BAD_REQUEST

def test_read_users(client):
    """ユーザー一覧取得のテスト"""
    # テストデータ作成
    for i in range(5):
        client.post(
            "/users/",
            json={
                "email": f"user{i}@example.com",
                "username": f"user{i}",
                "password": "pass123"
            }
        )

    # 一覧取得
    response = client.get("/users/")
    assert response.status_code == status.HTTP_200_OK
    data = response.json()
    assert len(data) == 5

@pytest.mark.asyncio
async def test_async_endpoint(client):
    """非同期エンドポイントのテスト"""
    response = client.get("/async-endpoint")
    assert response.status_code == status.HTTP_200_OK

📊 カバレッジレポート

# カバレッジ付きテスト実行
pytest --cov=. --cov-report=html --cov-report=term

# HTMLレポート確認
open htmlcov/index.html

4. Dockerコンテナ化

🐳 Dockerfile

# Dockerfile
FROM python:3.11-slim

# 作業ディレクトリ
WORKDIR /app

# 環境変数
ENV PYTHONUNBUFFERED=1 \
    PYTHONDONTWRITEBYTECODE=1 \
    PIP_NO_CACHE_DIR=1 \
    PIP_DISABLE_PIP_VERSION_CHECK=1

# システム依存関係
RUN apt-get update && apt-get install -y \
    gcc \
    postgresql-client \
    && rm -rf /var/lib/apt/lists/*

# Python依存関係
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# アプリケーションコード
COPY . .

# 非rootユーザー作成
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

# ヘルスチェック
HEALTHCHECK --interval=30s --timeout=3s --start-period=40s --retries=3 \
    CMD python -c "import requests; requests.get('http://localhost:8000/health')"

# ポート公開
EXPOSE 8000

# 起動コマンド
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

📝 .dockerignore

__pycache__
*.pyc
*.pyo
*.pyd
.Python
env/
venv/
.venv
.git
.gitignore
.pytest_cache
.coverage
htmlcov/
*.db
.env
.vscode
.idea

🏗️ マルチステージビルド(本番用)

# Dockerfile.prod
# ビルドステージ
FROM python:3.11-slim as builder

WORKDIR /app

RUN apt-get update && apt-get install -y gcc postgresql-client

COPY requirements.txt .
RUN pip install --user --no-cache-dir -r requirements.txt

# 実行ステージ
FROM python:3.11-slim

WORKDIR /app

# ビルドステージからPythonパッケージをコピー
COPY --from=builder /root/.local /root/.local
ENV PATH=/root/.local/bin:$PATH

# PostgreSQLクライアントのみインストール
RUN apt-get update && apt-get install -y postgresql-client && rm -rf /var/lib/apt/lists/*

COPY . .

RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

EXPOSE 8000

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]

5. Docker Composeでの複数サービス管理

🔧 docker-compose.yml

version: '3.8'

services:
  # FastAPIアプリケーション
  api:
    build:
      context: .
      dockerfile: Dockerfile
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://postgres:password@db:5432/fastapi_db
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      db:
        condition: service_healthy
      redis:
        condition: service_started
    volumes:
      - ./:/app
    command: uvicorn main:app --host 0.0.0.0 --port 8000 --reload

  # PostgreSQL
  db:
    image: postgres:16-alpine
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=fastapi_db
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 10s
      timeout: 5s
      retries: 5

  # Redis(キャッシング用)
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

  # pgAdmin(DB管理ツール)
  pgadmin:
    image: dpage/pgadmin4:latest
    environment:
      - PGADMIN_DEFAULT_EMAIL=admin@example.com
      - PGADMIN_DEFAULT_PASSWORD=admin
    ports:
      - "5050:80"
    depends_on:
      - db

volumes:
  postgres_data:
  redis_data:

🚀 Docker Compose操作

# サービス起動
docker-compose up -d

# ログ確認
docker-compose logs -f api

# サービス停止
docker-compose down

# ボリュームも削除
docker-compose down -v

# 再ビルド
docker-compose up -d --build

# 特定サービスのみ再起動
docker-compose restart api

6. CI/CDパイプライン

🔄 GitHub Actions

# .github/workflows/ci.yml
name: CI/CD Pipeline

on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main ]

jobs:
  test:
    runs-on: ubuntu-latest

    services:
      postgres:
        image: postgres:16
        env:
          POSTGRES_USER: postgres
          POSTGRES_PASSWORD: postgres
          POSTGRES_DB: test_db
        ports:
          - 5432:5432
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
          cache: 'pip'

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
          pip install -r requirements-dev.txt

      - name: Run linters
        run: |
          # Black
          black --check .
          # isort
          isort --check-only .
          # flake8
          flake8 .
          # mypy
          mypy .

      - name: Run tests
        env:
          DATABASE_URL: postgresql://postgres:postgres@localhost:5432/test_db
        run: |
          pytest --cov=. --cov-report=xml --cov-report=term

      - name: Upload coverage to Codecov
        uses: codecov/codecov-action@v3
        with:
          file: ./coverage.xml
          fail_ci_if_error: true

  build:
    needs: test
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'

    steps:
      - uses: actions/checkout@v4

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3

      - name: Login to Docker Hub
        uses: docker/login-action@v3
        with:
          username: ${{ secrets.DOCKER_USERNAME }}
          password: ${{ secrets.DOCKER_PASSWORD }}

      - name: Build and push
        uses: docker/build-push-action@v5
        with:
          context: .
          push: true
          tags: |
            username/fastapi-app:latest
            username/fastapi-app:${{ github.sha }}
          cache-from: type=gha
          cache-to: type=gha,mode=max

  deploy:
    needs: build
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'

    steps:
      - name: Deploy to production
        run: |
          # デプロイスクリプト実行
          echo "Deploying to production..."

7. 本番環境へのデプロイ

🚀 Railwayへのデプロイ

# Railway CLIインストール
npm i -g @railway/cli

# ログイン
railway login

# プロジェクト初期化
railway init

# PostgreSQL追加
railway add postgresql

# デプロイ
railway up

# ログ確認
railway logs

☁️ AWS ECS/Fargateへのデプロイ

// task-definition.json
{
  "family": "fastapi-app",
  "networkMode": "awsvpc",
  "requiresCompatibilities": ["FARGATE"],
  "cpu": "256",
  "memory": "512",
  "containerDefinitions": [
    {
      "name": "fastapi-container",
      "image": "your-registry/fastapi-app:latest",
      "portMappings": [
        {
          "containerPort": 8000,
          "protocol": "tcp"
        }
      ],
      "environment": [
        {
          "name": "DATABASE_URL",
          "value": "postgresql://..."
        }
      ],
      "logConfiguration": {
        "logDriver": "awslogs",
        "options": {
          "awslogs-group": "/ecs/fastapi-app",
          "awslogs-region": "ap-northeast-1",
          "awslogs-stream-prefix": "ecs"
        }
      }
    }
  ]
}

🔐 環境変数管理

# config.py
from pydantic_settings import BaseSettings
from functools import lru_cache

class Settings(BaseSettings):
    # アプリケーション
    app_name: str = "FastAPI App"
    debug: bool = False

    # データベース
    database_url: str

    # Redis
    redis_url: str = "redis://localhost:6379/0"

    # JWT
    secret_key: str
    algorithm: str = "HS256"
    access_token_expire_minutes: int = 30

    # CORS
    allowed_origins: list[str] = ["http://localhost:3000"]

    # Sentry
    sentry_dsn: str | None = None

    class Config:
        env_file = ".env"
        case_sensitive = False

@lru_cache()
def get_settings() -> Settings:
    return Settings()

8. パフォーマンス最適化

⚡ Redisキャッシング

# cache.py
import redis.asyncio as redis
from fastapi import Depends
import json
from typing import Optional

redis_client = redis.from_url("redis://localhost:6379", decode_responses=True)

async def get_cache(key: str) -> Optional[str]:
    """キャッシュから取得"""
    return await redis_client.get(key)

async def set_cache(key: str, value: str, expire: int = 300):
    """キャッシュに保存"""
    await redis_client.set(key, value, ex=expire)

# エンドポイントでの使用
@app.get("/users/{user_id}")
async def get_user_cached(user_id: int, db: Session = Depends(get_db)):
    # キャッシュチェック
    cache_key = f"user:{user_id}"
    cached = await get_cache(cache_key)

    if cached:
        return json.loads(cached)

    # DBから取得
    user = crud.user_crud.get(db, user_id=user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    # キャッシュに保存
    await set_cache(cache_key, json.dumps(user.model_dump()), expire=600)

    return user

🔄 コネクションプーリング

# database.py(改善版)
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    SQLALCHEMY_DATABASE_URL,
    poolclass=QueuePool,
    pool_size=20,           # 常に保持するコネクション数
    max_overflow=10,        # 最大追加コネクション数
    pool_timeout=30,        # タイムアウト秒数
    pool_recycle=3600,      # 1時間ごとにコネクションをリサイクル
    pool_pre_ping=True,     # 使用前に接続をテスト
    echo=False              # 本番環境ではFalse
)

📊 データベースインデックス

# models.py
from sqlalchemy import Index

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True)
    username = Column(String, unique=True, index=True)
    created_at = Column(DateTime, default=datetime.utcnow, index=True)

    # 複合インデックス
    __table_args__ = (
        Index('ix_user_email_active', 'email', 'is_active'),
    )

9. ロギングとモニタリング

📝 構造化ロギング

# logging_config.py
import logging
import sys
from pythonjsonlogger import jsonlogger

def setup_logging():
    """構造化ロギングのセットアップ"""
    logger = logging.getLogger()

    # ハンドラー作成
    handler = logging.StreamHandler(sys.stdout)

    # JSON形式でログ出力
    formatter = jsonlogger.JsonFormatter(
        '%(asctime)s %(levelname)s %(name)s %(message)s'
    )
    handler.setFormatter(formatter)

    logger.addHandler(handler)
    logger.setLevel(logging.INFO)

    return logger

# main.py
import logging
from logging_config import setup_logging

logger = setup_logging()

@app.middleware("http")
async def log_requests(request: Request, call_next):
    """リクエストログミドルウェア"""
    logger.info(
        "Request started",
        extra={
            "method": request.method,
            "path": request.url.path,
            "client_ip": request.client.host
        }
    )

    start_time = time.time()
    response = await call_next(request)
    process_time = time.time() - start_time

    logger.info(
        "Request completed",
        extra={
            "method": request.method,
            "path": request.url.path,
            "status_code": response.status_code,
            "process_time": process_time
        }
    )

    return response

📈 Prometheusメトリクス

# metrics.py
from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST
from fastapi import Response

# メトリクス定義
REQUEST_COUNT = Counter(
    'fastapi_requests_total',
    'Total Request Count',
    ['method', 'endpoint', 'status']
)

REQUEST_DURATION = Histogram(
    'fastapi_request_duration_seconds',
    'Request Duration',
    ['method', 'endpoint']
)

@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    """メトリクス収集ミドルウェア"""
    start_time = time.time()

    response = await call_next(request)

    duration = time.time() - start_time

    REQUEST_COUNT.labels(
        method=request.method,
        endpoint=request.url.path,
        status=response.status_code
    ).inc()

    REQUEST_DURATION.labels(
        method=request.method,
        endpoint=request.url.path
    ).observe(duration)

    return response

@app.get("/metrics")
def metrics():
    """Prometheusメトリクスエンドポイント"""
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)

10. エラートラッキングとAPM

🔍 Sentry統合

# main.py
import sentry_sdk
from sentry_sdk.integrations.fastapi import FastApiIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration

sentry_sdk.init(
    dsn="your-sentry-dsn",
    integrations=[
        FastApiIntegration(),
        SqlalchemyIntegration(),
    ],
    traces_sample_rate=1.0,  # 本番環境では0.1など低めに設定
    environment="production",
    release="v1.0.0"
)

app = FastAPI()

@app.get("/debug-sentry")
async def trigger_error():
    """Sentryテスト用エンドポイント"""
    division_by_zero = 1 / 0

📊 OpenTelemetry統合

# opentelemetry_config.py
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor

def setup_telemetry(app):
    """OpenTelemetryセットアップ"""
    # Tracer設定
    trace.set_tracer_provider(TracerProvider())

    # OTLP Exporterを設定(Jaeger、Tempo等に送信)
    otlp_exporter = OTLPSpanExporter(endpoint="localhost:4317")
    trace.get_tracer_provider().add_span_processor(
        BatchSpanProcessor(otlp_exporter)
    )

    # FastAPI instrumentation
    FastAPIInstrumentor.instrument_app(app)

    # SQLAlchemy instrumentation
    SQLAlchemyInstrumentor().instrument(engine=engine)

# main.py
setup_telemetry(app)

📝 まとめ

FastAPI実践編で学んだ内容:

データベース・ORM

  • SQLAlchemy統合 - ORM、セッション管理、CRUD操作
  • Alembicマイグレーション - スキーマ管理、バージョン管理

テスティング

  • pytest - ユニットテスト、フィクスチャ
  • カバレッジ - コードカバレッジ測定

インフラ・デプロイ

  • Docker化 - コンテナ化、マルチステージビルド
  • Docker Compose - 複数サービス管理
  • CI/CD - GitHub Actions、自動テスト・デプロイ
  • 本番デプロイ - Railway、AWS ECS/Fargate

パフォーマンス

  • キャッシング - Redis統合
  • コネクションプーリング - DB最適化
  • インデックス - クエリ最適化

可観測性

  • ロギング - 構造化ログ
  • モニタリング - Prometheusメトリクス
  • エラートラッキング - Sentry、OpenTelemetry

🎯 これであなたは...

  • 本番環境で動作するFastAPIアプリケーションを構築できます
  • データベースを適切に統合し、マイグレーション管理ができます
  • 包括的なテストを実装し、品質を保証できます
  • Dockerでアプリケーションをコンテナ化できます
  • CI/CDパイプラインを構築し、自動デプロイできます
  • パフォーマンスを最適化し、スケーラブルなシステムを構築できます
  • ログ・メトリクス・トレースで可観測性を確保できます

🔗 シリーズまとめ

FastAPI 3部作:

  • FastAPI基本編 - セットアップ、エンドポイント、Pydantic、バリデーション
  • FastAPI中級編 - JWT認証、OAuth2、依存性注入、ミドルウェア、WebSocket
  • FastAPI実践編 - データベース、テスト、Docker、CI/CD、デプロイ、最適化

🔗 参考リンク

FastAPIでエンタープライズグレードのWeb APIを構築しましょう!

#Python#FastAPI#SQLAlchemy#Docker#CI/CD#PostgreSQL#pytest

著者について

kitahara-devによって執筆されました。