import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.sql.hive.HiveContext//spark-shell --driver-class-path /home/hadoop/hive/lib/mysql-connector-java-5.1.46.jar
/**
 * Spark batch job: reads gzipped log lines from HDFS, extracts eight fields from
 * each line, aggregates them with Spark SQL and appends the result to the MySQL
 * table `sharpbi.playuser`.
 *
 * Fields are extracted by plain substring splitting, not by a JSON parser.
 * NOTE(review): the lines are presumably JSON objects starting with `{"` — confirm
 * against the real data.
 *
 * The MySQL JDBC driver must be on the driver classpath, e.g.
 * `--driver-class-path /home/hadoop/hive/lib/mysql-connector-java-5.1.46.jar`.
 */
object playuser {

  /** The eight string columns of one parsed log line, in DataFrame column order. */
  private type PlayRow = (String, String, String, String, String, String, String, String)

  /**
   * Returns the text that follows the last occurrence of `keyPrefix` in `line`,
   * cut at the first `terminator`.
   *
   * Both `keyPrefix` and `terminator` are handed to `String.split` and are therefore
   * interpreted as regular expressions. When `keyPrefix` does not occur, the whole
   * line is searched for `terminator` instead. An empty string is returned when
   * either split yields no elements (instead of throwing on `.last` / `.head`).
   */
  private def rawValue(line: String, keyPrefix: String, terminator: String): String =
    line.split(keyPrefix).lastOption
      .flatMap(_.split(terminator).headOption)
      .getOrElse("")

  /**
   * Extracts the quoted string value stored under `key`.
   *
   * Every "{" in the extracted text is replaced by the literal "null".
   * NOTE(review): presumably this is the missing-key marker — when the key is absent
   * and the line starts with `{"`, the extracted text is exactly "{" — confirm.
   */
  private def textField(line: String, key: String): String =
    rawValue(line, key + "\":\"", "\"").replace("{", "null")

  /**
   * Interprets `raw` (after stripping any "{") as epoch milliseconds and formats it
   * with `dayFormat`. Returns `None` when the text is not a valid long.
   */
  private def epochMillisToDay(raw: String,
                               dayFormat: java.text.SimpleDateFormat): Option[String] =
    scala.util.Try(raw.replace("{", "").toLong).toOption
      .map(millis => dayFormat.format(new java.util.Date(millis)))

  /**
   * Parses one log line into a [[PlayRow]].
   *
   * Returns `None` when either timestamp cannot be parsed, so that a single blank or
   * malformed line is skipped instead of aborting the whole job with a
   * `NumberFormatException`.
   *
   * NOTE(review): the pattern `time":"` is also a suffix of `playtime":"`; because the
   * last match wins, `timedate` is taken from `playtime` if that key appears later in
   * the line. Patterns are kept exactly as in the original — verify the field order.
   */
  private def parseRecord(line: String,
                          dayFormat: java.text.SimpleDateFormat): Option[PlayRow] =
    for {
      recDay  <- epochMillisToDay(rawValue(line, "rectime\":", ","), dayFormat)
      timeDay <- epochMillisToDay(rawValue(line, "time\":\"", "\""), dayFormat)
    } yield (
      textField(line, "modeType"),
      textField(line, "packageName"),
      textField(line, "siteName"),
      textField(line, "playType"),
      recDay,
      timeDay,
      textField(line, "playtime"),
      textField(line, "custom_uuid")
    )

  /**
   * Job entry point.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // The master is intentionally not set here: the original setMaster("master") is not
    // a valid master URL (Spark expects e.g. local[*], spark://host:port or yarn) and a
    // value set in code would also override `spark-submit --master`.
    val cf = new SparkConf().setAppName("NetworkWordCount")
    val sc = new SparkContext(cf)
    try {
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._

      // Created on the driver and shipped inside the task closure.
      val format = new java.text.SimpleDateFormat("yyyy-MM-dd")

      // Alternative input: derive the directory from a date instead of the fixed glob.
      // NOTE(review): the millisecond offset subtracted from "now" was garbled in the
      // original source ("getTime - * * * * ") and did not compile; restore the intended
      // offset before enabling these two lines.
      // val date = format.format(new java.util.Date().getTime - <offsetMillis>)
      // val lg = sc.textFile("hdfs://master:9000/data/" + date + "*/01/*.gz")
      // val date1 = format.format(("27648000000").toLong)
      val lg = sc.textFile("hdfs://master:9000/data/2018-05-1*/21/*.gz")

      // NOTE(review): credentials are embedded in the URL; consider moving them out of
      // the source (e.g. into the connection Properties supplied at submit time).
      val url = "jdbc:mysql://196.168.100.88:3306/sharpbi?user=biadmin&password=bi_12345"
      //val url2 = "jdbc:mysql://rds3dabp9v2v7v596tai.mysql.rds.aliyuncs.com/r2d2?user=r2d2_admin&password=Vj0kHdve3" // insert into mysql

      lg.flatMap(line => parseRecord(line, format).toList)
        .toDF("modeType", "packageName", "siteName", "playType",
              "rectimedate", "timedate", "playtime", "custom_uuid")
        .registerTempTable("playuser")

      val dailyTotals = sqlContext.sql(
        "select modeType,packageName,siteName,playType,rectimedate,timedate," +
          "sum(playtime) as playtime,count(custom_uuid) as playstotal," +
          "count(distinct custom_uuid) customtotal " +
          "from playuser " +
          "group by modeType,packageName,siteName,playType,rectimedate,timedate")

      val prop = new java.util.Properties
      // Append adds rows to the existing table.
      dailyTotals.write.mode(SaveMode.Append).jdbc(url, "sharpbi.playuser", prop)
      // F1.write.mode("Overwrite").jdbc(url, "sharpbi.test", prop)  recreates the table, overwriting existing data
      // F1.insertIntoJDBC(url, "day_uv", false)

      // Read the table back and report its size instead of discarding the count.
      val stud_scoreDF = sqlContext.read.jdbc(url, "sharpbi.playuser", prop)
      println("sharpbi.playuser row count: " + stud_scoreDF.count())
    } finally {
      // Always stop the context, even when the job fails.
      sc.stop()
    }
  }
}