Pythonライブラリ PySAML2 詳細解説:SAML認証をマスターしよう! 🔑

プログラミング

現代のWebアプリケーション開発において、セキュリティは最重要課題の一つです。特に、複数のサービス間でユーザー認証情報を安全に連携させるシングルサインオン(SSO)は、利便性とセキュリティの両面から多くのシステムで採用されています。このSSOを実現するための標準規格の一つが SAML 2.0 (Security Assertion Markup Language 2.0) です。

そして、Python環境でSAML 2.0を扱うための強力なライブラリが pysaml2 です。このライブラリは、SAML 2.0の仕様に準拠した純粋なPython実装であり、SAMLの主要な役割である サービスプロバイダー(SP)アイデンティティプロバイダー(IdP) の両方を構築するために必要な機能を提供します。

この記事では、pysaml2 の基本的な概念から、インストール、設定、実際の使い方までを詳しく解説していきます。SAML認証の導入を検討している方、pysaml2 を使ってみたいと考えている方の助けになれば幸いです 😊。

SAML 2.0 と PySAML2 の基本

SAML 2.0 とは?

SAML 2.0は、異なるセキュリティドメイン間で認証および認可データを交換するためのXMLベースのオープンスタンダードです。主に、IdPとSPという二つの役割の間で情報をやり取りします。

  • アイデンティティプロバイダー (IdP): ユーザーの認証を行い、認証情報(アサーション)を生成・提供するエンティティです。例えば、企業の認証基盤 (Azure AD, Oktaなど) や、自社で構築した認証サーバーがこれにあたります。
  • サービスプロバイダー (SP): ユーザーが利用したいサービスを提供するエンティティです。IdPから受け取ったアサーションを検証し、ユーザーにサービスへのアクセスを許可します。Webアプリケーションやクラウドサービスなどが該当します。

ユーザーがSPにアクセスすると、SPはユーザーをIdPにリダイレクトします。IdPでユーザーが認証されると、IdPはユーザー情報を含むSAMLアサーションを生成し、ユーザーを介してSPに送り返します。SPはこのアサーションを検証し、問題がなければユーザーのログインを許可します。これがSAMLによるSSOの基本的な流れです。

PySAML2 とは?

pysaml2 は、前述のSAML 2.0プロトコルをPythonで実装するためのライブラリです。これを使うことで、開発者はSAML 2.0の詳細なXML構造やプロトコルの流れを直接意識することなく、SPやIdPの機能をアプリケーションに組み込むことができます。

主な特徴は以下の通りです:

  • ✅ 純粋なPython実装: 多くの環境で利用可能です。 (現在は主にPython 3をサポート)
  • ✅ SPとIdPの両方をサポート: アプリケーションの要件に合わせてどちらの役割も実装できます。
  • ✅ WSGI互換: DjangoやFlaskなどのWebフレームワークと連携しやすい設計です。
  • ✅ 豊富な設定オプション: 様々なSAMLシナリオに対応するための詳細な設定が可能です。
  • ✅ XML署名・暗号化のサポート: セキュアな通信に必要な機能を提供します。(外部ライブラリ xmlsec1 が必要)
  • ✅ メタデータの処理: SPとIdP間の情報交換に必要なメタデータの生成・解釈をサポートします。
注意: pysaml2 は、SAMLメッセージの署名や暗号化のために xmlsec1 という外部コマンドラインツールに依存しています。利用する環境に事前にインストールしておく必要があります。

インストール 💻

pysaml2 のインストールは、Pythonのパッケージ管理ツールである pip を使って簡単に行えます。

pip install pysaml2

ただし、前述の通り、pysaml2 はXML署名・暗号化のために xmlsec1 バイナリを必要とします。お使いのOSに応じて、事前に xmlsec1 をインストールしてください。

Linux (Debian/Ubuntu):

sudo apt-get update
sudo apt-get install xmlsec1 libxmlsec1-openssl

Linux (RHEL/CentOS/Fedora):

sudo yum install xmlsec1 xmlsec1-openssl

macOS (Homebrew):

brew install xmlsec1

また、依存関係として他のライブラリもインストールされます(例: cryptography, defusedxml, requests など)。

🚀 これで pysaml2 を使う準備が整いました!

設定ファイルの構造 ⚙️

pysaml2 の動作は、Pythonの辞書形式で記述された設定ファイルによって制御されます。SP、IdP、または属性オーソリティ(AA)のいずれを実行する場合でも、設定ファイルの基本的な形式は同じですが、使用されるディレクティブ(設定項目)の一部が異なります。

設定ファイルは通常、Pythonモジュール (.py ファイル) として作成され、その中に CONFIG という名前の辞書を定義します。

以下は、IdPの設定ファイルの基本的な構造例です:

# -*- coding: utf-8 -*-
from saml2 import BINDING_HTTP_REDIRECT

