Pythonでネットワークパケット解析!pyshark徹底解説 🕵️‍♀️

Python

はじめに:pysharkとは?

pysharkは、ネットワークプロトコルアナライザであるWiresharkのコマンドラインユーティリティTSharkのPythonラッパーです。PythonからWiresharkの強力なパケット解析機能(ディセクタ)を利用することを可能にします。

Pythonにはいくつかのパケット解析モジュールが存在しますが、pysharkの特徴は、自身でパケットを解析するのではなく、TSharkが生成するXMLやJSON出力を利用してパケット情報を提供するところにあります。これにより、Wireshark/TSharkに実装されている全てのディセクタ(プロトコル解析器)を利用できるという大きな利点があります。Windows、Linux、macOSなど、主要なOSで動作が確認されています。

pysharkを使えば、以下のようなことがPythonスクリプトで実現可能になります。

  • ライブキャプチャ:ネットワークインターフェースからリアルタイムでパケットをキャプチャし、分析する。
  • ファイルキャプチャ:既存のキャプチャファイル(.pcap, .pcapngなど)を読み込み、詳細に分析する。
  • フィルタリング:BPF(Berkeley Packet Filter)構文やWiresharkの表示フィルタ構文を使って、目的のパケットのみを効率的に処理する。
  • 自動化:特定のパケットを検出したらアラートを送信したり、定期的なトラフィックレポートを生成したりするスクリプトを作成する。
  • 統計分析:キャプチャしたデータから統計情報を抽出し、ネットワークの傾向を分析する。

ネットワークエンジニアがトラブルシューティングに利用したり、セキュリティエンジニアが不正な通信を検出したりと、様々な用途で活用できます。アイデア次第で、その可能性は無限大です!✨

インストール 🔧

pysharkを使用するには、まずPythonとTShark(Wireshark)がインストールされている必要があります。pysharkはPython 3.7以上をサポートしています。

1. TShark (Wireshark) のインストール

pysharkは内部でTSharkコマンドを使用するため、Wireshark(TSharkが含まれています)を先にインストールする必要があります。

  • Windows: Wiresharkの公式サイトからインストーラーをダウンロードし、インストールします。インストール時に「TShark」コンポーネントが選択されていることを確認してください。通常、インストール後、TSharkのパスは自動的に認識されますが、環境によっては手動でパスを指定する必要がある場合があります(例: C:\Program Files\Wireshark\tshark.exe)。
  • Linux (Ubuntu/Debian):
    sudo apt update
    sudo apt install tshark
    または、Wireshark全体をインストールする場合は:
    sudo apt install wireshark
    インストール中に、一般ユーザーがキャプチャを実行できるようにするかどうか尋ねられる場合があります。必要に応じて許可してください。
  • macOS: Wiresharkの公式サイトからインストーラーをダウンロードしてインストールするか、Homebrewを使用できます。
    brew install wireshark
    macOSでは、libxmlのインストールが必要になる場合があります。エラーが発生した場合は、以下のコマンドを試してください。
    xcode-select --install
    pip install libxml
注意: TSharkがシステムのPATHに含まれていない場合、pysharkはTSharkを見つけられずエラー(TSharkNotFoundException)になります。その場合は、スクリプト内でTSharkの実行可能ファイルのパスを明示的に指定する必要があります(後述)。

2. pyshark のインストール

pipを使って簡単にインストールできます。

pip install pyshark

最新の開発版を試したい場合は、GitHubリポジトリから直接インストールすることも可能です。

git clone https://github.com/KimiNewt/pyshark.git
cd pyshark/src
python setup.py install

TSharkのパス指定

TSharkが標準的な場所にインストールされていない、またはPATHが通っていない場合、pysharkのCaptureオブジェクトを初期化する際にtshark_pathパラメータでTShark実行ファイルのフルパスを指定します。

import pyshark

# 例: Windowsの場合
tshark_custom_path = r'C:\Program Files\Wireshark\tshark.exe'
# 例: Linux/macOSの場合 (もし /usr/local/bin にあれば)
# tshark_custom_path = '/usr/local/bin/tshark'

# FileCaptureの場合
cap = pyshark.FileCapture('path/to/your/capture.pcap', tshark_path=tshark_custom_path)

# LiveCaptureの場合
capture = pyshark.LiveCapture(interface='eth0', tshark_path=tshark_custom_path)

または、pysharkが読み込む設定ファイル (config.ini) にパスを記述することも可能です。このファイルは、スクリプト実行時のカレントディレクトリ、またはpysharkライブラリのインストールディレクトリ内に置くことができます。

[tshark]
tshark_path = C:\Program Files\Wireshark\tshark.exe

[dumpcap]
dumpcap_path = C:\Program Files\Wireshark\dumpcap.exe

基本的な使い方 📖

pysharkには主に2つのキャプチャ方法があります:ライブキャプチャとファイルキャプチャです。

機能ライブキャプチャ (`LiveCapture`)ファイルキャプチャ (`FileCapture`)
目的ネットワークインターフェースからリアルタイムでパケットを取得既存のキャプチャファイル(.pcapなど)からパケットを読み込む
主なクラスpyshark.LiveCapturepyshark.FileCapture
パケット取得方法sniff(), sniff_continuously(), apply_on_packets()イテレータとして直接利用、またはインデックスアクセス
主な引数interface, bpf_filter, display_filter, tshark_path などinput_file (必須), display_filter, decode_as, tshark_path など

ファイルキャプチャ (`FileCapture`)

保存済みのキャプチャファイル(例:mycapture.pcap)を読み込んで、含まれるパケットを分析します。

import pyshark

# キャプチャファイルを開く
try:
    cap = pyshark.FileCapture('path/to/your/mycapture.pcap')
    print(f"キャプチャファイルを読み込みました: {cap.input_filename}")

    # パケットを一つずつ処理する (イテレータとして)
    packet_count = 0
    for packet in cap:
        packet_count += 1
        print(f"--- パケット {packet_count} ({packet.length} bytes) ---")
        print(f"Timestamp: {packet.sniff_time}")

        # レイヤー情報を表示 (例: Ethernet, IP, TCP)
        if 'ETH' in packet:
            print(f"  Ethernet: Src={packet.eth.src}, Dst={packet.eth.dst}")
        if 'IP' in packet:
            print(f"  IP: Src={packet.ip.src}, Dst={packet.ip.dst}, Proto={packet.ip.proto}")
        if 'TCP' in packet:
            print(f"  TCP: SrcPort={packet.tcp.srcport}, DstPort={packet.tcp.dstport}, Flags={packet.tcp.flags}")
        elif 'UDP' in packet:
            print(f"  UDP: SrcPort={packet.udp.srcport}, DstPort={packet.udp.dstport}")

        # 最初の5パケットだけ表示して終了
        if packet_count >= 5:
            break

    # 特定のインデックスのパケットにアクセス (注意: ファイルが大きい場合はメモリを消費する可能性あり)
    # print("\n--- 最初のパケットの詳細 ---")
    # if len(cap) > 0:
    #    print(cap[0])

except FileNotFoundError:
    print("エラー: 指定されたキャプチャファイルが見つかりません。")
except Exception as e:
    print(f"エラーが発生しました: {e}")
finally:
    if 'cap' in locals() and cap:
        cap.close() # ファイルハンドルを解放

FileCaptureオブジェクトはイテレータとして機能するため、forループで簡単にパケットを順次処理できます。また、リストのようにインデックス(例:cap[0])で特定のパケットにアクセスすることも可能ですが、ファイルが大きい場合に全パケットをメモリに読み込もうとする可能性があるため注意が必要です。

フィルタリングも可能です。display_filter引数を使ってWiresharkの表示フィルタと同じ構文でフィルタを適用できます。

# HTTPトラフィックのみをフィルタリングして読み込む
http_cap = pyshark.FileCapture('path/to/your/mycapture.pcap', display_filter='http')

for packet in http_cap:
    print(f"HTTP Packet: {packet.highest_layer} - Time: {packet.sniff_time}")
    # HTTPレイヤーの詳細にアクセス
    if 'HTTP' in packet:
        print(f"  Request Method: {getattr(packet.http, 'request_method', 'N/A')}")
        print(f"  Request URI: {getattr(packet.http, 'request_uri', 'N/A')}")
        print(f"  Host: {getattr(packet.http, 'host', 'N/A')}")

http_cap.close()

ライブキャプチャ (`LiveCapture`)

指定したネットワークインターフェース(例:eth0Wi-Fi)からリアルタイムでパケットをキャプチャします。

ライブキャプチャにはいくつかの方法があります。

  1. sniff(timeout=...) または sniff(packet_count=...): 指定した時間またはパケット数だけキャプチャし、その後でまとめて処理します。
  2. sniff_continuously(packet_count=...): パケットが到着するたびに処理するジェネレータを返します。指定したパケット数に達するか、中断されるまでキャプチャを続けます。
  3. apply_on_packets(callback, timeout=...): パケットが到着するたびに指定したコールバック関数を実行します。

