操作
バグ #952
未完了【948-4】自動化機能統合 - Playwright/VPN/CAPTCHA
ステータス:
新規
優先度:
高め
担当者:
-
開始日:
2025-07-31
期日:
進捗率:
0%
予定工数:
説明
【子チケット4】自動化機能統合 - Playwright/VPN/CAPTCHA
🎯 目的
Playwright・VPN・CAPTCHA統合により、実際のフォーム自動化機能を実装し、システムの核となる自動化エンジンを完成させる。
📋 実装内容
1. Playwright統合実装
フォーム検出エンジン
# app/services/form_detection_service.py
from playwright.async_api import Page
from typing import Dict, List, Optional
import logging
logger = logging.getLogger(__name__)
class FormDetectionService:
    """Detect the forms on a Playwright page and describe them as plain dicts.

    Each detected form is reported with its action, method, fillable fields,
    submit buttons, an estimated form type and a Playwright selector.
    """

    # Keyword table for ``_estimate_field_purpose``. Entries are checked in
    # order and the first match wins, so 'company' is tested before the more
    # generic 'name'.
    _PURPOSE_KEYWORDS = (
        ('company_name', ('会社', 'company', '企業')),
        ('contact_name', ('名前', 'name', '氏名')),
        ('email', ('メール', 'email', 'mail')),
        ('phone', ('電話', 'phone', 'tel')),
        ('subject', ('件名', 'subject', 'title')),
        ('message', ('内容', 'message', '問い合わせ')),
    )

    # <input> types that are controls rather than user-fillable fields.
    _NON_FIELD_INPUT_TYPES = frozenset(
        {'hidden', 'submit', 'button', 'reset', 'image'}
    )

    # CSS matching the elements that submit a form. A <button> without a
    # ``type`` attribute submits by default.
    _SUBMIT_CSS = (
        'button[type="submit"], input[type="submit"], '
        'input[type="image"], button:not([type])'
    )

    async def detect_forms(self, page: "Page") -> List[Dict]:
        """Return one description dict per ``<form>`` found on ``page``.

        Forms whose analysis fails are skipped (the failure is logged by
        ``_analyze_form``).
        """
        form_elements = await page.query_selector_all('form')
        forms = []
        for index, form_element in enumerate(form_elements):
            form_info = await self._analyze_form(page, form_element, index)
            if form_info:
                forms.append(form_info)
        return forms

    async def _analyze_form(self, page: "Page", form_element, index: int) -> Optional[Dict]:
        """Describe a single form element, or return None if analysis fails.

        ``index`` is the zero-based position of the form among all ``form``
        elements returned by ``detect_forms``.
        """
        try:
            action = await form_element.get_attribute('action') or page.url
            method = await form_element.get_attribute('method') or 'GET'
            # ``form:nth-child(n)`` matches on sibling position, not on the
            # n-th form of the document; ``nth=`` indexes the matches of the
            # preceding selector instead.
            selector = f'form >> nth={index}'
            fields = await self._detect_form_fields(form_element)
            submit_buttons = await self._detect_submit_buttons(form_element, selector)
            form_type = await self._estimate_form_type(fields)
            return {
                'index': index,
                'action': action,
                'method': method,
                'fields': fields,
                'submit_buttons': submit_buttons,
                'form_type': form_type,
                'selector': selector,
            }
        except Exception as e:
            logger.error(f"Form analysis failed: {e}")
            return None

    async def _detect_form_fields(self, form_element) -> List[Dict]:
        """Collect field descriptions: inputs first, then textareas, then selects."""
        analyzers = (
            ('input', self._analyze_input_field),
            ('textarea', self._analyze_textarea_field),
            ('select', self._analyze_select_field),
        )
        fields = []
        for css, analyze in analyzers:
            for element in await form_element.query_selector_all(css):
                field_info = await analyze(element)
                if field_info:
                    fields.append(field_info)
        return fields

    async def _analyze_input_field(self, input_elem) -> Optional[Dict]:
        """Describe an ``<input>``; return None for non-fillable types or on error."""
        try:
            # The ``type`` attribute is case-insensitive in HTML.
            input_type = (await input_elem.get_attribute('type') or 'text').lower()
            name = await input_elem.get_attribute('name')
            placeholder = await input_elem.get_attribute('placeholder')
            required = await input_elem.get_attribute('required') is not None
            if input_type in self._NON_FIELD_INPUT_TYPES:
                return None
            return {
                'type': 'input',
                'input_type': input_type,
                'name': name,
                'placeholder': placeholder,
                'required': required,
                'field_purpose': self._estimate_field_purpose(name, placeholder),
            }
        except Exception as e:
            logger.debug(f"Input field analysis failed: {e}")
            return None

    async def _analyze_textarea_field(self, textarea_elem) -> Optional[Dict]:
        """Describe a ``<textarea>``; return None on error."""
        try:
            name = await textarea_elem.get_attribute('name')
            placeholder = await textarea_elem.get_attribute('placeholder')
            required = await textarea_elem.get_attribute('required') is not None
            return {
                'type': 'textarea',
                'name': name,
                'placeholder': placeholder,
                'required': required,
                'field_purpose': self._estimate_field_purpose(name, placeholder),
            }
        except Exception as e:
            logger.debug(f"Textarea field analysis failed: {e}")
            return None

    async def _analyze_select_field(self, select_elem) -> Optional[Dict]:
        """Describe a ``<select>`` including its options; return None on error."""
        try:
            name = await select_elem.get_attribute('name')
            required = await select_elem.get_attribute('required') is not None
            options = []
            for option in await select_elem.query_selector_all('option'):
                options.append({
                    'value': await option.get_attribute('value'),
                    'label': (await option.text_content() or '').strip(),
                })
            return {
                'type': 'select',
                'name': name,
                'required': required,
                'options': options,
                'field_purpose': self._estimate_field_purpose(name, None),
            }
        except Exception as e:
            logger.debug(f"Select field analysis failed: {e}")
            return None

    async def _detect_submit_buttons(self, form_element, form_selector: str = 'form') -> List[Dict]:
        """List the submit controls inside ``form_element``.

        Each entry carries a ``selector`` built by chaining ``form_selector``
        with the position of the control among the form's submit controls,
        plus the control's visible ``text`` (falling back to its ``value``).
        """
        buttons = []
        elements = await form_element.query_selector_all(self._SUBMIT_CSS)
        for position, element in enumerate(elements):
            label = (await element.text_content() or '').strip()
            if not label:
                label = (await element.get_attribute('value') or '').strip()
            buttons.append({
                'selector': f'{form_selector} >> {self._SUBMIT_CSS} >> nth={position}',
                'text': label,
            })
        return buttons

    async def _estimate_form_type(self, fields: List[Dict]) -> str:
        """Guess what a form is for from its field descriptions.

        NOTE(review): heuristic only - a free-text message area is taken to
        mean a contact form; confirm the categories against real target sites.
        """
        purposes = {field.get('field_purpose') for field in fields}
        input_types = {field.get('input_type') for field in fields}
        has_textarea = any(field.get('type') == 'textarea' for field in fields)
        if 'message' in purposes or has_textarea:
            return 'contact'
        if 'password' in input_types:
            return 'login'
        if 'search' in input_types:
            return 'search'
        if 'email' in purposes and len(fields) <= 2:
            return 'newsletter'
        return 'other'

    def _estimate_field_purpose(self, name: Optional[str], placeholder: Optional[str]) -> str:
        """Classify a field from its name/placeholder text.

        Returns the first purpose in ``_PURPOSE_KEYWORDS`` with a keyword that
        occurs (case-insensitively, as a substring) in the combined text, or
        'other' when nothing matches.
        """
        text = f"{name or ''} {placeholder or ''}".lower()
        for purpose, keywords in self._PURPOSE_KEYWORDS:
            if any(keyword in text for keyword in keywords):
                return purpose
        return 'other'
