プロジェクト

全般

プロフィール

バグ #951

未完了

【948-3】バックエンドAPI実装 - FastAPI/SQLAlchemy/Celery

Redmine Admin さんが2ヶ月前に追加.

ステータス:
新規
優先度:
急いで
担当者:
-
開始日:
2025-07-31
期日:
進捗率:

0%

予定工数:

説明

【子チケット3】バックエンドAPI実装

🎯 目的

FastAPI + SQLAlchemy + Celeryベースのバックエンドシステムを実装し、フォーム自動化の核となるAPI基盤を構築する。

📋 実装内容

1. バックエンド基盤セットアップ

cd /root/form-automation-system/backend

# Poetry プロジェクト初期化
poetry init --name form-automation-api --version 0.1.0 --python "^3.11"

# 基本依存関係インストール
poetry add \
  fastapi \
  uvicorn[standard] \
  sqlalchemy \
  alembic \
  psycopg2-binary \
  redis \
  celery \
  pydantic[email] \
  python-multipart \
  python-jose[cryptography] \
  passlib[bcrypt] \
  httpx \
  aiofiles \
  python-dotenv

# 開発依存関係
poetry add --group dev \
  pytest \
  pytest-asyncio \
  pytest-cov \
  black \
  isort \
  flake8 \
  mypy \
  pre-commit

2. プロジェクト構造

backend/
├── app/
│   ├── api/                 # API エンドポイント
│   │   ├── v1/             
│   │   │   ├── auth.py     # 認証
│   │   │   ├── tasks.py    # タスク管理
│   │   │   ├── data.py     # データ管理
│   │   │   ├── settings.py # 設定
│   │   │   └── monitoring.py # 監視
│   │   └── deps.py         # 依存関係
│   ├── core/               # コア機能
│   │   ├── config.py       # 設定管理
│   │   ├── security.py     # セキュリティ
│   │   ├── database.py     # データベース
│   │   └── celery_app.py   # Celery設定
│   ├── models/             # SQLAlchemyモデル
│   │   ├── user.py
│   │   ├── task.py
│   │   ├── url_list.py
│   │   ├── template.py
│   │   └── execution_result.py
│   ├── schemas/            # Pydanticスキーマ
│   │   ├── user.py
│   │   ├── task.py
│   │   ├── data.py
│   │   └── common.py
│   ├── services/           # ビジネスロジック
│   │   ├── task_service.py
│   │   ├── automation_service.py
│   │   ├── vpn_service.py
│   │   └── captcha_service.py
│   ├── workers/            # Celeryワーカー
│   │   ├── form_automation.py
│   │   ├── vpn_manager.py
│   │   └── monitoring.py
│   ├── utils/              # ユーティリティ
│   │   ├── playwright_utils.py
│   │   ├── vpn_utils.py
│   │   └── common.py
│   └── main.py             # FastAPIアプリケーション
├── alembic/                # データベースマイグレーション
├── tests/                  # テスト
├── Dockerfile
├── pyproject.toml
└── alembic.ini

3. 設定管理(app/core/config.py)

from pydantic_settings import BaseSettings
from typing import List, Optional

class Settings(BaseSettings):
    # API設定
    API_V1_STR: str = "/api/v1"
    PROJECT_NAME: str = "Form Automation API"
    VERSION: str = "1.0.0"
    
    # セキュリティ
    SECRET_KEY: str
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 60 * 24 * 8  # 8 days
    ALGORITHM: str = "HS256"
    
    # データベース
    DATABASE_URL: str
    
    # Redis
    REDIS_URL: str = "redis://localhost:6379"
    
    # Celery
    CELERY_BROKER_URL: str = "redis://localhost:6379/0"
    CELERY_RESULT_BACKEND: str = "redis://localhost:6379/0"
    
    # CORS
    ALLOWED_ORIGINS: List[str] = ["https://formauto.call2arm.com"]
    
    # VPN設定
    NORDVPN_USERNAME: Optional[str] = None
    NORDVPN_PASSWORD: Optional[str] = None
    EXPRESSVPN_ACTIVATION_CODE: Optional[str] = None
    
    # CAPTCHA設定
    CAPSOLVER_API_KEY: Optional[str] = None
    TWOCAPTCHA_API_KEY: Optional[str] = None
    NEXTCAPTCHA_API_KEY: Optional[str] = None
    
    # Playwright設定
    PLAYWRIGHT_HEADLESS: bool = True
    PLAYWRIGHT_TIMEOUT: int = 30000
    
    # ログ設定
    LOG_LEVEL: str = "INFO"
    
    class Config:
        env_file = ".env"

settings = Settings()

4. データベースモデル

タスクモデル(app/models/task.py)

from sqlalchemy import Column, Integer, String, DateTime, Text, Enum, Boolean, ForeignKey
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
import uuid
import enum
from app.core.database import Base

