3、Spark MLlib Deep Learning Convolution Neural Network (深度学习-卷积神经网络)3.1
Spark MLlib Deep Learning工具箱,是依据现有深度学习教程《UFLDL教程》中的算法。在SparkMLlib中的实现。详细Spark MLlib Deep Learning(深度学习)文件夹结构:
第一章Neural Net(NN)
1、源代码
2、源代码解析
3、实例
第二章Deep Belief Nets(DBNs)
1、源代码
2、源代码解析
3、实例
第三章Convolution Neural Network(CNN)
1、源代码
2、源代码解析
3、实例
第四章 Stacked Auto-Encoders(SAE)
第五章CAE
第三章Convolution Neural Network (卷积神经网络)
1 源代码
眼下SparkMLlib Deep Learning工具箱源代码的github地址为:
1.1 CNN代码
package CNNimport org.apache.spark._import org.apache.spark.SparkContext._import org.apache.spark.rdd.RDDimport org.apache.spark.Loggingimport org.apache.spark.mllib.regression.LabeledPointimport org.apache.spark.mllib.linalg._import org.apache.spark.mllib.linalg.distributed.RowMatriximport breeze.linalg.{ Matrix => BM, CSCMatrix => BSM, DenseMatrix => BDM, Vector => BV, DenseVector => BDV, SparseVector => BSV, axpy => brzAxpy, svd => brzSvd, accumulate => Accumulate, rot90 => Rot90, sum => Bsum}import breeze.numerics.{ exp => Bexp, tanh => Btanh}import scala.collection.mutable.ArrayBufferimport java.util.Randomimport scala.math._/** * types:网络层类别 * outputmaps:特征map数量 * kernelsize:卷积核k大小 * k: 卷积核 * b: 偏置 * dk: 卷积核的偏导 * db: 偏置的偏导 * scale: pooling大小 */case class CNNLayers( types: String, outputmaps: Double, kernelsize: Double, scale: Double, k: Array[Array[BDM[Double]]], b: Array[Double], dk: Array[Array[BDM[Double]]], db: Array[Double]) extends Serializable/** * CNN(convolution neural network)卷积神经网络 */class CNN( private var mapsize: BDM[Double], private var types: Array[String], private var layer: Int, private var onum: Int, private var outputmaps: Array[Double], private var kernelsize: Array[Double], private var scale: Array[Double], private var alpha: Double, private var batchsize: Double, private var numepochs: Double) extends Serializable with Logging {// var mapsize = new BDM(1, 2, Array(28.0, 28.0))// var types = Array("i", "c", "s", "c", "s")// var layer = 5// var onum = 10 // var outputmaps = Array(0.0, 6.0, 0.0, 12.0, 0.0)// var kernelsize = Array(0.0, 5.0, 0.0, 5.0, 0.0)// var scale = Array(0.0, 0.0, 2.0, 0.0, 2.0)// var alpha = 1.0// var batchsize = 50.0// var numepochs = 1.0 def this() = this(new BDM(1, 2, Array(28.0, 28.0)), Array("i", "c", "s", "c", "s"), 5, 10, Array(0.0, 6.0, 0.0, 12.0, 0.0), Array(0.0, 5.0, 0.0, 5.0, 0.0), Array(0.0, 0.0, 2.0, 0.0, 2.0), 1.0, 50.0, 1.0) /** 设置输入层大小. Default: [28, 28]. */ def setMapsize(mapsize: BDM[Double]): this.type = { this.mapsize = mapsize this } /** 设置网络层类别. Default: [1"i", "c", "s", "c", "s"]. */ def setTypes(types: Array[String]): this.type = { this.types = types this } /** 设置网络层数. Default: 5. */ def setLayer(layer: Int): this.type = { this.layer = layer this } /** 设置输出维度. Default: 10. */ def setOnum(onum: Int): this.type = { this.onum = onum this } /** 设置特征map数量. Default: [0.0, 6.0, 0.0, 12.0, 0.0]. */ def setOutputmaps(outputmaps: Array[Double]): this.type = { this.outputmaps = outputmaps this } /** 设置卷积核k大小. Default: [0.0, 5.0, 0.0, 5.0, 0.0]. */ def setKernelsize(kernelsize: Array[Double]): this.type = { this.kernelsize = kernelsize this } /** 设置scale大小. Default: [0.0, 0.0, 2.0, 0.0, 2.0]. */ def setScale(scale: Array[Double]): this.type = { this.scale = scale this } /** 设置学习因子. Default: 1. */ def setAlpha(alpha: Double): this.type = { this.alpha = alpha this } /** 设置迭代大小. Default: 50. */ def setBatchsize(batchsize: Double): this.type = { this.batchsize = batchsize this } /** 设置迭代次数. Default: 1. */ def setNumepochs(numepochs: Double): this.type = { this.numepochs = numepochs this } /** 卷积神经网络层參数初始化. */ def CnnSetup: (Array[CNNLayers], BDM[Double], BDM[Double], Double) = { var inputmaps1 = 1.0 var mapsize1 = mapsize var confinit = ArrayBuffer[CNNLayers]() for (l <- 0 to layer - 1) { // layer val type1 = types(l) val outputmap1 = outputmaps(l) val kernelsize1 = kernelsize(l) val scale1 = scale(l) val layersconf = if (type1 == "s") { // 每一层參数初始化 mapsize1 = mapsize1 / scale1 val b1 = Array.fill(inputmaps1.toInt)(0.0) val ki = Array(Array(BDM.zeros[Double](1, 1))) new CNNLayers(type1, outputmap1, kernelsize1, scale1, ki, b1, ki, b1) } else if (type1 == "c") { mapsize1 = mapsize1 - kernelsize1 + 1.0 val fan_out = outputmap1 * math.pow(kernelsize1, 2) val fan_in = inputmaps1 * math.pow(kernelsize1, 2) val ki = ArrayBuffer[Array[BDM[Double]]]() for (i <- 0 to inputmaps1.toInt - 1) { // input map val kj = ArrayBuffer[BDM[Double]]() for (j <- 0 to outputmap1.toInt - 1) { // output map val kk = (BDM.rand[Double](kernelsize1.toInt, kernelsize1.toInt) - 0.5) * 2.0 * sqrt(6.0 / (fan_in + fan_out)) kj += kk } ki += kj.toArray } val b1 = Array.fill(outputmap1.toInt)(0.0) inputmaps1 = outputmap1 new CNNLayers(type1, outputmap1, kernelsize1, scale1, ki.toArray, b1, ki.toArray, b1) } else { val ki = Array(Array(BDM.zeros[Double](1, 1))) val b1 = Array(0.0) new CNNLayers(type1, outputmap1, kernelsize1, scale1, ki, b1, ki, b1) } confinit += layersconf } val fvnum = mapsize1(0, 0) * mapsize1(0, 1) * inputmaps1 val ffb = BDM.zeros[Double](onum, 1) val ffW = (BDM.rand[Double](onum, fvnum.toInt) - 0.5) * 2.0 * sqrt(6.0 / (onum + fvnum)) (confinit.toArray, ffb, ffW, alpha) } /** * 执行卷积神经网络算法. */ def CNNtrain(train_d: RDD[(BDM[Double], BDM[Double])], opts: Array[Double]): CNNModel = { val sc = train_d.sparkContext var initStartTime = System.currentTimeMillis() var initEndTime = System.currentTimeMillis() // 參数初始化配置 var (cnn_layers, cnn_ffb, cnn_ffW, cnn_alpha) = CnnSetup // 样本数据划分:训练数据、交叉检验数据 val validation = opts(2) val splitW1 = Array(1.0 - validation, validation) val train_split1 = train_d.randomSplit(splitW1, System.nanoTime()) val train_t = train_split1(0) val train_v = train_split1(1) // m:训练样本的数量 val m = train_t.count // 计算batch的数量 val batchsize = opts(0).toInt val numepochs = opts(1).toInt val numbatches = (m / batchsize).toInt var rL = Array.fill(numepochs * numbatches.toInt)(0.0) var n = 0 // numepochs是循环的次数 for (i <- 1 to numepochs) { initStartTime = System.currentTimeMillis() val splitW2 = Array.fill(numbatches)(1.0 / numbatches) // 依据分组权重,随机划分每组样本数据 for (l <- 1 to numbatches) { // 权重 val bc_cnn_layers = sc.broadcast(cnn_layers) val bc_cnn_ffb = sc.broadcast(cnn_ffb) val bc_cnn_ffW = sc.broadcast(cnn_ffW) // 样本划分 val train_split2 = train_t.randomSplit(splitW2, System.nanoTime()) val batch_xy1 = train_split2(l - 1) // CNNff是进行前向传播 // net = cnnff(net, batch_x); val train_cnnff = CNN.CNNff(batch_xy1, bc_cnn_layers, bc_cnn_ffb, bc_cnn_ffW) // CNNbp是后向传播 // net = cnnbp(net, batch_y); val train_cnnbp = CNN.CNNbp(train_cnnff, bc_cnn_layers, bc_cnn_ffb, bc_cnn_ffW) // 权重更新 // net = cnnapplygrads(net, opts); val train_nnapplygrads = CNN.CNNapplygrads(train_cnnbp, bc_cnn_ffb, bc_cnn_ffW, cnn_alpha) cnn_ffW = train_nnapplygrads._1 cnn_ffb = train_nnapplygrads._2 cnn_layers = train_nnapplygrads._3 // error and loss // 输出误差计算 // net.L = 1/2* sum(net.e(:) .^ 2) / size(net.e, 2); val rdd_loss1 = train_cnnbp._1.map(f => f._5) val (loss2, counte) = rdd_loss1.treeAggregate((0.0, 0L))( seqOp = (c, v) => { // c: (e, count), v: (m) val e1 = c._1 val e2 = (v :* v).sum val esum = e1 + e2 (esum, c._2 + 1) }, combOp = (c1, c2) => { // c: (e, count) val e1 = c1._1 val e2 = c2._1 val esum = e1 + e2 (esum, c1._2 + c2._2) }) val Loss = (loss2 / counte.toDouble) * 0.5 if (n == 0) { rL(n) = Loss } else { rL(n) = 0.09 * rL(n - 1) + 0.01 * Loss } n = n + 1 } initEndTime = System.currentTimeMillis() // 打印输出结果 printf("epoch: numepochs = %d , Took = %d seconds; batch train mse = %f.\n", i, scala.math.ceil((initEndTime - initStartTime).toDouble / 1000).toLong, rL(n - 1)) } // 计算训练误差及交叉检验误差 // Full-batch train mse var loss_train_e = 0.0 var loss_val_e = 0.0 loss_train_e = CNN.CNNeval(train_t, sc.broadcast(cnn_layers), sc.broadcast(cnn_ffb), sc.broadcast(cnn_ffW)) if (validation > 0) loss_val_e = CNN.CNNeval(train_v, sc.broadcast(cnn_layers), sc.broadcast(cnn_ffb), sc.broadcast(cnn_ffW)) printf("epoch: Full-batch train mse = %f, val mse = %f.\n", loss_train_e, loss_val_e) new CNNModel(cnn_layers, cnn_ffW, cnn_ffb) }}/** * NN(neural network) */object CNN extends Serializable { // Initialization mode names /** * sigm激活函数 * X = 1./(1+exp(-P)); */ def sigm(matrix: BDM[Double]): BDM[Double] = { val s1 = 1.0 / (Bexp(matrix * (-1.0)) + 1.0) s1 } /** * tanh激活函数 * f=1.7159*tanh(2/3.*A); */ def tanh_opt(matrix: BDM[Double]): BDM[Double] = { val s1 = Btanh(matrix * (2.0 / 3.0)) * 1.7159 s1 } /** * 克罗内克积 * */ def expand(a: BDM[Double], s: Array[Int]): BDM[Double] = { // val a = BDM((1.0, 2.0), (3.0, 4.0), (5.0, 6.0)) // val s = Array(3, 2) val sa = Array(a.rows, a.cols) var tt = new Array[Array[Int]](sa.length) for (ii <- sa.length - 1 to 0 by -1) { var h = BDV.zeros[Int](sa(ii) * s(ii)) h(0 to sa(ii) * s(ii) - 1 by s(ii)) := 1 tt(ii) = Accumulate(h).data } var b = BDM.zeros[Double](tt(0).length, tt(1).length) for (j1 <- 0 to b.rows - 1) { for (j2 <- 0 to b.cols - 1) { b(j1, j2) = a(tt(0)(j1) - 1, tt(1)(j2) - 1) } } b } /** * convn卷积计算 */ def convn(m0: BDM[Double], k0: BDM[Double], shape: String): BDM[Double] = { //val m0 = BDM((1.0, 1.0, 1.0, 1.0), (0.0, 0.0, 1.0, 1.0), (0.0, 1.0, 1.0, 0.0), (0.0, 1.0, 1.0, 0.0)) //val k0 = BDM((1.0, 1.0), (0.0, 1.0)) //val m0 = BDM((1.0, 1.0, 1.0), (1.0, 1.0, 1.0), (1.0, 1.0, 1.0)) //val k0 = BDM((1.0, 2.0, 3.0), (4.0, 5.0, 6.0), (7.0, 8.0, 9.0)) val out1 = shape match { case "valid" => val m1 = m0 val k1 = k0.t val row1 = m1.rows - k1.rows + 1 val col1 = m1.cols - k1.cols + 1 var m2 = BDM.zeros[Double](row1, col1) for (i <- 0 to row1 - 1) { for (j <- 0 to col1 - 1) { val r1 = i val r2 = r1 + k1.rows - 1 val c1 = j val c2 = c1 + k1.cols - 1 val mi = m1(r1 to r2, c1 to c2) m2(i, j) = (mi :* k1).sum } } m2 case "full" => var m1 = BDM.zeros[Double](m0.rows + 2 * (k0.rows - 1), m0.cols + 2 * (k0.cols - 1)) for (i <- 0 to m0.rows - 1) { for (j <- 0 to m0.cols - 1) { m1((k0.rows - 1) + i, (k0.cols - 1) + j) = m0(i, j) } } val k1 = Rot90(Rot90(k0)) val row1 = m1.rows - k1.rows + 1 val col1 = m1.cols - k1.cols + 1 var m2 = BDM.zeros[Double](row1, col1) for (i <- 0 to row1 - 1) { for (j <- 0 to col1 - 1) { val r1 = i val r2 = r1 + k1.rows - 1 val c1 = j val c2 = c1 + k1.cols - 1 val mi = m1(r1 to r2, c1 to c2) m2(i, j) = (mi :* k1).sum } } m2 } out1 } /** * cnnff是进行前向传播 * 计算神经网络中的每一个节点的输出值; */ def CNNff( batch_xy1: RDD[(BDM[Double], BDM[Double])], bc_cnn_layers: org.apache.spark.broadcast.Broadcast[Array[CNNLayers]], bc_cnn_ffb: org.apache.spark.broadcast.Broadcast[BDM[Double]], bc_cnn_ffW: org.apache.spark.broadcast.Broadcast[BDM[Double]]): RDD[(BDM[Double], Array[Array[BDM[Double]]], BDM[Double], BDM[Double])] = { // 第1层:a(1)=[x] val train_data1 = batch_xy1.map { f => val lable = f._1 val features = f._2 val nna1 = Array(features) val nna = ArrayBuffer[Array[BDM[Double]]]() nna += nna1 (lable, nna) } // 第2至n-1层计算 val train_data2 = train_data1.map { f => val lable = f._1 val nn_a = f._2 var inputmaps1 = 1.0 val n = bc_cnn_layers.value.length // for each layer for (l <- 1 to n - 1) { val type1 = bc_cnn_layers.value(l).types val outputmap1 = bc_cnn_layers.value(l).outputmaps val kernelsize1 = bc_cnn_layers.value(l).kernelsize val scale1 = bc_cnn_layers.value(l).scale val k1 = bc_cnn_layers.value(l).k val b1 = bc_cnn_layers.value(l).b val nna1 = ArrayBuffer[BDM[Double]]() if (type1 == "c") { for (j <- 0 to outputmap1.toInt - 1) { // output map // create temp output map var z = BDM.zeros[Double](nn_a(l - 1)(0).rows - kernelsize1.toInt + 1, nn_a(l - 1)(0).cols - kernelsize1.toInt + 1) for (i <- 0 to inputmaps1.toInt - 1) { // input map // convolve with corresponding kernel and add to temp output map // z = z + convn(net.layers{l - 1}.a{i}, net.layers{l}.k{i}{j}, 'valid'); z = z + convn(nn_a(l - 1)(i), k1(i)(j), "valid") } // add bias, pass through nonlinearity // net.layers{l}.a{j} = sigm(z + net.layers{l}.b{j}) val nna0 = sigm(z + b1(j)) nna1 += nna0 } nn_a += nna1.toArray inputmaps1 = outputmap1 } else if (type1 == "s") { for (j <- 0 to inputmaps1.toInt - 1) { // z = convn(net.layers{l - 1}.a{j}, ones(net.layers{l}.scale) / (net.layers{l}.scale ^ 2), 'valid'); replace with variable // net.layers{l}.a{j} = z(1 : net.layers{l}.scale : end, 1 : net.layers{l}.scale : end, :); val z = convn(nn_a(l - 1)(j), BDM.ones[Double](scale1.toInt, scale1.toInt) / (scale1 * scale1), "valid") val zs1 = z(::, 0 to -1 by scale1.toInt).t + 0.0 val zs2 = zs1(::, 0 to -1 by scale1.toInt).t + 0.0 val nna0 = zs2 nna1 += nna0 } nn_a += nna1.toArray } } // concatenate all end layer feature maps into vector val nn_fv1 = ArrayBuffer[Double]() for (j <- 0 to nn_a(n - 1).length - 1) { nn_fv1 ++= nn_a(n - 1)(j).data } val nn_fv = new BDM[Double](nn_fv1.length, 1, nn_fv1.toArray) // feedforward into output perceptrons // net.o = sigm(net.ffW * net.fv + repmat(net.ffb, 1, size(net.fv, 2))); val nn_o = sigm(bc_cnn_ffW.value * nn_fv + bc_cnn_ffb.value) (lable, nn_a.toArray, nn_fv, nn_o) } train_data2 } /** * CNNbp是后向传播 * 计算权重的平均偏导数 */ def CNNbp( train_cnnff: RDD[(BDM[Double], Array[Array[BDM[Double]]], BDM[Double], BDM[Double])], bc_cnn_layers: org.apache.spark.broadcast.Broadcast[Array[CNNLayers]], bc_cnn_ffb: org.apache.spark.broadcast.Broadcast[BDM[Double]], bc_cnn_ffW: org.apache.spark.broadcast.Broadcast[BDM[Double]]): (RDD[(BDM[Double], Array[Array[BDM[Double]]], BDM[Double], BDM[Double], BDM[Double], BDM[Double], BDM[Double], Array[Array[BDM[Double]]])], BDM[Double], BDM[Double], Array[CNNLayers]) = { // error : net.e = net.o - y val n = bc_cnn_layers.value.length val train_data3 = train_cnnff.map { f => val nn_e = f._4 - f._1 (f._1, f._2, f._3, f._4, nn_e) } // backprop deltas // 输出层的 灵敏度 或者 残差 // net.od = net.e .* (net.o .* (1 - net.o)) // net.fvd = (net.ffW' * net.od) val train_data4 = train_data3.map { f => val nn_e = f._5 val nn_o = f._4 val nn_fv = f._3 val nn_od = nn_e :* (nn_o :* (1.0 - nn_o)) val nn_fvd = if (bc_cnn_layers.value(n - 1).types == "c") { // net.fvd = net.fvd .* (net.fv .* (1 - net.fv)); val nn_fvd1 = bc_cnn_ffW.value.t * nn_od val nn_fvd2 = nn_fvd1 :* (nn_fv :* (1.0 - nn_fv)) nn_fvd2 } else { val nn_fvd1 = bc_cnn_ffW.value.t * nn_od nn_fvd1 } (f._1, f._2, f._3, f._4, f._5, nn_od, nn_fvd) } // reshape feature vector deltas into output map style val sa1 = train_data4.map(f => f._2(n - 1)(1)).take(1)(0).rows val sa2 = train_data4.map(f => f._2(n - 1)(1)).take(1)(0).cols val sa3 = 1 val fvnum = sa1 * sa2 val train_data5 = train_data4.map { f => val nn_a = f._2 val nn_fvd = f._7 val nn_od = f._6 val nn_fv = f._3 var nnd = new Array[Array[BDM[Double]]](n) val nnd1 = ArrayBuffer[BDM[Double]]() for (j <- 0 to nn_a(n - 1).length - 1) { val tmp1 = nn_fvd((j * fvnum) to ((j + 1) * fvnum - 1), 0) val tmp2 = new BDM(sa1, sa2, tmp1.data) nnd1 += tmp2 } nnd(n - 1) = nnd1.toArray for (l <- (n - 2) to 0 by -1) { val type1 = bc_cnn_layers.value(l).types var nnd2 = ArrayBuffer[BDM[Double]]() if (type1 == "c") { for (j <- 0 to nn_a(l).length - 1) { val tmp_a = nn_a(l)(j) val tmp_d = nnd(l + 1)(j) val tmp_scale = bc_cnn_layers.value(l + 1).scale.toInt val tmp1 = tmp_a :* (1.0 - tmp_a) val tmp2 = expand(tmp_d, Array(tmp_scale, tmp_scale)) / (tmp_scale.toDouble * tmp_scale) nnd2 += (tmp1 :* tmp2) } } else if (type1 == "s") { for (i <- 0 to nn_a(l).length - 1) { var z = BDM.zeros[Double](nn_a(l)(0).rows, nn_a(l)(0).cols) for (j <- 0 to nn_a(l + 1).length - 1) { // z = z + convn(net.layers{l + 1}.d{j}, rot180(net.layers{l + 1}.k{i}{j}), 'full'); z = z + convn(nnd(l + 1)(j), Rot90(Rot90(bc_cnn_layers.value(l + 1).k(i)(j))), "full") } nnd2 += z } } nnd(l) = nnd2.toArray } (f._1, f._2, f._3, f._4, f._5, f._6, f._7, nnd) } // dk db calc gradients var cnn_layers = bc_cnn_layers.value for (l <- 1 to n - 1) { val type1 = bc_cnn_layers.value(l).types val lena1 = train_data5.map(f => f._2(l).length).take(1)(0) val lena2 = train_data5.map(f => f._2(l - 1).length).take(1)(0) if (type1 == "c") { for (j <- 0 to lena1 - 1) { for (i <- 0 to lena2 - 1) { val rdd_dk_ij = train_data5.map { f => val nn_a = f._2 val nn_d = f._8 val tmp_d = nn_d(l)(j) val tmp_a = nn_a(l - 1)(i) convn(Rot90(Rot90(tmp_a)), tmp_d, "valid") } val initdk = BDM.zeros[Double](rdd_dk_ij.take(1)(0).rows, rdd_dk_ij.take(1)(0).cols) val (dk_ij, count_dk) = rdd_dk_ij.treeAggregate((initdk, 0L))( seqOp = (c, v) => { // c: (m, count), v: (m) val m1 = c._1 val m2 = m1 + v (m2, c._2 + 1) }, combOp = (c1, c2) => { // c: (m, count) val m1 = c1._1 val m2 = c2._1 val m3 = m1 + m2 (m3, c1._2 + c2._2) }) val dk = dk_ij / count_dk.toDouble cnn_layers(l).dk(i)(j) = dk } val rdd_db_j = train_data5.map { f => val nn_d = f._8 val tmp_d = nn_d(l)(j) Bsum(tmp_d) } val db_j = rdd_db_j.reduce(_ + _) val count_db = rdd_db_j.count val db = db_j / count_db.toDouble cnn_layers(l).db(j) = db } } } // net.dffW = net.od * (net.fv)' / size(net.od, 2); // net.dffb = mean(net.od, 2); val train_data6 = train_data5.map { f => val nn_od = f._6 val nn_fv = f._3 nn_od * nn_fv.t } val train_data7 = train_data5.map { f => val nn_od = f._6 nn_od } val initffW = BDM.zeros[Double](bc_cnn_ffW.value.rows, bc_cnn_ffW.value.cols) val (ffw2, countfffw2) = train_data6.treeAggregate((initffW, 0L))( seqOp = (c, v) => { // c: (m, count), v: (m) val m1 = c._1 val m2 = m1 + v (m2, c._2 + 1) }, combOp = (c1, c2) => { // c: (m, count) val m1 = c1._1 val m2 = c2._1 val m3 = m1 + m2 (m3, c1._2 + c2._2) }) val cnn_dffw = ffw2 / countfffw2.toDouble val initffb = BDM.zeros[Double](bc_cnn_ffb.value.rows, bc_cnn_ffb.value.cols) val (ffb2, countfffb2) = train_data7.treeAggregate((initffb, 0L))( seqOp = (c, v) => { // c: (m, count), v: (m) val m1 = c._1 val m2 = m1 + v (m2, c._2 + 1) }, combOp = (c1, c2) => { // c: (m, count) val m1 = c1._1 val m2 = c2._1 val m3 = m1 + m2 (m3, c1._2 + c2._2) }) val cnn_dffb = ffb2 / countfffb2.toDouble (train_data5, cnn_dffw, cnn_dffb, cnn_layers) } /** * NNapplygrads是权重更新 * 权重更新 */ def CNNapplygrads( train_cnnbp: (RDD[(BDM[Double], Array[Array[BDM[Double]]], BDM[Double], BDM[Double], BDM[Double], BDM[Double], BDM[Double], Array[Array[BDM[Double]]])], BDM[Double], BDM[Double], Array[CNNLayers]), bc_cnn_ffb: org.apache.spark.broadcast.Broadcast[BDM[Double]], bc_cnn_ffW: org.apache.spark.broadcast.Broadcast[BDM[Double]], alpha: Double): (BDM[Double], BDM[Double], Array[CNNLayers]) = { val train_data5 = train_cnnbp._1 val cnn_dffw = train_cnnbp._2 val cnn_dffb = train_cnnbp._3 var cnn_layers = train_cnnbp._4 var cnn_ffb = bc_cnn_ffb.value var cnn_ffW = bc_cnn_ffW.value val n = cnn_layers.length for (l <- 1 to n - 1) { val type1 = cnn_layers(l).types val lena1 = train_data5.map(f => f._2(l).length).take(1)(0) val lena2 = train_data5.map(f => f._2(l - 1).length).take(1)(0) if (type1 == "c") { for (j <- 0 to lena1 - 1) { for (ii <- 0 to lena2 - 1) { cnn_layers(l).k(ii)(j) = cnn_layers(l).k(ii)(j) - cnn_layers(l).dk(ii)(j) } cnn_layers(l).b(j) = cnn_layers(l).b(j) - cnn_layers(l).db(j) } } } cnn_ffW = cnn_ffW + cnn_dffw cnn_ffb = cnn_ffb + cnn_dffb (cnn_ffW, cnn_ffb, cnn_layers) } /** * nneval是进行前向传播并计算输出误差 * 计算神经网络中的每一个节点的输出值,并计算平均误差; */ def CNNeval( batch_xy1: RDD[(BDM[Double], BDM[Double])], bc_cnn_layers: org.apache.spark.broadcast.Broadcast[Array[CNNLayers]], bc_cnn_ffb: org.apache.spark.broadcast.Broadcast[BDM[Double]], bc_cnn_ffW: org.apache.spark.broadcast.Broadcast[BDM[Double]]): Double = { // CNNff是进行前向传播 val train_cnnff = CNN.CNNff(batch_xy1, bc_cnn_layers, bc_cnn_ffb, bc_cnn_ffW) // error and loss // 输出误差计算 val rdd_loss1 = train_cnnff.map { f => val nn_e = f._4 - f._1 nn_e } val (loss2, counte) = rdd_loss1.treeAggregate((0.0, 0L))( seqOp = (c, v) => { // c: (e, count), v: (m) val e1 = c._1 val e2 = (v :* v).sum val esum = e1 + e2 (esum, c._2 + 1) }, combOp = (c1, c2) => { // c: (e, count) val e1 = c1._1 val e2 = c2._1 val esum = e1 + e2 (esum, c1._2 + c2._2) }) val Loss = (loss2 / counte.toDouble) * 0.5 Loss }}
1.2 CNNModel代码
package CNNimport breeze.linalg.{ Matrix => BM, CSCMatrix => BSM, DenseMatrix => BDM, Vector => BV, DenseVector => BDV, SparseVector => BSV}import org.apache.spark.rdd.RDD/** * label:目标矩阵 * features:特征矩阵 * predict_label:预測矩阵 * error:误差 */case class PredictCNNLabel(label: BDM[Double], features: BDM[Double], predict_label: BDM[Double], error: BDM[Double]) extends Serializableclass CNNModel( val cnn_layers: Array[CNNLayers], val cnn_ffW: BDM[Double], val cnn_ffb: BDM[Double]) extends Serializable { /** * 返回预測结果 * 返回格式:(label, feature, predict_label, error) */ def predict(dataMatrix: RDD[(BDM[Double], BDM[Double])]): RDD[PredictCNNLabel] = { val sc = dataMatrix.sparkContext val bc_cnn_layers = sc.broadcast(cnn_layers) val bc_cnn_ffW = sc.broadcast(cnn_ffW) val bc_cnn_ffb = sc.broadcast(cnn_ffb) // CNNff是进行前向传播 val train_cnnff = CNN.CNNff(dataMatrix, bc_cnn_layers, bc_cnn_ffb, bc_cnn_ffW) val rdd_predict = train_cnnff.map { f => val label = f._1 val nna1 = f._2(0)(0) val nnan = f._4 val error = f._4 - f._1 PredictCNNLabel(label, nna1, nnan, error) } rdd_predict } /** * 计算输出误差 * 平均误差; */ def Loss(predict: RDD[PredictCNNLabel]): Double = { val predict1 = predict.map(f => f.error) // error and loss // 输出误差计算 val loss1 = predict1 val (loss2, counte) = loss1.treeAggregate((0.0, 0L))( seqOp = (c, v) => { // c: (e, count), v: (m) val e1 = c._1 val e2 = (v :* v).sum val esum = e1 + e2 (esum, c._2 + 1) }, combOp = (c1, c2) => { // c: (e, count) val e1 = c1._1 val e2 = c2._1 val esum = e1 + e2 (esum, c1._2 + c2._2) }) val Loss = (loss2 / counte.toDouble) * 0.5 Loss }}
转载请注明出处: