はじめに:pysharkとは?
pyshark
は、ネットワークプロトコルアナライザであるWiresharkのコマンドラインユーティリティTShark
のPythonラッパーです。PythonからWiresharkの強力なパケット解析機能(ディセクタ)を利用することを可能にします。
Pythonにはいくつかのパケット解析モジュールが存在しますが、pyshark
の特徴は、自身でパケットを解析するのではなく、TShark
が生成するXMLやJSON出力を利用してパケット情報を提供するところにあります。これにより、Wireshark/TSharkに実装されている全てのディセクタ(プロトコル解析器)を利用できるという大きな利点があります。Windows、Linux、macOSなど、主要なOSで動作が確認されています。
pyshark
を使えば、以下のようなことがPythonスクリプトで実現可能になります。
- ライブキャプチャ:ネットワークインターフェースからリアルタイムでパケットをキャプチャし、分析する。
- ファイルキャプチャ:既存のキャプチャファイル(.pcap, .pcapngなど)を読み込み、詳細に分析する。
- フィルタリング:BPF(Berkeley Packet Filter)構文やWiresharkの表示フィルタ構文を使って、目的のパケットのみを効率的に処理する。
- 自動化:特定のパケットを検出したらアラートを送信したり、定期的なトラフィックレポートを生成したりするスクリプトを作成する。
- 統計分析:キャプチャしたデータから統計情報を抽出し、ネットワークの傾向を分析する。
ネットワークエンジニアがトラブルシューティングに利用したり、セキュリティエンジニアが不正な通信を検出したりと、様々な用途で活用できます。アイデア次第で、その可能性は無限大です!✨
インストール 🔧
pyshark
を使用するには、まずPythonとTShark(Wireshark)がインストールされている必要があります。pyshark
はPython 3.7以上をサポートしています。
1. TShark (Wireshark) のインストール
pyshark
は内部でTShark
コマンドを使用するため、Wireshark(TSharkが含まれています)を先にインストールする必要があります。
- Windows: Wiresharkの公式サイトからインストーラーをダウンロードし、インストールします。インストール時に「TShark」コンポーネントが選択されていることを確認してください。通常、インストール後、TSharkのパスは自動的に認識されますが、環境によっては手動でパスを指定する必要がある場合があります(例:
C:\Program Files\Wireshark\tshark.exe
)。 - Linux (Ubuntu/Debian):
または、Wireshark全体をインストールする場合は:sudo apt update sudo apt install tshark
インストール中に、一般ユーザーがキャプチャを実行できるようにするかどうか尋ねられる場合があります。必要に応じて許可してください。sudo apt install wireshark
- macOS:
Wiresharkの公式サイトからインストーラーをダウンロードしてインストールするか、Homebrewを使用できます。
macOSでは、brew install wireshark
libxml
のインストールが必要になる場合があります。エラーが発生した場合は、以下のコマンドを試してください。xcode-select --install pip install libxml
2. pyshark のインストール
pip
を使って簡単にインストールできます。
pip install pyshark
最新の開発版を試したい場合は、GitHubリポジトリから直接インストールすることも可能です。
git clone https://github.com/KimiNewt/pyshark.git
cd pyshark/src
python setup.py install
TSharkのパス指定
TSharkが標準的な場所にインストールされていない、またはPATHが通っていない場合、pyshark
のCaptureオブジェクトを初期化する際にtshark_path
パラメータでTShark実行ファイルのフルパスを指定します。
import pyshark
# 例: Windowsの場合
tshark_custom_path = r'C:\Program Files\Wireshark\tshark.exe'
# 例: Linux/macOSの場合 (もし /usr/local/bin にあれば)
# tshark_custom_path = '/usr/local/bin/tshark'
# FileCaptureの場合
cap = pyshark.FileCapture('path/to/your/capture.pcap', tshark_path=tshark_custom_path)
# LiveCaptureの場合
capture = pyshark.LiveCapture(interface='eth0', tshark_path=tshark_custom_path)
または、pyshark
が読み込む設定ファイル (config.ini
) にパスを記述することも可能です。このファイルは、スクリプト実行時のカレントディレクトリ、またはpyshark
ライブラリのインストールディレクトリ内に置くことができます。
[tshark]
tshark_path = C:\Program Files\Wireshark\tshark.exe
[dumpcap]
dumpcap_path = C:\Program Files\Wireshark\dumpcap.exe
基本的な使い方 📖
pyshark
には主に2つのキャプチャ方法があります:ライブキャプチャとファイルキャプチャです。
機能 | ライブキャプチャ (`LiveCapture`) | ファイルキャプチャ (`FileCapture`) |
---|---|---|
目的 | ネットワークインターフェースからリアルタイムでパケットを取得 | 既存のキャプチャファイル(.pcapなど)からパケットを読み込む |
主なクラス | pyshark.LiveCapture | pyshark.FileCapture |
パケット取得方法 | sniff() , sniff_continuously() , apply_on_packets() | イテレータとして直接利用、またはインデックスアクセス |
主な引数 | interface , bpf_filter , display_filter , tshark_path など | input_file (必須), display_filter , decode_as , tshark_path など |
ファイルキャプチャ (`FileCapture`)
保存済みのキャプチャファイル(例:mycapture.pcap
)を読み込んで、含まれるパケットを分析します。
import pyshark
# キャプチャファイルを開く
try:
cap = pyshark.FileCapture('path/to/your/mycapture.pcap')
print(f"キャプチャファイルを読み込みました: {cap.input_filename}")
# パケットを一つずつ処理する (イテレータとして)
packet_count = 0
for packet in cap:
packet_count += 1
print(f"--- パケット {packet_count} ({packet.length} bytes) ---")
print(f"Timestamp: {packet.sniff_time}")
# レイヤー情報を表示 (例: Ethernet, IP, TCP)
if 'ETH' in packet:
print(f" Ethernet: Src={packet.eth.src}, Dst={packet.eth.dst}")
if 'IP' in packet:
print(f" IP: Src={packet.ip.src}, Dst={packet.ip.dst}, Proto={packet.ip.proto}")
if 'TCP' in packet:
print(f" TCP: SrcPort={packet.tcp.srcport}, DstPort={packet.tcp.dstport}, Flags={packet.tcp.flags}")
elif 'UDP' in packet:
print(f" UDP: SrcPort={packet.udp.srcport}, DstPort={packet.udp.dstport}")
# 最初の5パケットだけ表示して終了
if packet_count >= 5:
break
# 特定のインデックスのパケットにアクセス (注意: ファイルが大きい場合はメモリを消費する可能性あり)
# print("\n--- 最初のパケットの詳細 ---")
# if len(cap) > 0:
# print(cap[0])
except FileNotFoundError:
print("エラー: 指定されたキャプチャファイルが見つかりません。")
except Exception as e:
print(f"エラーが発生しました: {e}")
finally:
if 'cap' in locals() and cap:
cap.close() # ファイルハンドルを解放
FileCapture
オブジェクトはイテレータとして機能するため、for
ループで簡単にパケットを順次処理できます。また、リストのようにインデックス(例:cap[0]
)で特定のパケットにアクセスすることも可能ですが、ファイルが大きい場合に全パケットをメモリに読み込もうとする可能性があるため注意が必要です。
フィルタリングも可能です。display_filter
引数を使ってWiresharkの表示フィルタと同じ構文でフィルタを適用できます。
# HTTPトラフィックのみをフィルタリングして読み込む
http_cap = pyshark.FileCapture('path/to/your/mycapture.pcap', display_filter='http')
for packet in http_cap:
print(f"HTTP Packet: {packet.highest_layer} - Time: {packet.sniff_time}")
# HTTPレイヤーの詳細にアクセス
if 'HTTP' in packet:
print(f" Request Method: {getattr(packet.http, 'request_method', 'N/A')}")
print(f" Request URI: {getattr(packet.http, 'request_uri', 'N/A')}")
print(f" Host: {getattr(packet.http, 'host', 'N/A')}")
http_cap.close()
ライブキャプチャ (`LiveCapture`)
指定したネットワークインターフェース(例:eth0
やWi-Fi
)からリアルタイムでパケットをキャプチャします。
ライブキャプチャにはいくつかの方法があります。
sniff(timeout=...)
またはsniff(packet_count=...)
: 指定した時間またはパケット数だけキャプチャし、その後でまとめて処理します。sniff_continuously(packet_count=...)
: パケットが到着するたびに処理するジェネレータを返します。指定したパケット数に達するか、中断されるまでキャプチャを続けます。apply_on_packets(callback, timeout=...)
: パケットが到着するたびに指定したコールバック関数を実行します。
例1: sniff()
を使う (指定時間)
import pyshark
import time
# 'eth0' インターフェースから10秒間キャプチャ (インターフェース名は環境に合わせて変更)
# Windowsの場合: 'イーサネット', 'Wi-Fi' など get_if_list() で確認可能
# Linux/macOSの場合: 'eth0', 'en0', 'wlan0' など
try:
interface_name = 'eth0' # ここを実際のインターフェース名に変更
print(f"インターフェース '{interface_name}' でキャプチャを開始します (10秒間)...")
capture = pyshark.LiveCapture(interface=interface_name)
capture.sniff(timeout=10) # 10秒間キャプチャ
print(f"\nキャプチャ完了。合計 {len(capture)} パケットを取得しました。")
# 取得したパケットを処理
for i, packet in enumerate(capture):
print(f"--- Packet {i+1} ---")
print(packet) # パケットの概要を表示
if i >= 4: # 最初の5パケットだけ表示
break
except Exception as e:
print(f"エラーが発生しました: {e}")
finally:
if 'capture' in locals() and capture and capture.eventloop.is_running():
capture.close()
例2: sniff_continuously()
を使う (ジェネレータ)
import pyshark
try:
interface_name = 'eth0' # ここを実際のインターフェース名に変更
print(f"インターフェース '{interface_name}' で継続的にキャプチャを開始します (最初の5パケットを取得)...")
# BPFフィルタでTCPポート80番のみをキャプチャ
capture = pyshark.LiveCapture(interface=interface_name, bpf_filter='tcp port 80')
# packet_countを指定しない場合は Ctrl+C で停止するまで継続
packet_iterator = capture.sniff_continuously(packet_count=5)
for packet in packet_iterator:
print(f"到着したパケット: {packet.sniff_timestamp}")
if 'IP' in packet and 'TCP' in packet:
print(f" {packet.ip.src}:{packet.tcp.srcport} -> {packet.ip.dst}:{packet.tcp.dstport}")
else:
print(packet.highest_layer)
print("\n指定パケット数をキャプチャしました。")
except Exception as e:
print(f"エラーが発生しました: {e}")
finally:
# sniff_continuously は内部で close を呼ぶことが多いが、念のため
if 'capture' in locals() and capture and capture.eventloop.is_running():
capture.close()
bpf_filter
引数を使用すると、カーネルレベルで効率的にパケットをフィルタリングできます(BPF構文)。これは、display_filter
(Wireshark構文、TShark側で処理)よりもパフォーマンスが良い場合があります。
例3: apply_on_packets()
を使う (コールバック)
import pyshark
import time
# パケット到着時に呼び出されるコールバック関数
def print_packet_summary(packet):
timestamp = packet.sniff_time
summary = f"[{timestamp}] "
if 'IP' in packet:
summary += f"IP {packet.ip.src} -> {packet.ip.dst} "
if 'TCP' in packet:
summary += f"TCP {packet.tcp.srcport} -> {packet.tcp.dstport}"
elif 'UDP' in packet:
summary += f"UDP {packet.udp.srcport} -> {packet.udp.dstport}"
elif 'ICMP' in packet:
summary += f"ICMP Type={packet.icmp.type}"
else:
summary += f"Proto={packet.ip.proto}"
elif 'ARP' in packet:
summary += f"ARP Who has {packet.arp.dst_proto_ipv4}? Tell {packet.arp.src_proto_ipv4}"
else:
summary += f"Other ({packet.highest_layer})"
print(summary)
try:
interface_name = 'eth0' # ここを実際のインターフェース名に変更
print(f"インターフェース '{interface_name}' でキャプチャを開始し、コールバックを呼び出します (15秒間)...")
capture = pyshark.LiveCapture(interface=interface_name)
# 15秒間、パケットが来るたびに print_packet_summary を実行
capture.apply_on_packets(print_packet_summary, timeout=15)
print("\nキャプチャ時間終了。")
except Exception as e:
print(f"エラーが発生しました: {e}")
finally:
if 'capture' in locals() and capture and capture.eventloop.is_running():
capture.close()
パケットの解析 (ディセクション) 🔬
pyshark
で取得したパケットオブジェクトは、複数のレイヤー(例:Ethernet, IP, TCP, UDP, HTTP, DNS)の情報を持っています。各レイヤーには、そのプロトコル固有のフィールドが含まれています。
パケット内のレイヤーやフィールドにアクセスするには、ドット記法または辞書形式のアクセスを使用します。レイヤー名は大文字小文字を区別しません(内部で小文字に変換されます)。
import pyshark
cap = pyshark.FileCapture('path/to/your/capture.pcap', display_filter='tcp or dns')
for packet in cap:
print(f"\n--- Packet {packet.number} ---")
# レイヤーの存在確認
if 'IP' in packet:
ip_layer = packet.ip # ドット記法でIPレイヤーを取得
print(f" Source IP: {ip_layer.src}")
print(f" Destination IP: {ip_layer.dst}")
print(f" TTL: {ip_layer.ttl}")
# 辞書形式でのアクセスも可能
# print(f" Source IP (dict): {packet['ip'].src}")
if 'TCP' in packet:
tcp_layer = packet.tcp
print(f" Source Port: {tcp_layer.srcport}")
print(f" Destination Port: {tcp_layer.dstport}")
print(f" Sequence Number: {tcp_layer.seq}")
print(f" Window Size: {tcp_layer.window_size_value}")
print(f" Flags: ACK={tcp_layer.flags_ack}, SYN={tcp_layer.flags_syn}, FIN={tcp_layer.flags_fin}")
# ペイロード (データ部分) があれば表示
if hasattr(tcp_layer, 'payload'):
# payload は通常、コロン区切りの16進数文字列
print(f" TCP Payload (hex): {tcp_layer.payload[:60]}...") # 長い場合は一部表示
if 'UDP' in packet:
udp_layer = packet.udp
print(f" Source Port: {udp_layer.srcport}")
print(f" Destination Port: {udp_layer.dstport}")
print(f" Length: {udp_layer.length}")
if 'DNS' in packet:
dns_layer = packet.dns
print(" --- DNS Info ---")
if hasattr(dns_layer, 'qry_name'):
print(f" Query Name: {dns_layer.qry_name}")
print(f" Query Type: {dns_layer.qry_type}") # AAAA (IPv6), A (IPv4), CNAME, etc.
if hasattr(dns_layer, 'resp_name'):
print(f" Response Name: {dns_layer.resp_name}")
print(f" Response Addr: {getattr(dns_layer, 'a', 'N/A')}") # IPv4アドレス
print(f" Response Addr (IPv6): {getattr(dns_layer, 'aaaa', 'N/A')}") # IPv6アドレス
# 利用可能なレイヤー名を確認
# print(f" Available Layers: {packet.layers}")
# 特定レイヤーのフィールド名を確認
# if 'IP' in packet:
# print(f" IP Fields: {dir(packet.ip)}")
cap.close()
注意点:
- フィールド名はTShark/Wiresharkの表示フィルタで使用されるものとほぼ同じですが、ハイフン(
-
)はアンダースコア(_
)に置き換えられます(例:frame-number
→frame_number
)。 - 特定のフィールドが存在しない場合(例えば、オプションフィールドなど)にアクセスしようとすると
AttributeError
が発生する可能性があります。hasattr(layer, 'field_name')
で存在を確認するか、getattr(layer, 'field_name', default_value)
を使用すると安全です。 - フィールドの値は文字列として返されることが多いです。数値として扱いたい場合は、
int()
やfloat()
で変換する必要があります。16進数のフィールド(例:checksum)には.hex_value
属性で整数値を取得できる場合があります。 - ペイロードデータは通常、コロンで区切られた16進数文字列(例:
'1a:2b:3c:...'
)で表現されます。バイナリデータとして扱いたい場合は、bytes.fromhex(payload.replace(':', ''))
のように変換が必要です。codecs
ライブラリなどを使ってエンコーディングを扱うこともできます。 packet.highest_layer
属性で、そのパケットで最も上位(アプリケーション層に近い)のプロトコル名を取得できます。packet.layers
属性で、そのパケットに含まれる全レイヤーオブジェクトのリストを取得できます。同じプロトコルが複数存在する場合(例: トンネリング)もリストに含まれます。dir(packet.layer_name)
を実行すると、そのレイヤーで利用可能なフィールド(属性)の一覧を確認できます。
高度な機能 ✨
キャプチャフィルタと表示フィルタ
pyshark
では、2種類のフィルタを利用できます。
フィルタ種類 | 引数名 | 構文 | 処理タイミング | 特徴 | 例 |
---|---|---|---|---|---|
キャプチャフィルタ | bpf_filter | BPF (Berkeley Packet Filter) | パケットキャプチャ時 (OSカーネル/ドライバレベル) |
| 'tcp port 80' , 'host 192.168.1.1' , 'udp and port 53' |
表示フィルタ | display_filter | Wireshark 表示フィルタ | パケット解析時 (TShark/pyshark レベル) |
| 'http.request.method == "GET"' , 'ip.addr == 10.0.0.5' , 'dns.qry.name contains "example"' |
パフォーマンスが重要なライブキャプチャでは、まずbpf_filter
で大まかに絞り込み、さらに詳細な条件で絞り込みたい場合にdisplay_filter
を併用するのが効果的です。ファイルキャプチャでは主にdisplay_filter
が使われます。
# ライブキャプチャで両方のフィルタを使用する例
# 192.168.1.100 との間で、TCPポート443 (HTTPS) の通信のみをキャプチャし、
# さらに TLS Client Hello メッセージのみを処理する
capture = pyshark.LiveCapture(
interface='eth0',
bpf_filter='host 192.168.1.100 and tcp port 443',
display_filter='tls.handshake.type == 1' # 1はClient Helloを示す
)
for packet in capture.sniff_continuously():
print(f"TLS Client Hello from {packet.ip.src} to {packet.ip.dst}")
# TLSレイヤーの詳細にアクセス
if 'TLS' in packet:
print(f" TLS Version: {packet.tls.record_version}")
# SNI (Server Name Indication) を取得
sni = getattr(packet.tls, 'handshake_extensions_server_name', 'N/A')
print(f" Server Name Indication (SNI): {sni}")
プロトコルのデコード指定 (`decode_as`)
標準的でないポートで特定のプロトコルが使われている場合など、TSharkが自動でプロトコルを認識できない場合に、強制的にデコードする方法を指定できます。decode_as
引数に辞書形式で指定します。
# TCPポート8888番のトラフィックをHTTPとしてデコードする
cap = pyshark.FileCapture(
'path/to/capture_with_custom_port.pcap',
decode_as={'tcp.port==8888': 'http'}
)
for packet in cap:
if 'HTTP' in packet:
print(f"Detected HTTP on port 8888: {packet.ip.src} -> {packet.ip.dst}")
# HTTPレイヤーの処理...
elif 'TCP' in packet and (packet.tcp.srcport == '8888' or packet.tcp.dstport == '8888'):
print(f"TCP on port 8888 (not decoded as HTTP): {packet.ip.src} -> {packet.ip.dst}")
cap.close()
暗号化されたトラフィックの復号
WEPやWPA/WPA2-PSKで暗号化されたWi-Fiトラフィックを復号するためのキーを指定できます(ただし、TLS/SSLのような上位レイヤーの暗号化は別途設定が必要です)。encryption_type
とdecryption_key
引数を使用します。
# WPA-PSKで暗号化されたWi-Fiキャプチャファイルを復号
try:
wifi_cap = pyshark.FileCapture(
'path/to/encrypted_wifi.pcap',
decryption_key='your_wifi_password',
encryption_type='wpa-pwd' # または 'wpa-psk', 'wep'
)
print("WiFiキャプチャを復号中...")
for packet in wifi_cap:
# 復号されたIPパケットなどを処理
if 'IP' in packet:
print(f"Decrypted IP Packet: {packet.ip.src} -> {packet.ip.dst}")
wifi_cap.close()
except Exception as e:
print(f"復号エラーまたはファイルエラー: {e}")
TLS/SSLの復号には、SSLキーログファイル(環境変数 `SSLKEYLOGFILE` で指定されたファイル)のパスを `override_prefs` パラメータで指定する方法があります。
ssl_keylog_file = '/path/to/your/sslkeylog.log'
tls_cap = pyshark.FileCapture(
'path/to/tls_capture.pcap',
display_filter='tls',
override_prefs={'ssl.keylog_file': ssl_keylog_file}
)
# ... (復号されたHTTP/2などの中身を解析) ...
tls_cap.close()
その他のオプション
only_summaries=True
: パケットの概要情報のみを取得します。より高速ですが、詳細なフィールドにはアクセスできません。keep_packets=False
:FileCapture
で、処理済みのパケットをメモリに保持しないようにします。巨大なファイルを処理する際のメモリ消費を抑えます。ただし、インデックスアクセス(cap[i]
)やlen(cap)
は使えなくなります。use_json=True
(またはuse_ek=True
): TSharkの出力をXMLではなくJSON(Elasticsearch形式)で受け取ります。より高速になる場合がありますが、若干情報量が少ない可能性があります。override_prefs={...}
: TSharkの動作設定(Preferences)を上書きします。例:{'protocols.dns.desegment_dns_messages': 'TRUE'}
custom_parameters={...}
または[...]
: TSharkコマンドに任意のパラメータを追加します。disable_protocol='protocol_name'
: 特定のプロトコルディセクタを無効にします。
実践的な例 💡
これまでの知識を活かして、いくつかの具体的なユースケースを見てみましょう。
例1: HTTPリクエスト/レスポンスの抽出
import pyshark
def analyze_http_traffic(pcap_file):
try:
cap = pyshark.FileCapture(pcap_file, display_filter='http')
print(f"Analyzing HTTP traffic in {pcap_file}...")
for packet in cap:
if 'HTTP' in packet:
http_layer = packet.http
ip_layer = packet.ip
tcp_layer = packet.tcp
# HTTPリクエストかレスポンスかを判断
if hasattr(http_layer, 'request_method'): # リクエスト
print(f"\n[Request] {packet.sniff_time}")
print(f" {ip_layer.src}:{tcp_layer.srcport} -> {ip_layer.dst}:{tcp_layer.dstport}")
print(f" Method: {http_layer.request_method}")
print(f" URI: {http_layer.request_full_uri}")
if hasattr(http_layer, 'user_agent'):
print(f" User-Agent: {http_layer.user_agent}")
if hasattr(http_layer, 'host'):
print(f" Host: {http_layer.host}")
elif hasattr(http_layer, 'response_code'): # レスポンス
print(f"\n[Response] {packet.sniff_time}")
print(f" {ip_layer.src}:{tcp_layer.srcport} -> {ip_layer.dst}:{tcp_layer.dstport}")
print(f" Status Code: {http_layer.response_code} ({getattr(http_layer, 'response_phrase', '')})")
if hasattr(http_layer, 'content_type'):
print(f" Content-Type: {http_layer.content_type}")
if hasattr(http_layer, 'content_length_header'):
print(f" Content-Length: {http_layer.content_length_header}")
# レスポンスボディの一部 (存在すれば)
if hasattr(http_layer, 'file_data'):
# file_data は通常テキスト形式でデコードされたもの
print(f" Response Body (partial): {http_layer.file_data[:100].replace('\\n', ' ')}...")
cap.close()
except FileNotFoundError:
print(f"Error: File not found - {pcap_file}")
except Exception as e:
print(f"An error occurred: {e}")
# 使用例
analyze_http_traffic('path/to/your/web_traffic.pcap')
例2: DNSクエリと応答の監視 (ライブ)
import pyshark
import sys
def monitor_dns(interface):
print(f"Monitoring DNS traffic on interface '{interface}'... Press Ctrl+C to stop.")
try:
# UDPポート53番をキャプチャ (標準的なDNS)
capture = pyshark.LiveCapture(interface=interface, bpf_filter='udp port 53')
for packet in capture.sniff_continuously():
if 'DNS' in packet:
dns_layer = packet.dns
ip_layer = packet.ip
# QRフラグでクエリ(0)か応答(1)か判断
if dns_layer.flags_qr == '0': # Query
if hasattr(dns_layer, 'qry_name'):
print(f"[DNS Query] {packet.sniff_timestamp} | {ip_layer.src} -> {ip_layer.dst} | Query: {dns_layer.qry_name} ({dns_layer.qry_type})")
elif dns_layer.flags_qr == '1': # Response
if hasattr(dns_layer, 'resp_name'):
resp_info = f"Resp: {dns_layer.resp_name}"
if hasattr(dns_layer, 'a'):
resp_info += f" A: {dns_layer.a}"
if hasattr(dns_layer, 'aaaa'):
resp_info += f" AAAA: {dns_layer.aaaa}"
if hasattr(dns_layer, 'cname'):
resp_info += f" CNAME: {dns_layer.cname}"
print(f"[DNS Response] {packet.sniff_timestamp} | {ip_layer.src} -> {ip_layer.dst} | {resp_info}")
except KeyboardInterrupt:
print("\nCapture stopped by user.")
except Exception as e:
print(f"\nAn error occurred: {e}")
finally:
if 'capture' in locals() and capture and capture.eventloop.is_running():
capture.close()
# 使用例 (コマンドライン引数からインターフェース名を取得)
if len(sys.argv) > 1:
interface_to_monitor = sys.argv[1]
monitor_dns(interface_to_monitor)
else:
print("Usage: python monitor_dns.py ")
print("Example: python monitor_dns.py eth0")
例3: 特定IPアドレスとの通信量集計
import pyshark
from collections import defaultdict
def summarize_traffic_by_ip(pcap_file, target_ip):
traffic_summary = defaultdict(lambda: {'sent_bytes': 0, 'recv_bytes': 0, 'sent_pkts': 0, 'recv_pkts': 0})
try:
# 特定IPとの通信のみフィルタリング
cap = pyshark.FileCapture(pcap_file, display_filter=f'ip.addr == {target_ip}')
print(f"Summarizing traffic for IP {target_ip} in {pcap_file}...")
for packet in cap:
if 'IP' in packet:
ip_layer = packet.ip
packet_len = int(packet.length) # パケット全体の長さ
if ip_layer.src == target_ip: # target_ipからの送信
other_ip = ip_layer.dst
traffic_summary[other_ip]['sent_bytes'] += packet_len
traffic_summary[other_ip]['sent_pkts'] += 1
elif ip_layer.dst == target_ip: # target_ipへの受信
other_ip = ip_layer.src
traffic_summary[other_ip]['recv_bytes'] += packet_len
traffic_summary[other_ip]['recv_pkts'] += 1
cap.close()
print("\n--- Traffic Summary ---")
print(f"Target IP: {target_ip}")
if not traffic_summary:
print("No traffic found for the target IP.")
return
# 通信量が多い順にソートして表示
sorted_summary = sorted(traffic_summary.items(), key=lambda item: item[1]['sent_bytes'] + item[1]['recv_bytes'], reverse=True)
print(f"{'Remote IP':<20} {'Sent Pkts':>10} {'Sent Bytes':>12} {'Recv Pkts':>10} {'Recv Bytes':>12}")
print("-" * 70)
for remote_ip, data in sorted_summary:
print(f"{remote_ip:<20} {data['sent_pkts']:>10} {data['sent_bytes']:>12} {data['recv_pkts']:>10} {data['recv_bytes']:>12}")
except FileNotFoundError:
print(f"Error: File not found - {pcap_file}")
except Exception as e:
print(f"An error occurred: {e}")
# 使用例
summarize_traffic_by_ip('path/to/your/network_traffic.pcap', '192.168.1.1')
よくある問題とトラブルシューティング ⚠️
-
pyshark.tshark.tshark.TSharkNotFoundException: TShark not found.
これが最も一般的な問題です。TSharkがインストールされていないか、システムのPATH環境変数に含まれていない場合に発生します。
解決策:
- Wireshark (TSharkを含む) が正しくインストールされているか確認してください。
- TSharkの実行可能ファイル (
tshark
またはtshark.exe
) が存在するパスを確認します。 - そのパスをシステムのPATH環境変数に追加します。
- または、
pyshark.LiveCapture
やpyshark.FileCapture
を初期化する際に、tshark_path
パラメータでTSharkのフルパスを明示的に指定します。cap = pyshark.FileCapture('file.pcap', tshark_path='/usr/local/bin/tshark')
- スクリプトのカレントディレクトリかpysharkのインストールディレクトリに
config.ini
ファイルを作成し、そこにパスを記述します。
-
権限エラー (Permission Denied)
特にLinuxやmacOSでライブキャプチャを実行する際に発生します。ネットワークインターフェースへのアクセスには通常、管理者権限(rootまたはsudo)が必要です。
解決策:
- スクリプトを
sudo python your_script.py
のように管理者権限で実行します(セキュリティリスクを理解した上で)。 - (推奨) Wireshark/TSharkのインストール時に推奨される設定を行い、一般ユーザーでもキャプチャできるように権限グループ(例:
wireshark
グループ)を設定し、実行ユーザーをそのグループに追加します。設定方法はOSによって異なります。 setcap
コマンド(Linux)などを使用して、dumpcap
(TSharkが内部で使用するキャプチャツール)に必要な権限を与える方法もありますが、注意が必要です。
- スクリプトを
-
パフォーマンスの問題 / メモリ消費
大量のパケットを処理する場合や、非常に大きなキャプチャファイルを扱う場合に、処理速度が遅くなったり、メモリを大量に消費したりすることがあります。
解決策:
- フィルタリングを活用する:
bpf_filter
やdisplay_filter
を使って、処理対象のパケットをできるだけ絞り込みます。特にbpf_filter
は効率的です。 - 概要のみ取得: 詳細なフィールド情報が不要な場合は、
only_summaries=True
オプションを使用します。 - メモリ保持を無効化: 巨大なファイルを処理する場合は、
FileCapture
でkeep_packets=False
オプションを使用し、処理済みパケットをメモリに保持しないようにします。 - JSON出力の利用:
use_json=True
(またはuse_ek=True
) を試すと、XMLよりも高速になる場合があります。 - 処理の最適化: Pythonコード内のループ処理やデータ構造を見直し、ボトルネックがないか確認します。
apply_on_packets
やジェネレータ (sniff_continuously
) は、一度に全パケットをメモリに読み込むよりも効率的な場合があります。
- フィルタリングを活用する:
-
特定のプロトコルが解析されない / フィールドが見つからない
期待しているプロトコルレイヤーやフィールドがパケットオブジェクトに存在しない場合があります。
解決策:
- Wireshark/TSharkで確認: まず、同じパケットをWireshark GUIや
tshark
コマンドで開き、期待する情報が解析されているか確認します。Wiresharkでも解析できていなければ、ディセクタの問題か、パケット自体にその情報が含まれていない可能性があります。 - ディセクタの有効化/設定: 特定のプロトコルディセクタが無効になっている、または設定が必要な場合があります。Wiresharkのプロトコル設定を確認し、必要であれば
override_prefs
でpysharkに設定を渡します。 decode_as
の使用: 非標準ポートでプロトコルが動作している場合、decode_as
パラメータで強制的にデコードを指定します。- フィールド名の確認: アクセスしようとしているフィールド名が正しいか確認します(ハイフンはアンダースコアに変換)。
dir(packet.layer_name)
で利用可能な属性を確認します。 - レイヤーの存在確認:
if 'LAYER_NAME' in packet:
やhasattr(packet, 'layer_name')
でレイヤーが存在するか確認してからフィールドにアクセスします。
- Wireshark/TSharkで確認: まず、同じパケットをWireshark GUIや
まとめ 🎉
pyshark
は、Pythonを使ってネットワークパケットをプログラム的に解析するための非常に強力で便利なライブラリです。Wireshark/TSharkの豊富なプロトコルディセクタを活用できるため、複雑なネットワーク分析やタスクの自動化を容易にします。
ライブキャプチャ、ファイルキャプチャ、柔軟なフィルタリング、プロトコルの詳細な解析など、多くの機能を提供しており、ネットワーク監視、トラブルシューティング、セキュリティ分析、研究開発など、幅広い分野で役立ちます。
インストール時の依存関係(特にTSharkのパス)や権限、パフォーマンスに関する注意点もありますが、基本的な使い方をマスターすれば、Pythonによるネットワークプログラミングの可能性が大きく広がります。ぜひ、あなたのプロジェクトでpyshark
を活用してみてください!🚀
コメント