フォーム入力エンジン
# app/services/form_filling_service.py
from playwright.async_api import Page
from typing import Dict, Any
import logging
logger = logging.getLogger(__name__)
class FormFillingService:
    """Fill a detected form with values rendered from a template."""

    async def fill_form(self, page: "Page", form_data: Dict, template: Dict, url_data: Dict):
        """Fill every field listed in ``form_data['fields']``.

        Values come from ``template`` after ``{variable}`` substitution with
        ``url_data``. Any failure is logged and re-raised.
        """
        try:
            input_values = self._generate_input_values(template, url_data)
            for field in form_data['fields']:
                await self._fill_field(page, field, input_values)
            # Short pause after the last field is filled.
            await page.wait_for_timeout(1000)
        except Exception as e:
            logger.error(f"Form filling failed: {e}")
            raise

    def _generate_input_values(self, template: Dict, url_data: Dict) -> Dict:
        """Render ``template['fields']`` into ``{field_purpose: value}``.

        Every ``{name}`` placeholder in a string value is replaced by
        ``str(url_data[name])``. Non-string values (e.g. a boolean for a
        checkbox) are passed through unchanged instead of crashing on
        ``.replace``. A missing or ``None`` ``fields`` entry yields ``{}``.
        """
        values = {}
        for field_purpose, template_value in (template.get('fields') or {}).items():
            actual_value = template_value
            if isinstance(actual_value, str):
                for var_name, var_value in url_data.items():
                    actual_value = actual_value.replace(f'{{{var_name}}}', str(var_value))
            values[field_purpose] = actual_value
        return values

    @staticmethod
    def _css_string(value) -> str:
        """Escape ``value`` for use inside a double-quoted CSS attribute value."""
        return str(value).replace('\\', '\\\\').replace('"', '\\"')

    async def _fill_field(self, page: "Page", field: Dict, input_values: Dict):
        """Fill one field; failures are logged as warnings, not raised.

        Fields whose purpose has no value in ``input_values`` are skipped, and
        so are fields without a ``name`` because they cannot be addressed by
        the ``[name="..."]`` selectors used here.
        """
        field_purpose = field.get('field_purpose')
        if field_purpose not in input_values:
            return
        value = input_values[field_purpose]
        field_name = field.get('name')
        if not field_name:
            # Would otherwise target the literal selector [name="None"].
            return
        quoted_name = self._css_string(field_name)
        try:
            if field['type'] == 'input':
                if field['input_type'] == 'checkbox':
                    if value and str(value).lower() in ['true', '1', 'yes']:
                        await page.check(f'input[name="{quoted_name}"]')
                elif field['input_type'] == 'radio':
                    quoted_value = self._css_string(value)
                    await page.check(f'input[name="{quoted_name}"][value="{quoted_value}"]')
                else:
                    await page.fill(f'input[name="{quoted_name}"]', str(value))
            elif field['type'] == 'textarea':
                await page.fill(f'textarea[name="{quoted_name}"]', str(value))
            elif field['type'] == 'select':
                await page.select_option(f'select[name="{quoted_name}"]', str(value))
        except Exception as e:
            logger.warning(f"Field filling failed for {field_name}: {e}")
