网站的项目建设周期政协系统网站建设
- 作者: 五速梦信息网
- 时间: 2026年04月20日 08:11
当前位置: 首页 > news >正文
网站的项目建设周期,政协系统网站建设,单页面网站,已有域名如何在花生壳网站做二级域名托管目录 一、概述简介1.1. cloud Stream是什么1.2. 设计思想1.3. 标准流程1.4. 注解 二、基于注解代码练习2.1. 消息驱动之生产者2.2. 消息驱动之消费者2.3. 目前存在的问题2.4. 分组解决重复消费问题2.5. 消息持久化 三、函数式编程练习 本篇文章所涉及到的demo练习 使用的cloud … 目录 一、概述简介1.1. cloud Stream是什么1.2. 设计思想1.3. 标准流程1.4. 注解 二、基于注解代码练习2.1. 消息驱动之生产者2.2. 消息驱动之消费者2.3. 目前存在的问题2.4. 分组解决重复消费问题2.5. 消息持久化 三、函数式编程练习 本篇文章所涉及到的demo练习 使用的cloud 2021.0.3 springboot2.6.8
一、概述简介
官网https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/
官网概述https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-preface-notable-deprecations
1.1. cloud Stream是什么
官方定义Spring Cloud Stream是一个用于构建 与 共享消息系统 连接的高度可扩展的事件驱动微服务。
目前主流的消息框架有
ActiveMQRabbitMQRocketMQKafka 假设公司业务项目用了RabbitMQ而大数据项目用了Kafka。这时候就会出现有两个消息框架相对于程序员来说其实并不友好还得两个都掌握正常对于一个程序员来说熟练一个消息框架都不错了何况还搞了两个并且两个维护起来也不好维护。 RabbitMQ和Kafka是两个不同的框架两个消息模型上也存在着差异并且代码上用法也不一样。Spring Cloud Stream就是不再关注具体MQ的细节可以在不改代码的基础上来完成Rabbit和Kafka两个不同的消息中间件的切换这里的切换指的是原本用的RabbitMQ但是用着用着发现kafka比较符合所以想要换框架。 总结成一句话屏蔽底层消息中间件的差异降低切换成本统一消息的编程模型
也就是基于Stream可以和如下消息中间件来整合使用 1.2. 设计思想
常规的MQ设计如下
Message生产者/消费者之间靠消息媒介传递信息内容MessageChannel消息必须走特定的通道队列假如发消息会先发到消息队列当中消息队列的消息如何被消费呢订阅的人可以进行消费
cloud Stream设计如下
通过定义绑定器Binder作为中间层实现了应用程序与消息中间件细节之间的隔离。 在没有绑定器这个概念的情况下我们的SpringBoot应用要直接与消息中间件进行信息交互的时候由于各消息中间件构建的初衷不同它们的实现细节上会有较大的差异性通过定义绑定器作为中间层完美地实现了应用程序与消息中间件细节之间的隔离。Stream对消息中间件的进一步封装可以做到代码层面对中间件的无感知甚至于动态的切换中间件(rabbitmq切换为kafka)使得微服务开发的高度解耦服务可以关注更多自己的业务流程 注意左图是官网的架构图
Binder可以生成BindingBinding用来绑定消息容器的生产者和消费者它有两种类型INPUT和OUTPUTINPUT对应于消费者OUTPUT对应于生产者。 stream为了屏蔽差异抽象出来了一个Binder层。而spring官方提供了这两个框架的实现。 假如想要通过stream连接RabbitMQ就使用
dependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-stream-rabbit/artifactId
/dependency假如想要通过stream连接Kafka就使用
dependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-stream-kafka/artifactId
/dependencyStream中的消息通信方式遵循了发布-订阅模式Topic主题进行广播在RabbitMQ就是Exchange在Kakfa中就是Topic。
1.3. 标准流程 Binder 很方便的连接中间件,屏蔽差异Channel 通道是队列Queue的一种抽象在消息通讯系统中就是实现存储和转发的媒介通过Channe对队列进行配置Source源发送者和Sink水槽接受者 简单的可理解为参照对象是Spring Cloud Stream自身从Stream发布消息就是输出接受消息就是输入。
1.4. 注解 注解完全是基于官方给的模型而定的通过stream使用消息中间件也是非常简单的直接使用以下注解就可以使用。 注意注解依然是能用的但是官方明确表示注解已经被弃用弃用并不是不能用而是用了会画横杠不建议用。但是功能是没有问题的低版本的cloud是没有被弃用的。针对于注解和函数式编程两种我都会进行使用。 题外话学技术永远是这样技术一直在不断的更新迭代真正学习一个技术并不是要掌握编码使用而是要掌握他到底是什么能干什么要去深入理解他对于编码我认为其实不是很重要。就算你今天掌握了官方最新用法回头人家又改写法了。 二、基于注解代码练习
生产者就是消息发送者消费者就是消息接受者。这里我就不用kafka了我直接用的是RabbitMQ。
windows下安装RabbitMQhttps://blog.csdn.net/weixin_43888891/article/details/126514021
2.1. 消息驱动之生产者
1.创建项目可以是聚合可以是普通springboot项目 2.添加pom 因为是和RabbitMQ整合所以就是引入的spring-cloud-starter-stream-rabbit启动器 propertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.targetproject.build.sourceEncodingUTF-8/project.build.sourceEncodingspringboot.version2.6.8/springboot.versionspringcloud.version2021.0.3/springcloud.version
/propertiesdependencyManagementdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-dependencies/artifactIdversion\({springboot.version}/versiontypepom/typescopeimport/scope/dependencydependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-dependencies/artifactIdversion\){springcloud.version}/versiontypepom/typescopeimport/scope/dependency/dependencies
/dependencyManagement
dependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-stream-rabbit/artifactId/dependency
/dependencies3.添加application配置
server:port: 8801spring:application:name: cloud-stream-providercloud:stream:binders: # 在此处配置要绑定的rabbitmq的服务信息defaultRabbit: # 表示定义的名称用于于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理output: # 这个名字是一个通道的名称destination: studyExchange # 表示要使用的Exchange名称定义content-type: application/json # 设置消息类型本次为json文本则设置“text/plain”binder: defaultRabbit # 设置要绑定的消息服务的具体设置4.添加接口
public interface IMessageProvider {public String send();
}5.添加实现类
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;import javax.annotation.Resource;
import java.util.UUID;// 可以理解为是一个消息的发送管道的定义
EnableBinding(Source.class)
public class MessageProviderImpl implements IMessageProvider {// 消息的发送管道Resourceprivate MessageChannel output;Overridepublic String send() {String serial UUID.randomUUID().toString();// 创建并发送消息this.output.send(MessageBuilder.withPayload(serial).build());System.out.println(***serial: serial);return serial;}
}6.添加controller控制器
RestController
public class SendMessageController {Autowiredprivate IMessageProvider iMessageProvider;GetMapping(send)public String send() {return iMessageProvider.send();}
}7.测试
1首先要保证RabbitMQ是可以访问的http://localhost:15672 2启动项目访问http://localhost:8801/send
下图波峰代表发送消息成功 启动后会创建交换机名称就是application.yml当中的destination属性设置的 注意停止服务后并没有删除交换机
2.2. 消息驱动之消费者
1.创建项目 2.添加pompom和发送者依赖一模一样 3.添加application配置
server:port: 8802spring:application:name: cloud-stream-consumercloud:stream:binders: # 在此处配置要绑定的rabbitmq的服务信息defaultRabbit: # 表示定义的名称用于于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理input: # 这个名字是一个通道的名称destination: studyExchange # 表示要使用的Exchange名称定义content-type: application/json # 设置消息类型本次为对象json如果是文本则设置“text/plain”binder: defaultRabbit # 设置要绑定的消息服务的具体设置4.添加监听消费者只负责接受消息
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;Component
EnableBinding(Sink.class)
public class ReceiveMessageListener {Value(${server.port})private String serverPort;StreamListener(Sink.INPUT)public void input(MessageString message) {System.out.println(消费者1号——-接收到的消息 message.getPayload() \t port: serverPort);}
}5.测试
1启动RabbitMQ 2启动发送消息端服务 3启动消费者服务启动后会发现他自动会向这个交换机当中添加一个队列。 发送消息http://localhost:8801/send 接受消息 注意当停止服务后消息队列会被自动删除
2.3. 目前存在的问题
1.依照8802, clone出来一份运行8803主要用来演示多个消费者的场景 2.启动8801生产者 3.启动8802消费者 4.启动8803消费者 当三个服务都启动后通过RabbitMQ界面会发现一个交换机绑定了两个队列 运行后会发现存在两个问题
有重复消费问题消息持久化问题
1重复消费问题
发送消息后两个消费者都收到了消息http://localhost:8801/send 比如在如下场景中订单系统我们做集群部署都会从RabbitMQ中获取订单信息那如果一个订单同时被两个服务获取到那么就会造成数据错误我们得避免这种情况。这时我们就可以使用Stream中的消息分组来解决 注意在Stream中处于同一个group中的多个消费者是竞争关系就能够保证消息只会被其中一个应用消费一次。不同组是可以全面消费的(重复消费)同一组内会发生竞争关系只有其中一个可以消费。 2消息持久化问题
当生产者发送消息的时候消费者恰好宕机了但是过一会消费者恢复了但是消息却没收到。那也就是意味着消息队列是临时消息队列。针对于这一点大家也可以测试一下加深一下印象。
2.4. 分组解决重复消费问题
原理 微服务应用放置于同一个group中,就能够保证消息只会被其中一个应用消费一次。同一个组内会发生竞争关系,只有其中一个可以消费。
接下来直接调整两个消费者为同一个组添加如下配置 当两个消费者都设置好后启动会发现一个问题 实际上分到一个组对于RabbitMQ来说就是两个消费者监听了一个队列。一个队列那也就意味着当队列收到一条消息哪个消费者谁先消费就是谁的消费完队列里面就没有了也就是只有一个消费者能消费到消息 注意假如不设置group属性的时候默认是启动一个消费者就会创建一个消费队列启动多个服务就会创建多个队列。stream默认使用的是RabbitMQ的topic交换机。当发送者向这个交换机发送消息的时候两个队列就都会接收到。关于RabbitMQ相关知识本篇不记录后续会专门写RabbitMQ相关文章。 最终测试8802/8803实现了轮询分组,每次只有一个消费者8801模块的发的消息只能被8802或8803其中一个接收到,这样避免了重复消费。 2.5. 消息持久化
当三个项目都启动着的时候现在我们要做几件事
停止8802和8803并去除掉8802的分组group: gxs8803不去分组信息停止掉项目的时候会发现消息队列并没有删除说明一旦设置分组信息消息队列就不再是临时队列。 8801发送4条消息启动8802然后消息并没有打印没有收到消息注意8802是去掉分组信息的再启动8803,有分组属性配置,后台打出来了MQ上的消息 原因就是当两个项目都停止的时候队列并未删除而8803还绑定了这个队列所以他就算宕机了又重启了依然可以收到消息。而8802没有设置分组信息他再启动后系统会给他创建一个临时队列自然而然收不到之前的消息了。 三、函数式编程练习
官网介绍https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring_cloud_function
由于文章篇幅过长函数式编程将在下一篇文章来记录
- 上一篇: 网站的文本链接怎么做网络平台推广方法
- 下一篇: 网站的新闻栏与产品栏如何做九亭做网站
相关文章
-
网站的文本链接怎么做网络平台推广方法
网站的文本链接怎么做网络平台推广方法
- 技术栈
- 2026年04月20日
-
网站的微信推广怎么做WordPress降低加载时间
网站的微信推广怎么做WordPress降低加载时间
- 技术栈
- 2026年04月20日
-
网站的网站搭建微营销推广软件
网站的网站搭建微营销推广软件
- 技术栈
- 2026年04月20日
-
网站的新闻栏与产品栏如何做九亭做网站
网站的新闻栏与产品栏如何做九亭做网站
- 技术栈
- 2026年04月20日
-
网站的信任度东莞网站优化推广方案
网站的信任度东莞网站优化推广方案
- 技术栈
- 2026年04月20日
-
网站的形式有哪些泉州网站排名
网站的形式有哪些泉州网站排名
- 技术栈
- 2026年04月20日






