网站建设下拉导航栏医院网站建设怎么样

当前位置: 首页 > news >正文

网站建设下拉导航栏,医院网站建设怎么样,求职网站,培训学校加盟费用

Storm 集成 Redis 详解 一、简介 二、集成案例 三、storm-redis 实现原理 四、自定义RedisBolt实现词频统计 一、简介 Storm-Redis 提供了 Storm 与 Redis 的集成支持，你只需要引入对应的依赖即可使用： <dependency><groupId>org.apache.storm…

Storm 集成 Redis 详解

一、简介
二、集成案例
三、storm-redis 实现原理
四、自定义RedisBolt实现词频统计

一、简介

Storm-Redis 提供了 Storm 与 Redis 的集成支持，你只需要引入对应的依赖即可使用：

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-redis</artifactId>
        <version>${storm.version}</version>
        <type>jar</type>
    </dependency>

Storm-Redis 使用 Jedis 为 Redis 客户端，并提供了如下三个基本的 Bolt 实现：

  - RedisLookupBolt：从 Redis 中查询数据；
  - RedisStoreBolt：存储数据到 Redis；
  - RedisFilterBolt：查询符合条件的数据。

RedisLookupBolt、RedisStoreBolt、RedisFilterBolt 均继承自 AbstractRedisBolt 抽象类。我们可以通过继承该抽象类，实现自定义 RedisBolt，进行功能的拓展。

二、集成案例

2.1 项目结构

这里首先给出一个集成案例：进行词频统计，并将最后的结果存储到 Redis。项目结构如下：

用例源码下载地址：storm-redis-integration

2.2 项目依赖

项目主要依赖如下：

    <properties>
        <storm.version>1.2.2</storm.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${storm.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-redis</artifactId>
            <version>${storm.version}</version>
        </dependency>
    </dependencies>

2.3 DataSourceSpout

    /**
     * 产生词频样本的数据源
     */
    public class DataSourceSpout extends BaseRichSpout {

        private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");

        private SpoutOutputCollector spoutOutputCollector;

        @Override
        public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
            this.spoutOutputCollector = spoutOutputCollector;
        }

        @Override
        public void nextTuple() {
            // 模拟产生数据
            String lineData = productData();
            spoutOutputCollector.emit(new Values(lineData));
            Utils.sleep(1000);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("line"));
        }

        /**
         * 模拟数据
         */
        private String productData() {
            Collections.shuffle(list);
            Random random = new Random();
            int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
            return StringUtils.join(list.toArray(), "\t", 0, endIndex);
        }
    }

产生的模拟数据格式如下：

Spark 
HBase Hive Flink Storm Hadoop HBase Spark Flink HBase Storm HBase Hadoop Hive Flink HBase Flink Hive Storm Hive Flink Hadoop HBase Hive Hadoop Spark HBase Storm