例1: sniff() を使う (指定時間)

import pyshark
import time

# 'eth0' インターフェースから10秒間キャプチャ (インターフェース名は環境に合わせて変更)
# Windowsの場合: 'イーサネット', 'Wi-Fi' など get_if_list() で確認可能
# Linux/macOSの場合: 'eth0', 'en0', 'wlan0' など
try:
    interface_name = 'eth0' # ここを実際のインターフェース名に変更
    print(f"インターフェース '{interface_name}' でキャプチャを開始します (10秒間)...")
    capture = pyshark.LiveCapture(interface=interface_name)
    capture.sniff(timeout=10) # 10秒間キャプチャ

    print(f"\nキャプチャ完了。合計 {len(capture)} パケットを取得しました。")

    # 取得したパケットを処理
    for i, packet in enumerate(capture):
        print(f"--- Packet {i+1} ---")
        print(packet) # パケットの概要を表示
        if i >= 4: # 最初の5パケットだけ表示
             break

except Exception as e:
    print(f"エラーが発生しました: {e}")
finally:
    if 'capture' in locals() and capture and capture.eventloop.is_running():
        capture.close()

例2: sniff_continuously() を使う (ジェネレータ)

import pyshark

try:
    interface_name = 'eth0' # ここを実際のインターフェース名に変更
    print(f"インターフェース '{interface_name}' で継続的にキャプチャを開始します (最初の5パケットを取得)...")
    # BPFフィルタでTCPポート80番のみをキャプチャ
    capture = pyshark.LiveCapture(interface=interface_name, bpf_filter='tcp port 80')

    # packet_countを指定しない場合は Ctrl+C で停止するまで継続
    packet_iterator = capture.sniff_continuously(packet_count=5)

    for packet in packet_iterator:
        print(f"到着したパケット: {packet.sniff_timestamp}")
        if 'IP' in packet and 'TCP' in packet:
             print(f"  {packet.ip.src}:{packet.tcp.srcport} -> {packet.ip.dst}:{packet.tcp.dstport}")
        else:
             print(packet.highest_layer)

    print("\n指定パケット数をキャプチャしました。")

except Exception as e:
    print(f"エラーが発生しました: {e}")
finally:
    # sniff_continuously は内部で close を呼ぶことが多いが、念のため
    if 'capture' in locals() and capture and capture.eventloop.is_running():
        capture.close()

bpf_filter引数を使用すると、カーネルレベルで効率的にパケットをフィルタリングできます(BPF構文)。これは、display_filter(Wireshark構文、TShark側で処理)よりもパフォーマンスが良い場合があります。

例3: apply_on_packets() を使う (コールバック)

import pyshark
import time

# パケット到着時に呼び出されるコールバック関数
def print_packet_summary(packet):
    timestamp = packet.sniff_time
    summary = f"[{timestamp}] "
    if 'IP' in packet:
        summary += f"IP {packet.ip.src} -> {packet.ip.dst} "
        if 'TCP' in packet:
            summary += f"TCP {packet.tcp.srcport} -> {packet.tcp.dstport}"
        elif 'UDP' in packet:
            summary += f"UDP {packet.udp.srcport} -> {packet.udp.dstport}"
        elif 'ICMP' in packet:
            summary += f"ICMP Type={packet.icmp.type}"
        else:
             summary += f"Proto={packet.ip.proto}"
    elif 'ARP' in packet:
        summary += f"ARP Who has {packet.arp.dst_proto_ipv4}? Tell {packet.arp.src_proto_ipv4}"
    else:
        summary += f"Other ({packet.highest_layer})"
    print(summary)

try:
    interface_name = 'eth0' # ここを実際のインターフェース名に変更
    print(f"インターフェース '{interface_name}' でキャプチャを開始し、コールバックを呼び出します (15秒間)...")
    capture = pyshark.LiveCapture(interface=interface_name)

    # 15秒間、パケットが来るたびに print_packet_summary を実行
    capture.apply_on_packets(print_packet_summary, timeout=15)

    print("\nキャプチャ時間終了。")

except Exception as e:
    print(f"エラーが発生しました: {e}")
finally:
    if 'capture' in locals() and capture and capture.eventloop.is_running():
        capture.close()