CONFIG = {
    # === 基本情報 ===
    "entityid": "https://idp.example.com/idp.xml",  # IdP/SPの一意な識別子 (URI形式)
    "description": "My Example IdP",             # 説明(任意)
    "name": "Example IdP",                       # 表示名(任意)

    # === サービス設定 ===
    "service": {
        # --- IdPの設定 ---
        "idp": {
            "name": "Example IdP Service",
            "endpoints": {
                # シングルサインオンサービス (SSO) エンドポイント
                "single_sign_on_service": [
                    ("https://idp.example.com/sso/redirect", BINDING_HTTP_REDIRECT),
                    # 他のバインディングも追加可能 (例: BINDING_HTTP_POST)
                ],
                # シングルログアウトサービス (SLO) エンドポイント
                "single_logout_service": [
                    ("https://idp.example.com/slo/redirect", BINDING_HTTP_REDIRECT),
                ],
            },
            "policy": {
                "default": {
                    "lifetime": {"minutes": 15},  # アサーションの有効期間
                    "attribute_restrictions": None, # 属性制限 (Noneは制限なし)
                    "name_form": "urn:oasis:names:tc:SAML:2.0:attrname-format:uri",
                },
            },
            "subject_data": "./idp_subject.db", # ユーザー情報DB (例)
            "name_id_format": ["urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress"],
            # SPごとの設定 (任意)
            # "sp": {
            #     "https://sp.example.com/sp.xml": { ... }
            # }
        },
        # --- SPの設定 (IdPとSPを兼ねる場合) ---
        # "sp": { ... }
    },

    # === 鍵と証明書 ===
    "key_file": "pki/mykey.pem",         # 自身の秘密鍵ファイル (署名・復号用)
    "cert_file": "pki/mycert.pem",        # 自身の公開鍵証明書ファイル (署名検証・暗号化用)
    # "encryption_keypairs": [{ ... }], # 暗号化専用の鍵ペア (任意)
    # "ca_certs": "pki/cacert.pem",     # 信頼するCA証明書 (任意)

    # === メタデータ ===
    "metadata": {
        # ローカルに保存されたメタデータファイル (IdPはSPのメタデータ、SPはIdPのメタデータ)
        "local": ["metadata/sp.xml"],
        # メタデータURL (動的に取得する場合)
        # "remote": [{"url": "https://sp.example.com/metadata.xml", "cert": "sp_cert.pem"}]
    },

    # === 属性関連 ===
    "attribute_map_dir": "attributemaps", # 属性名をマッピングするディレクトリ

    # === セキュリティ・その他 ===
    "xmlsec_binary": "/usr/bin/xmlsec1",  # xmlsec1コマンドのパス (環境に合わせて変更)
    "logging": {                         # ロギング設定
        "version": 1,
        "formatters": { ... },
        "handlers": { ... },
        "loggers": { ... },
    },
    "debug": 1,                          # デバッグモード (1=有効, 0=無効)
    "verify_ssl_cert": True,             # メタデータ取得時などのSSL証明書検証 (True推奨)
    "accepted_time_diff": 60,            # 許容される時刻のずれ(秒)
    "organization": {                    # 組織情報 (メタデータ用)
        "name": [("Example Org", "en")],
        "display_name": [("Example Organization", "en")],
        "url": [("http://www.example.org", "en")],
    },
    "contact_person": [                  # 連絡先情報 (メタデータ用)
        {
            "given_name": "Support",
            "sur_name": "Team",
            "email_address": ["support@example.org"],
            "contact_type": "technical",
        }
    ],
    # その他の詳細設定...
}

主要な設定項目について見ていきましょう。

設定キー 説明 役割 備考
entityid SAMLエンティティ(SPまたはIdP)の一意な識別子。通常はURL形式です。 共通 非常に重要。SPとIdPの間でこのIDを使って互いを識別します。
service 提供するサービス(”sp”, “idp”, “aa”)とその詳細設定を含む辞書。 共通 SPまたはIdP、あるいはその両方の設定をここで行います。
service["idp"] IdPとしての設定。SSO/SLOエンドポイント、ポリシー、NameIDフォーマットなどを定義します。 IdP
service["sp"] SPとしての設定。ACS (Assertion Consumer Service)エンドポイント、SLOエンドポイント、利用するIdPの情報などを定義します。 SP
key_file 自身の秘密鍵(PEM形式)へのパス。SAMLメッセージへの署名や、受信した暗号化アサーションの復号に使用されます。 共通 セキュリティ上、厳重に管理する必要があります。
cert_file 自身の公開鍵証明書(PEM形式)へのパス。署名を検証してもらうため、また暗号化に使用してもらうために相手(IdP/SP)に提供します。メタデータに含まれることが多いです。 共通 通常、秘密鍵とペアになっています。
metadata 連携する相手(IdPまたはSP)のメタデータ情報を指定します。localでローカルファイル、remoteでURLを指定できます。 共通 メタデータには相手のentityID、エンドポイントURL、公開鍵証明書などが含まれます。
attribute_map_dir 属性名マッピングファイルを格納するディレクトリへのパス。異なるシステム間で属性名を変換するために使用されます。 共通 例: IdPが送る mail 属性を、SP側で email として扱いたい場合など。
xmlsec_binary xmlsec1 コマンドへのフルパス。 共通 環境によってパスが異なるため、正しく設定する必要があります。
debug デバッグログを有効にするかどうか (1=有効, 0=無効)。 共通 開発中は 1 に設定すると問題解決に役立ちます。
verify_ssl_cert remote でメタデータを取得する際などに、相手サーバーのSSL証明書を検証するかどうか。 共通 本番環境では True にすることが強く推奨されます。
accepted_time_diff SAMLメッセージのタイムスタンプと現在時刻との許容される差(秒)。ネットワーク遅延などを考慮します。 共通 デフォルトは60秒程度。
organization / contact_person 自身のメタデータに含める組織情報や連絡先情報。 共通 相手に提供するメタデータに表示されます。
service["sp"]["authn_requests_signed"] SPがIdPに送信する認証リクエスト (AuthnRequest) に署名するかどうか。 SP IdPが署名を要求する場合に True にします。
service["sp"]["want_assertions_signed"] IdPから受信するアサーションが署名されていることを要求するかどうか。 SP セキュリティを高めるために True にすることが推奨されます。

これらの設定項目を適切に構成することで、pysaml2 は指定された役割(SPまたはIdP)として機能し、他のSAMLエンティティと通信できるようになります。設定ファイルはアプリケーションの要件に応じてカスタマイズが必要です。

💡 メタデータの生成

pysaml2 の配布物には、設定ファイルから自身のメタデータXMLファイルを生成するためのスクリプト (make_metadata.py など) が含まれていることがあります。これを利用すると、手動でXMLを作成する手間が省けます。生成されたメタデータファイルを連携先のIdP/SPに提供します。

基本的な使い方 (SP編) 🚀

