プロジェクト

全般

プロフィール

バグ #952

未完了

【948-4】自動化機能統合 - Playwright/VPN/CAPTCHA

Redmine Admin さんが2ヶ月前に追加.

ステータス:
新規
優先度:
高め
担当者:
-
開始日:
2025-07-31
期日:
進捗率:

0%

予定工数:

説明

【子チケット4】自動化機能統合 - Playwright/VPN/CAPTCHA

🎯 目的

Playwright・VPN・CAPTCHA統合により、実際のフォーム自動化機能を実装し、システムの核となる自動化エンジンを完成させる。

📋 実装内容

1. Playwright統合実装

フォーム検出エンジン

# app/services/form_detection_service.py
from playwright.async_api import Page
from typing import Dict, List, Optional
import logging

logger = logging.getLogger(__name__)

class FormDetectionService:
    """Find the forms on a page and describe their fields and submit buttons."""

    # Input types that are buttons or carry hidden data, i.e. nothing to fill in.
    _SKIPPED_INPUT_TYPES = frozenset({'hidden', 'submit', 'button', 'reset', 'image'})

    # CSS matching the controls that can submit a form.
    _SUBMIT_CSS = 'button, input[type="submit"], input[type="image"]'

    # (purpose, keywords) pairs. They are checked in order and the first match
    # wins, so "company_name" is classified as a company field before the
    # generic "name" keyword gets a chance to match.
    _PURPOSE_KEYWORDS = (
        ('company_name', ('会社', 'company', '企業')),
        ('contact_name', ('名前', 'name', '氏名')),
        ('email', ('メール', 'email', 'mail')),
        ('phone', ('電話', 'phone', 'tel')),
        ('subject', ('件名', 'subject', 'title')),
        ('message', ('内容', 'message', '問い合わせ')),
    )

    async def detect_forms(self, page: Page) -> List[Dict]:
        """Return a description of every analysable ``<form>`` on ``page``.

        Forms whose analysis fails are skipped. ``index`` in each description
        is the form's position among all ``<form>`` elements of the page.
        """
        form_elements = await page.query_selector_all('form')

        forms = []
        for index, form in enumerate(form_elements):
            form_info = await self._analyze_form(page, form, index)
            if form_info:
                forms.append(form_info)
        return forms

    async def _analyze_form(self, page: Page, form_element, index: int) -> Optional[Dict]:
        """Describe one form, or return ``None`` (after logging) on failure."""
        try:
            action = await form_element.get_attribute('action') or page.url
            # The HTML method attribute is case-insensitive; report it upper-cased.
            method = (await form_element.get_attribute('method') or 'GET').upper()

            # ``:nth-child`` counts siblings, not forms, so it cannot address
            # "the index-th form on the page"; Playwright's ``nth=`` engine can.
            selector = f'form >> nth={index}'

            fields = await self._detect_form_fields(form_element)
            submit_buttons = await self._detect_submit_buttons(form_element, selector)
            form_type = await self._estimate_form_type(fields)

            return {
                'index': index,
                'action': action,
                'method': method,
                'fields': fields,
                'submit_buttons': submit_buttons,
                'form_type': form_type,
                'selector': selector
            }

        except Exception as e:
            logger.error(f"Form analysis failed: {e}")
            return None

    async def _detect_form_fields(self, form_element) -> List[Dict]:
        """Collect fillable fields: all inputs, then textareas, then selects."""
        analyzers = (
            ('input', self._analyze_input_field),
            ('textarea', self._analyze_textarea_field),
            ('select', self._analyze_select_field),
        )

        fields = []
        for css, analyze in analyzers:
            for element in await form_element.query_selector_all(css):
                field_info = await analyze(element)
                if field_info:
                    fields.append(field_info)
        return fields

    async def _analyze_input_field(self, input_elem) -> Optional[Dict]:
        """Describe an ``<input>``; buttons and hidden inputs yield ``None``."""
        try:
            input_type = (await input_elem.get_attribute('type') or 'text').lower()
            if input_type in self._SKIPPED_INPUT_TYPES:
                return None

            name = await input_elem.get_attribute('name')
            placeholder = await input_elem.get_attribute('placeholder')
            required = await input_elem.get_attribute('required') is not None

            return {
                'type': 'input',
                'input_type': input_type,
                'name': name,
                'placeholder': placeholder,
                'required': required,
                'field_purpose': self._estimate_field_purpose(name, placeholder)
            }
        except Exception as e:
            # Best effort: one unreadable input must not discard the whole form.
            logger.warning(f"Input field analysis failed: {e}")
            return None

    async def _analyze_textarea_field(self, textarea) -> Optional[Dict]:
        """Describe a ``<textarea>``; returns ``None`` (after logging) on failure."""
        return await self._describe_named_field(textarea, 'textarea')

    async def _analyze_select_field(self, select) -> Optional[Dict]:
        """Describe a ``<select>`` including the ``value`` of each option."""
        info = await self._describe_named_field(select, 'select')
        if info is None:
            return None
        try:
            options = await select.query_selector_all('option')
            info['options'] = [await option.get_attribute('value') for option in options]
        except Exception as e:
            logger.warning(f"Select option analysis failed: {e}")
            info['options'] = []
        return info

    async def _describe_named_field(self, element, field_type: str) -> Optional[Dict]:
        """Build the shared name/placeholder/required description of a field."""
        try:
            name = await element.get_attribute('name')
            placeholder = await element.get_attribute('placeholder')
            required = await element.get_attribute('required') is not None
            return {
                'type': field_type,
                'name': name,
                'placeholder': placeholder,
                'required': required,
                'field_purpose': self._estimate_field_purpose(name, placeholder)
            }
        except Exception as e:
            logger.warning(f"{field_type} field analysis failed: {e}")
            return None

    async def _detect_submit_buttons(self, form_element, form_selector: str) -> List[Dict]:
        """List the form's submit controls with a page-level selector for each.

        ``<button>`` elements submit by default, so only explicit
        ``type="button"`` / ``type="reset"`` are excluded. The selector picks
        the control by its position among the candidates inside the form.
        """
        buttons = []
        candidates = await form_element.query_selector_all(self._SUBMIT_CSS)
        for position, candidate in enumerate(candidates):
            button_type = (await candidate.get_attribute('type') or 'submit').lower()
            if button_type in ('button', 'reset'):
                continue
            label = await candidate.get_attribute('value')
            if not label:
                label = (await candidate.text_content() or '').strip()
            buttons.append({
                'type': button_type,
                'text': label,
                'selector': f'{form_selector} >> {self._SUBMIT_CSS} >> nth={position}'
            })
        return buttons

    async def _estimate_form_type(self, fields: List[Dict]) -> str:
        """Guess the form's role from its fields (simple heuristic).

        Returns ``'login'`` when a password input is present, ``'contact'``
        when there is a message field or any textarea, ``'search'`` for a
        search input, otherwise ``'other'``.
        """
        input_types = {field.get('input_type') for field in fields}
        if 'password' in input_types:
            return 'login'
        has_textarea = any(field.get('type') == 'textarea' for field in fields)
        has_message = any(field.get('field_purpose') == 'message' for field in fields)
        if has_textarea or has_message:
            return 'contact'
        if 'search' in input_types:
            return 'search'
        return 'other'

    def _estimate_field_purpose(self, name: str, placeholder: str) -> str:
        """Classify a field from keywords in its name and placeholder.

        Returns one of ``company_name``, ``contact_name``, ``email``,
        ``phone``, ``subject``, ``message`` or ``other``. Either argument may
        be ``None``.
        """
        text = f"{name or ''} {placeholder or ''}".lower()
        for purpose, keywords in self._PURPOSE_KEYWORDS:
            if any(keyword in text for keyword in keywords):
                return purpose
        return 'other'