インターフェース名の確認方法:

  • Windows: コマンドプロンプトで `tshark -D` または `getmac /v` を実行。
  • Linux: ターミナルで `tshark -D` または `ip addr` を実行。
  • macOS: ターミナルで `tshark -D` または `ifconfig` を実行。

pyshark自体にはインターフェース一覧を取得する直接的な関数は用意されていないようですが、tshark -Dコマンドの結果をサブプロセスで取得するなどで対応できます。

パケットの解析 (ディセクション) 🔬

pysharkで取得したパケットオブジェクトは、複数のレイヤー(例:Ethernet, IP, TCP, UDP, HTTP, DNS)の情報を持っています。各レイヤーには、そのプロトコル固有のフィールドが含まれています。

パケット内のレイヤーやフィールドにアクセスするには、ドット記法または辞書形式のアクセスを使用します。レイヤー名は大文字小文字を区別しません(内部で小文字に変換されます)。

import pyshark

cap = pyshark.FileCapture('path/to/your/capture.pcap', display_filter='tcp or dns')

for packet in cap:
    print(f"\n--- Packet {packet.number} ---")

    # レイヤーの存在確認
    if 'IP' in packet:
        ip_layer = packet.ip # ドット記法でIPレイヤーを取得
        print(f"  Source IP: {ip_layer.src}")
        print(f"  Destination IP: {ip_layer.dst}")
        print(f"  TTL: {ip_layer.ttl}")
        # 辞書形式でのアクセスも可能
        # print(f"  Source IP (dict): {packet['ip'].src}")

    if 'TCP' in packet:
        tcp_layer = packet.tcp
        print(f"  Source Port: {tcp_layer.srcport}")
        print(f"  Destination Port: {tcp_layer.dstport}")
        print(f"  Sequence Number: {tcp_layer.seq}")
        print(f"  Window Size: {tcp_layer.window_size_value}")
        print(f"  Flags: ACK={tcp_layer.flags_ack}, SYN={tcp_layer.flags_syn}, FIN={tcp_layer.flags_fin}")
        # ペイロード (データ部分) があれば表示
        if hasattr(tcp_layer, 'payload'):
             # payload は通常、コロン区切りの16進数文字列
             print(f"  TCP Payload (hex): {tcp_layer.payload[:60]}...") # 長い場合は一部表示

    if 'UDP' in packet:
        udp_layer = packet.udp
        print(f"  Source Port: {udp_layer.srcport}")
        print(f"  Destination Port: {udp_layer.dstport}")
        print(f"  Length: {udp_layer.length}")

    if 'DNS' in packet:
        dns_layer = packet.dns
        print("  --- DNS Info ---")
        if hasattr(dns_layer, 'qry_name'):
             print(f"    Query Name: {dns_layer.qry_name}")
             print(f"    Query Type: {dns_layer.qry_type}") # AAAA (IPv6), A (IPv4), CNAME, etc.
        if hasattr(dns_layer, 'resp_name'):
             print(f"    Response Name: {dns_layer.resp_name}")
             print(f"    Response Addr: {getattr(dns_layer, 'a', 'N/A')}") # IPv4アドレス
             print(f"    Response Addr (IPv6): {getattr(dns_layer, 'aaaa', 'N/A')}") # IPv6アドレス

    # 利用可能なレイヤー名を確認
    # print(f"  Available Layers: {packet.layers}")
    # 特定レイヤーのフィールド名を確認
    # if 'IP' in packet:
    #    print(f"  IP Fields: {dir(packet.ip)}")

cap.close()

注意点:

  • フィールド名はTShark/Wiresharkの表示フィルタで使用されるものとほぼ同じですが、ハイフン(-)はアンダースコア(_)に置き換えられます(例:frame-numberframe_number)。
  • 特定のフィールドが存在しない場合(例えば、オプションフィールドなど)にアクセスしようとするとAttributeErrorが発生する可能性があります。hasattr(layer, 'field_name')で存在を確認するか、getattr(layer, 'field_name', default_value)を使用すると安全です。
  • フィールドの値は文字列として返されることが多いです。数値として扱いたい場合は、int()float()で変換する必要があります。16進数のフィールド(例:checksum)には.hex_value属性で整数値を取得できる場合があります。
  • ペイロードデータは通常、コロンで区切られた16進数文字列(例: '1a:2b:3c:...')で表現されます。バイナリデータとして扱いたい場合は、bytes.fromhex(payload.replace(':', ''))のように変換が必要です。codecsライブラリなどを使ってエンコーディングを扱うこともできます。
  • packet.highest_layer属性で、そのパケットで最も上位(アプリケーション層に近い)のプロトコル名を取得できます。
  • packet.layers属性で、そのパケットに含まれる全レイヤーオブジェクトのリストを取得できます。同じプロトコルが複数存在する場合(例: トンネリング)もリストに含まれます。
  • dir(packet.layer_name)を実行すると、そのレイヤーで利用可能なフィールド(属性)の一覧を確認できます。

