Python boto3 チートシート

クライアントとリソースの初期化

AWSサービスと対話するための基本的なオブジェクトを作成します。

クライアントの作成 (低レベルAPI)

サービスごとに直接APIを呼び出すためのクライアントを作成します。より細かい制御が可能です。


import boto3

# S3クライアントの作成 (デフォルト設定)
s3_client = boto3.client('s3')

# EC2クライアントの作成 (リージョン指定)
ec2_client = boto3.client('ec2', region_name='ap-northeast-1')

# DynamoDBクライアントの作成 (認証情報とリージョン指定)
dynamodb_client = boto3.client(
    'dynamodb',
    aws_access_key_id='YOUR_ACCESS_KEY',
    aws_secret_access_key='YOUR_SECRET_KEY',
    region_name='us-west-2'
)
  

リソースの作成 (高レベルAPI)

よりオブジェクト指向的なインターフェースを提供します。一部のサービスで利用可能です。


import boto3

# S3リソースの作成
s3_resource = boto3.resource('s3')

# EC2リソースの作成
ec2_resource = boto3.resource('ec2', region_name='ap-northeast-1')

# DynamoDBリソースの作成
dynamodb_resource = boto3.resource('dynamodb')

# リソースオブジェクトから特定のエンティティを取得
bucket = s3_resource.Bucket('my-bucket-name')
instance = ec2_resource.Instance('i-xxxxxxxxxxxxxxxxx')
table = dynamodb_resource.Table('my-table-name')
  

セッションの利用

認証情報やリージョン設定を共通化してクライアント/リソースを作成する場合に便利です。


import boto3

# デフォルトセッション
session = boto3.Session()
s3_client_from_session = session.client('s3')
ec2_resource_from_session = session.resource('ec2')

# 特定のプロファイルを使用するセッション
profile_session = boto3.Session(profile_name='my-profile', region_name='eu-central-1')
lambda_client = profile_session.client('lambda')

# 認証情報を直接指定するセッション
credential_session = boto3.Session(
    aws_access_key_id='YOUR_ACCESS_KEY',
    aws_secret_access_key='YOUR_SECRET_KEY',
    aws_session_token='YOUR_SESSION_TOKEN', # AssumeRoleなどで取得した場合
    region_name='ap-southeast-1'
)
iam_client = credential_session.client('iam')
  

リージョン指定

クライアント/リソース作成時、またはセッション作成時にリージョンを指定します。


import boto3

# クライアント作成時に指定
s3_client = boto3.client('s3', region_name='us-east-1')

# リソース作成時に指定
ec2_resource = boto3.resource('ec2', region_name='ap-northeast-1')

# セッション作成時に指定
session = boto3.Session(region_name='sa-east-1')
sqs_client = session.client('sqs')

# 環境変数 AWS_REGION または AWS_DEFAULT_REGION でも設定可能
# 設定ファイル (~/.aws/config) でも設定可能
# [default]
# region = ap-northeast-1
  

エンドポイントURL指定

LocalStackなど、AWS互換環境や特定のVPCエンドポイントを利用する場合に指定します。


import boto3

# LocalStackのS3エンドポイントを指定
local_s3_client = boto3.client(
    's3',
    region_name='us-east-1', # LocalStackではリージョン名は任意だが指定が必要な場合がある
    endpoint_url='http://localhost:4566', # LocalStackのデフォルトS3ポート
    aws_access_key_id='test', # LocalStack用のダミー認証情報
    aws_secret_access_key='test'
)

# LocalStackのDynamoDBリソースを指定
local_dynamodb_resource = boto3.resource(
    'dynamodb',
    region_name='us-east-1',
    endpoint_url='http://localhost:4566',
    aws_access_key_id='test',
    aws_secret_access_key='test'
)
  

S3 操作 💾

Simple Storage Service (S3) のバケットやオブジェクトを操作します。

目的 クライアントAPI (client) リソースAPI (resource)
バケット一覧取得

response = s3_client.list_buckets()
buckets = [b['Name'] for b in response['Buckets']]
print(buckets)
            

buckets = [b.name for b in s3_resource.buckets.all()]
print(buckets)
            
バケット作成

# リージョン指定が必要な場合がある (us-east-1以外)
s3_client.create_bucket(
    Bucket='my-unique-bucket-name-123',
    CreateBucketConfiguration={
        'LocationConstraint': 'ap-northeast-1' # us-east-1の場合は不要
    }
)
            

s3_resource.create_bucket(
    Bucket='my-unique-bucket-name-456',
    CreateBucketConfiguration={
        'LocationConstraint': 'ap-northeast-1'
    }
)
             
オブジェクト一覧取得

response = s3_client.list_objects_v2(Bucket='my-bucket-name')
if 'Contents' in response:
    objects = [obj['Key'] for obj in response['Contents']]
    print(objects)
# Prefix指定
response_prefix = s3_client.list_objects_v2(Bucket='my-bucket-name', Prefix='folder/')
# Paginator利用 (大量オブジェクトの場合)
paginator = s3_client.get_paginator('list_objects_v2')
pages = paginator.paginate(Bucket='my-bucket-name')
for page in pages:
    if 'Contents' in page:
        for obj in page['Contents']:
            print(obj['Key'])
            