ここでは、Webアプリケーション(例: Flask)をSAML SPとして動作させる基本的な流れを見ていきます。ユーザーが保護されたリソースにアクセスしようとした際に、IdPにリダイレクトして認証を行い、認証後に戻ってきたユーザーの情報を処理します。

1. 設定ファイルの準備

まず、上記で説明したようなSP用の設定ファイル (sp_conf.py など) を作成します。entityid, key_file, cert_file, metadata (IdPのメタデータ情報), xmlsec_binary, および service["sp"]["endpoints"]["assertion_consumer_service"] (ACS URL) などを正しく設定します。

2. Saml2Client の初期化

アプリケーション内で pysaml2.client.Saml2Client を初期化します。このクライアントオブジェクトが、SAMLリクエストの生成やレスポンスの処理を行います。

import os
from saml2 import config
from saml2.client import Saml2Client

# 設定ファイルのパス
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
CONFIG_FILE = os.path.join(BASE_DIR, 'sp_conf.py')

# 設定オブジェクトのロード
sp_config = config.Config()
sp_config.load_file(CONFIG_FILE) # ファイルからロードする場合

# Saml2Client インスタンスの作成
sp_client = Saml2Client(config=sp_config)

3. 認証リクエスト (AuthnRequest) の開始

ユーザーが認証を必要とするページにアクセスした際に、IdPへのリダイレクト処理を開始します。

from flask import Flask, redirect, request, session, url_for
from saml2 import BINDING_HTTP_REDIRECT

app = Flask(__name__)
app.secret_key = 'your_very_secret_key' # Flaskセッションのために必要

# ... (sp_client の初期化は上記参照) ...

# 設定からIdPのentityIDを取得 (通常は一つだが、複数設定も可能)
# ここでは最初のIdPを使う例
idp_entity_id = list(sp_client.config.metadata.identity_providers())[0]

@app.route('/login')
def login():
    # 既に認証済みかチェック (実際のアプリではもっと堅牢なチェックが必要)
    if 'user_info' in session:
        return redirect(url_for('user_profile'))

    # どのIdPに対して認証要求を送るか指定
    # required_idp_entity_id = "https://idp.example.com/idp.xml" # 特定のIdPを指定する場合
    required_idp_entity_id = idp_entity_id # 設定ファイルから取得したIdP

    # RelayState: IdP認証後に戻ってくる際にSPが状態を保持するための値 (例: 元々アクセスしようとしたURL)
    relay_state = url_for('user_profile', _external=True) # 認証後に遷移させたいURL

    # 使用するSSO Binding (通常はHTTP-Redirect)
    binding = BINDING_HTTP_REDIRECT

    # AuthnRequestを作成し、リダイレクト先のURLとヘッダーを取得
    try:
        session_id, result = sp_client.prepare_for_authenticate(
            entityid=required_idp_entity_id,
            relay_state=relay_state,
            binding=binding,
            # sign=True, # リクエストに署名する場合 (設定ファイルで指定も可能)
            # force_authn=False, # 強制再認証を要求するかどうか
        )
    except Exception as e:
        # エラーハンドリング
        print(f"Error preparing AuthnRequest: {e}")
        return "SAML Error", 500

    # result はリダイレクトに必要な情報を含む辞書
    # result['url'] にリダイレクト先URL
    # result['headers'] に含めるべきHTTPヘッダー (通常はLocationヘッダー)

    # Flaskのリダイレクト機能を使ってIdPへリダイレクト
    # headers は [(HeaderName, HeaderValue), ...] の形式
    response = redirect(result['url'], code=302)
    for header, value in result['headers']:
        response.headers[header] = value

    # pysaml2内部で状態を追跡するためにセッション情報を保存
    session['saml_outstanding_queries'] = {session_id: relay_state}

    return response

@app.route('/protected')
def protected_resource():
    if 'user_info' not in session:
        # 認証されていなければログインページへリダイレクト
        return redirect(url_for('login'))
    # 認証済みユーザー向けの処理
    return f"Welcome {session['user_info'].get('uid', 'Unknown User')}! This is a protected resource."

@app.route('/profile')
def user_profile():
     if 'user_info' not in session:
        return redirect(url_for('login'))
     user_info = session['user_info']
     # ユーザー情報の表示など
     return f"

User Profile

{user_info}
"

prepare_for_authenticate メソッドが、AuthnRequestを生成し、IdPへのリダイレクトに必要なURLとHTTPヘッダー(通常は Location ヘッダー)を返します。Flaskの redirect を使ってユーザーをIdPのログインページへ転送します。同時に、pysaml2 が後でレスポンスを照合できるように、セッションに関連情報を保存します (saml_outstanding_queries)。

4. 認証レスポンス (SAMLResponse) の処理 (ACS)

IdPでの認証が成功すると、IdPはユーザーをSPのACS (Assertion Consumer Service) URLにリダイレクト(またはPOST)して戻します。このリクエストには SAMLResponse パラメータが含まれています。ACSエンドポイントでこのレスポンスを処理します。

from saml2.response import AuthnResponse, StatusError, VerificationError