高度な機能 ✨

キャプチャフィルタと表示フィルタ

pysharkでは、2種類のフィルタを利用できます。

フィルタ種類引数名構文処理タイミング特徴
キャプチャフィルタbpf_filterBPF (Berkeley Packet Filter)パケットキャプチャ時 (OSカーネル/ドライバレベル)
  • パフォーマンスが高い (不要なパケットは早期に破棄)
  • フィルタ可能な情報が限られる (主にヘッダ情報)
  • tcpdump と同じ構文
'tcp port 80', 'host 192.168.1.1', 'udp and port 53'
表示フィルタdisplay_filterWireshark 表示フィルタパケット解析時 (TShark/pyshark レベル)
  • 非常に柔軟で強力なフィルタリングが可能
  • ディセクタが解析した全てのフィールドを利用可能
  • キャプチャ後に適用されるため、キャプチャ自体の負荷は減らない
'http.request.method == "GET"', 'ip.addr == 10.0.0.5', 'dns.qry.name contains "example"'

パフォーマンスが重要なライブキャプチャでは、まずbpf_filterで大まかに絞り込み、さらに詳細な条件で絞り込みたい場合にdisplay_filterを併用するのが効果的です。ファイルキャプチャでは主にdisplay_filterが使われます。

# ライブキャプチャで両方のフィルタを使用する例
# 192.168.1.100 との間で、TCPポート443 (HTTPS) の通信のみをキャプチャし、
# さらに TLS Client Hello メッセージのみを処理する
capture = pyshark.LiveCapture(
    interface='eth0',
    bpf_filter='host 192.168.1.100 and tcp port 443',
    display_filter='tls.handshake.type == 1' # 1はClient Helloを示す
)

for packet in capture.sniff_continuously():
    print(f"TLS Client Hello from {packet.ip.src} to {packet.ip.dst}")
    # TLSレイヤーの詳細にアクセス
    if 'TLS' in packet:
         print(f"  TLS Version: {packet.tls.record_version}")
         # SNI (Server Name Indication) を取得
         sni = getattr(packet.tls, 'handshake_extensions_server_name', 'N/A')
         print(f"  Server Name Indication (SNI): {sni}")

プロトコルのデコード指定 (`decode_as`)

標準的でないポートで特定のプロトコルが使われている場合など、TSharkが自動でプロトコルを認識できない場合に、強制的にデコードする方法を指定できます。decode_as引数に辞書形式で指定します。

# TCPポート8888番のトラフィックをHTTPとしてデコードする
cap = pyshark.FileCapture(
    'path/to/capture_with_custom_port.pcap',
    decode_as={'tcp.port==8888': 'http'}
)

for packet in cap:
    if 'HTTP' in packet:
        print(f"Detected HTTP on port 8888: {packet.ip.src} -> {packet.ip.dst}")
        # HTTPレイヤーの処理...
    elif 'TCP' in packet and (packet.tcp.srcport == '8888' or packet.tcp.dstport == '8888'):
        print(f"TCP on port 8888 (not decoded as HTTP): {packet.ip.src} -> {packet.ip.dst}")

cap.close()

暗号化されたトラフィックの復号

WEPやWPA/WPA2-PSKで暗号化されたWi-Fiトラフィックを復号するためのキーを指定できます(ただし、TLS/SSLのような上位レイヤーの暗号化は別途設定が必要です)。encryption_typedecryption_key引数を使用します。

# WPA-PSKで暗号化されたWi-Fiキャプチャファイルを復号
try:
    wifi_cap = pyshark.FileCapture(
        'path/to/encrypted_wifi.pcap',
        decryption_key='your_wifi_password',
        encryption_type='wpa-pwd' # または 'wpa-psk', 'wep'
    )
    print("WiFiキャプチャを復号中...")
    for packet in wifi_cap:
        # 復号されたIPパケットなどを処理
        if 'IP' in packet:
            print(f"Decrypted IP Packet: {packet.ip.src} -> {packet.ip.dst}")

    wifi_cap.close()
