高校保卫处网站建设工作书店网站建设
- 作者: 五速梦信息网
- 时间: 2026年03月21日 11:12
当前位置: 首页 > news >正文
高校保卫处网站建设工作,书店网站建设,佛山 网站开发,网站按照谁建设 谁负责第五章 RDD Checkpoint RDD 数据可以持久化#xff0c;但是持久化/缓存可以把数据放在内存中#xff0c;虽然是快速的#xff0c;但是也是最不可靠的#xff1b;也可以把数据放在磁盘上#xff0c;也不是完全可靠的#xff01;例如磁盘会损坏等。 Checkpoint的产生就是…第五章 RDD Checkpoint RDD 数据可以持久化但是持久化/缓存可以把数据放在内存中虽然是快速的但是也是最不可靠的也可以把数据放在磁盘上也不是完全可靠的例如磁盘会损坏等。 Checkpoint的产生就是为了更加可靠的数据持久化在Checkpoint的时候一般把数据放在HDFS上这就天然的借助了HDFS天生的高容错、高可靠来实现数据最大程度上的安全实现了RDD的容错和高可用。 在Spark Core中对RDD做checkpoint可以切断做checkpoint RDD的依赖关系将RDD数据保存到可靠存储如HDFS以便数据恢复 演示范例代码如下 import org.apache.spark.{SparkConf, SparkContext} /**
- RDD数据Checkpoint设置案例演示
*/
object SparkCkptTest {
def main(args: Array[String]): Unit {
// 创建应用程序入口SparkContext实例对象
val sc: SparkContext {
// 1.a 创建SparkConf对象设置应用的配置信息
val sparkConf: SparkConf new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix($))
.setMaster(local[2])
// 1.b 传递SparkConf对象构建Context实例
new SparkContext(sparkConf)
}
sc.setLogLevel(WARN)
// TODO: 设置检查点目录将RDD数据保存到那个目录
sc.setCheckpointDir(datas/spark/ckpt/)
// 读取文件数据
val datasRDD sc.textFile(datas/wordcount/wordcount.data)
// TODO: 调用checkpoint函数将RDD进行备份需要RDD中Action函数触发
datasRDD.checkpoint()
datasRDD.count()
// TODO: 再次执行count函数, 此时从checkpoint读取数据
datasRDD.count()
// 应用程序运行结束关闭资源
Thread.sleep(100000)
sc.stop()
}
}持久化和Checkpoint的区别 1、存储位置
Persist 和 Cache 只能保存在本地的磁盘和内存中(或者堆外内存)Checkpoint 可以保存数据到 HDFS 这类可靠的存储上 2、生命周期Cache和Persist的RDD会在程序结束后会被清除或者手动调用unpersist方法Checkpoint的RDD在程序结束后依然存在不会被删除 3、Lineage(血统、依赖链、依赖关系)Persist和Cache不会丢掉RDD间的依赖链/依赖关系因为这种缓存是不可靠的如果出现了一些错误(例如 Executor 宕机)需要通过回溯依赖链重新计算出来Checkpoint会斩断依赖链因为Checkpoint会把结果保存在HDFS这类存储中更加的安全可靠一般不需要回溯依赖链
第六章 外部数据源 Spark可以从外部存储系统读取数据比如RDBMs表中或者HBase表中读写数据这也是企业中常常使用如下两个场景 1、要分析的数据存储在HBase表中需要从其中读取数据数据分析 日志数据电商网站的商家操作日志订单数据保险行业订单数据 2、使用Spark进行离线分析以后往往将报表结果保存到MySQL表中网站基本分析pv、uv。。。。。 6.1 HBase 数据源 Spark可以从HBase表中读写Read/Write数据底层采用TableInputFormat和TableOutputFormat方式与MapReduce与HBase集成完全一样使用输入格式InputFormat和输出格式OutputFoamt。
HBase Sink 回 顾 MapReduce 向 HBase 表 中 写 入 数 据 使 用 TableReducer 其 中 OutputFormat 为TableOutputFormat读取数据KeyImmutableBytesWritableValuePut。 写 入 数 据 时 需 要 将 RDD 转换为 RDD[(ImmutableBytesWritable, Put)] 类 型 调 用saveAsNewAPIHadoopFile方法数据保存至HBase表中。 HBase Client连接时需要设置依赖Zookeeper地址相关信息及表的名称通过Configuration设置属性值进行传递。 范例演示将词频统计结果保存HBase表表的设计 代码如下 import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.util.Bytes import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** - 将RDD数据保存至HBase表中 */ object SparkWriteHBase { def main(args: Array[String]): Unit { // 创建应用程序入口SparkContext实例对象 val sc: SparkContext { // 1.a 创建SparkConf对象设置应用的配置信息 val sparkConf: SparkConf new SparkConf() .setAppName(this.getClass.getSimpleName.stripSuffix($)) .setMaster(local[2]) // 1.b 传递SparkConf对象构建Context实例 new SparkContext(sparkConf) } sc.setLogLevel(WARN) // TODO: 1、构建RDD val list List((hadoop, 234), (spark, 3454), (hive, 343434), (ml, 8765)) val outputRDD: RDD[(String, Int)] sc.parallelize(list, numSlices 2) // TODO: 2、将数据写入到HBase表中, 使用saveAsNewAPIHadoopFile函数要求RDD是(key, Value) // TODO: 组装RDD[(ImmutableBytesWritable, Put)] /**
- HBase表的设计
- 表的名称htb_wordcount
- Rowkey: word
- 列簇: info
- 字段名称 count
/
val putsRDD: RDD[(ImmutableBytesWritable, Put)] outputRDD.mapPartitions{ iter
iter.map { case (word, count)
// 创建Put实例对象
val put new Put(Bytes.toBytes(word))
// 添加列
put.addColumn(
// 实际项目中使用HBase时插入数据先将所有字段的值转为String再使用Bytes转换为字节数组
Bytes.toBytes(info), Bytes.toBytes(cout), Bytes.toBytes(count.toString)
)
// 返回二元组
(new ImmutableBytesWritable(put.getRow), put)
}
}
// 构建HBase Client配置信息
val conf: Configuration HBaseConfiguration.create()
// 设置连接Zookeeper属性
conf.set(hbase.zookeeper.quorum, node1.itcast.cn)
conf.set(hbase.zookeeper.property.clientPort, 2181)
conf.set(zookeeper.znode.parent, /hbase)
// 设置将数据保存的HBase表的名称
conf.set(TableOutputFormat.OUTPUT_TABLE, htb_wordcount)
/
def saveAsNewAPIHadoopFile(
path: String,// 保存的路径
keyClass: Class[], // Key类型
valueClass: Class[], // Value类型
outputFormatClass: Class[_ : NewOutputFormat[_, _]], // 输出格式OutputFormat实现
conf: Configuration self.context.hadoopConfiguration // 配置信息
): Unit
*/
putsRDD.saveAsNewAPIHadoopFile(
datas/spark/htb-output- System.nanoTime(), //
classOf[ImmutableBytesWritable], //
classOf[Put], //
classOf[TableOutputFormat[ImmutableBytesWritable]], //
conf
)
// 应用程序运行结束关闭资源
sc.stop()
}
}运行完成以后使用hbase shell查看数据 HBase Source 回 顾 MapReduce 从 读 HBase 表 中 的 数 据 使 用 TableMapper 其 中 InputFormat 为TableInputFormat读取数据KeyImmutableBytesWritableValueResult。
从HBase表读取数据时同样需要设置依赖Zookeeper地址信息和表的名称使用Configuration设置属性形式如下
此外读取的数据封装到RDD中Key和Value类型分别为ImmutableBytesWritable和Result,不支持Java Serializable导致处理数据时报序列化异常。设置Spark Application使用Kryo序列化性能要比Java 序列化要好创建SparkConf对象设置相关属性如下所示 范例演示从HBase表读取词频统计结果代码如下 import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration} import org.apache.hadoop.hbase.client.Result import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.util.Bytes import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** - 从HBase 表中读取数据封装到RDD数据集
/
object SparkReadHBase {
def main(args: Array[String]): Unit {
// 创建应用程序入口SparkContext实例对象
val sc: SparkContext {
// 1.a 创建SparkConf对象设置应用的配置信息
val sparkConf: SparkConf new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix($))
.setMaster(local[2])
// TODO: 设置使用Kryo 序列化方式
.set(spark.serializer, org.apache.spark.serializer.KryoSerializer)
// TODO: 注册序列化的数据类型
.registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[Result]))
// 1.b 传递SparkConf对象构建Context实例
new SparkContext(sparkConf)
}
sc.setLogLevel(WARN)
// TODO: a. 读取HBase Client 配置信息
val conf: Configuration HBaseConfiguration.create()
conf.set(hbase.zookeeper.quorum, node1.itcast.cn)
conf.set(hbase.zookeeper.property.clientPort, 2181)
conf.set(zookeeper.znode.parent, /hbase)
// TODO: b. 设置读取的表的名称
conf.set(TableInputFormat.INPUT_TABLE, htb_wordcount)
/
def newAPIHadoopRDDK, V, F : NewInputFormat[K, V]: RDD[(K, V)]
*/
val resultRDD: RDD[(ImmutableBytesWritable, Result)] sc.newAPIHadoopRDD(
conf, //
classOf[TableInputFormat], //
classOf[ImmutableBytesWritable], //
classOf[Result] //
)
println(sCount \({resultRDD.count()})
resultRDD
.take(5)
.foreach { case (rowKey, result)
println(sRowKey \){Bytes.toString(rowKey.get())})
// HBase表中的每条数据封装在result对象中解析获取每列的值
result.rawCells().foreach { cell
val cf Bytes.toString(CellUtil.cloneFamily(cell))
val column Bytes.toString(CellUtil.cloneQualifier(cell))
val value Bytes.toString(CellUtil.cloneValue(cell))
val version cell.getTimestamp
println(s\t \(cf:\)column \(value, version \)version)
}
}
// 应用程序运行结束关闭资源
sc.stop()
}
}运行结果
6.2 MySQL 数据源 实际开发中常常将分析结果RDD保存至MySQL表中使用foreachPartition函数此外Spark中提供JdbcRDD用于从MySQL表中读取数据。 调用RDD#foreachPartition函数将每个分区数据保存至MySQL表中保存时考虑降低RDD分区数目和批量插入提升程序性能。 范例演示将词频统计WordCount结果保存MySQL表tb_wordcount。 建表语句 USE db_test ; CREATE TABLE tb_wordcount ( count varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL, word varchar(100) NOT NULL, PRIMARY KEY (word) ) ENGINEInnoDB DEFAULT CHARSETutf8mb4 COLLATEutf8mb4_0900_ai_ci ;演示代码 import java.sql.{Connection, DriverManager, PreparedStatement} import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** - 将词频统计结果保存到MySQL表中 */ object SparkWriteMySQL { def main(args: Array[String]): Unit { // 创建应用程序入口SparkContext实例对象 val sc: SparkContext { // 1.a 创建SparkConf对象设置应用的配置信息 val sparkConf: SparkConf new SparkConf() .setAppName(this.getClass.getSimpleName.stripSuffix($)) .setMaster(local[2]) // 1.b 传递SparkConf对象构建Context实例 new SparkContext(sparkConf) } sc.setLogLevel(WARN) // 1. 从HDFS读取文本数据封装集合RDD val inputRDD: RDD[String] sc.textFile(datas/wordcount/wordcount.data) // 2. 处理数据调用RDD中函数 val resultRDD: RDD[(String, Int)] inputRDD // 3.a 每行数据分割为单词 .flatMap(line line.split(\s)) // 3.b 转换为二元组表示每个单词出现一次 .map(word (word, 1)) // 3.c 按照Key分组聚合 .reduceByKey((tmp, item) tmp item) // 3. 输出结果RDD保存到MySQL数据库 resultRDD // 对结果RDD保存到外部存储系统时考虑降低RDD分区数目 .coalesce(1) // 对分区数据操作 .foreachPartition{iter saveToMySQL(iter)} // 应用程序运行结束关闭资源 sc.stop() } /**
- 将每个分区中的数据保存到MySQL表中
- param datas 迭代器封装RDD中每个分区的数据 */ def saveToMySQL(datas: Iterator[(String, Int)]): Unit { // a. 加载驱动类 Class.forName(com.mysql.cj.jdbc.Driver) // 声明变量 var conn: Connection null var pstmt: PreparedStatement null try{ // b. 获取连接 conn DriverManager.getConnection( jdbc:mysql://node1.itcast.cn:3306/?serverTimezoneUTCcharacterEncodingutf8useUnic odetrue, root, 123456 ) // c. 获取PreparedStatement对象 val insertSql INSERT INTO db_test.tb_wordcount (word, count) VALUES(?, ?) pstmt conn.prepareStatement(insertSql) conn.setAutoCommit(false) // d. 将分区中数据插入到表中批量插入 datas.foreach{case (word, count) pstmt.setString(1, word) pstmt.setLong(2, count.toLong) // 加入批次 pstmt.addBatch() } // TODO: 批量插入 pstmt.executeBatch() conn.commit() }catch { case e: Exception e.printStackTrace() }finally { if(null ! pstmt) pstmt.close() if(null ! conn) conn.close() } } }运行程序查看数据库表的数据
相关文章
-
高唐企业网站建设wordpress企业模板主题
高唐企业网站建设wordpress企业模板主题
- 技术栈
- 2026年03月21日
-
高速公路建设论坛网站phpcms内容管理系统
高速公路建设论坛网站phpcms内容管理系统
- 技术栈
- 2026年03月21日
-
高水平的番禺网站建设做阀门网站电话
高水平的番禺网站建设做阀门网站电话
- 技术栈
- 2026年03月21日
-
高校服务地方专题网站建设网站建设使用什么软件比较好
高校服务地方专题网站建设网站建设使用什么软件比较好
- 技术栈
- 2026年03月21日
-
高校建设主流网站wordpress选择幻灯片模版没有用
高校建设主流网站wordpress选择幻灯片模版没有用
- 技术栈
- 2026年03月21日
-
高校廉洁文化建设网站linux宝塔面板做网站
高校廉洁文化建设网站linux宝塔面板做网站
- 技术栈
- 2026年03月21日