bucket = s3_resource.Bucket('my-bucket-name')
objects = [obj.key for obj in bucket.objects.all()]
print(objects)
# Prefix指定
objects_prefix = [obj.key for obj in bucket.objects.filter(Prefix='folder/')]
print(objects_prefix)
             
オブジェクトアップロード (ファイル)

# ファイルパスからアップロード
s3_client.upload_file(
    Filename='/path/to/local/file.txt', # ローカルファイルパス
    Bucket='my-bucket-name',
    Key='destination/object/key.txt' # S3上のキー
)
# ファイルオブジェクトからアップロード
with open('/path/to/local/image.jpg', 'rb') as f:
    s3_client.upload_fileobj(
        Fileobj=f,
        Bucket='my-bucket-name',
        Key='images/image.jpg',
        ExtraArgs={'ContentType': 'image/jpeg'} # ContentTypeなどを指定
    )
            

bucket = s3_resource.Bucket('my-bucket-name')
# ファイルパスからアップロード
bucket.upload_file(
    Filename='/path/to/local/file.txt',
    Key='destination/object/key.txt'
)
# ファイルオブジェクトからアップロード
with open('/path/to/local/image.jpg', 'rb') as f:
    bucket.upload_fileobj(
        Fileobj=f,
        Key='images/image.jpg',
        ExtraArgs={'ContentType': 'image/jpeg'}
    )
# put_object (バイナリデータや文字列を直接)
s3_resource.Object('my-bucket-name', 'data/text.txt').put(Body=b'Hello S3 from resource')
            
オブジェクトアップロード (バイナリ/文字列)

# バイト列をアップロード
binary_data = b'\x01\x02\x03\x04'
s3_client.put_object(
    Bucket='my-bucket-name',
    Key='binary/data.bin',
    Body=binary_data,
    ContentType='application/octet-stream'
)
# 文字列をアップロード (UTF-8エンコード推奨)
text_data = 'こんにちは、S3!'
s3_client.put_object(
    Bucket='my-bucket-name',
    Key='text/greeting.txt',
    Body=text_data.encode('utf-8'), # バイト列に変換
    ContentType='text/plain; charset=utf-8'
)
            

s3_object = s3_resource.Object('my-bucket-name', 'binary/data_res.bin')
s3_object.put(
    Body=b'\x05\x06\x07\x08',
    ContentType='application/octet-stream'
)

s3_object_text = s3_resource.Object('my-bucket-name', 'text/greeting_res.txt')
s3_object_text.put(
    Body='Hello from resource API!'.encode('utf-8'),
    ContentType='text/plain; charset=utf-8'
)
            
オブジェクトダウンロード (ファイル)

# ファイルにダウンロード
s3_client.download_file(
    Bucket='my-bucket-name',
    Key='destination/object/key.txt',
    Filename='/path/to/save/local/file.txt' # 保存先ファイルパス
)
# ファイルオブジェクトにダウンロード
with open('/path/to/save/local/image.jpg', 'wb') as f:
    s3_client.download_fileobj(
        Bucket='my-bucket-name',
        Key='images/image.jpg',
        Fileobj=f
    )
            

bucket = s3_resource.Bucket('my-bucket-name')
# ファイルにダウンロード
bucket.download_file(
    Key='destination/object/key.txt',
    Filename='/path/to/save/local/file_res.txt'
)
# ファイルオブジェクトにダウンロード
with open('/path/to/save/local/image_res.jpg', 'wb') as f:
    bucket.download_fileobj(
        Key='images/image.jpg',
        Fileobj=f
    )
# get_object (メモリ上に読み込み)
s3_object = s3_resource.Object('my-bucket-name', 'text/greeting.txt')
response = s3_object.get()
body = response['Body'].read()
print(body.decode('utf-8'))
             
オブジェクトダウンロード (バイナリ/メモリ)

response = s3_client.get_object(
    Bucket='my-bucket-name',
    Key='binary/data.bin'
)
# StreamingBody オブジェクトが返る
body_stream = response['Body']
# 全て読み込む
binary_content = body_stream.read()
print(binary_content)
# ストリームとして扱うことも可能
# for chunk in body_stream.iter_chunks(chunk_size=1024):
#     process(chunk)
body_stream.close() # 必要に応じてクローズ
            

s3_object = s3_resource.Object('my-bucket-name', 'text/greeting.txt')
response = s3_object.get()
# response['Body'] は StreamingBody
body = response['Body'].read()
print(body.decode('utf-8')) # テキストの場合はデコード
response['Body'].close()
            
オブジェクト削除

# 単一オブジェクト削除
s3_client.delete_object(
    Bucket='my-bucket-name',
    Key='destination/object/key.txt'
)
# 複数オブジェクト削除
response = s3_client.delete_objects(
    Bucket='my-bucket-name',
    Delete={
        'Objects': [
            {'Key': 'images/image.jpg'},
            {'Key': 'binary/data.bin'},
        ],
        'Quiet': False # Trueにすると成功したオブジェクトは返さない
    }
)
print(response.get('Deleted'))
print(response.get('Errors'))
            

s3_object = s3_resource.Object('my-bucket-name', 'text/greeting.txt')
s3_object.delete()