except Exception as e:
    print(f"復号エラーまたはファイルエラー: {e}")

TLS/SSLの復号には、SSLキーログファイル(環境変数 `SSLKEYLOGFILE` で指定されたファイル)のパスを `override_prefs` パラメータで指定する方法があります。

ssl_keylog_file = '/path/to/your/sslkeylog.log'
tls_cap = pyshark.FileCapture(
    'path/to/tls_capture.pcap',
    display_filter='tls',
    override_prefs={'ssl.keylog_file': ssl_keylog_file}
)
# ... (復号されたHTTP/2などの中身を解析) ...
tls_cap.close()

その他のオプション

  • only_summaries=True: パケットの概要情報のみを取得します。より高速ですが、詳細なフィールドにはアクセスできません。
  • keep_packets=False: FileCaptureで、処理済みのパケットをメモリに保持しないようにします。巨大なファイルを処理する際のメモリ消費を抑えます。ただし、インデックスアクセス(cap[i])やlen(cap)は使えなくなります。
  • use_json=True (または use_ek=True): TSharkの出力をXMLではなくJSON(Elasticsearch形式)で受け取ります。より高速になる場合がありますが、若干情報量が少ない可能性があります。
  • override_prefs={...}: TSharkの動作設定(Preferences)を上書きします。例: {'protocols.dns.desegment_dns_messages': 'TRUE'}
  • custom_parameters={...} または [...]: TSharkコマンドに任意のパラメータを追加します。
  • disable_protocol='protocol_name': 特定のプロトコルディセクタを無効にします。

実践的な例 💡

これまでの知識を活かして、いくつかの具体的なユースケースを見てみましょう。

例1: HTTPリクエスト/レスポンスの抽出

import pyshark

def analyze_http_traffic(pcap_file):
    try:
        cap = pyshark.FileCapture(pcap_file, display_filter='http')
        print(f"Analyzing HTTP traffic in {pcap_file}...")

        for packet in cap:
            if 'HTTP' in packet:
                http_layer = packet.http
                ip_layer = packet.ip
                tcp_layer = packet.tcp

                # HTTPリクエストかレスポンスかを判断
                if hasattr(http_layer, 'request_method'): # リクエスト
                    print(f"\n[Request] {packet.sniff_time}")
                    print(f"  {ip_layer.src}:{tcp_layer.srcport} -> {ip_layer.dst}:{tcp_layer.dstport}")
                    print(f"  Method: {http_layer.request_method}")
                    print(f"  URI: {http_layer.request_full_uri}")
                    if hasattr(http_layer, 'user_agent'):
                        print(f"  User-Agent: {http_layer.user_agent}")
                    if hasattr(http_layer, 'host'):
                        print(f"  Host: {http_layer.host}")

                elif hasattr(http_layer, 'response_code'): # レスポンス
                    print(f"\n[Response] {packet.sniff_time}")
                    print(f"  {ip_layer.src}:{tcp_layer.srcport} -> {ip_layer.dst}:{tcp_layer.dstport}")
                    print(f"  Status Code: {http_layer.response_code} ({getattr(http_layer, 'response_phrase', '')})")
                    if hasattr(http_layer, 'content_type'):
                        print(f"  Content-Type: {http_layer.content_type}")
                    if hasattr(http_layer, 'content_length_header'):
                         print(f"  Content-Length: {http_layer.content_length_header}")
                    # レスポンスボディの一部 (存在すれば)
                    if hasattr(http_layer, 'file_data'):
                        # file_data は通常テキスト形式でデコードされたもの
                        print(f"  Response Body (partial): {http_layer.file_data[:100].replace('\\n', ' ')}...")

        cap.close()

    except FileNotFoundError:
        print(f"Error: File not found - {pcap_file}")
    except Exception as e:
        print(f"An error occurred: {e}")

# 使用例
analyze_http_traffic('path/to/your/web_traffic.pcap')

例2: DNSクエリと応答の監視 (ライブ)

import pyshark
import sys