2.4 SplitBolt

    /**
     * 将每行数据按照指定分隔符进行拆分
     */
    public class SplitBolt extends BaseRichBolt {

        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String line = input.getStringByField("line");
            String[] words = line.split("\t");
            for (String word : words) {
                collector.emit(new Values(word, String.valueOf(1)));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

2.5 CountBolt

    /**
     * 进行词频统计
     */
    public class CountBolt extends BaseRichBolt {

        private Map<String, Integer> counts = new HashMap<>();

        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String word = input.getStringByField("word");
            Integer count = counts.get(word);
            if (count == null) {
                count = 0;
            }
            count++;
            counts.put(word, count);
            // 输出
            collector.emit(new Values(word, String.valueOf(count)));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

2.6 WordCountStoreMapper

实现 RedisStoreMapper 接口，定义 tuple 与 Redis 中数据的映射关系：即需要指定 tuple 中的哪个字段为 key，哪个字段为 value，并且存储到 Redis 的何种数据结构中。

    /**
     * 定义 tuple 与 Redis 中数据的映射关系
     */
    public class WordCountStoreMapper implements RedisStoreMapper {

        private RedisDataTypeDescription description;
        private final String hashKey = "wordCount";

        public WordCountStoreMapper() {
            description = new RedisDataTypeDescription(
                    RedisDataTypeDescription.RedisDataType.HASH, hashKey);
        }

        @Override
        public RedisDataTypeDescription getDataTypeDescription() {
            return description;
        }

        @Override
        public String getKeyFromTuple(ITuple tuple) {
            return tuple.getStringByField("word");
        }

        @Override
        public String getValueFromTuple(ITuple tuple) {
            return tuple.getStringByField("count");
        }
    }

2.7 WordCountToRedisApp

    /**
     * 进行词频统计 并将统计结果存储到 Redis 中
     */
    public class WordCountToRedisApp {

        private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
        private static final String SPLIT_BOLT = "splitBolt";
        private static final String COUNT_BOLT = "countBolt";
        private static final String STORE_BOLT = "storeBolt";

        // 在实际开发中这些参数可以将通过外部传入 使得程序更加灵活
        private static final String REDIS_HOST = "192.168.200.226";
        private static final int REDIS_PORT = 6379;

        public static void main(String[] args) {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout());
            // split
            builder.setBolt(SPLIT_BOLT, new SplitBolt()).shuffleGrouping(DATA_SOURCE_SPOUT);
            // count
            builder.setBolt(COUNT_BOLT, new CountBolt()).shuffleGrouping(SPLIT_BOLT);
            // save to redis
            JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                    .setHost(REDIS_HOST).setPort(REDIS_PORT).build();
            RedisStoreMapper storeMapper = new WordCountStoreMapper();
            RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
            builder.setBolt(STORE_BOLT, storeBolt).shuffleGrouping(COUNT_BOLT);

            // 如果外部传参 cluster 则代表线上环境启动，否则代表本地启动
            if (args.length > 0 && args[0].equals("cluster")) {
                try {
                    StormSubmitter.submitTopology("ClusterWordCountToRedisApp", new Config(), builder.createTopology());
                } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                    e.printStackTrace();
                }
            } else {
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("LocalWordCountToRedisApp",
                        new Config(), builder.createTopology());
            }
        }
    }

2.8 启动测试

可以直接使用本地模式运行，也可以打包后提交到服务器集群运行。本仓库提供的源码默认采用 maven-shade-plugin 进行打包，打包命令如下：

    mvn clean package -Dmaven.test.skip=true

启动后，查看 Redis 中的数据：

三、storm-redis 实现原理

3.1 AbstractRedisBolt

RedisLookupBolt、RedisStoreBolt、RedisFilterBolt 均继承自 AbstractRedisBolt 抽象类，和我们自定义实现 Bolt 一样，AbstractRedisBolt 间接继承自 BaseRichBolt。

AbstractRedisBolt 中比较重要的是 prepare 方法，在该方法中通过外部传入的 jedis 连接池配置 (jedisPoolConfig/jedisClusterConfig) 创建用于管理 Jedis 实例的容器 JedisCommandsInstanceContainer。

    public abstract class AbstractRedisBolt extends BaseTickTupleAwareRichBolt {
        protected OutputCollector collector;

        private transient JedisCommandsInstanceContainer container;

        private JedisPoolConfig jedisPoolConfig;
        private JedisClusterConfig jedisClusterConfig;

        ……

        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
            // FIXME: stores map (stormConf), topologyContext and expose these to derived classes
            this.collector = collector;

            if (jedisPoolConfig != null) {
                this.container = JedisCommandsContainerBuilder.build(jedisPoolConfig);
            } else if (jedisClusterConfig != null) {
                this.container = JedisCommandsContainerBuilder.build(jedisClusterConfig);
            } else {
                throw new IllegalArgumentException("Jedis configuration not found");
            }
        }

        ……
    }

JedisCommandsContainerBuilder 的 build() 方法如下，实际上就是创建 JedisPool 或 JedisCluster 并传入容器中。

    public static JedisCommandsInstanceContainer build(JedisPoolConfig config) {
        JedisPool jedisPool = new JedisPool(DEFAULT_POOL_CONFIG, config.getHost(), config.getPort(),
                config.getTimeout(), config.getPassword(), config.getDatabase());
        return new JedisContainer(jedisPool);
    }

    public static JedisCommandsInstanceContainer build(JedisClusterConfig config) {
        JedisCluster jedisCluster = new JedisCluster(config.getNodes(), config.getTimeout(), config.getTimeout(),
                config.getMaxRedirections(), config.getPassword(), DEFAULT_POOL_CONFIG);
        return new JedisClusterContainer(jedisCluster);
    }

3.2 RedisStoreBolt和RedisLookupBolt

RedisStoreBolt 中比较重要的是 process 方法，该方法主要从 storeMapper 中获取传入 key/value 的值，并按照其存储类型 dataType 调用 jedisCommand 的对应方法进行存储。

