首页 技术 正文
技术 2022年11月21日
0 收藏 413 点赞 4,698 浏览 4193 个字

1. SparkSessionsparkSession可以视为sqlContext和hiveContext以及StreamingContext的结合体,这些Context的API都可以通过sparkSession使用。创建SparkSession val spark = SparkSession.builder
    .master(“local[2]”)
    .appName(“spark session example”)
    .getOrCreate()1234使用enableHiveSupport就能够支持hive,相当于hiveContext val spark = SparkSession.builder
    .master(“local[2]”)
    .appName(“spark session example”)
    .enableHiveSupport()
    .getOrCreate()12345API操作,与之前的Context基本一致 //读取csv数据
val df0 = spark.read
  .option(“header”,”true”)
  .csv(“src/main/resources/test.csv”)//读取parquet数据
val df1 = spark.read.parquet(“…”)//读取json数据
val df2 = spark.read.json(“…”)//sql查询
val df3 = spark.sql(“xxx”)
1234567891011121314Spark 2.0向后兼容,所以hiveContext以及sqlContext依旧可用,不过官方建议开发者开始使用SparkSession。 2. DataSet,RDD,DataFrameRDD类型安全,面向对象编程风格,序列化以及反序列化开销大。
DataFrame提供了查询优化器,分区剪枝,自动判断是否使用broadcast join等功能,对rdd进行了大量的优化。对spark了解不深的编程/分析人员非常友好。可以视为Row类型的Dataset (Dataset[Row]),非类型安全,不是面向对象编程风格。
DataSet继承了RDD和DataFrame的优点。数据以编码的二进制形式存储,将对象的schema映射为SparkSQL类型,不需要反序列化就可以进行shuffle等操作,每条记录存储的则是强类型值,类型安全,面向对象编程风格。 Dataset的创建dataset可以从rdd,dataFrame转化,也可以从原始数据直接生成。通过toDS方法创建 val ds1 = Seq(“a”,”b”).toDS()
ds1.show//+—–+
//|value|
//+—–+
//|    a|
//|    b|
//+—–+123456789通过createDataSet创建 case class Person(name: String, age: Int)
val data = Seq(Person(“lsw”, 23), Person(“yyy”, 22))
val ds2 = spark.createDataset(data)
ds2.show//+—-+—+
//|name|age|
//+—-+—+
//| lsw| 23|
//| yyy| 22|
//+—-+—+1234567891011 DataSet与RDD使用上的区别Dataset 结合了 rdd 和 DataFrame 上大多数的API,所以spark 1.x基于 rdd 或 DataFrame 的代码可以很容易的改写成spark 2.x版本数据读取RDDssparkContext.textFile(“/path/to/data.txt”)1Datasets//返回 DataFrame
val df = spark.read.text(“/path/to/data.txt”)
//返回 DataSet[String]
val ds1 = spark.read.textFile(“/path/to/data.txt”)
//或者读取成DataFram再转化成Dataset
val ds2 = spark.read.text(“/path/to/data.txt”).as[String]123456
常用APIRDDs//flatMap,filter
val lines = sc.textFile(“/path/to/data.txt”)
val res = lines
  .flatMap(_.split(” “))
  .filter(_ != “”)//reduce
val rdd = sc.makeRDD(Seq(1, 2, 3, 4))
rdd.reduce((a, b) => a + b)123456789Datasets//flatMap,filter
val lines = spark.read.textFile(“/path/to/data.txt”)
val res = lines
  .flatMap(_.split(” “))
  .filter(_ != “”)//reduce
val ds = Seq(1, 2, 3, 4).toDs
ds.reduce((a, b) => a + b)123456789
reduceByKeyRDDsval reduceCountByRDD = wordsPair
  .reduceByKey(_+_)12Datasetsval reduceCountByDs = wordsPairDs
  .mapGroups((key,values) =>(key,values.length))12