2. VPN統合実装
VPNサービス基盤
# app/services/vpn_service.py
from abc import ABC, abstractmethod
from typing import Dict, List, Optional
import asyncio
import subprocess
import logging
logger = logging.getLogger(__name__)
class VPNProvider(ABC):
    """Abstract interface that every VPN backend has to implement."""

    @abstractmethod
    async def connect(self, config: Dict) -> bool:
        """Connect using ``config``; the result is a success flag."""

    @abstractmethod
    async def disconnect(self) -> bool:
        """Disconnect; the result is a success flag."""

    @abstractmethod
    async def get_current_ip(self) -> str:
        """Report the current IP address as a string."""

    @abstractmethod
    async def list_servers(self) -> List[Dict]:
        """Report the available servers as a list of dicts."""
class NordVPNProvider(VPNProvider):
    """VPN provider that drives the ``nordvpn`` command-line client."""

    def __init__(self, username: str, password: str):
        self.username = username
        self.password = password
        # Server name passed to the last successful connect(), or None.
        self.current_server = None

    async def connect(self, config: Dict) -> bool:
        """Log in and connect to ``config['server']`` (default ``'auto'``).

        Returns True when the client output reports a connection; any error is
        logged and reported as False.
        """
        try:
            await self._login()
            server = config.get('server', 'auto')
            cmd = ['nordvpn', 'connect']
            # 'auto' means "let the client choose", which is spelled as a bare
            # ``nordvpn connect``; passing it through would look up a server
            # literally named "auto". NOTE(review): confirm against the
            # installed client version.
            if server and server != 'auto':
                cmd.append(server)
            result = (await self._run_command(cmd)).lower()
            # "disconnected" / "not connected" both contain "connected".
            if ('connected' in result
                    and 'disconnected' not in result
                    and 'not connected' not in result):
                self.current_server = server
                return True
            return False
        except Exception as e:
            logger.error(f"NordVPN connection failed: {e}")
            return False

    async def disconnect(self) -> bool:
        """Disconnect; True when the client reports no active connection.

        An already-disconnected client ("not connected") counts as success so
        that callers can clear their state.
        """
        try:
            result = (await self._run_command(['nordvpn', 'disconnect'])).lower()
            self.current_server = None
            return 'disconnected' in result or 'not connected' in result
        except Exception as e:
            logger.error(f"NordVPN disconnection failed: {e}")
            return False

    async def get_current_ip(self) -> str:
        """Return the IP reported by ``nordvpn status``, or '' if unavailable.

        NOTE(review): assumes the status output contains a line whose label
        ends in "IP" followed by a colon - confirm for the client in use.
        """
        try:
            status = await self._run_command(['nordvpn', 'status'])
        except Exception as e:
            logger.error(f"NordVPN status failed: {e}")
            return ''
        for line in status.splitlines():
            label, sep, value = line.partition(':')
            if sep and label.strip().lower().endswith('ip'):
                return value.strip()
        return ''

    async def list_servers(self) -> List[Dict]:
        """Return the connect targets listed by ``nordvpn countries``.

        Each entry is ``{'name': <token>}``. NOTE(review): assumes the command
        prints names separated by whitespace and/or commas - confirm.
        """
        try:
            output = await self._run_command(['nordvpn', 'countries'])
        except Exception as e:
            logger.error(f"NordVPN server listing failed: {e}")
            return []
        tokens = output.replace(',', ' ').split()
        # Keep only name-like tokens; drops progress/spinner characters.
        return [
            {'name': token}
            for token in tokens
            if token.replace('_', '').isalpha()
        ]

    async def _login(self):
        """Log in with the stored credentials.

        An "already logged in" failure is ignored so that repeated connects
        (e.g. during rotation) do not fail on the login step.
        NOTE(review): the password is passed on the command line and is
        therefore visible in the process list; prefer a mechanism that does
        not expose it if the client offers one.
        """
        try:
            await self._run_command(
                ['nordvpn', 'login', '--username', self.username, '--password', self.password]
            )
        except Exception as e:
            if 'already logged in' not in str(e).lower():
                raise

    async def _run_command(self, cmd: List[str]) -> str:
        """Run ``cmd`` and return its decoded stdout.

        Raises an exception carrying stderr (or stdout when stderr is empty)
        if the process exits with a non-zero status. The command line itself
        is never included in the message because it may contain credentials.
        """
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await process.communicate()
        out_text = stdout.decode(errors='replace')
        if process.returncode != 0:
            detail = stderr.decode(errors='replace').strip() or out_text.strip()
            raise RuntimeError(f"Command failed: {detail}")
        return out_text
