This post implements the softmax regression algorithm on Spark. For the theory behind the algorithm, see: http://deeplearning.stanford.edu/wiki/index.php/Softmax_Regression
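For reference, with k classes the model and the gradient of the regularized cost that the code below descends are (in the wiki's notation):

\[
P(y=j \mid x;\theta) = \frac{e^{\theta_j^\top x}}{\sum_{l=1}^{k} e^{\theta_l^\top x}}, \qquad
\nabla_{\theta_j} J(\theta) = -\frac{1}{m}\sum_{i=1}^{m} x^{(i)}\left(1\{y^{(i)}=j\} - P(y^{(i)}=j \mid x^{(i)};\theta)\right) + \lambda\,\theta_j
\]

Note that the code sums the per-sample terms instead of averaging them, so the 1/m factor is effectively absorbed into the step size alpha.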
Training samples are represented in sparse form, so two helper classes, SparserVector and DenseMatrix, are implemented to represent a sparse vector and a dense matrix respectively.
The complete code is not posted here; it can be downloaded from http://download.csdn.net/detail/liangliang8086/6731397
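The two helper classes live only in the download. As a rough sketch, the interface the main code relies on could look like the following; everything here is inferred from how SparseSoftmax uses the classes, so the real implementations in the download may differ:

package spark.ml.utils

import java.util.Random
import scala.collection.mutable.HashMap
import org.apache.spark.util.Vector

// Sparse vector: only the non-zero entries are stored, keyed by index.
class SparserVector(val size: Int) extends Serializable {
  val elements = new HashMap[Int, Double]
  def insert(index: Int, value: Double) = elements.put(index, value)
  // scale by a scalar, producing a dense Vector of length size
  def *(scale: Double): Vector = {
    val dense = new Array[Double](size)
    elements.foreach { case (i, v) => dense(i) = v * scale }
    new Vector(dense)
  }
}

// Dense rows-by-columns matrix backed by Array[Array[Double]].
class DenseMatrix(val rows: Int, val columns: Int) extends Serializable {
  val matrix = Array.ofDim[Double](rows, columns)
  def apply(i: Int): Array[Double] = matrix(i)
  // fill with small random values and return this, so it chains after new
  def rand: DenseMatrix = {
    val r = new Random(42)
    for (i <- 0 until rows; j <- 0 until columns)
      matrix(i)(j) = r.nextDouble * 0.01
    this
  }
  def +(that: DenseMatrix): DenseMatrix = combine(that)(_ + _)
  // with - defined and w declared as a var, "w -= g" desugars to "w = w - g"
  def -(that: DenseMatrix): DenseMatrix = combine(that)(_ - _)
  def *(scale: Double): DenseMatrix = {
    val m = new DenseMatrix(rows, columns)
    for (i <- 0 until rows; j <- 0 until columns)
      m.matrix(i)(j) = matrix(i)(j) * scale
    m
  }
  private def combine(that: DenseMatrix)(f: (Double, Double) => Double): DenseMatrix = {
    val m = new DenseMatrix(rows, columns)
    for (i <- 0 until rows; j <- 0 until columns)
      m.matrix(i)(j) = f(matrix(i)(j), that.matrix(i)(j))
    m
  }
}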
Main code:
package spark.ml.classification
import java.util.Random
import scala.collection.mutable.HashMap
import scala.io.Source
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Vector
import java.lang.Math
import org.apache.spark.broadcast.Broadcast
import spark.ml.utils.SparserVector
import spark.ml.utils.DenseMatrix
object SparseSoftmax {
  val labelNum = 10 // number of classes
  val dimNum = 781 // feature dimension
  val iteration = 20 // number of iterations
  val alpha = 0.1 // learning rate (step size)
  val lambda = 0.1 // weight-decay (regularization) coefficient
  val rand = new Random(42)
  /**
   * One training example: sparse features x and a 1-based class label y.
   */
  case class DataPoint(x: SparserVector, y: Int)
  /**
   * Parse one training sample (a LIBSVM-style "label index:value ..." line)
   * into a DataPoint.
   * @param line one training sample
   */
  def parsePoint(line: String): DataPoint = {
    val features = new SparserVector(dimNum)
    val fields = line.split(" ")
    val y = fields(0).toInt
    fields.filter(_.contains(":")).foreach(f => {
      val feature = f.split(":")
      features.insert(feature(0).toInt, feature(1).toDouble)
    })
    DataPoint(features, y)
  }
  /**
   * exp(w dot x), summing only over the non-zero features.
   * (Math.exp can overflow for large scores; a log-sum-exp trick in h
   * would make this numerically safer.)
   */
  def prob(x: SparserVector, w: Array[Double]): Double = {
    val s = x.elements.map { case (k, v) => w(k) * v }.sum
    Math.exp(s)
  }
  /**
   * Probability of a sample belonging to each class: softmax over w_j dot x.
   */
  def h(x: SparserVector, w: DenseMatrix): Array[Double] = {
    val probs = w.matrix.map(m => prob(x, m))
    val sum = probs.sum
    probs.map(_ / sum)
  }
  /**
   * Gradient of the (unregularized) loss for one sample; the lambda * w
   * term is added once per batch in train below.
   */
  def gradient(p: DataPoint, wb: Broadcast[DenseMatrix]): DenseMatrix = {
    val w = wb.value
    val columnNum = w.columns
    val g = new DenseMatrix(w.rows, columnNum)
    val probs = h(p.x, w) // normalized probabilities, not raw exp(w dot x)
    for (j <- 0 until labelNum) {
      val y = if (p.y == j + 1) 1 else 0
      // d(loss)/d(w_j) = x * (P(y=j|x) - 1{y=j})
      val v = p.x * (probs(j) - y) // here v is a Vector and g(j) an Array[Double]
      for (k <- 0 until columnNum)
        g(j)(k) = v(k)
    }
    g
  }
  /**
   * Train the weights with batch gradient descent over all samples.
   */
  def train(sc: SparkContext, dataPoints: RDD[DataPoint]): DenseMatrix = {
    var w = new DenseMatrix(labelNum, dimNum).rand
    // re-broadcast the current weights on every pass so the workers
    // always compute gradients against the latest values
    for (i <- 0 until iteration) {
      val wb = sc.broadcast(w)
      val g = dataPoints.map(p => gradient(p, wb)).reduce(_ + _) + w * lambda
      w -= g * alpha
    }
    w
  }
  /**
   * Classify every sample with the trained weights and return the accuracy.
   */
  def predict(w: DenseMatrix, dataPoints: RDD[DataPoint]): Double = {
    // index of the largest probability, converted to a 1-based label
    def label(probs: Array[Double]): Int = probs.indexOf(probs.max) + 1
    val correct = dataPoints.map(p => {
      val y = label(h(p.x, w))
      if (y == p.y) 1 else 0
    }).reduce(_ + _)
    (correct * 1.0) / dataPoints.count
  }
  def main(args: Array[String]): Unit = {
    val trainfile = "data/mnist/mnist"
    val sc = new SparkContext("local", "Softmax")
    val trainset = sc.textFile(trainfile, 2).map(parsePoint).cache
    val w = train(sc, trainset)
    val testfile = "data/mnist/mnist.t"
    val testset = sc.textFile(testfile, 2).map(parsePoint).cache
    val accuracy = predict(w, testset)
    println(accuracy)
  }
}
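parsePoint expects LIBSVM-style input lines: a class label followed by index:value pairs for the non-zero features, with labels in 1..labelNum and feature indices below dimNum (MNIST's digit 0 would therefore need remapping, e.g. to 10). A quick local check with a made-up sample line:

val p = SparseSoftmax.parsePoint("3 101:0.62 215:1.0")
println(p.y) // 3
println(p.x.elements.get(101)) // Some(0.62)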