RDD,DataFrame,Dataset的相互转化import spark.implicits._
//Dataset转化为RDD
val ds2rdd = ds.rdd
//Dataset转为DataFrame
val ds2df = ds.toDF//RDD转化为Dataset
val rdd2ds = rdd.toDS
//RDD转化为DataFrame
val rdd2df = rdd.toDF//DataFrame转化为RDD
val df2rdd = df.rdd
//DataFrame转化为DataSet
val df2ds = df.as[Type]
12345678910111213141516
wordCountdata.txthello world
hello spark12RDDsval rdd = sc.textFile(“src/main/resources/data.txt”)
val wordCount = rdd
  .map(word => (word,1))
  .reduceByKey(_+_)1234Datasetsimport spark.implicits._
val wordCount1 = lines
  .flatMap(r => r.split(” “))
  .groupByKey(r => r)
  .mapGroups((k, v) => (k, v.length))
wordCount1.show
//  +—–+——–+
//  |value|count(1)|
//  +—–+——–+
//  |hello|       2|
//  |spark|       1|
//  |world|       1|
//  +—–+——–+//也可以直接使用count函数
val wordCount2 = lines
  .flatMap(r => r.split(” “))
  .groupByKey(v => v)
  .count()
wordCount2.show
//  +—–+—+
//  |   _1| _2|
//  +—–+—+
//  |hello|  2|
//  |spark|  1|
//  |world|  1|
//  +—–+—+123456789101112131415161718192021222324252627Dataset性能提升(来自官方) 
  3. CatalogSpark 2.0中添加了标准的API(称为catalog)来访问Spark SQL中的元数据。这个API既可以操作Spark SQL,也可以操作Hive元数据。 获取catalog从SparkSession中获取catalog val catalog = spark.catalog1 查询临时表和元数据中的表返回Dataset[Table]catalog.listTable.show
//  +—-+——–+———–+———+———–+
//  |name|database|description|tableType|isTemporary|
//  +—-+——–+———–+———+———–+
//  |table|   null|      null|TEMPORARY|        true|
//  |act | default|      null| EXTERNAL|       false|
//  +—-+——–+———–+———+———–+
1234567 创建临时表使用createTempView和createOrReplaceTempView取代registerTempTable。例如    df.createTempView(“table”)
df.createOrReplaceTempView(“table”)
12createTempView创建临时表,如果已存在同名表则报错。
createOrReplaceTempView创建临时表,如果存在则进行替换,与老版本的registerTempTable功能相同。 销毁临时表使用dropTempView取代dropTempTable,销毁临时表的同事会清除缓存的数据。spark.dropTempView(“table”)
1 缓存表对数据进行缓存//缓存表有两种方式
df.cache
catalog.cacheTable(“table”)//判断数据是否缓存
catalog.isCached(“table”)
123456catalog相较于之前的API,对metadata的操作更加的简单,直观。

相关推荐
python开发_常用的python模块及安装方法
adodb:我们领导推荐的数据库连接组件bsddb3:BerkeleyDB的连接组件Cheetah-1.0:我比较喜欢这个版本的cheeta…
日期:2022-11-24 点赞:878 阅读:8,996
Educational Codeforces Round 11 C. Hard Process 二分
C. Hard Process题目连接:http://www.codeforces.com/contest/660/problem/CDes…
日期:2022-11-24 点赞:807 阅读:5,510
下载Ubuntn 17.04 内核源代码
zengkefu@server1:/usr/src$ uname -aLinux server1 4.10.0-19-generic #21…
日期:2022-11-24 点赞:569 阅读:6,353
可用Active Desktop Calendar V7.86 注册码序列号
可用Active Desktop Calendar V7.86 注册码序列号Name: www.greendown.cn Code: &nb…
日期:2022-11-24 点赞:733 阅读:6,137
Android调用系统相机、自定义相机、处理大图片
Android调用系统相机和自定义相机实例本博文主要是介绍了android上使用相机进行拍照并显示的两种方式,并且由于涉及到要把拍到的照片显…
日期:2022-11-24 点赞:512 阅读:7,770
Struts的使用
一、Struts2的获取  Struts的官方网站为:http://struts.apache.org/  下载完Struts2的jar包,…
日期:2022-11-24 点赞:671 阅读:4,848