# ACSエンドポイント (設定ファイルの service["sp"]["endpoints"]["assertion_consumer_service"] で指定したURL)
@app.route('/saml/acs/', methods=['POST']) # IdPからのレスポンス形式に合わせて GET or POST
def acs():
    # セッションから outstanding_queries を取得
    outstanding_queries = session.get('saml_outstanding_queries', {})

    # リクエストから SAMLResponse を取得
    saml_response_data = request.form.get('SAMLResponse') # POSTの場合
    # saml_response_data = request.args.get('SAMLResponse') # GETの場合 (Redirect Binding)

    if not saml_response_data:
        return "Missing SAMLResponse", 400

    try:
        # SAMLResponse をパースして検証
        # outstanding_queries を渡して、対応するリクエストがあるか確認
        authn_response: AuthnResponse = sp_client.parse_authn_request_response(
            saml_response_data,
            BINDING_HTTP_POST, # IdPが使用したBindingを指定
            outstanding_queries
        )
    except StatusError as e:
        # IdPがエラーを返した場合
        print(f"SAML Status Error: {e}")
        return f"IdP returned an error: {e}", 400
    except VerificationError as e:
        # 署名検証などに失敗した場合
        print(f"SAML Verification Error: {e}")
        return f"Failed to verify SAML response: {e}", 400
    except Exception as e:
        # その他のパースエラーなど
        print(f"Error processing SAML response: {e}")
        return "Error processing SAML response", 500

    if authn_response is None:
        # 応答が不正、または outstanding_queries に対応するIDがない場合など
        print("Could not parse AuthnResponse or session mismatch.")
        return "Invalid SAML response or session mismatch", 400

    # 処理済みの outstanding query ID を取得 (RelayState を取り出すため)
    session_id = authn_response.in_response_to
    if session_id in outstanding_queries:
        original_relay_state = outstanding_queries.pop(session_id)
        session['saml_outstanding_queries'] = outstanding_queries # セッション更新
    else:
        # 対応するリクエストがない (不正なレスポンスの可能性)
        print(f"Unsolicited response or session mismatch for ID: {session_id}")
        return "Unsolicited response or session mismatch", 400


    # 認証成功! ユーザー情報を取得
    user_identity = authn_response.get_identity() # ユーザー属性を含む辞書
    user_name_id = authn_response.get_nameid() # NameIDオブジェクト
    # session_info = authn_response.session_info() # セッション情報 (インデックスなど)

    # 取得したユーザー情報をセッションに保存
    session['user_info'] = user_identity
    session['user_name_id'] = str(user_name_id) # 必要に応じて文字列化

    print(f"User authenticated: {user_identity}")
    print(f"NameID: {user_name_id}")

    # RelayStateで指定されたURL (認証後に遷移したかった元のURL) にリダイレクト
    # RelayStateがない場合はデフォルトのページへ
    redirect_url = original_relay_state or url_for('user_profile')
    return redirect(redirect_url)

parse_authn_request_response メソッドが、受け取った SAMLResponse をデコードし、署名の検証、時刻の検証、アサーションの復号(必要な場合)などを行います。検証が成功すると、AuthnResponse オブジェクトが返され、get_identity() でユーザー属性、get_nameid() でNameIDを取得できます。これらの情報をセッションに保存し、ユーザーを適切なページ(RelayStateで指定されたURLなど)にリダイレクトします。

5. ログアウト (SLO)

シングルログアウト (SLO) を実装する場合、SPからログアウトリクエストを開始するか、IdPからのログアウトリクエストを処理する必要があります。

from saml2 import BINDING_HTTP_REDIRECT, BINDING_HTTP_POST
from saml2.response import LogoutResponse, StatusSuccess

@app.route('/logout')
def logout():
    if 'user_info' not in session or 'user_name_id' not in session:
        # 既にログアウトしているか、認証情報がない
        return redirect(url_for('home')) # ホーム画面などにリダイレクト

    # NameIDとSessionIndexを取得 (SLOリクエストに必要)
    name_id_str = session.get('user_name_id')
    # session_index = session.get('saml_session_index') # IdPによっては必要

    if not name_id_str:
         return "Missing NameID for logout", 500

    try:
        # LogoutRequest を作成
        # IdPのSLOエンドポイント情報などは設定ファイルから読み込まれる
        session_id, result = sp_client.global_logout(
            subject_id=name_id_str,
            # session_index=session_index, # IdPが必要とする場合
            # reason="urn:oasis:names:tc:SAML:2.0:logout:user",
            binding=BINDING_HTTP_REDIRECT # IdPのSLOエンドポイントが対応するBinding
        )
    except Exception as e:
        print(f"Error initiating SLO: {e}")
        # SLOが開始できなくてもローカルセッションは破棄する
        session.pop('user_info', None)
        session.pop('user_name_id', None)
        session.pop('saml_outstanding_queries', None)
        # session.pop('saml_session_index', None)
        return redirect(url_for('home')) # エラーがあってもホームへ

    # ログアウト要求の状態を保存
    session['saml_logout_queries'] = {session_id: url_for('home', _external=True)} # ログアウト後に戻るURL

    # IdPのSLOエンドポイントへリダイレクト
    response = redirect(result['url'], code=302)
    for header, value in result['headers']:
        response.headers[header] = value

    return response

# SLOレスポンス処理エンドポイント (IdPから戻ってくる)
@app.route('/saml/sls/response/', methods=['GET', 'POST']) # Bindingによる
def sls_response():
    binding = BINDING_HTTP_REDIRECT if request.method == 'GET' else BINDING_HTTP_POST
    logout_response_data = request.args.get('SAMLResponse') if binding == BINDING_HTTP_REDIRECT else request.form.get('SAMLResponse')

    if not logout_response_data:
        return "Missing SAMLResponse for SLO", 400

    outstanding_logout_queries = session.get('saml_logout_queries', {})

    try:
        # LogoutResponse をパースして検証
        logout_resp: LogoutResponse = sp_client.parse_logout_request_response(
            logout_response_data,
            binding,
            outstanding=outstanding_logout_queries
        )
    except Exception as e:
        print(f"Error processing SLO response: {e}")
        # エラーでもローカルセッションは破棄してホームへ
        session.pop('user_info', None)
        session.pop('user_name_id', None)
        session.pop('saml_outstanding_queries', None)
        session.pop('saml_logout_queries', None)
        return redirect(url_for('home'))

    if logout_resp is None:
        print("Could not parse LogoutResponse or session mismatch.")
        return "Invalid SLO response or session mismatch", 400

    # 対応するクエリIDを取得
    in_response_to = logout_resp.in_response_to
    redirect_url = url_for('home') # デフォルトのリダイレクト先
    if in_response_to in outstanding_logout_queries:
        redirect_url = outstanding_logout_queries.pop(in_response_to)
        session['saml_logout_queries'] = outstanding_logout_queries

    # ログアウト成功かどうかステータスを確認
    if logout_resp.status_ok():
        print("SLO successful according to IdP.")
    else:
        print(f"SLO failed according to IdP: {logout_resp.status_message()}")

    # ローカルセッションを破棄
    session.pop('user_info', None)
    session.pop('user_name_id', None)
    session.pop('saml_outstanding_queries', None)
    session.pop('saml_logout_queries', None)
    # session.pop('saml_session_index', None)

    return redirect(redirect_url)


