重庆网站seo案例教育类网站开发
- 作者: 五速梦信息网
- 时间: 2026年03月21日 03:49
当前位置: 首页 > news >正文
重庆网站seo案例,教育类网站开发,企业网站建设长沙,wordpress 营销插件文章目录 第1关#xff1a;统计共享单车每天的平均使用时间第2关#xff1a;统计共享单车在指定地点的每天平均次数第3关#xff1a;统计共享单车指定车辆每次使用的空闲平均时间第4关#xff1a;统计指定时间共享单车使用次数第5关#xff1a;统计共享单车线路流量 第1关… 文章目录 第1关统计共享单车每天的平均使用时间第2关统计共享单车在指定地点的每天平均次数第3关统计共享单车指定车辆每次使用的空闲平均时间第4关统计指定时间共享单车使用次数第5关统计共享单车线路流量 第1关统计共享单车每天的平均使用时间 任务描述 本关任务使用Hbase的MapReduce对已经存在 Hbase 的共享单车运行数据进行分析统计共享单车每天的平均使用时间其中共享单车运行数据在Hbase的t_shared_bicycle表中表结构可在编程要求中进行查看。 相关知识 为了完成本关任务你需要掌握 如何配置Hbase的MapReduce类 如何使用Hbase的MapReduce进行数据分析。 如何配置Hbase的MapReduce类 MapReduce是运行在Job上的一个并行计算框架分为Map节点和Reduce节点。 Hbase提供了org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil的initTableMapperJob和initTableReducerJob两个方法来完成MapReduce的配置。 initTableMapperJob 方法: /** *在提交TableMap作业之前使用它。 它会适当地设置 * 工作。 * * param table要读取的表名。 * param scan具有列时间范围等的扫描实例。 * param mapper要使用的mapper类。 * param outputKeyClass输出键的类。 * param outputValueClass输出值的类。 * param job当前要调整的工作。 确保传递的作业是 *携带所有必要的HBase配置。 * throws IOException设置细节失败。 */ public static void initTableMapperJob(String table, Scan scan, Class? extends TableMapper mapper, Class? outputKeyClass, Class? outputValueClass, Job job) throws IOException / ** initTableReducerJob 方法: /** *在提交TableReduce作业之前使用它。 它会 *适当设置JobConf。 * * param table输出表。 * param reducer要使用的reducer类。 * param job当前要调整的工作。 * throws IOException确定区域计数失败时。 / public static void initTableReducerJob(String table, Class? extends TableReducer reducer, Job job) throws IOException 如何使用Hbase的MapReduce进行数据分析 下面我们以统计每个城市的酒店个数的例子来介绍MapReduce的Map节点和Reduce节点 Map节点执行类需要继承抽象类TableMapper,实现其map方法结构如下 public static class MyMapper extends TableMapperText, DoubleWritable { Override protected void map(ImmutableBytesWritable rowKey, Result result, Context context) { }} 在 map 方法中可从输入表原数据表得到行数据最后向 Reduce 节点 输出键值对key/value 。 String cityId Bytes.toString(result.getValue(“cityInfo”.getBytes(), “cityId”.getBytes())); DoubleWritable i new DoubleWritable(1); context.write(new Text(cityId),i); 下面介绍Reduce节点Reduce节点执行类需要继承抽象类TableReducer,实现其reduce方法 public static class MyTableReducer extends TableReducerText, DoubleWritable, ImmutableBytesWritable { Override public void reduce(Text key, Iterable values, Context context) { } } 在reduce方法里会接收map 方法里相同key 的集合,最后把结果存到输出到表里。 double sum 0; for (DoubleWritable num:values){ sum num.get(); } Put put new Put(Bytes.toBytes(key.toString())); put.addColumn(“total_infos”.getBytes(),“total”.getBytes(),Bytes.toBytes(String.valueOf(sum))); context.write(null,put);//initTableReducerJob 设置了表名所以在这里无需设置了 编程要求 在右侧代码窗口完成代码编写 MapReduce类已经配置好只需完成MapReduce的数据分析 在map方法中获取输入表t_shared_bicycle的相关信息计算出使用时间结束时间 - 开始时间,并把使用时间和开始时间的日期传给reduce 在reduce方法中通过使用时间和开始时间的日期计算共享单车每天平均使用时间并把每天平均使用时间四舍五入保留两位有效数字存入到列族为info字段为avgTimeROWKEY 为avgTime的表里。 t_shared_bicycle表结构如下 列族名称 字段 对应的文件的描述 ROWKEY 格式为骑行id info beginTime 开始时间 trip_id info endTime 结束时间 trip_id info bicycleId 车辆id trip_id info departure 出发地 trip_id info destination 目的地 trip_id info city 所在城市 trip_id info start_longitude 开始经度 trip_id info stop_longitude 结束经度 trip_id info start_latitude 开始纬度 trip_id info stop_latitude 结束纬度 trip_id 测试说明 平台会对你编写的代码进行测试若是与预期输出相同则算通关。 开始你的任务吧祝你成功 示例代码如下 package com.educoder.bigData.sharedbicycle; import java.io.IOException; import java.text.ParseException; import java.util.Collection; import java.util.Date; import java.util.HashMap; import java.util.Locale; import java.util.Map; import java.util.Scanner; import java.math.RoundingMode; import java.math.BigDecimal; import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.commons.lang3.time.FastDateFormat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.Tool; import com.educoder.bigData.util.HBaseUtil; /** 统计共享单车每天的平均使用时间/ public class AveragetTimeMapReduce extends Configured implements Tool {public static final byte[] family info.getBytes();public static class MyMapper extends TableMapperText, BytesWritable {protected void map(ImmutableBytesWritable rowKey, Result result, Context context)throws IOException, InterruptedException {/********** Begin ********/long beginTime Long.parseLong(Bytes.toString(result.getValue(family, beginTime.getBytes())));long endTime Long.parseLong(Bytes.toString(result.getValue(family, endTime.getBytes())));// 转化为yyyy-MM-ddString format DateFormatUtils.format(beginTime, yyyy-MM-dd, Locale.CHINA);// 计算时间long useTime endTime - beginTime;// 拼装value ,包含日期 使用时间BytesWritable bytesWritable new BytesWritable(Bytes.toBytes(format _ useTime));context.write(new Text(avgTime), bytesWritable);/********** End ******/}}public static class MyTableReducer extends TableReducerText, BytesWritable, ImmutableBytesWritable {Overridepublic void reduce(Text key, IterableBytesWritable values, Context context)throws IOException, InterruptedException {/******* Begin ******/double sum 0;int length 0;MapString, Long map new HashMapString, Long();for (BytesWritable price : values) {byte[] copyBytes price.copyBytes();String string Bytes.toString(copyBytes);String[] split string.split(_);if (map.containsKey(split[0])) {Long integer map.get(split[0]) Long.parseLong(split[1]);map.put(split[0], integer);} else {map.put(split[0], Long.parseLong(split[1]));}}// 统计map value平均值CollectionLong values2 map.values();for (Long i : values2) {length;sum i;}BigDecimal decimal new BigDecimal(sum / length /1000);BigDecimal setScale decimal.setScale(2, RoundingMode.HALF_DOWN);Put put new Put(Bytes.toBytes(key.toString()));put.addColumn(family, avgTime.getBytes(), Bytes.toBytes(setScale.toString()));context.write(null, put);// initTableReducerJob 设置了 表名所以在这里无需设置了/******* End *******/}}public int run(String[] args) throws Exception {// 配置JobConfiguration conf HBaseUtil.conf;// Scanner sc new Scanner(System.in);// String arg1 sc.next();// String arg2 sc.next();String arg1 t_shared_bicycle;String arg2 t_bicycle_avgtime;try {HBaseUtil.createTable(arg2, new String[] { info });} catch (Exception e) {// 创建表失败e.printStackTrace();}Job job configureJob(conf, new String[] { arg1, arg2 });return job.waitForCompletion(true) ? 0 : 1;}private Job configureJob(Configuration conf, String[] args) throws IOException {String tablename args[0];String targetTable args[1];Job job new Job(conf, tablename);Scan scan new Scan();scan.setCaching(300);scan.setCacheBlocks(false);// 在mapreduce程序中千万不要设置允许缓存// 初始化Mapreduce程序TableMapReduceUtil.initTableMapperJob(tablename, scan, MyMapper.class, Text.class, BytesWritable.class, job);// 初始化ReduceTableMapReduceUtil.initTableReducerJob(targetTable, // output tableMyTableReducer.class, // reducer classjob);job.setNumReduceTasks(1);return job;} }第2关统计共享单车在指定地点的每天平均次数 任务描述 本关任务使用Hbase的MapReduce对已经存在 Hbase 的共享单车运行数据进行分析统计共享单车每天在指定地点的平均次数其中共享单车运行数据在Hbase的t_shared_bicycle表中表结构可在编程要求中进行查看。 相关知识 为了完成本关任务你需要掌握 如何配置Hbase的MapReduce类 如何使用Hbase的MapReduce进行数据分析 如何使用过滤器过滤读取到的数据。 如何配置Hbase的MapReduce类 MapReduce是运行在Job上的一个并行计算框架分为Map节点和Reduce节点。 Hbase提供了org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil的initTableMapperJob和initTableReducerJob两个方法来完成MapReduce的配置。 initTableMapperJob方法: / *在提交TableMap作业之前使用它。 它会适当地设置 * 工作。 * * param table要读取的表名。 * param scan具有列时间范围等的扫描实例。 * param mapper要使用的mapper类。 * param outputKeyClass输出键的类。 * param outputValueClass输出值的类。 * param job当前要调整的工作。 确保传递的作业是 *携带所有必要的HBase配置。 * throws IOException设置细节失败。 */ public static void initTableMapperJob(String table, Scan scan, Class? extends TableMapper mapper, Class? outputKeyClass, Class? outputValueClass, Job job) throws IOException / ** initTableReducerJob方法: /** *在提交TableReduce作业之前使用它。 它会 *适当设置JobConf。 * * param table输出表。 * param reducer要使用的reducer类。 * param job当前要调整的工作。 * throws IOException确定区域计数失败时。 / public static void initTableReducerJob(String table, Class? extends TableReducer reducer, Job job) throws IOException 如何使用Hbase的MapReduce进行数据分析 下面我们以统计每个城市的酒店个数的例子来介绍MapReduce的Map节点和Reduce节点 Map节点执行类需要继承抽象类TableMapper,实现其map方法结构如下 public static class MyMapper extends TableMapperText, DoubleWritable { Override protected void map(ImmutableBytesWritable rowKey, Result result, Context context) { }} 在 map 方法中可从输入表原数据表得到行数据最后向 Reduce 节点 输出键值对key/value 。 String cityId Bytes.toString(result.getValue(“cityInfo”.getBytes(), “cityId”.getBytes())); DoubleWritable i new DoubleWritable(1); context.write(new Text(cityId),i); 下面介绍Reduce节点Reduce节点执行类需要继承抽象类TableReducer,实现其reduce方法 public static class MyTableReducer extends TableReducerText, DoubleWritable, ImmutableBytesWritable { Override public void reduce(Text key, Iterable values, Context context) { } } 在reduce方法里会 接收map 方法里 相同key 的集合,最后把结果存到输出到表里。 double sum 0; for (DoubleWritable num:values){ sum num.get(); } Put put new Put(Bytes.toBytes(key.toString())); put.addColumn(“total_infos”.getBytes(),“total”.getBytes(),Bytes.toBytes(String.valueOf(sum))); context.write(null,put);//initTableReducerJob 设置了表名所以在这里无需设置了 如何使用过滤器过滤读取到的数据 请查看 HBase高级特性过滤器系列 。 编程要求 在右侧代码窗口完成代码编写 MapReduce类需要进行配置请在configureJob方法里配置表数据过滤器过滤条件为只获取目的地包含韩庄村,出发地为河北省保定市雄县的数据。 在map方法中获取输入表t_shared_bicycle的相关信息通过开始时间获取当天日期并传入到reduce 在reduce方法中通过当天日期计算共享单车每天平均次数并把每天次数四舍五入保留两位有效数字存入到列族为info字段为avgNumROWKEY 为河北省保定市雄县-韩庄村的表里。 t_shared_bicycle表结构如下 列族名称 字段 对应的文件的描述 ROWKEY 格式为骑行id info beginTime 开始时间 trip_id info endTime 结束时间 trip_id info bicycleId 车辆id trip_id info departure 出发地 trip_id info destination 目的地 trip_id info city 所在城市 trip_id info start_longitude 开始经度 trip_id info stop_longitude 结束经度 trip_id info start_latitude 开始纬度 trip_id info stop_latitude 结束纬度 trip_id 测试说明 平台会对你编写的代码进行测试若是与预期输出相同则算通关。 开始你的任务吧祝你成功 示例代码如下 package com.educoder.bigData.sharedbicycle; import java.io.IOException; import java.math.BigDecimal; import java.math.RoundingMode; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Locale; import java.util.Map; import java.util.Scanner; import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.filter.SubstringComparator; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.Tool; import com.educoder.bigData.util.HBaseUtil; /** 共享单车每天在韩庄村的平均空闲时间/ public class AverageVehicleMapReduce extends Configured implements Tool {public static final byte[] family info.getBytes();public static class MyMapper extends TableMapperText, BytesWritable {protected void map(ImmutableBytesWritable rowKey, Result result, Context context)throws IOException, InterruptedException {/********** Begin ********/// 时间String beginTime Bytes.toString(result.getValue(family, beginTime.getBytes()));// 转化为yyyy-MM-ddString format DateFormatUtils.format(Long.parseLong(beginTime), yyyy-MM-dd, Locale.CHINA);BytesWritable bytesWritable new BytesWritable(Bytes.toBytes(format));context.write(new Text(河北省保定市雄县-韩庄村), bytesWritable);/********** End ******/}}public static class MyTableReducer extends TableReducerText, BytesWritable, ImmutableBytesWritable {Overridepublic void reduce(Text key, IterableBytesWritable values, Context context)throws IOException, InterruptedException {/******* Begin ******/double sum 0;int length 0;MapString, Integer map new HashMapString, Integer();for (BytesWritable price : values) {byte[] copyBytes price.copyBytes();String string Bytes.toString(copyBytes);if (map.containsKey(string)) {Integer integer map.get(string) 1;map.put(string, integer);} else {map.put(string, new Integer(1));}}// 统计map value平均值CollectionInteger values2 map.values();for (Integer i : values2) {length;sum i;}BigDecimal decimal new BigDecimal(sum / length);BigDecimal setScale decimal.setScale(2, RoundingMode.HALF_DOWN);Put put new Put(Bytes.toBytes(key.toString()));put.addColumn(family, avgNum.getBytes(), Bytes.toBytes(setScale.toString()));context.write(null, put);// initTableReducerJob 设置了 表名所以在这里无需设置了/******* End ******/}}public int run(String[] args) throws Exception {// 配置JobConfiguration conf HBaseUtil.conf;//Scanner sc new Scanner(System.in);//String arg1 sc.next();//String arg2 sc.next();String arg1 t_shared_bicycle;String arg2 t_bicycle_avgnum;try {HBaseUtil.createTable(arg2, new String[] { info });} catch (Exception e) {// 创建表失败e.printStackTrace();}Job job configureJob(conf, new String[] { arg1, arg2 });return job.waitForCompletion(true) ? 0 : 1;}private Job configureJob(Configuration conf, String[] args) throws IOException {String tablename args[0];String targetTable args[1];Job job new Job(conf, tablename);Scan scan new Scan();scan.setCaching(300);scan.setCacheBlocks(false);// 在mapreduce程序中千万不要设置允许缓存/******* Begin ******///设置过滤ArrayListFilter listForFilters new ArrayListFilter();Filter destinationFilter new SingleColumnValueFilter(Bytes.toBytes(info), Bytes.toBytes(destination),CompareOperator.EQUAL, new SubstringComparator(韩庄村));Filter departure new SingleColumnValueFilter(Bytes.toBytes(info), Bytes.toBytes(departure),CompareOperator.EQUAL, Bytes.toBytes(河北省保定市雄县));listForFilters.add(departure);listForFilters.add(destinationFilter);scan.setCaching(300);scan.setCacheBlocks(false);// 在mapreduce程序中千万不要设置允许缓存Filter filters new FilterList(listForFilters);scan.setFilter(filters);/******* End *******/// 初始化Mapreduce程序TableMapReduceUtil.initTableMapperJob(tablename, scan, MyMapper.class, Text.class, BytesWritable.class, job);// 初始化ReduceTableMapReduceUtil.initTableReducerJob(targetTable, // output tableMyTableReducer.class, // reducer classjob);job.setNumReduceTasks(1);return job;} }第3关统计共享单车指定车辆每次使用的空闲平均时间 任务描述 本关任务使用Hbase的MapReduce对已经存在Hbase 的共享单车运行数据进行分析统计共享单车指定车辆每次使用的平均空闲时间其中共享单车运行数据在Hbase的t_shared_bicycle表中表结构可在编程要求中进行查看。 相关知识 为了完成本关任务你需要掌握 如何配置Hbase的MapReduce类 如何使用Hbase的MapReduce进行数据分析。 如何配置Hbase的MapReduce类 MapReduce是运行在Job上的一个并行计算框架分为Map节点和Reduce节点。 Hbase提供了org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil的initTableMapperJob和initTableReducerJob两个方法来完成MapReduce的配置。 initTableMapperJob 方法: / *在提交TableMap作业之前使用它。 它会适当地设置 * 工作。 * * param table要读取的表名。 * param scan具有列时间范围等的扫描实例。 * param mapper要使用的mapper类。 * param outputKeyClass输出键的类。 * param outputValueClass输出值的类。 * param job当前要调整的工作。 确保传递的作业是 *携带所有必要的HBase配置。 * throws IOException设置细节失败。 */ public static void initTableMapperJob(String table, Scan scan, Class? extends TableMapper mapper, Class? outputKeyClass, Class? outputValueClass, Job job) throws IOException / ** initTableReducerJob方法: /** *在提交TableReduce作业之前使用它。 它会 *适当设置JobConf。 * * param table输出表。 * param reducer要使用的reducer类。 * param job当前要调整的工作。 * throws IOException确定区域计数失败时。 / public static void initTableReducerJob(String table, Class? extends TableReducer reducer, Job job) throws IOException 如何使用Hbase的MapReduce进行数据分析 下面我们以统计每个城市的酒店个数的例子来介绍MapReduce的Map节点和Reduce节点 Map节点执行类需要继承抽象类TableMapper,实现其map方法结构如下 public static class MyMapper extends TableMapperText, DoubleWritable { Override protected void map(ImmutableBytesWritable rowKey, Result result, Context context) { }} 在 map 方法中可从输入表原数据表得到行数据最后向 Reduce 节点 输出键值对key/value 。 String cityId Bytes.toString(result.getValue(“cityInfo”.getBytes(), “cityId”.getBytes())); DoubleWritable i new DoubleWritable(1); context.write(new Text(cityId),i); 下面介绍Reduce节点Reduce节点执行类需要继承抽象类TableReducer,实现其reduce方法 public static class MyTableReducer extends TableReducerText, DoubleWritable, ImmutableBytesWritable { Override public void reduce(Text key, Iterable values, Context context) { } } 在reduce方法里会接收map 方法里 相同key 的集合,最后把结果存到输出到表里。 double sum 0; for (DoubleWritable num:values){ sum num.get(); } Put put new Put(Bytes.toBytes(key.toString())); put.addColumn(“total_infos”.getBytes(),“total”.getBytes(),Bytes.toBytes(String.valueOf(sum))); context.write(null,put);//initTableReducerJob 设置了表名所以在这里无需设置了 编程要求 在右侧代码窗口完成代码编写 MapReduce类需要进行配置请在configureJob方法里配置表数据过滤器过滤条件为只获取车辆id为5996的数据。 在map方法中获取输入表t_shared_bicycle的相关信息获取开始时间和结束时间,并传入到reduce 在reduce方法中通过开始时间和结束时间计算每次使用的平均空闲时间并把空闲时间单位转化为小时四舍五入保留两位有效数字存入到列族为info字段为freeTimeROWKEY 为5996的表里。 t_shared_bicycle表结构如下 列族名称 字段 对应的文件的描述 ROWKEY 格式为骑行id info beginTime 开始时间 trip_id info endTime 结束时间 trip_id info bicycleId 车辆id trip_id info departure 出发地 trip_id info destination 目的地 trip_id info city 所在城市 trip_id info start_longitude 开始经度 trip_id info stop_longitude 结束经度 trip_id info start_latitude 开始纬度 trip_id info stop_latitude 结束纬度 trip_id 测试说明 平台会对你编写的代码进行测试若是与预期输出相同则算通关。 开始你的任务吧祝你成功 示例代码如下 package com.educoder.bigData.sharedbicycle; import java.io.IOException; import java.math.BigDecimal; import java.math.RoundingMode; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.Tool; import com.educoder.bigData.util.HBaseUtil; /** * 统计5996共享单车每次使用的空闲平均时间*/ public class FreeTimeMapReduce extends Configured implements Tool {public static final byte[] family info.getBytes();public static class MyMapper extends TableMapperText, BytesWritable {protected void map(ImmutableBytesWritable rowKey, Result result, Context context)throws IOException, InterruptedException {// 时间long beginTime Long.parseLong(Bytes.toString(result.getValue(family, beginTime.getBytes())));long endTime Long.parseLong(Bytes.toString(result.getValue(family, endTime.getBytes())));// 拼装value ,包含开始时间和结束时间BytesWritable bytesWritable new BytesWritable(Bytes.toBytes(beginTime _ endTime));context.write(new Text(5996), bytesWritable);}}public static class MyTableReducer extends TableReducerText, BytesWritable, ImmutableBytesWritable {Overridepublic void reduce(Text key, IterableBytesWritable values, Context context)throws IOException, InterruptedException {long freeTime 0;long beginTime 0;int length 0;for (BytesWritable time : values) {byte[] copyBytes time.copyBytes();String timeLong Bytes.toString(copyBytes);String[] split timeLong.split(_);if(beginTime 0) {beginTime Long.parseLong(split[0]);continue;}else {//空闲时间 本次开始时间 - 上一次结束时间 freeTime freeTime beginTime - Long.parseLong(split[1]);//重新设置开始时间beginTime Long.parseLong(split[0]);length ;}}Put put new Put(Bytes.toBytes(key.toString()));BigDecimal decimal new BigDecimal(freeTime / length /1000 /60 /60);BigDecimal setScale decimal.setScale(2, RoundingMode.HALF_DOWN);put.addColumn(family, freeTime.getBytes(), Bytes.toBytes(setScale.toString()));context.write(null, put);// initTableReducerJob 设置了 表名所以在这里无需设置了}}public int run(String[] args) throws Exception {// 配置JobConfiguration conf HBaseUtil.conf;// Scanner sc new Scanner(System.in);// String arg1 sc.next();// String arg2 sc.next();String arg1 t_shared_bicycle;String arg2 t_bicycle_freetime;try {HBaseUtil.createTable(arg2, new String[] { info });} catch (Exception e) {// 创建表失败e.printStackTrace();}Job job configureJob(conf, new String[] { arg1, arg2 });return job.waitForCompletion(true) ? 0 : 1;}private Job configureJob(Configuration conf, String[] args) throws IOException {String tablename args[0];String targetTable args[1];Job job new Job(conf, tablename);Scan scan new Scan();scan.setCaching(300);scan.setCacheBlocks(false);// 在mapreduce程序中千万不要设置允许缓存Filter filter new SingleColumnValueFilter(Bytes.toBytes(info), Bytes.toBytes(bicycleId),CompareOperator.EQUAL, Bytes.toBytes(5996));scan.setFilter(filter);// 初始化Mapreduce程序TableMapReduceUtil.initTableMapperJob(tablename, scan, MyMapper.class, Text.class, BytesWritable.class, job);// 初始化ReduceTableMapReduceUtil.initTableReducerJob(targetTable, // output tableMyTableReducer.class, // reducer classjob);job.setNumReduceTasks(1);return job;} }第4关统计指定时间共享单车使用次数 任务描述 本关任务使用Hbase的MapReduce对已经存在Hbase的共享单车运行数据进行分析统计共享单车指定时间的使用次数其中共享单车运行数据在Hbase的t_shared_bicycle表中表结构可在编程要求中进行查看。 相关知识 为了完成本关任务你需要掌握 如何配置Hbase的MapReduce类 如何使用Hbase的MapReduce进行数据分析。 如何配置Hbase的MapReduce类 MapReduce是运行在Job上的一个并行计算框架分为Map节点和Reduce节点。 Hbase提供了org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil的initTableMapperJob和initTableReducerJob两个方法来完成MapReduce的配置。 initTableMapperJob方法: /** *在提交TableMap作业之前使用它。 它会适当地设置 * 工作。 * * param table要读取的表名。 * param scan具有列时间范围等的扫描实例。 * param mapper要使用的mapper类。 * param outputKeyClass输出键的类。 * param outputValueClass输出值的类。 * param job当前要调整的工作。 确保传递的作业是 *携带所有必要的HBase配置。 * throws IOException设置细节失败。 */ public static void initTableMapperJob(String table, Scan scan, Class? extends TableMapper mapper, Class? outputKeyClass, Class? outputValueClass, Job job) throws IOException / ** initTableReducerJob方法: /** *在提交TableReduce作业之前使用它。 它会 *适当设置JobConf。 * * param table输出表。 * param reducer要使用的reducer类。 * param job当前要调整的工作。 * throws IOException确定区域计数失败时。 / public static void initTableReducerJob(String table, Class? extends TableReducer reducer, Job job) throws IOException 如何使用Hbase的MapReduce进行数据分析 下面我们以统计每个城市的酒店个数的例子来介绍MapReduce的Map节点和Reduce节点 Map节点执行类需要继承抽象类TableMapper,实现其map方法结构如下 public static class MyMapper extends TableMapperText, DoubleWritable { Override protected void map(ImmutableBytesWritable rowKey, Result result, Context context) { }} 在 map 方法中可从输入表原数据表得到行数据 最后向 Reduce 节点 输出键值对key/value 。 String cityId Bytes.toString(result.getValue(“cityInfo”.getBytes(), “cityId”.getBytes())); DoubleWritable i new DoubleWritable(1); context.write(new Text(cityId),i); 下面介绍Reduce节点Reduce节点执行类需要继承抽象类TableReducer,实现其reduce方法 public static class MyTableReducer extends TableReducerText, DoubleWritable, ImmutableBytesWritable { Override public void reduce(Text key, Iterable values, Context context) { } } 在reduce方法里会接收map 方法里 相同key 的集合,最后把结果存到输出到表里。 double sum 0; for (DoubleWritable num:values){ sum num.get(); } Put put new Put(Bytes.toBytes(key.toString())); put.addColumn(“total_infos”.getBytes(),“total”.getBytes(),Bytes.toBytes(String.valueOf(sum))); context.write(null,put);//initTableReducerJob 设置了表名所以在这里无需设置了 编程要求 在右侧代码窗口Begin-End处完成代码编写 MapReduce类需要进行配置请在configureJob方法里配置表数据过滤器过滤条件为只获取开始时间大于等于2017-08-01结束时间小于等于2017-09-01的数据。 在map方法中获取输入表t_shared_bicycle的相关信息次数设为1并传入到reduce 在reduce方法中通过次数计算共享单车使用总次数并把总次数存入到列族为info字段为usageRateROWKEY 为departure的表里。 t_shared_bicycle表结构如下 列族名称 字段 对应的文件的描述 ROWKEY 格式为骑行id info beginTime 开始时间 trip_id info endTime 结束时间 trip_id info bicycleId 车辆id trip_id info departure 出发地 trip_id info destination 目的地 trip_id info city 所在城市 trip_id info start_longitude 开始经度 trip_id info stop_longitude 结束经度 trip_id info start_latitude 开始纬度 trip_id info stop_latitude 结束纬度 trip_id beginTime 和 endTime 在 Hbase 中以时间戳的形式存储 测试说明 平台会对你编写的代码进行测试若是与预期输出相同则算通关。 开始你的任务吧祝你成功 示例代码如下 package com.educoder.bigData.sharedbicycle; import java.io.IOException; import java.util.ArrayList; import org.apache.commons.lang3.time.FastDateFormat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.Tool; import com.educoder.bigData.util.HBaseUtil; /** 共享单车使用次数统计/ public class UsageRateMapReduce extends Configured implements Tool {public static final byte[] family info.getBytes();public static class MyMapper extends TableMapperText, IntWritable {protected void map(ImmutableBytesWritable rowKey, Result result, Context context)throws IOException, InterruptedException {/********** Begin ********/// 次数IntWritable doubleWritable new IntWritable(1);context.write(new Text(departure), doubleWritable);/********** End ******/}}public static class MyTableReducer extends TableReducerText, IntWritable, ImmutableBytesWritable {Overridepublic void reduce(Text key, IterableIntWritable values, Context context)throws IOException, InterruptedException {/******* Begin ******/ int totalNum 0;for (IntWritable num : values) {int d num.get();totalNum d;}Put put new Put(Bytes.toBytes(key.toString()));put.addColumn(family, usageRate.getBytes(), Bytes.toBytes(String.valueOf(totalNum)));context.write(null, put);// initTableReducerJob 设置了 表名所以在这里无需设置了/******* End ******/}}public int run(String[] args) throws Exception {// 配置JobConfiguration conf HBaseUtil.conf;// Scanner sc new Scanner(System.in);// String arg1 sc.next();// String arg2 sc.next();String arg1 t_shared_bicycle;String arg2 t_bicycle_usagerate;try {HBaseUtil.createTable(arg2, new String[] { info });} catch (Exception e) {// 创建表失败e.printStackTrace();}Job job configureJob(conf, new String[] { arg1, arg2 });return job.waitForCompletion(true) ? 0 : 1;}private Job configureJob(Configuration conf, String[] args) throws IOException {String tablename args[0];String targetTable args[1];Job job new Job(conf, tablename);ArrayListFilter listForFilters new ArrayListFilter();FastDateFormat instance FastDateFormat.getInstance(yyyy-MM-dd);Scan scan new Scan();scan.setCaching(300);scan.setCacheBlocks(false);// 在mapreduce程序中千万不要设置允许缓存/******* Begin ******/try {Filter destinationFilter new SingleColumnValueFilter(Bytes.toBytes(info), Bytes.toBytes(beginTime),CompareOperator.GREATER_OR_EQUAL, Bytes.toBytes(String.valueOf(instance.parse(2017-08-01).getTime())));Filter departure new SingleColumnValueFilter(Bytes.toBytes(info), Bytes.toBytes(endTime),CompareOperator.LESS_OR_EQUAL, Bytes.toBytes(String.valueOf(instance.parse(2017-09-01).getTime())));listForFilters.add(departure);listForFilters.add(destinationFilter);}catch (Exception e) {e.printStackTrace();return null;}Filter filters new FilterList(listForFilters);scan.setFilter(filters);/******* End *******/// 初始化Mapreduce程序TableMapReduceUtil.initTableMapperJob(tablename, scan, MyMapper.class, Text.class, IntWritable.class, job);// 初始化ReduceTableMapReduceUtil.initTableReducerJob(targetTable, // output tableMyTableReducer.class, // reducer classjob);job.setNumReduceTasks(1);return job;} }第5关统计共享单车线路流量 任务描述 本关任务使用Hbase的MapReduce对已经存在Hbase的共享单车运行数据进行分析统计共享单车线路次数其中共享单车运行数据在Hbase的t_shared_bicycle表中表结构可在编程要求中进行查看。 相关知识 为了完成本关任务你需要掌握 如何配置Hbase的MapReduce类 如何使用Hbase的MapReduce进行数据分析。 如何配置Hbase的MapReduce类 MapReduce是运行在Job上的一个并行计算框架分为Map节点和Reduce节点。 Hbase提供了org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil的initTableMapperJob和initTableReducerJob两个方法来完成MapReduce的配置。 initTableMapperJob 方法: / *在提交TableMap作业之前使用它。 它会适当地设置 * 工作。 * * param table要读取的表名。 * param scan具有列时间范围等的扫描实例。 * param mapper要使用的mapper类。 * param outputKeyClass输出键的类。 * param outputValueClass输出值的类。 * param job当前要调整的工作。 确保传递的作业是 *携带所有必要的HBase配置。 * throws IOException设置细节失败。 */ public static void initTableMapperJob(String table, Scan scan, Class? extends TableMapper mapper, Class? outputKeyClass, Class? outputValueClass, Job job) throws IOException / ** initTableReducerJob方法: /** *在提交TableReduce作业之前使用它。 它会 *适当设置JobConf。 * * param table输出表。 * param reducer要使用的reducer类。 * param job当前要调整的工作。 * throws IOException确定区域计数失败时。 / public static void initTableReducerJob(String table, Class? extends TableReducer reducer, Job job) throws IOException 如何使用Hbase的MapReduce进行数据分析 下面我们以统计每个城市的酒店个数的例子来介绍MapReduce的Map节点和Reduce节点 Map节点执行类需要继承抽象类TableMapper,实现其map方法结构如下 public static class MyMapper extends TableMapperText, DoubleWritable { Override protected void map(ImmutableBytesWritable rowKey, Result result, Context context) { }} 在 map 方法中可从输入表原数据表得到行数据 最后向 Reduce 节点 输出键值对key/value 。 String cityId Bytes.toString(result.getValue(“cityInfo”.getBytes(), “cityId”.getBytes())); DoubleWritable i new DoubleWritable(1); context.write(new Text(cityId),i); 下面介绍Reduce节点Reduce节点执行类需要继承抽象类TableReducer,实现其reduce方法 public static class MyTableReducer extends TableReducerText, DoubleWritable, ImmutableBytesWritable { Override public void reduce(Text key, Iterable values, Context context) { } } 在reduce方法里会接收map 方法里 相同key 的集合,最后把结果存到输出到表里。 double sum 0; for (DoubleWritable num:values){ sum num.get(); } Put put new Put(Bytes.toBytes(key.toString())); put.addColumn(“total_infos”.getBytes(),“total”.getBytes(),Bytes.toBytes(String.valueOf(sum))); context.write(null,put);//initTableReducerJob 设置了表名所以在这里无需设置了 编程要求 在右侧代码窗口完成代码编写 MapReduce类已经配置好不需要再进行配置 在map方法中获取输入表t_shared_bicycle的相关信息设置1为线路次数把开始经度、结束经度、开始维度、结束维度、出发地、目的地、线路次数传入到reduce 在reduce方法中通过线路次数计算共享单车每个路线的使用次数存入到列族为info字段为lineTotal的表里ROWKEY 格式为开始经度-结束经度_开始维度-结束维度_出发地—目的地。 平台会输出前五的单车线路流量进行评测 t_shared_bicycle表结构如下 列族名称 字段 对应的文件的描述 ROWKEY 格式为骑行id info beginTime 开始时间 trip_id info endTime 结束时间 trip_id info bicycleId 车辆id trip_id info departure 出发地 trip_id info destination 目的地 trip_id info city 所在城市 trip_id info start_longitude 开始经度 trip_id info stop_longitude 结束经度 trip_id info start_latitude 开始纬度 trip_id info stop_latitude 结束纬度 trip_id 测试说明 平台会对你编写的代码进行测试若是与预期输出相同则算通关。 开始你的任务吧祝你成功 示例代码如下 package com.educoder.bigData.sharedbicycle; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.Tool; import com.educoder.bigData.util.HBaseUtil; /** 共享单车线路流量统计/ public class LineTotalMapReduce extends Configured implements Tool {public static final byte[] family info.getBytes();public static class MyMapper extends TableMapperText, IntWritable {protected void map(ImmutableBytesWritable rowKey, Result result, Context context)throws IOException, InterruptedException {/********** Begin ********/ // 开始经纬度String start_latitude Bytes.toString(result.getValue(family, start_latitude.getBytes()));String start_longitude Bytes.toString(result.getValue(family, start_longitude.getBytes()));// 结束经纬度String stop_latitude Bytes.toString(result.getValue(family, stop_latitude.getBytes()));String stop_longitude Bytes.toString(result.getValue(family, stop_longitude.getBytes()));// 出发地String departure Bytes.toString(result.getValue(family, departure.getBytes()));// 目的地String destination Bytes.toString(result.getValue(family, destination.getBytes()));// 拼装value IntWritable doubleWritable new IntWritable(1);context.write(new Text(start_latitude - start_longitude _ stop_latitude - stop_longitude _ departure - destination), doubleWritable);/********** End ******/ }}public static class MyTableReducer extends TableReducerText, IntWritable, ImmutableBytesWritable {Overridepublic void reduce(Text key, IterableIntWritable values, Context context)throws IOException, InterruptedException {/******* Begin ******/ int totalNum 0;for (IntWritable num : values) {int d num.get();totalNum d;}Put put new Put(Bytes.toBytes(key.toString() totalNum ));put.addColumn(family, lineTotal.getBytes(), Bytes.toBytes(String.valueOf(totalNum)));context.write(null, put);// initTableReducerJob 设置了 表名所以在这里无需设置了/******* End *********/}}public int run(String[] args) throws Exception {// 配置JobConfiguration conf HBaseUtil.conf;// Scanner sc new Scanner(System.in);// String arg1 sc.next();// String arg2 sc.next();String arg1 t_shared_bicycle;String arg2 t_bicycle_linetotal;try {HBaseUtil.createTable(arg2, new String[] { info });} catch (Exception e) {// 创建表失败e.printStackTrace();}Job job configureJob(conf, new String[] { arg1, arg2 });return job.waitForCompletion(true) ? 0 : 1;}private Job configureJob(Configuration conf, String[] args) throws IOException {String tablename args[0];String targetTable args[1];Job job new Job(conf, tablename);Scan scan new Scan();scan.setCaching(300);scan.setCacheBlocks(false);// 在mapreduce程序中千万不要设置允许缓存// 初始化Mapreduce程序TableMapReduceUtil.initTableMapperJob(tablename, scan, MyMapper.class, Text.class, IntWritable.class, job);// 初始化ReduceTableMapReduceUtil.initTableReducerJob(targetTable, // output tableMyTableReducer.class, // reducer classjob);job.setNumReduceTasks(1);return job;} }
- 上一篇: 重庆万州网站建设天津建设网首页
- 下一篇: 重庆网站seo什么意思做软件开发
相关文章
-
重庆万州网站建设天津建设网首页
重庆万州网站建设天津建设网首页
- 技术栈
- 2026年03月21日
-
重庆外贸网站建设公司cpanel面板 WordPress
重庆外贸网站建设公司cpanel面板 WordPress
- 技术栈
- 2026年03月21日
-
重庆所有做网站的公司排名大兴网站建设推广
重庆所有做网站的公司排名大兴网站建设推广
- 技术栈
- 2026年03月21日
-
重庆网站seo什么意思做软件开发
重庆网站seo什么意思做软件开发
- 技术栈
- 2026年03月21日
-
重庆网站备案大厅做国际网站找阿里
重庆网站备案大厅做国际网站找阿里
- 技术栈
- 2026年03月21日
-
重庆网站建设 九度互联女生做网站编辑好吗
重庆网站建设 九度互联女生做网站编辑好吗
- 技术栈
- 2026年03月21日
