OWASP Amass と Python 連携: amass-common の役割と Python からの活用術 🐍

セキュリティツール

はじめに ✨

現代の複雑なITインフラにおいて、自組織の「アタックサーフェス(攻撃対象領域)」を正確に把握することは、セキュリティ対策の第一歩です。OWASP Amass は、このアタックサーフェス管理 (ASM: Attack Surface Management) を支援するための強力なオープンソースツールです。主にDNS列挙、ネットワークマッピング、外部資産の発見などを自動で行い、組織がインターネット上で公開している資産の全体像を明らかにします。

Amass は Go 言語で開発されており、その内部では様々な共通機能を提供するパッケージ群が利用されています。その中でも amass/common や関連する共通パッケージ (本記事では便宜上これらを総称して `amass-common` と呼ぶことがあります) は、設定の管理、イベント処理、データ構造の定義など、Amassのコアな動作を支える重要な役割を担っています。

しかし、重要な点として、`amass-common` は Go 言語の内部パッケージであり、Python から直接 `import` して利用できるライブラリではありません

では、Python 開発者にとって Amass や `amass-common` の概念を知る意味はないのでしょうか? いいえ、そんなことはありません。Python はその柔軟性と豊富なライブラリから、セキュリティ運用の自動化や他のツールとの連携において中心的な役割を果たすことが多々あります。Amass を Python スクリプトから呼び出し、その強力な情報収集能力を活用することで、より高度でカスタマイズされたセキュリティワークフローを構築できます。

このブログ記事では、まず OWASP Amass の基本的な概要とインストール方法を解説します。次に、`amass-common` が Amass 内部でどのような役割を果たしているのかを簡単に紹介し、Amass の動作原理への理解を深めます。そして最も重要な部分として、Python の subprocess モジュールなどを使い、Amass を外部コマンドとして実行し、その結果を Python で効果的に処理・活用する方法を、具体的なコード例と共に詳しく解説していきます。💪

この記事を読み終える頃には、Python を使って Amass を自在に操り、アタックサーフェス管理の自動化や効率化を実現するための知識とスキルを身につけていることでしょう。さあ、Amass と Python の連携の世界を探求しましょう!🚀

OWASP Amass のインストールと基本操作 🛠️

Python から Amass を利用するためには、まず Amass 本体が実行可能な状態でシステムにインストールされている必要があります。Amass のインストール方法はいくつか提供されています。

利用環境や好みに合わせて最適なインストール方法を選択してください。
  • Go を使ってインストール (推奨): Go 言語の開発環境がセットアップされている場合、以下のコマンドで簡単にインストールできます。これが最も確実で最新版を得やすい方法です。
    
    go install -v github.com/owasp-amass/amass/v4/...@master
    

    インストール後、$GOPATH/bin にパスが通っていることを確認してください。

  • Docker を使う: Docker がインストールされていれば、公式の Docker イメージを利用できます。ローカル環境を汚さずに試したい場合に便利です。
    
    docker pull owasp/amass
    docker run -v $(pwd)/amass-output:/amass/output owasp/amass enum -d example.com
    

    上記コマンド例では、結果をホストのカレントディレクトリ下の amass-output に出力するようにボリュームをマウントしています。

  • パッケージマネージャーを使う (Linux): 一部の Linux ディストリビューションでは、パッケージマネージャーからインストールできる場合があります (例: Kali Linux)。ただし、バージョンが古い可能性があるため注意が必要です。
    
    sudo apt update && sudo apt install amass
    
  • ビルド済みバイナリを使う: GitHub の Releases ページから、お使いの OS に合わせたビルド済みバイナリをダウンロードして利用することも可能です。ダウンロード後、実行権限を付与し、パスの通ったディレクトリに配置します。

インストールが完了したら、ターミナルで以下のコマンドを実行して、バージョン情報が表示されるか確認しましょう。


amass -version

バージョン番号が表示されれば、インストールは成功です 🎉

Amass は多機能なツールであり、様々なサブコマンドとオプションを持っています。ここでは、Python から利用する際に特によく使われるものをいくつか紹介します。

  • amass enum: 最も基本的なサブコマンドで、指定されたドメインのサブドメインを列挙します。様々なデータソース(パッシブDNS、公開証明書、Webアーカイブ、APIなど)を利用して情報を収集します。
    
    # 特定のドメインのサブドメインを列挙
    amass enum -d example.com
    
    # より網羅的なスキャン (アクティブ偵察も含む)
    amass enum -active -d example.com -p 80,443,8080
    
    # 結果をテキストファイルに出力
    amass enum -d example.com -o results.txt
    
  • amass intel: 特定の組織やドメインに関連する情報を収集します。Whois 情報や逆引き DNS などから、関連ドメインや IP アドレス範囲(ASN)などを発見します。
    
    # 組織名から関連ドメインやASNを調査
    amass intel -org "Example Inc."
    
    # 特定ドメインのWhois情報から調査
    amass intel -whois -d example.com
    
  • amass track: 以前のスキャン結果と比較し、アタックサーフェスの変化(追加・削除されたサブドメインなど)を追跡します。継続的な監視に役立ちます。
    
    # 現在のenum結果と前回の結果を比較
    amass track -d example.com
    
  • amass db: Amass が収集した情報を格納するグラフデータベースを操作します。結果の確認やエクスポートが可能です。
    
    # データベース内の情報を表示
    amass db -show -d example.com
    
    # データベース内の情報をJSON形式でエクスポート
    amass db -json output.json -d example.com
    
  • amass viz: 収集したデータを可視化するためのファイル(HTML/JS、Gephi、Maltego など)を生成します。ネットワーク構造の理解に役立ちます。
    
    # D3.js を使ったインタラクティブなグラフを生成
    amass viz -d3 -d example.com -o graph.html
    