bucket = s3_resource.Bucket('my-bucket-name')
response = bucket.delete_objects(
    Delete={
        'Objects': [
            {'Key': 'folder/file1.txt'},
            {'Key': 'folder/file2.txt'},
        ]
    }
)
print(response.get('Deleted'))
            
署名付きURL生成

# GETリクエスト用URL (ダウンロード)
get_url = s3_client.generate_presigned_url(
    'get_object',
    Params={'Bucket': 'my-bucket-name', 'Key': 'private/document.pdf'},
    ExpiresIn=3600 # URLの有効期限 (秒)
)
print(f"Download URL: {get_url}")

# PUTリクエスト用URL (アップロード)
put_url = s3_client.generate_presigned_url(
    'put_object',
    Params={'Bucket': 'my-upload-bucket', 'Key': 'user-uploads/new_file.zip'},
    ExpiresIn=600,
    HttpMethod='PUT' # HTTPメソッド指定
)
print(f"Upload URL: {put_url}")
# アップロード例: requests.put(put_url, data=file_content)
            

リソースAPIには直接的な generate_presigned_url メソッドはありません。クライアントAPIを使用します。


# リソースオブジェクトからクライアントを取得して使用
s3_object = s3_resource.Object('my-bucket-name', 'private/document.pdf')
presigned_url = s3_object.meta.client.generate_presigned_url(
    'get_object',
    Params={'Bucket': s3_object.bucket_name, 'Key': s3_object.key},
    ExpiresIn=3600
)
print(presigned_url)
            

マルチパートアップロード

大容量ファイルを効率的にアップロードします。自動的に行われるupload_fileupload_fileobjの内部でも利用されていますが、手動で制御することも可能です。


import boto3
from boto3.s3.transfer import TransferConfig

# 設定 (しきい値や同時実行数など)
config = TransferConfig(
    multipart_threshold=1024 * 25, # 25MBを超えたらマルチパート開始
    max_concurrency=10,           # 最大同時アップロード数
    multipart_chunksize=1024 * 25, # 各パートのサイズ (25MB)
    use_threads=True              # スレッドを使用
)

s3 = boto3.client('s3')

# upload_file で設定を適用
s3.upload_file(
    '/path/to/large/file.zip',
    'my-large-file-bucket',
    'destination/large_file.zip',
    Config=config
)

# upload_fileobj でも同様
with open('/path/to/another/large_file.mov', 'rb') as f:
    s3.upload_fileobj(
        f,
        'my-large-file-bucket',
        'videos/large_movie.mov',
        Config=config
    )
   

EC2 操作 💻

Elastic Compute Cloud (EC2) のインスタンスや関連リソースを管理します。

目的 クライアントAPI (client) リソースAPI (resource)
インスタンス一覧取得

# 全てのインスタンス情報取得
response = ec2_client.describe_instances()
for reservation in response['Reservations']:
    for instance in reservation['Instances']:
        print(f"ID: {instance['InstanceId']}, State: {instance['State']['Name']}")

# 特定の状態のインスタンスをフィルタリング
response_running = ec2_client.describe_instances(
    Filters=[{'Name': 'instance-state-name', 'Values': ['running']}]
)
# タグでフィルタリング
response_tag = ec2_client.describe_instances(
    Filters=[{'Name': 'tag:Environment', 'Values': ['production']}]
)
            

# 全てのインスタンス
for instance in ec2_resource.instances.all():
    print(f"ID: {instance.id}, State: {instance.state['Name']}")

# 実行中のインスタンス
for instance in ec2_resource.instances.filter(
    Filters=[{'Name': 'instance-state-name', 'Values': ['running']}]
):
    print(f"Running ID: {instance.id}")

# タグでフィルタリング
for instance in ec2_resource.instances.filter(
    Filters=[{'Name': 'tag:Project', 'Values': ['Blue']}]
):
     print(f"Project Blue ID: {instance.id}")
            
インスタンス情報取得 (ID指定)

instance_id = 'i-xxxxxxxxxxxxxxxxx'
response = ec2_client.describe_instances(InstanceIds=[instance_id])
if response['Reservations']:
    instance_info = response['Reservations'][0]['Instances'][0]
    print(f"Private IP: {instance_info.get('PrivateIpAddress')}")
else:
    print(f"Instance {instance_id} not found.")
            

instance_id = 'i-yyyyyyyyyyyyyyyyy'
try:
    instance = ec2_resource.Instance(instance_id)
    instance.load() # 情報をロード (APIコール発生)
    print(f"Instance Type: {instance.instance_type}")
    print(f"Public DNS: {instance.public_dns_name}")
except ClientError as e:
    if e.response['Error']['Code'] == 'InvalidInstanceID.NotFound':
        print(f"Instance {instance_id} not found.")
    else:
        raise e
            
インスタンス起動

response = ec2_client.run_instances(
    ImageId='ami-xxxxxxxxxxxxxxxxx', # AMI ID
    InstanceType='t2.micro',
    MinCount=1,
    MaxCount=1,
    KeyName='my-key-pair', # キーペア名
    SecurityGroupIds=['sg-xxxxxxxxxxxxxxxxx'], # セキュリティグループID
    SubnetId='subnet-xxxxxxxxxxxxxxxxx', # サブネットID
    TagSpecifications=[ # タグ付け
        {
            'ResourceType': 'instance',
            'Tags': [
                {'Key': 'Name', 'Value': 'MyBoto3Instance'},
                {'Key': 'Environment', 'Value': 'Development'},
            ]
        },
    ]
    # UserData='#!/bin/bash\nyum update -y' # 起動スクリプト
)
instance_id = response['Instances'][0]['InstanceId']
print(f"Started instance: {instance_id}")
# 起動完了を待つ (Waiter)
waiter = ec2_client.get_waiter('instance_running')
waiter.wait(InstanceIds=[instance_id])
print(f"Instance {instance_id} is now running.")
            