# IdP Initiated SLO リクエスト処理エンドポイント
@app.route('/saml/sls/request/', methods=['GET', 'POST']) # Bindingによる
def sls_request():
    binding = BINDING_HTTP_REDIRECT if request.method == 'GET' else BINDING_HTTP_POST
    logout_request_data = request.args.get('SAMLRequest') if binding == BINDING_HTTP_REDIRECT else request.form.get('SAMLRequest')
    relay_state = request.args.get('RelayState') or request.form.get('RelayState')

    if not logout_request_data:
        return "Missing SAMLRequest for SLO", 400

    try:
        # IdPからのLogoutRequestをパース
        # 注意: response=False を指定してリクエストとして処理
        req_info = sp_client.parse_logout_request(logout_request_data, binding)
    except Exception as e:
        print(f"Error processing IdP initiated SLO request: {e}")
        # 不正なリクエストでも処理を試みる (ベストエフォート)
        # ただし、この場合LogoutResponseを返すのは難しい
        session.pop('user_info', None) # とりあえずローカルセッションは破棄
        session.pop('user_name_id', None)
        return "Error processing SLO request", 500 # エラーを返す

    if req_info is None:
         print("Could not parse IdP initiated SLO request.")
         return "Invalid IdP initiated SLO request", 400

    # リクエスト情報から NameID や SessionIndex を取得
    name_id = req_info.name_id
    session_indexes = req_info.session_indexes

    print(f"Received IdP initiated SLO request for NameID: {name_id}")
    print(f"Session Indexes: {session_indexes}")

    # 対応するローカルセッションを探して破棄
    # (実際のアプリケーションでは NameID や SessionIndex を元にセッションを特定する)
    # ここでは単純化のため、現在のセッションを破棄
    session.pop('user_info', None)
    session.pop('user_name_id', None)
    session.pop('saml_outstanding_queries', None)
    session.pop('saml_logout_queries', None)
    # session.pop('saml_session_index', None)


    # IdPにLogoutResponseを返す準備
    try:
        response_args = sp_client.response_args(req_info.message) # レスポンス生成のための引数を取得
        logout_resp = sp_client.create_logout_response(
            in_response_to=response_args['in_response_to'],
            destination=response_args['destination'],
            status=StatusSuccess, # ログアウト成功のステータス
            issuer_entityid=sp_client.config.entityid,
            sign_response=True # レスポンスに署名する (推奨)
        )
    except Exception as e:
        print(f"Error creating SLO response: {e}")
        return "Error creating SLO response", 500

    # レスポンスを返すためのHTTP Bindingを選択 (通常はリクエストと同じBinding)
    # ここでは Redirect Binding を使う例
    http_info = sp_client.apply_binding(
        binding=BINDING_HTTP_REDIRECT,
        msg_str=str(logout_resp),
        destination=response_args['destination'],
        relay_state=relay_state,
        response=True # レスポンスであることを示す
    )

    # IdPのSLOレスポンスエンドポイントへリダイレクト
    response = redirect(http_info['url'], code=302)
    for header, value in http_info['headers']:
        response.headers[header] = value

    return response


# 仮のホームページ
@app.route('/')
def home():
    if 'user_info' in session:
        logout_url = url_for('logout')
        profile_url = url_for('user_profile')
        protected_url = url_for('protected_resource')
        return f'Logged in as {session["user_info"].get("uid", "N/A")}. Profile | Protected | Logout'
    else:
        login_url = url_for('login')
        return f'You are not logged in. Login'

if __name__ == '__main__':
    # 注意: Flaskの開発サーバーはHTTPSを直接サポートしません。
    # 本番環境ではGunicorn/uWSGI + Nginx/Apache などを使用し、HTTPSを設定してください。
    # テスト目的でHTTPSが必要な場合、 `ssl_context='adhoc'` などを使えますが、自己署名証明書になります。
    app.run(debug=True, port=8087) # SPはポート8087で実行する例

SPからログアウトを開始する場合 (/logout)、global_logout メソッドでLogoutRequestを生成し、ユーザーをIdPのSLOエンドポイントにリダイレクトします。IdPがログアウト処理を完了すると、SPのSLOレスポンス処理エンドポイント (/saml/sls/response/) にLogoutResponseを送り返してきます。ここでレスポンスを検証し、ローカルセッションを破棄します。

逆に、IdPからログアウトが開始された場合 (IdP Initiated SLO)、IdPはSPのSLOリクエスト処理エンドポイント (/saml/sls/request/) にLogoutRequestを送ってきます。SPはこのリクエストを parse_logout_request で処理し、対象ユーザーのローカルセッションを破棄した後、IdPにLogoutResponseを送り返します。

重要: 上記のコードは基本的な流れを示すサンプルです。実際のアプリケーションでは、より堅牢なエラーハンドリング、セッション管理、セキュリティ対策(CSRF対策など)、IdPとの詳細な設定調整が必要です。特に、本番環境では必ずHTTPSを使用してください。

基本的な使い方 (IdP編) 🏛️

次に、pysaml2 を使ってIdPを構築する基本的な流れを見ていきます。SPからの認証リクエストを受け取り、ユーザー認証(ここでは簡略化)を行い、SAMLアサーションを生成してSPに返します。

