Spark MLlib Deep Learning Convolution Neural Network (Deep Learning - Convolutional Neural Network) 3.1
Published: 2019-06-11


3. Spark MLlib Deep Learning Convolution Neural Network (Deep Learning - Convolutional Neural Network) 3.1

The Spark MLlib Deep Learning toolbox implements, on top of Spark MLlib, the algorithms described in the existing deep-learning tutorial 《UFLDL教程》 (the UFLDL tutorial). The directory structure of Spark MLlib Deep Learning is as follows:

Chapter 1  Neural Net (NN)

1. Source code

2. Source code walkthrough

3. Example

Chapter 2  Deep Belief Nets (DBNs)

1. Source code

2. Source code walkthrough

3. Example

Chapter 3  Convolution Neural Network (CNN)

1. Source code

2. Source code walkthrough

3. Example

Chapter 4  Stacked Auto-Encoders (SAE)

Chapter 5  CAE

Chapter 3  Convolution Neural Network (CNN)

1 Source Code

The GitHub address of the Spark MLlib Deep Learning toolbox source code is currently:

1.1 CNN Code

package CNN

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.Logging
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import breeze.linalg.{
  Matrix => BM,
  CSCMatrix => BSM,
  DenseMatrix => BDM,
  Vector => BV,
  DenseVector => BDV,
  SparseVector => BSV,
  axpy => brzAxpy,
  svd => brzSvd,
  accumulate => Accumulate,
  rot90 => Rot90,
  sum => Bsum
}
import breeze.numerics.{
  exp => Bexp,
  tanh => Btanh
}
import scala.collection.mutable.ArrayBuffer
import java.util.Random
import scala.math._

/**
 * types: layer type
 * outputmaps: number of feature maps
 * kernelsize: convolution kernel size
 * k: convolution kernels
 * b: biases
 * dk: gradients of the convolution kernels
 * db: gradients of the biases
 * scale: pooling size
 */
case class CNNLayers(
  types: String,
  outputmaps: Double,
  kernelsize: Double,
  scale: Double,
  k: Array[Array[BDM[Double]]],
  b: Array[Double],
  dk: Array[Array[BDM[Double]]],
  db: Array[Double]) extends Serializable

/**
 * CNN (convolution neural network)
 */
class CNN(
  private var mapsize: BDM[Double],
  private var types: Array[String],
  private var layer: Int,
  private var onum: Int,
  private var outputmaps: Array[Double],
  private var kernelsize: Array[Double],
  private var scale: Array[Double],
  private var alpha: Double,
  private var batchsize: Double,
  private var numepochs: Double) extends Serializable with Logging {
  //        var mapsize = new BDM(1, 2, Array(28.0, 28.0))
  //        var types = Array("i", "c", "s", "c", "s")
  //        var layer = 5
  //        var onum = 10
  //        var outputmaps = Array(0.0, 6.0, 0.0, 12.0, 0.0)
  //        var kernelsize = Array(0.0, 5.0, 0.0, 5.0, 0.0)
  //        var scale = Array(0.0, 0.0, 2.0, 0.0, 2.0)
  //        var alpha = 1.0
  //        var batchsize = 50.0
  //        var numepochs = 1.0
  def this() = this(new BDM(1, 2, Array(28.0, 28.0)),
    Array("i", "c", "s", "c", "s"), 5, 10,
    Array(0.0, 6.0, 0.0, 12.0, 0.0),
    Array(0.0, 5.0, 0.0, 5.0, 0.0),
    Array(0.0, 0.0, 2.0, 0.0, 2.0),
    1.0, 50.0, 1.0)

  /** Set the input map size. Default: [28, 28]. */
  def setMapsize(mapsize: BDM[Double]): this.type = {
    this.mapsize = mapsize
    this
  }

  /** Set the layer types. Default: ["i", "c", "s", "c", "s"]. */
  def setTypes(types: Array[String]): this.type = {
    this.types = types
    this
  }

  /** Set the number of layers. Default: 5. */
  def setLayer(layer: Int): this.type = {
    this.layer = layer
    this
  }

  /** Set the output dimension. Default: 10. */
  def setOnum(onum: Int): this.type = {
    this.onum = onum
    this
  }

  /** Set the number of feature maps per layer. Default: [0.0, 6.0, 0.0, 12.0, 0.0]. */
  def setOutputmaps(outputmaps: Array[Double]): this.type = {
    this.outputmaps = outputmaps
    this
  }

  /** Set the convolution kernel size per layer. Default: [0.0, 5.0, 0.0, 5.0, 0.0]. */
  def setKernelsize(kernelsize: Array[Double]): this.type = {
    this.kernelsize = kernelsize
    this
  }

  /** Set the pooling scale per layer. Default: [0.0, 0.0, 2.0, 0.0, 2.0]. */
  def setScale(scale: Array[Double]): this.type = {
    this.scale = scale
    this
  }

  /** Set the learning rate. Default: 1. */
  def setAlpha(alpha: Double): this.type = {
    this.alpha = alpha
    this
  }

  /** Set the batch size. Default: 50. */
  def setBatchsize(batchsize: Double): this.type = {
    this.batchsize = batchsize
    this
  }

  /** Set the number of epochs. Default: 1. */
  def setNumepochs(numepochs: Double): this.type = {
    this.numepochs = numepochs
    this
  }

  /** Initialize the parameters of each CNN layer. */
  def CnnSetup: (Array[CNNLayers], BDM[Double], BDM[Double], Double) = {
    var inputmaps1 = 1.0
    var mapsize1 = mapsize
    var confinit = ArrayBuffer[CNNLayers]()
    for (l <- 0 to layer - 1) { // layer
      val type1 = types(l)
      val outputmap1 = outputmaps(l)
      val kernelsize1 = kernelsize(l)
      val scale1 = scale(l)
      val layersconf = if (type1 == "s") { // initialize the parameters of each layer
        mapsize1 = mapsize1 / scale1
        val b1 = Array.fill(inputmaps1.toInt)(0.0)
        val ki = Array(Array(BDM.zeros[Double](1, 1)))
        new CNNLayers(type1, outputmap1, kernelsize1, scale1, ki, b1, ki, b1)
      } else if (type1 == "c") {
        mapsize1 = mapsize1 - kernelsize1 + 1.0
        val fan_out = outputmap1 * math.pow(kernelsize1, 2)
        val fan_in = inputmaps1 * math.pow(kernelsize1, 2)
        val ki = ArrayBuffer[Array[BDM[Double]]]()
        for (i <- 0 to inputmaps1.toInt - 1) { // input map
          val kj = ArrayBuffer[BDM[Double]]()
          for (j <- 0 to outputmap1.toInt - 1) { // output map
            val kk = (BDM.rand[Double](kernelsize1.toInt, kernelsize1.toInt) - 0.5) * 2.0 * sqrt(6.0 / (fan_in + fan_out))
            kj += kk
          }
          ki += kj.toArray
        }
        val b1 = Array.fill(outputmap1.toInt)(0.0)
        inputmaps1 = outputmap1
        new CNNLayers(type1, outputmap1, kernelsize1, scale1, ki.toArray, b1, ki.toArray, b1)
      } else {
        val ki = Array(Array(BDM.zeros[Double](1, 1)))
        val b1 = Array(0.0)
        new CNNLayers(type1, outputmap1, kernelsize1, scale1, ki, b1, ki, b1)
      }
      confinit += layersconf
    }
    val fvnum = mapsize1(0, 0) * mapsize1(0, 1) * inputmaps1
    val ffb = BDM.zeros[Double](onum, 1)
    val ffW = (BDM.rand[Double](onum, fvnum.toInt) - 0.5) * 2.0 * sqrt(6.0 / (onum + fvnum))
    (confinit.toArray, ffb, ffW, alpha)
  }

  /**
   * Run the CNN training algorithm.
   */
  def CNNtrain(train_d: RDD[(BDM[Double], BDM[Double])], opts: Array[Double]): CNNModel = {
    val sc = train_d.sparkContext
    var initStartTime = System.currentTimeMillis()
    var initEndTime = System.currentTimeMillis()
    // parameter initialization
    var (cnn_layers, cnn_ffb, cnn_ffW, cnn_alpha) = CnnSetup
    // split the samples into training data and validation data
    val validation = opts(2)
    val splitW1 = Array(1.0 - validation, validation)
    val train_split1 = train_d.randomSplit(splitW1, System.nanoTime())
    val train_t = train_split1(0)
    val train_v = train_split1(1)
    // m: number of training samples
    val m = train_t.count
    // compute the number of batches
    val batchsize = opts(0).toInt
    val numepochs = opts(1).toInt
    val numbatches = (m / batchsize).toInt
    var rL = Array.fill(numepochs * numbatches.toInt)(0.0)
    var n = 0
    // numepochs is the number of passes over the data
    for (i <- 1 to numepochs) {
      initStartTime = System.currentTimeMillis()
      val splitW2 = Array.fill(numbatches)(1.0 / numbatches)
      // randomly split the samples into batches according to the split weights
      for (l <- 1 to numbatches) {
        // broadcast the current weights
        val bc_cnn_layers = sc.broadcast(cnn_layers)
        val bc_cnn_ffb = sc.broadcast(cnn_ffb)
        val bc_cnn_ffW = sc.broadcast(cnn_ffW)
        // sample split
        val train_split2 = train_t.randomSplit(splitW2, System.nanoTime())
        val batch_xy1 = train_split2(l - 1)
        // CNNff: feed-forward pass
        // net = cnnff(net, batch_x);
        val train_cnnff = CNN.CNNff(batch_xy1, bc_cnn_layers, bc_cnn_ffb, bc_cnn_ffW)
        // CNNbp: back propagation
        // net = cnnbp(net, batch_y);
        val train_cnnbp = CNN.CNNbp(train_cnnff, bc_cnn_layers, bc_cnn_ffb, bc_cnn_ffW)
        // weight update
        // net = cnnapplygrads(net, opts);
        val train_nnapplygrads = CNN.CNNapplygrads(train_cnnbp, bc_cnn_ffb, bc_cnn_ffW, cnn_alpha)
        cnn_ffW = train_nnapplygrads._1
        cnn_ffb = train_nnapplygrads._2
        cnn_layers = train_nnapplygrads._3
        // error and loss
        // compute the output error
        // net.L = 1/2 * sum(net.e(:) .^ 2) / size(net.e, 2);
        val rdd_loss1 = train_cnnbp._1.map(f => f._5)
        val (loss2, counte) = rdd_loss1.treeAggregate((0.0, 0L))(
          seqOp = (c, v) => {
            // c: (e, count), v: (m)
            val e1 = c._1
            val e2 = (v :* v).sum
            val esum = e1 + e2
            (esum, c._2 + 1)
          },
          combOp = (c1, c2) => {
            // c: (e, count)
            val e1 = c1._1
            val e2 = c2._1
            val esum = e1 + e2
            (esum, c1._2 + c2._2)
          })
        val Loss = (loss2 / counte.toDouble) * 0.5
        if (n == 0) {
          rL(n) = Loss
        } else {
          rL(n) = 0.09 * rL(n - 1) + 0.01 * Loss
        }
        n = n + 1
      }
      initEndTime = System.currentTimeMillis()
      // print progress
      printf("epoch: numepochs = %d , Took = %d seconds; batch train mse = %f.\n", i, scala.math.ceil((initEndTime - initStartTime).toDouble / 1000).toLong, rL(n - 1))
    }
    // compute the training error and the validation error
    // Full-batch train mse
    var loss_train_e = 0.0
    var loss_val_e = 0.0
    loss_train_e = CNN.CNNeval(train_t, sc.broadcast(cnn_layers), sc.broadcast(cnn_ffb), sc.broadcast(cnn_ffW))
    if (validation > 0) loss_val_e = CNN.CNNeval(train_v, sc.broadcast(cnn_layers), sc.broadcast(cnn_ffb), sc.broadcast(cnn_ffW))
    printf("epoch: Full-batch train mse = %f, val mse = %f.\n", loss_train_e, loss_val_e)
    new CNNModel(cnn_layers, cnn_ffW, cnn_ffb)
  }
}

/**
 * CNN (convolution neural network) companion object
 */
object CNN extends Serializable {
  // Initialization mode names

  /**
   * sigm activation function
   * X = 1./(1+exp(-P));
   */
  def sigm(matrix: BDM[Double]): BDM[Double] = {
    val s1 = 1.0 / (Bexp(matrix * (-1.0)) + 1.0)
    s1
  }

  /**
   * tanh activation function
   * f = 1.7159 * tanh(2/3 .* A);
   */
  def tanh_opt(matrix: BDM[Double]): BDM[Double] = {
    val s1 = Btanh(matrix * (2.0 / 3.0)) * 1.7159
    s1
  }

  /**
   * Kronecker-style expansion: replicate each element of a into an s(0) x s(1) block.
   */
  def expand(a: BDM[Double], s: Array[Int]): BDM[Double] = {
    // val a = BDM((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
    // val s = Array(3, 2)
    val sa = Array(a.rows, a.cols)
    var tt = new Array[Array[Int]](sa.length)
    for (ii <- sa.length - 1 to 0 by -1) {
      var h = BDV.zeros[Int](sa(ii) * s(ii))
      h(0 to sa(ii) * s(ii) - 1 by s(ii)) := 1
      tt(ii) = Accumulate(h).data
    }
    var b = BDM.zeros[Double](tt(0).length, tt(1).length)
    for (j1 <- 0 to b.rows - 1) {
      for (j2 <- 0 to b.cols - 1) {
        b(j1, j2) = a(tt(0)(j1) - 1, tt(1)(j2) - 1)
      }
    }
    b
  }

  /**
   * convn: 2-D convolution ("valid" or "full")
   */
  def convn(m0: BDM[Double], k0: BDM[Double], shape: String): BDM[Double] = {
    // val m0 = BDM((1.0, 1.0, 1.0, 1.0), (0.0, 0.0, 1.0, 1.0), (0.0, 1.0, 1.0, 0.0), (0.0, 1.0, 1.0, 0.0))
    // val k0 = BDM((1.0, 1.0), (0.0, 1.0))
    // val m0 = BDM((1.0, 1.0, 1.0), (1.0, 1.0, 1.0), (1.0, 1.0, 1.0))
    // val k0 = BDM((1.0, 2.0, 3.0), (4.0, 5.0, 6.0), (7.0, 8.0, 9.0))
    val out1 = shape match {
      case "valid" =>
        val m1 = m0
        val k1 = k0.t
        val row1 = m1.rows - k1.rows + 1
        val col1 = m1.cols - k1.cols + 1
        var m2 = BDM.zeros[Double](row1, col1)
        for (i <- 0 to row1 - 1) {
          for (j <- 0 to col1 - 1) {
            val r1 = i
            val r2 = r1 + k1.rows - 1
            val c1 = j
            val c2 = c1 + k1.cols - 1
            val mi = m1(r1 to r2, c1 to c2)
            m2(i, j) = (mi :* k1).sum
          }
        }
        m2
      case "full" =>
        var m1 = BDM.zeros[Double](m0.rows + 2 * (k0.rows - 1), m0.cols + 2 * (k0.cols - 1))
        for (i <- 0 to m0.rows - 1) {
          for (j <- 0 to m0.cols - 1) {
            m1((k0.rows - 1) + i, (k0.cols - 1) + j) = m0(i, j)
          }
        }
        val k1 = Rot90(Rot90(k0))
        val row1 = m1.rows - k1.rows + 1
        val col1 = m1.cols - k1.cols + 1
        var m2 = BDM.zeros[Double](row1, col1)
        for (i <- 0 to row1 - 1) {
          for (j <- 0 to col1 - 1) {
            val r1 = i
            val r2 = r1 + k1.rows - 1
            val c1 = j
            val c2 = c1 + k1.cols - 1
            val mi = m1(r1 to r2, c1 to c2)
            m2(i, j) = (mi :* k1).sum
          }
        }
        m2
    }
    out1
  }

  /**
   * CNNff: feed-forward pass
   * computes the output of every node in the network
   */
  def CNNff(
    batch_xy1: RDD[(BDM[Double], BDM[Double])],
    bc_cnn_layers: org.apache.spark.broadcast.Broadcast[Array[CNNLayers]],
    bc_cnn_ffb: org.apache.spark.broadcast.Broadcast[BDM[Double]],
    bc_cnn_ffW: org.apache.spark.broadcast.Broadcast[BDM[Double]]): RDD[(BDM[Double], Array[Array[BDM[Double]]], BDM[Double], BDM[Double])] = {
    // layer 1: a(1) = [x]
    val train_data1 = batch_xy1.map { f =>
      val lable = f._1
      val features = f._2
      val nna1 = Array(features)
      val nna = ArrayBuffer[Array[BDM[Double]]]()
      nna += nna1
      (lable, nna)
    }
    // layers 2 to n-1
    val train_data2 = train_data1.map { f =>
      val lable = f._1
      val nn_a = f._2
      var inputmaps1 = 1.0
      val n = bc_cnn_layers.value.length
      // for each layer
      for (l <- 1 to n - 1) {
        val type1 = bc_cnn_layers.value(l).types
        val outputmap1 = bc_cnn_layers.value(l).outputmaps
        val kernelsize1 = bc_cnn_layers.value(l).kernelsize
        val scale1 = bc_cnn_layers.value(l).scale
        val k1 = bc_cnn_layers.value(l).k
        val b1 = bc_cnn_layers.value(l).b
        val nna1 = ArrayBuffer[BDM[Double]]()
        if (type1 == "c") {
          for (j <- 0 to outputmap1.toInt - 1) { // output map
            // create temp output map
            var z = BDM.zeros[Double](nn_a(l - 1)(0).rows - kernelsize1.toInt + 1, nn_a(l - 1)(0).cols - kernelsize1.toInt + 1)
            for (i <- 0 to inputmaps1.toInt - 1) { // input map
              // convolve with corresponding kernel and add to temp output map
              // z = z + convn(net.layers{l - 1}.a{i}, net.layers{l}.k{i}{j}, 'valid');
              z = z + convn(nn_a(l - 1)(i), k1(i)(j), "valid")
            }
            // add bias, pass through nonlinearity
            // net.layers{l}.a{j} = sigm(z + net.layers{l}.b{j})
            val nna0 = sigm(z + b1(j))
            nna1 += nna0
          }
          nn_a += nna1.toArray
          inputmaps1 = outputmap1
        } else if (type1 == "s") {
          for (j <- 0 to inputmaps1.toInt - 1) {
            // z = convn(net.layers{l - 1}.a{j}, ones(net.layers{l}.scale) / (net.layers{l}.scale ^ 2), 'valid');
            // net.layers{l}.a{j} = z(1 : net.layers{l}.scale : end, 1 : net.layers{l}.scale : end, :);
            val z = convn(nn_a(l - 1)(j), BDM.ones[Double](scale1.toInt, scale1.toInt) / (scale1 * scale1), "valid")
            val zs1 = z(::, 0 to -1 by scale1.toInt).t + 0.0
            val zs2 = zs1(::, 0 to -1 by scale1.toInt).t + 0.0
            val nna0 = zs2
            nna1 += nna0
          }
          nn_a += nna1.toArray
        }
      }
      // concatenate all end layer feature maps into vector
      val nn_fv1 = ArrayBuffer[Double]()
      for (j <- 0 to nn_a(n - 1).length - 1) {
        nn_fv1 ++= nn_a(n - 1)(j).data
      }
      val nn_fv = new BDM[Double](nn_fv1.length, 1, nn_fv1.toArray)
      // feedforward into output perceptrons
      // net.o = sigm(net.ffW * net.fv + repmat(net.ffb, 1, size(net.fv, 2)));
      val nn_o = sigm(bc_cnn_ffW.value * nn_fv + bc_cnn_ffb.value)
      (lable, nn_a.toArray, nn_fv, nn_o)
    }
    train_data2
  }

  /**
   * CNNbp: back propagation
   * computes the average partial derivatives of the weights
   */
  def CNNbp(
    train_cnnff: RDD[(BDM[Double], Array[Array[BDM[Double]]], BDM[Double], BDM[Double])],
    bc_cnn_layers: org.apache.spark.broadcast.Broadcast[Array[CNNLayers]],
    bc_cnn_ffb: org.apache.spark.broadcast.Broadcast[BDM[Double]],
    bc_cnn_ffW: org.apache.spark.broadcast.Broadcast[BDM[Double]]): (RDD[(BDM[Double], Array[Array[BDM[Double]]], BDM[Double], BDM[Double], BDM[Double], BDM[Double], BDM[Double], Array[Array[BDM[Double]]])], BDM[Double], BDM[Double], Array[CNNLayers]) = {
    // error: net.e = net.o - y
    val n = bc_cnn_layers.value.length
    val train_data3 = train_cnnff.map { f =>
      val nn_e = f._4 - f._1
      (f._1, f._2, f._3, f._4, nn_e)
    }
    // backprop deltas
    // sensitivities (residuals) of the output layer
    // net.od = net.e .* (net.o .* (1 - net.o))
    // net.fvd = (net.ffW' * net.od)
    val train_data4 = train_data3.map { f =>
      val nn_e = f._5
      val nn_o = f._4
      val nn_fv = f._3
      val nn_od = nn_e :* (nn_o :* (1.0 - nn_o))
      val nn_fvd = if (bc_cnn_layers.value(n - 1).types == "c") {
        // net.fvd = net.fvd .* (net.fv .* (1 - net.fv));
        val nn_fvd1 = bc_cnn_ffW.value.t * nn_od
        val nn_fvd2 = nn_fvd1 :* (nn_fv :* (1.0 - nn_fv))
        nn_fvd2
      } else {
        val nn_fvd1 = bc_cnn_ffW.value.t * nn_od
        nn_fvd1
      }
      (f._1, f._2, f._3, f._4, f._5, nn_od, nn_fvd)
    }
    // reshape feature vector deltas into output map style
    val sa1 = train_data4.map(f => f._2(n - 1)(1)).take(1)(0).rows
    val sa2 = train_data4.map(f => f._2(n - 1)(1)).take(1)(0).cols
    val sa3 = 1
    val fvnum = sa1 * sa2
    val train_data5 = train_data4.map { f =>
      val nn_a = f._2
      val nn_fvd = f._7
      val nn_od = f._6
      val nn_fv = f._3
      var nnd = new Array[Array[BDM[Double]]](n)
      val nnd1 = ArrayBuffer[BDM[Double]]()
      for (j <- 0 to nn_a(n - 1).length - 1) {
        val tmp1 = nn_fvd((j * fvnum) to ((j + 1) * fvnum - 1), 0)
        val tmp2 = new BDM(sa1, sa2, tmp1.data)
        nnd1 += tmp2
      }
      nnd(n - 1) = nnd1.toArray
      for (l <- (n - 2) to 0 by -1) {
        val type1 = bc_cnn_layers.value(l).types
        var nnd2 = ArrayBuffer[BDM[Double]]()
        if (type1 == "c") {
          for (j <- 0 to nn_a(l).length - 1) {
            val tmp_a = nn_a(l)(j)
            val tmp_d = nnd(l + 1)(j)
            val tmp_scale = bc_cnn_layers.value(l + 1).scale.toInt
            val tmp1 = tmp_a :* (1.0 - tmp_a)
            val tmp2 = expand(tmp_d, Array(tmp_scale, tmp_scale)) / (tmp_scale.toDouble * tmp_scale)
            nnd2 += (tmp1 :* tmp2)
          }
        } else if (type1 == "s") {
          for (i <- 0 to nn_a(l).length - 1) {
            var z = BDM.zeros[Double](nn_a(l)(0).rows, nn_a(l)(0).cols)
            for (j <- 0 to nn_a(l + 1).length - 1) {
              // z = z + convn(net.layers{l + 1}.d{j}, rot180(net.layers{l + 1}.k{i}{j}), 'full');
              z = z + convn(nnd(l + 1)(j), Rot90(Rot90(bc_cnn_layers.value(l + 1).k(i)(j))), "full")
            }
            nnd2 += z
          }
        }
        nnd(l) = nnd2.toArray
      }
      (f._1, f._2, f._3, f._4, f._5, f._6, f._7, nnd)
    }
    // dk, db: calc gradients
    var cnn_layers = bc_cnn_layers.value
    for (l <- 1 to n - 1) {
      val type1 = bc_cnn_layers.value(l).types
      val lena1 = train_data5.map(f => f._2(l).length).take(1)(0)
      val lena2 = train_data5.map(f => f._2(l - 1).length).take(1)(0)
      if (type1 == "c") {
        for (j <- 0 to lena1 - 1) {
          for (i <- 0 to lena2 - 1) {
            val rdd_dk_ij = train_data5.map { f =>
              val nn_a = f._2
              val nn_d = f._8
              val tmp_d = nn_d(l)(j)
              val tmp_a = nn_a(l - 1)(i)
              convn(Rot90(Rot90(tmp_a)), tmp_d, "valid")
            }
            val initdk = BDM.zeros[Double](rdd_dk_ij.take(1)(0).rows, rdd_dk_ij.take(1)(0).cols)
            val (dk_ij, count_dk) = rdd_dk_ij.treeAggregate((initdk, 0L))(
              seqOp = (c, v) => {
                // c: (m, count), v: (m)
                val m1 = c._1
                val m2 = m1 + v
                (m2, c._2 + 1)
              },
              combOp = (c1, c2) => {
                // c: (m, count)
                val m1 = c1._1
                val m2 = c2._1
                val m3 = m1 + m2
                (m3, c1._2 + c2._2)
              })
            val dk = dk_ij / count_dk.toDouble
            cnn_layers(l).dk(i)(j) = dk
          }
          val rdd_db_j = train_data5.map { f =>
            val nn_d = f._8
            val tmp_d = nn_d(l)(j)
            Bsum(tmp_d)
          }
          val db_j = rdd_db_j.reduce(_ + _)
          val count_db = rdd_db_j.count
          val db = db_j / count_db.toDouble
          cnn_layers(l).db(j) = db
        }
      }
    }
    // net.dffW = net.od * (net.fv)' / size(net.od, 2);
    // net.dffb = mean(net.od, 2);
    val train_data6 = train_data5.map { f =>
      val nn_od = f._6
      val nn_fv = f._3
      nn_od * nn_fv.t
    }
    val train_data7 = train_data5.map { f =>
      val nn_od = f._6
      nn_od
    }
    val initffW = BDM.zeros[Double](bc_cnn_ffW.value.rows, bc_cnn_ffW.value.cols)
    val (ffw2, countfffw2) = train_data6.treeAggregate((initffW, 0L))(
      seqOp = (c, v) => {
        // c: (m, count), v: (m)
        val m1 = c._1
        val m2 = m1 + v
        (m2, c._2 + 1)
      },
      combOp = (c1, c2) => {
        // c: (m, count)
        val m1 = c1._1
        val m2 = c2._1
        val m3 = m1 + m2
        (m3, c1._2 + c2._2)
      })
    val cnn_dffw = ffw2 / countfffw2.toDouble
    val initffb = BDM.zeros[Double](bc_cnn_ffb.value.rows, bc_cnn_ffb.value.cols)
    val (ffb2, countfffb2) = train_data7.treeAggregate((initffb, 0L))(
      seqOp = (c, v) => {
        // c: (m, count), v: (m)
        val m1 = c._1
        val m2 = m1 + v
        (m2, c._2 + 1)
      },
      combOp = (c1, c2) => {
        // c: (m, count)
        val m1 = c1._1
        val m2 = c2._1
        val m3 = m1 + m2
        (m3, c1._2 + c2._2)
      })
    val cnn_dffb = ffb2 / countfffb2.toDouble
    (train_data5, cnn_dffw, cnn_dffb, cnn_layers)
  }

  /**
   * CNNapplygrads: weight update
   */
  def CNNapplygrads(
    train_cnnbp: (RDD[(BDM[Double], Array[Array[BDM[Double]]], BDM[Double], BDM[Double], BDM[Double], BDM[Double], BDM[Double], Array[Array[BDM[Double]]])], BDM[Double], BDM[Double], Array[CNNLayers]),
    bc_cnn_ffb: org.apache.spark.broadcast.Broadcast[BDM[Double]],
    bc_cnn_ffW: org.apache.spark.broadcast.Broadcast[BDM[Double]],
    alpha: Double): (BDM[Double], BDM[Double], Array[CNNLayers]) = {
    val train_data5 = train_cnnbp._1
    val cnn_dffw = train_cnnbp._2
    val cnn_dffb = train_cnnbp._3
    var cnn_layers = train_cnnbp._4
    var cnn_ffb = bc_cnn_ffb.value
    var cnn_ffW = bc_cnn_ffW.value
    val n = cnn_layers.length
    for (l <- 1 to n - 1) {
      val type1 = cnn_layers(l).types
      val lena1 = train_data5.map(f => f._2(l).length).take(1)(0)
      val lena2 = train_data5.map(f => f._2(l - 1).length).take(1)(0)
      if (type1 == "c") {
        for (j <- 0 to lena1 - 1) {
          for (ii <- 0 to lena2 - 1) {
            cnn_layers(l).k(ii)(j) = cnn_layers(l).k(ii)(j) - cnn_layers(l).dk(ii)(j)
          }
          cnn_layers(l).b(j) = cnn_layers(l).b(j) - cnn_layers(l).db(j)
        }
      }
    }
    cnn_ffW = cnn_ffW + cnn_dffw
    cnn_ffb = cnn_ffb + cnn_dffb
    (cnn_ffW, cnn_ffb, cnn_layers)
  }

  /**
   * CNNeval: feed-forward pass plus output error
   * computes the output of every node and the mean squared error
   */
  def CNNeval(
    batch_xy1: RDD[(BDM[Double], BDM[Double])],
    bc_cnn_layers: org.apache.spark.broadcast.Broadcast[Array[CNNLayers]],
    bc_cnn_ffb: org.apache.spark.broadcast.Broadcast[BDM[Double]],
    bc_cnn_ffW: org.apache.spark.broadcast.Broadcast[BDM[Double]]): Double = {
    // CNNff: feed-forward pass
    val train_cnnff = CNN.CNNff(batch_xy1, bc_cnn_layers, bc_cnn_ffb, bc_cnn_ffW)
    // error and loss
    // compute the output error
    val rdd_loss1 = train_cnnff.map { f =>
      val nn_e = f._4 - f._1
      nn_e
    }
    val (loss2, counte) = rdd_loss1.treeAggregate((0.0, 0L))(
      seqOp = (c, v) => {
        // c: (e, count), v: (m)
        val e1 = c._1
        val e2 = (v :* v).sum
        val esum = e1 + e2
        (esum, c._2 + 1)
      },
      combOp = (c1, c2) => {
        // c: (e, count)
        val e1 = c1._1
        val e2 = c2._1
        val esum = e1 + e2
        (esum, c1._2 + c2._2)
      })
    val Loss = (loss2 / counte.toDouble) * 0.5
    Loss
  }
}
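
For reference, here is a minimal training sketch showing how the class above is typically wired together. It assumes a SparkContext named sc already exists and uses a hypothetical tab-separated file train_mnist.txt whose first column is the digit label and whose remaining 784 columns are pixel values; the data-loading part is illustrative only and is not part of the toolbox.

import breeze.linalg.{DenseMatrix => BDM}
import org.apache.spark.rdd.RDD

// assumption: sc is an existing SparkContext; train_mnist.txt is a hypothetical
// tab-separated file of the form: label \t 784 pixel values
val train_d: RDD[(BDM[Double], BDM[Double])] = sc.textFile("train_mnist.txt").map { line =>
  val arr = line.split("\t").map(_.toDouble)
  val label = BDM.zeros[Double](10, 1)
  label(arr(0).toInt, 0) = 1.0                 // one-hot encode the digit as a 10x1 matrix
  val features = new BDM(28, 28, arr.drop(1))  // 784 pixels reshaped into a 28x28 map
  (label, features)
}

// opts = [batchsize, numepochs, validation fraction], read by CNNtrain as opts(0..2)
val opts = Array(100.0, 1.0, 0.2)

val cnn = new CNN()
  .setMapsize(new BDM(1, 2, Array(28.0, 28.0)))
  .setTypes(Array("i", "c", "s", "c", "s"))
  .setLayer(5)
  .setOnum(10)

val model = cnn.CNNtrain(train_d, opts)

Note that CNNtrain takes the batch size, epoch count and validation fraction from opts rather than from the setters, so the opts array above is what actually controls the training loop.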

1.2 CNNModel Code

package CNN

import breeze.linalg.{
  Matrix => BM,
  CSCMatrix => BSM,
  DenseMatrix => BDM,
  Vector => BV,
  DenseVector => BDV,
  SparseVector => BSV
}
import org.apache.spark.rdd.RDD

/**
 * label: target matrix
 * features: feature matrix
 * predict_label: prediction matrix
 * error: prediction error
 */
case class PredictCNNLabel(label: BDM[Double], features: BDM[Double], predict_label: BDM[Double], error: BDM[Double]) extends Serializable

class CNNModel(
  val cnn_layers: Array[CNNLayers],
  val cnn_ffW: BDM[Double],
  val cnn_ffb: BDM[Double]) extends Serializable {

  /**
   * Return the predictions.
   * Output format: (label, feature, predict_label, error)
   */
  def predict(dataMatrix: RDD[(BDM[Double], BDM[Double])]): RDD[PredictCNNLabel] = {
    val sc = dataMatrix.sparkContext
    val bc_cnn_layers = sc.broadcast(cnn_layers)
    val bc_cnn_ffW = sc.broadcast(cnn_ffW)
    val bc_cnn_ffb = sc.broadcast(cnn_ffb)
    // CNNff: feed-forward pass
    val train_cnnff = CNN.CNNff(dataMatrix, bc_cnn_layers, bc_cnn_ffb, bc_cnn_ffW)
    val rdd_predict = train_cnnff.map { f =>
      val label = f._1
      val nna1 = f._2(0)(0)
      val nnan = f._4
      val error = f._4 - f._1
      PredictCNNLabel(label, nna1, nnan, error)
    }
    rdd_predict
  }

  /**
   * Compute the mean squared output error.
   */
  def Loss(predict: RDD[PredictCNNLabel]): Double = {
    val predict1 = predict.map(f => f.error)
    // error and loss
    // compute the output error
    val loss1 = predict1
    val (loss2, counte) = loss1.treeAggregate((0.0, 0L))(
      seqOp = (c, v) => {
        // c: (e, count), v: (m)
        val e1 = c._1
        val e2 = (v :* v).sum
        val esum = e1 + e2
        (esum, c._2 + 1)
      },
      combOp = (c1, c2) => {
        // c: (e, count)
        val e1 = c1._1
        val e2 = c2._1
        val esum = e1 + e2
        (esum, c1._2 + c2._2)
      })
    val Loss = (loss2 / counte.toDouble) * 0.5
    Loss
  }
}
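
A short prediction sketch follows, assuming model is the CNNModel returned by CNNtrain above and test_d is an RDD[(BDM[Double], BDM[Double])] prepared the same way as the training data:

val predictions = model.predict(test_d)   // RDD[PredictCNNLabel]
predictions.take(5).foreach { p =>
  println(s"label = ${p.label.t}  predicted = ${p.predict_label.t}")
}
val testMse = model.Loss(predictions)     // mean squared error, same formula as CNNeval
println(s"test mse = $testMse")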

Please credit the source when reposting:

 

Reposted from: https://www.cnblogs.com/mfrbuaa/p/5369849.html