フォーム入力エンジン

# app/services/form_filling_service.py
from playwright.async_api import Page
from typing import Dict, Any
import logging

logger = logging.getLogger(__name__)

class FormFillingService:
    """Fill the fields of a detected form from a value template."""

    async def fill_form(self, page: Page, form_data: Dict, template: Dict, url_data: Dict):
        """Fill every field of ``form_data`` that the template has a value for.

        Args:
            page: Playwright page that currently shows the form.
            form_data: Form description; ``form_data['fields']`` is iterated.
            template: Mapping whose ``'fields'`` entry maps a field purpose to
                a value; string values may contain ``{variable}`` placeholders.
            url_data: Variables substituted into the template placeholders.

        Raises:
            Exception: Any error outside single-field filling is logged and
                re-raised. Errors while filling one field are only logged.
        """
        try:
            input_values = self._generate_input_values(template, url_data)

            for field in form_data['fields']:
                await self._fill_field(page, field, input_values)

            # Short pause after typing before the caller continues.
            await page.wait_for_timeout(1000)

        except Exception as e:
            logger.error(f"Form filling failed: {e}")
            raise

    def _generate_input_values(self, template: Dict, url_data: Dict) -> Dict:
        """Resolve ``{variable}`` placeholders in the template's field values.

        Non-string values (for example a boolean for a checkbox) are passed
        through unchanged instead of failing on ``.replace``.
        """
        values = {}

        for field_purpose, template_value in (template.get('fields') or {}).items():
            actual_value = template_value
            if isinstance(actual_value, str):
                for var_name, var_value in url_data.items():
                    actual_value = actual_value.replace(f'{{{var_name}}}', str(var_value))

            values[field_purpose] = actual_value

        return values

    @staticmethod
    def _css_quote(value) -> str:
        """Escape ``value`` for use inside a double-quoted CSS attribute value."""
        return str(value).replace('\\', '\\\\').replace('"', '\\"')

    async def _fill_field(self, page: Page, field: Dict, input_values: Dict):
        """Fill one field, selecting it by its ``name`` attribute.

        Fields without a template value or without a ``name`` are skipped.
        Checkboxes are only ever checked (for 'true'/'1'/'yes'), never
        unchecked. Failures are logged as warnings and not raised.
        """
        field_purpose = field.get('field_purpose')
        if field_purpose not in input_values:
            return

        value = input_values[field_purpose]
        field_name = field.get('name')
        if not field_name:
            # Without a name the selector would become [name="None"].
            logger.debug(f"Skipping unnamed field with purpose {field_purpose}")
            return

        name_css = self._css_quote(field_name)

        try:
            if field['type'] == 'input':
                if field['input_type'] == 'checkbox':
                    if value and str(value).lower() in ['true', '1', 'yes']:
                        await page.check(f'input[name="{name_css}"]')
                elif field['input_type'] == 'radio':
                    value_css = self._css_quote(value)
                    await page.check(f'input[name="{name_css}"][value="{value_css}"]')
                else:
                    await page.fill(f'input[name="{name_css}"]', str(value))

            elif field['type'] == 'textarea':
                await page.fill(f'textarea[name="{name_css}"]', str(value))

            elif field['type'] == 'select':
                await page.select_option(f'select[name="{name_css}"]', str(value))

        except Exception as e:
            logger.warning(f"Field filling failed for {field_name}: {e}")