instances = ec2_resource.create_instances(
    ImageId='ami-yyyyyyyyyyyyyyyyy',
    InstanceType='t3.small',
    MinCount=1,
    MaxCount=1,
    KeyName='my-other-key-pair',
    SecurityGroupIds=['sg-yyyyyyyyyyyyyyyyy'],
    SubnetId='subnet-yyyyyyyyyyyyyyyyy',
    TagSpecifications=[
        {'ResourceType': 'instance', 'Tags': [{'Key': 'Name', 'Value': 'MyResourceInstance'}]}
    ]
)
instance = instances[0] # create_instancesはリストを返す
print(f"Created instance: {instance.id}")
# 起動完了を待つ
instance.wait_until_running()
instance.load() # 状態を更新
print(f"Instance {instance.id} is running with state: {instance.state['Name']}")
             
インスタンス停止

instance_ids = ['i-xxxxxxxxxxxxxxxxx', 'i-zzzzzzzzzzzzzzzzz']
response = ec2_client.stop_instances(InstanceIds=instance_ids)
print(f"Stopping instances: {response['StoppingInstances']}")
# 停止完了を待つ (Waiter)
waiter = ec2_client.get_waiter('instance_stopped')
waiter.wait(InstanceIds=instance_ids)
print(f"Instances {instance_ids} are stopped.")
            

instance_id = 'i-yyyyyyyyyyyyyyyyy'
instance = ec2_resource.Instance(instance_id)
response = instance.stop()
print(f"Stopping instance {instance.id}: {response['StoppingInstances']}")
# 停止完了を待つ
instance.wait_until_stopped()
instance.load()
print(f"Instance {instance.id} is stopped with state: {instance.state['Name']}")
            
インスタンス再起動

instance_ids = ['i-xxxxxxxxxxxxxxxxx']
ec2_client.reboot_instances(InstanceIds=instance_ids)
print(f"Rebooting instances: {instance_ids}")
            

instance_id = 'i-yyyyyyyyyyyyyyyyy'
instance = ec2_resource.Instance(instance_id)
instance.reboot()
print(f"Rebooting instance {instance.id}")
            
インスタンス終了

instance_ids = ['i-xxxxxxxxxxxxxxxxx', 'i-zzzzzzzzzzzzzzzzz']
response = ec2_client.terminate_instances(InstanceIds=instance_ids)
print(f"Terminating instances: {response['TerminatingInstances']}")
# 終了完了を待つ (Waiter)
waiter = ec2_client.get_waiter('instance_terminated')
waiter.wait(InstanceIds=instance_ids)
print(f"Instances {instance_ids} are terminated.")
            

instance_id = 'i-yyyyyyyyyyyyyyyyy'
instance = ec2_resource.Instance(instance_id)
response = instance.terminate()
print(f"Terminating instance {instance.id}: {response['TerminatingInstances']}")
# 終了完了を待つ
instance.wait_until_terminated()
# 終了後はload()するとエラーになることが多い
print(f"Instance {instance.id} is terminated.")
            

セキュリティグループ操作


# セキュリティグループ作成
sg_response = ec2_client.create_security_group(
    GroupName='my-boto3-sg',
    Description='Security group created via boto3',
    VpcId='vpc-xxxxxxxxxxxxxxxxx' # VPC IDを指定
)
sg_id = sg_response['GroupId']
print(f"Created Security Group: {sg_id}")

# インバウンドルール追加 (SSH from anywhere)
ec2_client.authorize_security_group_ingress(
    GroupId=sg_id,
    IpPermissions=[
        {
            'IpProtocol': 'tcp',
            'FromPort': 22,
            'ToPort': 22,
            'IpRanges': [{'CidrIp': '0.0.0.0/0'}]
        }
    ]
)
print(f"Added SSH rule to {sg_id}")

# アウトバウンドルール削除 (例: デフォルトの全許可ルールを削除)
# まずデフォルトルールを取得する必要がある場合がある
# describe_security_groups で詳細を確認
# ec2_client.revoke_security_group_egress(...)

# セキュリティグループ削除
# ec2_client.delete_security_group(GroupId=sg_id) # 依存関係がない場合のみ可能
  

DynamoDB 操作 📦

NoSQLデータベースサービス DynamoDB のテーブルや項目を操作します。

目的 クライアントAPI (client) リソースAPI (resource – Tableオブジェクト)
テーブル一覧取得

response = dynamodb_client.list_tables()
print(response['TableNames'])
            

# リソースオブジェクト自体には list_tables がない
# クライアントAPIを使うか、Sessionからクライアントを取得する
dynamodb_client_from_resource = dynamodb_resource.meta.client
response = dynamodb_client_from_resource.list_tables()
print(response['TableNames'])
             
