Azure Databricks を使用した取得拡張生成
取得拡張生成 (RAG) は、外部のナレッジ ソースを統合することによって大規模言語モデルを強化する AI の最先端のアプローチです。 Azure Databricks は、RAG アプリケーションを開発するための堅牢なプラットフォームを提供します。これにより、非構造化データを取得と応答の生成に適した形式に変換できます。 このプロセスには、ユーザーのクエリの理解、関連するデータの取得、言語モデルを使用した応答の生成など、一連の手順が含まれます。 Azure Databricks によって提供されるフレームワークは、RAG アプリケーションの迅速な反復とデプロイをサポートし、最新の情報と独自の知識を含めることができる高品質のドメイン固有の応答を保証します。
このラボは完了するまで、約 40 分かかります。
注: Azure Databricks ユーザー インターフェイスは継続的な改善の対象となります。 この演習の手順が記述されてから、ユーザー インターフェイスが変更されている場合があります。
開始する前に
管理レベルのアクセス権を持つ Azure サブスクリプションが必要です。
Azure Databricks ワークスペースをプロビジョニングする
ヒント: 既に Azure Databricks ワークスペースがある場合は、この手順をスキップして、既存のワークスペースを使用できます。
この演習には、新しい Azure Databricks ワークスペースをプロビジョニングするスクリプトが含まれています。 このスクリプトは、この演習で必要なコンピューティング コアに対する十分なクォータが Azure サブスクリプションにあるリージョンに、Premium レベルの Azure Databricks ワークスペース リソースを作成しようとします。また、使用するユーザー アカウントのサブスクリプションに、Azure Databricks ワークスペース リソースを作成するための十分なアクセス許可があることを前提としています。 十分なクォータやアクセス許可がないためにスクリプトが失敗した場合は、Azure portal で、Azure Databricks ワークスペースを対話形式で作成してみてください。
- Web ブラウザーで、
https://portal.azure.comの Azure portal にサインインします。 -
ページ上部の検索バーの右側にある [>_] ボタンを使用して、Azure portal に新しい Cloud Shell を作成します。PowerShell 環境を選択します。 次に示すように、Azure portal の下部にあるペインに、Cloud Shell のコマンド ライン インターフェイスが表示されます。

