クライアントとリソースの初期化
AWSサービスと対話するための基本的なオブジェクトを作成します。
クライアントの作成 (低レベルAPI)
サービスごとに直接APIを呼び出すためのクライアントを作成します。より細かい制御が可能です。
import boto3
# S3クライアントの作成 (デフォルト設定)
s3_client = boto3.client('s3')
# EC2クライアントの作成 (リージョン指定)
ec2_client = boto3.client('ec2', region_name='ap-northeast-1')
# DynamoDBクライアントの作成 (認証情報とリージョン指定)
dynamodb_client = boto3.client(
'dynamodb',
aws_access_key_id='YOUR_ACCESS_KEY',
aws_secret_access_key='YOUR_SECRET_KEY',
region_name='us-west-2'
)
リソースの作成 (高レベルAPI)
よりオブジェクト指向的なインターフェースを提供します。一部のサービスで利用可能です。
import boto3
# S3リソースの作成
s3_resource = boto3.resource('s3')
# EC2リソースの作成
ec2_resource = boto3.resource('ec2', region_name='ap-northeast-1')
# DynamoDBリソースの作成
dynamodb_resource = boto3.resource('dynamodb')
# リソースオブジェクトから特定のエンティティを取得
bucket = s3_resource.Bucket('my-bucket-name')
instance = ec2_resource.Instance('i-xxxxxxxxxxxxxxxxx')
table = dynamodb_resource.Table('my-table-name')
セッションの利用
認証情報やリージョン設定を共通化してクライアント/リソースを作成する場合に便利です。
import boto3
# デフォルトセッション
session = boto3.Session()
s3_client_from_session = session.client('s3')
ec2_resource_from_session = session.resource('ec2')
# 特定のプロファイルを使用するセッション
profile_session = boto3.Session(profile_name='my-profile', region_name='eu-central-1')
lambda_client = profile_session.client('lambda')
# 認証情報を直接指定するセッション
credential_session = boto3.Session(
aws_access_key_id='YOUR_ACCESS_KEY',
aws_secret_access_key='YOUR_SECRET_KEY',
aws_session_token='YOUR_SESSION_TOKEN', # AssumeRoleなどで取得した場合
region_name='ap-southeast-1'
)
iam_client = credential_session.client('iam')
リージョン指定
クライアント/リソース作成時、またはセッション作成時にリージョンを指定します。
import boto3
# クライアント作成時に指定
s3_client = boto3.client('s3', region_name='us-east-1')
# リソース作成時に指定
ec2_resource = boto3.resource('ec2', region_name='ap-northeast-1')
# セッション作成時に指定
session = boto3.Session(region_name='sa-east-1')
sqs_client = session.client('sqs')
# 環境変数 AWS_REGION または AWS_DEFAULT_REGION でも設定可能
# 設定ファイル (~/.aws/config) でも設定可能
# [default]
# region = ap-northeast-1
エンドポイントURL指定
LocalStackなど、AWS互換環境や特定のVPCエンドポイントを利用する場合に指定します。
import boto3
# LocalStackのS3エンドポイントを指定
local_s3_client = boto3.client(
's3',
region_name='us-east-1', # LocalStackではリージョン名は任意だが指定が必要な場合がある
endpoint_url='http://localhost:4566', # LocalStackのデフォルトS3ポート
aws_access_key_id='test', # LocalStack用のダミー認証情報
aws_secret_access_key='test'
)
# LocalStackのDynamoDBリソースを指定
local_dynamodb_resource = boto3.resource(
'dynamodb',
region_name='us-east-1',
endpoint_url='http://localhost:4566',
aws_access_key_id='test',
aws_secret_access_key='test'
)
S3 操作 💾
Simple Storage Service (S3) のバケットやオブジェクトを操作します。
目的 | クライアントAPI (client) | リソースAPI (resource) |
---|---|---|
バケット一覧取得 |
|
|
バケット作成 |
|
|
オブジェクト一覧取得 |
|
|
オブジェクトアップロード (ファイル) |
|
|
オブジェクトアップロード (バイナリ/文字列) |
|
|
オブジェクトダウンロード (ファイル) |
|
|
オブジェクトダウンロード (バイナリ/メモリ) |
|
|
オブジェクト削除 |
|
|
署名付きURL生成 |
|
リソースAPIには直接的な generate_presigned_url メソッドはありません。クライアントAPIを使用します。
|
マルチパートアップロード
大容量ファイルを効率的にアップロードします。自動的に行われるupload_file
やupload_fileobj
の内部でも利用されていますが、手動で制御することも可能です。
import boto3
from boto3.s3.transfer import TransferConfig
# 設定 (しきい値や同時実行数など)
config = TransferConfig(
multipart_threshold=1024 * 25, # 25MBを超えたらマルチパート開始
max_concurrency=10, # 最大同時アップロード数
multipart_chunksize=1024 * 25, # 各パートのサイズ (25MB)
use_threads=True # スレッドを使用
)
s3 = boto3.client('s3')
# upload_file で設定を適用
s3.upload_file(
'/path/to/large/file.zip',
'my-large-file-bucket',
'destination/large_file.zip',
Config=config
)
# upload_fileobj でも同様
with open('/path/to/another/large_file.mov', 'rb') as f:
s3.upload_fileobj(
f,
'my-large-file-bucket',
'videos/large_movie.mov',
Config=config
)
EC2 操作 💻
Elastic Compute Cloud (EC2) のインスタンスや関連リソースを管理します。
目的 | クライアントAPI (client) | リソースAPI (resource) |
---|---|---|
インスタンス一覧取得 |
|
|
インスタンス情報取得 (ID指定) |
|
|
インスタンス起動 |
|
|
インスタンス停止 |
|
|
インスタンス再起動 |
|
|
インスタンス終了 |
|
|
セキュリティグループ操作
# セキュリティグループ作成
sg_response = ec2_client.create_security_group(
GroupName='my-boto3-sg',
Description='Security group created via boto3',
VpcId='vpc-xxxxxxxxxxxxxxxxx' # VPC IDを指定
)
sg_id = sg_response['GroupId']
print(f"Created Security Group: {sg_id}")
# インバウンドルール追加 (SSH from anywhere)
ec2_client.authorize_security_group_ingress(
GroupId=sg_id,
IpPermissions=[
{
'IpProtocol': 'tcp',
'FromPort': 22,
'ToPort': 22,
'IpRanges': [{'CidrIp': '0.0.0.0/0'}]
}
]
)
print(f"Added SSH rule to {sg_id}")
# アウトバウンドルール削除 (例: デフォルトの全許可ルールを削除)
# まずデフォルトルールを取得する必要がある場合がある
# describe_security_groups で詳細を確認
# ec2_client.revoke_security_group_egress(...)
# セキュリティグループ削除
# ec2_client.delete_security_group(GroupId=sg_id) # 依存関係がない場合のみ可能
DynamoDB 操作 📦
NoSQLデータベースサービス DynamoDB のテーブルや項目を操作します。
目的 | クライアントAPI (client) | リソースAPI (resource – Tableオブジェクト) |
---|---|---|
テーブル一覧取得 |
|
|
項目追加/更新 (PutItem) |
|
|
項目取得 (GetItem) |
|
|
項目更新 (UpdateItem) |
|
|
項目削除 (DeleteItem) |
|
|
クエリ (Query) |
|
|
スキャン (Scan) |
|
|
Lambda 操作 ⚡
AWS Lambda 関数の管理と実行を行います。
関数一覧取得
import boto3
lambda_client = boto3.client('lambda')
response = lambda_client.list_functions()
functions = response.get('Functions', [])
for func in functions:
print(f"Name: {func['FunctionName']}, Runtime: {func['Runtime']}")
# Markerを使ったページネーション
while response.get('NextMarker'):
response = lambda_client.list_functions(Marker=response['NextMarker'])
functions.extend(response.get('Functions', []))
print(f"\nTotal functions: {len(functions)}")
関数実行 (同期)
関数が完了するまで待機し、結果を受け取ります。
import json
function_name = 'my-lambda-function'
payload = {'key1': 'value1', 'key2': 123}
response = lambda_client.invoke(
FunctionName=function_name,
InvocationType='RequestResponse', # 同期実行 (デフォルト)
Payload=json.dumps(payload) # ペイロードはJSON文字列 (バイト列でも可)
# LogType='Tail' # 実行ログをレスポンスに含める (Base64エンコードされる)
)
print(f"Status Code: {response['StatusCode']}") # 200なら成功
if 'FunctionError' in response:
print(f"Function Error: {response['FunctionError']}")
# エラーペイロードがある場合
error_payload = json.loads(response['Payload'].read())
print(f"Error Payload: {error_payload}")
else:
# 成功時のペイロード
result_payload = json.loads(response['Payload'].read())
print(f"Result Payload: {result_payload}")
# LogType='Tail' を指定した場合
# if 'LogResult' in response:
# import base64
# log_result = base64.b64decode(response['LogResult']).decode('utf-8')
# print("\nExecution Log:")
# print(log_result)
関数実行 (非同期)
関数を起動するだけで、完了を待たずにすぐにレスポンスが返ります。
function_name = 'my-async-lambda-function'
payload = {'message': 'Process this asynchronously'}
response = lambda_client.invoke(
FunctionName=function_name,
InvocationType='Event', # 非同期実行
Payload=json.dumps(payload)
)
print(f"Status Code: {response['StatusCode']}") # 202なら起動成功
# 非同期実行ではPayloadやFunctionErrorは返らない
関数作成 (Zipファイルから)
デプロイパッケージ (Zip) を用いて新しい関数を作成します。
# 事前に lambda_function.py を含む deployment.zip を用意
# zip deployment.zip lambda_function.py
with open('deployment.zip', 'rb') as f:
zip_content = f.read()
response = lambda_client.create_function(
FunctionName='my-new-boto3-function',
Runtime='python3.9',
Role='arn:aws:iam::123456789012:role/MyLambdaExecutionRole', # 実行ロールARN
Handler='lambda_function.lambda_handler', # ファイル名.関数名
Code={'ZipFile': zip_content},
Description='Function created via boto3',
Timeout=15, # 秒
MemorySize=256, # MB
Publish=True, # Trueにすると $LATEST と同時にバージョンも発行
Tags={
'Project': 'Boto3Demo',
'Creator': 'Script'
}
# Environment={'Variables': {'MY_VAR': 'my_value'}} # 環境変数
)
print(f"Created function ARN: {response['FunctionArn']}")
print(f"Version: {response.get('Version')}") # Publish=Trueの場合
関数コード更新
# 新しいコードを含む updated_deployment.zip を用意
with open('updated_deployment.zip', 'rb') as f:
new_zip_content = f.read()
response = lambda_client.update_function_code(
FunctionName='my-new-boto3-function',
ZipFile=new_zip_content,
Publish=True # 更新後、新しいバージョンを発行
)
print(f"Updated function: {response['FunctionName']}")
print(f"New Version: {response['Version']}")
関数設定更新
response = lambda_client.update_function_configuration(
FunctionName='my-new-boto3-function',
Description='Updated description',
Timeout=30,
MemorySize=512,
Environment={
'Variables': {
'MY_VAR': 'new_value',
'ANOTHER_VAR': 'added_value'
}
}
# Role='arn:aws:iam::...' # 実行ロールの変更
# Handler='new_handler.handler' # ハンドラの変更
)
print(f"Updated configuration for {response['FunctionName']}")
関数削除
function_name = 'my-new-boto3-function'
# qualifier = '1' # 特定のバージョンを削除する場合
try:
lambda_client.delete_function(FunctionName=function_name)
print(f"Successfully requested deletion of function: {function_name}")
except lambda_client.exceptions.ResourceNotFoundException:
print(f"Function {function_name} not found.")
except Exception as e:
print(f"Error deleting function {function_name}: {e}")
エラーハンドリングと待機 ⏳
API呼び出し時のエラー処理や、リソースの状態変化を待つ方法です。
ClientError 例外処理
boto3 が AWS API からのエラーを受け取った際に発生する例外です。
from botocore.exceptions import ClientError
import boto3
s3_client = boto3.client('s3')
non_existent_bucket = 'this-bucket-surely-does-not-exist-abcdef123'
try:
response = s3_client.head_bucket(Bucket=non_existent_bucket)
print(f"Bucket {non_existent_bucket} exists.") # 通常ここは実行されない
except ClientError as e:
error_code = e.response['Error']['Code']
error_message = e.response['Error']['Message']
http_status_code = e.response['ResponseMetadata']['HTTPStatusCode']
print(f"An error occurred ({error_code}): {error_message}")
print(f"HTTP Status Code: {http_status_code}")
if error_code == 'NoSuchBucket':
print("The specified bucket does not exist.")
elif error_code == 'AccessDenied':
print("Permission denied to access the bucket.")
else:
print("An unexpected error occurred.")
# 特定のエラーコードだけを捕捉することも可能 (あまり推奨されない)
# except s3_client.exceptions.NoSuchBucket as e:
# print("Caught NoSuchBucket specifically.")
Waiter の利用
特定のリソースが目的の状態になるまでポーリングして待機します。
import boto3
from botocore.exceptions import WaiterError
ec2_client = boto3.client('ec2')
s3_client = boto3.client('s3')
# --- EC2インスタンスの実行完了待ち ---
instance_id = 'i-xxxxxxxxxxxxxxxxx' # 起動直後のインスタンスID
print(f"Waiting for instance {instance_id} to be running...")
instance_running_waiter = ec2_client.get_waiter('instance_running')
try:
instance_running_waiter.wait(
InstanceIds=[instance_id],
WaiterConfig={ # オプション: ポーリング間隔と最大試行回数
'Delay': 15, # ポーリング間隔 (秒)
'MaxAttempts': 40 # 最大試行回数 (デフォルト40回)
}
)
print(f"Instance {instance_id} is now running! 🎉")
except WaiterError as e:
print(f"Waiter failed for instance {instance_id}: {e}")
# --- S3オブジェクトの存在待ち ---
bucket_name = 'my-bucket-name'
object_key = 'path/to/expected/file.txt'
print(f"Waiting for object s3://{bucket_name}/{object_key} to exist...")
object_exists_waiter = s3_client.get_waiter('object_exists')
try:
object_exists_waiter.wait(
Bucket=bucket_name,
Key=object_key,
WaiterConfig={'Delay': 5, 'MaxAttempts': 12}
)
print(f"Object s3://{bucket_name}/{object_key} now exists! ✅")
except WaiterError as e:
print(f"Waiter failed for object s3://{bucket_name}/{object_key}: {e}")
# 利用可能なWaiterはサービスごとに異なります
# print(ec2_client.waiter_names)
# print(s3_client.waiter_names)
Paginator の利用
大量の結果を返す可能性のあるAPI呼び出しを、ページ単位で簡単に処理します。
import boto3
s3_client = boto3.client('s3')
iam_client = boto3.client('iam')
# --- S3オブジェクト一覧 (大量の場合) ---
bucket_name = 'my-large-bucket'
print(f"Listing objects in bucket {bucket_name} using paginator:")
paginator = s3_client.get_paginator('list_objects_v2')
# paginate() はイテレータを返す
page_iterator = paginator.paginate(Bucket=bucket_name, Prefix='archive/')
object_count = 0
try:
for page in page_iterator:
# print(f"--- Processing Page ---")
if 'Contents' in page:
for obj in page['Contents']:
# print(f" Key: {obj['Key']}, Size: {obj['Size']}")
object_count += 1
# 他のページ情報も利用可能 (例: CommonPrefixes)
# if 'CommonPrefixes' in page:
# print(f" Common Prefixes: {[cp['Prefix'] for cp in page['CommonPrefixes']]}")
except ClientError as e:
print(f"Error listing objects: {e}")
print(f"\nTotal objects found in prefix 'archive/': {object_count}")
# --- IAMユーザー一覧 ---
print("\nListing IAM users using paginator:")
paginator_iam = iam_client.get_paginator('list_users')
user_iterator = paginator_iam.paginate(PaginationConfig={'PageSize': 50}) # 1ページあたりの取得数
user_count = 0
for page in user_iterator:
for user in page.get('Users', []):
# print(f" User: {user['UserName']}, ARN: {user['Arn']}")
user_count += 1
print(f"\nTotal IAM users found: {user_count}")
# build_full_result() ですべての結果を結合した辞書を取得することも可能 (メモリ注意)
# full_result = page_iterator.build_full_result()
# all_objects = full_result.get('Contents', [])
# print(f"Total objects (using build_full_result): {len(all_objects)}")
高度な設定 ⚙️
boto3 の動作をカスタマイズするための設定です。
リトライ設定
API呼び出しが失敗した場合の再試行動作を制御します。botocore.config.Config
を使用します。
import boto3
from botocore.config import Config
# デフォルトのリトライモードは 'legacy'
# 'standard' や 'adaptive' モードに変更可能
# Standardモード: 最大3回リトライ (一般的なエラーコード)
config_standard = Config(
retries={
'max_attempts': 4, # 初回 + 最大3回リトライ
'mode': 'standard'
}
)
s3_client_standard = boto3.client('s3', config=config_standard)
# Adaptiveモード: クライアントサイドでレート制限を考慮し、より積極的にリトライ
# (ThrottlingException などに対して効果的)
config_adaptive = Config(
retries={
'max_attempts': 6,
'mode': 'adaptive'
}
)
dynamodb_client_adaptive = boto3.client('dynamodb', config=config_adaptive)
# リトライを無効化
config_no_retry = Config(
retries={'max_attempts': 0}
)
lambda_client_no_retry = boto3.client('lambda', config=config_no_retry)
# リトライ設定の確認 (実際のオブジェクトから)
# print(s3_client_standard.meta.config.retries)
# print(dynamodb_client_adaptive.meta.config.retries)
# print(lambda_client_no_retry.meta.config.retries)
# --- 使用例 ---
try:
# Throttling が発生しやすい操作を Adaptive モードで実行
response = dynamodb_client_adaptive.scan(TableName='my-very-busy-table')
# ... process response ...
except ClientError as e:
print(f"DynamoDB scan failed after adaptive retries: {e}")
タイムアウト設定
API呼び出しの接続タイムアウトと読み取りタイムアウトを設定します。
import boto3
from botocore.config import Config
# 接続タイムアウトを5秒、読み取りタイムアウトを10秒に設定
config_timeout = Config(
connect_timeout=5, # 接続確立までのタイムアウト (秒)
read_timeout=10 # レスポンス受信開始までのタイムアウト (秒)
)
s3_client_with_timeout = boto3.client('s3', config=config_timeout)
sqs_client_with_timeout = boto3.client('sqs', region_name='us-west-2', config=config_timeout)
# --- 使用例 ---
try:
# ネットワークが不安定な場合にタイムアウトが発生する可能性がある
response = sqs_client_with_timeout.receive_message(
QueueUrl='https://sqs.us-west-2.amazonaws.com/123456789012/MyQueue',
WaitTimeSeconds=5 # ロングポーリング時間とは別
)
# ... process messages ...
except ClientError as e:
# botocore.exceptions.ConnectTimeoutError や ReadTimeoutError は
# ClientError としてラップされることが多いが、直接補足できる場合もある
print(f"SQS receive failed: {e}")
if 'ConnectTimeoutError' in str(e):
print("Connection timed out.")
elif 'ReadTimeoutError' in str(e):
print("Read timed out.")
プロキシ設定
HTTP/HTTPSプロキシ経由でAWS APIに接続する場合に設定します。
import boto3
from botocore.config import Config
# プロキシサーバーのアドレスを指定
proxies = {
'http': 'http://proxy.example.com:8080',
'https': 'http://secureproxy.example.com:8443', # HTTPSプロキシもhttp://で始まる場合がある
}
# プロキシ設定を含むConfigオブジェクトを作成
config_proxy = Config(
proxies=proxies
)
# プロキシを使用するクライアントを作成
ec2_client_proxy = boto3.client('ec2', region_name='eu-west-1', config=config_proxy)
# プロキシ認証が必要な場合 (環境変数で設定する方が一般的)
# proxies_auth = {
# 'http': 'http://user:password@proxy.example.com:8080',
# 'https': 'http://user:password@secureproxy.example.com:8443',
# }
# config_proxy_auth = Config(proxies=proxies_auth)
# secrets_manager_proxy = boto3.client('secretsmanager', config=config_proxy_auth)
# 環境変数 HTTP_PROXY, HTTPS_PROXY, NO_PROXY を設定することでも
# boto3 (内部の urllib3/requests) はプロキシ設定を認識します。
# export HTTPS_PROXY=http://secureproxy.example.com:8443
# --- 使用例 ---
try:
response = ec2_client_proxy.describe_regions()
print("Successfully connected via proxy to describe regions.")
print([r['RegionName'] for r in response['Regions']])
except ClientError as e:
print(f"Failed to connect via proxy: {e}")