Python Scapy チートシート

cheatsheet

基本操作と表示

Scapyの基本的な使い方とパケット情報の表示方法。

Scapyの起動とインポート


# Scapyを対話的に起動 (root権限が必要な場合が多い)
sudo scapy

# Pythonスクリプト内でScapyの全機能をインポート
from scapy.all import *
      

パケット情報の表示


# パケットのレイヤー構造とフィールドを表示
packet.show()

# パケットの概要(サマリー)を表示 (WiresharkのInfo列に似た形式)
packet.summary()

# パケットを16進数でダンプ表示
hexdump(packet)

# パケットの内容をより詳細に表示 (show()より詳しい)
packet.show2()

# パケットリストのサマリー表示
packet_list.summary()
packet_list.nsummary() # 番号付きサマリー
      

レイヤーとフィールドの確認


# 利用可能な全レイヤー(プロトコル)を表示
ls()

# 特定のレイヤー(例: IP)で利用可能なフィールドを表示
ls(IP)

# 特定のレイヤー(例: TCP)のフィールドとその型を表示
ls(TCP)

# 利用可能なコマンド(関数)一覧を表示
lsc()
      

📡 パケットの構築

様々なプロトコルのパケットをゼロから作成します。

レイヤーのスタック

/ 演算子を使ってレイヤーを重ねてパケットを構築します。左から右へ、下位レイヤーから上位レイヤーの順に記述するのが一般的です。


# IP/TCP パケット
ip_tcp_packet = IP()/TCP()

# Ethernet/IP/UDP パケット
eth_ip_udp_packet = Ether()/IP()/UDP()

# Ethernet/IP/ICMP Echo Request パケット
ping_packet = Ether()/IP(dst="8.8.8.8")/ICMP()

