广州我要做网站伊犁州住房城乡建设局网站
- 作者: 五速梦信息网
- 时间: 2026年04月20日 11:04
当前位置: 首页 > news >正文
广州我要做网站,伊犁州住房城乡建设局网站,湖南建设人才网官网,计算机哪个专业最吃香而且最简单文章目录 一、生产者1.引入库2.配置文件3.配置类PublicConfig.javaMessageProducer.java 4.业务处理类 三、消费者1.引入库2.配置类PublicConfig.javaMessageConsumer.java 3.业务类 一、生产者 1.引入库 引入需要依赖的jar包#xff0c;引入POM文件#xff1a; depend… 文章目录 一、生产者1.引入库2.配置文件3.配置类PublicConfig.javaMessageProducer.java 4.业务处理类 三、消费者1.引入库2.配置类PublicConfig.javaMessageConsumer.java 3.业务类 一、生产者 1.引入库 引入需要依赖的jar包引入POM文件 dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactId /dependency2.配置文件 配置Kafka的相关参数或者你项目的cacos或者yaml文件里添加 以下是一个示例配置application.properties ccm.kafka.servers:192.168.1.95:9092,192.168.1.96:9092,192.168.1.97:9092 ccm.kafka.topics.xxx:xxx_content_devTip:建议topic命名规则租户简称项目关键词系统环境的方式更容易区分 3.配置类 PublicConfig.java Data Configuration ConfigurationProperties(prefix ccm.kafka) //配置信息nacos中配置 public class PublicConfig {private String servers;private String alertTopic;} MessageProducer.java Slf4j Component public class MessageProducer {private Producer producerKafka;AutowiredPublicConfig publicConfig;/*** 初始化方法*/PostConstructpublic String init() {Properties props new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, publicConfig.getServers());props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, PLAINTEXT);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, String.valueOf(30 * 1000));props.put(ProducerConfig.ACKS_CONFIG, all);producerKafka new KafkaProducer(props);log.info(kafka message channel created successfully);return OK;}public ResponseData send(String content, String topic) {long startTime System.currentTimeMillis();try {String key UUID.randomUUID().toString().replace(-, );ProducerRecordString, String kafkaMessage new ProducerRecord(topic, key, content);log.info(MessageProducer send key {},message{}, key, content);FutureRecordMetadata send producerKafka.send(kafkaMessage);send.get();log.info(MessageProducer send cost time:{}, System.currentTimeMillis() - startTime);} catch (Exception e) {log.error(MessageProducer Failed to push message:{}, e.getMessage());return ResponseData.errorWithMsg(MessageProducer Failed to push message: e.getMessage());}return null;}} 4.业务处理类 示例代码的业务场景定时生成预警消息发送给下游系统调用。 //启动类注意增加定时注解的支持 SpringBootApplication MapperScan(basePackages {com.xx.xx.mapper,com.xx.xx.crawler.mapper}) EnableScheduling public class CATApp {public static void main(String[] args) {SpringApplication.run(CATApp.class,args);}}Service Slf4j public class CrawlerService {Scheduled(cron \({crawler.scheduled.cron:0 */1 * * * ?}) // 每5分钟执行一次// Scheduled(cron \){crawler.scheduled.cron:0 0 0/1 * * ?}) // 每小时执行一次public void crawlAndSaveAlertInfos() {log.info( crawlAndSaveAlertInfos );//替换成具体的业务场景 ListAlertInfo alertInfos fetchAlertInfoList();if (!alertInfos.isEmpty()) {for (AlertInfo alertInfo : alertInfos) {//发送预警信息到kafka供下游调用crawlerAlertSyncService.sendCrawlerAlertMsgKafka(alertInfo);}}} /**** 预警消息通过Kafka异步同步其他应用/ public interface CrawlerAlertSyncService {void sendCrawlerAlertMsgKafka(AlertInfo alertInfo) ;}Slf4j Service public class CrawlerAlertSyncServiceImpl implements CrawlerAlertSyncService {Autowiredprivate MessageProducer messageProducer;Resourceprivate PublicConfig publicConfig;Overridepublic void sendCrawlerAlertMsgKafka(AlertInfo alertInfo) {String topic publicConfig.getAlertTopic();String servers publicConfig.getServers();log.info(send publish msg to kafka ,topic:{},bizId:{}, topic, alertInfo.getAlertid());log.info(send publish msg to kafka ,servers:{}, servers);String content JSON.toJSONString(alertInfo);log.info(send publish msg to kafka ,content:{}, content);if (StringUtils.isNotBlank(topic)) {messageProducer.send(content, topic);}} }三、消费者 1.引入库 在消费者工程pom文件中配置依赖 dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactId /dependency2.配置类 同样根据该项目情况编写配置类示例代码中仍为读取naco配置 PublicConfig.java Data Configuration Slf4j ConfigurationProperties(prefix xman.kafka) public class PublicConfig {private String servers;private MapString,String topics;public String getTopic(String appCode) {if(Objects.isNull(topics) || topics.isEmpty()){return null;}return topics.get(appCode);}private String alertTopic;private String group; }MessageConsumer.java Slf4j Component public abstract class MessageConsumer {// 用于持续监听kafka消息的专用线程池private ExecutorService threadPool;// 用于持续消费kafka消息的专用线程池private ExecutorService consumerThreadPool;Resourceprivate PublicConfig publicConfig;/** 初始化方法/PostConstructpublic String init() {MessageConfigField messageConfig MessageConfigField.builder().servers(publicConfig.getServers()).topic(publicConfig.getAlertTopic()).group(publicConfig.getGroup()).build();if (StringUtils.isBlank(messageConfig.getServers())) {//没有配置kafka信息return OK;}initThreadPool();KafkaConsumerString, String instance kafkaInstance(messageConfig.getServers(),messageConfig.getGroup(), messageConfig.getTopic(), messageConfig.getClientName(),messageConfig.getUsername(), messageConfig.getPassword());startListen(instance);log.info(ccm kafka消息订阅成功:clientId: messageConfig.getClientName());return OK;}private void initThreadPool() {if (null threadPool) {log.info(initThreadPool start);threadPool Executors.newFixedThreadPool(1);log.info(initThreadPool done);}}private void startListen(KafkaConsumerString, String consumer) {threadPool.submit(() - {TenantContext.setContextCode(CommonConstants.TENANT_CODE);while (true) {try {ConsumerRecordsString, String records consumer.poll(Duration.ofSeconds(10));if (records null || records.isEmpty()) {continue;}for (ConsumerRecordString, String record : records) {OptionalString kafkaMessage Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {String msg kafkaMessage.get();if (StringUtils.isNotBlank(msg)) {log.info(msgJson: msg);consumeMsg(msg);}}}} catch (Exception e) {TimeUnit.SECONDS.sleep(1);log.error(consume error, e);}}});}public static KafkaConsumerString, String kafkaInstance(String servers, String group,String topic, String clientId, String username, String password) {Properties props new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);if (StringUtils.isNotBlank(group)) {props.put(ConsumerConfig.GROUP_ID_CONFIG, group);}props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringDeserializer);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringDeserializer);KafkaConsumerString, String consumer new KafkaConsumer(props);ListString subscribedTopics new ArrayList();subscribedTopics.add(topic);consumer.subscribe(subscribedTopics);return consumer;}/** 核心逻辑,由子类继承实现** param msgData msg*/public abstract void consumeMsg(String msgData) throws Exception;}3.业务类 Slf4j Service RefreshScope public class CmsInfoConsumer extends MessageConsumer {Resourceprivate InfoService infoService;Overridepublic void consumeMsg(String msgData) throws Exception {log.info(CmsWeatherConsumer收到mq消息message{}, msgData);CcmAlertInfoDTO alertInfoDTO JSONObject.parseObject(msgData, CcmAlertInfoDTO.class);try {//to_do 处理消费内容infoService.saveInfoContent(alertInfoDTO);} catch (Exception e) {e.printStackTrace();log.info(同步用户消息失败 e);}} }至此一个简单的通过kafka同步预警消息的应用就开发完了。
- 上一篇: 广州微信网站开发茂名网站建设公司哪个好
- 下一篇: 广州西樵网站制作网站备案好
相关文章
-
广州微信网站开发茂名网站建设公司哪个好
广州微信网站开发茂名网站建设公司哪个好
- 技术栈
- 2026年04月20日
-
广州微信网站开发福建福清市住房和建设局网站
广州微信网站开发福建福清市住房和建设局网站
- 技术栈
- 2026年04月20日
-
广州微网站建设咨询揭秘低价网站建设危害
广州微网站建设咨询揭秘低价网站建设危害
- 技术栈
- 2026年04月20日
-
广州西樵网站制作网站备案好
广州西樵网站制作网站备案好
- 技术栈
- 2026年04月20日
-
广州冼村人很有钱吗深圳搜索引擎优化推广
广州冼村人很有钱吗深圳搜索引擎优化推广
- 技术栈
- 2026年04月20日
-
广州小型企业网站建设用ip访问没有备案的网站
广州小型企业网站建设用ip访问没有备案的网站
- 技术栈
- 2026年04月20日