class TaskStatus(str, enum.Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    PAUSED = "paused"

class TaskPriority(str, enum.Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"

class Task(Base):
    __tablename__ = "tasks"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name = Column(String(255), nullable=False, index=True)
    description = Column(Text)
    
    # Status and priority
    status = Column(Enum(TaskStatus), default=TaskStatus.PENDING, index=True)
    priority = Column(Enum(TaskPriority), default=TaskPriority.MEDIUM)
    
    # Configuration
    url_list_id = Column(UUID(as_uuid=True), ForeignKey("url_lists.id"))
    template_id = Column(UUID(as_uuid=True), ForeignKey("templates.id"))
    settings = Column(JSONB)  # VPN, CAPTCHA, automation settings
    
    # Execution info
    total_urls = Column(Integer, default=0)
    completed_urls = Column(Integer, default=0)
    successful_urls = Column(Integer, default=0)
    failed_urls = Column(Integer, default=0)
    
    # Timing
    scheduled_at = Column(DateTime(timezone=True))
    started_at = Column(DateTime(timezone=True))
    completed_at = Column(DateTime(timezone=True))
    estimated_completion = Column(DateTime(timezone=True))
    
    # Metadata
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())
    created_by_id = Column(UUID(as_uuid=True), ForeignKey("users.id"))
    
    # Relationships
    url_list = relationship("URLList", back_populates="tasks")
    template = relationship("Template", back_populates="tasks")
    created_by = relationship("User", back_populates="tasks")
    execution_results = relationship("ExecutionResult", back_populates="task")

URLリストモデル(app/models/url_list.py)

from sqlalchemy import Column, String, Text, DateTime, Integer, ForeignKey
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
import uuid
from app.core.database import Base

class URLList(Base):
    __tablename__ = "url_lists"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name = Column(String(255), nullable=False, index=True)
    description = Column(Text)
    
    # URL data
    urls = Column(JSONB)  # List of URL objects with metadata
    total_count = Column(Integer, default=0)
    
    # File info
    original_filename = Column(String(255))
    file_size = Column(Integer)
    file_hash = Column(String(64))  # SHA256
    
    # Validation
    validation_status = Column(String(50), default="pending")
    validation_errors = Column(JSONB)
    
    # Metadata
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())
    created_by_id = Column(UUID(as_uuid=True), ForeignKey("users.id"))
    
    # Relationships
    created_by = relationship("User", back_populates="url_lists")
    tasks = relationship("Task", back_populates="url_list")

5. APIエンドポイント

タスク管理API(app/api/v1/tasks.py)

from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.orm import Session
from typing import List, Optional
from uuid import UUID

from app.api.deps import get_current_user, get_db
from app.models.user import User
from app.models.task import Task, TaskStatus
from app.schemas.task import TaskCreate, TaskUpdate, TaskResponse, TaskList
from app.services.task_service import TaskService

router = APIRouter()

@router.get("/", response_model=TaskList)
async def list_tasks(
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=1000),
    status: Optional[TaskStatus] = None,
    priority: Optional[str] = None,
    search: Optional[str] = None,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """タスク一覧取得"""
    service = TaskService(db)
    tasks, total = await service.list_tasks(
        user_id=current_user.id,
        skip=skip,
        limit=limit,
        status=status,
        priority=priority,
        search=search
    )
    
    return TaskList(
        items=tasks,
        total=total,
        skip=skip,
        limit=limit
    )

