Deep Learning for JavaをApache Sparkで動かす方法

Data Science

2019.12.12

Topics

はじめに

Java用の機械学習フレームワークの一つのDeep Learning for Java (DL4J)のプログラムを、大規模データ処理基盤のApache Sparkで動かす手順について、簡単に紹介したいと思います。もっともDL4JがJava用といっても、Sparkのメインプログラミング言語がScalaなので、本記事ではJavaは使わずにScalaを使います。

本記事で注目するDL4Jの特徴は、以下のようなものです。

  • Scalaを使うことにより、Javaを使うよりもコンパクトなプログラムが書け、Scalaの並列処理機能も簡単に使える
  • Sparkを使うことにより、分散処理を簡単に導入できる

対抗馬としてはIntel BigDLがあり、両者ともまだまだ開発途上という点も共通しています。異なる点としてはDL4JはJVM用として作られているのに対し、BigDLはIntelアーキテクチャ用に作られています。オープンソースのコミュニティとしてはDL4Jの方が活発なようです。
(参考:User Group for BigDL and Analytics ZooDeeplearning4j’s Gitter

なんと言っても両者は同時に使うこともでき(同じjarにDL4J一式とBigDL一式を含めることができる)、その辺りの技術検証も興味深いのですが、今回は入門編ということでDL4Jのみを扱います。

以上の前置きのもとで、本記事では以下のJavaサンプルプログラムを、1)Scala用に書き換え、2)Spark用に変更を加えるという手順について説明します。
MLPMnistTwoLayerExample.java

本記事の内容をについてはJava/Scalaの開発環境は準備されているものとし、また本記事のプログラムの動作確認にはSpark Shell及びApache Toreeを使っています。
どちらを用いる場合も、DL4Jのライブラリをspark-submitあるいはspark-shellのオプションで指定する必要があります。

// 例
 --jars dl4j-spark-cluster-1.0.0-beta5-bin.jar

Deep Learning for Java (DL4J)のプログラムが、簡単にApache Sparkで動かせるようになることが伝わりますと幸いです。

JavaプログラムをScalaプログラムに変更する

JavaのコードをScalaのコードに変更するには、機械的に文法を変えるだけで済みます。
例えば、

// Java
final int numRows = 28;

は、

// Scala
val numRows = 28

のように書き換えていきます。また、オリジナルコードでは出力先がログになっていますが、ログファイルの管理が面倒なため標準出力に変更します。

全て変更すると、以下のようなコードになります。

// Scala
/* *****************************************************************************
 * Copyright (c) 2019 NHN Techorus Corp.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Apache License, Version 2.0 which is available at
 * https://www.apache.org/licenses/LICENSE-2.0.
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 ******************************************************************************/
package org.deeplearning4j.examples.feedforward.mnist

import org.deeplearning4j.datasets.iterator.impl.MnistDataSetIterator
import org.deeplearning4j.nn.conf.MultiLayerConfiguration
import org.deeplearning4j.nn.conf.NeuralNetConfiguration
import org.deeplearning4j.nn.conf.layers.DenseLayer
import org.deeplearning4j.nn.conf.layers.OutputLayer
import org.deeplearning4j.nn.multilayer.MultiLayerNetwork
import org.deeplearning4j.nn.weights.WeightInit
import org.deeplearning4j.optimize.listeners.ScoreIterationListener
import org.nd4j.evaluation.classification.Evaluation
import org.nd4j.linalg.activations.Activation
import org.nd4j.linalg.dataset.api.iterator.DataSetIterator
import org.nd4j.linalg.learning.config.Nadam
import org.nd4j.linalg.lossfunctions.LossFunctions.LossFunction

object MLPMnistTwoLayerExample {

