大規模データセットの類似性検索をPythonで簡単に実現
はじめに:MilvusとPyMilvusとは?
AI技術の発展に伴い、画像、テキスト、音声などの非構造化データを効率的に扱い、それらの間の類似性を見つけ出す「ベクトル検索」の重要性が高まっています。Milvusは、このベクトル検索に特化したオープンソースのベクトルデータベースです。兆単位のベクトルデータに対しても高速な類似性検索と分析を実現できるスケーラブルなプラットフォームとして設計されています。
Milvusは、データポイントを高次元のベクトル(埋め込みベクトル)として表現し、これらのベクトルを効率的に格納、インデックス付け、検索することができます。特に、近似最近傍探索(ANN)アルゴリズムをサポートしており、検索速度と精度のバランスを取りながら、リアルタイムに近い検索性能を提供します。
そして、PyMilvusは、このMilvusをPythonから簡単かつ効率的に操作するための公式SDK(Software Development Kit)です。PyMilvusを利用することで、Python開発者は、データベース操作の複雑さを意識することなく、慣れ親しんだPythonの構文で、コレクション(データベースのテーブルに相当)の管理、データの挿入・検索、インデックスの作成といったMilvusの強力な機能を活用できます。これにより、AIアプリケーションの開発が大幅に簡素化され、迅速なプロトタイピングや既存のPythonワークフローへのシームレスな統合が可能になります。
このブログ記事では、PyMilvusの基本的な使い方から主要な機能まで、具体例を交えながら詳しく解説していきます。ベクトル検索をあなたのプロジェクトに取り入れる第一歩を踏み出しましょう!💪
インストール方法
PyMilvusのインストールは非常に簡単です。pipコマンドを使用してインストールできます。
前提条件:
- Python 3.8以上が必要です。(以前のバージョンでは3.6や3.7でも動作しましたが、最新版では3.8以上が推奨されています。)
以下のコマンドを実行して、PyMilvusをインストールします。
pip install pymilvus
特定の機能(例:ベクトル生成モデルの利用、バルクインポート機能)を利用したい場合は、追加の依存関係を含めてインストールすることも可能です。
# ベクトル生成モデルを利用する場合
pip install pymilvus[model]
# または zsh を利用している場合
pip install "pymilvus[model]"
# バルクインポート機能を利用する場合
pip install pymilvus[bulk_writer]
# または zsh を利用している場合
pip install "pymilvus[bulk_writer]"
Milvusサーバーのバージョンと互換性のあるPyMilvusバージョンをインストールすることが推奨されています。特定のバージョンをインストールしたい場合は、以下のように指定します。(例として2.4.10を指定)
pip install pymilvus==2.4.10
最新バージョンにアップグレードする場合は、以下のコマンドを実行します。
pip install --upgrade pymilvus
インストールが成功したか確認するには、Pythonインタプリタで以下のコマンドを実行し、エラーが出なければOKです。
from pymilvus import Collection
また、ローカル環境でのテストや小規模なプロトタイピングには、Milvusサーバーを別途構築せずに利用できる軽量版「Milvus Lite」が便利です。Milvus LiteはPyMilvusに含まれており、特別なインストール手順なしに、ファイルパスを指定するだけで利用を開始できます。
# PyMilvusをインストールすればMilvus Liteも利用可能
pip install -U pymilvus
基本的な使い方
PyMilvusの基本的な使い方を見ていきましょう。接続、コレクション管理、データ操作の順に説明します。
1. Milvusサーバーへの接続
まず、Milvusサーバーに接続します。接続にはホスト名とポート番号を指定します。デフォルトではローカルホスト(localhost)のポート19530に接続しようとします。
接続にはconnections
モジュールを使用します。
from pymilvus import connections
# デフォルトのエイリアス 'default' で接続
try:
connections.connect(
alias="default",
host='localhost', # またはMilvusサーバーのIPアドレス
port='19530' # またはMilvusサーバーのポート番号
)
print("✅ Milvusサーバーに接続しました。")
except Exception as e:
print(f"❌ Milvusサーバーへの接続に失敗しました: {e}")
# Zilliz Cloudなどのマネージドサービスに接続する場合 (URIとトークンを使用)
# ENDPOINT = "YOUR_ZILLIZ_CLOUD_ENDPOINT_URI"
# TOKEN = "YOUR_ZILLIZ_CLOUD_API_KEY"
# try:
# connections.connect(
# alias="zilliz",
# uri=ENDPOINT,
# token=TOKEN
# )
# print("✅ Zilliz Cloudに接続しました。")
# except Exception as e:
# print(f"❌ Zilliz Cloudへの接続に失敗しました: {e}")
# Milvus Lite (ローカルファイル) を使用する場合
# from pymilvus import MilvusClient
# client = MilvusClient("milvus_demo.db") # これだけでローカルDBがセットアップされる
# print("✅ Milvus Liteをセットアップしました。")
alias
は接続に名前をつけるためのもので、複数のMilvusクラスターを使い分ける際に便利です。省略した場合はdefault
という名前が使われます。
2. コレクション管理
Milvusでは、データを格納する単位を「コレクション」と呼びます。これはリレーショナルデータベースにおけるテーブルに相当します。コレクションを作成するには、まずスキーマ(データの構造)を定義する必要があります。
スキーマ定義
スキーマは、コレクションに含まれるフィールド(列)の名前、データ型、およびその他の制約を定義します。主要なフィールドタイプには以下のようなものがあります。
DataType.INT64
: 64ビット整数型。プライマリキーによく使われます。DataType.FLOAT_VECTOR
: 浮動小数点数ベクトル型。ベクトルデータを格納します。次元数(dim
)の指定が必須です。DataType.VARCHAR
: 可変長文字列型。メタデータなどに使用します。最大長(max_length
)の指定が必要です。DataType.BOOL
: 真偽値型。DataType.FLOAT
,DataType.DOUBLE
: 浮動小数点数型。DataType.ARRAY
: 配列型。DataType.JSON
: JSON型。
各フィールドには、is_primary=True
でプライマリキーを指定したり、auto_id=True
でIDの自動生成を有効にしたりできます。また、CollectionSchema
のenable_dynamic_field=True
を設定すると、スキーマで定義されていないフィールドも動的に追加できるようになります。
from pymilvus import CollectionSchema, FieldSchema, DataType
# フィールドスキーマの定義
field1 = FieldSchema(
name="book_id", # フィールド名
dtype=DataType.INT64, # データ型
is_primary=True, # プライマリキーとして設定
description="Book primary key" # 説明 (任意)
)
field2 = FieldSchema(
name="word_count",
dtype=DataType.INT64,
description="Word count"
)
field3 = FieldSchema(
name="book_intro",
dtype=DataType.FLOAT_VECTOR, # ベクトル型
dim=768, # ベクトルの次元数
description="Vector embedding of book introduction"
)
field4 = FieldSchema(
name="title",
dtype=DataType.VARCHAR,
max_length=256, # VARCHARの最大長
description="Book title"
)
# コレクションスキーマの作成
# enable_dynamic_field=True にすると、スキーマ定義外のフィールドも追加可能になる
schema = CollectionSchema(
fields=[field1, field2, field3, field4], # フィールドのリスト
description="Book collection schema", # コレクションの説明 (任意)
enable_dynamic_field=True # ダイナミックフィールドを有効化 (任意)
)
collection_name = "my_books"
コレクションの作成、確認、削除
定義したスキーマを使ってコレクションを作成します。Collection
クラスを使用します。
from pymilvus import Collection, utility
# コレクションの作成 (指定したスキーマで)
try:
collection = Collection(
name=collection_name,
schema=schema,
using='default', # 使用する接続エイリアス (省略可)
consistency_level="Strong" # 一貫性レベル (詳細は後述)
)
print(f"✅ コレクション '{collection_name}' を作成しました。")
except Exception as e:
print(f"❌ コレクションの作成に失敗しました: {e}")
# コレクションの存在確認
exists = utility.has_collection(collection_name, using='default')
print(f"コレクション '{collection_name}' は存在しますか? {'はい' if exists else 'いいえ'}")
# 既存のコレクションオブジェクトを取得
if exists:
collection = Collection(collection_name, using='default')
print(f"既存のコレクション '{collection_name}' を取得しました。")
# コレクション一覧の表示
collections = utility.list_collections(using='default')
print(f"現在のコレクション一覧: {collections}")
# コレクションの詳細情報 (スキーマなど) を表示
# print(collection.schema)
# print(collection.description)
# print(collection.name)
# print(collection.is_empty) # データが空かどうか
# print(collection.num_entities) # 格納されているエンティティ(データ)数
# コレクションの削除
# utility.drop_collection(collection_name, using='default')
# print(f"コレクション '{collection_name}' を削除しました。")
3. データ操作
コレクションの準備ができたら、データの挿入、検索、削除などを行います。
データの挿入 (Insert)
リスト形式またはPandas DataFrame形式でデータを用意し、insert
メソッドで挿入します。スキーマで定義したフィールドに対応するデータを与えます。
import random
# 挿入するデータを作成 (リスト形式)
# スキーマで定義したフィールド名に対応するキーを持つ辞書のリスト
# または、フィールドの順序に対応するタプルのリスト
data_to_insert = [
{
"book_id": 1,
"word_count": random.randint(10000, 50000),
"book_intro": [random.random() for _ in range(768)], # 768次元のランダムなベクトル
"title": "はじめてのMilvus",
"publish_year": 2023 # スキーマ定義外のフィールド (dynamic field)
},
{
"book_id": 2,
"word_count": random.randint(15000, 60000),
"book_intro": [random.random() for _ in range(768)],
"title": "ベクトル検索入門",
"author": "AI太郎" # スキーマ定義外のフィールド (dynamic field)
}
# ... 他のデータ
]
# データを挿入
try:
insert_result = collection.insert(data_to_insert)
print(f"✅ データを挿入しました。 Primary Keys: {insert_result.primary_keys}")
# insert後、検索可能にするためにデータをフラッシュ(永続化)することが推奨される
collection.flush()
print("✅ データをフラッシュしました。")
except Exception as e:
print(f"❌ データの挿入に失敗しました: {e}")
# auto_id=True の場合は、book_id を指定せずに挿入できる
# data_auto_id = [
# {
# "word_count": random.randint(10000, 50000),
# "book_intro": [random.random() for _ in range(768)],
# "title": "続・はじめてのMilvus"
# }
# ]
# insert_result_auto = collection.insert(data_auto_id)
# print(f"AutoIDでデータを挿入しました。 Primary Keys: {insert_result_auto.primary_keys}")
# collection.flush()
データの検索 (Search)
ベクトル検索はMilvusの最も重要な機能です。指定したベクトルに類似したベクトルを持つデータを検索します。search
メソッドを使用します。
# 検索条件を設定
# 検索対象のベクトルフィールド
search_field = "book_intro"
# 検索に使用するクエリベクトル (例としてランダムなベクトルを使用)
query_vectors = [[random.random() for _ in range(768)]] # 1つのベクトルで検索する場合
# 検索パラメータ
search_params = {
"metric_type": "L2", # 距離計算メトリック (L2: ユークリッド距離, IP: 内積)
"params": {"nprobe": 10} # インデックスタイプに応じた検索パラメータ (詳細は後述)
}
# 取得する類似データの数
limit = 3
# 検索結果に含めるフィールド
output_fields = ["book_id", "title", "word_count", "publish_year"]
# オプション: フィルタリング条件 (Scalar Filtering / Hybrid Search)
# expr = "word_count > 30000 and publish_year == 2023"
expr = None
# 検索を実行
try:
results = collection.search(
data=query_vectors, # クエリベクトル (複数指定可)
anns_field=search_field, # 検索対象のベクトルフィールド
param=search_params, # 検索パラメータ
limit=limit, # 取得する結果数
expr=expr, # フィルタリング条件 (任意)
output_fields=output_fields, # 結果に含めるフィールド (任意)
consistency_level="Bounded" # 一貫性レベル (任意、詳細は後述)
)
print(f"✅ 検索を実行しました。")
# 検索結果の処理
for i, hits in enumerate(results):
print(f"クエリ {i+1} の検索結果:")
if not hits:
print(" 類似するデータは見つかりませんでした。")
continue
for hit in hits:
print(f" - ID: {hit.id}, Distance: {hit.distance:.4f}, Title: {hit.entity.get('title')}, Word Count: {hit.entity.get('word_count')}, Year: {hit.entity.get('publish_year')}")
except Exception as e:
print(f"❌ 検索に失敗しました: {e}")
search
メソッドは、各クエリベクトルに対して最も類似度の高いデータをlimit
個返します。結果はHit
オブジェクトのリストとして得られ、hit.id
でプライマリキー、hit.distance
でクエリベクトルとの距離(または類似度スコア)、hit.entity.get('フィールド名')
でoutput_fields
で指定したフィールドの値を取得できます。
IDによるデータ取得 (Get)
プライマリキーを指定して特定のデータを取得するにはget
メソッドを使用します。
# 取得したいデータのプライマリキー
pks_to_get = [1, 2] # 挿入したデータのID
try:
get_results = collection.get(
expr = f"book_id in {pks_to_get}", # 取得条件 (プライマリキーを指定)
output_fields=["title", "word_count"] # 取得するフィールド
)
print(f"✅ IDによるデータ取得を実行しました。")
for entity in get_results:
print(f" - ID: {entity.get('book_id')}, Title: {entity.get('title')}, Word Count: {entity.get('word_count')}")
except Exception as e:
print(f"❌ IDによるデータ取得に失敗しました: {e}")
データの削除 (Delete)
プライマリキーを指定してデータを削除するにはdelete
メソッドを使用します。
# 削除したいデータのプライマリキー
pks_to_delete = [1]
try:
# deleteメソッドは現在、式(expression)での指定が推奨されている
delete_expr = f"book_id in {pks_to_delete}"
delete_result = collection.delete(delete_expr)
print(f"✅ データを削除しました。削除数: {delete_result.delete_count}")
# 削除後もflushが必要な場合がある
collection.flush()
print("✅ データをフラッシュしました。")
except Exception as e:
print(f"❌ データの削除に失敗しました: {e}")
PyMilvusの主要機能 🌟
PyMilvusは基本的な操作に加え、ベクトル検索のパフォーマンスや柔軟性を向上させるための様々な機能を提供しています。
1. インデックス作成 (Indexing)
ベクトル検索の速度を大幅に向上させるためには、ベクトルフィールドにインデックスを作成することが不可欠です。Milvusは様々なタイプのインデックスをサポートしており、データの特性や検索要件(速度重視か精度重視かなど)に応じて選択できます。
代表的なインデックスタイプ:
- FLAT: ブルートフォース検索。最も精度が高いが、データ量が多いと検索速度が遅くなる。インデックス作成は不要。
- IVF_FLAT: クラスタリングに基づくインデックス。中規模データセットに適している。検索パラメータ
nprobe
で検索範囲(クラスタ数)を調整。 - IVF_SQ8: IVF_FLATのメモリ使用量を削減したバージョン(ベクトル量子化を使用)。
- HNSW (Hierarchical Navigable Small World): グラフベースのインデックス。高い検索精度と速度を両立できるが、メモリ消費量とインデックス構築時間が大きい傾向がある。検索パラメータ
ef
で検索の精度と速度のトレードオフを調整。 - AUTOINDEX: Milvusがデータに基づいて最適なインデックスタイプとパラメータを自動で選択してくれる。(推奨される場合が多い)
- 他にもDISKANN, SCANNなどがあります。
インデックスはcreate_index
メソッドで作成します。
# インデックスを作成するフィールド
index_field = "book_intro"
# インデックスパラメータの定義 (例: HNSW)
index_params = {
"metric_type":"L2", # 距離メトリック (検索時と同じものを指定)
"index_type": "HNSW", # インデックスタイプ
"params": {"M": 8, "efConstruction": 100} # インデックスタイプ固有の構築パラメータ
}
# AUTOINDEXの例
# index_params_auto = {
# "metric_type": "L2",
# "index_type": "AUTOINDEX",
# "params": {}
# }
try:
# インデックスを作成する前に、コレクションのデータをロードする必要がある場合がある
if collection.has_index():
collection.release() # ロード済みなら一度リリース
print(f"'{index_field}' フィールドにインデックスを作成します...")
collection.create_index(
field_name=index_field,
index_params=index_params
# index_name="my_hnsw_index" # インデックスに名前をつけることも可能 (任意)
)
print("✅ インデックスを作成しました。")
# 検索前にコレクションをメモリにロードする
collection.load()
print("✅ コレクションをメモリにロードしました。検索準備完了です。")
except Exception as e:
print(f"❌ インデックスの作成またはロードに失敗しました: {e}")
# インデックス情報の確認
# print(collection.index())
# インデックスの削除
# collection.drop_index()
# print("インデックスを削除しました。")
インデックスを作成した後、検索を行う前にcollection.load()
を実行して、データをメモリにロードする必要があります。これにより、検索パフォーマンスが向上します。
2. パーティション管理 (Partitioning)
大規模なコレクションを管理しやすくし、検索パフォーマンスを向上させるために、コレクションを「パーティション」に分割することができます。パーティションは、コレクション内のデータのサブセットです。検索時に特定のパーティションを指定することで、検索範囲を限定し、速度を向上させることができます。
例えば、本のジャンルごとにパーティションを作成するなどが考えられます。
# パーティションの作成
partition_name_fiction = "fiction_books"
partition_name_nonfiction = "non_fiction_books"
try:
collection.create_partition(partition_name_fiction)
collection.create_partition(partition_name_nonfiction)
print(f"✅ パーティション '{partition_name_fiction}', '{partition_name_nonfiction}' を作成しました。")
except Exception as e:
print(f"❌ パーティションの作成に失敗しました: {e}")
# パーティション一覧の表示
partitions = collection.partitions
print(f"現在のパーティション一覧: {[p.name for p in partitions]}")
# パーティションを指定してデータを挿入
data_fiction = [{
"book_id": 3, "word_count": 25000, "book_intro": [random.random() for _ in range(768)], "title": "異世界ファンタジー"
}]
collection.insert(data_fiction, partition_name=partition_name_fiction)
collection.flush()
print(f"'{partition_name_fiction}' パーティションにデータを挿入しました。")
# パーティションを指定して検索
try:
results_fiction = collection.search(
data=query_vectors,
anns_field=search_field,
param=search_params,
limit=limit,
partition_names=[partition_name_fiction] # 検索対象のパーティション名をリストで指定
)
print(f"✅ '{partition_name_fiction}' パーティションで検索を実行しました。")
# 結果処理...
except Exception as e:
print(f"❌ パーティション指定検索に失敗しました: {e}")
# パーティションの削除
# collection.drop_partition(partition_name_fiction)
# print(f"パーティション '{partition_name_fiction}' を削除しました。")
3. ハイブリッド検索 (Hybrid Search) と スカラーフィルタリング (Scalar Filtering)
ベクトル類似性検索だけでなく、通常のデータベースのように特定のフィールドの値に基づいてデータをフィルタリング(絞り込み)したい場合があります。これを実現するのが「スカラーフィルタリング」です。検索時にexpr
パラメータに条件式を指定します。
さらに、Milvus 2.4以降では、複数のベクトルフィールドに対する検索を同時に行い、その結果を統合する「ハイブリッド検索(またはマルチベクトル検索)」も可能です。これは、例えば、テキストの意味的な類似性(Dense Vector)とキーワードのマッチング(Sparse Vector)を組み合わせて検索精度を高めたい場合に有効です。
# スカラーフィルタリングの例: word_countが30000より大きく、publish_yearが2023の本を検索
expr_scalar = "word_count > 30000 and publish_year == 2023"
try:
results_filtered = collection.search(
data=query_vectors,
anns_field=search_field,
param=search_params,
limit=limit,
expr=expr_scalar, # 条件式を指定
output_fields=["title", "word_count", "publish_year"]
)
print(f"✅ スカラーフィルタリング検索を実行しました (expr='{expr_scalar}')")
# 結果処理...
for i, hits in enumerate(results_filtered):
print(f"クエリ {i+1} のフィルタリング検索結果:")
if not hits:
print(" 条件に合うデータは見つかりませんでした。")
continue
for hit in hits:
print(f" - ID: {hit.id}, Distance: {hit.distance:.4f}, Title: {hit.entity.get('title')}, Word Count: {hit.entity.get('word_count')}, Year: {hit.entity.get('publish_year')}")
except Exception as e:
print(f"❌ スカラーフィルタリング検索に失敗しました: {e}")
# ハイブリッド検索 (Multi-Vector Search) の概念 (Milvus 2.4以降)
# - スキーマに複数のベクトルフィールド (例: dense_vector, sparse_vector) を定義
# - 各ベクトルフィールドにインデックスを作成
# - searchメソッドで複数の検索リクエスト (各ベクトルフィールドに対するANN検索) をリストで指定
# - rerankパラメータで結果の統合方法を指定 (例: Weighted Reranker, RRFRanker)
# ※ PyMilvusでの具体的なAPIはドキュメントを参照してください。
スカラーフィルタリングは、ベクトル検索の「後」に適用されるのではなく、検索プロセス中に適用されるため、効率的に関連性の高いデータのみを探索できます(ただし、実装の詳細やインデックスタイプにより挙動は異なります)。
4. Time Travel
Milvusは、指定したタイムスタンプ時点でのデータスナップショットに対して検索を実行できる「Time Travel」機能をサポートしています。これにより、過去の特定の時点でのデータ状態を確認したり、データの変更履歴を追跡したりすることが可能になります。
検索時にtravel_timestamp
パラメータ(Unixタイムスタンプ形式)を指定します。
import time
# 現在のタイムスタンプを取得 (例)
# 特定の過去のタイムスタンプを指定することも可能
target_timestamp = int(time.time())
try:
results_timetravel = collection.search(
data=query_vectors,
anns_field=search_field,
param=search_params,
limit=limit,
travel_timestamp=target_timestamp # タイムスタンプを指定
)
print(f"✅ Time Travel検索を実行しました (timestamp={target_timestamp})")
# 結果処理...
except Exception as e:
print(f"❌ Time Travel検索に失敗しました: {e}")
5. 一貫性レベル (Consistency Levels)
Milvusは分散システムであり、データの書き込みと読み取りの間にはわずかな遅延が生じる可能性があります。PyMilvusでは、データの「一貫性レベル」を操作ごとに指定することで、データの鮮度とクエリパフォーマンスのトレードオフを調整できます。
- Strong (デフォルト): 最も強い一貫性。読み取り操作は、それ以前のすべての書き込み操作が完了してから実行されることを保証します。データの完全性が最優先される場合に適しています。
- Bounded Staleness: 指定した時間範囲内でのデータの一貫性を保証します。Strongよりレイテンシが改善される可能性があります。
- Session: 同じセッション(接続)内での読み書きの一貫性を保証します。
- Eventually: 最も弱い一貫性。書き込み操作が最終的には反映されることを保証しますが、読み取り時に最新の状態でない可能性があります。パフォーマンスが最優先される場合に適しています。
一貫性レベルは、コレクション作成時や各操作(search, query, getなど)のパラメータで指定できます。
# コレクション作成時に指定
# collection = Collection(name=collection_name, schema=schema, consistency_level="Bounded")
# search時に指定
# results = collection.search(..., consistency_level="Eventually")
# query時に指定 (queryはgetと似ていますが、より複雑な条件式を使えます)
# query_results = collection.query(expr="book_id > 0", output_fields=["title"], consistency_level="Session")
MilvusClient: よりシンプルなインターフェース
PyMilvus 2.x の後半のバージョンから、MilvusClient
という新しいインターフェースが導入されました。これは従来のORM(Object-Relational Mapping)スタイルのAPI(Collection
やconnections
モジュールを使う方法)よりも、よりシンプルで直感的な操作を提供することを目指しています。
特に、Milvus Lite(ローカルファイルベースのMilvus)を使用する場合や、基本的な操作を素早く行いたい場合に便利です。
from pymilvus import MilvusClient
import numpy as np
# Milvus Liteを使用する場合 (ファイルパスを指定)
client_lite = MilvusClient("milvus_lite_demo.db")
print("✅ Milvus Liteクライアントを作成しました。")
# 通常のMilvusサーバーに接続する場合 (URIを指定)
# client_server = MilvusClient(
# uri="http://localhost:19530", # gRPCの場合は "http://" なしで "localhost:19530"
# # token="YOUR_API_KEY" # 認証が必要な場合
# )
# print("✅ Milvusサーバーへのクライアントを作成しました。")
collection_name_client = "quick_setup"
# --- コレクション操作 ---
# コレクションが存在すれば削除 (デモ用)
if client_lite.has_collection(collection_name=collection_name_client):
client_lite.drop_collection(collection_name=collection_name_client)
print(f"既存のコレクション '{collection_name_client}' を削除しました。")
# コレクション作成 (スキーマ定義をより簡潔に)
# dimでベクトル次元数を指定するだけで、プライマリキー(id)とベクトルフィールド(vector)が自動生成される
try:
client_lite.create_collection(
collection_name=collection_name_client,
dimension=128 # ベクトルの次元数を指定
# auto_id=True, # デフォルトでTrue
# metric_type="L2" # デフォルトでL2
# enable_dynamic_field=True # デフォルトでTrue
)
print(f"✅ コレクション '{collection_name_client}' を作成しました。")
except Exception as e:
print(f"❌ コレクションの作成に失敗しました: {e}")
# --- データ操作 ---
# データ準備 (NumPy配列など)
vectors_to_insert = np.random.rand(10, 128).astype(np.float32) # 128次元のベクトルを10個
data = [
{"id": i, "vector": vectors_to_insert[i], "genre": f"genre_{i % 3}"} # idも指定可能
for i in range(10)
]
# または auto_id=True の場合、idなしの辞書でも良い
# data_auto = [
# {"vector": vectors_to_insert[i], "genre": f"genre_{i % 3}"}
# for i in range(10)
# ]
# データ挿入
try:
res = client_lite.insert(
collection_name=collection_name_client,
data=data
)
print(f"✅ データを挿入しました。成功数: {res['insert_count']}, PKs: {res['ids']}")
except Exception as e:
print(f"❌ データの挿入に失敗しました: {e}")
# --- 検索操作 ---
# クエリベクトル
query_vector = np.random.rand(1, 128).astype(np.float32)
# 検索
try:
search_res = client_lite.search(
collection_name=collection_name_client,
data=query_vector, # クエリベクトル
limit=3, # 上位3件を取得
filter="genre == 'genre_1'", # フィルタリング条件 (オプション)
output_fields=["id", "genre"] # 出力フィールド (オプション, "vector"以外)
)
print(f"✅ 検索を実行しました。")
# 結果の表示 (よりシンプルな形式で返ってくる)
for result in search_res[0]: # search_resはクエリごとの結果リストのリスト
print(f" - ID: {result['id']}, Distance: {result['distance']:.4f}, Genre: {result.get('entity', {}).get('genre')}")
except Exception as e:
print(f"❌ 検索に失敗しました: {e}")
# IDによる取得
try:
get_res = client_lite.get(
collection_name=collection_name_client,
ids=[1, 3, 5] # 取得したいIDのリスト
)
print(f"✅ IDによる取得を実行しました: {get_res}")
except Exception as e:
print(f"❌ IDによる取得に失敗しました: {e}")
# IDによる削除
try:
delete_res = client_lite.delete(
collection_name=collection_name_client,
ids=[0, 4] # 削除したいIDのリスト
)
print(f"✅ IDによる削除を実行しました。成功数: {delete_res}")
except Exception as e:
print(f"❌ IDによる削除に失敗しました: {e}")
# コレクション削除
# client_lite.drop_collection(collection_name=collection_name_client)
# print(f"コレクション '{collection_name_client}' を削除しました。")
MilvusClient
は、内部的に従来のORM APIをラップしており、多くの一般的なユースケースをより少ないコード量で実現できます。どちらのAPIを使用するかは、プロジェクトの要件や好みに応じて選択できます。
まとめ
この記事では、オープンソースのベクトルデータベースMilvusをPythonから操作するためのSDKであるPyMilvusについて、その基本的な使い方から主要な機能までを解説しました。
PyMilvusを使えば、以下のことが簡単に実現できます:
- ✅ Milvusサーバーへの接続 (ローカル、リモート、クラウド、Milvus Lite)
- 🧱 コレクションとスキーマの定義・管理
- 💾 ベクトルデータやメタデータの挿入、更新、削除
- ⚡️ 高速なベクトル類似性検索 (ANN検索)
- 🔍 スカラーフィールドによるデータフィルタリング (ハイブリッド検索)
- ⏱️ パフォーマンス向上のためのインデックス作成と管理
- 🗂️ 大規模データの管理を容易にするパーティション機能
- ⏳ 過去のデータ状態へのアクセス (Time Travel)
- ⚖️ データ鮮度とパフォーマンスを調整する一貫性レベルの設定
- 💡 よりシンプルな操作を提供する
MilvusClient
インターフェースの利用
PyMilvusは、ベクトル検索を活用したAIアプリケーション(セマンティック検索、レコメンデーションシステム、画像・音声認識、RAGなど)をPythonで開発するための強力なツールです。ドキュメントも充実しており、活発なコミュニティによるサポートも期待できます。
ぜひPyMilvusを使って、ベクトルデータベースの世界を探求し、あなたのアプリケーションに新たな可能性を加えてみてください!🎉
コメント