これらのサブコマンドには、さらに多くのオプションが存在します。例えば、使用するデータソースの指定、スキャンの深さや速度の調整、出力フォーマットの変更などが可能です。Python から利用する際には、特に以下のオプションが重要になります。

  • -config <path>: カスタム設定ファイルを指定します。APIキーの設定やスキャン挙動の細かい調整に必須です。
  • -dir <path>: Amass が使用するデータベースやログファイルの出力ディレクトリを指定します。デフォルトは OS 標準の一時ディレクトリ (~/.config/amass など) です。
  • -json <path>: 結果を JSON 形式で指定したファイルに出力します。Python で処理する際に非常に便利です。
  • -o <path>: 結果をプレーンテキスト形式で指定したファイルに出力します。
  • -d <domain>: 対象ドメインを指定します。複数指定可能です。
  • -df <path>: 対象ドメインがリストされたファイルを指定します。
  • -p <ports>: アクティブ偵察で調査するポート番号を指定します (enum -active と併用)。
  • -brute: サブドメインのブルートフォースを実施します。
  • -passive: パッシブな情報源のみを使用します (デフォルト)。
  • -active: アクティブな偵察手法 (DNS解決確認、証明書取得など) も使用します。

これらのコマンドとオプションを組み合わせることで、目的に合わせた情報収集が可能になります。詳細なオプションについては、amass <subcommand> -h でヘルプを参照してください。

amass-common パッケージの概要 (Go言語の視点から) 🧩

ここで一度立ち止まって、Amass の内部構造、特に `amass-common` (や関連する共通パッケージ群) がどのような役割を果たしているのかを簡単に見てみましょう。これは Go 言語のコードレベルの話であり、Python から直接触れる部分ではありませんが、Amass の動作を深く理解する上で役立ちます。💡

OWASP Amass は、情報収集、処理、格納、可視化といった複数のステップからなる複雑なプロセスを実行します。これらのプロセス間で共通して必要となる機能をまとめたものが、`amass/common`, `amass/config`, `amass/eventbus`, `amass/graph`, `amass/datasources` といったパッケージ群です。これらが連携して Amass のコアエンジンを形成しています。

主なコンポーネントとその役割は以下の通りです。

  • 設定管理 (amass/config):
    • 設定ファイル (config.ini) の読み込み、解析、管理を担当します。
    • API キー、スコープ定義、データソースの有効/無効、リゾルバー設定、ブラックリストなど、Amass の動作全体に関わる設定情報を保持します。
    • Python から Amass を実行する際も、この設定ファイルを介して詳細な挙動を制御できます (-config オプション)。
  • イベントバス (amass/eventbus):
    • Amass 内部の異なるコンポーネント間 (例: データソース、エンジン、グラフDB) で情報を非同期にやり取りするためのメッセージングシステムです。
    • 新しい名前(サブドメイン候補)の発見、DNS 解決結果、エラー情報などがイベントとしてバス上を流れ、関連するコンポーネントがそれを購読して処理します。
    • これにより、各コンポーネントが疎結合になり、拡張性や保守性が高まります。
  • グラフデータベース (amass/graph):
    • 収集した情報を格納・管理するためのデータ構造です。Amass は内部的にグラフデータベースを使用しており、ドメイン、サブドメイン、IPアドレス、ASN、証明書などのエンティティとその関係性をノードとエッジとして表現します。
    • これにより、情報の関連性を効率的に辿り、複雑な分析を行うことが可能になります。
    • デフォルトではオンメモリまたはファイルベース (指定した -dir 内) のデータベースが使われます。PostgreSQLなどの外部データベースをバックエンドとして利用する設定も可能です。
  • データソース管理 (amass/datasources):
    • VirusTotal, SecurityTrails, Shodan, DNSDB, Certificate Transparency Logs など、多種多様な外部データソースと連携するためのインターフェースやロジックを提供します。
    • 各データソースは「サービス」として実装され、設定ファイルで有効/無効を切り替えたり、APIキーを設定したりできます。
    • 新しいデータソースを追加するためのフレームワークも提供されています。
  • 共通ユーティリティ (amass/common など):
    • DNS クエリの実行、IP アドレスの操作、文字列処理、ファイル操作、レートリミット制御など、様々なモジュールで共通して利用される基本的な関数やデータ型を提供します。

このように、`amass-common` (および関連パッケージ) は、Amass の強力な機能を実現するための基盤技術を提供しています。Python 開発者がこれらの内部構造を完全に理解する必要はありませんが、「Amass は設定ファイルで細かく制御でき、多様なデータソースを使い、結果をグラフ構造で管理している」という点を把握しておくと、Python から Amass を効果的に利用する上で役立ちます。例えば、適切な設定ファイルを用意することの重要性や、得られる情報の種類と構造を理解する助けになります。

Python から OWASP Amass を利用する方法 🐍 + ⚙️

いよいよ本題です。Python スクリプトから OWASP Amass の機能を活用する方法を見ていきましょう。前述の通り、`amass-common` を直接インポートするのではなく、Amass コマンドを外部プロセスとして実行し、その結果を Python で受け取って処理するのが基本的なアプローチとなります。これには Python の標準ライブラリである subprocess モジュールが主に使われます。

subprocess モジュールは、新しいプロセスを生成し、その入出力ストリーム(標準入力、標準出力、標準エラー出力)に接続し、リターンコードを取得するための機能を提供します。

最もシンプルな例として、特定のドメインに対して amass enum を実行し、発見されたサブドメインのリスト(標準出力)を取得してみましょう。


import subprocess
import sys