RedisLookupBolt 的实现基本类似，从 lookupMapper 中获取传入的 key 值，并进行查询操作。

    public class RedisStoreBolt extends AbstractRedisBolt {
        private final RedisStoreMapper storeMapper;
        private final RedisDataTypeDescription.RedisDataType dataType;
        private final String additionalKey;

        public RedisStoreBolt(JedisPoolConfig config, RedisStoreMapper storeMapper) {
            super(config);
            this.storeMapper = storeMapper;

            RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
            this.dataType = dataTypeDescription.getDataType();
            this.additionalKey = dataTypeDescription.getAdditionalKey();
        }

        public RedisStoreBolt(JedisClusterConfig config, RedisStoreMapper storeMapper) {
            super(config);
            this.storeMapper = storeMapper;

            RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
            this.dataType = dataTypeDescription.getDataType();
            this.additionalKey = dataTypeDescription.getAdditionalKey();
        }

        @Override
        public void process(Tuple input) {
            String key = storeMapper.getKeyFromTuple(input);
            String value = storeMapper.getValueFromTuple(input);

            JedisCommands jedisCommand = null;
            try {
                jedisCommand = getInstance();

                switch (dataType) {
                    case STRING:
                        jedisCommand.set(key, value);
                        break;

                    case LIST:
                        jedisCommand.rpush(key, value);
                        break;

                    case HASH:
                        jedisCommand.hset(additionalKey, key, value);
                        break;

                    case SET:
                        jedisCommand.sadd(key, value);
                        break;

                    case SORTED_SET:
                        jedisCommand.zadd(additionalKey, Double.valueOf(value), key);
                        break;

                    case HYPER_LOG_LOG:
                        jedisCommand.pfadd(key, value);
                        break;

                    case GEO:
                        String[] array = value.split(":");
                        if (array.length != 2) {
                            throw new IllegalArgumentException("value structure should be longitude:latitude");
                        }

                        double longitude = Double.valueOf(array[0]);
                        double latitude = Double.valueOf(array[1]);
                        jedisCommand.geoadd(additionalKey, longitude, latitude, key);
                        break;

                    default:
                        throw new IllegalArgumentException("Cannot process such data type: " + dataType);
                }

                collector.ack(input);
            } catch (Exception e) {
                this.collector.reportError(e);
                this.collector.fail(input);
            } finally {
                returnInstance(jedisCommand);
            }
        }

        ………
    }

3.3 JedisCommands

JedisCommands 接口中定义了所有的 Redis 客户端命令，它有以下三个实现类，分别是 Jedis、JedisCluster、ShardedJedis。Storm 中主要使用前两种实现类，具体调用哪一个实现类来执行命令，由传入的是 jedisPoolConfig 还是 jedisClusterConfig 来决定。

3.4 RedisMapper 和 TupleMapper

RedisMapper 和 TupleMapper 定义了 tuple 和 Redis 中的数据如何进行映射转换。

  1. TupleMapper

TupleMapper 主要定义了两个方法：

  - getKeyFromTuple(ITuple tuple)：从 tuple 中获取哪个字段作为 Key；
  - getValueFromTuple(ITuple tuple)：从 tuple 中获取哪个字段作为 Value。

  2. RedisMapper

定义了获取数据类型的方法 getDataTypeDescription()，RedisDataTypeDescription 中 RedisDataType 枚举类定义了所有可用的 Redis 数据类型：

    public class RedisDataTypeDescription implements Serializable {

        public enum RedisDataType { STRING, HASH, LIST, SET, SORTED_SET, HYPER_LOG_LOG, GEO }

        ……
    }

  3. RedisStoreMapper

RedisStoreMapper 继承 TupleMapper 和 RedisMapper 接口，用于数据存储时，没有定义额外方法。
  4. RedisLookupMapper

RedisLookupMapper 继承 TupleMapper 和 RedisMapper 接口：

  - 定义了 declareOutputFields 方法，声明输出的字段；
  - 定义了 toTuple 方法，将查询结果组装为 Storm 的 Values 的集合，并用于发送。