class VPNService:
    """Registry of configured VPN providers plus the currently active one."""

    def __init__(self):
        self.providers = {}
        # Provider of the last successful connect(), or None.
        self.current_provider = None
        self._init_providers()

    def _init_providers(self):
        """Register the providers whose credentials are present in settings."""
        from app.core.config import settings
        # getattr: a settings object without these attributes simply means the
        # provider is not configured.
        username = getattr(settings, 'NORDVPN_USERNAME', None)
        password = getattr(settings, 'NORDVPN_PASSWORD', None)
        if username and password:
            self.providers['nordvpn'] = NordVPNProvider(username, password)

    async def connect(self, config: Optional[Dict]) -> bool:
        """Connect through ``config['provider']`` (default ``'nordvpn'``).

        ``None`` is treated as an empty config. Returns the provider's success
        flag.

        Raises:
            ValueError: if the requested provider is not configured.
        """
        config = config or {}
        provider_name = config.get('provider', 'nordvpn')
        if provider_name not in self.providers:
            raise ValueError(f"VPN provider {provider_name} not configured")
        provider = self.providers[provider_name]
        success = await provider.connect(config)
        if success:
            self.current_provider = provider
            logger.info(f"Connected to VPN via {provider_name}")
        return success

    async def disconnect(self) -> bool:
        """Disconnect the active provider; True when nothing is connected."""
        if not self.current_provider:
            return True
        success = await self.current_provider.disconnect()
        if success:
            self.current_provider = None
            logger.info("Disconnected from VPN")
        return success

    async def rotate(self, config: Optional[Dict] = None) -> bool:
        """Disconnect (if connected), pause briefly, then connect again.

        Returns the result of the new connection attempt.
        """
        if self.current_provider:
            if not await self.disconnect():
                logger.warning("VPN disconnect failed during rotation")
            await asyncio.sleep(2)  # pause before reconnecting
        return await self.connect(config or {})