def monitor_dns(interface):
    print(f"Monitoring DNS traffic on interface '{interface}'... Press Ctrl+C to stop.")
    try:
        # UDPポート53番をキャプチャ (標準的なDNS)
        capture = pyshark.LiveCapture(interface=interface, bpf_filter='udp port 53')

        for packet in capture.sniff_continuously():
            if 'DNS' in packet:
                dns_layer = packet.dns
                ip_layer = packet.ip

                # QRフラグでクエリ(0)か応答(1)か判断
                if dns_layer.flags_qr == '0': # Query
                    if hasattr(dns_layer, 'qry_name'):
                        print(f"[DNS Query] {packet.sniff_timestamp} | {ip_layer.src} -> {ip_layer.dst} | Query: {dns_layer.qry_name} ({dns_layer.qry_type})")
                elif dns_layer.flags_qr == '1': # Response
                    if hasattr(dns_layer, 'resp_name'):
                        resp_info = f"Resp: {dns_layer.resp_name}"
                        if hasattr(dns_layer, 'a'):
                            resp_info += f" A: {dns_layer.a}"
                        if hasattr(dns_layer, 'aaaa'):
                            resp_info += f" AAAA: {dns_layer.aaaa}"
                        if hasattr(dns_layer, 'cname'):
                             resp_info += f" CNAME: {dns_layer.cname}"
                        print(f"[DNS Response] {packet.sniff_timestamp} | {ip_layer.src} -> {ip_layer.dst} | {resp_info}")

    except KeyboardInterrupt:
        print("\nCapture stopped by user.")
    except Exception as e:
        print(f"\nAn error occurred: {e}")
    finally:
        if 'capture' in locals() and capture and capture.eventloop.is_running():
            capture.close()

# 使用例 (コマンドライン引数からインターフェース名を取得)
if len(sys.argv) > 1:
    interface_to_monitor = sys.argv[1]
    monitor_dns(interface_to_monitor)
else:
    print("Usage: python monitor_dns.py ")
    print("Example: python monitor_dns.py eth0")

例3: 特定IPアドレスとの通信量集計

import pyshark
from collections import defaultdict

def summarize_traffic_by_ip(pcap_file, target_ip):
    traffic_summary = defaultdict(lambda: {'sent_bytes': 0, 'recv_bytes': 0, 'sent_pkts': 0, 'recv_pkts': 0})
    try:
        # 特定IPとの通信のみフィルタリング
        cap = pyshark.FileCapture(pcap_file, display_filter=f'ip.addr == {target_ip}')
        print(f"Summarizing traffic for IP {target_ip} in {pcap_file}...")

        for packet in cap:
            if 'IP' in packet:
                ip_layer = packet.ip
                packet_len = int(packet.length) # パケット全体の長さ

                if ip_layer.src == target_ip: # target_ipからの送信
                    other_ip = ip_layer.dst
                    traffic_summary[other_ip]['sent_bytes'] += packet_len
                    traffic_summary[other_ip]['sent_pkts'] += 1
                elif ip_layer.dst == target_ip: # target_ipへの受信
                    other_ip = ip_layer.src
                    traffic_summary[other_ip]['recv_bytes'] += packet_len
                    traffic_summary[other_ip]['recv_pkts'] += 1

        cap.close()

        print("\n--- Traffic Summary ---")
        print(f"Target IP: {target_ip}")
        if not traffic_summary:
            print("No traffic found for the target IP.")
            return

        # 通信量が多い順にソートして表示
        sorted_summary = sorted(traffic_summary.items(), key=lambda item: item[1]['sent_bytes'] + item[1]['recv_bytes'], reverse=True)

        print(f"{'Remote IP':<20} {'Sent Pkts':>10} {'Sent Bytes':>12} {'Recv Pkts':>10} {'Recv Bytes':>12}")
        print("-" * 70)
        for remote_ip, data in sorted_summary:
            print(f"{remote_ip:<20} {data['sent_pkts']:>10} {data['sent_bytes']:>12} {data['recv_pkts']:>10} {data['recv_bytes']:>12}")

    except FileNotFoundError:
        print(f"Error: File not found - {pcap_file}")
    except Exception as e:
        print(f"An error occurred: {e}")

# 使用例
summarize_traffic_by_ip('path/to/your/network_traffic.pcap', '192.168.1.1')