def run_amass_enum_basic(domain):
    """指定されたドメインに対して amass enum を実行し、標準出力を返す"""
    command = ["amass", "enum", "-d", domain, "-passive"] # passiveモードで実行
    print(f"実行コマンド: {' '.join(command)}")

    try:
        # universal_newlines=True はテキストモードでの入出力を意味し、
        # encoding='utf-8' で文字コードを指定 (環境によっては不要/変更)
        # capture_output=True で標準出力と標準エラー出力をキャプチャ
        result = subprocess.run(
            command,
            capture_output=True,
            text=True,
            encoding='utf-8',
            check=True,  # 戻り値が0以外の場合に CalledProcessError を送出
            timeout=300  # タイムアウトを5分に設定 (必要に応じて調整)
        )

        print("Amass の標準出力:")
        print(result.stdout)
        print("-" * 20)
        print("Amass の標準エラー出力:")
        print(result.stderr) # stderr にも進捗情報などが出力されることがある
        print("-" * 20)

        # 標準出力をリストとして返す (空行を除外)
        subdomains = [line for line in result.stdout.splitlines() if line.strip()]
        return subdomains

    except FileNotFoundError:
        print("エラー: 'amass' コマンドが見つかりません。パスが通っているか確認してください。", file=sys.stderr)
        return None
    except subprocess.CalledProcessError as e:
        print(f"エラー: Amass の実行に失敗しました (リターンコード: {e.returncode})", file=sys.stderr)
        print(f"標準エラー出力:\n{e.stderr}", file=sys.stderr)
        return None
    except subprocess.TimeoutExpired:
        print("エラー: Amass の実行がタイムアウトしました。", file=sys.stderr)
        return None
    except Exception as e:
        print(f"予期せぬエラーが発生しました: {e}", file=sys.stderr)
        return None

# 実行例
target_domain = "example.com" # ここに対象ドメインを指定
found_subdomains = run_amass_enum_basic(target_domain)

if found_subdomains:
    print(f"\n{target_domain} で発見されたサブドメイン:")
    for sub in found_subdomains:
        print(f"- {sub}")
else:
    print(f"\n{target_domain} のサブドメイン取得に失敗しました。")

このコードでは、subprocess.run() を使用しています。

  • command: 実行するコマンドと引数をリストで指定します。
  • capture_output=True: 標準出力と標準エラー出力をキャプチャするように指示します。結果は result.stdoutresult.stderr にバイト文字列として格納されます。
  • text=True (または universal_newlines=True): 出力をテキスト(文字列)としてデコードします。内部的にはシステムのデフォルトエンコーディングが使われます。
  • encoding='utf-8': デコードに使用するエンコーディングを明示的に指定します。環境差異による問題を避けるために推奨されます。
  • check=True: コマンドがゼロ以外のリターンコードで終了した場合(エラーが発生した場合)に subprocess.CalledProcessError 例外を発生させます。
  • timeout: コマンドの実行時間制限(秒)を設定します。Amass は実行に時間がかかることがあるため、適切なタイムアウトを設定することが重要です。

また、try...except ブロックで、コマンドが見つからない場合 (FileNotFoundError)、実行エラー (CalledProcessError)、タイムアウト (TimeoutExpired) などの例外を捕捉し、適切に処理しています。エラーハンドリングは安定したスクリプトを作成する上で不可欠です。✅

Amass の標準出力はシンプルなリスト形式ですが、より詳細な情報(発見元ソース、IPアドレスなど)が必要な場合や、プログラムで処理しやすい形式が必要な場合は、-json オプションが非常に役立ちます。このオプションを使うと、結果が JSON Lines 形式 (各行が独立した JSON オブジェクト) で出力されます。


import subprocess
import json
import sys
import os # ファイルパス操作のため

def run_amass_enum_json(domain, output_dir="amass_output"):
    """
    指定されたドメインに対して amass enum -json を実行し、
    結果のJSONファイルパスを返す
    """
    # 出力ディレクトリが存在しない場合は作成
    os.makedirs(output_dir, exist_ok=True)
    json_output_path = os.path.join(output_dir, f"{domain}_amass.json")

    # amass db サブコマンドを使う場合は -dir で出力先を揃えることが多い
    # amass enum でも -dir を指定してログなどを管理しやすくできる
    amass_db_dir = os.path.join(output_dir, "amass_db")
    os.makedirs(amass_db_dir, exist_ok=True)

    command = [
        "amass", "enum",
        "-d", domain,
        "-passive",
        "-json", json_output_path, # JSON出力ファイルの指定
        "-dir", amass_db_dir       # Amassの作業ディレクトリ指定
    ]
    print(f"実行コマンド: {' '.join(command)}")

    try:
        # JSON出力時は stdout/stderr を直接使う必要性は低い場合が多い
        # (エラーチェックは必要)
        result = subprocess.run(
            command,
            capture_output=True, # エラーメッセージ取得のためキャプチャは行う
            text=True,
            encoding='utf-8',
            check=True,
            timeout=600 # 少し長めのタイムアウト (10分)
        )
        print("Amass の実行が正常に完了しました。")
        print(f"結果は {json_output_path} に出力されました。")
        # stderr に有用な情報が含まれる場合があるため表示
        if result.stderr:
            print("Amass の標準エラー出力 (進捗など):")
            print(result.stderr)
        return json_output_path

    except FileNotFoundError:
        print("エラー: 'amass' コマンドが見つかりません。", file=sys.stderr)
        return None
    except subprocess.CalledProcessError as e:
        print(f"エラー: Amass の実行に失敗しました (リターンコード: {e.returncode})", file=sys.stderr)
        print(f"標準エラー出力:\n{e.stderr}", file=sys.stderr)
        return None
    except subprocess.TimeoutExpired:
        print("エラー: Amass の実行がタイムアウトしました。", file=sys.stderr)
        return None
    except Exception as e:
        print(f"予期せぬエラーが発生しました: {e}", file=sys.stderr)
        return None