下面的例子表示从输入 Tuple 中获取 word 字段作为 key，使用 RedisLookupBolt 进行查询后，将 key 和查询结果 value 组装为 values 并发送到下一个处理单元。

    class WordCountRedisLookupMapper implements RedisLookupMapper {

        private RedisDataTypeDescription description;
        private final String hashKey = "wordCount";

        public WordCountRedisLookupMapper() {
            description = new RedisDataTypeDescription(
                    RedisDataTypeDescription.RedisDataType.HASH, hashKey);
        }

        @Override
        public List<Values> toTuple(ITuple input, Object value) {
            String member = getKeyFromTuple(input);
            List<Values> values = Lists.newArrayList();
            values.add(new Values(member, value));
            return values;
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("wordName", "count"));
        }

        @Override
        public RedisDataTypeDescription getDataTypeDescription() {
            return description;
        }

        @Override
        public String getKeyFromTuple(ITuple tuple) {
            return tuple.getStringByField("word");
        }

        @Override
        public String getValueFromTuple(ITuple tuple) {
            return null;
        }
    }

  5. 
RedisFilterMapper

RedisFilterMapper 继承 TupleMapper 和 RedisMapper 接口，用于查询数据时，定义了 declareOutputFields 方法，声明输出的字段。如下面的实现：

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("wordName", "count"));
    }

四、自定义RedisBolt实现词频统计

4.1 实现原理

自定义 RedisBolt：主要利用 Redis 中哈希结构的 hincrby key field 命令进行词频统计。在 Redis 中 hincrby 的执行效果如下。hincrby 可以将字段按照指定的值进行递增，如果该字段不存在的话，会先新建该字段并初始化为 0，然后再执行递增。通过这个命令可以非常轻松的实现词频统计功能。

    redis> HSET myhash field 5
    (integer) 1
    redis> HINCRBY myhash field 1
    (integer) 6
    redis> HINCRBY myhash field -1
    (integer) 5
    redis> HINCRBY myhash field -10
    (integer) -5
    redis>

4.2 项目结构

4.3 自定义RedisBolt的代码实现

    /**
     * 自定义 RedisBolt 利用 Redis 的哈希数据结构的 hincrby key field 命令进行词频统计
     */
    public class RedisCountStoreBolt extends AbstractRedisBolt {

        private final RedisStoreMapper storeMapper;
        private final RedisDataTypeDescription.RedisDataType dataType;
        private final String additionalKey;

        public RedisCountStoreBolt(JedisPoolConfig config, RedisStoreMapper storeMapper) {
            super(config);
            this.storeMapper = storeMapper;
            RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
            this.dataType = dataTypeDescription.getDataType();
            this.additionalKey = dataTypeDescription.getAdditionalKey();
        }

        @Override
        protected void process(Tuple tuple) {
            String key = storeMapper.getKeyFromTuple(tuple);
            String value = storeMapper.getValueFromTuple(tuple);

            JedisCommands jedisCommand = null;
            try {
                jedisCommand = getInstance();
                if (dataType == RedisDataTypeDescription.RedisDataType.HASH) {
                    jedisCommand.hincrBy(additionalKey, key, Long.valueOf(value));
                } else {
                    throw new IllegalArgumentException("Cannot process such data type for Count: " + dataType);
                }

                collector.ack(tuple);
            } catch (Exception e) {
                this.collector.reportError(e);
                this.collector.fail(tuple);
            } finally {
                returnInstance(jedisCommand);
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }
    }

4.4 CustomRedisCountApp

    /**
     * 利用自定义的 RedisBolt 实现词频统计
     */
    public class CustomRedisCountApp {

        private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
        private static final String SPLIT_BOLT = "splitBolt";
        private static final String STORE_BOLT = "storeBolt";

        private static final String REDIS_HOST = "192.168.200.226";
        private static final int REDIS_PORT = 6379;

        public static void main(String[] args) {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout());
            // split
            builder.setBolt(SPLIT_BOLT, new SplitBolt()).shuffleGrouping(DATA_SOURCE_SPOUT);
            // save to redis and count
            JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                    .setHost(REDIS_HOST).setPort(REDIS_PORT).build();
            RedisStoreMapper storeMapper = new WordCountStoreMapper();
            RedisCountStoreBolt countStoreBolt = new RedisCountStoreBolt(poolConfig, storeMapper);
            builder.setBolt(STORE_BOLT, countStoreBolt).shuffleGrouping(SPLIT_BOLT);

            // 如果外部传参 cluster 则代表线上环境启动,否则代表本地启动
            if (args.length > 0 && args[0].equals("cluster")) {
                try {
                    StormSubmitter.submitTopology("ClusterCustomRedisCountApp", new Config(), builder.createTopology());
                } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                    e.printStackTrace();
                }
            } else {
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("LocalCustomRedisCountApp",
                        new Config(), builder.createTopology());
            }
        }
    }

参考资料

Storm Redis Integration