プロジェクト

全般

プロフィール

バグ #953

未完了

【948-5】監視・運用機能実装 - ログ/アラート/バックアップ

Redmine Admin さんが2ヶ月前に追加.

ステータス:
新規
優先度:
通常
担当者:
-
開始日:
2025-07-31
期日:
進捗率:

0%

予定工数:

説明

【子チケット5】監視・運用機能実装

🎯 目的

本格運用に向けたシステム監視・ログ管理・運用自動化機能を実装し、安定したサービス運用基盤を確立する。

📋 実装内容

1. リアルタイム監視システム

WebSocket監視機能

# app/services/monitoring_service.py
from fastapi import WebSocket
from typing import Dict, List, Set
import json
import asyncio
import logging
from datetime import datetime

logger = logging.getLogger(__name__)

class MonitoringService:
    """Real-time monitoring service.

    Periodically collects system and task metrics and pushes them, as JSON
    text frames, to every WebSocket client registered through ``connect``.
    """

    def __init__(self):
        self.active_connections: Set[WebSocket] = set()
        # Latest snapshots produced by the monitoring loop.
        self.system_metrics = {}
        self.task_metrics = {}
        self._monitoring_active = False

    async def connect(self, websocket: WebSocket):
        """Accept the WebSocket handshake and register the client."""
        await websocket.accept()
        self.active_connections.add(websocket)
        logger.info(f"Monitoring client connected: {len(self.active_connections)} total")

    def disconnect(self, websocket: WebSocket):
        """Unregister a client; unknown clients are ignored."""
        self.active_connections.discard(websocket)
        logger.info(f"Monitoring client disconnected: {len(self.active_connections)} remaining")

    async def broadcast_metrics(self, data: Dict):
        """Send ``data`` to every connected client.

        Clients whose send fails are dropped from the connection set.
        """
        if not self.active_connections:
            return

        message = json.dumps({
            "type": "metrics_update",
            "timestamp": datetime.utcnow().isoformat(),
            "data": data
        })

        disconnected = set()

        # Iterate over a snapshot: connect()/disconnect() may run while this
        # coroutine is suspended in send_text(), and mutating a set that is
        # being iterated raises RuntimeError.
        for connection in tuple(self.active_connections):
            try:
                await connection.send_text(message)
            except Exception as e:
                logger.warning(f"Failed to send to client: {e}")
                disconnected.add(connection)

        # Drop the clients that could not be reached.
        self.active_connections -= disconnected

    async def start_monitoring(self):
        """Run the collect-and-broadcast loop until stopped or cancelled.

        A second call while the loop is already running returns immediately.
        """
        if self._monitoring_active:
            return

        self._monitoring_active = True

        try:
            while self._monitoring_active:
                try:
                    system_metrics = await self._collect_system_metrics()
                    task_metrics = await self._collect_task_metrics()

                    self.system_metrics = system_metrics
                    self.task_metrics = task_metrics

                    combined_metrics = {
                        "system": system_metrics,
                        "tasks": task_metrics,
                        "timestamp": datetime.utcnow().isoformat()
                    }

                    await self.broadcast_metrics(combined_metrics)

                    # Regular 5-second cadence.
                    await asyncio.sleep(5)

                except Exception as e:
                    logger.error(f"Monitoring error: {e}")
                    # Back off a little longer after a failure.
                    await asyncio.sleep(10)
        finally:
            # Also reached on task cancellation, so the loop can be
            # started again afterwards.
            self._monitoring_active = False

    def stop_monitoring(self):
        """Ask the monitoring loop to exit after its current iteration."""
        self._monitoring_active = False

    async def _collect_system_metrics(self) -> Dict:
        """Collect CPU, memory, disk ('/') and network counters via psutil."""
        import psutil

        # cpu_percent(interval=1) sleeps for a full second; run it in the
        # default executor so the event loop keeps serving clients.
        loop = asyncio.get_running_loop()
        cpu_percent = await loop.run_in_executor(None, psutil.cpu_percent, 1)

        # Read each source once so the fields of a section are consistent.
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')
        network = psutil.net_io_counters()

        return {
            "cpu_percent": cpu_percent,
            "memory": {
                "total": memory.total,
                "available": memory.available,
                "percent": memory.percent
            },
            "disk": {
                "total": disk.total,
                "free": disk.free,
                "percent": disk.percent
            },
            "network": {
                "bytes_sent": network.bytes_sent,
                "bytes_recv": network.bytes_recv
            }
        }

    async def _collect_task_metrics(self) -> Dict:
        """Collect task statistics from the database without blocking the loop."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._query_task_metrics)

    def _query_task_metrics(self) -> Dict:
        """Run the (synchronous) task statistics queries in one session."""
        from app.core.database import SessionLocal
        from app.models.task import Task, TaskStatus

        db = SessionLocal()
        try:
            running_count = db.query(Task).filter(Task.status == TaskStatus.RUNNING).count()

            # Completed since the start of the current UTC day.
            today = datetime.utcnow().date()
            completed_today = db.query(Task).filter(
                Task.status == TaskStatus.COMPLETED,
                Task.completed_at >= today
            ).count()

            # Success rate: completed tasks with at least one successful URL.
            total_completed = db.query(Task).filter(Task.status == TaskStatus.COMPLETED).count()
            successful_completed = db.query(Task).filter(
                Task.status == TaskStatus.COMPLETED,
                Task.successful_urls > 0
            ).count()

            success_rate = (successful_completed / total_completed * 100) if total_completed > 0 else 0

            return {
                "running_tasks": running_count,
                "completed_today": completed_today,
                "success_rate": round(success_rate, 1),
                "total_completed": total_completed
            }

        finally:
            db.close()

パフォーマンス監視

# app/services/performance_monitor.py
from typing import Dict, List
import time
import asyncio
from datetime import datetime, timedelta
from collections import deque
import logging

logger = logging.getLogger(__name__)

class PerformanceMonitor:
    """Sliding-window tracker for request latency, errors and throughput.

    Each of the three windows keeps at most ``window_size`` entries; metrics
    are computed over the entries recorded during the last 60 seconds.
    """

    def __init__(self, window_size: int = 100):
        # Bounded deques: the oldest sample is discarded once the window is full.
        self.response_times = deque(maxlen=window_size)
        self.error_counts = deque(maxlen=window_size)
        self.request_counts = deque(maxlen=window_size)
        self.start_time = time.time()

    def record_request(self, response_time: float, is_error: bool = False):
        """Record one finished request with its latency and error flag."""
        recorded_at = time.time()

        latency_sample = {'timestamp': recorded_at, 'response_time': response_time}
        error_sample = {'timestamp': recorded_at, 'is_error': is_error}
        request_sample = {'timestamp': recorded_at}

        self.response_times.append(latency_sample)
        self.error_counts.append(error_sample)
        self.request_counts.append(request_sample)

    def get_metrics(self) -> Dict:
        """Return average latency, error rate (%), request count and uptime.

        Only samples strictly newer than 60 seconds are considered; uptime is
        measured in seconds since the monitor was created.
        """
        now = time.time()
        cutoff = now - 60

        def is_recent(sample):
            return sample['timestamp'] > cutoff

        latencies = [s['response_time'] for s in self.response_times if is_recent(s)]
        error_total = len([s for s in self.error_counts if is_recent(s) and s['is_error']])
        request_total = len([s for s in self.request_counts if is_recent(s)])

        if latencies:
            avg_response_time = sum(latencies) / len(latencies)
        else:
            avg_response_time = 0

        if request_total:
            error_rate = error_total / request_total * 100
        else:
            error_rate = 0

        return {
            "average_response_time": round(avg_response_time, 3),
            "error_rate": round(error_rate, 2),
            "requests_per_minute": request_total,
            "uptime": round(now - self.start_time, 2)
        }

2. ログ管理システム

構造化ログ

# app/core/logging.py
import logging
import json
from datetime import datetime
from typing import Dict, Any
import traceback

class StructuredLogger:
    """Structured (JSON) logger.

    Wraps a stdlib logger and emits every record as one JSON object with the
    keys ``timestamp``, ``level``, ``message`` and ``extra``.
    """

    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)

        # logging.getLogger() returns the same object for the same name, so
        # attach the stream handler only once; otherwise each new
        # StructuredLogger for that name would duplicate every output line.
        already_configured = any(
            isinstance(existing.formatter, StructuredFormatter)
            for existing in self.logger.handlers
        )
        if not already_configured:
            handler = logging.StreamHandler()
            handler.setFormatter(StructuredFormatter())
            self.logger.addHandler(handler)

    def info(self, message: str, extra: Dict[str, Any] = None):
        """Log ``message`` at INFO level with optional structured fields."""
        self._log(logging.INFO, message, extra)

    def warning(self, message: str, extra: Dict[str, Any] = None):
        """Log ``message`` at WARNING level with optional structured fields."""
        self._log(logging.WARNING, message, extra)

    def error(self, message: str, extra: Dict[str, Any] = None, exc_info: bool = False):
        """Log ``message`` at ERROR level.

        When ``exc_info`` is true, the formatted traceback of the exception
        currently being handled is added under ``extra['traceback']``.
        """
        if exc_info:
            # Build a new dict so the caller's ``extra`` is not modified.
            extra = {**(extra or {}), 'traceback': traceback.format_exc()}
        self._log(logging.ERROR, message, extra)

    def _log(self, level: int, message: str, extra: Dict[str, Any] = None):
        """Serialise one record to JSON and hand it to the wrapped logger."""
        log_data = {
            'timestamp': datetime.utcnow().isoformat(),
            'level': logging.getLevelName(level),
            'message': message,
            'extra': extra or {}
        }

        # default=str: a log call must not raise just because ``extra``
        # carries a value json cannot encode (datetime, Path, exception...).
        self.logger.log(level, json.dumps(log_data, default=str))

class StructuredFormatter(logging.Formatter):
    """Pass-through formatter.

    Emits the record's message exactly as produced by ``getMessage()``,
    without adding any prefix, timestamp or level text.
    """

    def format(self, record):
        rendered = record.getMessage()
        return rendered

# Factory for logger instances
def get_logger(name: str) -> StructuredLogger:
    """Return a new StructuredLogger wrapping the stdlib logger called ``name``."""
    structured = StructuredLogger(name)
    return structured

監査ログ

# app/services/audit_service.py
from sqlalchemy import Column, String, DateTime, Text, Integer, ForeignKey
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.sql import func
import uuid
from app.core.database import Base
from typing import Dict, Optional

class AuditLog(Base):
    """Audit log model: one row per recorded action, stored in ``audit_logs``."""
    __tablename__ = "audit_logs"

    # Primary key; a UUID produced by uuid.uuid4 on the Python side by default.
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    # Foreign key to users.id; nullable is not restricted here.
    user_id = Column(UUID(as_uuid=True), ForeignKey("users.id"))
    # Required, indexed action name (up to 100 characters).
    action = Column(String(100), nullable=False, index=True)
    # Required resource type (up to 50 characters).
    resource_type = Column(String(50), nullable=False)
    # Optional identifier of the affected resource, kept as a string.
    resource_id = Column(String(100))
    # Optional free-form payload stored as PostgreSQL JSONB.
    details = Column(JSONB)
    # NOTE(review): length 45 is presumably sized for a textual IPv6 address — confirm.
    ip_address = Column(String(45))
    user_agent = Column(Text)
    # Timezone-aware, indexed; filled in by the database through func.now().
    timestamp = Column(DateTime(timezone=True), server_default=func.now(), index=True)

class AuditService:
    """Audit service: writes and queries ``AuditLog`` rows through a session."""

    def __init__(self, db):
        # ``db`` is used as a SQLAlchemy-style session (add/commit/rollback/query).
        self.db = db

    def log_action(
        self,
        user_id: str,
        action: str,
        resource_type: str,
        resource_id: Optional[str] = None,
        details: Optional[Dict] = None,
        ip_address: Optional[str] = None,
        user_agent: Optional[str] = None
    ):
        """Persist one audit entry and commit it immediately.

        Raises:
            Exception: whatever the session raises on commit; the session is
                rolled back before the error is re-raised.
        """
        audit_log = AuditLog(
            user_id=user_id,
            action=action,
            resource_type=resource_type,
            resource_id=resource_id,
            details=details,
            ip_address=ip_address,
            user_agent=user_agent
        )

        self.db.add(audit_log)
        try:
            self.db.commit()
        except Exception:
            # A failed commit leaves the session unusable until it is rolled
            # back; without this, every later use of the shared session fails.
            self.db.rollback()
            raise

    def get_user_activity(self, user_id: str, limit: int = 100):
        """Return up to ``limit`` entries for ``user_id``, newest first."""
        return self.db.query(AuditLog).filter(
            AuditLog.user_id == user_id
        ).order_by(AuditLog.timestamp.desc()).limit(limit).all()

    def get_resource_history(self, resource_type: str, resource_id: str):
        """Return every entry for the given resource, newest first."""
        return self.db.query(AuditLog).filter(
            AuditLog.resource_type == resource_type,
            AuditLog.resource_id == resource_id
        ).order_by(AuditLog.timestamp.desc()).all()

3. アラート・通知システム

アラート管理

# app/services/alert_service.py
from enum import Enum
from typing import Dict, List, Callable
import asyncio
import logging
from datetime import datetime

logger = logging.getLogger(__name__)

class AlertLevel(Enum):
    """Alert levels; each member's value is the lowercase level name."""
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"