2. VPN統合実装

VPNサービス基盤

# app/services/vpn_service.py
from abc import ABC, abstractmethod
from typing import Dict, List, Optional
import asyncio
import subprocess
import logging

logger = logging.getLogger(__name__)

class VPNProvider(ABC):
    """Interface that every VPN backend has to implement."""

    @abstractmethod
    async def connect(self, config: Dict) -> bool:
        """Connect using ``config``; return True on success."""

    @abstractmethod
    async def disconnect(self) -> bool:
        """Drop the connection; return True on success."""

    @abstractmethod
    async def get_current_ip(self) -> str:
        """Return the IP address currently in use."""

    @abstractmethod
    async def list_servers(self) -> List[Dict]:
        """Return the servers/locations that can be connected to."""


class NordVPNProvider(VPNProvider):
    """VPN provider that drives the ``nordvpn`` command-line client."""

    def __init__(self, username: str, password: str):
        self.username = username
        self.password = password
        self.current_server = None

    async def connect(self, config: Dict) -> bool:
        """Log in and connect to ``config['server']`` (default ``'auto'``).

        Returns True when the CLI output reports a connection, False on any
        failure (failures are logged, never raised).
        """
        try:
            await self._login()

            server = config.get('server', 'auto')
            command = ['nordvpn', 'connect']
            # 'auto' means "let the client choose": pass no target at all
            # rather than a literal server called "auto".
            if server and server != 'auto':
                command.append(server)
            result = await self._run_command(command)

            if 'connected' in result.lower():
                self.current_server = server
                return True
            return False

        except Exception as e:
            logger.error(f"NordVPN connection failed: {e}")
            return False

    async def disconnect(self) -> bool:
        """Disconnect; returns True when the CLI output reports it."""
        try:
            result = await self._run_command(['nordvpn', 'disconnect'])
            self.current_server = None
            return 'disconnected' in result.lower()
        except Exception as e:
            logger.error(f"NordVPN disconnection failed: {e}")
            return False

    async def get_current_ip(self) -> str:
        """Return the IP reported by ``nordvpn status``, or ``''`` if unknown.

        NOTE(review): assumes the status output has a line such as
        ``IP: 203.0.113.7`` (or ``Your new IP: ...``) - confirm against the
        installed client version.
        """
        try:
            status = await self._run_command(['nordvpn', 'status'])
        except Exception as e:
            logger.error(f"NordVPN status query failed: {e}")
            return ''

        for line in status.splitlines():
            key, separator, value = line.partition(':')
            key = key.strip().lower()
            if separator and (key == 'ip' or key.endswith(' ip')):
                return value.strip()
        return ''

    async def list_servers(self) -> List[Dict]:
        """Return the locations printed by ``nordvpn countries``.

        NOTE(review): assumes the command prints names separated by commas
        and/or whitespace - confirm against the installed client version.
        """
        try:
            output = await self._run_command(['nordvpn', 'countries'])
        except Exception as e:
            logger.error(f"NordVPN server listing failed: {e}")
            return []

        servers = []
        for token in output.replace(',', ' ').split():
            # Keep name-like tokens only; drops progress/spinner characters.
            if token.replace('_', '').replace('-', '').isalpha():
                servers.append({'name': token})
        return servers

    async def _login(self):
        """Log the CLI in with the stored credentials.

        An "already logged in" failure is not an error for our purposes and
        is ignored so that repeated connects keep working.

        NOTE(review): the password is passed as a command-line argument and
        is therefore visible in the process list; recent clients favour
        token-based login. Consider switching - left unchanged here.
        """
        try:
            await self._run_command(
                ['nordvpn', 'login', '--username', self.username, '--password', self.password]
            )
        except Exception as e:
            if 'already logged in' not in str(e).lower():
                raise

    async def _run_command(self, cmd: List[str]) -> str:
        """Run ``cmd`` without a shell and return its decoded stdout.

        Raises:
            RuntimeError: When the process exits with a non-zero status; the
                message carries stderr, or stdout when stderr is empty.
        """
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await process.communicate()
        stdout_text = stdout.decode(errors='replace')

        if process.returncode != 0:
            detail = stderr.decode(errors='replace').strip() or stdout_text.strip()
            raise RuntimeError(f"Command failed: {detail}")

        return stdout_text