項目追加/更新 (PutItem)

# 型を指定する必要がある (S: String, N: Number, B: Binary, BOOL: Boolean, ...)
response = dynamodb_client.put_item(
    TableName='my-data-table',
    Item={
        'userId': {'S': 'user123'},
        'timestamp': {'N': '1678886400'},
        'data': {'M': { # Map型
            'message': {'S': 'Hello from client'},
            'is_processed': {'BOOL': False}
        }},
        'tags': {'L': [ # List型
            {'S': 'tag1'},
            {'S': 'tag2'}
        ]}
    }
)
print(response)
            

import decimal # 数値型にはDecimalを使うのが推奨される

table = dynamodb_resource.Table('my-data-table')
response = table.put_item(
   Item={
        'userId': 'user456',
        'timestamp': decimal.Decimal('1678887000'), # Decimalを使用
        'data': {
            'message': 'Hello from resource',
            'is_processed': True
        },
        'tags': ['tagA', 'tagB'],
        'optional_attr': None # Noneは属性として保存されない
    }
)
print(response)
            
項目取得 (GetItem)

response = dynamodb_client.get_item(
    TableName='my-data-table',
    Key={
        'userId': {'S': 'user123'},
        'timestamp': {'N': '1678886400'} # パーティションキーとソートキー (あれば)
    },
    ProjectionExpression='userId, data.message' # 取得する属性を指定
)
if 'Item' in response:
    print(response['Item'])
else:
    print("Item not found.")
            

table = dynamodb_resource.Table('my-data-table')
response = table.get_item(
    Key={
        'userId': 'user456',
        'timestamp': decimal.Decimal('1678887000')
    },
    ProjectionExpression='tags, data.is_processed'
)
if 'Item' in response:
    item = response['Item']
    print(item)
    # Decimal型は Decimal オブジェクトとして返る
    print(type(item.get('timestamp'))) # 
else:
    print("Item not found.")
            
項目更新 (UpdateItem)

response = dynamodb_client.update_item(
    TableName='my-data-table',
    Key={
        'userId': {'S': 'user123'},
        'timestamp': {'N': '1678886400'}
    },
    UpdateExpression='SET data.is_processed = :val1, #tag_attr[0] = :val2 ADD version :inc', # SET, REMOVE, ADD, DELETE
    ExpressionAttributeNames={ # 予約語を避けるため (#)
        '#tag_attr': 'tags'
    },
    ExpressionAttributeValues={ # 値をプレースホルダで渡す (:)
        ':val1': {'BOOL': True},
        ':val2': {'S': 'updated_tag'},
        ':inc': {'N': '1'} # 数値のインクリメント
    },
    ReturnValues='UPDATED_NEW' # 更新後の値を返す
)
print(response['Attributes'])
            

table = dynamodb_resource.Table('my-data-table')
response = table.update_item(
    Key={
        'userId': 'user456',
        'timestamp': decimal.Decimal('1678887000')
    },
    UpdateExpression='SET data.message = :msg REMOVE optional_attr ADD counter :c',
    ExpressionAttributeValues={
        ':msg': 'Updated message via resource',
        ':c': decimal.Decimal('1') # カウンターアトム操作
    },
    ReturnValues='ALL_NEW' # 更新後の全属性を返す
)
print(response['Attributes'])
             
項目削除 (DeleteItem)

response = dynamodb_client.delete_item(
    TableName='my-data-table',
    Key={
        'userId': {'S': 'user123'},
        'timestamp': {'N': '1678886400'}
    },
    ReturnValues='ALL_OLD' # 削除前の値を返す
)
if 'Attributes' in response:
    print("Deleted item:", response['Attributes'])
else:
    print("Item already deleted or not found.")
            

table = dynamodb_resource.Table('my-data-table')
response = table.delete_item(
    Key={
        'userId': 'user456',
        'timestamp': decimal.Decimal('1678887000')
    },
    ConditionExpression='attribute_exists(data.message)' # 条件付き削除
    # ReturnValues='ALL_OLD'
)
print(response) # 成功時はメタデータのみ、失敗時は例外発生
            
クエリ (Query)

# 特定のuserIdを持つ項目をtimestampで降順に取得
response = dynamodb_client.query(
    TableName='my-data-table',
    KeyConditionExpression='userId = :uid', # パーティションキーは必須
    FilterExpression='data.is_processed = :processed', # 結果に対する追加フィルタ
    ExpressionAttributeValues={
        ':uid': {'S': 'user789'},
        ':processed': {'BOOL': True}
    },
    ProjectionExpression='timestamp, data.message',
    ScanIndexForward=False # Falseで降順、True(デフォルト)で昇順
)
print(response['Items'])
            

from boto3.dynamodb.conditions import Key, Attr

table = dynamodb_resource.Table('my-data-table')
# 特定のuserIdで、timestampが特定の値より大きい項目を取得
response = table.query(
    KeyConditionExpression=Key('userId').eq('user789') & Key('timestamp').gt(decimal.Decimal('1678880000')),
    FilterExpression=Attr('tags').contains('important') | Attr('data.message').begins_with('urgent'),
    ProjectionExpression='userId, timestamp, tags'
)
print(response['Items'])
# GSI (Global Secondary Index) を利用する場合
# response_gsi = table.query(
#    IndexName='my-gsi-name',
#    KeyConditionExpression=Key('gsi_partition_key').eq('value')
# )
             