class AlertService:
    """Alert service: records alerts and dispatches them to per-level handlers."""

    def __init__(self, max_history: int = 1000):
        """Create the service.

        Args:
            max_history: maximum number of alerts kept in ``alert_history``;
                the oldest entries are dropped first.
        """
        self.handlers: Dict[AlertLevel, List[Callable]] = {
            AlertLevel.INFO: [],
            AlertLevel.WARNING: [],
            AlertLevel.ERROR: [],
            AlertLevel.CRITICAL: []
        }
        self.alert_history = []
        self.max_history = max_history

    def register_handler(self, level: AlertLevel, handler: Callable):
        """Register ``handler`` to be called for alerts of ``level``."""
        self.handlers[level].append(handler)

    async def send_alert(
        self,
        level: AlertLevel,
        title: str,
        message: str,
        details: Dict = None
    ):
        """Record an alert and run every handler registered for its level.

        Handlers may be plain callables or coroutine functions. A failing
        handler is logged and does not prevent the remaining ones from running.
        """
        import inspect

        alert_data = {
            "level": level.value,
            "title": title,
            "message": message,
            "details": details or {},
            "timestamp": datetime.utcnow().isoformat()
        }

        # Keep the history bounded so a long-running process does not grow
        # without limit.
        self.alert_history.append(alert_data)
        excess = len(self.alert_history) - self.max_history
        if excess > 0:
            del self.alert_history[:excess]

        # Snapshot the list: a handler may register another one while awaited.
        for handler in list(self.handlers.get(level, [])):
            try:
                outcome = handler(alert_data)
                # Only await when the handler actually returned an awaitable,
                # so synchronous handlers work too.
                if inspect.isawaitable(outcome):
                    await outcome
            except Exception as e:
                logger.error(f"Alert handler failed: {e}")

