
Spark Shell

Example 1 – Process Data from a List

scala> val pairs = sc.parallelize( List(
("This", 2),
("is", 3),
("Spark", 5),
("is", 3)
) )
...
scala> pairs.collect().foreach(println)
(This,2)
(is,3)
(Spark,5)
(is,3)
// Reduce Pairs by Keys:
scala> val pair1 = pairs.reduceByKey((x,y) => x+y, 4)
...
scala> pair1.collect.foreach(println)
(Spark,5)
(is,6)
(This,2)
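// Note: the second argument to reduceByKey (4 above) sets the number
// of partitions of the resulting RDD; a quick sanity check (a sketch):
scala> pair1.partitions.length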
// Decrease values by 1:
scala> val pair2 = pairs.mapValues( x=>x-1 )
scala> pair2.collect.foreach(println)
(This,1)
(is,2)
(Spark,4)
(is,2)
// Group Values by Keys:
scala> pairs.groupByKey.collect().foreach(println)
(Spark,CompactBuffer(5))
(is,CompactBuffer(3, 3))
(This,CompactBuffer(2))
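
The same per-key sums can also be obtained from groupByKey followed by mapValues, though reduceByKey is generally preferable since it combines values within each partition before shuffling. A minimal sketch (output order may vary):

// Sum the grouped values per key (same result as reduceByKey above):
scala> pairs.groupByKey.mapValues(_.sum).collect.foreach(println)
(Spark,5)
(is,6)
(This,2)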

Example 2 – Process Data from Local Text File

// Create an RDD from a local text file:
scala> val textFile = sc.textFile("file:///home/PATH_TO_SPARK_HOME/README.md")

RDD transformations and actions can now be applied to textFile.

// This will display the number of lines in this textFile:
scala> textFile.count()
// or simply:
scala> textFile.count
// Note: parentheses can be omitted when a method takes no arguments
// This will display the first line:
scala> textFile.first
// Filter lines containing "Spark":
scala> val linesWithSpark = textFile.filter (
line => line.contains("Spark")
)
// or simply:
scala> val linesWithSpark = textFile.filter( _.contains("Spark") )
// Note: the underscore "_" is a placeholder for each element of textFile
// Collect the content of linesWithSpark:
scala> linesWithSpark.collect ()
// Print lines of content of linesWithSpark:
scala> linesWithSpark.foreach (println)
// Map each line to the number of terms (#terms) in it:
scala> val numOfTermsPerLine = textFile.map ( line => line.split(" ").size )
// or simply:
scala> val numOfTermsPerLine = textFile.map ( _.split(" ").size )
// Reduce numOfTermsPerLine to the maximum #terms:
scala> numOfTermsPerLine.reduce ( (a, b) => if (a > b) a else b )
// or use Math.max from java.lang:
scala> import java.lang.Math
scala> numOfTermsPerLine.reduce ( (a, b) => Math.max(a, b) )
// Convert RDD textFile to a 1-D array of terms:
scala> val terms = textFile.flatMap ( _.split(" ") )
// Convert RDD textFile to a 2-D array (one array of terms per line):
scala> val terms_ = textFile.map ( _.split(" ") )
// Calculate the vocabulary size of textFile:
scala> terms.distinct().count()
// or simply:
scala> terms.distinct.count
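// Note: the vocabulary count above is case-sensitive
// ("Spark" and "spark" count as two distinct terms);
// a possible case-insensitive variant (a sketch):
scala> terms.map(_.toLowerCase).distinct.count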
// Find the longest line in textFile together with its length:
scala> val lineLengthPair = textFile.map (
line => (line, line.length) )
scala> val lineWithMaxLength = lineLengthPair.reduce (
(pair1, pair2) => if (pair1._2 >= pair2._2) pair1 else pair2 )
// alternatively, in a concise way:
scala> val lineWithMaxLength = textFile.map (
line => (line, line.length) ).reduce (
(pair1, pair2) => if (pair1._2 >= pair2._2) pair1 else pair2 )
// Find all lines containing "Spark" along with their line numbers (starting from 0)
// and output them in the format <line_no: line_content>
scala> val lineIndexPair = textFile.zipWithIndex()
scala> val lineIndexPairWithSpark = lineIndexPair.filter (
_._1.contains("Spark") )
scala> lineIndexPairWithSpark.foreach (
pair => println ( pair._2 + ": " + pair._1 ) )
// alternatively, in a concise way:
scala> textFile.zipWithIndex().filter (
_._1.contains("Spark") ).foreach (
pair => println ( pair._2 + ": " + pair._1 ) )

Example 3 – Process Data from Local CSV File

Download the CSV file with:

wget --content-disposition https://webcms3.cse.unsw.edu.au/files/cc5bb4af124130f899cddad80af071f1ad478c3c8eb7440433291459bb603ff1/attachment

Define a name-to-field-index mapping for the CSV file:

scala> val aucid = 0
scala> val bid = 1
scala> val bidtime = 2
scala> val bidder = 3
scala> val bidderrate = 4
scala> val openbid = 5
scala> val price = 6
scala> val itemtype = 7
scala> val dtl = 8
// Create an RDD as a 2-D array from CSV file:
scala> val auctionRDD = sc.textFile("file:///home/PATH-TO-CSV-FILE/auction.csv")
.map ( _.split(",") )
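// Sanity check (a sketch, assuming the first line is a data row, not a header):
// show the fields of the first row, pipe-separated:
scala> auctionRDD.first.mkString(" | ")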
// Count the total number of item types in the auction:
scala> auctionRDD.map ( _(itemtype) ).distinct.count
// Note: itemtype was defined as 7 above, indexing the 8th column
// Count the total number of bids per itemtype:
scala> auctionRDD.map ( line => ( line(itemtype), 1 ) )
.reduceByKey ( _ + _ , 4)
.foreach ( pair => println (pair._1 + "," + pair._2) )
// Find the maximum number of bids over all auctions:
scala> auctionRDD.map ( line => ( line(aucid), 1 ) )
.reduceByKey ( _ + _ , 4)
.reduce ( (pair1, pair2) => if ( pair1._2 >= pair2._2 ) pair1 else pair2 )
._2
// Find the top 5 auctions by number of bids:
scala> auctionRDD.map ( line => (line(aucid), 1) )
.reduceByKey ( _ + _ , 4)
.map ( _.swap )
.sortByKey (false)
.map ( _.swap )
.take (5)
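
The swap / sortByKey / swap pattern above works, but the same top-5 result can be obtained more directly with top and a custom Ordering; a possible alternative, as a sketch:

scala> auctionRDD.map ( line => (line(aucid), 1) )
.reduceByKey ( _ + _ , 4)
.top (5) (Ordering.by(_._2))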

Example 4 – Word Count on HDFS Text File

Download the data file and put it into HDFS:

wget --content-disposition https://webcms3.cse.unsw.edu.au/files/33c7707c8b646a686e33af7e2f2fc006b53ff8c13d8317976bd262d8c6daae66/attachment
hdfs dfs -put pg100.txt Input/
// Create an RDD from HDFS:
scala> val pg100RDD = sc.textFile ("hdfs://HOST-NAME:PORT/user/USER-NAME/Input/pg100.txt")
// Word count:
scala> pg100RDD.flatMap ( _.split(" ") )
.map ( term => (term, 1) )
.reduceByKey ( _ + _ , 3)
.saveAsTextFile ( "OUTPUT-PATH" )
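
Note that saveAsTextFile writes one part-NNNNN file per partition (three here) under OUTPUT-PATH rather than a single file. To peek at the saved result back in the shell, a minimal check:

// Read the output back and print a few (term, count) lines:
scala> sc.textFile ("OUTPUT-PATH").take (10).foreach (println)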

Example N – Spark GraphX Programming

# Download graph data tiny-graph.txt
$ wget --content-disposition https://webcms3.cse.unsw.edu.au/files/ae6f45a3d64c0b35a3bd4d0c2740cc673f000dc60ec17d0e882faf6c20f74509/attachment
// Import the relevant GraphX classes:
scala> import org.apache.spark.graphx._
// Load graph data as RDD:
scala> val tinyGraphRDD = sc.textFile ("file:///home/PATH-TO-GRAPH-DATA/tiny-graph.txt")
// Convert the raw data <index, srcVertex, destVertex, weight>
// into GraphX-readable edges:
scala> val edges = tinyGraphRDD.map ( _.split(" ") )
.map ( line =>
Edge ( line(1).toLong,
line(2).toLong,
line(3).toDouble
)
)
// Create a graph:
scala> val graph = Graph.fromEdges[Double, Double] (edges, 0.0)
// Now that the graph has been created,
// show the triplets of this graph:
scala> graph.triplets.collect.foreach ( println )
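
Once the graph exists, the standard GraphX views can be inspected as well; for instance, a minimal sketch:

// Show (vertexId, in-degree) pairs:
scala> graph.inDegrees.collect.foreach (println)
// Sum the weights over all edges:
scala> graph.edges.map (_.attr).sum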

Written with StackEdit.