スキャン (Scan)

# テーブル全体をスキャンして特定の属性を持つ項目を取得 (非効率なので注意)
response = dynamodb_client.scan(
    TableName='my-data-table',
    FilterExpression='attribute_exists(error_code)',
    ProjectionExpression='userId, timestamp, error_code'
)
items = response['Items']
# 結果が1MBを超える場合は LastEvaluatedKey を使ってページングが必要
while 'LastEvaluatedKey' in response:
    response = dynamodb_client.scan(
        TableName='my-data-table',
        FilterExpression='attribute_exists(error_code)',
        ProjectionExpression='userId, timestamp, error_code',
        ExclusiveStartKey=response['LastEvaluatedKey']
    )
    items.extend(response['Items'])
print(f"Found {len(items)} items with error_code.")
            

from boto3.dynamodb.conditions import Attr

table = dynamodb_resource.Table('my-data-table')
# 特定のタグを含む項目をスキャン
response = table.scan(
    FilterExpression=Attr('tags').contains('archive')
)
items = response['Items']
# リソースAPIのscanもページングが必要な場合がある
while 'LastEvaluatedKey' in response:
    response = table.scan(
        FilterExpression=Attr('tags').contains('archive'),
        ExclusiveStartKey=response['LastEvaluatedKey']
    )
    items.extend(response['Items'])
print(f"Found {len(items)} archived items.")
             

AWS Lambda 関数の管理と実行を行います。

関数一覧取得


import boto3

lambda_client = boto3.client('lambda')

response = lambda_client.list_functions()
functions = response.get('Functions', [])
for func in functions:
    print(f"Name: {func['FunctionName']}, Runtime: {func['Runtime']}")

# Markerを使ったページネーション
while response.get('NextMarker'):
    response = lambda_client.list_functions(Marker=response['NextMarker'])
    functions.extend(response.get('Functions', []))

print(f"\nTotal functions: {len(functions)}")
    

関数実行 (同期)

関数が完了するまで待機し、結果を受け取ります。


import json

function_name = 'my-lambda-function'
payload = {'key1': 'value1', 'key2': 123}

response = lambda_client.invoke(
    FunctionName=function_name,
    InvocationType='RequestResponse', # 同期実行 (デフォルト)
    Payload=json.dumps(payload) # ペイロードはJSON文字列 (バイト列でも可)
    # LogType='Tail' # 実行ログをレスポンスに含める (Base64エンコードされる)
)

print(f"Status Code: {response['StatusCode']}") # 200なら成功

if 'FunctionError' in response:
    print(f"Function Error: {response['FunctionError']}")
    # エラーペイロードがある場合
    error_payload = json.loads(response['Payload'].read())
    print(f"Error Payload: {error_payload}")
else:
    # 成功時のペイロード
    result_payload = json.loads(response['Payload'].read())
    print(f"Result Payload: {result_payload}")

# LogType='Tail' を指定した場合
# if 'LogResult' in response:
#     import base64
#     log_result = base64.b64decode(response['LogResult']).decode('utf-8')
#     print("\nExecution Log:")
#     print(log_result)
    

関数実行 (非同期)

関数を起動するだけで、完了を待たずにすぐにレスポンスが返ります。


function_name = 'my-async-lambda-function'
payload = {'message': 'Process this asynchronously'}

response = lambda_client.invoke(
    FunctionName=function_name,
    InvocationType='Event', # 非同期実行
    Payload=json.dumps(payload)
)

print(f"Status Code: {response['StatusCode']}") # 202なら起動成功
# 非同期実行ではPayloadやFunctionErrorは返らない
    

関数作成 (Zipファイルから)

デプロイパッケージ (Zip) を用いて新しい関数を作成します。


# 事前に lambda_function.py を含む deployment.zip を用意
# zip deployment.zip lambda_function.py

with open('deployment.zip', 'rb') as f:
    zip_content = f.read()

response = lambda_client.create_function(
    FunctionName='my-new-boto3-function',
    Runtime='python3.9',
    Role='arn:aws:iam::123456789012:role/MyLambdaExecutionRole', # 実行ロールARN
    Handler='lambda_function.lambda_handler', # ファイル名.関数名
    Code={'ZipFile': zip_content},
    Description='Function created via boto3',
    Timeout=15, # 秒
    MemorySize=256, # MB
    Publish=True, # Trueにすると $LATEST と同時にバージョンも発行
    Tags={
        'Project': 'Boto3Demo',
        'Creator': 'Script'
    }
    # Environment={'Variables': {'MY_VAR': 'my_value'}} # 環境変数
)

print(f"Created function ARN: {response['FunctionArn']}")
print(f"Version: {response.get('Version')}") # Publish=Trueの場合
    

関数コード更新


# 新しいコードを含む updated_deployment.zip を用意

with open('updated_deployment.zip', 'rb') as f:
    new_zip_content = f.read()

response = lambda_client.update_function_code(
    FunctionName='my-new-boto3-function',
    ZipFile=new_zip_content,
    Publish=True # 更新後、新しいバージョンを発行
)

print(f"Updated function: {response['FunctionName']}")
print(f"New Version: {response['Version']}")
    