# Slack notification handler
class SlackNotificationHandler:
    """Alert handler that posts the alert to a Slack incoming webhook."""

    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    async def __call__(self, alert_data: Dict):
        """Post ``alert_data`` as a coloured Slack attachment.

        Reads the ``level``, ``title``, ``message`` and ``timestamp`` keys.

        Raises:
            httpx.HTTPStatusError: when the webhook answers with a 4xx/5xx status.
        """
        import httpx

        # Attachment colour per level; unknown levels fall back to grey.
        color_map = {
            "info": "#36a64f",
            "warning": "#ff9500",
            "error": "#ff0000",
            "critical": "#8b0000"
        }

        payload = {
            "attachments": [{
                "color": color_map.get(alert_data["level"], "#808080"),
                "title": alert_data["title"],
                "text": alert_data["message"],
                "fields": [
                    {
                        "title": "Level",
                        "value": alert_data["level"].upper(),
                        "short": True
                    },
                    {
                        "title": "Time",
                        "value": alert_data["timestamp"],
                        "short": True
                    }
                ]
            }]
        }

        async with httpx.AsyncClient() as client:
            response = await client.post(self.webhook_url, json=payload)
            # Without this a rejected webhook call (bad URL, revoked hook,
            # rate limit) would pass silently and the alert would be lost
            # without any trace.
            response.raise_for_status()

