操作
バグ #951
未完了【948-3】バックエンドAPI実装 - FastAPI/SQLAlchemy/Celery
ステータス:
新規
優先度:
急いで
担当者:
-
開始日:
2025-07-31
期日:
進捗率:
0%
予定工数:
説明
【子チケット3】バックエンドAPI実装¶
🎯 目的¶
FastAPI + SQLAlchemy + Celeryベースのバックエンドシステムを実装し、フォーム自動化の核となるAPI基盤を構築する。
📋 実装内容¶
1. バックエンド基盤セットアップ¶
cd /root/form-automation-system/backend
# Poetry プロジェクト初期化
poetry init --name form-automation-api --version 0.1.0 --python "^3.11"
# 基本依存関係インストール
poetry add \
fastapi \
uvicorn[standard] \
sqlalchemy \
alembic \
psycopg2-binary \
redis \
celery \
pydantic[email] \
python-multipart \
python-jose[cryptography] \
passlib[bcrypt] \
httpx \
aiofiles \
python-dotenv
# 開発依存関係
poetry add --group dev \
pytest \
pytest-asyncio \
pytest-cov \
black \
isort \
flake8 \
mypy \
pre-commit
2. プロジェクト構造¶
backend/
├── app/
│ ├── api/ # API エンドポイント
│ │ ├── v1/
│ │ │ ├── auth.py # 認証
│ │ │ ├── tasks.py # タスク管理
│ │ │ ├── data.py # データ管理
│ │ │ ├── settings.py # 設定
│ │ │ └── monitoring.py # 監視
│ │ └── deps.py # 依存関係
│ ├── core/ # コア機能
│ │ ├── config.py # 設定管理
│ │ ├── security.py # セキュリティ
│ │ ├── database.py # データベース
│ │ └── celery_app.py # Celery設定
│ ├── models/ # SQLAlchemyモデル
│ │ ├── user.py
│ │ ├── task.py
│ │ ├── url_list.py
│ │ ├── template.py
│ │ └── execution_result.py
│ ├── schemas/ # Pydanticスキーマ
│ │ ├── user.py
│ │ ├── task.py
│ │ ├── data.py
│ │ └── common.py
│ ├── services/ # ビジネスロジック
│ │ ├── task_service.py
│ │ ├── automation_service.py
│ │ ├── vpn_service.py
│ │ └── captcha_service.py
│ ├── workers/ # Celeryワーカー
│ │ ├── form_automation.py
│ │ ├── vpn_manager.py
│ │ └── monitoring.py
│ ├── utils/ # ユーティリティ
│ │ ├── playwright_utils.py
│ │ ├── vpn_utils.py
│ │ └── common.py
│ └── main.py # FastAPIアプリケーション
├── alembic/ # データベースマイグレーション
├── tests/ # テスト
├── Dockerfile
├── pyproject.toml
└── alembic.ini
3. 設定管理(app/core/config.py)¶
from pydantic_settings import BaseSettings
from typing import List, Optional
class Settings(BaseSettings):
# API設定
API_V1_STR: str = "/api/v1"
PROJECT_NAME: str = "Form Automation API"
VERSION: str = "1.0.0"
# セキュリティ
SECRET_KEY: str
ACCESS_TOKEN_EXPIRE_MINUTES: int = 60 * 24 * 8 # 8 days
ALGORITHM: str = "HS256"
# データベース
DATABASE_URL: str
# Redis
REDIS_URL: str = "redis://localhost:6379"
# Celery
CELERY_BROKER_URL: str = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND: str = "redis://localhost:6379/0"
# CORS
ALLOWED_ORIGINS: List[str] = ["https://formauto.call2arm.com"]
# VPN設定
NORDVPN_USERNAME: Optional[str] = None
NORDVPN_PASSWORD: Optional[str] = None
EXPRESSVPN_ACTIVATION_CODE: Optional[str] = None
# CAPTCHA設定
CAPSOLVER_API_KEY: Optional[str] = None
TWOCAPTCHA_API_KEY: Optional[str] = None
NEXTCAPTCHA_API_KEY: Optional[str] = None
# Playwright設定
PLAYWRIGHT_HEADLESS: bool = True
PLAYWRIGHT_TIMEOUT: int = 30000
# ログ設定
LOG_LEVEL: str = "INFO"
class Config:
env_file = ".env"
settings = Settings()
4. データベースモデル¶
タスクモデル(app/models/task.py)¶
from sqlalchemy import Column, Integer, String, DateTime, Text, Enum, Boolean, ForeignKey
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
import uuid
import enum
from app.core.database import Base
class TaskStatus(str, enum.Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
PAUSED = "paused"
class TaskPriority(str, enum.Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
class Task(Base):
__tablename__ = "tasks"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
name = Column(String(255), nullable=False, index=True)
description = Column(Text)
# Status and priority
status = Column(Enum(TaskStatus), default=TaskStatus.PENDING, index=True)
priority = Column(Enum(TaskPriority), default=TaskPriority.MEDIUM)
# Configuration
url_list_id = Column(UUID(as_uuid=True), ForeignKey("url_lists.id"))
template_id = Column(UUID(as_uuid=True), ForeignKey("templates.id"))
settings = Column(JSONB) # VPN, CAPTCHA, automation settings
# Execution info
total_urls = Column(Integer, default=0)
completed_urls = Column(Integer, default=0)
successful_urls = Column(Integer, default=0)
failed_urls = Column(Integer, default=0)
# Timing
scheduled_at = Column(DateTime(timezone=True))
started_at = Column(DateTime(timezone=True))
completed_at = Column(DateTime(timezone=True))
estimated_completion = Column(DateTime(timezone=True))
# Metadata
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
created_by_id = Column(UUID(as_uuid=True), ForeignKey("users.id"))
# Relationships
url_list = relationship("URLList", back_populates="tasks")
template = relationship("Template", back_populates="tasks")
created_by = relationship("User", back_populates="tasks")
execution_results = relationship("ExecutionResult", back_populates="task")
URLリストモデル(app/models/url_list.py)¶
from sqlalchemy import Column, String, Text, DateTime, Integer, ForeignKey
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
import uuid
from app.core.database import Base
class URLList(Base):
__tablename__ = "url_lists"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
name = Column(String(255), nullable=False, index=True)
description = Column(Text)
# URL data
urls = Column(JSONB) # List of URL objects with metadata
total_count = Column(Integer, default=0)
# File info
original_filename = Column(String(255))
file_size = Column(Integer)
file_hash = Column(String(64)) # SHA256
# Validation
validation_status = Column(String(50), default="pending")
validation_errors = Column(JSONB)
# Metadata
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
created_by_id = Column(UUID(as_uuid=True), ForeignKey("users.id"))
# Relationships
created_by = relationship("User", back_populates="url_lists")
tasks = relationship("Task", back_populates="url_list")
5. APIエンドポイント¶
タスク管理API(app/api/v1/tasks.py)¶
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.orm import Session
from typing import List, Optional
from uuid import UUID
from app.api.deps import get_current_user, get_db
from app.models.user import User
from app.models.task import Task, TaskStatus
from app.schemas.task import TaskCreate, TaskUpdate, TaskResponse, TaskList
from app.services.task_service import TaskService
router = APIRouter()
@router.get("/", response_model=TaskList)
async def list_tasks(
skip: int = Query(0, ge=0),
limit: int = Query(100, ge=1, le=1000),
status: Optional[TaskStatus] = None,
priority: Optional[str] = None,
search: Optional[str] = None,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""タスク一覧取得"""
service = TaskService(db)
tasks, total = await service.list_tasks(
user_id=current_user.id,
skip=skip,
limit=limit,
status=status,
priority=priority,
search=search
)
return TaskList(
items=tasks,
total=total,
skip=skip,
limit=limit
)
@router.post("/", response_model=TaskResponse)
async def create_task(
task_data: TaskCreate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""新規タスク作成"""
service = TaskService(db)
task = await service.create_task(task_data, current_user.id)
return task
@router.get("/{task_id}", response_model=TaskResponse)
async def get_task(
task_id: UUID,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""タスク詳細取得"""
service = TaskService(db)
task = await service.get_task(task_id, current_user.id)
if not task:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Task not found"
)
return task
@router.put("/{task_id}", response_model=TaskResponse)
async def update_task(
task_id: UUID,
task_data: TaskUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""タスク更新"""
service = TaskService(db)
task = await service.update_task(task_id, task_data, current_user.id)
if not task:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Task not found"
)
return task
@router.post("/{task_id}/start")
async def start_task(
task_id: UUID,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""タスク開始"""
service = TaskService(db)
result = await service.start_task(task_id, current_user.id)
return {"message": "Task started", "job_id": result.job_id}
@router.post("/{task_id}/pause")
async def pause_task(
task_id: UUID,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""タスク一時停止"""
service = TaskService(db)
await service.pause_task(task_id, current_user.id)
return {"message": "Task paused"}
@router.post("/{task_id}/resume")
async def resume_task(
task_id: UUID,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""タスク再開"""
service = TaskService(db)
await service.resume_task(task_id, current_user.id)
return {"message": "Task resumed"}
@router.delete("/{task_id}")
async def delete_task(
task_id: UUID,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""タスク削除"""
service = TaskService(db)
await service.delete_task(task_id, current_user.id)
return {"message": "Task deleted"}
6. Celeryワーカー(app/workers/form_automation.py)¶
from celery import Celery
from app.core.config import settings
from app.services.automation_service import AutomationService
from app.core.database import SessionLocal
import logging
logger = logging.getLogger(__name__)
celery_app = Celery(
"form_automation",
broker=settings.CELERY_BROKER_URL,
backend=settings.CELERY_RESULT_BACKEND
)
@celery_app.task(bind=True)
def execute_form_automation(self, task_id: str):
"""フォーム自動化実行"""
try:
db = SessionLocal()
service = AutomationService(db)
# タスク実行
result = service.execute_task(task_id)
return {
"status": "completed",
"task_id": task_id,
"result": result
}
except Exception as exc:
logger.error(f"Task {task_id} failed: {exc}")
self.retry(countdown=60, max_retries=3)
finally:
db.close()
@celery_app.task
def process_url_batch(task_id: str, url_batch: list):
"""URL バッチ処理"""
try:
db = SessionLocal()
service = AutomationService(db)
results = []
for url_data in url_batch:
result = service.process_single_url(task_id, url_data)
results.append(result)
return results
except Exception as exc:
logger.error(f"Batch processing failed: {exc}")
raise
finally:
db.close()
7. 自動化サービス(app/services/automation_service.py)¶
from playwright.async_api import async_playwright
from sqlalchemy.orm import Session
from app.models.task import Task
from app.services.vpn_service import VPNService
from app.services.captcha_service import CaptchaService
import asyncio
import logging
logger = logging.getLogger(__name__)
class AutomationService:
def __init__(self, db: Session):
self.db = db
self.vpn_service = VPNService()
self.captcha_service = CaptchaService()
async def execute_task(self, task_id: str):
"""タスク実行メイン"""
task = self.db.query(Task).filter(Task.id == task_id).first()
if not task:
raise ValueError(f"Task {task_id} not found")
try:
# VPN接続
if task.settings.get("use_vpn"):
await self.vpn_service.connect(task.settings.get("vpn_config"))
# Playwright起動
async with async_playwright() as playwright:
browser = await playwright.chromium.launch(
headless=settings.PLAYWRIGHT_HEADLESS
)
context = await browser.new_context()
# URL処理
results = []
for url_data in task.url_list.urls:
try:
result = await self.process_single_url(
context, task, url_data
)
results.append(result)
# VPNローテーション
if self._should_rotate_vpn(task, len(results)):
await self.vpn_service.rotate()
except Exception as e:
logger.error(f"URL processing failed: {e}")
results.append({
"url": url_data["url"],
"status": "failed",
"error": str(e)
})
await browser.close()
return {
"processed": len(results),
"successful": len([r for r in results if r["status"] == "success"]),
"failed": len([r for r in results if r["status"] == "failed"]),
"results": results
}
except Exception as e:
logger.error(f"Task execution failed: {e}")
raise
finally:
# VPN切断
if task.settings.get("use_vpn"):
await self.vpn_service.disconnect()
async def process_single_url(self, context, task, url_data):
"""単一URL処理"""
page = await context.new_page()
try:
# ページアクセス
await page.goto(url_data["url"], timeout=30000)
# フォーム検出
form_selector = await self._detect_form(page)
if not form_selector:
return {
"url": url_data["url"],
"status": "failed",
"error": "No form detected"
}
# フォーム入力
await self._fill_form(page, task.template, url_data)
# CAPTCHA処理
if await self._has_captcha(page):
await self._solve_captcha(page, task.settings)
# 送信
await self._submit_form(page, form_selector)
# 結果確認
success = await self._verify_submission(page)
return {
"url": url_data["url"],
"status": "success" if success else "failed",
"timestamp": datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"Single URL processing failed: {e}")
return {
"url": url_data["url"],
"status": "failed",
"error": str(e)
}
finally:
await page.close()
✅ 完了条件¶
基盤セットアップ¶
- Poetry プロジェクト初期化
- 依存関係インストール完了
- プロジェクト構造作成
データベース・モデル¶
- SQLAlchemy モデル実装
- Alembic マイグレーション設定
- データベース初期化
API実装¶
- FastAPI アプリケーション実装
- 主要エンドポイント実装
- 認証・認可機能実装
自動化機能¶
- Celery ワーカー実装
- Playwright 統合
- VPN・CAPTCHA サービス基盤
動作確認¶
- API サーバー起動確認
- データベース接続確認
- 基本API動作確認
🔄 次のステップ¶
バックエンド基盤完了後、子チケット4(自動化機能統合)に移行。
Claude Code実行プロンプト:
フォーム自動化システムのバックエンドAPI基盤を実装してください。FastAPI + SQLAlchemy + Celery構成で、タスク管理・データ管理・自動化実行の核となるシステムを構築してください。データベースモデル設計から API実装、Celeryワーカー設定まで段階的に進め、各ステップの完了を報告してください。
表示するデータがありません
操作