基本操作と表示
Scapyの基本的な使い方とパケット情報の表示方法。
Scapyの起動とインポート
# Scapyを対話的に起動 (root権限が必要な場合が多い)
sudo scapy
# Pythonスクリプト内でScapyの全機能をインポート
from scapy.all import *
パケット情報の表示
# パケットのレイヤー構造とフィールドを表示
packet.show()
# パケットの概要(サマリー)を表示 (WiresharkのInfo列に似た形式)
packet.summary()
# パケットを16進数でダンプ表示
hexdump(packet)
# パケットの内容をより詳細に表示 (show()より詳しい)
packet.show2()
# パケットリストのサマリー表示
packet_list.summary()
packet_list.nsummary() # 番号付きサマリー
レイヤーとフィールドの確認
# 利用可能な全レイヤー(プロトコル)を表示
ls()
# 特定のレイヤー(例: IP)で利用可能なフィールドを表示
ls(IP)
# 特定のレイヤー(例: TCP)のフィールドとその型を表示
ls(TCP)
# 利用可能なコマンド(関数)一覧を表示
lsc()
📡 パケットの構築
様々なプロトコルのパケットをゼロから作成します。
レイヤーのスタック
/
演算子を使ってレイヤーを重ねてパケットを構築します。左から右へ、下位レイヤーから上位レイヤーの順に記述するのが一般的です。
# IP/TCP パケット
ip_tcp_packet = IP()/TCP()
# Ethernet/IP/UDP パケット
eth_ip_udp_packet = Ether()/IP()/UDP()
# Ethernet/IP/ICMP Echo Request パケット
ping_packet = Ether()/IP(dst="8.8.8.8")/ICMP()
# ペイロード(Rawデータ)を追加
http_get = Ether()/IP(dst="example.com")/TCP(dport=80)/Raw(load="GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
# Rawの代わりに直接文字列も可能
# http_get = Ether()/IP(dst="example.com")/TCP(dport=80)/"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n"
print(ping_packet.summary())
ping_packet.show()
フィールド値の設定
パケット作成時に引数でフィールド値を指定したり、作成後に属性としてアクセスして値を設定・変更できます。
# 作成時に指定
ip_layer = IP(src="192.168.1.100", dst="192.168.1.1", ttl=128)
tcp_layer = TCP(dport=80, sport=12345, flags="S") # SYN パケット
packet = ip_layer/tcp_layer
# 作成後に設定・変更
packet = IP()/TCP()
packet[IP].dst = "10.0.0.1"
packet[IP].src = "192.168.0.10" # 自動でsrc IPが設定される場合もあるが、明示的に指定可能
packet[TCP].dport = 443
packet[TCP].flags = "SA" # SYN-ACK パケット
# フィールドの削除 (Scapyがデフォルト値を再計算または設定)
del(packet[IP].chksum)
del(packet[TCP].chksum)
del(packet[IP].len)
packet.show()
共通プロトコルの例
プロトコル | 基本的な作成例 | 備考 |
---|---|---|
Ethernet |
|
送信時に指定しない場合、sendp() などで自動補完されることがある |
IP |
|
src は送信インターフェースから自動設定されることが多い |
TCP |
|
flags : S(SYN), A(ACK), P(PSH), R(RST), U(URG), F(FIN), E(ECE), C(CWR) |
UDP |
|
シンプルなヘッダ |
ICMP |
|
type=0, code=0 はEcho reply |
ARP |
|
op=1 (request), op=2 (reply) |
DNS |
|
UDPポート53でよく使われる |
📤 パケットの送信と受信 📥
作成したパケットをネットワークに送信し、応答を受信します。
送信関数
関数 | レイヤー | 説明 | 戻り値 |
---|---|---|---|
send() |
L3 (IPなど) | パケットを送信する。L2ヘッダはScapyが自動生成(ルーティング含む)。応答は受信しない。 | なし |
sendp() |
L2 (Ethernetなど) | パケットを送信する。L2ヘッダを自分で指定する必要がある。応答は受信しない。 | なし |
sr() |
L3 (IPなど) | パケットを送信し、応答を受信する。応答パケットと未応答パケットのペアを返す。 | (answered, unanswered) のタプル |
sr1() |
L3 (IPなど) | パケットを送信し、最初の応答パケットのみを受信する。タイムアウトすると None を返す。 |
応答パケット (Packet ) または None |
srp() |
L2 (Ethernetなど) | パケットを送信し、応答を受信する(L2)。応答パケットと未応答パケットのペアを返す。 | (answered, unanswered) のタプル |
srp1() |
L2 (Ethernetなど) | パケットを送信し、最初の応答パケットのみを受信する(L2)。タイムアウトすると None を返す。 |
応答パケット (Packet ) または None |
送信・受信オプション
# send() / sendp() のオプション
packet = IP(dst="8.8.8.8")/ICMP()
send(packet, count=5, inter=0.5, verbose=False, return_packets=True)
# count: 送信回数
# inter: 送信間隔(秒)
# loop: 0以外を指定するとCtrl+Cまで無限ループ送信
# verbose: 詳細な送信メッセージの表示制御 (デフォルトはTrue)
# iface: 送信するインターフェースを指定
# return_packets: 送信したパケットのリストを返すか (デフォルトはFalse)
# sr() / sr1() / srp() / srp1() のオプション
ans, unans = sr(IP(dst="192.168.1.1/24")/TCP(dport=80, flags="S"),
timeout=1, # タイムアウト時間(秒)
retry=2, # 再送回数(タイムアウトした場合)
verbose=0, # 詳細表示レベル (0: 静か, 1: 通常, 2: 詳細)
multi=True, # 複数の応答を待つか (sr/srpのみ)
filter="tcp port 80", # 受信時のBPFフィルタ
iface="eth0") # 使用するインターフェース
# sr1() の使用例
response = sr1(IP(dst="8.8.8.8")/ICMP(), timeout=1, verbose=0)
if response:
response.show()
else:
print("No response received.")
# sr() の結果の確認
if ans:
print("Received answers:")
ans.summary()
# 例: 最初の応答ペア (送信パケット, 受信パケット) を表示
# ans[0][0].show() # Sent packet
# ans[0][1].show() # Received packet
if unans:
print("Unanswered packets:")
unans.summary()
🔍 パケットのスニッフィングとキャプチャ
ネットワーク上を流れるパケットをキャプチャし、ファイルに保存・読み込みします。
sniff() 関数
sniff()
関数はネットワークインターフェースからパケットをキャプチャします。
# 基本的なスニッフィング (指定した数のパケットをキャプチャ)
packets = sniff(count=10)
packets.summary()
# インターフェースを指定してスニッフィング
packets_eth0 = sniff(iface="eth0", count=5)
# BPFフィルタを使用して特定のパケットのみキャプチャ
# 例: TCPポート80のパケットのみ
tcp_packets = sniff(filter="tcp port 80", count=10)
# 例: 特定ホストからのICMPパケット
icmp_from_host = sniff(filter="icmp and src host 192.168.1.1", count=5)
# BPF構文: host, net, port, proto (tcp, udp, icmp, arp), src, dst, and, or, not など
# タイムアウトを指定 (指定時間後にスニッフィング終了)
timed_sniff = sniff(timeout=10)
# パケット受信ごとに関数を実行 (prn)
def print_packet_summary(packet):
print(packet.summary())
sniff(prn=print_packet_summary, count=5)
# 受信パケットをラムダ関数でフィルタリング (lfilter)
# 例: ARPパケットのみ処理
arp_packets = sniff(lfilter=lambda pkt: pkt.haslayer(ARP), count=3)
# スニッフィングを停止せずにパケットを処理 (非同期スニッフィング)
# from scapy.all import AsyncSniffer (または sniff(store=False, prn=...) )
sniffer = AsyncSniffer(prn=lambda x: x.summary(), filter="udp port 53")
sniffer.start()
# ... 何か他の処理 ...
# time.sleep(10) # 例: 10秒間スニッフィング
results = sniffer.stop() # スニッフィングを停止し、結果を取得 (オプション)
# store=False でメモリにパケットを保存しない (大量トラフィック向け)
sniff(prn=lambda x: x.summary(), store=False)
# セッションを使用して高度なスニッフィング (例: IPフラグメント再構築)
sniff(filter="tcp", session=IPSession, prn=lambda x: x.summary())
# 利用可能なセッション: IPSession, TCPSession など
PCAPファイルの操作 💾
キャプチャしたパケットをPCAPファイルに保存したり、既存のPCAPファイルを読み込んだりします。
# パケットリストをPCAPファイルに書き込む
packets_to_save = sniff(count=5)
wrpcap("capture.pcap", packets_to_save)
# 既存のPCAPファイルを読み込む
read_packets = rdpcap("capture.pcap")
read_packets.summary()
# PCAPファイルに追記する
more_packets = sniff(count=3)
wrpcap("capture.pcap", more_packets, append=True)
# 巨大なPCAPファイルをジェネレータとして読み込む (メモリ効率が良い)
# from scapy.utils import PcapReader
for pkt in PcapReader("large_capture.pcap"):
# 各パケットに対する処理
if pkt.haslayer(TCP):
print(pkt.summary())
# WrpcapSink を使ってリアルタイムに書き込む (Scapy Pipesの一部)
# from scapy.pipetool import WrpcapSink
# sink = WrpcapSink("output.pcap")
# sink.start()
# # ... パケットを生成またはスニッフィングして sink.write(packet) ...
# sink.stop()
🛠️ パケットの解析と操作
キャプチャしたパケットや作成したパケットの内部を解析し、内容を変更します。
レイヤーへのアクセス
packet = Ether()/IP(dst="8.8.8.8")/TCP(dport=80)
# インデックスでアクセス (0から)
ether_layer = packet[0]
ip_layer = packet[1]
tcp_layer = packet[2]
# レイヤークラス名でアクセス
ip_layer_by_name = packet[IP]
tcp_layer_by_name = packet[TCP]
# 特定のレイヤーが存在するか確認
if packet.haslayer(IP):
print("Packet has IP layer")
if packet.haslayer("TCP"): # 文字列でも指定可能
print("Packet has TCP layer")
if not packet.haslayer(UDP):
print("Packet does not have UDP layer")
# 特定のレイヤーを取得 (存在しない場合はNone)
udp_layer = packet.getlayer(UDP)
if udp_layer:
udp_layer.show()
else:
print("UDP layer not found.")
# 最上位レイヤー(ペイロード)を取得
payload = packet.payload
print(type(payload)) # 次のレイヤー (例: )
# IPレイヤーのペイロードはTCPレイヤー
print(type(packet[IP].payload)) #
# TCPレイヤーのペイロード (もしあればRawレイヤー)
# print(type(packet[TCP].payload)) # 例:
フィールドへのアクセスと変更
packet = IP(src="1.1.1.1", dst="2.2.2.2")/TCP(sport=1000, dport=2000)
# フィールド値の読み取り
source_ip = packet[IP].src
dest_port = packet[TCP].dport
print(f"Source IP: {source_ip}, Dest Port: {dest_port}")
# フィールド値の変更
print("Original packet summary:", packet.summary())
packet[IP].src = "3.3.3.3"
packet[TCP].dport = 80
packet[TCP].flags = "S" # フラグを追加
print("Modified packet summary:", packet.summary())
# チェックサムや長さフィールドの自動再計算
# Scapyは通常、パケットを送信する際やshow2()などで自動的に再計算する
# 明示的に再計算させたい場合:
# del packet[IP].chksum
# del packet[TCP].chksum
# packet.show2() # 再計算がトリガーされる
# 変更後のパケット表示
packet.show()
レイヤーの削除と追加
packet = Ether()/IP()/TCP()/Raw(load="Some data")
print("Original layers:")
packet.show()
# レイヤーの削除 (delを使用)
del packet[Raw]
print("\nAfter removing Raw layer:")
packet.show()
del packet.payload # 最上位レイヤーを削除 (この場合はTCP)
print("\nAfter removing payload (TCP):")
packet.show()
# レイヤーの追加 ( / 演算子を使用)
packet = Ether()/IP()
packet = packet/UDP(dport=53) # UDPレイヤーを追加
packet = packet/DNS(rd=1, qd=DNSQR(qname="test.com")) # DNSレイヤーを追加
print("\nAfter adding UDP and DNS layers:")
packet.show()
🚀 高度なテクニック
より複雑なネットワーク操作や自動化タスク。
Traceroute
traceroute()
関数を使用して、ターゲットまでのネットワーク経路を調査します。
target = "google.com"
# デフォルトはUDPプローブ
ans, unans = traceroute(target, maxttl=20, verbose=0)
# ans.summary()
ans.show() # より詳細な結果表示
# ICMPプローブを使用
ans_icmp, _ = traceroute(target, maxttl=15, proto="icmp", verbose=0)
ans_icmp.show()
# TCP SYNプローブを使用 (特定のポートへ)
ans_tcp, _ = traceroute(target, dport=80, maxttl=10, verbose=0)
ans_tcp.show()
# 結果をグラフで表示 (matplotlibとpygraphvizが必要な場合がある)
# ans.graph() # 新しいウィンドウで表示
# ans.graph(target="> traceroute_graph.svg") # ファイルに保存
ARPスキャン (ローカルネットワーク)
arping()
関数を使用して、ローカルネットワーク上のアクティブなホストを発見します。
# ローカルネットワークの範囲を指定
network = "192.168.1.0/24"
ans, unans = arping(network, verbose=0)
print("ARP Scan Results:")
if ans:
ans.summary(lambda s, r: r.sprintf("%Ether.src% %ARP.psrc%"))
else:
print("No hosts found.")
# よりシンプルな方法 (srpを使用)
# ans, unans = srp(Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(pdst=network), timeout=2, verbose=0)
# if ans:
# print("Hosts found:")
# for sent, received in ans:
# print(f"IP: {received.psrc} - MAC: {received.hwsrc}")
リアルタイムでのパケット変更 (Linux – NetfilterQueue) 🐧
LinuxのNetfilterQueueと連携して、通過するパケットをリアルタイムで変更します(別途 netfilterqueue
ライブラリが必要)。
# 注意: このコードを実行するには netfilterqueue ライブラリと
# iptables でキューを設定する必要があります。
# sudo pip install netfilterqueue
# sudo iptables -I INPUT -j NFQUEUE --queue-num 1
# sudo iptables -I OUTPUT -j NFQUEUE --queue-num 1
# 処理が終わったらルールを削除:
# sudo iptables -D INPUT -j NFQUEUE --queue-num 1
# sudo iptables -D OUTPUT -j NFQUEUE --queue-num 1
try:
from netfilterqueue import NetfilterQueue
except ImportError:
print("NetfilterQueue is not installed. Skipping example.")
NetfilterQueue = None
if NetfilterQueue:
def modify_packet(packet_nfq):
try:
scapy_packet = IP(packet_nfq.get_payload()) # NFQペイロードをScapyパケットに変換
# --- ここでパケットを変更するロジック ---
if scapy_packet.haslayer(TCP) and scapy_packet[TCP].dport == 80:
if scapy_packet.haslayer(Raw):
payload = scapy_packet[Raw].load.decode('utf-8', errors='ignore')
# 例: HTTP GETリクエストのHostヘッダーを変更 (単純な例)
if "Host:" in payload:
new_payload = payload.replace("Host: ", "Host: hacked.by.scapy. ")
scapy_packet[Raw].load = new_payload.encode('utf-8')
print("Modified HTTP Host header.")
# チェックサムと長さを再計算させるために削除
del scapy_packet[IP].len
del scapy_packet[IP].chksum
del scapy_packet[TCP].chksum
packet_nfq.set_payload(bytes(scapy_packet)) # 変更したパケットをNFQに戻す
# ----------------------------------------
packet_nfq.accept() # パケットを通過させる (変更有無に関わらず)
# packet_nfq.drop() # パケットを破棄する場合
except Exception as e:
print(f"Error processing packet: {e}")
packet_nfq.accept() # エラー時もとりあえず通過させる
queue = NetfilterQueue()
queue.bind(1, modify_packet) # キュー番号1にバインド
try:
print("Starting packet modification queue...")
queue.run()
except KeyboardInterrupt:
print("Stopping queue.")
finally:
queue.unbind()
print("Queue unbound. Remember to remove iptables rules.")
コメント