# Email notification handler
class EmailNotificationHandler:
    """Alert handler that sends the alert as a plain-text email over SMTP."""

    def __init__(self, smtp_config: Dict):
        # Keys read by this class: 'from', 'to', 'host', 'port', and the
        # optional 'use_tls', 'username', 'password'.
        self.smtp_config = smtp_config

    async def __call__(self, alert_data: Dict):
        """Build the email for ``alert_data`` and send it.

        The blocking SMTP conversation runs in the default executor so the
        event loop is not stalled while the mail server responds.
        """
        import asyncio
        from email.mime.text import MIMEText
        from email.mime.multipart import MIMEMultipart

        msg = MIMEMultipart()
        msg['From'] = self.smtp_config['from']
        msg['To'] = self.smtp_config['to']
        msg['Subject'] = f"[{alert_data['level'].upper()}] {alert_data['title']}"

        body = f"""
        Alert Level: {alert_data['level'].upper()}
        Time: {alert_data['timestamp']}
        Message: {alert_data['message']}
        
        Details: {alert_data.get('details', 'None')}
        """

        msg.attach(MIMEText(body, 'plain'))

        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self._send, msg)

    def _send(self, msg):
        """Deliver ``msg`` over SMTP (blocking)."""
        import smtplib

        with smtplib.SMTP(self.smtp_config['host'], self.smtp_config['port']) as server:
            if self.smtp_config.get('use_tls'):
                server.starttls()
            if self.smtp_config.get('username'):
                server.login(self.smtp_config['username'], self.smtp_config['password'])
            server.send_message(msg)

