1. 原理和理论基础(参考)
2. Spark代码实例:
1)windows 单机
import org.apache.spark.mllib.classification.{NaiveBayes, NaiveBayesModel}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Naive Bayes example for a single Windows machine (Spark local mode).
 *
 * Reads a text file whose lines look like `label,f1 f2 f3`, trains a
 * multinomial Naive Bayes model on roughly 60% of the records, prints the
 * accuracy measured on the remaining records, then saves the model and loads
 * it back.
 */
object local_NaiveBayes {

  // Hadoop reads the `hadoop.home.dir` system property to find its Windows
  // helper binaries (winutils.exe); the original key `hadoop.dir.home` is not
  // a property Hadoop looks at, so the setting had no effect.
  System.setProperty("hadoop.home.dir", "E:/zhuangji/winutil/")

  /**
   * Runs the whole train / evaluate / save / load pipeline.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("NaiveBayes")
    val sc = new SparkContext(conf)

    try {
      // Load the raw text and turn every line into a labeled feature vector.
      val data = sc.textFile("E:/Java_WS/ScalaDemo/data/sample_naive_bayes_data.txt")
      val parsedData = data.map { line =>
        val parts = line.split(',')
        LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
      }

      // Random 60/40 split; the fixed seed keeps the split reproducible.
      val splits = parsedData.randomSplit(Array(0.6, 0.4), seed = 11L)
      val training = splits(0)
      val test = splits(1)

      // Train the model and measure the fraction of correct predictions.
      val model = NaiveBayes.train(training, lambda = 1.0, modelType = "multinomial")
      val predictionAndLabel = test.map(p => (model.predict(p.features), p.label))
      val accuracy =
        predictionAndLabel.filter { case (predicted, actual) => predicted == actual }
          .count().toDouble / test.count()
      println(s"Test accuracy = $accuracy")

      // Save the model, then load it back.
      // NOTE(review): saving presumably fails when the target directory
      // already exists - confirm and clean it up between runs if needed.
      model.save(sc, "E:/Spark/models/NaiveBayes")
      val sameModel = NaiveBayesModel.load(sc, "E:/Spark/models/NaiveBayes")
    } finally {
      // Always release the context, even when a step above throws.
      sc.stop()
    }
  }
}
2)集群模式
需要打包,然后通过spark-submit 提交到yarn client或者cluster中:
spark-submit --class myNaiveBayes --master yarn ScalaDemo.jar
import org.apache.spark.mllib.classification.{NaiveBayesModel, NaiveBayes}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Naive Bayes example meant to be packaged and launched with spark-submit.
 *
 * No master is set in code, so it has to be supplied at submission time.
 * Reads `label,f1 f2 f3` lines from HDFS, trains a multinomial Naive Bayes
 * model on roughly 60% of the records, prints the accuracy measured on the
 * remaining records, then saves the model to HDFS and loads it back.
 */
object myNaiveBayes {