注: Bash 環境を使用するクラウド シェルを以前に作成した場合は、それを PowerShell に切り替えます。
-
ペインの上部にある区分線をドラッグして Cloud Shell のサイズを変更したり、ペインの右上にある — 、 ⤢ 、X アイコンを使用して、ペインを最小化または最大化したり、閉じたりすることができます。 Azure Cloud Shell の使い方について詳しくは、Azure Cloud Shell のドキュメントをご覧ください。
-
PowerShell のペインで、次のコマンドを入力して、リポジトリを複製します。
rm -r mslearn-databricks -f git clone https://github.com/MicrosoftLearning/mslearn-databricks -
リポジトリをクローンした後、次のコマンドを入力して setup-serverless.ps1 スクリプトを実行します。これにより、使用可能なリージョンに Azure Databricks ワークスペースがプロビジョニングされます。
./mslearn-databricks/setup-serverless.ps1 -
メッセージが表示された場合は、使用するサブスクリプションを選択します (これは、複数の Azure サブスクリプションへのアクセス権を持っている場合にのみ行います)。
- スクリプトの完了まで待ちます。通常、約 5 分かかりますが、さらに時間がかかる場合もあります。
ノートブックを作成し、必要なライブラリをインストールする
-
Azure portal で、スクリプトによって作成された msl-xxxxxxx リソース グループ (または既存の Azure Databricks ワークスペースを含むリソース グループ) に移動します。
-
Azure Databricks Service リソース (セットアップ スクリプトを使って作成した場合は、databricks-xxxxxxx という名前) を選択します。
-
Azure Databricks ワークスペースの [概要] ページで、[ワークスペースの起動] ボタンを使用して、新しいブラウザー タブで Azure Databricks ワークスペースを開きます。サインインを求められた場合はサインインします。
ヒント: Databricks ワークスペース ポータルを使用すると、さまざまなヒントと通知が表示される場合があります。 これらは無視し、指示に従ってこの演習のタスクを完了してください。
-
サイド バーで [(+) 新規] タスクを使用して、Notebook を作成します。 既定のコンピューティングとして [サーバーレス] を選択します。
-
最初のコード セルに次のコードを入力して実行し、必要なライブラリをインストールします。
%pip install transformers==4.53.0 databricks-vectorsearch==0.56 torch dbutils.library.restartPython()
データの取り込み
-
ノートブックの新しいセルに次の SQL クエリを入力して、この演習のデータを既定のカタログに格納するために使用する新しいボリュームを作成します。
%sql CREATE VOLUME <catalog_name>.default.RAG_lab; <catalog_name>をワークスペースの名前に置き換えます。これは、Azure Databricks によって既定のカタログがその名前で自動的に作成されるためです。- セルの左側にある [▸ セルの実行] メニュー オプションを使用して実行を行います。 そして、コードによって実行される Spark ジョブが完了するまで待ちます。
-
新しいセルで、次のコードを実行して、”シェル” コマンドを使用して GitHub から Unity カタログにデータをダウンロードします。**
%sh wget -O /Volumes/<catalog_name>/default/RAG_lab/enwiki-latest-pages-articles.xml https://github.com/MicrosoftLearning/mslearn-databricks/raw/main/data/enwiki-latest-pages-articles.xml -
新しいセルで、次のコードを実行して、生データからデータフレームを作成します。
from pyspark.sql import SparkSession # Create a Spark session spark = SparkSession.builder \ .appName("RAG-DataPrep") \ .getOrCreate() # Read the XML file raw_df = spark.read.format("xml") \ .option("rowTag", "page") \ .load("/Volumes/<catalog_name>/default/RAG_lab/enwiki-latest-pages-articles.xml") # Show the DataFrame raw_df.show(5) # Print the schema of the DataFrame raw_df.printSchema() -
新しいセルで、次のコードを実行し、
<catalog_name>を Unity カタログの名前に置き換えて、データをクリーンアップして前処理し、関連するテキスト フィールドを抽出します。from pyspark.sql.functions import col clean_df = raw_df.select(col("title"), col("revision.text._VALUE").alias("text")) clean_df = clean_df.na.drop() clean_df.write.format("delta").mode("overwrite").saveAsTable("<catalog_name>.default.wiki_pages") clean_df.show(5)カタログ (Ctrl + Alt + C) エクスプローラーを開いてペインを更新すると、既定の Unity カタログに Delta テーブルが作成されます。
埋め込みを生成し、ベクトル検索を実装する
Databricks の Mosaic AI ベクトル検索は、Azure Databricks プラットフォーム内に統合されたベクトル データベース ソリューションです。 Hierarchical Navigable Small World (HNSW) アルゴリズムを使用して、埋め込みのストレージと取得を最適化します。 これにより、効率的な最近隣検索が可能になり、そのハイブリッド キーワード類似性検索機能は、ベクトル ベースとキーワード ベースの検索手法を組み合わせることにより、より関連性の高い結果を提供します。
-
新しいセルで、差分同期インデックスを作成する前に、次の SQL クエリを実行してソース テーブルのデータ フィードの変更機能を有効にします。
%sql ALTER TABLE <catalog_name>.default.wiki_pages SET TBLPROPERTIES (delta.enableChangeDataFeed = true) -
新しいセルで、次のコードを実行して、ベクトル検索インデックスを作成します。
from databricks.vector_search.client import VectorSearchClient client = VectorSearchClient() client.create_endpoint( name="vector_search_endpoint", endpoint_type="STANDARD" ) index = client.create_delta_sync_index( endpoint_name="vector_search_endpoint", source_table_name="<catalog_name>.default.wiki_pages", index_name="<catalog_name>.default.wiki_index", pipeline_type="TRIGGERED", primary_key="title", embedding_source_column="text", embedding_model_endpoint_name="databricks-gte-large-en" )注:ベクトル検索のエンドポイントとインデックスの作成には数分かかる場合があります。 操作が完了するまで待ってから続行してください。
カタログ (Ctrl + Alt + C) エクスプローラーを開いてペインを更新すると、既定の Unity カタログにインデックスが作成されます。
注: 次のコード セルを実行する前に、エンドポイントとインデックスの両方がオンラインであることを確認します。
- 左側のサイド バーで [コンピューティング] を選択してから [ベクトル検索] タブを選択して、エンドポイントの状態が [オンライン] であることを確認します。
- [カタログ] ペインでインデックスを右クリックし、[カタログ エクスプローラーで開く] を選択します。 インデックスの状態が [オンライン] になるまで待ちます (これには 5 ~ 10 分かかる場合があります)。
- エラーが発生した場合は、インデックスの同期にさらに時間がかかる場合があります。
index.sync()を実行して、同期を手動でトリガーできます。
-
新しいセルで、次のコードを実行して、クエリ ベクトルに基づいて関連するドキュメントを検索します。
results_dict=index.similarity_search( query_text="Anthropology fields", columns=["title", "text"], num_results=1 ) display(results_dict)
出力で、クエリ プロンプトに関連する対応する Wiki ページが見つかることを確認します。
取得したデータを使用してプロンプトを拡張する
これで、外部データ ソースからの追加のコンテキストを提供することで、大規模言語モデルの機能を強化できるようになりました。 そうすることで、モデルはより正確でコンテキストに関連する応答を生成できます。
-
新しいセルで、次のコードを実行して、取得したデータをユーザーのクエリと組み合わせて、LLM のリッチ プロンプトを作成します。
# Convert the dictionary to a DataFrame results = spark.createDataFrame([results_dict['result']['data_array'][0]]) from transformers import pipeline # Load the summarization model summarizer = pipeline("summarization", model="facebook/bart-large-cnn", framework="pt") # Extract the string values from the DataFrame column (serverless-compatible) text_data = [row["_2"] for row in results.select("_2").collect()] # Pass the extracted text data to the summarizer function summary = summarizer(text_data, max_length=512, min_length=100, do_sample=True) def augment_prompt(query_text): context = " ".join([item['summary_text'] for item in summary]) return f"Query: {query_text}\nContext: {context}" prompt = augment_prompt("Explain the significance of Anthropology") print(prompt) -
新しいセルで、次のコードを実行して、LLM を使用して応答を生成します。
from transformers import GPT2LMHeadModel, GPT2Tokenizer tokenizer = GPT2Tokenizer.from_pretrained("gpt2") model = GPT2LMHeadModel.from_pretrained("gpt2") inputs = tokenizer(prompt, return_tensors="pt") outputs = model.generate( inputs["input_ids"], max_length=300, num_return_sequences=1, repetition_penalty=2.0, top_k=50, top_p=0.95, temperature=0.7, do_sample=True ) response = tokenizer.decode(outputs[0], skip_special_tokens=True) print(response)
クリーンアップ
Azure Databricks を調べ終わったら、不要な Azure コストがかからないように、また、サブスクリプションの容量を解放するために、作成したリソースを削除することができます。