     def scala() = {
        //number of rows and columns in the input pictures
        val numRows = 28
        val numColumns = 28
        val outputNum = 10 // number of output classes
        val batchSize = 64 // batch size for each epoch
        val rngSeed = 123 // random number seed for reproducibility
        val numEpochs = 15 // number of epochs to perform
        val rate = 0.0015 // learning rate

        //Get the DataSetIterators:
        val mnistTrain = new MnistDataSetIterator(batchSize, 6000, false, true, true, rngSeed) // 1/10 scale
        val mnistTest = new MnistDataSetIterator(batchSize, 1000, false, false, true, rngSeed) // 1/10 scale

        println("Build model....")
        val conf = new NeuralNetConfiguration.Builder()
            .seed(rngSeed) //include a random seed for reproducibility
            .activation(Activation.RELU)
            .weightInit(WeightInit.XAVIER)
            .updater(new Nadam())
            .l2(rate * 0.005) // regularize learning model
            .list()
            .layer(new DenseLayer.Builder() //create the first input layer.
                    .nIn(numRows * numColumns)
                    .nOut(500)
                    .build())
            .layer(new DenseLayer.Builder() //create the second input layer
                    .nIn(500)
                    .nOut(100)
                    .build())
            .layer(new OutputLayer.Builder(LossFunction.NEGATIVELOGLIKELIHOOD) //create hidden layer
                    .activation(Activation.SOFTMAX)
                    .nOut(outputNum)
                    .build())
            .build()

        val model = new MultiLayerNetwork(conf)
        model.init()
        model.setListeners(new ScoreIterationListener(5))  //print the score with every iteration

        println("Train model....");
        model.fit(mnistTrain, numEpochs)
        println("Evaluate model....")
        val eval = model.evaluate[Evaluation](mnistTest)
        println(eval.stats())
        println("****************Example finished********************")
    }

}

なお、modelという変数にMultiLayerNetworkクラスのインスタンスを入れ学習及び評価をしていますが、評価値を計算する関数evaluationのJava API仕様が最近(1.0.0-beta3)から変更になりました。

// 1.0.0-beta2まで
public Evaluation evaluate(DataSetIterator iterator) {
    return evaluate(iterator, null);
}
// 1.0.0-beta3から
public <T extends Evaluation> T evaluate(DataSetIterator iterator) {
    return (T)evaluate(iterator, null);
}

型引数を省略してScalaで実行すると次のような例外が出ますので、上記プログラムでは適当な型を指定するように修正してあります。

Name: java.lang.ClassCastException
Message: org.deeplearning4j.eval.Evaluation cannot be cast to scala.runtime.Nothing$

実行例は以下です。


Build model....
Train model....
Evaluate model....


========================Evaluation Metrics========================
 # of classes:    10
 Accuracy:        0.9738
 Precision:       0.9735
 Recall:          0.9738
 F1 Score:        0.9736
Precision, recall & F1: macro-averaged (equally weighted avg. of 10 classes)


=========================Confusion Matrix=========================
    0    1    2    3    4    5    6    7    8    9
---------------------------------------------------
  966    0    0    1    1    2    6    0    4    0 | 0 = 0
    0 1119    3    1    0    1    4    0    7    0 | 1 = 1
    7    2 1002    0    2    0    4    5    9    1 | 2 = 2
    0    0    5  976    0   16    0    2   11    0 | 3 = 3
    1    1    6    0  952    0    5    1    1   15 | 4 = 4
    3    0    0    1    1  873    8    0    4    2 | 5 = 5
    3    3    0    1    3    3  944    0    1    0 | 6 = 6
    2    5   10    3    1    0    0  994    5    8 | 7 = 7
    3    0    3    1    5    7    5    1  948    1 | 8 = 8
    2    6    0    5    9    9    1    5    8  964 | 9 = 9

Confusion matrix format: Actual (rowClass) predicted as (columnClass) N times
==================================================================
****************Example finished********************
Time elapsed: 218801 millisecs
Time elapsed: 218 secs

ScalaプログラムをSparkで動くように変更する

