Deep Learning for JavaをApache Sparkで動かす方法
2019.12.12
はじめに
Java用の機械学習フレームワークの一つのDeep Learning for Java (DL4J)のプログラムを、大規模データ処理基盤のApache Sparkで動かす手順について、簡単に紹介したいと思います。もっともDL4JがJava用といっても、Sparkのメインプログラミング言語がScalaなので、本記事ではJavaは使わずにScalaを使います。
本記事で注目するDL4Jの特徴は、以下のようなものです。
- Scalaを使うことにより、Javaを使うよりもコンパクトなプログラムが書け、Scalaの並列処理機能も簡単に使える
- Sparkを使うことにより、分散処理を簡単に導入できる
対抗馬としてはIntel BigDLがあり、両者ともまだまだ開発途上という点も共通しています。異なる点としてはDL4JはJVM用として作られているのに対し、BigDLはIntelアーキテクチャ用に作られています。オープンソースのコミュニティとしてはDL4Jの方が活発なようです。
(参考:User Group for BigDL and Analytics Zoo、Deeplearning4j’s Gitter)
なんと言っても両者は同時に使うこともでき(同じjarにDL4J一式とBigDL一式を含めることができる)、その辺りの技術検証も興味深いのですが、今回は入門編ということでDL4Jのみを扱います。
以上の前置きのもとで、本記事では以下のJavaサンプルプログラムを、1)Scala用に書き換え、2)Spark用に変更を加えるという手順について説明します。
MLPMnistTwoLayerExample.java
本記事の内容をについてはJava/Scalaの開発環境は準備されているものとし、また本記事のプログラムの動作確認にはSpark Shell及びApache Toreeを使っています。
どちらを用いる場合も、DL4Jのライブラリをspark-submitあるいはspark-shellのオプションで指定する必要があります。
// 例 --jars dl4j-spark-cluster-1.0.0-beta5-bin.jar
Deep Learning for Java (DL4J)のプログラムが、簡単にApache Sparkで動かせるようになることが伝わりますと幸いです。
JavaプログラムをScalaプログラムに変更する
JavaのコードをScalaのコードに変更するには、機械的に文法を変えるだけで済みます。
例えば、
// Java final int numRows = 28;
は、
// Scala val numRows = 28
のように書き換えていきます。また、オリジナルコードでは出力先がログになっていますが、ログファイルの管理が面倒なため標準出力に変更します。
全て変更すると、以下のようなコードになります。
// Scala /* ***************************************************************************** * Copyright (c) 2019 NHN Techorus Corp. * * This program and the accompanying materials are made available under the * terms of the Apache License, Version 2.0 which is available at * https://www.apache.org/licenses/LICENSE-2.0. * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the * License for the specific language governing permissions and limitations * under the License. * * SPDX-License-Identifier: Apache-2.0 ******************************************************************************/ package org.deeplearning4j.examples.feedforward.mnist import org.deeplearning4j.datasets.iterator.impl.MnistDataSetIterator import org.deeplearning4j.nn.conf.MultiLayerConfiguration import org.deeplearning4j.nn.conf.NeuralNetConfiguration import org.deeplearning4j.nn.conf.layers.DenseLayer import org.deeplearning4j.nn.conf.layers.OutputLayer import org.deeplearning4j.nn.multilayer.MultiLayerNetwork import org.deeplearning4j.nn.weights.WeightInit import org.deeplearning4j.optimize.listeners.ScoreIterationListener import org.nd4j.evaluation.classification.Evaluation import org.nd4j.linalg.activations.Activation import org.nd4j.linalg.dataset.api.iterator.DataSetIterator import org.nd4j.linalg.learning.config.Nadam import org.nd4j.linalg.lossfunctions.LossFunctions.LossFunction object MLPMnistTwoLayerExample { def scala() = { //number of rows and columns in the input pictures val numRows = 28 val numColumns = 28 val outputNum = 10 // number of output classes val batchSize = 64 // batch size for each epoch val rngSeed = 123 // random number seed for reproducibility val numEpochs = 15 // number of epochs to perform val rate = 0.0015 // learning rate //Get the DataSetIterators: val mnistTrain = new MnistDataSetIterator(batchSize, 6000, false, true, true, rngSeed) // 1/10 scale val mnistTest = new MnistDataSetIterator(batchSize, 1000, false, false, true, rngSeed) // 1/10 scale println("Build model....") val conf = new NeuralNetConfiguration.Builder() .seed(rngSeed) //include a random seed for reproducibility .activation(Activation.RELU) .weightInit(WeightInit.XAVIER) .updater(new Nadam()) .l2(rate * 0.005) // regularize learning model .list() .layer(new DenseLayer.Builder() //create the first input layer. .nIn(numRows * numColumns) .nOut(500) .build()) .layer(new DenseLayer.Builder() //create the second input layer .nIn(500) .nOut(100) .build()) .layer(new OutputLayer.Builder(LossFunction.NEGATIVELOGLIKELIHOOD) //create hidden layer .activation(Activation.SOFTMAX) .nOut(outputNum) .build()) .build() val model = new MultiLayerNetwork(conf) model.init() model.setListeners(new ScoreIterationListener(5)) //print the score with every iteration println("Train model...."); model.fit(mnistTrain, numEpochs) println("Evaluate model....") val eval = model.evaluate[Evaluation](mnistTest) println(eval.stats()) println("****************Example finished********************") } }
なお、modelという変数にMultiLayerNetworkクラスのインスタンスを入れ学習及び評価をしていますが、評価値を計算する関数evaluationのJava API仕様が最近(1.0.0-beta3)から変更になりました。
// 1.0.0-beta2まで public Evaluation evaluate(DataSetIterator iterator) { return evaluate(iterator, null); }
// 1.0.0-beta3から public <T extends Evaluation> T evaluate(DataSetIterator iterator) { return (T)evaluate(iterator, null); }
型引数を省略してScalaで実行すると次のような例外が出ますので、上記プログラムでは適当な型を指定するように修正してあります。
Name: java.lang.ClassCastException Message: org.deeplearning4j.eval.Evaluation cannot be cast to scala.runtime.Nothing$
実行例は以下です。
Build model.... Train model.... Evaluate model.... ========================Evaluation Metrics======================== # of classes: 10 Accuracy: 0.9738 Precision: 0.9735 Recall: 0.9738 F1 Score: 0.9736 Precision, recall & F1: macro-averaged (equally weighted avg. of 10 classes) =========================Confusion Matrix========================= 0 1 2 3 4 5 6 7 8 9 --------------------------------------------------- 966 0 0 1 1 2 6 0 4 0 | 0 = 0 0 1119 3 1 0 1 4 0 7 0 | 1 = 1 7 2 1002 0 2 0 4 5 9 1 | 2 = 2 0 0 5 976 0 16 0 2 11 0 | 3 = 3 1 1 6 0 952 0 5 1 1 15 | 4 = 4 3 0 0 1 1 873 8 0 4 2 | 5 = 5 3 3 0 1 3 3 944 0 1 0 | 6 = 6 2 5 10 3 1 0 0 994 5 8 | 7 = 7 3 0 3 1 5 7 5 1 948 1 | 8 = 8 2 6 0 5 9 9 1 5 8 964 | 9 = 9 Confusion matrix format: Actual (rowClass) predicted as (columnClass) N times ================================================================== ****************Example finished******************** Time elapsed: 218801 millisecs Time elapsed: 218 secs
ScalaプログラムをSparkで動くように変更する
Scalaのプログラムに、いくつかの変更を加えてSparkで実行できるようにします。
- Sparkでプログラムを実行するためのSparkSessionインスタンスの準備
- 分散深層学習のための仕組みの準備
- RDDの準備
- Spark用のニューラルネットワークモデルの準備
- ループの追加
Sparkプログラミングの決り文句的なものです。
val spark = SparkSession .builder .appName("Spark EMNIST") .getOrCreate()
分散深層学習でよく使われるパラメータサーバの、DL4J独自実装のコードを加えます。DL4JオススメのGradient Sharingは設定等が難しいので、ここでは使用が簡単なParameter Averagingを使っています。
val batchSizePerWorker = 8 val tm = new ParameterAveragingTrainingMaster.Builder(batchSizePerWorker) .averagingFrequency(2) .rngSeed(1234) .aggregationDepth(5) .build()
Sparkを使わない処理ならばイテレータで順番にデータを読み出せば良いのですが、Sparkで処理するためにはorg.apache.spark.rdd.RDDやorg.apache.spark.sql.Datasetの形にしなければならないので、一度読み出してメモリに入れる処理を追加しています。
//Get the DataSetIterators: val mnistTrain = new MnistDataSetIterator(batchSize, true, rngSeed) val mnistTest = new MnistDataSetIterator(batchSize, false, rngSeed) val mnistTrainDatasetList = ArrayBuffer.empty[DataSet] while(mnistTrain.hasNext()) { mnistTrainDatasetList.append(mnistTrain.next()) } val mnistTestDatasetList = ArrayBuffer.empty[DataSet] while(mnistTest.hasNext()) { mnistTestDatasetList.append(mnistTest.next()) }
ネットワークモデルを表現するorg.deeplearning4j.nn.multilayer.MultiLayerNetworkから、Spark用のラッパークラスであるorg.deeplearning4j.spark.impl.multilayer.SparkDl4jMultiLayerに変更しています。Spark用なので、Sparkの実行に必要なSparkContextやパラメータサーバを表す変数を引数として渡しています。
//val model = new MultiLayerNetwork(conf) val model = new SparkDl4jMultiLayer(spark.sparkContext, conf, tm)
org.deeplearning4j.nn.multilayer.MultiLayerNetworkには指定された回数だけループするfit(DataSetIterator iterator, int numEpochs)関数があるのですが、org.deeplearning4j.spark.impl.multilayer.SparkDl4jMultiLayerには無いのでループする処理を追加しています。また、訓練用のデータは予めRDDを作成しています。
val trainRdd = spark.sparkContext.parallelize(mnistTrainDatasetList) for(i <- 0 until epochs) { model.fit(trainRdd) println("Epoch " + i + " complete") }
これらの変更を加えたコードは、以下のようになります。
// Scala on Spark /* ***************************************************************************** * Copyright (c) 2019 NHN Techorus Corp. * * This program and the accompanying materials are made available under the * terms of the Apache License, Version 2.0 which is available at * https://www.apache.org/licenses/LICENSE-2.0. * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the * License for the specific language governing permissions and limitations * under the License. * * SPDX-License-Identifier: Apache-2.0 ******************************************************************************/ import org.deeplearning4j.datasets.iterator.impl.MnistDataSetIterator import org.deeplearning4j.nn.conf.MultiLayerConfiguration import org.deeplearning4j.nn.conf.NeuralNetConfiguration import org.deeplearning4j.nn.conf.layers.DenseLayer import org.deeplearning4j.nn.conf.layers.OutputLayer import org.deeplearning4j.nn.multilayer.MultiLayerNetwork import org.deeplearning4j.nn.weights.WeightInit import org.deeplearning4j.optimize.listeners.ScoreIterationListener import org.nd4j.evaluation.classification.Evaluation import org.nd4j.linalg.activations.Activation import org.nd4j.linalg.dataset.api.iterator.DataSetIterator import org.nd4j.linalg.learning.config.Nadam import org.nd4j.linalg.lossfunctions.LossFunctions.LossFunction import org.nd4j.linalg.dataset.DataSet import org.deeplearning4j.spark.impl.multilayer.SparkDl4jMultiLayer import org.deeplearning4j.spark.impl.paramavg.ParameterAveragingTrainingMaster import scala.collection.mutable.ArrayBuffer object MLPMnistTwoLayerExample { def spark() = { val spark = SparkSession .builder .appName("Spark EMNIST") val batchSizePerWorker = 8 val tm = new ParameterAveragingTrainingMaster.Builder(batchSizePerWorker) .averagingFrequency(2) .rngSeed(1234) .aggregationDepth(5) .build() //number of rows and columns in the input pictures val numRows = 28 val numColumns = 28 val outputNum = 10 // number of output classes val batchSize = 64 // batch size for each epoch val rngSeed = 123 // random number seed for reproducibility val numEpochs = 15 // number of epochs to perform val rate = 0.0015 // learning rate //Get the DataSetIterators: val mnistTrain = new MnistDataSetIterator(batchSize, true, rngSeed) val mnistTest = new MnistDataSetIterator(batchSize, false, rngSeed) val mnistTrainDatasetList = ArrayBuffer.empty[DataSet] while(mnistTrain.hasNext()) { mnistTrainDatasetList.append(mnistTrain.next()) } val mnistTestDatasetList = ArrayBuffer.empty[DataSet] while(mnistTest.hasNext()) { mnistTestDatasetList.append(mnistTest.next()) } println("Build model....") val conf = new NeuralNetConfiguration.Builder() .seed(rngSeed) //include a random seed for reproducibility .activation(Activation.RELU) .weightInit(WeightInit.XAVIER) .updater(new Nadam()) .l2(rate * 0.005) // regularize learning model .list() .layer(new DenseLayer.Builder() //create the first input layer. .nIn(numRows * numColumns) .nOut(500) .build()) .layer(new DenseLayer.Builder() //create the second input layer .nIn(500) .nOut(100) .build()) .layer(new OutputLayer.Builder(LossFunction.NEGATIVELOGLIKELIHOOD) //create hidden layer .activation(Activation.SOFTMAX) .nOut(outputNum) .build()) .build() val model = new SparkDl4jMultiLayer(spark.sparkContext, conf, tm) model.setListeners(new ScoreIterationListener(5)) //print the score with every iteration println("Train model...."); val trainRdd = spark.sparkContext.parallelize(mnistTrainDatasetList) for(i <- 0 until epochs) { model.fit(trainRdd) println("Epoch " + i + " complete") } println("Evaluate model....") val testRdd = spark.sparkContext.parallelize(mnistTestDatasetList) val eval = model.evaluate[Evaluation](testRdd) println(eval.stats()) println("****************Example finished********************") spark.stop() } }
実行例は以下です。
Build model.... Train model.... Epoch 0 complete Epoch 1 complete Epoch 2 complete Epoch 3 complete Epoch 4 complete Evaluate model.... ========================Evaluation Metrics======================== # of classes: 10 Accuracy: 0.8510 Precision: 0.8626 Recall: 0.8446 F1 Score: 0.8468 Precision, recall & F1: macro-averaged (equally weighted avg. of 10 classes) =========================Confusion Matrix========================= 0 1 2 3 4 5 6 7 8 9 ----------------------------------------- 62 0 1 2 0 18 3 1 1 0 | 0 = 0 0 119 0 1 0 0 0 0 0 0 | 1 = 1 1 2 67 7 4 0 0 3 8 2 | 2 = 2 0 0 0 99 1 2 1 3 5 0 | 3 = 3 0 0 0 0 98 1 1 0 0 2 | 4 = 4 0 0 0 2 3 70 2 0 4 3 | 5 = 5 0 1 0 0 2 9 77 0 4 0 | 6 = 6 0 4 1 0 5 0 0 86 0 5 | 7 = 7 0 1 0 7 2 3 1 0 86 3 | 8 = 8 0 3 0 1 8 4 0 0 1 87 | 9 = 9 Confusion matrix format: Actual (rowClass) predicted as (columnClass) N times ================================================================== ****************Example finished******************** Time elapsed: 138505 millisecs Time elapsed: 138 secs
ScalaのプログラムをSpark対応にするために5箇所ほどの変更点を上げましたが、決り文句的なものばかりで、簡単に分散処理を導入できることが分かります。ループ回数が同じ場合、Sparkで実行する方が処理時間が短くなりました。ただし、測定した精度(AccuracyからF1値まで全て)が落ちてしまっています。深層学習において分散処理を導入すると精度が落ちてしまうというのはよく知られており、DL4Jの実装ではどのような原因で精度が落ち、どう対処するのが良いかということは違う記事で触れたいと思います。
おわりに
本記事では、Java用の機械学習フレームワークの一つのDeep Learning for Java (DL4J)のサンプルプログラムを例に、JavaのサンプルプログラムをSpark用に変更し、分散処理を導入する手順を簡単に説明しました。
細かい手順や詳細な実行情報については省略しましたが、プログラムの変更は簡単であることがお分かり頂けたと思います。
Deep Learning for Javaは2019年12月現在バージョン1.0.0-beta5が公開されており、活発に開発が続いています。活発すぎて機能追加や仕様変更も多く追いつくのが大変ですが、今のうちから慣れておけばバージョン1.0.0が出る頃には使いこなせるようになるのではないかと思います。
Deep Learning for Javaの開発プロジェクトには、前処理用のDataVec、ハイパーパラメータチューニングのArbiter、強化学習用のRL4J、はてはScalaのためのND4S、PythonのためのJumpy/PyDL4J/PyDataVecなど多くのサブプロジェクトがあり、今後が楽しみです。
テックブログ新着情報のほか、AWSやGoogle Cloudに関するお役立ち情報を配信中!
Follow @twitter大学で民俗学や宗教についてのフィールドワークを楽しんでいたのですが,うっかり新設された結び目理論と幾何学を勉強する研究室に移ってしまい,さらに大学院では一般相対性理論を研究するという迷走した人生を歩んでいます.プログラミングが苦手で勉強中です.
Recommends
こちらもおすすめ
-
5分でわかる、1時間でできる、 OSSコントリビューション
2017.10.23
-
VAEを用いたUNIXセッションのなりすまし検出
2018.12.17
-
画像の特徴を用いたランク学習の論文紹介
2019.6.28
Special Topics
注目記事はこちら
データ分析入門
これから始めるBigQuery基礎知識
2024.02.28
AWSの料金が 10 %割引になる!
『AWSの請求代行リセールサービス』
2024.07.16