pysaml2 の配布物には、IdPのサンプル実装 (example/idp2 など) が含まれていることがあり、これを参考にすると良いでしょう。

1. 設定ファイルの準備

IdP用の設定ファイル (idp_conf.py) を作成します。entityid, key_file, cert_file, metadata (連携するSPのメタデータ), xmlsec_binary, および service["idp"]["endpoints"] (SSO, SLOエンドポイント), service["idp"]["policy"] などを設定します。

2. IdPServer の初期化

IdPの機能を提供するために saml2.server.Server (またはIdPServer) クラスを使用します。

import os
from saml2 import config
from saml2.server import Server

# 設定ファイルのパス
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
CONFIG_FILE = os.path.join(BASE_DIR, 'idp_conf.py')

# 設定オブジェクトのロード
idp_config = config.Config()
idp_config.load_file(CONFIG_FILE)

# IdPサーバーインスタンスの作成
idp_server = Server(config=idp_config)

3. 認証リクエスト (AuthnRequest) の処理 (SSOエンドポイント)

SPからIdPのSSOエンドポイントにAuthnRequestが送られてきます。これを受け取り、パースして、ユーザー認証プロセスを開始します。

from flask import Flask, request, render_template, redirect, session, url_for, make_response
from saml2 import BINDING_HTTP_REDIRECT, BINDING_HTTP_POST
from saml2.authn_context import PASSWORD
from saml2.ident import IdentDict
from saml2.s_utils import sid, rndstr

app = Flask(__name__)
app.secret_key = 'another_very_secret_key' # セッション用

# ... (idp_server の初期化は上記参照) ...

# 簡易的なユーザーデータベース (本来はDBやLDAPなどを使う)
USER_DB = {
    'user1': {'password': 'password1', 'givenName': 'Taro', 'sn': 'Yamada', 'mail': 'taro@example.com'},
    'user2': {'password': 'password2', 'givenName': 'Hanako', 'sn': 'Sato', 'mail': 'hanako@example.com'},
}

# SSOエンドポイント (HTTP-Redirect Binding用)
@app.route('/sso/redirect')
def sso_redirect():
    # リクエストから SAMLRequest と RelayState を取得
    saml_request = request.args.get('SAMLRequest')
    relay_state = request.args.get('RelayState')
    signature = request.args.get('Signature')
    sig_alg = request.args.get('SigAlg')

    if not saml_request:
        return "Missing SAMLRequest", 400

    try:
        # AuthnRequest をパース
        req_info = idp_server.parse_authn_request(
            saml_request,
            BINDING_HTTP_REDIRECT,
            # signature=signature, # 署名検証が必要な場合
            # sigalg=sig_alg
        )
    except Exception as e:
        print(f"Error parsing AuthnRequest: {e}")
        return "Invalid SAMLRequest", 400

    if req_info is None:
        print("Could not parse AuthnRequest")
        return "Invalid SAMLRequest", 400

    # パースされたリクエスト情報をセッションに保存 (ログインフォーム表示のため)
    # req_info.message はパースされた AuthnRequest オブジェクト
    # req_info.sender() は SP の entityID
    session['authn_request_id'] = req_info.message.id
    session['relay_state'] = relay_state
    session['sp_entity_id'] = req_info.sender()
    session['binding_out'] = BINDING_HTTP_POST # SPへのレスポンスで使用するBinding (ACSが対応するもの)

    # ユーザーにログインフォームを表示
    return render_template('login.html', sp_entity_id=req_info.sender())

# ログインフォームの処理
@app.route('/login', methods=['POST'])
def handle_login():
    username = request.form.get('username')
    password = request.form.get('password')

    # セッションからリクエスト情報を取得
    authn_request_id = session.get('authn_request_id')
    relay_state = session.get('relay_state')
    sp_entity_id = session.get('sp_entity_id')
    binding_out = session.get('binding_out', BINDING_HTTP_POST)

    if not all([authn_request_id, sp_entity_id]):
        return "Session expired or invalid request", 400

    # ユーザー認証 (簡易版)
    user_info = USER_DB.get(username)
    if user_info and user_info['password'] == password:
        # 認証成功
        print(f"User '{username}' authenticated successfully.")

        # NameID を生成 (ここではemailを使用)
        name_id_format = idp_server.config.getattr("name_id_format", "idp")[0] # 設定から取得
        name_id = idp_server.ident.make_nameid(
            format=name_id_format,
            sp_name_qualifier=sp_entity_id, # SPの識別子
            name_qualifier=idp_server.config.entityid, # IdP自身の識別子
            user_id=user_info['mail']
        )

        # 認証コンテキスト (パスワード認証を使ったことを示す)
        authn_context = {
            'class_ref': PASSWORD,
            'authn_instant': sid(), # 現在時刻のタイムスタンプ
            # 'authenticating_authority': []
        }

        # ユーザー属性情報 (SPが要求するもの、またはIdPが提供するもの)
        # 属性名は attribute_map_dir でマッピングされる場合がある
        attribute_statement = {
            'givenName': [user_info['givenName']],
            'sn': [user_info['sn']],
            'mail': [user_info['mail']],
            'uid': [username], # 例としてユーザーIDも追加
        }

        try:
            # SAML アサーションを含む Response を作成
            # sp_entity_id, AuthnRequestのID, NameID, 属性, 認証コンテキストなどを渡す
            resp_args = idp_server.response_args(req_info.message) # リクエストからレスポンスに必要な情報を取得

            authn_resp = idp_server.create_authn_response(
                identity=attribute_statement, # ユーザー属性
                in_response_to=authn_request_id,
                destination=resp_args['destination'], # ACS URL
                sp_entity_id=sp_entity_id,
                userid=username, # 内部的なユーザーID (ログ用など)
                name_id=name_id,
                authn=authn_context,
                sign_response=True, # レスポンス全体に署名
                sign_assertion=True, # アサーションに署名 (両方True推奨)
                # encrypt_assertion=True, # アサーションを暗号化する場合 (SPの公開鍵が必要)
            )
        except Exception as e:
            print(f"Error creating AuthnResponse: {e}")
            return "Error creating SAML response", 500

        # 作成した Response を指定された Binding で SP に送る準備
        http_info = idp_server.apply_binding(
            binding=binding_out, # SPのACSが対応する Binding
            msg_str=f"{authn_resp}", # 文字列化
            destination=resp_args['destination'],
            relay_state=relay_state,
            response=True
        )

        # セッション情報をクリア
        session.pop('authn_request_id', None)
        session.pop('relay_state', None)
        session.pop('sp_entity_id', None)
        session.pop('binding_out', None)

        # HTTPレスポンスを生成してSPに返す
        if binding_out == BINDING_HTTP_POST:
            # POST Binding の場合はHTMLフォームを自動サブミットする形式
            body = rndstr(size=10000) # 適切なHTMLを生成する必要がある (pysaml2のサンプル参照)
            body = http_info['body'] # apply_bindingが生成したHTMLを使う
            response = make_response(body)
            # response.headers = http_info['headers'] # Content-Typeなど
            return response
        elif binding_out == BINDING_HTTP_REDIRECT:
            # Redirect Binding の場合
            response = redirect(http_info['url'], code=302)
            for header, value in http_info['headers']:
                response.headers[header] = value
            return response
        else:
            return "Unsupported binding", 500

    else:
        # 認証失敗
        print(f"Authentication failed for user '{username}'.")
        return render_template('login.html', error="Invalid username or password", sp_entity_id=sp_entity_id)