3. CAPTCHA統合実装
CAPTCHA解決サービス
# app/services/captcha_service.py
from abc import ABC, abstractmethod
from typing import Dict, Optional
import httpx
import asyncio
import base64
import logging
logger = logging.getLogger(__name__)
class CaptchaProvider(ABC):
    """Abstract interface that every CAPTCHA-solving backend has to implement."""

    @abstractmethod
    async def solve_image_captcha(self, image_data: bytes) -> Optional[str]:
        """Solve an image CAPTCHA given as raw bytes; None when unsolved."""

    @abstractmethod
    async def solve_recaptcha(self, site_key: str, page_url: str) -> Optional[str]:
        """Solve a reCAPTCHA for ``site_key`` on ``page_url``; None when unsolved."""
class CapsolverProvider(CaptchaProvider):
    """CAPTCHA solver backed by the Capsolver HTTP API.

    Tasks are submitted to ``/createTask`` and their result is fetched by
    polling ``/getTaskResult``.
    """

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.capsolver.com"

    async def solve_image_captcha(self, image_data: bytes) -> Optional[str]:
        """Return the recognised text for an image CAPTCHA, or None on failure.

        ``image_data`` is sent base64-encoded. Polls up to 30 times at
        2-second intervals unless the create call already carries the result.
        """
        try:
            image_base64 = base64.b64encode(image_data).decode()
            async with httpx.AsyncClient() as client:
                create_response = await client.post(
                    f"{self.base_url}/createTask",
                    json={
                        "clientKey": self.api_key,
                        "task": {
                            "type": "ImageToTextTask",
                            "body": image_base64
                        }
                    }
                )
                if create_response.status_code != 200:
                    logger.error(f"Capsolver create task failed: {create_response.text}")
                    return None
                create_data = create_response.json()
                if create_data.get("errorId") != 0:
                    logger.error(f"Capsolver error: {create_data.get('errorDescription')}")
                    return None
                # Recognition tasks may be answered directly by createTask;
                # use that result instead of polling for it.
                if create_data.get("status") == "ready":
                    return create_data["solution"]["text"]
                outcome, payload = await self._poll_task(
                    client, create_data["taskId"], attempts=30, interval=2
                )
            if outcome == "ready":
                return payload["text"]
            if outcome == "failed":
                logger.error(f"Capsolver task failed: {payload}")
            else:
                logger.error("Capsolver timeout")
            return None
        except Exception as e:
            logger.error(f"Capsolver image captcha failed: {e}")
            return None

    async def solve_recaptcha(self, site_key: str, page_url: str) -> Optional[str]:
        """Return a reCAPTCHA v2 response token, or None on failure.

        Polls up to 60 times at 3-second intervals, since reCAPTCHA tasks
        take longer than image tasks.
        """
        try:
            async with httpx.AsyncClient() as client:
                create_response = await client.post(
                    f"{self.base_url}/createTask",
                    json={
                        "clientKey": self.api_key,
                        "task": {
                            "type": "ReCaptchaV2TaskProxyless",
                            "websiteURL": page_url,
                            "websiteKey": site_key
                        }
                    }
                )
                # Same HTTP status check as the image path.
                if create_response.status_code != 200:
                    logger.error(f"Capsolver create task failed: {create_response.text}")
                    return None
                create_data = create_response.json()
                if create_data.get("errorId") != 0:
                    logger.error(f"Capsolver reCAPTCHA error: {create_data}")
                    return None
                outcome, payload = await self._poll_task(
                    client, create_data["taskId"], attempts=60, interval=3
                )
            if outcome == "ready":
                return payload["gRecaptchaResponse"]
            if outcome == "failed":
                logger.error(f"Capsolver task failed: {payload}")
            else:
                logger.error("Capsolver timeout")
            return None
        except Exception as e:
            logger.error(f"Capsolver reCAPTCHA failed: {e}")
            return None

    async def _poll_task(self, client, task_id: str, attempts: int, interval: float):
        """Poll ``/getTaskResult`` until the task settles.

        Sleeps ``interval`` seconds before each of at most ``attempts``
        requests. Returns a ``(outcome, payload)`` pair:
        ``("ready", solution_dict)``, ``("failed", response_dict)`` or
        ``("timeout", None)``. Non-200 responses are skipped and retried.
        """
        for _ in range(attempts):
            await asyncio.sleep(interval)
            result_response = await client.post(
                f"{self.base_url}/getTaskResult",
                json={
                    "clientKey": self.api_key,
                    "taskId": task_id
                }
            )
            if result_response.status_code != 200:
                continue
            result_data = result_response.json()
            status = result_data.get("status")
            if status == "ready":
                return "ready", result_data["solution"]
            # A non-zero errorId also marks the task as unrecoverable.
            if status == "failed" or result_data.get("errorId"):
                return "failed", result_data
        return "timeout", None
