Bug #953
Open [948-5] Monitoring & Operations Feature Implementation - Logging/Alerts/Backup
Status:
New
Priority:
Normal
Assignee:
-
Start date:
2025-07-31
Due date:
% Done:
0%
Estimated time:
Description
[Child Ticket 5] Monitoring & Operations Feature Implementation
🎯 Purpose
Implement system monitoring, log management, and operations automation for production use, establishing a stable foundation for running the service.
📋 Implementation Scope
1. Real-Time Monitoring System
WebSocket Monitoring
# app/services/monitoring_service.py
from fastapi import WebSocket
from typing import Dict, Set
import json
import asyncio
import logging
from datetime import datetime

logger = logging.getLogger(__name__)


class MonitoringService:
    """Real-time monitoring service."""

    def __init__(self):
        self.active_connections: Set[WebSocket] = set()
        self.system_metrics = {}
        self.task_metrics = {}
        self._monitoring_active = False

    async def connect(self, websocket: WebSocket):
        """Accept a WebSocket connection."""
        await websocket.accept()
        self.active_connections.add(websocket)
        logger.info(f"Monitoring client connected: {len(self.active_connections)} total")

    def disconnect(self, websocket: WebSocket):
        """Handle a WebSocket disconnect."""
        self.active_connections.discard(websocket)
        logger.info(f"Monitoring client disconnected: {len(self.active_connections)} remaining")

    async def broadcast_metrics(self, data: Dict):
        """Broadcast metrics to all connected clients."""
        if not self.active_connections:
            return
        message = json.dumps({
            "type": "metrics_update",
            "timestamp": datetime.utcnow().isoformat(),
            "data": data
        })
        # Detect dead connections while sending
        disconnected = set()
        for connection in self.active_connections:
            try:
                await connection.send_text(message)
            except Exception as e:
                logger.warning(f"Failed to send to client: {e}")
                disconnected.add(connection)
        # Drop the dead connections
        self.active_connections -= disconnected

    async def start_monitoring(self):
        """Start the monitoring loop."""
        if self._monitoring_active:
            return
        self._monitoring_active = True
        while self._monitoring_active:
            try:
                # Collect system metrics
                system_metrics = await self._collect_system_metrics()
                # Collect task metrics
                task_metrics = await self._collect_task_metrics()
                # Combine
                combined_metrics = {
                    "system": system_metrics,
                    "tasks": task_metrics,
                    "timestamp": datetime.utcnow().isoformat()
                }
                # Broadcast
                await self.broadcast_metrics(combined_metrics)
                # 5-second interval
                await asyncio.sleep(5)
            except Exception as e:
                logger.error(f"Monitoring error: {e}")
                await asyncio.sleep(10)

    async def _collect_system_metrics(self) -> Dict:
        """Collect system metrics."""
        import psutil
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')
        net = psutil.net_io_counters()
        return {
            "cpu_percent": psutil.cpu_percent(interval=1),
            "memory": {
                "total": memory.total,
                "available": memory.available,
                "percent": memory.percent
            },
            "disk": {
                "total": disk.total,
                "free": disk.free,
                "percent": disk.percent
            },
            "network": {
                "bytes_sent": net.bytes_sent,
                "bytes_recv": net.bytes_recv
            }
        }

    async def _collect_task_metrics(self) -> Dict:
        """Collect task statistics from the database."""
        from app.core.database import SessionLocal
        from app.models.task import Task, TaskStatus

        db = SessionLocal()
        try:
            # Number of currently running tasks
            running_count = db.query(Task).filter(Task.status == TaskStatus.RUNNING).count()
            # Tasks completed today
            today = datetime.utcnow().date()
            completed_today = db.query(Task).filter(
                Task.status == TaskStatus.COMPLETED,
                Task.completed_at >= today
            ).count()
            # Success rate
            total_completed = db.query(Task).filter(Task.status == TaskStatus.COMPLETED).count()
            successful_completed = db.query(Task).filter(
                Task.status == TaskStatus.COMPLETED,
                Task.successful_urls > 0
            ).count()
            success_rate = (successful_completed / total_completed * 100) if total_completed > 0 else 0
            return {
                "running_tasks": running_count,
                "completed_today": completed_today,
                "success_rate": round(success_rate, 1),
                "total_completed": total_completed
            }
        finally:
            db.close()
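For reference, a minimal sketch of how the service above could be exposed as a FastAPI WebSocket endpoint; the router module path and the module-level singleton are assumptions, not part of this ticket.

# Hypothetical wiring: app/api/endpoints/monitoring.py
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from app.services.monitoring_service import MonitoringService

router = APIRouter()
monitoring_service = MonitoringService()  # assumed shared singleton

@router.websocket("/ws/monitoring")
async def monitoring_ws(websocket: WebSocket):
    # Register the client; metrics are pushed by broadcast_metrics() from the monitoring loop.
    await monitoring_service.connect(websocket)
    try:
        while True:
            # We only push data; incoming frames are read to detect disconnects.
            await websocket.receive_text()
    except WebSocketDisconnect:
        monitoring_service.disconnect(websocket)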
Performance Monitoring
# app/services/performance_monitor.py
from typing import Dict
import time
from collections import deque
import logging

logger = logging.getLogger(__name__)


class PerformanceMonitor:
    """Performance monitoring over a sliding window of recent requests."""

    def __init__(self, window_size: int = 100):
        self.response_times = deque(maxlen=window_size)
        self.error_counts = deque(maxlen=window_size)
        self.request_counts = deque(maxlen=window_size)
        self.start_time = time.time()

    def record_request(self, response_time: float, is_error: bool = False):
        """Record a single request."""
        timestamp = time.time()
        self.response_times.append({
            'timestamp': timestamp,
            'response_time': response_time
        })
        self.error_counts.append({
            'timestamp': timestamp,
            'is_error': is_error
        })
        self.request_counts.append({
            'timestamp': timestamp
        })

    def get_metrics(self) -> Dict:
        """Compute metrics for the last minute."""
        now = time.time()
        one_minute_ago = now - 60
        # Extract data from the last 60 seconds
        recent_response_times = [
            item['response_time'] for item in self.response_times
            if item['timestamp'] > one_minute_ago
        ]
        recent_errors = [
            item for item in self.error_counts
            if item['timestamp'] > one_minute_ago and item['is_error']
        ]
        recent_requests = [
            item for item in self.request_counts
            if item['timestamp'] > one_minute_ago
        ]
        # Aggregate statistics
        avg_response_time = sum(recent_response_times) / len(recent_response_times) if recent_response_times else 0
        error_rate = len(recent_errors) / len(recent_requests) * 100 if recent_requests else 0
        requests_per_minute = len(recent_requests)
        return {
            "average_response_time": round(avg_response_time, 3),
            "error_rate": round(error_rate, 2),
            "requests_per_minute": requests_per_minute,
            "uptime": round(now - self.start_time, 2)
        }
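One way to feed the monitor, sketched under the assumption that a shared PerformanceMonitor instance and a metrics route are acceptable: an HTTP middleware that times every request and counts 5xx responses as errors.

# Hypothetical FastAPI middleware recording requests into PerformanceMonitor.
import time
from fastapi import FastAPI, Request
from app.services.performance_monitor import PerformanceMonitor

app = FastAPI()
performance_monitor = PerformanceMonitor()  # assumed shared instance

@app.middleware("http")
async def record_performance(request: Request, call_next):
    start = time.time()
    response = await call_next(request)
    # Treat 5xx responses as errors for the error-rate metric.
    performance_monitor.record_request(
        response_time=time.time() - start,
        is_error=response.status_code >= 500,
    )
    return response

@app.get("/api/metrics/performance")
async def performance_metrics():
    return performance_monitor.get_metrics()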
2. Log Management System
Structured Logging
# app/core/logging.py
import logging
import json
from datetime import datetime
from typing import Dict, Any
import traceback


class StructuredLogger:
    """Structured (JSON) logger."""

    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)
        # Attach a handler only once per logger name to avoid duplicate output
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(StructuredFormatter())
            self.logger.addHandler(handler)

    def info(self, message: str, extra: Dict[str, Any] = None):
        """Info-level log."""
        self._log(logging.INFO, message, extra)

    def warning(self, message: str, extra: Dict[str, Any] = None):
        """Warning-level log."""
        self._log(logging.WARNING, message, extra)

    def error(self, message: str, extra: Dict[str, Any] = None, exc_info: bool = False):
        """Error-level log, optionally with the current traceback."""
        if exc_info:
            extra = extra or {}
            extra['traceback'] = traceback.format_exc()
        self._log(logging.ERROR, message, extra)

    def _log(self, level: int, message: str, extra: Dict[str, Any] = None):
        """Emit a JSON log record."""
        log_data = {
            'timestamp': datetime.utcnow().isoformat(),
            'level': logging.getLevelName(level),
            'message': message,
            'extra': extra or {}
        }
        self.logger.log(level, json.dumps(log_data))


class StructuredFormatter(logging.Formatter):
    """Pass-through formatter: the message is already serialized JSON."""

    def format(self, record):
        return record.getMessage()


def get_logger(name: str) -> StructuredLogger:
    """Create a structured logger instance."""
    return StructuredLogger(name)
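A short usage sketch for the structured logger; the logger name and the fields passed in extra are illustrative, not fixed by this ticket.

# Example usage of the structured logger (field names are illustrative).
from app.core.logging import get_logger

logger = get_logger("app.services.task_runner")

logger.info("Task started", extra={"task_id": "1234", "urls": 25})
try:
    raise ValueError("form field not found")
except ValueError:
    # exc_info=True attaches the current traceback to the JSON record
    logger.error("Task failed", extra={"task_id": "1234"}, exc_info=True)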
Audit Logging
# app/services/audit_service.py
from sqlalchemy import Column, String, DateTime, Text, ForeignKey
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.sql import func
import uuid
from typing import Dict
from app.core.database import Base


class AuditLog(Base):
    """Audit log model."""
    __tablename__ = "audit_logs"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    user_id = Column(UUID(as_uuid=True), ForeignKey("users.id"))
    action = Column(String(100), nullable=False, index=True)
    resource_type = Column(String(50), nullable=False)
    resource_id = Column(String(100))
    details = Column(JSONB)
    ip_address = Column(String(45))
    user_agent = Column(Text)
    timestamp = Column(DateTime(timezone=True), server_default=func.now(), index=True)


class AuditService:
    """Audit service."""

    def __init__(self, db):
        self.db = db

    def log_action(
        self,
        user_id: str,
        action: str,
        resource_type: str,
        resource_id: str = None,
        details: Dict = None,
        ip_address: str = None,
        user_agent: str = None
    ):
        """Record a user action."""
        audit_log = AuditLog(
            user_id=user_id,
            action=action,
            resource_type=resource_type,
            resource_id=resource_id,
            details=details,
            ip_address=ip_address,
            user_agent=user_agent
        )
        self.db.add(audit_log)
        self.db.commit()

    def get_user_activity(self, user_id: str, limit: int = 100):
        """Fetch recent activity for a user."""
        return self.db.query(AuditLog).filter(
            AuditLog.user_id == user_id
        ).order_by(AuditLog.timestamp.desc()).limit(limit).all()

    def get_resource_history(self, resource_type: str, resource_id: str):
        """Fetch the change history of a resource."""
        return self.db.query(AuditLog).filter(
            AuditLog.resource_type == resource_type,
            AuditLog.resource_id == resource_id
        ).order_by(AuditLog.timestamp.desc()).all()
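A possible endpoint integration, assuming a get_db session dependency and header-based user identification; both are placeholders for illustration and are not defined in this ticket.

# Hypothetical endpoint wiring; route path and action string are illustrative.
from fastapi import APIRouter, Depends, Request
from sqlalchemy.orm import Session
from app.core.database import get_db  # assumed session dependency
from app.services.audit_service import AuditService

router = APIRouter()

@router.delete("/api/tasks/{task_id}")
async def delete_task(task_id: str, request: Request, db: Session = Depends(get_db)):
    # ... delete the task here ...
    AuditService(db).log_action(
        user_id=request.headers.get("X-User-Id"),  # placeholder for real authentication
        action="task.delete",
        resource_type="task",
        resource_id=task_id,
        ip_address=request.client.host if request.client else None,
        user_agent=request.headers.get("User-Agent"),
    )
    return {"status": "deleted"}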
3. Alerts & Notifications
Alert Management
# app/services/alert_service.py
from enum import Enum
from typing import Dict, List, Callable
import logging
from datetime import datetime

logger = logging.getLogger(__name__)


class AlertLevel(Enum):
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"


class AlertService:
    """Alert service: dispatches alerts to the handlers registered for each level."""

    def __init__(self):
        self.handlers: Dict[AlertLevel, List[Callable]] = {
            AlertLevel.INFO: [],
            AlertLevel.WARNING: [],
            AlertLevel.ERROR: [],
            AlertLevel.CRITICAL: []
        }
        self.alert_history = []

    def register_handler(self, level: AlertLevel, handler: Callable):
        """Register an alert handler for a level."""
        self.handlers[level].append(handler)

    async def send_alert(
        self,
        level: AlertLevel,
        title: str,
        message: str,
        details: Dict = None
    ):
        """Send an alert to all handlers registered for its level."""
        alert_data = {
            "level": level.value,
            "title": title,
            "message": message,
            "details": details or {},
            "timestamp": datetime.utcnow().isoformat()
        }
        # Keep an in-memory history
        self.alert_history.append(alert_data)
        # Run the registered handlers
        handlers = self.handlers.get(level, [])
        for handler in handlers:
            try:
                await handler(alert_data)
            except Exception as e:
                logger.error(f"Alert handler failed: {e}")


class SlackNotificationHandler:
    """Slack notification handler (incoming webhook)."""

    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    async def __call__(self, alert_data: Dict):
        """Post the alert to Slack."""
        import httpx

        color_map = {
            "info": "#36a64f",
            "warning": "#ff9500",
            "error": "#ff0000",
            "critical": "#8b0000"
        }
        payload = {
            "attachments": [{
                "color": color_map.get(alert_data["level"], "#808080"),
                "title": alert_data["title"],
                "text": alert_data["message"],
                "fields": [
                    {
                        "title": "Level",
                        "value": alert_data["level"].upper(),
                        "short": True
                    },
                    {
                        "title": "Time",
                        "value": alert_data["timestamp"],
                        "short": True
                    }
                ]
            }]
        }
        async with httpx.AsyncClient() as client:
            await client.post(self.webhook_url, json=payload)


class EmailNotificationHandler:
    """Email notification handler (SMTP)."""

    def __init__(self, smtp_config: Dict):
        self.smtp_config = smtp_config

    async def __call__(self, alert_data: Dict):
        """Send the alert by email."""
        import smtplib
        from email.mime.text import MIMEText
        from email.mime.multipart import MIMEMultipart

        msg = MIMEMultipart()
        msg['From'] = self.smtp_config['from']
        msg['To'] = self.smtp_config['to']
        msg['Subject'] = f"[{alert_data['level'].upper()}] {alert_data['title']}"
        body = f"""
Alert Level: {alert_data['level'].upper()}
Time: {alert_data['timestamp']}
Message: {alert_data['message']}
Details: {alert_data.get('details', 'None')}
"""
        msg.attach(MIMEText(body, 'plain'))
        # Send via SMTP (blocking; consider offloading to a thread in async contexts)
        with smtplib.SMTP(self.smtp_config['host'], self.smtp_config['port']) as server:
            if self.smtp_config.get('use_tls'):
                server.starttls()
            if self.smtp_config.get('username'):
                server.login(self.smtp_config['username'], self.smtp_config['password'])
            server.send_message(msg)
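A minimal wiring sketch that registers both handlers and raises an alert from a resource check; the environment-variable names, addresses, and the 90% disk threshold are assumptions for illustration.

# Sketch: register handlers and raise an alert from a simple resource check.
# SLACK_WEBHOOK_URL / SMTP_* are assumed configuration names.
import os
import asyncio
from app.services.alert_service import (
    AlertService, AlertLevel, SlackNotificationHandler, EmailNotificationHandler
)

alert_service = AlertService()
alert_service.register_handler(
    AlertLevel.WARNING, SlackNotificationHandler(os.environ["SLACK_WEBHOOK_URL"])
)
alert_service.register_handler(
    AlertLevel.CRITICAL, EmailNotificationHandler({
        "host": os.environ["SMTP_HOST"],
        "port": int(os.environ.get("SMTP_PORT", "587")),
        "use_tls": True,
        "username": os.environ.get("SMTP_USER"),
        "password": os.environ.get("SMTP_PASSWORD"),
        "from": "alerts@example.com",
        "to": "ops@example.com",
    })
)

async def check_disk(threshold: float = 90.0):
    # Example trigger: disk usage above the (assumed) threshold.
    import psutil
    percent = psutil.disk_usage('/').percent
    if percent >= threshold:
        await alert_service.send_alert(
            AlertLevel.WARNING,
            title="Disk usage high",
            message=f"Root partition at {percent:.1f}%",
            details={"disk_percent": percent},
        )

if __name__ == "__main__":
    asyncio.run(check_disk())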
4. Automated Backup & Recovery
Backup Service
# app/services/backup_service.py
import asyncio
import os
import tarfile
import shutil
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict
import logging

logger = logging.getLogger(__name__)


class BackupService:
    """Backup service."""

    def __init__(self, backup_config: Dict):
        self.config = backup_config
        self.backup_dir = Path(backup_config['backup_directory'])
        self.backup_dir.mkdir(parents=True, exist_ok=True)

    async def create_full_backup(self) -> str:
        """Create a full backup (database dump + file archive)."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_name = f"full_backup_{timestamp}"
        backup_path = self.backup_dir / f"{backup_name}.tar.gz"
        try:
            # Database backup
            db_backup_path = await self._backup_database(backup_name)
            # File-system backup
            files_backup_path = await self._backup_files(backup_name)
            # Combine into a single archive
            with tarfile.open(backup_path, "w:gz") as tar:
                tar.add(db_backup_path, arcname="database.sql")
                tar.add(files_backup_path, arcname="files.tar")
            # Remove temporary files
            db_backup_path.unlink()
            files_backup_path.unlink()
            logger.info(f"Full backup created: {backup_path}")
            return str(backup_path)
        except Exception as e:
            logger.error(f"Backup creation failed: {e}")
            raise

    async def _backup_database(self, backup_name: str) -> Path:
        """Dump the PostgreSQL database with pg_dump."""
        backup_path = self.backup_dir / f"{backup_name}_db.sql"
        cmd = [
            'pg_dump',
            '--host', self.config['db_host'],
            '--port', str(self.config['db_port']),
            '--username', self.config['db_username'],
            '--dbname', self.config['db_name'],
            '--no-password',
            '--file', str(backup_path)
        ]
        # Keep the parent environment (PATH etc.) and add only the password
        env = {**os.environ, 'PGPASSWORD': self.config['db_password']}
        process = await asyncio.create_subprocess_exec(
            *cmd,
            env=env,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await process.communicate()
        if process.returncode != 0:
            raise Exception(f"Database backup failed: {stderr.decode()}")
        return backup_path

    async def _backup_files(self, backup_name: str) -> Path:
        """Archive uploaded files and configuration."""
        backup_path = self.backup_dir / f"{backup_name}_files.tar"
        with tarfile.open(backup_path, "w") as tar:
            # Uploaded files
            upload_dir = Path(self.config['upload_directory'])
            if upload_dir.exists():
                tar.add(upload_dir, arcname="uploads")
            # Configuration files
            config_dir = Path(self.config['config_directory'])
            if config_dir.exists():
                tar.add(config_dir, arcname="config")
        return backup_path

    async def cleanup_old_backups(self, retention_days: int = 30):
        """Delete backups older than the retention period."""
        cutoff_date = datetime.now() - timedelta(days=retention_days)
        for backup_file in self.backup_dir.glob("*.tar.gz"):
            if backup_file.stat().st_mtime < cutoff_date.timestamp():
                backup_file.unlink()
                logger.info(f"Deleted old backup: {backup_file}")

    async def restore_from_backup(self, backup_path: str):
        """Restore from a backup archive."""
        backup_file = Path(backup_path)
        if not backup_file.exists():
            raise FileNotFoundError(f"Backup file not found: {backup_path}")
        temp_dir = self.backup_dir / "restore_temp"
        temp_dir.mkdir(exist_ok=True)
        try:
            # Unpack the archive
            with tarfile.open(backup_file, "r:gz") as tar:
                tar.extractall(temp_dir)
            # Restore the database, then the files
            # (_restore_database / _restore_files mirror the backup helpers; not shown in this ticket)
            await self._restore_database(temp_dir / "database.sql")
            await self._restore_files(temp_dir / "files.tar")
            logger.info(f"Restore completed from: {backup_path}")
        except Exception as e:
            logger.error(f"Restore failed: {e}")
            raise
        finally:
            # Remove the temporary directory
            shutil.rmtree(temp_dir, ignore_errors=True)
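A simple scheduling sketch using an asyncio loop; the 24-hour interval and the configuration values are placeholders, and a cron job or APScheduler would serve equally well.

# Sketch: run a daily backup and prune old archives. All paths and credentials
# below are assumed example values.
import asyncio
from app.services.backup_service import BackupService

backup_service = BackupService({
    "backup_directory": "/var/backups/formauto",
    "db_host": "localhost",
    "db_port": 5432,
    "db_username": "formauto",
    "db_password": "********",
    "db_name": "formauto",
    "upload_directory": "/srv/formauto/uploads",
    "config_directory": "/srv/formauto/config",
})

async def backup_scheduler(interval_hours: int = 24):
    while True:
        try:
            await backup_service.create_full_backup()
            await backup_service.cleanup_old_backups(retention_days=30)
        except Exception as e:
            # In practice, backup failures should also feed the alert service
            print(f"Scheduled backup failed: {e}")
        await asyncio.sleep(interval_hours * 3600)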
5. Health Checks & Diagnostics
System Health Check
# app/services/health_service.py
from typing import Dict
import time
import psutil
from datetime import datetime
import logging

logger = logging.getLogger(__name__)


class HealthService:
    """Health check service."""

    def __init__(self):
        self.checks = []

    def register_check(self, name: str, check_func):
        """Register a health check function."""
        self.checks.append({
            'name': name,
            'func': check_func
        })

    async def run_all_checks(self) -> Dict:
        """Run all registered health checks."""
        results = {
            'overall_status': 'healthy',
            'timestamp': datetime.utcnow().isoformat(),
            'checks': {}
        }
        for check in self.checks:
            try:
                result = await check['func']()
                results['checks'][check['name']] = {
                    'status': 'healthy' if result['healthy'] else 'unhealthy',
                    'details': result.get('details', {}),
                    'response_time': result.get('response_time', 0)
                }
                if not result['healthy']:
                    results['overall_status'] = 'unhealthy'
            except Exception as e:
                results['checks'][check['name']] = {
                    'status': 'error',
                    'error': str(e),
                    'response_time': 0
                }
                results['overall_status'] = 'unhealthy'
        return results


# Basic health check functions

async def database_health_check() -> Dict:
    """Database health check."""
    from sqlalchemy import text
    from app.core.database import SessionLocal

    start_time = time.time()
    try:
        db = SessionLocal()
        # Run a trivial query
        db.execute(text("SELECT 1"))
        db.close()
        response_time = time.time() - start_time
        return {
            'healthy': True,
            'response_time': response_time,
            'details': {'connection': 'successful'}
        }
    except Exception as e:
        return {
            'healthy': False,
            'response_time': time.time() - start_time,
            'details': {'error': str(e)}
        }


async def redis_health_check() -> Dict:
    """Redis health check."""
    import redis
    from app.core.config import settings  # assumed location of the settings object

    start_time = time.time()
    try:
        client = redis.Redis.from_url(settings.REDIS_URL)
        client.ping()
        response_time = time.time() - start_time
        return {
            'healthy': True,
            'response_time': response_time,
            'details': {'ping': 'successful'}
        }
    except Exception as e:
        return {
            'healthy': False,
            'response_time': time.time() - start_time,
            'details': {'error': str(e)}
        }


async def system_resources_check() -> Dict:
    """System resource check (CPU / memory / disk)."""
    cpu_percent = psutil.cpu_percent(interval=1)
    memory = psutil.virtual_memory()
    disk = psutil.disk_usage('/')
    # Threshold checks
    healthy = (
        cpu_percent < 80 and
        memory.percent < 85 and
        disk.percent < 90
    )
    return {
        'healthy': healthy,
        'details': {
            'cpu_percent': cpu_percent,
            'memory_percent': memory.percent,
            'disk_percent': disk.percent
        }
    }
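A sketch of exposing the checks over HTTP; the /health path and the 503 response for an unhealthy status are assumptions, not requirements from this ticket.

# Sketch: register the checks and expose them as a health endpoint.
from fastapi import APIRouter
from fastapi.responses import JSONResponse
from app.services.health_service import (
    HealthService, database_health_check, redis_health_check, system_resources_check
)

router = APIRouter()
health_service = HealthService()
health_service.register_check("database", database_health_check)
health_service.register_check("redis", redis_health_check)
health_service.register_check("system_resources", system_resources_check)

@router.get("/health")
async def health():
    results = await health_service.run_all_checks()
    status_code = 200 if results["overall_status"] == "healthy" else 503
    return JSONResponse(content=results, status_code=status_code)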
✅ Completion Criteria
Monitoring System
- Real-time monitoring implemented
- WebSocket broadcasting
- Performance monitoring
Log Management
- Structured logging implemented
- Audit logging
- Log search & analysis
Alerts & Notifications
- Alert system implemented
- Slack/email notifications
- Escalation
Backup & Recovery
- Automated backups implemented
- Restore functionality implemented
- Scheduled execution
Health Checks
- System health checks
- External dependency checks
- Automated diagnostics
🔄 Project Completion
With all child tickets completed, implementation of the formauto.call2arm.com form automation system is complete.
Claude Code execution prompt:
Implement the monitoring and operations features of the form automation system. Deliver a system stable enough for production operation, covering real-time monitoring, log management, alert notifications, backups, and health checks. Implement incrementally and prepare the system for operation.