関数設定更新


response = lambda_client.update_function_configuration(
    FunctionName='my-new-boto3-function',
    Description='Updated description',
    Timeout=30,
    MemorySize=512,
    Environment={
        'Variables': {
            'MY_VAR': 'new_value',
            'ANOTHER_VAR': 'added_value'
        }
    }
    # Role='arn:aws:iam::...' # 実行ロールの変更
    # Handler='new_handler.handler' # ハンドラの変更
)
print(f"Updated configuration for {response['FunctionName']}")
    

関数削除


function_name = 'my-new-boto3-function'
# qualifier = '1' # 特定のバージョンを削除する場合

try:
    lambda_client.delete_function(FunctionName=function_name)
    print(f"Successfully requested deletion of function: {function_name}")
except lambda_client.exceptions.ResourceNotFoundException:
    print(f"Function {function_name} not found.")
except Exception as e:
    print(f"Error deleting function {function_name}: {e}")

    

エラーハンドリングと待機 ⏳

API呼び出し時のエラー処理や、リソースの状態変化を待つ方法です。

ClientError 例外処理

boto3 が AWS API からのエラーを受け取った際に発生する例外です。


from botocore.exceptions import ClientError
import boto3

s3_client = boto3.client('s3')
non_existent_bucket = 'this-bucket-surely-does-not-exist-abcdef123'

try:
    response = s3_client.head_bucket(Bucket=non_existent_bucket)
    print(f"Bucket {non_existent_bucket} exists.") # 通常ここは実行されない
except ClientError as e:
    error_code = e.response['Error']['Code']
    error_message = e.response['Error']['Message']
    http_status_code = e.response['ResponseMetadata']['HTTPStatusCode']

    print(f"An error occurred ({error_code}): {error_message}")
    print(f"HTTP Status Code: {http_status_code}")

    if error_code == 'NoSuchBucket':
        print("The specified bucket does not exist.")
    elif error_code == 'AccessDenied':
        print("Permission denied to access the bucket.")
    else:
        print("An unexpected error occurred.")

# 特定のエラーコードだけを捕捉することも可能 (あまり推奨されない)
# except s3_client.exceptions.NoSuchBucket as e:
#     print("Caught NoSuchBucket specifically.")
  

Waiter の利用

特定のリソースが目的の状態になるまでポーリングして待機します。


import boto3
from botocore.exceptions import WaiterError

ec2_client = boto3.client('ec2')
s3_client = boto3.client('s3')

# --- EC2インスタンスの実行完了待ち ---
instance_id = 'i-xxxxxxxxxxxxxxxxx' # 起動直後のインスタンスID
print(f"Waiting for instance {instance_id} to be running...")
instance_running_waiter = ec2_client.get_waiter('instance_running')
try:
    instance_running_waiter.wait(
        InstanceIds=[instance_id],
        WaiterConfig={ # オプション: ポーリング間隔と最大試行回数
            'Delay': 15, # ポーリング間隔 (秒)
            'MaxAttempts': 40 # 最大試行回数 (デフォルト40回)
        }
    )
    print(f"Instance {instance_id} is now running! 🎉")
except WaiterError as e:
    print(f"Waiter failed for instance {instance_id}: {e}")


# --- S3オブジェクトの存在待ち ---
bucket_name = 'my-bucket-name'
object_key = 'path/to/expected/file.txt'
print(f"Waiting for object s3://{bucket_name}/{object_key} to exist...")
object_exists_waiter = s3_client.get_waiter('object_exists')
try:
    object_exists_waiter.wait(
        Bucket=bucket_name,
        Key=object_key,
        WaiterConfig={'Delay': 5, 'MaxAttempts': 12}
    )
    print(f"Object s3://{bucket_name}/{object_key} now exists! ✅")
except WaiterError as e:
    print(f"Waiter failed for object s3://{bucket_name}/{object_key}: {e}")

# 利用可能なWaiterはサービスごとに異なります
# print(ec2_client.waiter_names)
# print(s3_client.waiter_names)
  

Paginator の利用

大量の結果を返す可能性のあるAPI呼び出しを、ページ単位で簡単に処理します。


import boto3

s3_client = boto3.client('s3')
iam_client = boto3.client('iam')

# --- S3オブジェクト一覧 (大量の場合) ---
bucket_name = 'my-large-bucket'
print(f"Listing objects in bucket {bucket_name} using paginator:")
paginator = s3_client.get_paginator('list_objects_v2')

# paginate() はイテレータを返す
page_iterator = paginator.paginate(Bucket=bucket_name, Prefix='archive/')

object_count = 0
try:
    for page in page_iterator:
        # print(f"--- Processing Page ---")
        if 'Contents' in page:
            for obj in page['Contents']:
                # print(f"  Key: {obj['Key']}, Size: {obj['Size']}")
                object_count += 1
        # 他のページ情報も利用可能 (例: CommonPrefixes)
        # if 'CommonPrefixes' in page:
        #     print(f"  Common Prefixes: {[cp['Prefix'] for cp in page['CommonPrefixes']]}")

except ClientError as e:
    print(f"Error listing objects: {e}")

print(f"\nTotal objects found in prefix 'archive/': {object_count}")


