企业网站建设开发免费素材网免费素材图库

当前位置: 首页 > news >正文

企业网站建设开发,免费素材网免费素材图库,网页下载网站,网盘搜索网站 怎么做引用的是rabbitMQ官方示例的库#xff1a;github.com/rabbitmq/amqp091-go在网络编程中我们知道tcp连接的创建、交互、销毁等相关操作的代价都是很高的#xff0c;所以就要去实现如何复用这些连接#xff0c;并要做到高效并可靠。预期效果#xff1a;项目初始化…引用的是rabbitMQ官方示例的库github.com/rabbitmq/amqp091-go在网络编程中我们知道tcp连接的创建、交互、销毁等相关操作的代价都是很高的所以就要去实现如何复用这些连接并要做到高效并可靠。预期效果项目初始化构建时可以自定义选择生产者开启多个connection每个connection可以启动多少个channel【都是全局复用的】因为rabbitMQ所有的命令都是基本都是通过channel去操作完成的所以这个channel很重要也是我们想要复用的重点。初始化创建完connection和channel后当生产者需要发送一条消息的时候我们可以通过一些策略去选择它发送到哪个connection和channel我这里采用的就是随机选择也可以采用哈希取模、轮询权重算法等这个可以根据自身业务来做。我简单画了一个效果图定义RabbitMQ结构体以及Config结构体type Config struct {Host stringPort intUser stringPassword string }type RabbitMQ struct {ctx context.Contextn intm *sync.MutexConn *amqp.ConnectionChannel []*amqp.Channel }实例化RabbitMQ结构体func (mq *RabbitMQ) New(config Config) (rabbitmq *RabbitMQ) {configString : fmt.Sprintf(amqp://%s:%s%s:%d/, config.User, config.Password, config.Host, config.Port)conn, err : amqp.Dial(configString)if err ! nil {log.Panicf(amqp connect error: %v \n, err)}rabbitmq RabbitMQ{ctx: context.Background(),m: sync.Mutex{},Conn: conn,}return }一、创建消费者// ConsumeWithWork rabbitmq消费消息[work模式 channelNums可以设置当前连接开启多少个channel] func (mq *RabbitMQ) ConsumeWithWork(queueName string, channelNums int) {for i : 0; i channelNums; i {go func(i int) {ch, err : mq.Conn.Channel()if err ! nil {log.Panicf(amqp open a channel error: %v \n, err)}q, err : ch.QueueDeclare(queueName, // nametrue, // durablefalse, // delete when unusedfalse, // exclusivefalse, // no-waitnil, // arguments)if err ! nil {log.Panicf(amqp declare a queue error: %v \n, err)}err ch.Qos(1, // prefetch count0, // prefetch sizefalse, // global)if err ! nil {log.Panicf(amqp set QoS error: %v \n, err)}msg, err : ch.Consume(q.Name, // queue, // consumerfalse, // auto-ackfalse, // exclusivefalse, // no-localfalse, // no-waitnil, // args)if err ! nil {log.Panicf(amqp register a consumer error: %v \n, err)}log.Printf( [work-%d] Waiting for messages. To exit press CTRLC, i)for d : range msg {time.Sleep(2 * time.Second)fmt.Printf([work-%d] Received a message: %s \n, i, d.Body)err d.Ack(false)if err ! nil {log.Printf(work_one Ack Err: %v, err)}}}(i)}var forever chan struct{}-forever }二、创建生产者组// NewPlusherGroups 创建生产者组 func NewPlusherGroups(config Config, connNums, channelNums int) (plusherGroups map[int]*RabbitMQ) {plusherGroups make(map[int]*RabbitMQ, connNums)for i : 0; i connNums; i {var rabbitmq *RabbitMQrabbitmq rabbitmq.New(config)rabbitmq.n ifor cN : 0; cN channelNums; cN {ch, err : rabbitmq.Conn.Channel()if err ! nil {log.Panicf(amqp open a channel error: %v \n, err)}rabbitmq.Channel append(rabbitmq.Channel, ch)}plusherGroups[i] rabbitmq}return }三、将消息随机分发给不同的connection、channel// SendMessageWithWork 生产者发送消息[work模式(many conn and many channel)] func SendMessageWithWork(plusherGroups map[int]*RabbitMQ, queueName, body string) bool {if plusherGroups nil {log.Panicln(SendMessageWithWork plusherGroups params is nil!)}rand.Seed(time.Now().UnixNano())//获取连接个数connNums : len(plusherGroups)//随机分配一个连接对象randConnIndex : rand.Intn(connNums)//选择随机分配的连接对象conn : plusherGroups[randConnIndex]//既然采用了发布者复用conn、channel的形式那么一定要加锁处理//这里为每个对象的操作进行加锁(非线程安全不加锁会报错的)//至于在存在并发竞争的情况下会存在一定性能损耗但是我们配置好适量的conn和channel这个基本可以忽略conn.m.Lock()defer conn.m.Unlock()//获取当前对象的channel个数channelNums : len(conn.Channel)//随机分配一个channel对象randChannelIndex : rand.Intn(channelNums)//选择随机分配的channelch : conn.Channel[randChannelIndex]q, err : ch.QueueDeclare(queueName, // nametrue, // durablefalse, // delete when unusedfalse, // exclusivefalse, // no-waitnil, // arguments)if err ! nil {log.Panicf(amqp declare a queue error: %v \n, err)}body fmt.Sprintf(conn[%d] channel[%d] send message : %s, randConnIndex, randChannelIndex, body)err ch.PublishWithContext(conn.ctx,, // exchangeq.Name, // routing keyfalse, // mandatoryfalse,amqp.Publishing{DeliveryMode: amqp.Persistent,ContentType: text/plain,Body: []byte(body),})if err ! nil {log.Panicf(amqp publish a message error: %v \n, err)}return true }四、main函数调用消费者package mainimport (rabbitmq go-test/rabbitmq/package )func main() {queueName : task_queueconfig : rabbitmq.Config{Host: 192.168.6.103,Port: 5672,User: root,Password: root,}var mq *rabbitmq.RabbitMQmq mq.New(config)//开启N个消费者mq.ConsumeWithWork(queueName, 3) } 五、main函数调用生产者组发送消息package mainimport (fmtgithub.com/gin-gonic/ginrabbitmq go-test/rabbitmq/packagenet/httptime )func main() {var messageNo intqueueName : task_queueconfig : rabbitmq.Config{Host: 192.168.6.103,Port: 5672,User: root,Password: root,}//conn连接数connNums : 2//channel连接数channelNums : 3//启动N个不同conn的连接并且每个连接对应的channel为N个的rabbitmq实例plusherGroup : rabbitmq.NewPlusherGroups(config, connNums, channelNums)e : gin.Default()e.GET(/, func(c *gin.Context) {body : fmt.Sprintf(这是第%d条消息…, messageNo)if rabbitmq.SendMessageWithWork(plusherGroup, queueName, body) true {messageNoc.JSON(200, gin.H{code: 200,msg: success,})} else {c.JSON(200, gin.H{code: 500,msg: error,})}})server : http.Server{Addr: :18776,Handler: e,ReadTimeout: time.Minute,WriteTimeout: time.Minute,}if err : server.ListenAndServe(); err ! nil {panic(any(HttpServer启动失败))} }执行流程启动消费者进程可以看到我们用3个协程开启了3个work也就是对应了3个channel启动生产者组进程这里用的gin框架正常启动我们可以看到rabbitMQ的控制台中一共3个连接1个是消费者进程另外2个是生产者组进程这2个正好和我们上面配置的connNums参数匹配我们可以看到rabbitMQ的控制台中一共9个channel3个是消费者进程另外6个是生产者组进程这6个正好和我们上面配置的channelNums参数匹配调用发送消息ab.exe -n 1000 -c 1000 http://127.0.0.1:18776/我们来看消费者日志打印情况标红的可以证明我们在发送消息时让生产者根据我们的随机分配策略选择connection和channel