class VPNService:
    """Registry of configured VPN providers plus the currently active one."""

    def __init__(self):
        self.providers = {}
        self.current_provider = None
        self._init_providers()

    def _init_providers(self):
        """Register each provider whose credentials are present in settings."""
        from app.core.config import settings

        has_nordvpn_credentials = bool(
            settings.NORDVPN_USERNAME and settings.NORDVPN_PASSWORD
        )
        if not has_nordvpn_credentials:
            return

        self.providers['nordvpn'] = NordVPNProvider(
            settings.NORDVPN_USERNAME,
            settings.NORDVPN_PASSWORD
        )

    async def connect(self, config: Dict) -> bool:
        """Connect through ``config['provider']`` (default ``'nordvpn'``).

        Raises:
            ValueError: When the requested provider is not registered.
        """
        name = config.get('provider', 'nordvpn')
        if name not in self.providers:
            raise ValueError(f"VPN provider {name} not configured")

        chosen = self.providers[name]
        connected = await chosen.connect(config)
        if not connected:
            return connected

        # Remember the provider only once it is actually connected.
        self.current_provider = chosen
        logger.info(f"Connected to VPN via {name}")
        return connected

    async def disconnect(self) -> bool:
        """Disconnect the active provider; True when nothing is connected."""
        active = self.current_provider
        if not active:
            return True

        dropped = await active.disconnect()
        if dropped:
            self.current_provider = None
            logger.info("Disconnected from VPN")
        return dropped

    async def rotate(self, config: Dict = None) -> bool:
        """Reconnect: drop the active connection (if any), then connect again."""
        target_config = config or {}

        if self.current_provider:
            await self.disconnect()
            # Pause briefly between the disconnect and the new connection.
            await asyncio.sleep(2)

        return await self.connect(target_config)