# login.html (テンプレート例)
# 


# 
# IdP Login
# 
#   

Login for {{ sp_entity_id }}

# {% if error %} #

{{ error }}

# {% endif %} #
# Username:
# Password:
# #
# #

SSOエンドポイント (/sso/redirect) は、SPからのAuthnRequestを受け取ります。parse_authn_request でリクエストを検証・パースし、必要な情報(RelayState, SPのentityIDなど)をセッションに保存して、ユーザーにログインフォームを表示します。

ユーザーがログインフォームを送信すると (/login)、入力された情報でユーザー認証を行います(ここでは簡易的な辞書を使用)。認証が成功したら、create_authn_response を使ってSAMLアサーションを含むResponseメッセージを作成します。この際、NameID、ユーザー属性、認証コンテキストなどを指定します。最後に、apply_binding を使って、SPのACS URLへ指定されたBinding(通常はHTTP-POST)でResponseを送り返します。

4. シングルログアウト (SLO) の処理

IdPはSPからのLogoutRequestを受け付けるエンドポイントと、自身がログアウトを開始した場合にSPにLogoutRequestを送る処理、そしてSPからのLogoutResponseを処理するエンドポイントを持つ必要があります。実装はSP編のSLO処理と対になります。

# SLOリクエスト処理エンドポイント (SP Initiated SLO)
@app.route('/slo/redirect/request', methods=['GET']) # Bindingによる
@app.route('/slo/post/request', methods=['POST'])
def slo_request():
    binding = BINDING_HTTP_REDIRECT if request.method == 'GET' else BINDING_HTTP_POST
    saml_request = request.args.get('SAMLRequest') if binding == BINDING_HTTP_REDIRECT else request.form.get('SAMLRequest')
    relay_state = request.args.get('RelayState') if binding == BINDING_HTTP_REDIRECT else request.form.get('RelayState')

    if not saml_request:
        return "Missing SAMLRequest for SLO", 400

    try:
        # SPからのLogoutRequestをパース
        req_info = idp_server.parse_logout_request(saml_request, binding)
    except Exception as e:
        print(f"Error processing SP initiated SLO request: {e}")
        return "Error processing SLO request", 500

    if req_info is None:
        print("Could not parse SP initiated SLO request.")
        return "Invalid SP initiated SLO request", 400

    print(f"Received SLO request from {req_info.sender()} for NameID: {req_info.name_id}")

    # ここで、NameIDやSessionIndexを元に関連するセッションを無効化する処理を実装
    # (このサンプルでは省略)

    # SPにLogoutResponseを返す準備
    try:
        response_args = idp_server.response_args(req_info.message)
        logout_resp = idp_server.create_logout_response(
            in_response_to=response_args['in_response_to'],
            destination=response_args['destination'], # SPのSLOレスポンスエンドポイント
            status=StatusSuccess,
            issuer_entityid=idp_server.config.entityid,
            sign_response=True
        )
    except Exception as e:
        print(f"Error creating SLO response: {e}")
        return "Error creating SLO response", 500

    # レスポンスを返すBinding (通常はリクエストと同じか、SPが指定したもの)
    binding_out = response_args.get('binding', binding)
    http_info = idp_server.apply_binding(
        binding=binding_out,
        msg_str=str(logout_resp),
        destination=response_args['destination'],
        relay_state=relay_state,
        response=True
    )

    # レスポンスを返す
    if binding_out == BINDING_HTTP_REDIRECT:
        response = redirect(http_info['url'], code=302)
        for header, value in http_info['headers']:
            response.headers[header] = value
        return response
    elif binding_out == BINDING_HTTP_POST:
        body = http_info['body'] # HTMLフォーム
        response = make_response(body)
        # response.headers = http_info['headers']
        return response
    else:
        return "Unsupported binding for SLO response", 500


