盘锦建设小学网站评估企业网站建设
- 作者: 五速梦信息网
- 时间: 2026年03月21日 10:14
当前位置: 首页 > news >正文
盘锦建设小学网站,评估企业网站建设,行业网站程序,瑞安商城网站建设一、SparkSQL Spark SQL和我们之前讲Hive的时候说的hive on spark是不一样的。hive on spark是表示把底层的mapreduce引擎替换为spark引擎。而Spark SQL是Spark自己实现的一套SQL处理引擎。Spark SQL是Spark中的一个模块#xff0c;主要用于进行结构化数据的处理。它提供的最核…一、SparkSQL Spark SQL和我们之前讲Hive的时候说的hive on spark是不一样的。hive on spark是表示把底层的mapreduce引擎替换为spark引擎。而Spark SQL是Spark自己实现的一套SQL处理引擎。Spark SQL是Spark中的一个模块主要用于进行结构化数据的处理。它提供的最核心的编程抽象就是DataFrame。 DataFrameRDDSchema 。 它其实和关系型数据库中的表非常类似RDD可以认为是表中的数据Schema是表结构信息。DataFrame可以通过很多来源进行构建包括结构化的数据文件Hive中的表外部的关系型数据库以及RDD 注意 Spark1.3出现的 DataFrame Spark1.6出现了 DataSet 在Spark2.0中两者统一DataFrame等于DataSet[Row] 二、SparkSession 要使用Spark SQL首先需要创建一个SpakSession对象。SparkSession中包含了SparkContext和SqlContext所以说想通过SparkSession来操作RDD的话需要先通过它来获取SparkContext 这个SqlContext是使用sparkSQL操作hive的时候会用到的。 SparkSession包含了SparkContext和SqlContext 1SparkContext 用于操作RDD 2SqlContext 用于操作hive 正常使用SparkSession操作DataFrame就可以了 三、创建DataFrame 使用SparkSession可以从RDD、HIve表或者其它数据源创建DataFrame 那下面我们来使用JSON文件来创建一个DataFrame
- 引用依赖 dependencygroupIdorg.apache.spark/groupIdartifactIdspark-sql_2.11/artifactIdversion2.4.3/version !– scopeprovided/scope–/dependency
- Scala代码 package com.sanqian.scala.sqlimport org.apache.spark.SparkConf import org.apache.spark.sql.SparkSessionobject SqlDemoScala {def main(args: Array[String]): Unit {val conf new SparkConf().setMaster(local)//创建SparkSession对象里面包含SparkContext和SqlContextval sparkSession SparkSession.builder().appName(SqlDemoScala).config(conf).getOrCreate()//读取json文件, 创建DataFrameval df sparkSession.read.json(D:\data\spark\student.json)//查DataFrame中的数据df.show()sparkSession.stop()} }3. Java代码 package com.sanqian.java.sql;import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession;public class SqlDemoJava {public static void main(String[] args) {SparkConf conf new SparkConf();conf.setMaster(local);//创建SparkSession对象里面包含SparkContext和SqlContextSparkSession sparkSession SparkSession.builder().appName(SqlDemoJava).config(conf).getOrCreate();//读取json文件获取DataSetRowDatasetRow df sparkSession.read().json(D:\data\spark\student.json);df.show();sparkSession.stop();} }4. DataFrame 和DataSet的转换 由于DataFrame等于DataSet[Row]它们两个可以互相转换所以创建哪个都是一样的。咱们前面的scala代码默认创建的是DataFramejava代码默认创建的是DataSet。 尝试对他们进行转换 1在Scala代码中将DataFrame转换为DataSet[Row]对后面的操作没有影响 //将DataFrame转换为DataSet[Row] val stuDf sparkSession.read.json(D:\student.json).as(stu) 2在Java代码中将DataSet[Row]转换为DataFrame //将DatasetRow转换为DataFrame DatasetRow stuDf sparkSession.read().json(D:\student.json).toDF(); 四、DataFrame常见算子操作
- 官方文档 2. DataFrame算子 • printSchema() 打印schema信息 • show() 默认显示所有的数据可以通过参数控制显示多少条 • select() 查询数据中指定字段信息在使用\(对数据做一些操作需要添加隐式转换函数否则语法报错 • filter()、where() 对数据进行 过滤where底层调用的就是filter • groupBy() : 对数据进行分组求和 • count() : 求和 注意在使用\)对数据做一些操作需要添加隐式转换函数否则语法报错 import sparkSession.implicits._ Scala代码 package com.sanqian.scala.sqlimport org.apache.spark.SparkConf import org.apache.spark.sql.SparkSessionobject DataFrameOpScala {def main(args: Array[String]): Unit {val conf new SparkConf()conf.setMaster(local)val sparkSession SparkSession.builder().appName(DataFrameOpScala).config(conf).getOrCreate()val df sparkSession.read.json(D:\data\spark\student.json)//打印schema信息df.printSchema()//默认显示所有数据可以通过参数控制显示多少条df.show(2)//查询数据中的指定字段信息df.select(name, age).show()//在使用select的时候可以对数据做一些操作需要添加隐式转换函数否则语法报错import sparkSession.implicits._df.select(\(name, \)age 1).show()df.filter($age 18).show()//对数据进行分组求和df.groupBy(age).count().show()sparkSession.stop()} }Java代码 package com.sanqian.java.sql;import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import static org.apache.spark.sql.functions.col;public class DataFrameOpJava {public static void main(String[] args) {SparkConf conf new SparkConf();conf.setMaster(local);//创建SparkSession对象里面包含SparkContext和SqlContextSparkSession sparkSession SparkSession.builder().appName(DataFrameOpJava).config(conf).getOrCreate();DatasetRow ds sparkSession.read().json(D:\data\spark\student.json);//打印schema信息ds.printSchema();ds.show(2);ds.select(name, age).show();//在select的时候可以对数据做一些操作,需要引入import static org.apache.spark.sql.functions.col;ds.select(col(name), col(age).plus(1)).show();//对数据进行过滤ds.filter(col(age).gt(18)).show();ds.where(col(age).gt(18)).show();//对数据进行分组求和ds.groupBy(age).count().show();sparkSession.stop();} }这些就是针对DataFrame的一些常见的操作。但是现在这种方式其实用起来还是不方便只是提供了一些类似于可以操作表的算子很对一些简单的查询还是可以的但是针对一些复杂的操作使用算子写起来就很麻烦了所以我们希望能够直接支持用sql的方式执行Spark SQL也是支持的。 五、DataFrame的sql操作 想要实现直接支持sql语句查询DataFrame中的数据 需要两步操作 1. 先将DataFrame注册为一个临时表 2. 使用sparkSession中的sql函数执行sql语句 1. Scala代码 package com.sanqian.scala.sqlimport org.apache.spark.SparkConf import org.apache.spark.sql.SparkSessionobject DataFrameSqlScala {def main(args: Array[String]): Unit {val conf new SparkConf()conf.setMaster(local)val sparkSession SparkSession.builder().appName(DataFrameOpScala).config(conf).getOrCreate()val df sparkSession.read.json(D:\data\spark\student.json)//将DataFrame注册为一个临时表df.createOrReplaceTempView(student)//使用sql查询临时表中的数据sparkSession.sql(select age, count() as num from student group by age).show()sparkSession.stop()} }2. Java代码 package com.sanqian.java.sql;import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession;public class DataFrameSqlJava {public static void main(String[] args) {SparkConf conf new SparkConf();conf.setMaster(local);//创建SparkSession对象里面包含SparkContext和SqlContextSparkSession sparkSession SparkSession.builder().appName(DataFrameOpJava).config(conf).getOrCreate();DatasetRow ds sparkSession.read().json(D:\data\spark\student.json);ds.createOrReplaceTempView(student);sparkSession.sql(select age, count() as num from student group by age).show();sparkSession.stop();} }六、RDD转换为DataFrame 为什么要将RDD转换为DataFrame? 在实际工作中我们可能会先把hdfs上的一些日志数据加载进来然后进行一些处理最终变成结构化的数据希望对这些数据做一些统计分析当然了我们可以使用spark中提供的transformation算子来实现只不过会有一些麻烦毕竟是需要写代码的如果能够使用sql实现其实是更加方便的。所以可以针对我们前面创建的RDD将它转换为DataFrame这样就可以使用dataFrame中的一些算子或者直接写sql来操作数据了。 Spark SQL支持这两种方式将RDD转换为DataFrame 1. 反射方式 2. 编程方式 一反射方式 下面来看一下反射方式 这种方式是使用反射来推断RDD中的元数据。基于反射的方式代码比较简洁也就是说当你在写代码的时候已经知道了RDD中的元数据这样的话使用反射这种方式是一种非常不错的选择。Scala具有隐式转换的特性所以spark sql的scala接口是支持自动将包含了case class的RDD转换为DataFrame的 下面来举一个例子
- scala代码 package com.sanqian.scala.sqlimport org.apache.spark.SparkConf import org.apache.spark.sql.SparkSessionobject RddToDataFrameByReflectScala {def main(args: Array[String]): Unit {val conf new SparkConf()conf.setMaster(local)val sparkSession SparkSession.builder().appName(RddToDataFrameByReflectScala).config(conf).getOrCreate()val sc sparkSession.sparkContextval rdd sc.parallelize(Array((jack, 18), (tom, 20), (jess, 30)))//基于反射直接将包含Student对象的RDD转换为DataFrame//需要导入隐式转换import sparkSession.implicits._val df rdd.map(tup Student(tup._1, tup.2)).toDF()df.createOrReplaceTempView(student)//执行sql查询val rdd2 sparkSession.sql(select name, age from student where age 18)rdd2.show()//将DataFrame转化为RDDrdd2.map(row Student(row(0).toString, row(1).toString.toInt)).collect().foreach(println())//使用row的getAs()方法获取指定列名的值rdd2.map(row Student(row.getAsString, row.getAsInt)).collect().foreach(println(_))sparkSession.stop()} } //定义一个Student case class Student(name: String, age: Int)2. java代码 package com.sanqian.java.sql;import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import scala.Tuple2; import sun.awt.windows.WPrinterJob;import java.io.Serializable; import java.util.Arrays; import java.util.List;public class RddToDataFrameByReflectJava {public static void main(String[] args) {SparkConf conf new SparkConf();conf.setMaster(local);//创建SparkSession对象里面包含SparkContext和SqlContextSparkSession sparkSession SparkSession.builder().appName(RddToDataFrameByReflectJava).config(conf).getOrCreate();//获取SparkContext//从sparkSession中获取的是scala中的sparkContext所以需要转换成java中的sparkContextJavaSparkContext sc JavaSparkContext.fromSparkContext(sparkSession.sparkContext());Tuple2String, Integer t1 new Tuple2(jack, 18);Tuple2String, Integer t2 new Tuple2(tom, 20);Tuple2String, Integer t3 new Tuple2(jess, 30);JavaRDDTuple2String, Integer rdd sc.parallelize(Arrays.asList(t1, t2, t3));JavaRDDStudent rdd2 rdd.map(new FunctionTuple2String, Integer, Student() {Overridepublic Student call(Tuple2String, Integer tup) throws Exception {return new Student(tup._1, tup._2);}});//注意Student这个类必须声明为public并且必须实现序列化DatasetRow df sparkSession.createDataFrame(rdd2, Student.class);df.createOrReplaceTempView(student);//执行sql查询DatasetRow df2 sparkSession.sql(select name, age from student where age 18);df2.show();//将DataFrame转化为RDD注意这里需要转为JavaRDDJavaRDDRow resRDD df2.javaRDD();//从row中取数据封装成student打印到控制台ListStudent resList resRDD.map(new FunctionRow, Student() {Overridepublic Student call(Row row) throws Exception {return new Student(row.getAs(name).toString(), Integer.parseInt(row.getAs(age).toString()));}}).collect();for(Student stu: resList){System.out.println(stu);}sparkSession.stop();} } 二编程方式 这种方式是通过编程接口来创建DataFrame你可以在程序运行时动态构建一份元数据就是Schema然后将其应用到已经存在的RDD上。这种方式的代码比较冗长但是如果在编写程序时还不知道RDD的元数据只有在程序运行时才能动态得知其元数据那么只能通过这种动态构建元数据的方式。也就是说当case calss中的字段无法预先定义的时候就只能用编程方式动态指定元数据了。
- Scala代码 package com.sanqian.scala.sqlimport org.apache.spark.SparkConf import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.sql.{Row, SparkSession}/*** 需求使用编程方式实现RDD转换为DataFrame**/ object RddToDataFrameByProgramScala {def main(args: Array[String]): Unit {val conf new SparkConf().setMaster(local)//创建SparkSession对象里面包含SparkContext和SqlContextval sparkSession SparkSession.builder().appName(RddToDataFrameByProgramScala).config(conf).getOrCreate()//获取SparkContextval sc sparkSession.sparkContextval dataRDD sc.parallelize(Array((jack, 18), (tom, 20), (jessic, 30)))//组装rowRDDval rowRDD dataRDD.map(tup Row(tup._1, tup.2))//指定元数据信息【这个元数据信息就可以动态从外部获取了比较灵活】val schema StructType(Array(StructField(name, StringType, true),StructField(age, IntegerType, true)))//组装DataFrameval stuDf sparkSession.createDataFrame(rowRDD, schema)//下面就可以通过DataFrame的方式操作dataRDD中的数据了stuDf.createOrReplaceTempView(student)//执行sql查询val resDf sparkSession.sql(select name,age from student where age 18)//将DataFrame转化为RDDval resRDD resDf.rddresRDD.map(row (row(0).toString, row(1).toString.toInt)).collect().foreach(println())sparkSession.stop()} }
- Java代码 package com.sanqian.java.sql;import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import scala.Tuple2;import java.util.ArrayList; import java.util.Arrays; import java.util.List;public class RddToDataFrameByProgramJava {public static void main(String[] args) {SparkConf conf new SparkConf();conf.setMaster(local); //创建SparkSession对象里面包含SparkContext和SqlContextSparkSession sparkSession SparkSession.builder().appName(RddToDataFrameByProgramJava).config(conf).getOrCreate();//获取SparkContext//从sparkSession中获取的是scala中的sparkContext所以需要转换成java中的sparkContextJavaSparkContext sc JavaSparkContext.fromSparkContext(sparkSession.sparkContext());Tuple2String, Integer t1 new Tuple2String, Integer(jack, 18);Tuple2String, Integer t2 new Tuple2String, Integer(tom, 20);Tuple2String, Integer t3 new Tuple2String, Integer(jessic,30);JavaRDDTuple2String, Integer dataRDD sc.parallelize(Arrays.asList(t1, t2, t3)); //组装rowRDDJavaRDDRow rowRDD dataRDD.map(new FunctionTuple2String, Integer , Row () {Overridepublic Row call (Tuple2 String, Integer tup) throws Exception {return RowFactory.create(tup._1, tup._2);}}); //指定元数据信息ArrayListStructField structFieldList new ArrayListStructField();structFieldList.add(DataTypes.createStructField(name, DataTypes.StringType, true));structFieldList.add(DataTypes.createStructField(age, DataTypes.IntegerType, true));StructType schema DataTypes.createStructType(structFieldList); //构建DataFrameDatasetRow stuDf sparkSession.createDataFrame(rowRDD, schema);stuDf.createOrReplaceTempView(student); //执行sql查询DatasetRow resDf sparkSession.sql(select name,age from student where age 18); //将DataFrame转化为RDD注意这里需要转为JavaRDDJavaRDD Row resRDD resDf.javaRDD();ListTuple2String, Integer resList resRDD.map(new FunctionRow, Tuple2String, Integer() {Overridepublic Tuple2String, Integer call(Row row) throws Exception {return new Tuple2String, Integer(row.getString(0), row.getInt(1));}}).collect();for (Tuple2String, Integer tup : resList) {System.out.println(tup);}sparkSession.stop();} } 七、load和save操作 对于Spark SQL的DataFrame来说无论是从什么数据源创建出来的DataFrame都有一些共同的load和save操作。 load操作主要用于加载数据创建出DataFrame save操作主要用于将DataFrame中的数据保存到文件中。 我们前面操作json格式的数据的时候好像没有使用load方法而是直接使用的json方法这是什么特殊用法吗查看json方法的源码会发现它底层调用的是format和load方法 def json(paths: String): DataFrame format(json).load(paths : _) 注意如果看不到源码需要点击idea右上角的download source提示信息下载依赖的源码。 我们如果使用原始的format和load方法加载数据此时如果不指定format则默认读取的数据源格式是parquet也可以手动指定数据源格式。Spark SQL 内置了一些常见的数据源类型比如json, parquet, jdbc, orc, csv, text 通过这个功能就可以在不同类型的数据源之间进行转换了。
- Scala代码 package com.sanqian.scala.sqlimport org.apache.spark.SparkConf import org.apache.spark.sql.SparkSessionobject LoadAndSaveOpScala {def main(args: Array[String]): Unit {val conf new SparkConf().setMaster(local)//创建SparkSession对象里面包含SparkContext和SqlContextval sparkSession SparkSession.builder().appName(LoadAndSaveOpScala).config(conf).getOrCreate()//读取数据val stuDf sparkSession.read.format(json).load(D:\data\spark\student.json)//保存数据stuDf.select(name, age).write.format(csv).save(hdfs://bigdata01:9000/out-save001)sparkSession.stop()} }
- Java代码 package com.sanqian.java.sql;import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession;public class LoadAndSaveOpJava {public static void main(String[] args) {SparkConf conf new SparkConf();conf.setMaster(local);//创建SparkSession对象里面包含SparkContext和SqlContextSparkSession sparkSession SparkSession.builder().config(conf).appName(LoadAndSaveOpJava).getOrCreate();//读取数据DatasetRow df sparkSession.read().format(json).load(D:\data\spark\student.json);//保存数据df.select(name, age).write().format(csv).save(hdfs://bigdata01:9000/out-save002);sparkSession.stop();} }八、SaveMode Spark SQL对于save操作提供了不同的save mode。 主要用来处理当目标位置已经有数据时应该如何处理。save操作不会执行锁操作并且也不是原子的因此是有一定风险出现脏数据的。 SaveMode 解释1SaveMode.ErrorIfExists (默认)如果目标位置已经存在数据那么抛出一个异常2SaveMode.Append如果目标位置已经存在数据那么将数据追加进去3SaveMode.Overwrite如果目标位置已经存在数据那么就将已经存在的数据删除用新数据进行覆盖4SaveMode.Ignore如果目标位置已经存在数据那么就忽略不做任何操作 //保存数据df.select(name, age).write().format(csv).mode(SaveMode.Append).save(hdfs://bigdata01:9000/out-save002); 执行之后的结果确实是追加到之前的结果目录中了 九、内置函数 Spark中提供了很多内置的函数 1聚合函数 avg, count, countDistinct, first, last, max, mean, min, sum, sumDistinct 2集合函数array_contains, explode, size 3日期/时间函数 datediff, date_add, date_sub, add_months, last_day, next_day, months_between, current_date, current_timestamp, date_format 4数学函数abs, ceil, floor, round 5混合函数if, isnull, md5, not, rand, when 6字符串函数concat, get_json_object, length, reverse, split, upper 7窗口函数denseRank, rank, rowNumber 其实这里面的函数和hive中的函数是类似的 注意 SparkSQL中的SQL函数文档不全其实在使用这些函数的时候大家完全可以去查看hive中sql的文档使用的时候都是一样的。
- 上一篇: 排行榜哪个网站最好wordpress入侵过程
- 下一篇: 盘锦网站建设哪家好上海设计公司电话
相关文章
-
排行榜哪个网站最好wordpress入侵过程
排行榜哪个网站最好wordpress入侵过程
- 技术栈
- 2026年03月21日
-
排名好的移动网站建设厦门市建设工程造价协会官方网站
排名好的移动网站建设厦门市建设工程造价协会官方网站
- 技术栈
- 2026年03月21日
-
拍卖网站怎么做小程序模板使用
拍卖网站怎么做小程序模板使用
- 技术栈
- 2026年03月21日
-
盘锦网站建设哪家好上海设计公司电话
盘锦网站建设哪家好上海设计公司电话
- 技术栈
- 2026年03月21日
-
盘锦网站开发番禺网站开发哪家专业
盘锦网站开发番禺网站开发哪家专业
- 技术栈
- 2026年03月21日
-
盘锦微信网站建设深圳网站建设解决方案
盘锦微信网站建设深圳网站建设解决方案
- 技术栈
- 2026年03月21日