3. CAPTCHA統合実装

CAPTCHA解決サービス

# app/services/captcha_service.py
from abc import ABC, abstractmethod
from typing import Dict, Optional
import httpx
import asyncio
import base64
import logging

logger = logging.getLogger(__name__)

class CaptchaProvider(ABC):
    """Interface that every CAPTCHA solving backend has to implement."""

    @abstractmethod
    async def solve_image_captcha(self, image_data: bytes) -> Optional[str]:
        """Return the text shown in the image, or ``None`` on failure."""

    @abstractmethod
    async def solve_recaptcha(self, site_key: str, page_url: str) -> Optional[str]:
        """Return a reCAPTCHA response token, or ``None`` on failure."""


class CapsolverProvider(CaptchaProvider):
    """CAPTCHA provider backed by the Capsolver HTTP API."""

    # (maximum polls, seconds between polls) for each task kind.
    IMAGE_POLL = (30, 2)
    RECAPTCHA_POLL = (60, 3)  # reCAPTCHA tasks take longer to solve

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.capsolver.com"

    async def solve_image_captcha(self, image_data: bytes) -> Optional[str]:
        """Solve an image CAPTCHA from its raw image bytes.

        Returns the recognised text, or ``None`` on any failure or timeout
        (failures are logged, never raised).
        """
        task = {
            "type": "ImageToTextTask",
            "body": base64.b64encode(image_data).decode()
        }
        return await self._solve(task, "text", self.IMAGE_POLL, "image captcha")

    async def solve_recaptcha(self, site_key: str, page_url: str) -> Optional[str]:
        """Solve a reCAPTCHA v2 for ``site_key`` on ``page_url``.

        Returns the ``gRecaptchaResponse`` token, or ``None`` on any failure
        or timeout (failures are logged, never raised).
        """
        task = {
            "type": "ReCaptchaV2TaskProxyless",
            "websiteURL": page_url,
            "websiteKey": site_key
        }
        return await self._solve(task, "gRecaptchaResponse", self.RECAPTCHA_POLL, "reCAPTCHA")

    def _new_client(self):
        """Create the HTTP client used for one solve attempt."""
        return httpx.AsyncClient()

    async def _post(self, client, endpoint: str, payload: Dict):
        """POST ``payload`` plus the client key to ``endpoint``."""
        return await client.post(
            f"{self.base_url}/{endpoint}",
            json={"clientKey": self.api_key, **payload}
        )

    async def _solve(self, task: Dict, solution_key: str, poll, label: str) -> Optional[str]:
        """Create ``task``, poll for its result and return the solution value.

        Both public solve methods share this path so that HTTP-status
        checks, error logging and timeout handling are identical.
        """
        attempts, interval = poll
        try:
            async with self._new_client() as client:
                create_response = await self._post(client, "createTask", {"task": task})
                if create_response.status_code != 200:
                    logger.error(f"Capsolver create task failed: {create_response.text}")
                    return None

                create_data = create_response.json()
                if create_data.get("errorId") != 0:
                    logger.error(f"Capsolver {label} error: {create_data.get('errorDescription')}")
                    return None

                # Some task types are answered directly by createTask; use
                # that result instead of polling for it.
                if create_data.get("status") == "ready" and create_data.get("solution"):
                    return create_data["solution"][solution_key]

                task_id = create_data["taskId"]

                for _ in range(attempts):
                    await asyncio.sleep(interval)

                    result_response = await self._post(
                        client, "getTaskResult", {"taskId": task_id}
                    )
                    if result_response.status_code != 200:
                        continue  # transient HTTP error: keep polling

                    result_data = result_response.json()
                    status = result_data.get("status")
                    if status == "ready":
                        return result_data["solution"][solution_key]
                    if status == "failed":
                        logger.error(f"Capsolver task failed: {result_data}")
                        return None

                logger.error("Capsolver timeout")
                return None

        except Exception as e:
            logger.error(f"Capsolver {label} failed: {e}")
            return None

