1. なぜAI/データサイエンスでScalaなのか? 🤔
Pythonがデータサイエンス界隈で広く使われている中、なぜScalaを学ぶ価値があるのでしょうか?いくつかの重要な理由があります。
- JVM言語としての強み: ScalaはJava Virtual Machine (JVM) 上で動作します。これにより、豊富なJavaライブラリ資産を活用でき、Javaで構築された既存システムとの連携も容易です。また、JVMの成熟した実行環境は、パフォーマンスと安定性の面で大きな利点となります。
-
関数型プログラミングとオブジェクト指向の融合: Scalaは関数型プログラミングとオブジェクト指向プログラミングの両方のパラダイムをサポートしています。
- 関数型プログラミングの特性(不変性、副作用の排除、高階関数など)は、特に大規模なデータ処理や並列処理において、コードの簡潔性、安全性、テスト容易性を高めます。データの変換処理をパイプラインのように記述できるため、見通しが良くなります。✨
- オブジェクト指向の特性は、システムの構造化や再利用可能なコンポーネントの作成に役立ちます。
- 静的型付け言語: Scalaは静的型付け言語です。コンパイル時に型エラーを検出できるため、大規模なプロジェクトやチーム開発において、実行時エラーを減らし、コードの信頼性と保守性を向上させます。型推論機能も強力なため、冗長な型宣言を省略でき、コードを簡潔に保てます。✅
- Apache Sparkとの高い親和性: ビッグデータ処理のデファクトスタンダードであるApache Sparkは、Scalaで実装されています。そのため、SparkのAPIはScalaで最も自然かつ効率的に利用できます。Sparkの最新機能も、まずScala APIとして提供されることが多いです。📊
- パフォーマンス: 一般的に、ScalaはJVM上で動作するため、Python (CPython) と比較して実行速度が高速な場合があります。特にCPUバウンドな計算処理や大規模データの処理において、その差が顕著になることがあります。
これらの理由から、特に大規模データの処理、分散コンピューティング、リアルタイム分析、堅牢な機械学習パイプラインの構築といった領域でScalaは強力な選択肢となります。
2. Scala開発環境の構築 🛠️
Scalaで開発を始めるために、まずは開発環境を整えましょう。主に以下のツールを使用します。
-
Java Development Kit (JDK): ScalaはJVM上で動作するため、JDK(バージョン8以上推奨、Sparkのバージョンによってはより新しいものが必要)が必須です。Adoptium (旧AdoptOpenJDK) や Oracle JDK などをインストールしてください。
# macOS (Homebrew) brew install openjdk@11 # Ubuntu sudo apt update sudo apt install openjdk-11-jdk
-
sbt (Scala Build Tool): Scalaプロジェクトのビルド、依存関係管理、テスト実行などを行うための標準的なツールです。公式サイトからインストールできます。
# macOS (Homebrew) brew install sbt # Ubuntu (SDKMAN! を使うのが便利) curl -s "https://get.sdkman.io" | bash source "$HOME/.sdkman/bin/sdkman-init.sh" sdk install sbt
- 統合開発環境 (IDE): IntelliJ IDEA に Scalaプラグイン をインストールするのが最も一般的で強力な組み合わせです。コード補完、デバッグ、リファクタリングなどの機能が充実しており、開発効率が大幅に向上します。Visual Studio Code に Metals (Scala Language Server) を導入する選択肢もあります。
簡単なsbtプロジェクトの作成
ターミナルで以下のコマンドを実行し、sbtプロジェクトの雛形を作成してみましょう。
# プロジェクト用のディレクトリを作成
mkdir my-scala-app
cd my-scala-app
# sbt new コマンドでテンプレートからプロジェクトを作成
# scala/hello-world.g8 はシンプルなテンプレート
sbt new scala/hello-world.g8
対話形式でプロジェクト名などを聞かれます。完了すると、以下のようなディレクトリ構造が生成されます。
my-scala-app/
├── build.sbt # プロジェクト定義ファイル
├── project/ # sbtプラグインなどの設定
│ └── build.properties
└── src/
├── main/
│ └── scala/
│ └── Main.scala # メインのソースコード
└── test/
└── scala/
└── MainSpec.scala # テストコード
build.sbt
ファイルにプロジェクト名、Scalaバージョン、依存ライブラリなどを記述します。
// build.sbt の例
val scala3Version = "3.3.1" // 例: Scala 3 を使う場合
lazy val root = project
.in(file("."))
.settings(
name := "MyScalaApp", // プロジェクト名
version := "0.1.0-SNAPSHOT",
scalaVersion := scala3Version, // 使用するScalaバージョン
libraryDependencies ++= Seq( // 依存ライブラリ
"org.scalatest" %% "scalatest" % "3.2.17" % Test,
// データサイエンスでよく使うライブラリを追加していく
// 例: Spark
// "org.apache.spark" %% "spark-core" % "3.5.0",
// "org.apache.spark" %% "spark-sql" % "3.5.0"
)
)
sbt run
コマンドで src/main/scala/Main.scala
が実行され、sbt test
でテストが実行されます。IntelliJ IDEAでこのプロジェクトを開けば、すぐに開発を始められます。
3. Scalaの基本文法 (データサイエンス向け) 📝
Scalaの文法は多岐にわたりますが、ここでは特にデータサイエンスの文脈でよく使われる基本的な要素に焦点を当てて解説します。
変数の宣言: val と var
val
: 再代入不可能な変数を宣言します(immutable)。関数型プログラミングでは、不変性を重視するため、基本的にはval
を使用します。これにより、予期せぬ状態変化を防ぎ、コードの安全性を高めます。var
: 再代入可能な変数を宣言します(mutable)。状態を変化させる必要がある場合に限定的に使用します。
// val: 不変 (推奨)
val message: String = "Hello, Scala!"
val count: Int = 10
// message = "Hi" // これはコンパイルエラーになる
// var: 可変
var mutableCounter: Int = 0
mutableCounter = mutableCounter + 1 // OK
println(mutableCounter) // 1
型推論
Scalaは強力な型推論機能を持っています。多くの場合、変数の型を明示的に記述しなくても、コンパイラが初期値から型を推論してくれます。これにより、コードが簡潔になります。
val city = "Tokyo" // String型と推論される
val population = 14000000 // Int型と推論される
val temperature = 25.5 // Double型と推論される
// 型を明示することも可能 (特に複雑な場合やAPIの境界など)
val names: List[String] = List("Alice", "Bob", "Charlie")
関数定義
def
キーワードを使って関数を定義します。引数の型は必須ですが、戻り値の型は多くの場合推論されます(再帰関数など一部例外あり)。
// シンプルな関数
def add(x: Int, y: Int): Int = {
x + y
}
// 戻り値の型推論 (Unitは戻り値がないことを示す型)
def printMessage(msg: String): Unit = {
println(msg)
}
// 括弧やreturnを省略できる場合もある (本体が単一式の場合)
def multiply(a: Int, b: Int) = a * b
// 実行
val sum = add(5, 3) // sum は 8
printMessage("Scala is fun!")
val product = multiply(4, 6) // product は 24
Scalaでは関数は第一級オブジェクトであり、変数に代入したり、他の関数の引数として渡したり、戻り値として返すことができます(高階関数)。これはデータ処理パイプラインを構築する上で非常に強力です。
// 関数を変数に代入
val square: Int => Int = x => x * x // Int を受け取り Int を返す関数
println(square(5)) // 25
// 高階関数の例 (関数を引数に取る)
def operate(x: Int, y: Int, func: (Int, Int) => Int): Int = {
func(x, y)
}
val difference = operate(10, 4, (a, b) => a - b) // difference は 6
println(difference)
コレクション操作 (Map, Filter, Reduce)
データサイエンスでは、データの集合(リスト、配列、セットなど)に対する操作が頻繁に発生します。Scalaのコレクションライブラリは非常に豊富で、関数型のスタイルで効率的なデータ操作を実現できます。特に map
, filter
, reduce
は基本かつ重要です。
val numbers = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
// map: 各要素に関数を適用し、新しいコレクションを生成
val squaredNumbers = numbers.map(x => x * x)
// squaredNumbers: List(1, 4, 9, 16, 25, 36, 49, 64, 81, 100)
println(s"Squared: $squaredNumbers")
// filter: 条件を満たす要素のみを含む新しいコレクションを生成
val evenNumbers = numbers.filter(x => x % 2 == 0)
// evenNumbers: List(2, 4, 6, 8, 10)
println(s"Even: $evenNumbers")
// reduce: コレクションの要素を左から右へ畳み込み、単一の結果を生成
val sumOfNumbers = numbers.reduce((acc, curr) => acc + curr)
// sumOfNumbers: 55 (1 + 2 + ... + 10)
// より簡潔な記法: numbers.reduce(_ + _)
println(s"Sum: $sumOfNumbers")
// 組み合わせ: 偶数を2乗して合計する
val sumOfSquaredEvens = numbers
.filter(_ % 2 == 0) // 偶数を抽出: List(2, 4, 6, 8, 10)
.map(x => x * x) // 各要素を2乗: List(4, 16, 36, 64, 100)
.reduce(_ + _) // 合計: 220
println(s"Sum of squared evens: $sumOfSquaredEvens")
これらの操作は不変であり、元のコレクション numbers
は変更されません。各ステップで新しいコレクションが生成されます。これにより、処理の見通しが良くなり、デバッグも容易になります。
パターンマッチ
Scalaのパターンマッチは、Javaの switch
文を大幅に拡張した強力な機能です。値の型、構造、内容に基づいて分岐処理を記述できます。データ構造の分解や条件分岐を簡潔かつ安全に行えます。
def describe(x: Any): String = x match {
case 5 => "Five"
case "hello" => "Greeting"
case i: Int => s"An integer: $i"
case s: String => s"A string: '$s'"
case list: List[_] => s"A list with ${list.length} elements"
case (a, b) => s"A tuple: ($a, $b)" // タプルの分解
case _ => "Something else" // デフォルトケース
}
println(describe(5)) // Five
println(describe("hello")) // Greeting
println(describe(100)) // An integer: 100
println(describe(List(1, 2, 3))) // A list with 3 elements
println(describe((10, "Scala"))) // A tuple: (10, Scala)
println(describe(3.14)) // Something else
// Option型の処理にもよく使われる
val maybeValue: Option[Int] = Some(42)
// val maybeValue: Option[Int] = None
val valueDescription = maybeValue match {
case Some(v) => s"Value is $v"
case None => "No value"
}
println(valueDescription)
パターンマッチは、特にデータクラス (case class) の分解や、APIからの応答処理、エラーハンドリングなどで威力を発揮します。
Case Class
case class
は、データ保持を主目的としたクラスを簡潔に定義するための特別な構文です。データ分析やデータモデリングにおいて非常に便利です。
- コンストラクタ引数が自動的に公開
val
フィールドになる。 apply
メソッドが自動生成され、new
なしでインスタンス化できる。toString
,equals
,hashCode
メソッドが適切に自動実装される。copy
メソッドが自動生成され、一部のフィールドを変更した新しいインスタンスを簡単に作成できる(不変性を保ちやすい)。- パターンマッチとの相性が非常に良い。
// データレコードを表すCase Class
case class Person(name: String, age: Int, city: String)
// インスタンス化 (new 不要)
val alice = Person("Alice", 30, "New York")
val bob = Person("Bob", 25, "London")
println(alice) // Person(Alice,30,New York) - toStringが自動実装
println(alice.name) // Alice - フィールドにアクセス
// copyメソッドで新しいインスタンスを作成
val aliceOlder = alice.copy(age = 31)
println(aliceOlder) // Person(Alice,31,New York)
println(alice) // Person(Alice,30,New York) - 元のインスタンスは不変
// パターンマッチで利用
def greet(person: Person): String = person match {
case Person("Alice", age, _) => s"Hi Alice! You are $age."
case Person(name, _, "London") => s"Hello $name from London!"
case Person(name, age, city) => s"Hello $name ($age) from $city."
}
println(greet(alice)) // Hi Alice! You are 30.
println(greet(bob)) // Hello Bob from London!
println(greet(Person("Charlie", 40, "Paris"))) // Hello Charlie (40) from Paris.
データフレームの行や、APIのレスポンスなどを表現するのに case class
は最適です。
4. Scalaによるデータ処理ライブラリ 📊
Scalaの標準コレクションライブラリだけでも多くのデータ操作が可能ですが、より大規模なデータや特定のタスクには専用のライブラリが役立ちます。特にApache SparkはScalaとの相性が抜群です。
Scala Collections API の高度な活用
map
, filter
, reduce
以外にも、多くの便利な高階関数が用意されています。
flatMap
: 各要素に関数を適用し、結果のコレクションを「平坦化」して連結します。ネストされたリストの処理や、Option型の処理などでよく使われます。groupBy
: 指定したキーに基づいて要素をグループ化し、Map
を生成します。sortBy
/sortWith
: 要素をソートします。collect
: パターンマッチを組み合わせたmap
のような操作を行います。部分関数 (PartialFunction) を使います。foldLeft
/foldRight
:reduce
と似ていますが、初期値を指定できます。partition
: 条件に基づいてコレクションを2つに分割します。zip
/zipWithIndex
: 複数のコレクションを組み合わせたり、インデックスを付与したりします。
val words = List("Scala", "is", "powerful", "and", "fun")
// flatMap: 各単語を文字のリストに分解し、平坦化
val chars = words.flatMap(word => word.toList)
// chars: List(S, c, a, l, a, i, s, p, o, w, e, r, f, u, l, a, n, d, f, u, n)
println(s"Chars: $chars")
// groupBy: 単語の長さでグループ化
val groupedByLength = words.groupBy(word => word.length)
// groupedByLength: Map(
// 5 -> List(Scala),
// 2 -> List(is),
// 8 -> List(powerful),
// 3 -> List(and, fun)
// )
println(s"Grouped by length: $groupedByLength")
// sortBy: 文字列の長さでソート
val sortedByLength = words.sortBy(word => word.length)
// sortedByLength: List(is, and, fun, Scala, powerful)
println(s"Sorted by length: $sortedByLength")
// collect: 特定の条件を満たす要素だけ変換 (例: 長さ3の単語を大文字に)
val uppercaseLen3 = words.collect {
case word if word.length == 3 => word.toUpperCase
}
// uppercaseLen3: List(AND, FUN)
println(s"Uppercase length 3: $uppercaseLen3")
// foldLeft: 文字列を連結 (初期値付き)
val concatenated = words.foldLeft("Words:")((acc, word) => acc + " " + word)
// concatenated: "Words: Scala is powerful and fun"
println(concatenated)
Apache Spark の紹介と Scala API の基本
Apache Spark は、大規模データ処理のための高速で汎用的なクラスタコンピューティングシステムです。Scalaで書かれており、Scala APIが最もネイティブで表現力豊かです。
Sparkの主要な抽象化概念:
- SparkSession: Sparkアプリケーションのエントリーポイント。Sparkクラスタへの接続を管理し、DataFrameやRDDを作成するために使用されます。
- Resilient Distributed Datasets (RDD): Sparkの基本的なデータ抽象化。クラスタ全体に分散された、不変の要素のコレクションです。フォールトトレラント性(耐障害性)を持ちます。低レベルAPIですが、柔軟な操作が可能です。
-
DataFrame / Dataset: RDD上に構築された、より構造化されたデータ抽象化。テーブル形式のデータ(列名と型を持つ)を扱います。SQLライクな操作や最適化された実行計画(Catalyst Optimizer)の恩恵を受けられます。
- DataFrame: 型情報が実行時にチェックされる (
Row
オブジェクトの集まり)。 - Dataset: コンパイル時に型チェックが行われる (Case Classなどのオブジェクトの集まり)。ScalaやJava APIで利用でき、型安全性が高い。
DataFrame
はDataset[Row]
のエイリアスです。
- DataFrame: 型情報が実行時にチェックされる (
AI/データサイエンスでは、主に DataFrame
や Dataset
を使用することが多いです。
SparkSessionの作成
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("MySparkApp") // アプリケーション名
.master("local[*]") // ローカルモードで実行 (全コア使用)
// .master("spark://your-master-url:7077") // クラスタモードの場合
.getOrCreate()
// SparkContext (RDD API用)
val sc = spark.sparkContext
// implicits をインポートすると、便利な変換 (e.g., RDD to DataFrame) が使えるようになる
import spark.implicits._
println(s"Spark version: ${spark.version}")
// アプリケーション終了時に stop() を呼ぶ
// spark.stop()
注意: .master("local[*]")
はローカルマシン上でテストや開発を行う際に便利ですが、実際の分散環境ではクラスタマネージャー (YARN, Mesos, Kubernetes) やスタンドアロンクラスタのマスターURLを指定します。
DataFrameの基本操作 (CSV読み込み、加工、集計)
例として、簡単なCSVファイルを読み込み、基本的なデータ操作を行ってみましょう。
data/sales.csv
(例):
product,category,price,quantity
Laptop,Electronics,1200,5
Mouse,Electronics,25,10
Keyboard,Electronics,75,8
Desk,Furniture,300,2
Chair,Furniture,150,4
Monitor,Electronics,300,6
import org.apache.spark.sql.functions._ // col(), avg(), sum() などの関数を使うため
import org.apache.spark.sql.types._ // スキーマ定義用
// CSVファイルを読み込む (ヘッダーあり、型を推論)
val salesDF = spark.read
.option("header", "true") // 1行目をヘッダーとして使用
.option("inferSchema", "true") // 型を自動推論
.csv("data/sales.csv") // ファイルパス
// スキーマを表示
salesDF.printSchema()
// root
// |-- product: string (nullable = true)
// |-- category: string (nullable = true)
// |-- price: integer (nullable = true)
// |-- quantity: integer (nullable = true)
// データを表示 (最初の数行)
salesDF.show()
// +--------+-----------+-----+--------+
// | product| category|price|quantity|
// +--------+-----------+-----+--------+
// | Laptop|Electronics| 1200| 5|
// | Mouse|Electronics| 25| 10|
// |Keyboard|Electronics| 75| 8|
// | Desk| Furniture| 300| 2|
// | Chair| Furniture| 150| 4|
// | Monitor|Electronics| 300| 6|
// +--------+-----------+-----+--------+
// 列を選択 (select)
val productAndPriceDF = salesDF.select("product", "price")
productAndPriceDF.show()
// 条件でフィルタリング (filter / where)
val electronicsDF = salesDF.filter($"category" === "Electronics") // $"" 記法 (推奨)
// または salesDF.filter(col("category") === "Electronics")
// または salesDF.where("category = 'Electronics'")
electronicsDF.show()
// 新しい列を追加 (withColumn) - 売上金額 (price * quantity)
val salesWithRevenueDF = salesDF.withColumn("revenue", $"price" * $"quantity")
salesWithRevenueDF.show()
// +--------+-----------+-----+--------+-------+
// | product| category|price|quantity|revenue|
// +--------+-----------+-----+--------+-------+
// | Laptop|Electronics| 1200| 5| 6000|
// | Mouse|Electronics| 25| 10| 250|
// |Keyboard|Electronics| 75| 8| 600|
// | Desk| Furniture| 300| 2| 600|
// | Chair| Furniture| 150| 4| 600|
// | Monitor|Electronics| 300| 6| 1800|
// +--------+-----------+-----+--------+-------+
// グループ化と集計 (groupBy, agg) - カテゴリごとの平均価格と合計数量
val categorySummaryDF = salesWithRevenueDF
.groupBy("category") // カテゴリでグループ化
.agg(
avg("price").alias("average_price"), // 平均価格
sum("quantity").alias("total_quantity"), // 合計数量
sum("revenue").alias("total_revenue") // 合計売上
)
categorySummaryDF.show()
// +-----------+-------------+--------------+-------------+
// | category|average_price|total_quantity|total_revenue|
// +-----------+-------------+--------------+-------------+
// |Electronics| 400.0| 29| 8650|
// | Furniture| 225.0| 6| 1200|
// +-----------+-------------+--------------+-------------+
// 結果をソート (orderBy / sort) - 合計売上降順
val sortedSummaryDF = categorySummaryDF.orderBy(desc("total_revenue")) // desc() は降順
sortedSummaryDF.show()
// Spark SQLを使うことも可能
salesWithRevenueDF.createOrReplaceTempView("sales_table") // 一時テーブルとして登録
val sqlResultDF = spark.sql("""
SELECT category, AVG(price) as avg_price, SUM(quantity) as total_qty
FROM sales_table
WHERE price > 100
GROUP BY category
ORDER BY avg_price DESC
""")
sqlResultDF.show()
// --- Dataset API (型安全) ---
// Case Class を定義
case class Sale(product: String, category: String, price: Option[Int], quantity: Option[Int])
// CSV読み込み時にスキーマを指定し、Dataset[Sale] に変換
// 注意: CSVから直接Datasetに読む場合、型が合わないとエラーになるか、nullになる
// 安全のため、一旦DataFrameで読んでからas[T]で変換するのが良い
val salesDS = salesDF.as[Sale] // DataFrameをDataset[Sale]に変換
// Dataset APIを使った操作 (コンパイル時型チェックが効く)
val expensiveElectronicsDS = salesDS
.filter(sale => sale.category == "Electronics" && sale.price.exists(_ > 100))
expensiveElectronicsDS.show()
// Dataset操作は型安全だが、DataFrame APIの方が最適化が効きやすい場合もある
// 用途に応じて使い分けるのが良い
// 忘れずにSparkSessionを停止
spark.stop()
Spark DataFrame/Dataset APIは、ScalaコレクションAPIと非常によく似た操作 (select
, filter
, map
, groupBy
, agg
など) を提供しますが、これらはクラスタ全体に分散されたデータに対して遅延評価され、最適化された上で実行されます。💡
5. 機械学習ライブラリとの連携 ⚙️
ScalaとSparkは、機械学習パイプラインの構築にも強力な基盤を提供します。
Spark MLlib の概要と使い方
Spark MLlib は、Spark上で動作する機械学習ライブラリです。一般的な機械学習アルゴリズム(分類、回帰、クラスタリング、推薦など)や、特徴量エンジニアリング、パイプライン構築、モデル評価、永続化のためのツールを提供します。
MLlibの主要コンポーネント:
-
Transformers: あるDataFrameを別のDataFrameに変換するアルゴリズム。例: 特徴量スケーリング、カテゴリ変数のエンコーディング、学習済みモデルによる予測。
transform()
メソッドを持つ。 -
Estimators: DataFrameを入力として学習し、Transformer(通常はモデル)を生成するアルゴリズム。例: ロジスティック回帰、決定木、KMeans。
fit()
メソッドを持つ。 - Pipelines: 複数のTransformersとEstimatorsを連結し、一連のワークフローとして定義・実行するための仕組み。データの前処理からモデル学習までをパイプライン化することで、コードが整理され、再利用性が高まる。
- Evaluation: モデルの性能を評価するためのメトリクス(例: Accuracy, RMSE, AUC)。
- Parameter Tuning: クロスバリデーションやTrain-Validation Splitを用いたハイパーパラメータチューニング。
簡単な分類モデルの例 (ロジスティック回帰)
ここでは、簡単なデータセットを使ってロジスティック回帰モデルを学習し、予測を行う例を示します。MLlibでは、多くのアルゴリズムが特徴量を Vector
型の単一カラムにまとめることを要求します。
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.feature.{VectorAssembler, StringIndexer}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}
val spark = SparkSession.builder.appName("MLlibExample").master("local[*]").getOrCreate()
import spark.implicits._
// サンプルデータの作成 (実際はファイルから読み込むことが多い)
val trainingData = spark.createDataFrame(Seq(
(0.0, "high", 1.0, 0.0),
(1.0, "low", 0.0, 1.0),
(2.0, "medium", 1.0, 1.0),
(3.0, "high", 0.5, 0.0),
(4.0, "low", 1.0, 1.0),
(5.0, "medium", 0.0, 1.0),
(6.0, "low", 0.8, 1.0),
(7.0, "high", 0.2, 0.0)
)).toDF("id", "category", "feature1", "label") // label が目的変数 (0 or 1)
// 1. 特徴量エンジニアリング
// カテゴリ特徴量 (category) を数値インデックスに変換
val categoryIndexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex")
.setHandleInvalid("keep") // 未知のカテゴリへの対処法
// 数値特徴量 (feature1) と変換後のカテゴリ特徴量 (categoryIndex) を
// 単一のベクトルカラム (features) にまとめる
val assembler = new VectorAssembler()
.setInputCols(Array("categoryIndex", "feature1"))
.setOutputCol("features")
// 2. モデル (Estimator) の定義
// ロジスティック回帰モデル
val lr = new LogisticRegression()
.setLabelCol("label") // 目的変数カラム
.setFeaturesCol("features") // 特徴量ベクトルカラム
.setMaxIter(10) // 最大反復回数
.setRegParam(0.3) // 正則化パラメータ
.setElasticNetParam(0.8) // ElasticNet混合パラメータ
// 3. パイプラインの構築
// 上記のステージ (Indexer, Assembler, Model) を連結
val pipeline = new Pipeline()
.setStages(Array(categoryIndexer, assembler, lr))
// 4. モデルの学習 (Pipelineを学習データにfitさせる)
val model = pipeline.fit(trainingData)
// 5. テストデータの準備
val testData = spark.createDataFrame(Seq(
(8.0, "high", 0.9),
(9.0, "low", 0.1),
(10.0, "medium", 0.7),
(11.0, "unknown", 0.5) // StringIndexerで未知カテゴリとして扱われる
)).toDF("id", "category", "feature1")
// 6. 予測 (学習済みモデルを使ってテストデータを変換)
val predictions = model.transform(testData)
// 予測結果を表示
predictions.select("id", "category", "feature1", "probability", "prediction").show()
// probability は各クラス (0, 1) の確率を示すVector, prediction は予測されたクラス (0 or 1)
// +---+--------+--------+--------------------+----------+
// | id|category|feature1| probability|prediction|
// +---+--------+--------+--------------------+----------+
// |8.0| high| 0.9|[0.43...,0.56...]| 1.0|
// |9.0| low| 0.1|[0.68...,0.31...]| 0.0|
// |10.0| medium| 0.7|[0.50...,0.49...]| 0.0|
// |11.0| unknown| 0.5|[0.56...,0.43...]| 0.0| <- unknown は handleInvalid='keep' で処理
// +---+--------+--------+--------------------+----------+
// 7. モデル評価 (例: BinaryClassificationEvaluatorでAUCを計算)
// ※ 実際の評価にはラベル付きのテストデータが必要
// ここでは例として学習データで評価してみる (過学習の可能性があるため非推奨)
val trainingPredictions = model.transform(trainingData)
val evaluator = new BinaryClassificationEvaluator()
.setLabelCol("label")
.setRawPredictionCol("rawPrediction") // probability or rawPrediction
.setMetricName("areaUnderROC") // AUC (Area Under ROC Curve)
val auc = evaluator.evaluate(trainingPredictions)
println(s"Area Under ROC Curve (on training data) = $auc")
// 8. ハイパーパラメータチューニング (例: CrossValidator)
// パラメータグリッドを定義
val paramGrid = new ParamGridBuilder()
.addGrid(lr.regParam, Array(0.1, 0.01)) // 正則化パラメータを試す
.addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0)) // ElasticNetを試す
.build()
// クロスバリデーションを設定 (Estimator=pipeline, Evaluator, ParamGrid)
val cv = new CrossValidator()
.setEstimator(pipeline) // パイプライン全体を対象にする
.setEvaluator(evaluator) // 上記の評価指標
.setEstimatorParamMaps(paramGrid) // 試すパラメータの組み合わせ
.setNumFolds(3) // 3-Fold Cross Validation
.setParallelism(2) // 並列実行数
// クロスバリデーションを実行して最適なモデルを取得
val cvModel = cv.fit(trainingData) // これには時間がかかる場合がある
// 最適なモデルで予測
val bestPredictions = cvModel.transform(testData)
bestPredictions.select("id", "category", "feature1", "prediction").show()
// 最適なパラメータを確認 (例)
// cvModel.bestModel.asInstanceOf[PipelineModel].stages(2) match {
// case lrModel: LogisticRegressionModel =>
// println(s"Best RegParam: ${lrModel.getRegParam}")
// println(s"Best ElasticNetParam: ${lrModel.getElasticNetParam}")
// }
spark.stop()
このように、MLlibのパイプラインを使うことで、前処理からモデル学習、評価、チューニングまでの一連の流れを体系的に記述できます。
その他のライブラリ (Breezeなど)
Spark MLlib以外にも、Scalaエコシステムにはデータサイエンスや数値計算に役立つライブラリが存在します。
-
Breeze: NumPyに似た機能を提供する数値計算ライブラリ。線形代数、数値処理、最適化アルゴリズムなどが含まれています。Sparkを使わない小〜中規模のデータ分析や、カスタムアルゴリズムの実装に便利です。
// Breeze の使用例 (build.sbt に依存関係を追加する必要あり) // libraryDependencies += "org.scalanlp" %% "breeze" % "2.1.0" // libraryDependencies += "org.scalanlp" %% "breeze-natives" % "2.1.0" // ネイティブ実装で高速化 import breeze.linalg._ import breeze.numerics._ // ベクトルと行列の作成 val v = DenseVector(1.0, 2.0, 3.0) val m = DenseMatrix((1.0, 2.0), (3.0, 4.0)) println(v) println(m) // 要素ごとの操作 val v2 = v * 2.0 // 各要素を2倍 val v_sin = sin(v) // 各要素のsin // 線形代数演算 val dotProduct = v dot v // 内積 val matrixProduct = m * m // 行列積 println(s"v * 2.0 = $v2") println(s"sin(v) = $v_sin") println(s"v dot v = $dotProduct") println(s"m * m = $matrixProduct") // 統計関数 val mean_v = mean(v) val variance_v = variance(v) println(s"Mean of v: $mean_v, Variance of v: $variance_v")
- その他のライブラリ: 特定の目的(例: ディープラーニング、グラフ処理、可視化)に応じて、様々なライブラリが開発されています。例えば、Deeplearning4j (DL4J) はJVMベースのディープラーニングフレームワークで、Scala APIも提供されています。可視化ライブラリはPythonほど充実していませんが、Vegas (Vega-Liteベース) や、他のライブラリとの連携などが考えられます。
6. 実践的なTips ✨
ScalaとSparkを効果的に使うためのいくつかのヒントを紹介します。
-
遅延評価 (Lazy Evaluation): Sparkの変換操作 (
map
,filter
,select
など) は遅延評価されます。つまり、実際のアクション操作 (collect
,count
,save
など) が呼び出されるまで実行されません。これにより、Sparkは実行計画全体を最適化できます。この挙動を理解しておくことが重要です。 -
不変性の活用: Scalaの不変なデータ構造 (
val
, immutable collections, case class) やSparkの不変な RDD/DataFrame/Dataset を積極的に利用することで、副作用を減らし、コードの予測可能性と並列処理の安全性を高めます。 -
エラーハンドリング (Option, Try, Either):
Option[T]
: 値が存在しない可能性を明示的に扱うための型 (Some(value)
またはNone
)。NullPointerException を避けるのに役立ちます。map
,flatMap
,getOrElse
などで安全に処理できます。Try[T]
: 成功 (Success(value)
) または失敗 (Failure(exception)
) を表す型。例外が発生する可能性のある操作をラップするのに便利です。Either[L, R]
: 2つの可能性のある型を表し、慣例的に左側 (Left
) にエラー、右側 (Right
) に成功値を入れます。より詳細なエラー情報を伝えたい場合に有用です。
// Option の例 val config: Map[String, String] = Map("host" -> "localhost", "port" -> "8080") val maybeTimeout: Option[String] = config.get("timeout") // キーが存在しない場合 None を返す val timeout = maybeTimeout.map(_.toInt).getOrElse(5000) // 文字列をIntに変換、なければデフォルト値 println(s"Timeout: $timeout") // Try の例 import scala.util.Try def parseToInt(s: String): Try[Int] = Try(s.toInt) val result1 = parseToInt("123") // Success(123) val result2 = parseToInt("abc") // Failure(java.lang.NumberFormatException: For input string: "abc") result1.foreach(value => println(s"Parsed successfully: $value")) result2.recover { case e: NumberFormatException => println(s"Failed to parse: $e") }
- テスト: 大規模なデータ処理や機械学習パイプラインでは、テストが不可欠です。ScalaTest や Specs2 などのテスティングフレームワークを使って、ユニットテストや統合テストを作成しましょう。Sparkアプリケーションのテストには、ローカルでSparkSessionを起動して小さなデータセットでテストするなどの工夫が必要です。
-
パフォーマンスチューニング (Spark): Spark UI (通常は
http://localhost:4040
) を活用して、ジョブの実行状況、ステージ、タスク、シャッフル、メモリ使用量などを監視し、ボトルネックを特定します。パーティション数、メモリ割り当て、データシリアライズ形式、シャッフル挙動などのチューニングが重要になることがあります。
7. まとめと今後の学習ステップ 🚀
この記事では、AI/データサイエンス分野におけるScalaの活用について、基本的な概念からApache Sparkを使った実践的なデータ処理、機械学習までを解説しました。
Scalaは、その関数型とオブジェクト指向の融合、静的型付けによる安全性、JVMの基盤、そしてSparkとの卓越した親和性により、特に大規模データ処理や堅牢なシステム構築において強力な選択肢となります。
もちろん、学習曲線はPythonに比べるとやや急かもしれませんが、習得すれば複雑な問題をエレガントかつ効率的に解決する力を得られるでしょう。
今後の学習ステップとしては、以下のような方向性が考えられます。
- Scala言語深化: より高度な関数型プログラミング(Monad、Type Classなど)、Implicits、マクロなどを学ぶ。
- Spark深掘り: Sparkの内部構造、パフォーマンスチューニング、ストリーミング処理 (Structured Streaming)、GraphX (グラフ処理) などを探求する。
- MLlib応用: より多くのアルゴリズムや、特徴量エンジニアリングのテクニック、モデル評価・選択方法を学ぶ。カスタムTransformer/Estimatorの作成。
- エコシステム活用: Breeze, Akka (並行・分散アプリケーション), Play Framework (Web) など、他のScalaライブラリやフレームワークに触れてみる。
- 実践プロジェクト: 実際のデータセットを使って、データ分析や機械学習のプロジェクトに取り組む。Kaggleなどのコンペティションに参加するのも良いでしょう。
ぜひ、ScalaとSparkの世界を探求し、データサイエンスのスキルをさらに高めてください!💪
コメント