def parse_amass_json(json_filepath):
    """Amass が出力した JSON Lines ファイルをパースしてリストで返す"""
    results = []
    if not json_filepath or not os.path.exists(json_filepath):
        print(f"エラー: JSONファイルが見つかりません: {json_filepath}", file=sys.stderr)
        return results

    try:
        with open(json_filepath, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if line:
                    try:
                        data = json.loads(line)
                        results.append(data)
                    except json.JSONDecodeError as e:
                        print(f"警告: JSON行のパースに失敗しました: {e}. 行: {line}", file=sys.stderr)
        return results
    except Exception as e:
        print(f"エラー: JSONファイルの読み込み中にエラーが発生しました: {e}", file=sys.stderr)
        return []

# 実行例
target_domain = "example.com" # ここに対象ドメインを指定
output_directory = "./amass_results" # 出力先ディレクトリ

json_file = run_amass_enum_json(target_domain, output_directory)

if json_file:
    parsed_data = parse_amass_json(json_file)
    if parsed_data:
        print(f"\n{target_domain} に関する情報 (JSONから抜粋):")
        count = 0
        for item in parsed_data:
            # name, domain, addresses, tag, source のキーが存在するか確認
            name = item.get("name", "N/A")
            domain = item.get("domain", "N/A")
            tag = item.get("tag", "N/A")
            source = item.get("source", "N/A")
            addresses = item.get("addresses", [])
            ip_list = [addr.get("ip", "N/A") for addr in addresses]

            print(f"- 名前: {name}, ドメイン: {domain}, 種別: {tag}, 発見元: {source}, IP: {', '.join(ip_list)}")
            count += 1
            if count >= 10: # とりあえず最初の10件だけ表示
                print("... (以降省略)")
                break
    else:
        print("JSONデータのパースに失敗またはデータがありませんでした。")

この例では、まず run_amass_enum_json 関数で Amass を -json オプション付きで実行し、指定したファイルに結果を出力させます。成功した場合、そのファイルのパスを返します。

次に parse_amass_json 関数で、出力された JSON Lines ファイルを開き、一行ずつ読み込みます。各行は独立した JSON 文字列なので、json.loads() を使って Python の辞書オブジェクトに変換し、リストに追加していきます。

JSON 形式のデータには、サブドメイン名 (name) だけでなく、それが属するドメイン (domain)、関連する IP アドレス (addresses 配列内の ip)、情報の種類を示すタグ (tag、例: dns, cert, api)、発見元のデータソース (source) など、豊富な情報が含まれています。これにより、より詳細な分析や後続処理が可能になります。📊

-dir オプションも併用し、Amass の作業ディレクトリ(内部データベースやログファイルが保存される場所)を指定することで、実行状態の管理がしやすくなります。

Amass の挙動をより細かく制御したい場合、特に API キーが必要なデータソースを利用したり、スキャンのスコープを厳密に定義したりするには、設定ファイル (config.ini) を使用します。

まず、設定ファイルのテンプレートを生成します。


amass config -list > config.ini

生成された config.ini ファイルをテキストエディタで開き、必要な箇所を編集します。例えば、特定のデータソース (例: VirusTotal) の API キーを設定するには、[data_sources.VirusTotal] セクションを探し、apikey = YOUR_API_KEY のように記述します。

設定ファイルを編集したら、Python スクリプトから Amass を実行する際に -config オプションでそのファイルのパスを指定します。


import subprocess
import sys
import os

def run_amass_with_config(domain, config_path, output_dir="amass_output"):
    """設定ファイルを指定して amass enum を実行する"""
    os.makedirs(output_dir, exist_ok=True)
    json_output_path = os.path.join(output_dir, f"{domain}_amass_config.json")
    amass_db_dir = os.path.join(output_dir, "amass_db_config")
    os.makedirs(amass_db_dir, exist_ok=True)

    if not os.path.exists(config_path):
        print(f"エラー: 設定ファイルが見つかりません: {config_path}", file=sys.stderr)
        return None

    command = [
        "amass", "enum",
        "-d", domain,
        "-config", config_path, # 設定ファイルの指定
        "-json", json_output_path,
        "-dir", amass_db_dir
        # 必要に応じて -passive, -active などを追加
    ]
    print(f"実行コマンド: {' '.join(command)}")

    try:
        result = subprocess.run(
            command,
            capture_output=True,
            text=True,
            encoding='utf-8',
            check=True,
            timeout=1800 # 設定によっては時間がかかる可能性 (30分)
        )
        print(f"設定ファイル '{config_path}' を使用した Amass の実行が完了しました。")
        print(f"結果は {json_output_path} に出力されました。")
        if result.stderr:
            print("Amass 標準エラー出力:")
            print(result.stderr)
        return json_output_path

    # ... (エラーハンドリングは前述の例と同様) ...
    except FileNotFoundError:
        print("エラー: 'amass' コマンドが見つかりません。", file=sys.stderr)
        return None
    except subprocess.CalledProcessError as e:
        print(f"エラー: Amass の実行に失敗しました (リターンコード: {e.returncode})", file=sys.stderr)
        print(f"標準エラー出力:\n{e.stderr}", file=sys.stderr)
        return None
    except subprocess.TimeoutExpired:
        print("エラー: Amass の実行がタイムアウトしました。", file=sys.stderr)
        return None
    except Exception as e:
        print(f"予期せぬエラーが発生しました: {e}", file=sys.stderr)
        return None

# 実行例
target_domain = "example.com"
config_file_path = "./config.ini" # 事前に編集した設定ファイルのパス
output_directory = "./amass_results"

# 事前に config.ini が存在するか確認 (簡易的)
if not os.path.exists(config_file_path):
    print(f"警告: 設定ファイル {config_file_path} が見つかりません。デフォルト設定で実行される可能性があります。")
    # 必要であればここで処理を中断するなどの対応
    # exit()

json_file_config = run_amass_with_config(target_domain, config_file_path, output_directory)

if json_file_config:
    # 前述の parse_amass_json 関数でパース可能
    parsed_data_config = parse_amass_json(json_file_config)
    if parsed_data_config:
        print(f"\n設定ファイルを用いた {target_domain} のスキャン結果 (JSONから抜粋):")
        # ... (結果の表示処理) ...
        pass # 結果表示部分は省略

設定ファイルを使うことで、Amass の強力なカスタマイズ機能を最大限に活用できます。特に、多くのデータソースは API キーを必要とするため、本格的な情報収集を行う際には設定ファイルの利用が不可欠です。🔑

Python スクリプトでの応用例 🚀

基本的な Amass の実行と結果取得の方法を学んだところで、より実用的な応用例を見ていきましょう。Python の得意分野である自動化、データ処理、他ツールとの連携能力を活かすことで、Amass をさらに強力なツールに変えることができます。

管理対象のドメインが多数ある場合、一つ一つ手動でコマンドを実行するのは非効率です。Python スクリプトを使えば、ドメインリストを読み込んで順番に Amass スキャンを実行し、結果をまとめて管理することができます。


import time # スリープ用

# ... (前述の run_amass_enum_json, parse_amass_json 関数は定義済みとする) ...

def scan_multiple_domains(domain_list_file, config_path, output_dir="amass_reports", sleep_sec=5):
    """ドメインリストファイルを読み込み、各ドメインに対してAmassスキャンを実行する"""
    all_results = {}
    failed_domains = []

    try:
        with open(domain_list_file, 'r', encoding='utf-8') as f:
            domains = [line.strip() for line in f if line.strip()]
    except FileNotFoundError:
        print(f"エラー: ドメインリストファイルが見つかりません: {domain_list_file}", file=sys.stderr)
        return None, None
    except Exception as e:
        print(f"エラー: ドメインリストファイルの読み込み中にエラー: {e}", file=sys.stderr)
        return None, None

    print(f"{len(domains)} 個のドメインをスキャンします...")

    for domain in domains:
        print(f"\n--- ドメイン: {domain} のスキャンを開始 ---")
        json_file = run_amass_enum_json(domain, config_path=config_path, output_dir=os.path.join(output_dir, domain)) # ドメインごとにサブディレクトリ作成

        if json_file:
            parsed_data = parse_amass_json(json_file)
            all_results[domain] = parsed_data
            print(f"ドメイン: {domain} のスキャン完了。{len(parsed_data)} 件の結果を取得。")
        else:
            print(f"ドメイン: {domain} のスキャンに失敗しました。")
            failed_domains.append(domain)

        print(f"次のスキャンまで {sleep_sec} 秒待機します...")
        time.sleep(sleep_sec) # API制限などを考慮してスリープを入れる

    print("\n--- 全ドメインのスキャンが完了しました ---")
    return all_results, failed_domains

# --- 実行部分 ---
domain_file = "domains.txt"   # スキャン対象ドメインのリスト (1行1ドメイン)
config_file = "config.ini"    # Amass設定ファイル
report_dir = "./amass_multi_scan_reports"

# domains.txt ファイルを作成 (例)
with open(domain_file, "w") as f:
    f.write("example.com\n")
    f.write("example.org\n")
    # f.write("another-domain.net\n") # 必要に応じて追加

# run_amass_enum_json を config_path 受け取れるように修正する必要あり (前述の run_amass_with_config を参考に)
# ここでは仮に関数が修正済みとして呼び出す
# results, failures = scan_multiple_domains(domain_file, config_file, report_dir)

# --- 修正版 run_amass_enum_json (config_path を受け付ける) ---
def run_amass_enum_json_with_config(domain, config_path=None, output_dir="amass_output"):
    os.makedirs(output_dir, exist_ok=True)
    json_output_path = os.path.join(output_dir, f"{domain}_amass.json")
    amass_db_dir = os.path.join(output_dir, "amass_db")
    os.makedirs(amass_db_dir, exist_ok=True)

    command = ["amass", "enum", "-d", domain, "-json", json_output_path, "-dir", amass_db_dir]
    if config_path and os.path.exists(config_path):
        command.extend(["-config", config_path])
        print(f"設定ファイル {config_path} を使用します。")
    else:
        print("設定ファイルなし、または見つからないため、デフォルト設定で実行します。")
        command.append("-passive") # デフォルトはパッシブに

    print(f"実行コマンド: {' '.join(command)}")
    try:
        result = subprocess.run(command, capture_output=True, text=True, encoding='utf-8', check=True, timeout=1800)
        print(f"Amass 実行完了。結果: {json_output_path}")
        if result.stderr: print(f"Amass stderr:\n{result.stderr}")
        return json_output_path
    except FileNotFoundError: print("エラー: 'amass' が見つかりません。", file=sys.stderr); return None
    except subprocess.CalledProcessError as e: print(f"エラー: Amass 失敗 (コード: {e.returncode})\n{e.stderr}", file=sys.stderr); return None
    except subprocess.TimeoutExpired: print("エラー: Amass タイムアウト", file=sys.stderr); return None
    except Exception as e: print(f"予期せぬエラー: {e}", file=sys.stderr); return None

# --- scan_multiple_domains を修正版関数で呼び出す ---
def scan_multiple_domains_v2(domain_list_file, config_path, output_dir="amass_reports", sleep_sec=5):
    # ... (ファイル読み込み部分は同じ) ...
    all_results = {}
    failed_domains = []
    try:
        with open(domain_list_file, 'r', encoding='utf-8') as f:
            domains = [line.strip() for line in f if line.strip()]
    except FileNotFoundError:
        print(f"エラー: ドメインリストファイルが見つかりません: {domain_list_file}", file=sys.stderr)
        return None, None

    print(f"{len(domains)} 個のドメインをスキャンします...")
    for domain in domains:
        print(f"\n--- ドメイン: {domain} のスキャンを開始 ---")
        # 修正版関数を呼び出し
        domain_output_dir = os.path.join(output_dir, domain.replace('.', '_')) # ドメイン名をフォルダ名に
        json_file = run_amass_enum_json_with_config(domain, config_path=config_path, output_dir=domain_output_dir)

        if json_file:
            parsed_data = parse_amass_json(json_file)
            all_results[domain] = parsed_data
            print(f"ドメイン: {domain} スキャン完了。{len(parsed_data)} 件取得。")
        else:
            print(f"ドメイン: {domain} スキャン失敗。")
            failed_domains.append(domain)
        print(f"次のスキャンまで {sleep_sec} 秒待機...")
        time.sleep(sleep_sec)
    print("\n--- 全ドメインのスキャン完了 ---")
    return all_results, failed_domains


# --- 実行 ---
results, failures = scan_multiple_domains_v2(domain_file, config_file, report_dir)

if results:
    print("\n--- スキャン結果サマリー ---")
    for domain, data in results.items():
        print(f"ドメイン: {domain}, 発見数: {len(data)}")
        # 必要ならここで結果をDBに保存したり、レポートを生成したりする
        # 例: 発見されたサブドメイン名をファイルに書き出す
        subdomains_only = [item.get('name') for item in data if item.get('name')]
        with open(os.path.join(report_dir, f"{domain.replace('.', '_')}_subdomains.txt"), 'w', encoding='utf-8') as f_out:
            for sub in subdomains_only:
                f_out.write(sub + '\n')

if failures:
    print("\n--- スキャンに失敗したドメイン ---")
    for domain in failures:
        print(f"- {domain}")

このスクリプトでは、まずドメインリストが書かれたテキストファイル (domains.txt) を読み込みます。そして、リスト内の各ドメインに対して順番に run_amass_enum_json_with_config 関数 (設定ファイル対応版) を呼び出して Amass スキャンを実行します。スキャン結果 (パースされた JSON データ) はドメイン名をキーとする辞書 all_results に格納されます。

ポイントは、time.sleep(sleep_sec) を入れて、連続スキャンによる API レートリミット超過や、ターゲットへの過負荷を防いでいる点です。また、ドメインごとに結果を出力するサブディレクトリを作成し、整理しやすくしています。最後に、成功したドメインと失敗したドメインのリストを表示し、必要に応じて結果をさらに処理 (例: サブドメインリストのファイル出力) します。📂

Amass で発見したサブドメインや IP アドレスは、さらなる調査の出発点となります。例えば、発見したサブドメインに対してポートスキャンを実行したり (Nmap)、Web アプリケーションの脆弱性スキャンを行ったり (Nuclei, Nikto など) することが考えられます。Python スクリプトを使えば、これらのツール連携をスムーズに行えます。


# ... (Amass実行・パース部分は前述のものを利用) ...
import subprocess

def run_nmap_on_subdomains(subdomains, nmap_options=['-T4', '-F']):
    """サブドメインリストに対してNmapスキャンを実行する"""
    print(f"\n--- {len(subdomains)} 個のサブドメインに対してNmapスキャンを開始 ---")
    nmap_results = {}

    for sub in subdomains:
        command = ['nmap'] + nmap_options + [sub]
        print(f"実行コマンド: {' '.join(command)}")
        try:
            result = subprocess.run(
                command,
                capture_output=True,
                text=True,
                encoding='utf-8',
                check=True,
                timeout=300 # Nmapのスキャンタイムアウト
            )
            print(f"Nmapスキャン成功: {sub}")
            nmap_results[sub] = result.stdout
            # print(result.stdout) # 結果をコンソールに出力する場合はコメント解除

        except FileNotFoundError:
            print("エラー: 'nmap' コマンドが見つかりません。", file=sys.stderr)
            # Nmapがない場合は以降のスキャンを中断するか、スキップするか選択
            break
        except subprocess.CalledProcessError as e:
            print(f"エラー: Nmapスキャン失敗 ({sub}): {e.returncode}", file=sys.stderr)
            # print(f"stderr:\n{e.stderr}", file=sys.stderr) # エラー詳細
            nmap_results[sub] = f"Scan failed with code {e.returncode}"
        except subprocess.TimeoutExpired:
            print(f"エラー: Nmapスキャンタイムアウト ({sub})", file=sys.stderr)
            nmap_results[sub] = "Scan timed out"
        except Exception as e:
            print(f"予期せぬエラー ({sub}): {e}", file=sys.stderr)
            nmap_results[sub] = f"Unexpected error: {e}"

        time.sleep(1) # 連続実行を避けるための短いスリープ

    print("--- Nmapスキャン完了 ---")
    return nmap_results

# --- 実行部分 (前の例の続きを想定) ---
# results, failures = scan_multiple_domains_v2(...)
# if results and 'example.com' in results:
#     example_com_data = results['example.com']
#     subdomains_to_scan = [item.get('name') for item in example_com_data if item.get('name')]
#
#     if subdomains_to_scan:
#         # 上位10件に絞るなど、対象を限定することも検討
#         # target_subdomains = subdomains_to_scan[:10]
#         target_subdomains = subdomains_to_scan
#
#         # Nmap で Fast Scan (-F: よく使われる100ポート) を実行
#         nmap_output = run_nmap_on_subdomains(target_subdomains, nmap_options=['-T4', '-F'])
#
#         # 結果の表示や保存
#         print("\n--- Nmap スキャン結果 ---")
#         for sub, output in nmap_output.items():
#             print(f"--- {sub} ---")
#             if "Scan failed" not in output and "timed out" not in output and "Unexpected error" not in output:
#                 # 簡単なオープンポート抽出 (Nmap出力形式に依存)
#                 open_ports = []
#                 for line in output.splitlines():
#                     if '/tcp' in line and 'open' in line:
#                         parts = line.split()
#                         if len(parts) >= 3:
#                             open_ports.append(f"{parts[0]} ({parts[2]})") # 例: "80/tcp (http)"
#                 if open_ports:
#                     print(f"オープンポート: {', '.join(open_ports)}")
#                 else:
#                     print("オープンポートは見つかりませんでした (Fast Scan)。")
#             else:
#                 print(f"スキャン結果: {output}")
#             print("-" * (len(sub) + 8))


この例では、Amass で取得したサブドメインのリスト (subdomains_to_scan) を run_nmap_on_subdomains 関数に渡しています。この関数は、各サブドメインに対して nmap コマンドを subprocess.run で実行し、結果(標準出力)を辞書に格納して返します。

同様のアプローチで、Nuclei (nuclei -u subdomain -o result.txt) や他のコマンドラインツールを呼び出すことができます。これにより、Amass を起点とした一連のセキュリティチェックを自動化するパイプラインを構築できます。🔗

Amass の -json 出力は詳細ですが、そのままでは情報が多すぎることがあります。Python を使えば、必要な情報だけを抽出したり、統計情報を計算したり、独自の形式でレポートを生成したりすることが容易になります。


import pandas as pd # データ集計・整形に Pandas を利用 (要インストール: pip install pandas)
from datetime import datetime
import os

# ... (Amass実行・パース部分は前述のものを利用) ...

def generate_report_from_amass_data(all_results, output_dir="amass_reports"):
    """Amassの複数ドメイン結果からレポート(CSV, サマリー)を生成する"""
    os.makedirs(output_dir, exist_ok=True)
    all_data_list = []
    summary = {}

    print("\n--- レポート生成を開始 ---")

    for domain, data in all_results.items():
        domain_subdomains = 0
        domain_ips = set()
        sources = {}

        if not data:
            summary[domain] = {"subdomains": 0, "unique_ips": 0, "top_source": "N/A"}
            continue

        for item in data:
            # name があるものをサブドメインとしてカウント
            if item.get('name'):
                domain_subdomains += 1
                all_data_list.append({
                    'domain': item.get('domain', domain),
                    'subdomain': item.get('name'),
                    'ip_addresses': ", ".join([addr.get('ip') for addr in item.get('addresses', []) if addr.get('ip')]),
                    'source': item.get('source', 'N/A'),
                    'tag': item.get('tag', 'N/A')
                })

            # IPアドレスを収集
            for addr in item.get('addresses', []):
                if addr.get('ip'):
                    domain_ips.add(addr.get('ip'))

            # 発見元ソースを集計
            source = item.get('source', 'Unknown')
            sources[source] = sources.get(source, 0) + 1

        # サマリー情報
        top_source = max(sources, key=sources.get) if sources else "N/A"
        summary[domain] = {
            "subdomains": domain_subdomains,
            "unique_ips": len(domain_ips),
            "top_source": f"{top_source} ({sources.get(top_source, 0)} 件)"
        }
        print(f"ドメイン '{domain}' のデータを処理完了。")

    # --- 全結果をまとめたCSVファイルの生成 ---
    if all_data_list:
        try:
            df = pd.DataFrame(all_data_list)
            csv_filename = os.path.join(output_dir, f"amass_all_results_{datetime.now():%Y%m%d_%H%M%S}.csv")
            df.to_csv(csv_filename, index=False, encoding='utf-8-sig') # 日本語環境用に utf-8-sig
            print(f"全結果をCSVファイルに出力しました: {csv_filename}")
        except ImportError:
            print("警告: Pandas がインストールされていません。CSVレポートは生成されませんでした。")
            print("インストールするには 'pip install pandas' を実行してください。")
        except Exception as e:
            print(f"エラー: CSVファイル生成中にエラーが発生しました: {e}", file=sys.stderr)

    # --- サマリーレポートの生成 ---
    summary_filename = os.path.join(output_dir, f"amass_summary_{datetime.now():%Y%m%d_%H%M%S}.txt")
    with open(summary_filename, 'w', encoding='utf-8') as f:
        f.write(f"OWASP Amass スキャンサマリー ({datetime.now():%Y-%m-%d %H:%M:%S})\n")
        f.write("=" * 40 + "\n")
        for domain, stats in summary.items():
            f.write(f"ドメイン: {domain}\n")
            f.write(f"  発見サブドメイン数: {stats['subdomains']}\n")
            f.write(f"  ユニークIPアドレス数: {stats['unique_ips']}\n")
            f.write(f"  最も多い発見元ソース: {stats['top_source']}\n")
            f.write("-" * 20 + "\n")
    print(f"サマリーレポートを出力しました: {summary_filename}")

    print("--- レポート生成を完了 ---")


# --- 実行部分 (前の例の続きを想定) ---
# results, failures = scan_multiple_domains_v2(...)
# if results:
#    generate_report_from_amass_data(results, report_dir)

この例では、generate_report_from_amass_data 関数が、複数のドメインに対する Amass のスキャン結果 (all_results 辞書) を受け取ります。

  1. まず、すべての結果をループし、各アイテムから必要な情報(ドメイン、サブドメイン、IPアドレス、発見元ソース、タグ)を抽出してリスト all_data_list に追加します。
  2. 同時に、ドメインごとのサブドメイン数、ユニークなIPアドレス数、発見元ソースの頻度などを集計し、summary 辞書に格納します。
  3. Pandas ライブラリ (事前に pip install pandas でインストールが必要) を使用して、all_data_list から DataFrame を作成し、全詳細結果を単一の CSV ファイルに出力します。Pandas を使うと、データの整形やフィルタリング、さらなる分析が容易になります。
  4. 最後に、summary 辞書の内容を整形し、人間が読みやすい形式のテキストファイルとしてサマリーレポートを出力します。

このように Python を使うことで、Amass の生データをビジネスや運用のニーズに合わせた形に加工し、価値ある情報に変換することができます。📈

注意点とベストプラクティス ⚠️

Python から Amass を利用する際には、いくつかの注意点と推奨される実践方法があります。これらを考慮することで、より安定し、効率的で、責任ある運用が可能になります。

  • Amass の実行時間とリソース消費:
    • Amass、特に -active-brute オプションを使用する場合、実行にかなりの時間がかかることがあります (数分〜数時間以上)。また、CPU やメモリ、ネットワーク帯域も相応に消費します。
    • Python スクリプトから実行する際は、subprocess.runtimeout パラメータを適切に設定し、予期せず長時間実行が続くことを防ぎましょう。
    • 大規模なスキャンを実行する際は、実行環境のリソース(CPUコア数、メモリ容量)を考慮し、必要であればより性能の高いマシンで実行するか、スキャン対象を分割するなどの工夫が必要です。
  • API 制限とレートリミット:
    • Amass は多くの外部データソース (VirusTotal, SecurityTrails など) の API を利用します。これらの API の多くは、無料プランでは利用回数や頻度に制限 (レートリミット) があります。
    • 短時間に大量のスキャンを実行すると、これらのレートリミットに達し、情報収集が不完全になったり、一時的に API が利用できなくなったりする可能性があります。
    • Amass の設定ファイル (config.ini) で各データソースの API キーを設定し、必要であれば有料プランの利用を検討してください。
    • Python スクリプト側でも、複数のドメインを連続してスキャンする際には time.sleep() を入れて適切な間隔を空けるなど、レートリミットを意識した設計が重要です。
  • エラー処理の重要性:
    • 外部コマンドの実行は、様々な理由で失敗する可能性があります(コマンドが見つからない、権限がない、設定ファイルが間違っている、ネットワークエラー、対象サーバーからの応答がない、タイムアウトなど)。
    • subprocess.run を使う際は、check=True を設定して異常終了時に例外を発生させるか、リターンコード (result.returncode) を必ず確認しましょう。
    • 標準エラー出力 (result.stderr) には、エラーの原因に関する重要な情報が含まれていることが多いので、ログに記録するなどして確認できるようにしておくことが推奨されます。
    • try...except ブロックを使って、想定されるエラー(FileNotFoundError, CalledProcessError, TimeoutExpired など)を適切に捕捉し、スクリプトが予期せず停止しないように、また、問題発生時に原因を特定しやすくするようにしましょう。
  • 法的・倫理的な考慮事項:
    • Amass は強力な情報収集ツールですが、その使用は必ず許可された対象に対してのみ行ってください。
    • 自組織が所有・管理していないドメインやネットワークに対して、許可なく詳細なスキャン (特に -active-brute を伴うもの) を実行することは、法律や利用規約に違反し、攻撃行為とみなされる可能性があります。
    • スキャンを実行する前に、対象システムの利用規約や、所属組織のセキュリティポリシー、関連法規などを確認し、常に責任ある行動を心がけてください。🙏
    • パッシブスキャン (-passive オプションやデフォルトの挙動) は、公開情報を収集するだけなので、一般的には問題になることは少ないですが、それでも対象への影響をゼロにする保証はありません。
  • 設定ファイルの管理:
    • API キーなどの機密情報を含む可能性がある config.ini ファイルは、適切にアクセス権を設定し、バージョン管理システム (Gitなど) に誤ってコミットしないように注意が必要です (.gitignore に追加するなど)。
    • 環境変数やシークレット管理ツール (例: HashiCorp Vault, AWS Secrets Manager) を使って API キーを管理し、設定ファイルには直接書き込まない方法も検討する価値があります (Amass 自体が環境変数からの読み込みをサポートしているか確認が必要です)。
  • Amass のバージョンアップ:
    • Amass は活発に開発されており、機能追加やバグ修正、データソースの更新などが頻繁に行われます。定期的に最新バージョンにアップデートすることで、ツールの性能や精度を維持することができます。
    • ただし、バージョンアップによってコマンドのオプションや出力形式が変更される可能性もあるため、Python スクリプトの互換性に影響がないか、アップデート後にテストすることが重要です。

これらの点に留意し、計画的に Amass を活用することで、Python と組み合わせた自動化・効率化のメリットを最大限に引き出すことができます。👍

まとめ 🏁

この記事では、OWASP Amass と Python の連携に焦点を当て、その活用方法を解説してきました。重要なポイントを再確認しましょう。

  • OWASP Amass は、アタックサーフェス管理のための強力なオープンソースツールであり、サブドメイン列挙や関連情報の収集に優れています。
  • amass-common (および関連パッケージ) は Amass 内部で使われる Go 言語のパッケージであり、設定管理、イベント処理、データ管理などのコア機能を提供します。Python から直接インポートして使うものではありません。
  • Python から Amass を利用する主な方法は、`subprocess` モジュールを使って Amass コマンドを外部プロセスとして実行し、その結果 (標準出力や -json オプションで出力されたファイル) を Python で処理することです。
  • -json オプションを使うと、構造化された詳細な情報を得られ、Python でのパースや処理が容易になります。
  • -config オプションで設定ファイルを指定することで、API キーの設定やスキャン挙動のカスタマイズが可能です。
  • Python スクリプトを活用することで、複数ドメインの自動スキャン、Nmap や Nuclei など他のツールとの連携、結果のフィルタリングやレポート生成など、高度な自動化ワークフローを構築できます。
  • Amass の利用にあたっては、実行時間、リソース消費、API 制限、エラーハンドリング、そして法的・倫理的な側面に十分注意する必要があります。

Amass は非常に高機能なツールであり、その全てをここで網羅できたわけではありません。しかし、Python と連携させる基本的な方法と応用例を理解することで、皆さんのセキュリティ運用や調査活動を大きく効率化・高度化させるための一歩を踏み出せたはずです。

ぜひ、ここで紹介したコード例を参考に、ご自身の環境や目的に合わせてカスタマイズし、Amass と Python の強力な組み合わせを試してみてください。アタックサーフェスの継続的な監視と管理を通じて、より安全な IT 環境の実現を目指しましょう!🛡️🌐

Happy Hacking (ethically, of course)! 😉

コメント

タイトルとURLをコピーしました