Scalaのプログラムに、いくつかの変更を加えてSparkで実行できるようにします。

  1. Sparkでプログラムを実行するためのSparkSessionインスタンスの準備
  2. Sparkプログラミングの決り文句的なものです。

           val spark = SparkSession
            .builder
            .appName("Spark EMNIST")
            .getOrCreate()
    
  3. 分散深層学習のための仕組みの準備
  4. 分散深層学習でよく使われるパラメータサーバの、DL4J独自実装のコードを加えます。DL4JオススメのGradient Sharingは設定等が難しいので、ここでは使用が簡単なParameter Averagingを使っています。

           val batchSizePerWorker = 8
           val tm =  new ParameterAveragingTrainingMaster.Builder(batchSizePerWorker)
            .averagingFrequency(2)
            .rngSeed(1234)
            .aggregationDepth(5)
            .build()
    
  5. RDDの準備
  6. Sparkを使わない処理ならばイテレータで順番にデータを読み出せば良いのですが、Sparkで処理するためにはorg.apache.spark.rdd.RDDやorg.apache.spark.sql.Datasetの形にしなければならないので、一度読み出してメモリに入れる処理を追加しています。

            //Get the DataSetIterators:
            val mnistTrain = new MnistDataSetIterator(batchSize, true, rngSeed)
            val mnistTest = new MnistDataSetIterator(batchSize, false, rngSeed)
    
            val mnistTrainDatasetList = ArrayBuffer.empty[DataSet]
            while(mnistTrain.hasNext()) {
                mnistTrainDatasetList.append(mnistTrain.next())
            }
            val mnistTestDatasetList = ArrayBuffer.empty[DataSet]
            while(mnistTest.hasNext()) {
                mnistTestDatasetList.append(mnistTest.next())
            }
    
  7. Spark用のニューラルネットワークモデルの準備
  8. ネットワークモデルを表現するorg.deeplearning4j.nn.multilayer.MultiLayerNetworkから、Spark用のラッパークラスであるorg.deeplearning4j.spark.impl.multilayer.SparkDl4jMultiLayerに変更しています。Spark用なので、Sparkの実行に必要なSparkContextやパラメータサーバを表す変数を引数として渡しています。

            //val model = new MultiLayerNetwork(conf)
            val model = new SparkDl4jMultiLayer(spark.sparkContext, conf, tm)
    
  9. ループの追加
  10. org.deeplearning4j.nn.multilayer.MultiLayerNetworkには指定された回数だけループするfit(DataSetIterator iterator, int numEpochs)関数があるのですが、org.deeplearning4j.spark.impl.multilayer.SparkDl4jMultiLayerには無いのでループする処理を追加しています。また、訓練用のデータは予めRDDを作成しています。

           val trainRdd = spark.sparkContext.parallelize(mnistTrainDatasetList)
           for(i <- 0 until epochs) {
             model.fit(trainRdd)
             println("Epoch " + i + " complete")
           }
    

これらの変更を加えたコードは、以下のようになります。

// Scala on Spark
/* *****************************************************************************
 * Copyright (c) 2019 NHN Techorus Corp.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Apache License, Version 2.0 which is available at
 * https://www.apache.org/licenses/LICENSE-2.0.
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 ******************************************************************************/
import org.deeplearning4j.datasets.iterator.impl.MnistDataSetIterator
import org.deeplearning4j.nn.conf.MultiLayerConfiguration
import org.deeplearning4j.nn.conf.NeuralNetConfiguration
import org.deeplearning4j.nn.conf.layers.DenseLayer
import org.deeplearning4j.nn.conf.layers.OutputLayer
import org.deeplearning4j.nn.multilayer.MultiLayerNetwork
import org.deeplearning4j.nn.weights.WeightInit
import org.deeplearning4j.optimize.listeners.ScoreIterationListener
import org.nd4j.evaluation.classification.Evaluation
import org.nd4j.linalg.activations.Activation
import org.nd4j.linalg.dataset.api.iterator.DataSetIterator
import org.nd4j.linalg.learning.config.Nadam
import org.nd4j.linalg.lossfunctions.LossFunctions.LossFunction

import org.nd4j.linalg.dataset.DataSet
import org.deeplearning4j.spark.impl.multilayer.SparkDl4jMultiLayer
import org.deeplearning4j.spark.impl.paramavg.ParameterAveragingTrainingMaster
import scala.collection.mutable.ArrayBuffer

object MLPMnistTwoLayerExample {

     def spark() = {
         
       val spark = SparkSession
        .builder
        .appName("Spark EMNIST")
       val batchSizePerWorker = 8
       val tm =  new ParameterAveragingTrainingMaster.Builder(batchSizePerWorker)
        .averagingFrequency(2)
        .rngSeed(1234)
        .aggregationDepth(5)
        .build()

        //number of rows and columns in the input pictures
        val numRows = 28
        val numColumns = 28
        val outputNum = 10 // number of output classes
        val batchSize = 64 // batch size for each epoch
        val rngSeed = 123 // random number seed for reproducibility
        val numEpochs = 15 // number of epochs to perform
        val rate = 0.0015 // learning rate

        //Get the DataSetIterators:
        val mnistTrain = new MnistDataSetIterator(batchSize, true, rngSeed)
        val mnistTest = new MnistDataSetIterator(batchSize, false, rngSeed)

        val mnistTrainDatasetList = ArrayBuffer.empty[DataSet]
        while(mnistTrain.hasNext()) {
            mnistTrainDatasetList.append(mnistTrain.next())
        }
        val mnistTestDatasetList = ArrayBuffer.empty[DataSet]
        while(mnistTest.hasNext()) {
            mnistTestDatasetList.append(mnistTest.next())
        }

        println("Build model....")
        val conf = new NeuralNetConfiguration.Builder()
            .seed(rngSeed) //include a random seed for reproducibility
            .activation(Activation.RELU)
            .weightInit(WeightInit.XAVIER)
            .updater(new Nadam())
            .l2(rate * 0.005) // regularize learning model
            .list()
            .layer(new DenseLayer.Builder() //create the first input layer.
                    .nIn(numRows * numColumns)
                    .nOut(500)
                    .build())
            .layer(new DenseLayer.Builder() //create the second input layer
                    .nIn(500)
                    .nOut(100)
                    .build())
            .layer(new OutputLayer.Builder(LossFunction.NEGATIVELOGLIKELIHOOD) //create hidden layer
                    .activation(Activation.SOFTMAX)
                    .nOut(outputNum)
                    .build())
            .build()

        val model = new SparkDl4jMultiLayer(spark.sparkContext, conf, tm)
        model.setListeners(new ScoreIterationListener(5))  //print the score with every iteration

        println("Train model....");
        val trainRdd = spark.sparkContext.parallelize(mnistTrainDatasetList)
        for(i <- 0 until epochs) {
          model.fit(trainRdd)
          println("Epoch " + i + " complete")
        }
        println("Evaluate model....")
        val testRdd = spark.sparkContext.parallelize(mnistTestDatasetList)
        val eval = model.evaluate[Evaluation](testRdd)
        println(eval.stats())
        println("****************Example finished********************")
        spark.stop()
    }
}