よくある問題とトラブルシューティング ⚠️

  • pyshark.tshark.tshark.TSharkNotFoundException: TShark not found.

    これが最も一般的な問題です。TSharkがインストールされていないか、システムのPATH環境変数に含まれていない場合に発生します。

    解決策:

    1. Wireshark (TSharkを含む) が正しくインストールされているか確認してください。
    2. TSharkの実行可能ファイル (tshark または tshark.exe) が存在するパスを確認します。
    3. そのパスをシステムのPATH環境変数に追加します。
    4. または、pyshark.LiveCapturepyshark.FileCapture を初期化する際に、tshark_pathパラメータでTSharkのフルパスを明示的に指定します。
      cap = pyshark.FileCapture('file.pcap', tshark_path='/usr/local/bin/tshark')
    5. スクリプトのカレントディレクトリかpysharkのインストールディレクトリにconfig.iniファイルを作成し、そこにパスを記述します。
  • 権限エラー (Permission Denied)

    特にLinuxやmacOSでライブキャプチャを実行する際に発生します。ネットワークインターフェースへのアクセスには通常、管理者権限(rootまたはsudo)が必要です。

    解決策:

    1. スクリプトをsudo python your_script.pyのように管理者権限で実行します(セキュリティリスクを理解した上で)。
    2. (推奨) Wireshark/TSharkのインストール時に推奨される設定を行い、一般ユーザーでもキャプチャできるように権限グループ(例: wiresharkグループ)を設定し、実行ユーザーをそのグループに追加します。設定方法はOSによって異なります。
    3. setcapコマンド(Linux)などを使用して、dumpcap(TSharkが内部で使用するキャプチャツール)に必要な権限を与える方法もありますが、注意が必要です。
  • パフォーマンスの問題 / メモリ消費

    大量のパケットを処理する場合や、非常に大きなキャプチャファイルを扱う場合に、処理速度が遅くなったり、メモリを大量に消費したりすることがあります。

    解決策:

    1. フィルタリングを活用する: bpf_filterdisplay_filterを使って、処理対象のパケットをできるだけ絞り込みます。特にbpf_filterは効率的です。
    2. 概要のみ取得: 詳細なフィールド情報が不要な場合は、only_summaries=Trueオプションを使用します。
    3. メモリ保持を無効化: 巨大なファイルを処理する場合は、FileCapturekeep_packets=Falseオプションを使用し、処理済みパケットをメモリに保持しないようにします。
    4. JSON出力の利用: use_json=True (または use_ek=True) を試すと、XMLよりも高速になる場合があります。
    5. 処理の最適化: Pythonコード内のループ処理やデータ構造を見直し、ボトルネックがないか確認します。apply_on_packetsやジェネレータ (sniff_continuously) は、一度に全パケットをメモリに読み込むよりも効率的な場合があります。
  • 特定のプロトコルが解析されない / フィールドが見つからない

    期待しているプロトコルレイヤーやフィールドがパケットオブジェクトに存在しない場合があります。

    解決策:

    1. Wireshark/TSharkで確認: まず、同じパケットをWireshark GUIやtsharkコマンドで開き、期待する情報が解析されているか確認します。Wiresharkでも解析できていなければ、ディセクタの問題か、パケット自体にその情報が含まれていない可能性があります。
    2. ディセクタの有効化/設定: 特定のプロトコルディセクタが無効になっている、または設定が必要な場合があります。Wiresharkのプロトコル設定を確認し、必要であればoverride_prefsでpysharkに設定を渡します。
    3. decode_asの使用: 非標準ポートでプロトコルが動作している場合、decode_asパラメータで強制的にデコードを指定します。
    4. フィールド名の確認: アクセスしようとしているフィールド名が正しいか確認します(ハイフンはアンダースコアに変換)。dir(packet.layer_name)で利用可能な属性を確認します。
    5. レイヤーの存在確認: if 'LAYER_NAME' in packet:hasattr(packet, 'layer_name') でレイヤーが存在するか確認してからフィールドにアクセスします。

まとめ 🎉

pysharkは、Pythonを使ってネットワークパケットをプログラム的に解析するための非常に強力で便利なライブラリです。Wireshark/TSharkの豊富なプロトコルディセクタを活用できるため、複雑なネットワーク分析やタスクの自動化を容易にします。

ライブキャプチャ、ファイルキャプチャ、柔軟なフィルタリング、プロトコルの詳細な解析など、多くの機能を提供しており、ネットワーク監視、トラブルシューティング、セキュリティ分析、研究開発など、幅広い分野で役立ちます。

インストール時の依存関係(特にTSharkのパス)や権限、パフォーマンスに関する注意点もありますが、基本的な使い方をマスターすれば、Pythonによるネットワークプログラミングの可能性が大きく広がります。ぜひ、あなたのプロジェクトでpysharkを活用してみてください!🚀

コメント

タイトルとURLをコピーしました