# ペイロード(Rawデータ)を追加
http_get = Ether()/IP(dst="example.com")/TCP(dport=80)/Raw(load="GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
# Rawの代わりに直接文字列も可能
# http_get = Ether()/IP(dst="example.com")/TCP(dport=80)/"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n"

print(ping_packet.summary())
ping_packet.show()
      

フィールド値の設定

パケット作成時に引数でフィールド値を指定したり、作成後に属性としてアクセスして値を設定・変更できます。


# 作成時に指定
ip_layer = IP(src="192.168.1.100", dst="192.168.1.1", ttl=128)
tcp_layer = TCP(dport=80, sport=12345, flags="S") # SYN パケット
packet = ip_layer/tcp_layer

# 作成後に設定・変更
packet = IP()/TCP()
packet[IP].dst = "10.0.0.1"
packet[IP].src = "192.168.0.10" # 自動でsrc IPが設定される場合もあるが、明示的に指定可能
packet[TCP].dport = 443
packet[TCP].flags = "SA" # SYN-ACK パケット

# フィールドの削除 (Scapyがデフォルト値を再計算または設定)
del(packet[IP].chksum)
del(packet[TCP].chksum)
del(packet[IP].len)

packet.show()
      

共通プロトコルの例

プロトコル 基本的な作成例 備考
Ethernet
Ether(src="00:11:22:33:44:55", dst="FF:FF:FF:FF:FF:FF")
送信時に指定しない場合、sendp()などで自動補完されることがある
IP
IP(src="192.168.1.10", dst="8.8.8.8", ttl=64)
srcは送信インターフェースから自動設定されることが多い
TCP
TCP(sport=1024, dport=80, flags="S")
flags: S(SYN), A(ACK), P(PSH), R(RST), U(URG), F(FIN), E(ECE), C(CWR)
UDP
UDP(sport=53, dport=12345)
シンプルなヘッダ
ICMP
ICMP(type=8, code=0) # Echo request (ping)
type=0, code=0はEcho reply
ARP
ARP(pdst="192.168.1.1", hwdst="ff:ff:ff:ff:ff:ff", psrc="192.168.1.100")
op=1 (request), op=2 (reply)
DNS
DNS(rd=1, qd=DNSQR(qname="example.com"))
UDPポート53でよく使われる

📤 パケットの送信と受信 📥

作成したパケットをネットワークに送信し、応答を受信します。

送信関数

関数 レイヤー 説明 戻り値
send() L3 (IPなど) パケットを送信する。L2ヘッダはScapyが自動生成(ルーティング含む)。応答は受信しない。 なし
sendp() L2 (Ethernetなど) パケットを送信する。L2ヘッダを自分で指定する必要がある。応答は受信しない。 なし
sr() L3 (IPなど) パケットを送信し、応答を受信する。応答パケットと未応答パケットのペアを返す。 (answered, unanswered) のタプル
sr1() L3 (IPなど) パケットを送信し、最初の応答パケットのみを受信する。タイムアウトすると None を返す。 応答パケット (Packet) または None
srp() L2 (Ethernetなど) パケットを送信し、応答を受信する(L2)。応答パケットと未応答パケットのペアを返す。 (answered, unanswered) のタプル
srp1() L2 (Ethernetなど) パケットを送信し、最初の応答パケットのみを受信する(L2)。タイムアウトすると None を返す。 応答パケット (Packet) または None

送信・受信オプション


# send() / sendp() のオプション
packet = IP(dst="8.8.8.8")/ICMP()
send(packet, count=5, inter=0.5, verbose=False, return_packets=True)
# count: 送信回数
# inter: 送信間隔(秒)
# loop: 0以外を指定するとCtrl+Cまで無限ループ送信
# verbose: 詳細な送信メッセージの表示制御 (デフォルトはTrue)
# iface: 送信するインターフェースを指定
# return_packets: 送信したパケットのリストを返すか (デフォルトはFalse)

# sr() / sr1() / srp() / srp1() のオプション
ans, unans = sr(IP(dst="192.168.1.1/24")/TCP(dport=80, flags="S"),
                timeout=1,         # タイムアウト時間(秒)
                retry=2,           # 再送回数(タイムアウトした場合)
                verbose=0,         # 詳細表示レベル (0: 静か, 1: 通常, 2: 詳細)
                multi=True,        # 複数の応答を待つか (sr/srpのみ)
                filter="tcp port 80", # 受信時のBPFフィルタ
                iface="eth0")      # 使用するインターフェース

# sr1() の使用例
response = sr1(IP(dst="8.8.8.8")/ICMP(), timeout=1, verbose=0)
if response:
    response.show()
else:
    print("No response received.")

# sr() の結果の確認
if ans:
    print("Received answers:")
    ans.summary()
    # 例: 最初の応答ペア (送信パケット, 受信パケット) を表示
    # ans[0][0].show() # Sent packet
    # ans[0][1].show() # Received packet
if unans:
    print("Unanswered packets:")
    unans.summary()
      

🔍 パケットのスニッフィングとキャプチャ

ネットワーク上を流れるパケットをキャプチャし、ファイルに保存・読み込みします。

sniff() 関数

sniff() 関数はネットワークインターフェースからパケットをキャプチャします。


# 基本的なスニッフィング (指定した数のパケットをキャプチャ)
packets = sniff(count=10)
packets.summary()

# インターフェースを指定してスニッフィング
packets_eth0 = sniff(iface="eth0", count=5)

# BPFフィルタを使用して特定のパケットのみキャプチャ
# 例: TCPポート80のパケットのみ
tcp_packets = sniff(filter="tcp port 80", count=10)
# 例: 特定ホストからのICMPパケット
icmp_from_host = sniff(filter="icmp and src host 192.168.1.1", count=5)
# BPF構文: host, net, port, proto (tcp, udp, icmp, arp), src, dst, and, or, not など

# タイムアウトを指定 (指定時間後にスニッフィング終了)
timed_sniff = sniff(timeout=10)

# パケット受信ごとに関数を実行 (prn)
def print_packet_summary(packet):
    print(packet.summary())

sniff(prn=print_packet_summary, count=5)

# 受信パケットをラムダ関数でフィルタリング (lfilter)
# 例: ARPパケットのみ処理
arp_packets = sniff(lfilter=lambda pkt: pkt.haslayer(ARP), count=3)

# スニッフィングを停止せずにパケットを処理 (非同期スニッフィング)
# from scapy.all import AsyncSniffer (または sniff(store=False, prn=...) )
sniffer = AsyncSniffer(prn=lambda x: x.summary(), filter="udp port 53")
sniffer.start()
# ... 何か他の処理 ...
# time.sleep(10) # 例: 10秒間スニッフィング
results = sniffer.stop() # スニッフィングを停止し、結果を取得 (オプション)

# store=False でメモリにパケットを保存しない (大量トラフィック向け)
sniff(prn=lambda x: x.summary(), store=False)

# セッションを使用して高度なスニッフィング (例: IPフラグメント再構築)
sniff(filter="tcp", session=IPSession, prn=lambda x: x.summary())
# 利用可能なセッション: IPSession, TCPSession など
      

PCAPファイルの操作 💾

キャプチャしたパケットをPCAPファイルに保存したり、既存のPCAPファイルを読み込んだりします。


# パケットリストをPCAPファイルに書き込む
packets_to_save = sniff(count=5)
wrpcap("capture.pcap", packets_to_save)

# 既存のPCAPファイルを読み込む
read_packets = rdpcap("capture.pcap")
read_packets.summary()

# PCAPファイルに追記する
more_packets = sniff(count=3)
wrpcap("capture.pcap", more_packets, append=True)

# 巨大なPCAPファイルをジェネレータとして読み込む (メモリ効率が良い)
# from scapy.utils import PcapReader
for pkt in PcapReader("large_capture.pcap"):
    # 各パケットに対する処理
    if pkt.haslayer(TCP):
        print(pkt.summary())

# WrpcapSink を使ってリアルタイムに書き込む (Scapy Pipesの一部)
# from scapy.pipetool import WrpcapSink
# sink = WrpcapSink("output.pcap")
# sink.start()
# # ... パケットを生成またはスニッフィングして sink.write(packet) ...
# sink.stop()
      

🛠️ パケットの解析と操作

キャプチャしたパケットや作成したパケットの内部を解析し、内容を変更します。

レイヤーへのアクセス


packet = Ether()/IP(dst="8.8.8.8")/TCP(dport=80)

# インデックスでアクセス (0から)
ether_layer = packet[0]
ip_layer = packet[1]
tcp_layer = packet[2]

# レイヤークラス名でアクセス
ip_layer_by_name = packet[IP]
tcp_layer_by_name = packet[TCP]

# 特定のレイヤーが存在するか確認
if packet.haslayer(IP):
    print("Packet has IP layer")
if packet.haslayer("TCP"): # 文字列でも指定可能
    print("Packet has TCP layer")
if not packet.haslayer(UDP):
    print("Packet does not have UDP layer")

# 特定のレイヤーを取得 (存在しない場合はNone)
udp_layer = packet.getlayer(UDP)
if udp_layer:
    udp_layer.show()
else:
    print("UDP layer not found.")

# 最上位レイヤー(ペイロード)を取得
payload = packet.payload
print(type(payload)) # 次のレイヤー (例: )
# IPレイヤーのペイロードはTCPレイヤー
print(type(packet[IP].payload)) # 
# TCPレイヤーのペイロード (もしあればRawレイヤー)
# print(type(packet[TCP].payload)) # 例: 
      

フィールドへのアクセスと変更


packet = IP(src="1.1.1.1", dst="2.2.2.2")/TCP(sport=1000, dport=2000)

# フィールド値の読み取り
source_ip = packet[IP].src
dest_port = packet[TCP].dport
print(f"Source IP: {source_ip}, Dest Port: {dest_port}")

# フィールド値の変更
print("Original packet summary:", packet.summary())
packet[IP].src = "3.3.3.3"
packet[TCP].dport = 80
packet[TCP].flags = "S" # フラグを追加
print("Modified packet summary:", packet.summary())

# チェックサムや長さフィールドの自動再計算
# Scapyは通常、パケットを送信する際やshow2()などで自動的に再計算する
# 明示的に再計算させたい場合:
# del packet[IP].chksum
# del packet[TCP].chksum
# packet.show2() # 再計算がトリガーされる

# 変更後のパケット表示
packet.show()
      

レイヤーの削除と追加


packet = Ether()/IP()/TCP()/Raw(load="Some data")
print("Original layers:")
packet.show()

# レイヤーの削除 (delを使用)
del packet[Raw]
print("\nAfter removing Raw layer:")
packet.show()

del packet.payload # 最上位レイヤーを削除 (この場合はTCP)
print("\nAfter removing payload (TCP):")
packet.show()

# レイヤーの追加 ( / 演算子を使用)
packet = Ether()/IP()
packet = packet/UDP(dport=53) # UDPレイヤーを追加
packet = packet/DNS(rd=1, qd=DNSQR(qname="test.com")) # DNSレイヤーを追加
print("\nAfter adding UDP and DNS layers:")
packet.show()
      

🚀 高度なテクニック

より複雑なネットワーク操作や自動化タスク。

Traceroute

traceroute() 関数を使用して、ターゲットまでのネットワーク経路を調査します。


target = "google.com"
# デフォルトはUDPプローブ
ans, unans = traceroute(target, maxttl=20, verbose=0)
# ans.summary()
ans.show() # より詳細な結果表示

# ICMPプローブを使用
ans_icmp, _ = traceroute(target, maxttl=15, proto="icmp", verbose=0)
ans_icmp.show()

# TCP SYNプローブを使用 (特定のポートへ)
ans_tcp, _ = traceroute(target, dport=80, maxttl=10, verbose=0)
ans_tcp.show()

# 結果をグラフで表示 (matplotlibとpygraphvizが必要な場合がある)
# ans.graph() # 新しいウィンドウで表示
# ans.graph(target="> traceroute_graph.svg") # ファイルに保存
      

ARPスキャン (ローカルネットワーク)

arping() 関数を使用して、ローカルネットワーク上のアクティブなホストを発見します。


# ローカルネットワークの範囲を指定
network = "192.168.1.0/24"
ans, unans = arping(network, verbose=0)

print("ARP Scan Results:")
if ans:
    ans.summary(lambda s, r: r.sprintf("%Ether.src% %ARP.psrc%"))
else:
    print("No hosts found.")

# よりシンプルな方法 (srpを使用)
# ans, unans = srp(Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(pdst=network), timeout=2, verbose=0)
# if ans:
#     print("Hosts found:")
#     for sent, received in ans:
#         print(f"IP: {received.psrc} - MAC: {received.hwsrc}")
      

リアルタイムでのパケット変更 (Linux – NetfilterQueue) 🐧

LinuxのNetfilterQueueと連携して、通過するパケットをリアルタイムで変更します(別途 netfilterqueue ライブラリが必要)。


# 注意: このコードを実行するには netfilterqueue ライブラリと
#       iptables でキューを設定する必要があります。
# sudo pip install netfilterqueue
# sudo iptables -I INPUT -j NFQUEUE --queue-num 1
# sudo iptables -I OUTPUT -j NFQUEUE --queue-num 1
# 処理が終わったらルールを削除:
# sudo iptables -D INPUT -j NFQUEUE --queue-num 1
# sudo iptables -D OUTPUT -j NFQUEUE --queue-num 1

try:
    from netfilterqueue import NetfilterQueue
except ImportError:
    print("NetfilterQueue is not installed. Skipping example.")
    NetfilterQueue = None

if NetfilterQueue:
    def modify_packet(packet_nfq):
        try:
            scapy_packet = IP(packet_nfq.get_payload()) # NFQペイロードをScapyパケットに変換

            # --- ここでパケットを変更するロジック ---
            if scapy_packet.haslayer(TCP) and scapy_packet[TCP].dport == 80:
                if scapy_packet.haslayer(Raw):
                    payload = scapy_packet[Raw].load.decode('utf-8', errors='ignore')
                    # 例: HTTP GETリクエストのHostヘッダーを変更 (単純な例)
                    if "Host:" in payload:
                         new_payload = payload.replace("Host: ", "Host: hacked.by.scapy. ")
                         scapy_packet[Raw].load = new_payload.encode('utf-8')
                         print("Modified HTTP Host header.")

                         # チェックサムと長さを再計算させるために削除
                         del scapy_packet[IP].len
                         del scapy_packet[IP].chksum
                         del scapy_packet[TCP].chksum

                         packet_nfq.set_payload(bytes(scapy_packet)) # 変更したパケットをNFQに戻す

            # ----------------------------------------

            packet_nfq.accept() # パケットを通過させる (変更有無に関わらず)
            # packet_nfq.drop() # パケットを破棄する場合

        except Exception as e:
            print(f"Error processing packet: {e}")
            packet_nfq.accept() # エラー時もとりあえず通過させる

    queue = NetfilterQueue()
    queue.bind(1, modify_packet) # キュー番号1にバインド

    try:
        print("Starting packet modification queue...")
        queue.run()
    except KeyboardInterrupt:
        print("Stopping queue.")
    finally:
        queue.unbind()
        print("Queue unbound. Remember to remove iptables rules.")

      

コメント

タイトルとURLをコピーしました