  /**
   * Runs the whole train / evaluate / save / load pipeline.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("NaiveBayes")
    val sc = new SparkContext(conf)

    try {
      // Load the raw text and turn every line into a labeled feature vector.
      val data = sc.textFile("hdfs://nameservice1/user/hive/spark/data/sample_naive_bayes_data.txt")
      val parsedData = data.map { line =>
        val parts = line.split(',')
        LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
      }

      // Random 60/40 split; the fixed seed keeps the split reproducible.
      val splits = parsedData.randomSplit(Array(0.6, 0.4), seed = 11L)
      val training = splits(0)
      val test = splits(1)

      // Train the model and measure the fraction of correct predictions.
      val model = NaiveBayes.train(training, lambda = 1.0, modelType = "multinomial")
      val predictionAndLabel = test.map(p => (model.predict(p.features), p.label))
      val accuracy =
        predictionAndLabel.filter { case (predicted, actual) => predicted == actual }
          .count().toDouble / test.count()
      println(s"Test accuracy = $accuracy")

      // Save the model, then load it back.
      // NOTE(review): saving presumably fails when the target directory
      // already exists - confirm and clean it up between runs if needed.
      model.save(sc, "hdfs://nameservice1/user/hive/spark/NaiveBayes/model")
      val sameModel = NaiveBayesModel.load(sc, "hdfs://nameservice1/user/hive/spark/NaiveBayes/model")
    } finally {
      // Always release the context, even when a step above throws.
      sc.stop()
    }
  }
}
3)pyspark 代码实例
可以直接利用spark-submit提交,但注意无法以cluster部署模式提交到集群(cluster模式目前不支持独立集群、Mesos集群以及Python应用程序)
spark-submit pyNaiveBayes.py
#-*- coding:utf-8 -*-
from pyspark.mllib.classification import NaiveBayes,NaiveBayesModel
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark import SparkContext


def parseLine(line):
    """Convert one text record into a LabeledPoint.

    The record is split on ',' into a label and a feature string; the feature
    string is split on single spaces and every piece is converted to float.
    """
    parts = line.split(',')
    label = float(parts[0])
    features = Vectors.dense([float(x) for x in parts[1].split(' ')])
    return LabeledPoint(label, features)


if __name__ == "__main__":
    # NOTE(review): the application name looks copied from the Pi example;
    # it is kept unchanged here - consider renaming it.
    sc = SparkContext(appName="PythonPi")
    try:
        data = sc.textFile("hdfs://nameservice1/user/hive/spark/data/sample_naive_bayes_data.txt").map(parseLine)

        # Random 60/40 split; the fixed seed keeps the split reproducible.
        training, test = data.randomSplit([0.6, 0.4], seed=0)

        # Train with lambda = 1.0 and pair every prediction with its label.
        model = NaiveBayes.train(training, 1.0)
        predictionAndLabel = test.map(lambda p: (model.predict(p.features), p.label))

        # Tuple-unpacking lambdas (`lambda (x, v): ...`) are Python 2 only,
        # so the pair is indexed instead.
        accuracy = 1.0 * predictionAndLabel.filter(lambda pl: pl[0] == pl[1]).count() / test.count()
        print("Test accuracy = %s" % accuracy)

        # Save the model, then load it back.
        model.save(sc, "hdfs://nameservice1/user/hive/spark/PythonNaiveBayes/model")
        sameModel = NaiveBayesModel.load(sc, "hdfs://nameservice1/user/hive/spark/PythonNaiveBayes/model")
    finally:
        # Always release the context, even when a step above raises.
        sc.stop()
3. Python
from sklearn import naive_bayes
import random##拆分训练集和测试集
def SplitData(data,M,k,seed):
    """Randomly split the records of `data` into a training and a test list.

    For every record an integer is drawn uniformly from 0..M (both ends
    included, so there are M + 1 possible values); the record goes to the test
    list when the draw equals `k`, otherwise to the training list.

    Args:
        data: iterable of records; each record is passed through ''.join, so
            a plain string is kept unchanged and a sequence of strings is
            concatenated.
        M: upper bound (inclusive) of the random draw.
        k: draw value that sends a record to the test list.
        seed: seed for the random generator, making the split reproducible.

    Returns:
        A `(train, test)` tuple of lists of strings.
    """
    # A private generator produces the same sequence as random.seed(seed)
    # followed by random.randint, without reseeding the module-level
    # generator that other code may be relying on.
    rng = random.Random(seed)
    train = []
    test = []
    for line in data:
        record = ''.join(line)
        if rng.randint(0, M) == k:
            test.append(record)
        else:
            train.append(record)
    return train, test


## Split each record into X and Y using the delimiters
def parseData(data,delimiter1,delimiter2):
    """Parse text records into feature rows and labels.

    Each record is split on `delimiter1`; the first piece is the label and the
    second piece is split on `delimiter2` into the feature values. All values
    are converted to float.

    Args:
        data: iterable of record strings.
        delimiter1: separator between the label and the feature string.
        delimiter2: separator between the individual feature values.

    Returns:
        A `(x, y)` tuple: `x` is a list of feature lists, `y` the list of
        labels, in input order.

    Raises:
        ValueError: if a label or feature value is not a valid float.
        IndexError: if a non-blank record does not contain `delimiter1`.
    """
    x = []
    y = []
    for line in data:
        # Blank lines (e.g. a trailing empty line in the file) carry no
        # record; skip them instead of failing on parts[1].
        if not line.strip():
            continue
        parts = line.split(delimiter1)
        x1 = [float(a) for a in parts[1].split(delimiter2)]
        y1 = float(parts[0])
        x.append(x1)
        y.append(y1)
    return x, y


## Read the data
# The context manager closes the file once the split has consumed all lines
# (SplitData copies every record into its result lists).
with open('e:/java_ws/scalademo/data/sample_naive_bayes_data.txt','r') as data:
    training,test=SplitData(data,4,2,10)
trainingX,trainingY=parseData(training,',',' ')
testX,testY=parseData(test,',',' ')

## Build the model
model=naive_bayes.GaussianNB()
model.fit(trainingX,trainingY)

## Evaluate
for b in testX:
    # predict expects a 2-D array-like (n_samples, n_features), so the single
    # sample is wrapped in a list.
    print(model.predict([b]),b)