class CaptchaService:
    """Detect CAPTCHAs on a page and solve them through a provider."""

    # Runs in the page. The token arrives as an argument, never spliced into
    # the script text, so quotes or markup in it cannot break or inject code.
    _INJECT_TOKEN_JS = """(token) => {
        const field = document.querySelector('#g-recaptcha-response');
        if (!field) {
            return false;
        }
        field.value = token;
        field.innerHTML = token;
        if (window.grecaptcha && window.grecaptcha.getResponse) {
            window.grecaptcha.getResponse = function() { return token; };
        }
        return true;
    }"""

    def __init__(self):
        self.providers = {}
        self._init_providers()

    def _init_providers(self):
        """Register each provider whose API key is present in settings."""
        from app.core.config import settings

        if settings.CAPSOLVER_API_KEY:
            self.providers['capsolver'] = CapsolverProvider(settings.CAPSOLVER_API_KEY)

    async def detect_captcha(self, page) -> Optional[Dict]:
        """Look for a reCAPTCHA widget, then for a CAPTCHA image.

        Returns a dict with ``type`` (``'recaptcha'`` or ``'image'``) and the
        matched ``element`` (plus ``site_key`` for reCAPTCHA), or ``None``
        when nothing is found or detection fails.
        """
        try:
            recaptcha = await page.query_selector('.g-recaptcha')
            if recaptcha:
                site_key = await recaptcha.get_attribute('data-sitekey')
                return {
                    'type': 'recaptcha',
                    'site_key': site_key,
                    'element': recaptcha
                }

            captcha_images = await page.query_selector_all('img[src*="captcha"], img[alt*="captcha"]')
            if captcha_images:
                return {
                    'type': 'image',
                    'element': captcha_images[0]
                }

            return None

        except Exception as e:
            logger.error(f"CAPTCHA detection failed: {e}")
            return None

    async def solve_captcha(self, page, captcha_info: Dict, provider_name: str = 'capsolver') -> bool:
        """Solve the detected CAPTCHA and put the answer into the page.

        Returns True only when a solution was obtained and written into the
        page; every failure is logged and reported as False.
        """
        if provider_name not in self.providers:
            logger.error(f"CAPTCHA provider {provider_name} not available")
            return False

        provider = self.providers[provider_name]

        try:
            if captcha_info['type'] == 'recaptcha':
                site_key = captcha_info.get('site_key')
                if not site_key:
                    logger.error("CAPTCHA solving failed: reCAPTCHA site key missing")
                    return False

                solution = await provider.solve_recaptcha(site_key, page.url)
                if solution:
                    injected = await page.evaluate(self._INJECT_TOKEN_JS, solution)
                    return bool(injected)

            elif captcha_info['type'] == 'image':
                element = captcha_info['element']
                screenshot = await element.screenshot()

                solution = await provider.solve_image_captcha(screenshot)
                if solution:
                    # Type the answer into the first captcha-looking input.
                    input_field = await page.query_selector('input[name*="captcha"], input[id*="captcha"]')
                    if input_field:
                        await input_field.fill(solution)
                        return True

            return False

        except Exception as e:
            logger.error(f"CAPTCHA solving failed: {e}")
            return False

4. 統合自動化エンジン

メイン自動化クラス