# --- IAMユーザー一覧 ---
print("\nListing IAM users using paginator:")
paginator_iam = iam_client.get_paginator('list_users')
user_iterator = paginator_iam.paginate(PaginationConfig={'PageSize': 50}) # 1ページあたりの取得数

user_count = 0
for page in user_iterator:
    for user in page.get('Users', []):
        # print(f"  User: {user['UserName']}, ARN: {user['Arn']}")
        user_count += 1

print(f"\nTotal IAM users found: {user_count}")

# build_full_result() ですべての結果を結合した辞書を取得することも可能 (メモリ注意)
# full_result = page_iterator.build_full_result()
# all_objects = full_result.get('Contents', [])
# print(f"Total objects (using build_full_result): {len(all_objects)}")
  

高度な設定 ⚙️

boto3 の動作をカスタマイズするための設定です。

リトライ設定

API呼び出しが失敗した場合の再試行動作を制御します。botocore.config.Config を使用します。


import boto3
from botocore.config import Config

# デフォルトのリトライモードは 'legacy'
# 'standard' や 'adaptive' モードに変更可能

# Standardモード: 最大3回リトライ (一般的なエラーコード)
config_standard = Config(
    retries={
        'max_attempts': 4, # 初回 + 最大3回リトライ
        'mode': 'standard'
    }
)
s3_client_standard = boto3.client('s3', config=config_standard)

# Adaptiveモード: クライアントサイドでレート制限を考慮し、より積極的にリトライ
# (ThrottlingException などに対して効果的)
config_adaptive = Config(
    retries={
        'max_attempts': 6,
        'mode': 'adaptive'
    }
)
dynamodb_client_adaptive = boto3.client('dynamodb', config=config_adaptive)

# リトライを無効化
config_no_retry = Config(
    retries={'max_attempts': 0}
)
lambda_client_no_retry = boto3.client('lambda', config=config_no_retry)

# リトライ設定の確認 (実際のオブジェクトから)
# print(s3_client_standard.meta.config.retries)
# print(dynamodb_client_adaptive.meta.config.retries)
# print(lambda_client_no_retry.meta.config.retries)

# --- 使用例 ---
try:
    # Throttling が発生しやすい操作を Adaptive モードで実行
    response = dynamodb_client_adaptive.scan(TableName='my-very-busy-table')
    # ... process response ...
except ClientError as e:
    print(f"DynamoDB scan failed after adaptive retries: {e}")

   

タイムアウト設定

API呼び出しの接続タイムアウトと読み取りタイムアウトを設定します。


import boto3
from botocore.config import Config

# 接続タイムアウトを5秒、読み取りタイムアウトを10秒に設定
config_timeout = Config(
    connect_timeout=5,  # 接続確立までのタイムアウト (秒)
    read_timeout=10     # レスポンス受信開始までのタイムアウト (秒)
)

s3_client_with_timeout = boto3.client('s3', config=config_timeout)
sqs_client_with_timeout = boto3.client('sqs', region_name='us-west-2', config=config_timeout)

# --- 使用例 ---
try:
    # ネットワークが不安定な場合にタイムアウトが発生する可能性がある
    response = sqs_client_with_timeout.receive_message(
        QueueUrl='https://sqs.us-west-2.amazonaws.com/123456789012/MyQueue',
        WaitTimeSeconds=5 # ロングポーリング時間とは別
    )
    # ... process messages ...
except ClientError as e:
    # botocore.exceptions.ConnectTimeoutError や ReadTimeoutError は
    # ClientError としてラップされることが多いが、直接補足できる場合もある
    print(f"SQS receive failed: {e}")
    if 'ConnectTimeoutError' in str(e):
        print("Connection timed out.")
    elif 'ReadTimeoutError' in str(e):
        print("Read timed out.")

   

プロキシ設定

HTTP/HTTPSプロキシ経由でAWS APIに接続する場合に設定します。


import boto3
from botocore.config import Config

# プロキシサーバーのアドレスを指定
proxies = {
    'http': 'http://proxy.example.com:8080',
    'https': 'http://secureproxy.example.com:8443', # HTTPSプロキシもhttp://で始まる場合がある
}

# プロキシ設定を含むConfigオブジェクトを作成
config_proxy = Config(
    proxies=proxies
)

# プロキシを使用するクライアントを作成
ec2_client_proxy = boto3.client('ec2', region_name='eu-west-1', config=config_proxy)

# プロキシ認証が必要な場合 (環境変数で設定する方が一般的)
# proxies_auth = {
#     'http': 'http://user:password@proxy.example.com:8080',
#     'https': 'http://user:password@secureproxy.example.com:8443',
# }
# config_proxy_auth = Config(proxies=proxies_auth)
# secrets_manager_proxy = boto3.client('secretsmanager', config=config_proxy_auth)

# 環境変数 HTTP_PROXY, HTTPS_PROXY, NO_PROXY を設定することでも
# boto3 (内部の urllib3/requests) はプロキシ設定を認識します。
# export HTTPS_PROXY=http://secureproxy.example.com:8443

# --- 使用例 ---
try:
    response = ec2_client_proxy.describe_regions()
    print("Successfully connected via proxy to describe regions.")
    print([r['RegionName'] for r in response['Regions']])
except ClientError as e:
    print(f"Failed to connect via proxy: {e}")