@router.post("/", response_model=TaskResponse)
async def create_task(
    task_data: TaskCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """新規タスク作成"""
    service = TaskService(db)
    task = await service.create_task(task_data, current_user.id)
    return task

@router.get("/{task_id}", response_model=TaskResponse)
async def get_task(
    task_id: UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """タスク詳細取得"""
    service = TaskService(db)
    task = await service.get_task(task_id, current_user.id)
    if not task:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Task not found"
        )
    return task

@router.put("/{task_id}", response_model=TaskResponse)
async def update_task(
    task_id: UUID,
    task_data: TaskUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """タスク更新"""
    service = TaskService(db)
    task = await service.update_task(task_id, task_data, current_user.id)
    if not task:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Task not found"
        )
    return task

@router.post("/{task_id}/start")
async def start_task(
    task_id: UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """タスク開始"""
    service = TaskService(db)
    result = await service.start_task(task_id, current_user.id)
    return {"message": "Task started", "job_id": result.job_id}

@router.post("/{task_id}/pause")
async def pause_task(
    task_id: UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """タスク一時停止"""
    service = TaskService(db)
    await service.pause_task(task_id, current_user.id)
    return {"message": "Task paused"}

@router.post("/{task_id}/resume")
async def resume_task(
    task_id: UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """タスク再開"""
    service = TaskService(db)
    await service.resume_task(task_id, current_user.id)
    return {"message": "Task resumed"}

@router.delete("/{task_id}")
async def delete_task(
    task_id: UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """タスク削除"""
    service = TaskService(db)
    await service.delete_task(task_id, current_user.id)
    return {"message": "Task deleted"}

6. Celeryワーカー(app/workers/form_automation.py)

from celery import Celery
from app.core.config import settings
from app.services.automation_service import AutomationService
from app.core.database import SessionLocal
import logging

logger = logging.getLogger(__name__)

celery_app = Celery(
    "form_automation",
    broker=settings.CELERY_BROKER_URL,
    backend=settings.CELERY_RESULT_BACKEND
)

@celery_app.task(bind=True)
def execute_form_automation(self, task_id: str):
    """フォーム自動化実行"""
    try:
        db = SessionLocal()
        service = AutomationService(db)
        
        # タスク実行
        result = service.execute_task(task_id)
        
        return {
            "status": "completed",
            "task_id": task_id,
            "result": result
        }
        
    except Exception as exc:
        logger.error(f"Task {task_id} failed: {exc}")
        self.retry(countdown=60, max_retries=3)
        
    finally:
        db.close()

@celery_app.task
def process_url_batch(task_id: str, url_batch: list):
    """URL バッチ処理"""
    try:
        db = SessionLocal()
        service = AutomationService(db)
        
        results = []
        for url_data in url_batch:
            result = service.process_single_url(task_id, url_data)
            results.append(result)
            
        return results
        
    except Exception as exc:
        logger.error(f"Batch processing failed: {exc}")
        raise
        
    finally:
        db.close()

7. 自動化サービス(app/services/automation_service.py)

from playwright.async_api import async_playwright
from sqlalchemy.orm import Session
from app.models.task import Task
from app.services.vpn_service import VPNService
from app.services.captcha_service import CaptchaService
import asyncio
import logging

logger = logging.getLogger(__name__)

class AutomationService:
    def __init__(self, db: Session):
        self.db = db
        self.vpn_service = VPNService()
        self.captcha_service = CaptchaService()

    async def execute_task(self, task_id: str):
        """タスク実行メイン"""
        task = self.db.query(Task).filter(Task.id == task_id).first()
        if not task:
            raise ValueError(f"Task {task_id} not found")

        try:
            # VPN接続
            if task.settings.get("use_vpn"):
                await self.vpn_service.connect(task.settings.get("vpn_config"))

            # Playwright起動
            async with async_playwright() as playwright:
                browser = await playwright.chromium.launch(
                    headless=settings.PLAYWRIGHT_HEADLESS
                )
                context = await browser.new_context()
                
                # URL処理
                results = []
                for url_data in task.url_list.urls:
                    try:
                        result = await self.process_single_url(
                            context, task, url_data
                        )
                        results.append(result)
                        
                        # VPNローテーション
                        if self._should_rotate_vpn(task, len(results)):
                            await self.vpn_service.rotate()
                            
                    except Exception as e:
                        logger.error(f"URL processing failed: {e}")
                        results.append({
                            "url": url_data["url"],
                            "status": "failed",
                            "error": str(e)
                        })

                await browser.close()
                
            return {
                "processed": len(results),
                "successful": len([r for r in results if r["status"] == "success"]),
                "failed": len([r for r in results if r["status"] == "failed"]),
                "results": results
            }

        except Exception as e:
            logger.error(f"Task execution failed: {e}")
            raise
        finally:
            # VPN切断
            if task.settings.get("use_vpn"):
                await self.vpn_service.disconnect()

    async def process_single_url(self, context, task, url_data):
        """単一URL処理"""
        page = await context.new_page()
        
        try:
            # ページアクセス
            await page.goto(url_data["url"], timeout=30000)
            
            # フォーム検出
            form_selector = await self._detect_form(page)
            if not form_selector:
                return {
                    "url": url_data["url"],
                    "status": "failed",
                    "error": "No form detected"
                }

            # フォーム入力
            await self._fill_form(page, task.template, url_data)
            
            # CAPTCHA処理
            if await self._has_captcha(page):
                await self._solve_captcha(page, task.settings)
            
            # 送信
            await self._submit_form(page, form_selector)
            
            # 結果確認
            success = await self._verify_submission(page)
            
            return {
                "url": url_data["url"],
                "status": "success" if success else "failed",
                "timestamp": datetime.utcnow().isoformat()
            }

        except Exception as e:
            logger.error(f"Single URL processing failed: {e}")
            return {
                "url": url_data["url"],
                "status": "failed",
                "error": str(e)
            }
        finally:
            await page.close()

✅ 完了条件

基盤セットアップ

  • Poetry プロジェクト初期化
  • 依存関係インストール完了
  • プロジェクト構造作成

データベース・モデル

  • SQLAlchemy モデル実装
  • Alembic マイグレーション設定
  • データベース初期化

API実装

  • FastAPI アプリケーション実装
  • 主要エンドポイント実装
  • 認証・認可機能実装

自動化機能

  • Celery ワーカー実装
  • Playwright 統合
  • VPN・CAPTCHA サービス基盤

動作確認

  • API サーバー起動確認
  • データベース接続確認
  • 基本API動作確認

🔄 次のステップ

バックエンド基盤完了後、子チケット4(自動化機能統合)に移行。


Claude Code実行プロンプト:

フォーム自動化システムのバックエンドAPI基盤を実装してください。FastAPI + SQLAlchemy + Celery構成で、タスク管理・データ管理・自動化実行の核となるシステムを構築してください。データベースモデル設計から API実装、Celeryワーカー設定まで段階的に進め、各ステップの完了を報告してください。

表示するデータがありません

他の形式にエクスポート: Atom PDF