# app/services/automation_engine.py
from playwright.async_api import async_playwright, Page
from app.services.form_detection_service import FormDetectionService
from app.services.form_filling_service import FormFillingService
from app.services.vpn_service import VPNService
from app.services.captcha_service import CaptchaService
from typing import Dict, List
import asyncio
import logging

logger = logging.getLogger(__name__)

class AutomationEngine:
    """Run the detect -> fill -> CAPTCHA -> submit pipeline over a URL list."""

    def __init__(self):
        self.form_detector = FormDetectionService()
        self.form_filler = FormFillingService()
        self.vpn_service = VPNService()
        self.captcha_service = CaptchaService()

    async def execute_automation(self, task_config: Dict) -> Dict:
        """Process every entry of ``task_config['urls']`` in one browser session.

        Keys read from ``task_config``: ``urls`` and ``template`` (required),
        ``use_vpn``, ``vpn_config``, ``vpn_rotation_interval``, ``headless``,
        ``delay_between_requests``, ``page_timeout`` and ``captcha_provider``.

        Returns:
            Summary dict with ``total_processed``, ``successful``, ``failed``,
            ``partial`` and the per-URL ``results`` (one entry per URL).

        Raises:
            Exception: Failures outside per-URL processing are logged and
                re-raised. The VPN is disconnected in every case.
        """
        use_vpn = bool(task_config.get('use_vpn'))
        vpn_config = task_config.get('vpn_config') or {}

        try:
            if use_vpn:
                vpn_success = await self.vpn_service.connect(vpn_config)
                if not vpn_success:
                    logger.warning("VPN connection failed, proceeding without VPN")

            results = []
            async with async_playwright() as playwright:
                # NOTE(review): '--disable-web-security' switches off the
                # same-origin policy for every visited site - confirm it is
                # really required.
                browser = await playwright.chromium.launch(
                    headless=task_config.get('headless', True),
                    args=['--no-sandbox', '--disable-web-security']
                )
                try:
                    context = await browser.new_context(
                        viewport={'width': 1366, 'height': 768},
                        user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
                    )

                    for i, url_data in enumerate(task_config['urls']):
                        results.append(
                            await self._process_url_safely(context, url_data, task_config)
                        )

                        if use_vpn:
                            await self._rotate_vpn_if_due(i + 1, task_config, vpn_config)

                        # Rate limiting between requests.
                        if task_config.get('delay_between_requests'):
                            await asyncio.sleep(task_config['delay_between_requests'])
                finally:
                    await browser.close()

            return self._summarize_results(results)

        except Exception as e:
            logger.error(f"Automation execution failed: {e}")
            raise
        finally:
            if use_vpn:
                await self.vpn_service.disconnect()

    async def _process_url_safely(self, context, url_data: Dict, task_config: Dict) -> Dict:
        """Process one URL and always return exactly one result entry."""
        try:
            return await self._process_single_url(context, url_data, task_config)
        except Exception as e:
            logger.error(f"URL processing failed: {e}")
            return self._failure(url_data.get('url'), str(e))

    async def _rotate_vpn_if_due(self, processed_count: int, task_config: Dict, vpn_config: Dict):
        """Rotate the VPN after every ``vpn_rotation_interval`` processed URLs.

        A rotation failure is logged only. It must not add a second result
        entry for a URL that has already been recorded.
        """
        interval = task_config.get('vpn_rotation_interval')
        if not interval or processed_count % interval != 0:
            return
        try:
            await self.vpn_service.rotate(vpn_config)
            await asyncio.sleep(5)  # let the new connection settle
        except Exception as e:
            logger.warning(f"VPN rotation failed: {e}")

    @staticmethod
    def _failure(url, error: str) -> Dict:
        """Build a failed result entry."""
        return {'url': url, 'status': 'failed', 'error': error}

    @staticmethod
    def _summarize_results(results: List[Dict]) -> Dict:
        """Count results per status; ``partial`` = submitted but unconfirmed."""
        def count(status: str) -> int:
            return sum(1 for result in results if result['status'] == status)

        return {
            'total_processed': len(results),
            'successful': count('success'),
            'failed': count('failed'),
            'partial': count('partial'),
            'results': results
        }

    async def _process_single_url(self, context, url_data: Dict, task_config: Dict) -> Dict:
        """Open ``url_data['url']``, fill and submit its best form.

        Returns a result dict with ``url`` and ``status`` (``'success'``,
        ``'partial'`` or ``'failed'``). The page is always closed.
        """
        url = url_data['url']
        page = await context.new_page()

        try:
            response = await page.goto(
                url,
                timeout=task_config.get('page_timeout', 30000)
            )

            if not response or response.status >= 400:
                return self._failure(
                    url, f'HTTP {response.status if response else "No response"}'
                )

            await page.wait_for_load_state('networkidle')

            forms = await self.form_detector.detect_forms(page)
            if not forms:
                return self._failure(url, 'No suitable form found')

            target_form = self._select_best_form(forms)

            await self.form_filler.fill_form(
                page, target_form, task_config['template'], url_data
            )

            captcha_info = await self.captcha_service.detect_captcha(page)
            if captcha_info:
                captcha_solved = await self.captcha_service.solve_captcha(
                    page, captcha_info, task_config.get('captcha_provider', 'capsolver')
                )
                if not captcha_solved:
                    return self._failure(url, 'CAPTCHA solving failed')

            if not await self._submit_form(page, target_form):
                return self._failure(url, 'Form submission failed')

            success_confirmed = await self._verify_submission_success(page)
            return {
                'url': url,
                'status': 'success' if success_confirmed else 'partial',
                'form_submitted': True,
                'confirmation_verified': success_confirmed
            }

        except Exception as e:
            logger.error(f"Single URL processing failed: {e}")
            return self._failure(url, str(e))
        finally:
            await page.close()

    def _select_best_form(self, forms: List[Dict]) -> Dict:
        """Pick the first contact/inquiry form, else the form with most fields.

        ``forms`` must be non-empty.
        """
        for form in forms:
            if form['form_type'] in ['contact', 'inquiry']:
                return form

        return max(forms, key=lambda f: len(f['fields']))

    async def _submit_form(self, page: Page, form: Dict) -> bool:
        """Click the form's first submit button, or press Enter if it has none.

        Returns False (after logging) when the click or key press fails.
        """
        try:
            submit_buttons = form['submit_buttons']
            if submit_buttons:
                await page.click(submit_buttons[0]['selector'])
            else:
                await page.keyboard.press('Enter')

            # Give the site time to respond to the submission.
            await page.wait_for_timeout(3000)
            return True

        except Exception as e:
            logger.error(f"Form submission failed: {e}")
            return False

    async def _verify_submission_success(self, page: Page) -> bool:
        """Heuristically check the visible page text for a success message.

        Only rendered text is inspected; raw HTML would also match markup
        such as CSS class names or hidden templates containing "success".
        """
        try:
            success_indicators = [
                'ありがとう', '送信完了', '受付', '確認',
                'thank you', 'success', 'submitted', 'received'
            ]

            page_text = (await page.inner_text('body')).lower()

            return any(indicator in page_text for indicator in success_indicators)

        except Exception:
            return False

✅ 完了条件

Playwright統合

  • フォーム検出エンジン実装
  • フォーム入力エンジン実装
  • ブラウザ自動化基盤構築

VPN統合

  • VPNプロバイダー抽象化
  • NordVPN実装
  • VPNローテーション機能

CAPTCHA統合

  • CAPTCHAプロバイダー抽象化
  • Capsolver統合
  • 画像・reCAPTCHA対応

統合エンジン

  • 自動化エンジン実装
  • エラーハンドリング
  • ログ・監視機能

動作確認

  • 実際のフォーム送信テスト
  • VPN切り替えテスト
  • CAPTCHA解決テスト

🔄 次のステップ

自動化機能完了後、子チケット5(監視・ログ機能)に移行。


Claude Code実行プロンプト:

フォーム自動化システムの核となる自動化機能を実装してください。Playwright・VPN・CAPTCHA統合による実際のフォーム自動化エンジンを構築し、フォーム検出から送信まで完全自動化を実現してください。段階的に機能を実装し、各コンポーネントの動作確認を行ってください。

表示するデータがありません

他の形式にエクスポート: Atom PDF