4. 自動バックアップ・復旧

バックアップサービス

# app/services/backup_service.py
import asyncio
import subprocess
import tarfile
import gzip
import shutil
from datetime import datetime, timedelta
from pathlib import Path
import logging

logger = logging.getLogger(__name__)

class BackupService:
    """Backup service: creates, prunes and restores ``.tar.gz`` backups.

    A full backup bundles a ``pg_dump`` SQL file (``database.sql``) and a tar
    of the upload and config directories (``files.tar``).
    """

    # ``dict`` instead of ``typing.Dict``: this module's imports do not
    # include ``typing``, so ``Dict`` raised NameError at class definition.
    def __init__(self, backup_config: dict):
        """Store the configuration and make sure the backup directory exists.

        Keys read by this class: 'backup_directory', 'db_host', 'db_port',
        'db_username', 'db_name', 'db_password', 'upload_directory',
        'config_directory'.
        """
        self.config = backup_config
        self.backup_dir = Path(backup_config['backup_directory'])
        # parents=True so a nested, not-yet-existing path works as well.
        self.backup_dir.mkdir(parents=True, exist_ok=True)

    async def create_full_backup(self) -> str:
        """Create a full backup archive and return its path as a string.

        Intermediate files are removed whether or not the backup succeeds; a
        partially written archive is removed on failure.
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_name = f"full_backup_{timestamp}"
        backup_path = self.backup_dir / f"{backup_name}.tar.gz"

        db_backup_path = None
        files_backup_path = None

        try:
            db_backup_path = await self._backup_database(backup_name)
            files_backup_path = await self._backup_files(backup_name)

            # Bundle both parts into the final archive.
            with tarfile.open(backup_path, "w:gz") as tar:
                tar.add(db_backup_path, arcname="database.sql")
                tar.add(files_backup_path, arcname="files.tar")

            logger.info(f"Full backup created: {backup_path}")
            return str(backup_path)

        except Exception as e:
            logger.error(f"Backup creation failed: {e}")
            # A truncated archive must not be mistaken for a valid backup.
            backup_path.unlink(missing_ok=True)
            raise
        finally:
            for temp_path in (db_backup_path, files_backup_path):
                if temp_path is not None:
                    temp_path.unlink(missing_ok=True)

    async def _backup_database(self, backup_name: str) -> Path:
        """Dump the database with ``pg_dump`` and return the SQL file path.

        Raises:
            RuntimeError: when ``pg_dump`` exits with a non-zero status.
        """
        import os

        backup_path = self.backup_dir / f"{backup_name}_db.sql"

        cmd = [
            'pg_dump',
            '--host', self.config['db_host'],
            '--port', str(self.config['db_port']),
            '--username', self.config['db_username'],
            '--dbname', self.config['db_name'],
            '--no-password',
            '--file', str(backup_path)
        ]

        # Extend the current environment instead of replacing it, so PATH,
        # locale and library settings still reach pg_dump.
        env = {**os.environ, 'PGPASSWORD': self.config['db_password']}

        process = await asyncio.create_subprocess_exec(
            *cmd,
            env=env,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )

        _stdout, stderr = await process.communicate()

        if process.returncode != 0:
            # Do not leave a partial dump behind.
            backup_path.unlink(missing_ok=True)
            raise RuntimeError(f"Database backup failed: {stderr.decode(errors='replace')}")

        return backup_path

    async def _backup_files(self, backup_name: str) -> Path:
        """Tar the upload and config directories and return the tar path.

        Directories that do not exist are skipped.
        """
        backup_path = self.backup_dir / f"{backup_name}_files.tar"

        with tarfile.open(backup_path, "w") as tar:
            upload_dir = Path(self.config['upload_directory'])
            if upload_dir.exists():
                tar.add(upload_dir, arcname="uploads")

            config_dir = Path(self.config['config_directory'])
            if config_dir.exists():
                tar.add(config_dir, arcname="config")

        return backup_path

    async def cleanup_old_backups(self, retention_days: int = 30):
        """Delete ``*.tar.gz`` backups last modified more than ``retention_days`` ago."""
        cutoff_date = datetime.now() - timedelta(days=retention_days)
        cutoff_timestamp = cutoff_date.timestamp()

        for backup_file in self.backup_dir.glob("*.tar.gz"):
            if backup_file.stat().st_mtime < cutoff_timestamp:
                backup_file.unlink()
                logger.info(f"Deleted old backup: {backup_file}")

    async def restore_from_backup(self, backup_path: str):
        """Restore the database and files from a full backup archive.

        Raises:
            FileNotFoundError: when ``backup_path`` does not exist.
        """
        backup_file = Path(backup_path)
        if not backup_file.exists():
            raise FileNotFoundError(f"Backup file not found: {backup_path}")

        temp_dir = self.backup_dir / "restore_temp"
        # Start from an empty directory so leftovers of an earlier, interrupted
        # restore cannot be picked up.
        shutil.rmtree(temp_dir, ignore_errors=True)
        temp_dir.mkdir(exist_ok=True)

        try:
            with tarfile.open(backup_file, "r:gz") as tar:
                # The 'data' filter rejects absolute paths, '..' traversal and
                # special files; it only exists on Python versions that ship
                # tarfile extraction filters.
                if hasattr(tarfile, "data_filter"):
                    tar.extractall(temp_dir, filter="data")
                else:
                    tar.extractall(temp_dir)

            # NOTE(review): _restore_database and _restore_files are not
            # defined in the visible part of this class — confirm they exist,
            # otherwise these calls raise AttributeError.
            await self._restore_database(temp_dir / "database.sql")

            await self._restore_files(temp_dir / "files.tar")

            logger.info(f"Restore completed from: {backup_path}")

        except Exception as e:
            logger.error(f"Restore failed: {e}")
            raise
        finally:
            # Remove the temporary directory in every case.
            shutil.rmtree(temp_dir, ignore_errors=True)

5. ヘルスチェック・診断

システムヘルスチェック

# app/services/health_service.py
from typing import Dict, List
import asyncio
import httpx
import psutil
from datetime import datetime
import logging

logger = logging.getLogger(__name__)

class HealthService:
    """Health-check service: runs registered async checks and aggregates them."""

    def __init__(self):
        # Registered checks, in registration order.
        self.checks = []

    def register_check(self, name: str, check_func):
        """Register ``check_func`` (awaited with no arguments) under ``name``."""
        entry = dict(name=name, func=check_func)
        self.checks.append(entry)

    async def run_all_checks(self) -> Dict:
        """Run every registered check in order and return the combined report.

        The overall status is 'unhealthy' as soon as one check reports
        ``healthy`` as false or raises; a raising check is reported with
        status 'error' and the exception text.
        """
        started_at = datetime.utcnow().isoformat()
        per_check = {}
        all_passed = True

        for entry in self.checks:
            try:
                reply = await entry['func']()
                passed = reply['healthy']
                per_check[entry['name']] = {
                    'status': 'healthy' if passed else 'unhealthy',
                    'details': reply.get('details', {}),
                    'response_time': reply.get('response_time', 0)
                }
                if not passed:
                    all_passed = False
            except Exception as exc:
                per_check[entry['name']] = {
                    'status': 'error',
                    'error': str(exc),
                    'response_time': 0
                }
                all_passed = False

        return {
            'overall_status': 'healthy' if all_passed else 'unhealthy',
            'timestamp': started_at,
            'checks': per_check
        }

# Basic health-check functions
async def database_health_check() -> Dict:
    """Check database connectivity by running ``SELECT 1``.

    Returns a dict with ``healthy``, ``response_time`` (seconds) and
    ``details``; failures are reported in the dict instead of being raised.
    """
    # Imported locally: the module's import block does not provide ``time``.
    import time
    from sqlalchemy import text
    from app.core.database import SessionLocal

    start_time = time.perf_counter()
    db = None

    try:
        db = SessionLocal()
        # Wrapped in text(): SQLAlchemy 2.x rejects plain SQL strings here.
        db.execute(text("SELECT 1"))

        response_time = time.perf_counter() - start_time

        return {
            'healthy': True,
            'response_time': response_time,
            'details': {'connection': 'successful'}
        }

    except Exception as e:
        return {
            'healthy': False,
            'response_time': time.perf_counter() - start_time,
            'details': {'error': str(e)}
        }
    finally:
        # Close the session on the failure path too, so a failing database
        # does not also leak connections.
        if db is not None:
            db.close()

async def redis_health_check() -> Dict:
    """Check Redis connectivity with a PING.

    Returns a dict with ``healthy``, ``response_time`` (seconds) and
    ``details``; failures are reported in the dict instead of being raised.
    """
    # Imported locally: the module's import block does not provide ``time``.
    import time
    import redis

    start_time = time.perf_counter()
    client = None

    try:
        # NOTE(review): ``settings`` is not imported in the visible part of
        # this module — confirm where it comes from. While it is undefined the
        # NameError is caught below and the check always reports unhealthy.
        client = redis.Redis.from_url(settings.REDIS_URL)
        client.ping()

        response_time = time.perf_counter() - start_time

        return {
            'healthy': True,
            'response_time': response_time,
            'details': {'ping': 'successful'}
        }

    except Exception as e:
        return {
            'healthy': False,
            'response_time': time.perf_counter() - start_time,
            'details': {'error': str(e)}
        }
    finally:
        # A new client (with its own connection pool) is created per call;
        # release it so repeated checks do not accumulate connections.
        if client is not None:
            client.close()

async def system_resources_check(
    cpu_threshold: float = 80,
    memory_threshold: float = 85,
    disk_threshold: float = 90,
) -> Dict:
    """Check CPU, memory and root-disk usage against percentage thresholds.

    Args:
        cpu_threshold: CPU usage (%) at or above which the check is unhealthy.
        memory_threshold: memory usage (%) at or above which it is unhealthy.
        disk_threshold: usage (%) of '/' at or above which it is unhealthy.

    Returns:
        A dict with ``healthy`` and the measured percentages under ``details``.
    """
    # cpu_percent(interval=1) sleeps for one second; run it in the default
    # executor so the event loop is not blocked during the measurement.
    loop = asyncio.get_running_loop()
    cpu_percent = await loop.run_in_executor(None, psutil.cpu_percent, 1)
    memory = psutil.virtual_memory()
    disk = psutil.disk_usage('/')

    healthy = (
        cpu_percent < cpu_threshold and
        memory.percent < memory_threshold and
        disk.percent < disk_threshold
    )

    return {
        'healthy': healthy,
        'details': {
            'cpu_percent': cpu_percent,
            'memory_percent': memory.percent,
            'disk_percent': disk.percent
        }
    }

✅ 完了条件

監視システム

  • リアルタイム監視実装
  • WebSocket配信機能
  • パフォーマンス監視

ログ管理

  • 構造化ログ実装
  • 監査ログ機能
  • ログ検索・分析

アラート・通知

  • アラートシステム実装
  • Slack/メール通知
  • エスカレーション機能

バックアップ・復旧

  • 自動バックアップ実装
  • 復旧機能実装
  • スケジュール実行

ヘルスチェック

  • システムヘルスチェック
  • 外部依存性確認
  • 自動診断機能

🔄 プロジェクト完了

全子チケット完了により、formauto.call2arm.comフォーム自動化システムの実装完了。


Claude Code実行プロンプト:

フォーム自動化システムの監視・運用機能を実装してください。リアルタイム監視・ログ管理・アラート通知・バックアップ・ヘルスチェック機能により、本格運用に耐える安定したシステムを完成させてください。段階的に実装し、運用準備を整えてください。

表示するデータがありません

他の形式にエクスポート: Atom PDF