実行例は以下です。

Build model....
Train model....
Epoch 0 complete
Epoch 1 complete
Epoch 2 complete
Epoch 3 complete
Epoch 4 complete
Evaluate model....


========================Evaluation Metrics========================
 # of classes:    10
 Accuracy:        0.8510
 Precision:       0.8626
 Recall:          0.8446
 F1 Score:        0.8468
Precision, recall & F1: macro-averaged (equally weighted avg. of 10 classes)


=========================Confusion Matrix=========================
   0   1   2   3   4   5   6   7   8   9
-----------------------------------------
  62   0   1   2   0  18   3   1   1   0 | 0 = 0
   0 119   0   1   0   0   0   0   0   0 | 1 = 1
   1   2  67   7   4   0   0   3   8   2 | 2 = 2
   0   0   0  99   1   2   1   3   5   0 | 3 = 3
   0   0   0   0  98   1   1   0   0   2 | 4 = 4
   0   0   0   2   3  70   2   0   4   3 | 5 = 5
   0   1   0   0   2   9  77   0   4   0 | 6 = 6
   0   4   1   0   5   0   0  86   0   5 | 7 = 7
   0   1   0   7   2   3   1   0  86   3 | 8 = 8
   0   3   0   1   8   4   0   0   1  87 | 9 = 9

Confusion matrix format: Actual (rowClass) predicted as (columnClass) N times
==================================================================
****************Example finished********************
Time elapsed: 138505 millisecs
Time elapsed: 138 secs

ScalaのプログラムをSpark対応にするために5箇所ほどの変更点を上げましたが、決り文句的なものばかりで、簡単に分散処理を導入できることが分かります。ループ回数が同じ場合、Sparkで実行する方が処理時間が短くなりました。ただし、測定した精度(AccuracyからF1値まで全て)が落ちてしまっています。深層学習において分散処理を導入すると精度が落ちてしまうというのはよく知られており、DL4Jの実装ではどのような原因で精度が落ち、どう対処するのが良いかということは違う記事で触れたいと思います。

おわりに

本記事では、Java用の機械学習フレームワークの一つのDeep Learning for Java (DL4J)のサンプルプログラムを例に、JavaのサンプルプログラムをSpark用に変更し、分散処理を導入する手順を簡単に説明しました。
細かい手順や詳細な実行情報については省略しましたが、プログラムの変更は簡単であることがお分かり頂けたと思います。

Deep Learning for Javaは2019年12月現在バージョン1.0.0-beta5が公開されており、活発に開発が続いています。活発すぎて機能追加や仕様変更も多く追いつくのが大変ですが、今のうちから慣れておけばバージョン1.0.0が出る頃には使いこなせるようになるのではないかと思います。
Deep Learning for Javaの開発プロジェクトには、前処理用のDataVecハイパーパラメータチューニングのArbiter強化学習用のRL4J、はてはScalaのためのND4S、PythonのためのJumpy/PyDL4J/PyDataVecなど多くのサブプロジェクトがあり、今後が楽しみです。

ts

大学で民俗学や宗教についてのフィールドワークを楽しんでいたのですが,うっかり新設された結び目理論と幾何学を勉強する研究室に移ってしまい,さらに大学院では一般相対性理論を研究するという迷走した人生を歩んでいます.プログラミングが苦手で勉強中です.

Recommends

こちらもおすすめ

Special Topics

注目記事はこちら