class CaptchaService:
    """Detect CAPTCHAs on a page and solve them through a configured provider."""

    def __init__(self):
        self.providers = {}
        self._init_providers()

    def _init_providers(self):
        """Register the providers whose API key is present in settings."""
        from app.core.config import settings
        # getattr: a settings object without the attribute simply means the
        # provider is not configured.
        api_key = getattr(settings, 'CAPSOLVER_API_KEY', None)
        if api_key:
            self.providers['capsolver'] = CapsolverProvider(api_key)

    async def detect_captcha(self, page) -> Optional[Dict]:
        """Look for a reCAPTCHA widget, then for a CAPTCHA image.

        Returns ``{'type': 'recaptcha', 'site_key', 'element'}``,
        ``{'type': 'image', 'element'}`` or None when nothing is found or
        detection fails.
        """
        try:
            recaptcha = await page.query_selector('.g-recaptcha')
            if recaptcha:
                site_key = await recaptcha.get_attribute('data-sitekey')
                return {
                    'type': 'recaptcha',
                    'site_key': site_key,
                    'element': recaptcha
                }
            captcha_images = await page.query_selector_all('img[src*="captcha"], img[alt*="captcha"]')
            if captcha_images:
                return {
                    'type': 'image',
                    'element': captcha_images[0]
                }
            return None
        except Exception as e:
            logger.error(f"CAPTCHA detection failed: {e}")
            return None

    async def solve_captcha(self, page, captcha_info: Dict, provider_name: str = 'capsolver') -> bool:
        """Solve the CAPTCHA described by ``captcha_info`` and enter the result.

        Returns True only when a solution was obtained and written into the
        page; every failure is logged and reported as False.
        """
        if provider_name not in self.providers:
            logger.error(f"CAPTCHA provider {provider_name} not available")
            return False
        provider = self.providers[provider_name]
        try:
            if captcha_info['type'] == 'recaptcha':
                site_key = captcha_info.get('site_key')
                if not site_key:
                    logger.error("CAPTCHA solving failed: reCAPTCHA site key missing")
                    return False
                solution = await provider.solve_recaptcha(site_key, page.url)
                if solution:
                    # The token is passed as an argument rather than spliced
                    # into the script text, so its content cannot break or
                    # alter the JavaScript.
                    injected = await page.evaluate(
                        """(token) => {
                            const fields = document.querySelectorAll(
                                '#g-recaptcha-response, textarea[name="g-recaptcha-response"]'
                            );
                            fields.forEach((field) => {
                                field.innerHTML = token;
                                field.value = token;
                            });
                            if (window.grecaptcha && window.grecaptcha.getResponse) {
                                window.grecaptcha.getResponse = function() { return token; };
                            }
                            return fields.length;
                        }""",
                        solution,
                    )
                    # No response field on the page means nothing was entered.
                    return bool(injected)
            elif captcha_info['type'] == 'image':
                element = captcha_info['element']
                screenshot = await element.screenshot()
                solution = await provider.solve_image_captcha(screenshot)
                if solution:
                    input_field = await page.query_selector('input[name*="captcha"], input[id*="captcha"]')
                    if input_field:
                        await input_field.fill(solution)
                        return True
            return False
        except Exception as e:
            logger.error(f"CAPTCHA solving failed: {e}")
            return False