# SLOレスポンス処理エンドポイント (IdP Initiated SLO のレスポンスが返ってくる)
@app.route('/slo/redirect/response', methods=['GET']) # Bindingによる
@app.route('/slo/post/response', methods=['POST'])
def slo_response():
    binding = BINDING_HTTP_REDIRECT if request.method == 'GET' else BINDING_HTTP_POST
    saml_response = request.args.get('SAMLResponse') if binding == BINDING_HTTP_REDIRECT else request.form.get('SAMLResponse')

    if not saml_response:
        return "Missing SAMLResponse for SLO", 400

    # outstanding クエリの管理が必要 (IdP側から開始した場合)

    try:
        # SPからのLogoutResponseをパース
        resp_info = idp_server.parse_logout_request_response(saml_response, binding) # outstandingが必要
    except Exception as e:
        print(f"Error processing SLO response from SP: {e}")
        # エラーでもIdP側のログアウト処理は継続する想定
        return redirect(url_for('idp_home')) # IdPのホームなどへ

    if resp_info is None:
        print("Could not parse SLO response from SP.")
        return "Invalid SLO response from SP", 400

    if resp_info.status_ok():
        print(f"SLO response from {resp_info.sender()} is OK.")
    else:
        print(f"SLO response from {resp_info.sender()} indicates failure: {resp_info.status_message()}")

    # ログアウト完了後の処理 (例: IdPのホームページへリダイレクト)
    return redirect(url_for('idp_home'))


@app.route('/idp')
def idp_home():
    # IdPの管理画面や情報ページなど (サンプル用)
    return "IdP Home Page"

if __name__ == '__main__':
    # IdPはポート 8088 で実行する例
    # 本番環境ではHTTPSが必須
    app.run(debug=True, port=8088)
注意: IdPの実装はSPよりも複雑になる傾向があります。特に、ユーザー認証、セッション管理、複数のSPとの連携、属性の提供ポリシーなどを考慮する必要があります。上記のコードは非常に基本的な例であり、実際のIdP構築にはより多くの機能とセキュリティ対策が必要です。

より高度なトピックと考慮事項 🤔

基本的な SP/IdP の実装以外にも、pysaml2 は様々な機能を提供し、考慮すべき点があります。

  • メタデータの動的読み込み・管理: 設定ファイルの metadata セクションで remote を使用すると、URLからメタデータを定期的に取得・更新できます。また、独自のメタデータローダーを実装して、データベースなどからメタデータを読み込むことも可能です。これは、多数のSP/IdPと連携するマルチテナント環境などで役立ちます。
  • 属性マッピング: attribute_map_dir で指定したディレクトリにマッピングファイル (例: to_local.py, from_local.py) を置くことで、IdPが送信する属性名とSPが内部で使用する属性名を変換できます。これにより、異なる命名規則を持つシステム間の連携が容易になります。
  • アサーションの暗号化: SPはIdPに対してアサーションの暗号化を要求できます (want_assertions_encrypted)。IdPはSPの公開鍵(メタデータに含まれる)を使ってアサーションを暗号化し、SPは自身の秘密鍵で復号します。これにより、中間者によるアサーション内容の盗聴を防ぎます。pysaml2 では設定で暗号化を有効にできます (encrypt_assertion など)。
  • 署名の要件: SPは認証リクエスト (authn_requests_signed) やログアウトリクエスト/レスポンスに署名できます。また、IdPからのアサーション (want_assertions_signed) やレスポンス全体 (want_response_signed) の署名を要求できます。IdP側も同様に署名の生成・検証を行います。相互に署名を要求・検証することで、メッセージの改ざんやなりすましを防ぎます。
  • NameID ポリシー: SPはAuthnRequestで希望するNameIDの形式 (NameIDPolicy Format) を指定できます (例: emailAddress, persistent, transient)。IdPは可能な範囲で要求された形式、または設定されたデフォルト形式でNameIDを生成します。
  • IdP Discovery / WAYF (Where Are You From): SPが複数のIdPと連携している場合、ユーザーにどのIdPで認証するかを選択させる仕組み(IdP Discovery Service や WAYF)が必要になることがあります。pysaml2 自体はこのUIを提供しませんが、連携は可能です。
  • エラーハンドリングとロギング: SAML通信では様々なエラーが発生し得ます(設定ミス、署名検証失敗、タイムアウト、IdP側のエラーなど)。適切なエラーハンドリングと詳細なロギングは、問題発生時の調査に不可欠です。pysaml2logging 設定やデバッグモード (debug=1) が役立ちます。
  • セキュリティに関する注意点:
    • 秘密鍵は厳重に管理してください。
    • xmlsec1 が正しくインストールされ、設定されていることを確認してください。
    • 本番環境では必ずHTTPSを使用してください。
    • 署名の検証 (want_assertions_signed など) を有効にすることを強く推奨します。
    • リプレイ攻撃を防ぐため、accepted_time_diff を適切に設定し、一度処理したメッセージIDを記録しておくなどの対策を検討してください (pysaml2 は内部でこれを行います)。
    • XML External Entity (XXE) 攻撃などのXML関連の脆弱性にも注意が必要です。pysaml2defusedxml を利用して対策していますが、常に最新版を利用することが望ましいです。(過去には PySAML2 にもXXE脆弱性が報告されたことがあります – 例: JVNDB-2016-007923)

まとめ ✨

pysaml2 は、Python環境でSAML 2.0ベースのシングルサインオン (SSO) を実装するための強力で柔軟なライブラリです。SPとしてもIdPとしても機能し、Webアプリケーションに堅牢な認証連携機能を追加することができます。

この記事では、SAMLの基本から pysaml2 のインストール、設定ファイルの構造、そしてSPとIdPの基本的な実装例までを解説しました。設定項目が多く、最初は少し複雑に感じるかもしれませんが、ドキュメントやサンプルコードを参考に、一つずつ理解していくことで、強力な認証基盤を構築できるはずです。

セキュリティは常に最優先事項です。pysaml2 を利用する際は、署名や暗号化、HTTPSの利用、適切なエラーハンドリングなど、セキュリティに関するベストプラクティスに従うことを忘れないでください。

Happy SAML-ing! 🎉

Python SAML PySAML2 SSO 認証 セキュリティ

コメント

タイトルとURLをコピーしました