4. 統合自動化エンジン
メイン自動化クラス
# app/services/automation_engine.py
from playwright.async_api import async_playwright, Page
from app.services.form_detection_service import FormDetectionService
from app.services.form_filling_service import FormFillingService
from app.services.vpn_service import VPNService
from app.services.captcha_service import CaptchaService
from typing import Dict, List
import asyncio
import logging
logger = logging.getLogger(__name__)
class AutomationEngine:
    """Run the whole flow per URL: detect a form, fill it, solve any CAPTCHA, submit."""

    def __init__(self):
        self.form_detector = FormDetectionService()
        self.form_filler = FormFillingService()
        self.vpn_service = VPNService()
        self.captcha_service = CaptchaService()

    async def execute_automation(self, task_config: Dict) -> Dict:
        """Process every entry of ``task_config['urls']`` and summarise the results.

        Keys read from ``task_config``: ``urls``, ``template``, ``use_vpn``,
        ``vpn_config``, ``vpn_rotation_interval``, ``delay_between_requests``,
        ``headless``, ``page_timeout`` and ``captcha_provider``.

        Returns a dict with ``total_processed``, ``successful``, ``partial``,
        ``failed`` and the per-URL ``results``. Errors outside per-URL
        processing are logged and re-raised; the VPN is always disconnected
        afterwards when ``use_vpn`` is set.
        """
        try:
            if task_config.get('use_vpn'):
                vpn_success = await self.vpn_service.connect(task_config.get('vpn_config') or {})
                if not vpn_success:
                    logger.warning("VPN connection failed, proceeding without VPN")
            async with async_playwright() as playwright:
                # NOTE(review): '--disable-web-security' turns off the
                # same-origin policy for every visited page; confirm it is
                # really required.
                browser = await playwright.chromium.launch(
                    headless=task_config.get('headless', True),
                    args=['--no-sandbox', '--disable-web-security']
                )
                try:
                    context = await browser.new_context(
                        viewport={'width': 1366, 'height': 768},
                        user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
                    )
                    results = await self._process_urls(context, task_config)
                finally:
                    # Close the browser even when URL processing raised.
                    await browser.close()
            return {
                'total_processed': len(results),
                'successful': len([r for r in results if r['status'] == 'success']),
                'partial': len([r for r in results if r['status'] == 'partial']),
                'failed': len([r for r in results if r['status'] == 'failed']),
                'results': results
            }
        except Exception as e:
            logger.error(f"Automation execution failed: {e}")
            raise
        finally:
            if task_config.get('use_vpn'):
                await self.vpn_service.disconnect()

    async def _process_urls(self, context, task_config: Dict) -> List[Dict]:
        """Process each URL in order, with optional VPN rotation and delay."""
        results = []
        urls = task_config['urls']
        rotation_interval = task_config.get('vpn_rotation_interval')
        for i, url_data in enumerate(urls):
            try:
                result = await self._process_single_url(context, url_data, task_config)
            except Exception as e:
                logger.error(f"URL processing failed: {e}")
                result = self._failure(url_data, str(e))
            results.append(result)

            # Rotation is handled outside the per-URL try block so that a
            # rotation error cannot add a second, "failed" entry for a URL
            # whose result is already recorded. No rotation after the last URL.
            is_last = i + 1 == len(urls)
            if (task_config.get('use_vpn') and rotation_interval
                    and (i + 1) % rotation_interval == 0 and not is_last):
                try:
                    await self.vpn_service.rotate(task_config.get('vpn_config') or {})
                    await asyncio.sleep(5)  # let the new connection settle
                except Exception as e:
                    logger.error(f"VPN rotation failed: {e}")

            # Rate limiting between requests.
            if task_config.get('delay_between_requests'):
                await asyncio.sleep(task_config['delay_between_requests'])
        return results

    @staticmethod
    def _failure(url_data: Dict, error: str) -> Dict:
        """Build the result dict for a failed URL."""
        return {
            'url': url_data['url'],
            'status': 'failed',
            'error': error
        }

    async def _process_single_url(self, context, url_data: Dict, task_config: Dict) -> Dict:
        """Open ``url_data['url']`` in a new page and run the form flow on it.

        Returns a result dict whose ``status`` is 'success', 'partial'
        (submitted but not confirmed) or 'failed'. The page is always closed.
        """
        from playwright.async_api import TimeoutError as PlaywrightTimeoutError

        page_timeout = task_config.get('page_timeout', 30000)
        page = await context.new_page()
        try:
            response = await page.goto(url_data['url'], timeout=page_timeout)
            if not response or response.status >= 400:
                return self._failure(
                    url_data, f'HTTP {response.status if response else "No response"}'
                )

            try:
                await page.wait_for_load_state('networkidle', timeout=page_timeout)
            except PlaywrightTimeoutError:
                # Pages with continuous background traffic never go idle;
                # the document itself has loaded, so carry on.
                logger.warning(f"Network idle wait timed out for {url_data['url']}")

            forms = await self.form_detector.detect_forms(page)
            if not forms:
                return self._failure(url_data, 'No suitable form found')

            target_form = self._select_best_form(forms)
            await self.form_filler.fill_form(
                page, target_form, task_config['template'], url_data
            )

            captcha_info = await self.captcha_service.detect_captcha(page)
            if captcha_info:
                captcha_solved = await self.captcha_service.solve_captcha(
                    page, captcha_info, task_config.get('captcha_provider', 'capsolver')
                )
                if not captcha_solved:
                    return self._failure(url_data, 'CAPTCHA solving failed')

            if not await self._submit_form(page, target_form):
                return self._failure(url_data, 'Form submission failed')

            success_confirmed = await self._verify_submission_success(page)
            return {
                'url': url_data['url'],
                'status': 'success' if success_confirmed else 'partial',
                'form_submitted': True,
                'confirmation_verified': success_confirmed
            }
        except Exception as e:
            logger.error(f"Single URL processing failed: {e}")
            return self._failure(url_data, str(e))
        finally:
            await page.close()

    def _select_best_form(self, forms: List[Dict]) -> Dict:
        """Pick the first contact/inquiry form, else the form with most fields.

        ``forms`` must be non-empty.
        """
        for form in forms:
            if form['form_type'] in ['contact', 'inquiry']:
                return form
        return max(forms, key=lambda f: len(f['fields']))

    async def _submit_form(self, page: "Page", form: Dict) -> bool:
        """Click the form's first submit button, or press Enter if it has none.

        Returns False (after logging) when the interaction raises.
        """
        try:
            submit_buttons = form['submit_buttons']
            if submit_buttons:
                await page.click(submit_buttons[0]['selector'])
            else:
                await page.keyboard.press('Enter')
            # Fixed pause after submitting.
            await page.wait_for_timeout(3000)
            return True
        except Exception as e:
            logger.error(f"Form submission failed: {e}")
            return False

    async def _verify_submission_success(self, page: "Page") -> bool:
        """Return True when the page's visible text contains a success phrase.

        Only the rendered body text is inspected; matching the raw HTML would
        also hit class names and scripts such as ``class="success"``.
        """
        try:
            success_indicators = [
                'ありがとう', '送信完了', '受付', '確認',
                'thank you', 'success', 'submitted', 'received'
            ]
            page_text = (await page.inner_text('body')).lower()
            return any(indicator in page_text for indicator in success_indicators)
        except Exception:
            return False
✅ 完了条件
Playwright統合
- フォーム検出エンジン実装
- フォーム入力エンジン実装
- ブラウザ自動化基盤構築
VPN統合
- VPNプロバイダー抽象化
- NordVPN実装
- VPNローテーション機能
CAPTCHA統合
- CAPTCHAプロバイダー抽象化
- Capsolver統合
- 画像・reCAPTCHA対応
統合エンジン
- 自動化エンジン実装
- エラーハンドリング
- ログ・監視機能
動作確認
- 実際のフォーム送信テスト
- VPN切り替えテスト
- CAPTCHA解決テスト
🔄 次のステップ
自動化機能完了後、子チケット5(監視・ログ機能)に移行。
Claude Code実行プロンプト:
フォーム自動化システムの核となる自動化機能を実装してください。Playwright・VPN・CAPTCHA統合による実際のフォーム自動化エンジンを構築し、フォーム検出から送信まで完全自動化を実現してください。段階的に機能を実装し、各コンポーネントの動作確認を行ってください。
表示するデータがありません
操作