整站优化提升排名腾讯云怎样做网站

当前位置: 首页 > news >正文

整站优化提升排名,腾讯云怎样做网站,中山东莞网站推广,网站服务器建立目录 一、网络通讯协议设计 1.1、交互模型 1.2、自定义应用层协议 1.2.1、请求和响应格式约定 ​编辑 1.2.2、参数说明 1.2.3、具体例子 1.2.4、特殊栗子 1.3、实现 BrokerServer 1.3.1、属性和构造 1.3.2、启动 BrokerServer 1.3.3、停止 BrokerServer 1.3.4、处…目录 一、网络通讯协议设计 1.1、交互模型 1.2、自定义应用层协议 1.2.1、请求和响应格式约定 ​编辑 1.2.2、参数说明 1.2.3、具体例子 1.2.4、特殊栗子 1.3、实现 BrokerServer 1.3.1、属性和构造 1.3.2、启动 BrokerServer 1.3.3、停止 BrokerServer 1.3.4、处理每一个客户端连接 1.3.5、读取请求和写响应 1.3.6、根据请求计算响应 1.3.7、清除 channel 一、网络通讯协议设计 1.1、交互模型 目前我们需要考虑的交互模型生产者消费者都是客户端都需要通过 网络 和 BrokerServer 进行通信 此处我们使⽤ TCP 协议, 来作为通信的底层协议. 同时在这个基础上⾃定义应⽤层协议, 完成客⼾端对服 务器这边功能的远程调⽤. TCP 是有连接的Connection创建 / 断开 TCP 连接成本还是挺高的需要三次握手啥的那么这里就是用 Channel 来表示 Connection 内部的 “逻辑上” 的连接使得 “一个管道多个网线传输” 的效果使得 TCP连接得到复用 Ps要远程调用的功能就是在 VirtualHost 中 public 的方法. 1.2、自定义应用层协议 1.2.1、请求和响应格式约定 之前我们定义的 Message 对象本体就是二进制的数据因此这里不方便使用 JSON 这种文本协议 / 格式. 因此这里使用 二进制 的方式来设定协议. 请求如下 /*** 表示一个网络通信中的请求对象按照自定义协议的格式展开/ public class Request {private int type;private int length;private byte[] payload;public int getType() {return type;}public void setType(int type) {this.type type;}public int getLength() {return length;}public void setLength(int length) {this.length length;}public byte[] getPayload() {return payload;}public void setPayload(byte[] payload) {this.payload payload;} }响应如下 /** 这个对象表示一个响应是根据自定义应用层协议来的/ public class Response {private int type;private int length;private byte[] payload;public int getType() {return type;}public void setType(int type) {this.type type;}public int getLength() {return length;}public void setLength(int length) {this.length length;}public byte[] getPayload() {return payload;}public void setPayload(byte[] payload) {this.payload payload;} }1.2.2、参数说明 1type是一个整形用来表示当前这个请求和响应是用来干啥的对应 VirtualHost 中的核心 API取值如下 0x1 创建 channel0x2 关闭 channel0x3 创建 exchange0x4 销毁 exchange0x5 创建 queue0x6 销毁 queue0x7 创建 binding0x8 销毁 binding0x9 发送 message0xa 订阅 message0xb 返回 ack0xc 服务器给客⼾端推送的消息.(被订阅的消息) 响应独有的. 2length 就是用来描述 payload 长度防止粘包问题 3payload 就是具体要传输的二进制数据。数据具体是什么会根据当前是请求还是响应以及当前的 type 的不同取值来确定。 比如 type 是 0x3创建交换机同时当前是一个请求此时 payload 里的内容就相当于 exchangeDeclare 的 参数 的序列化的结果. 比如 type 是 0x3创建交换机同时当前是一个响应此时 payload 里的内容就是 exchangDeclare 的 返回结果 的序列化内容. 1.2.3、具体例子 栗子如下 1请求 当前需要远程调用 exchangeDeclare 方法那么我们就需要传递核心 API 以下参数 使用一个公共的父类包装每次 请求 中公共每个请求都要传输的参数 /** 这个类用来表示方法的公共参数/辅助字段* 后续每个方法会有一些不同的参数不同的参数再用不同的子类来表示/ public class BasicArguments implements Serializable {// 表示一次 请求/响应 的身份标识让请求和响应能对的上protected String rid;// 表示这次通信使用的 channel 的身份标识protected String channelId;public String getRid() {return rid;}public void setRid(String rid) {this.rid rid;}public String getChannelId() {return channelId;}public void setChannelId(String channelId) {this.channelId channelId;}}创建 ExchangeDeclareArguments 类当前这个类将来会被序列化成 request 类中的 payload继承 BasicArguments公共参数实现 Serializable 接口避免序列化问题要传递的参数如下 public class ExchangeDeclareArguments extends BasicArguments implements Serializable {private String exchangeName;private ExchangeType exchangeType;private boolean durable;private boolean autoDelete;private MapString, Object arguments;public String getExchangeName() {return exchangeName;}public void setExchangeName(String exchangeName) {this.exchangeName exchangeName;}public ExchangeType getExchangeType() {return exchangeType;}public void setExchangeType(ExchangeType exchangeType) {this.exchangeType exchangeType;}public boolean isDurable() {return durable;}public void setDurable(boolean durable) {this.durable durable;}public boolean isAutoDelete() {return autoDelete;}public void setAutoDelete(boolean autoDelete) {this.autoDelete autoDelete;}public MapString, Object getArguments() {return arguments;}public void setArguments(MapString, Object arguments) {this.arguments arguments;} }2响应 当前 VirtualHost 中的核心 API 返回值都是 Boolean 类型因此我们使用一个公共类来封装响应当前这个类将来会被序列化成 response 类中的 payload 参数 public class BasicReturns implements Serializable {//用来标识唯一的请求和响应protected String rid;//标识一个 channelprotected String channelId;//标识当前这个远程调用方法的返回值protected boolean ok;public String getRid() {return rid;}public void setRid(String rid) {this.rid rid;}public String getChannelId() {return channelId;}public void setChannelId(String channelId) {this.channelId channelId;}public boolean isOk() {return ok;}public void setOk(boolean ok) {this.ok ok;} }Ps其他核心 API 自定义应用层协议也一样 1.2.4、特殊栗子 0xa 订阅 message 这个核心 API 比较特殊参数中有回调函数 1请求 创建 BasicConsumeArguments 类当前这个类将来会被序列化成 request 类中的 payload 表示要传递的参数需要注意的是 Consumer 这个回调在发送的请求中不需要携带这个参数实际上也携带不了 Ps因为服务器收到这个订阅消息请求之后就直接取拿队列中的消息,接着直接反馈给客户端客户端拿到消息后才执行回调方法要拿这个消息干什么事。 这就类似于你去商店订阅报纸接着拿到报纸以后你要对这个报纸做什么商店是不知道的~~ public class BasicConsumeArguments extends BasicArguments implements Serializable {private String consumerTag;private String queueName;private boolean autoAck;//注意 这里的 Consumer 回调函数不用发送给服务器(实际上也发送不了)//因为服务器收到这个订阅消息请求之后就直接取拿队列中的消息,接着直接反馈给客户端//客户端拿到消息后才执行回调方法//这就类似于你去商店订阅报纸接着拿到报纸以后你要对这个报纸做什么商店是不知道的~~public String getConsumerTag() {return consumerTag;}public void setConsumerTag(String consumerTag) {this.consumerTag consumerTag;}public String getQueueName() {return queueName;}public void setQueueName(String queueName) {this.queueName queueName;}public boolean isAutoAck() {return autoAck;}public void setAutoAck(boolean autoAck) {this.autoAck autoAck;} }2响应 创建 SubScribeReturns 类当前这个类将来会被序列化成 response 类中的 payload 参数 来描述响应 这个响应中不光要携带 BasicReturns 返回的公共响应参数还需要带上回调中消息的参数如下 public class SubScribeReturns extends BasicReturns implements Serializable {private String consumerTag;private BasicProperties basicProperties;private byte[] body;public String getConsumerTag() {return consumerTag;}public void setConsumerTag(String consumerTag) {this.consumerTag consumerTag;}public BasicProperties getBasicProperties() {return basicProperties;}public void setBasicProperties(BasicProperties basicProperties) {this.basicProperties basicProperties;}public byte[] getBody() {return body;}public void setBody(byte[] body) {this.body body;} }1.3、实现 BrokerServer 这里的写法就和以前写过的 TCP 回显服务器很类似了只是根据请求计算响应的方式不同 1.3.1、属性和构造 private ServerSocket serverSocket null;//当前考虑一个 BrokerServer 上只有一个 虚拟主机private VirtualHost virtualHost new VirtualHost(default);//使用 哈希表 来标识当前所有会话(哪个客户端正在和服务器进行通信)//key 是 channelId, value 为对应的 Socket 对象private ConcurrentHashMapString, Socket sessions new ConcurrentHashMap();//用线程池来处理多个客户端请求private ExecutorService executorService null;//引入一个 Boolean 变量控制服务器是否继续运行private volatile boolean runnable true;public BrokerServer(int port) throws IOException {serverSocket new ServerSocket(port);}1.3.2、启动 BrokerServer public void start() throws IOException {System.out.println([BrokerServer] 启动);executorService Executors.newCachedThreadPool();while(runnable) {Socket clientSocket serverSocket.accept();//处理连接的逻辑给线程池executorService.submit(() - {processConnection(clientSocket);});}}1.3.3、停止 BrokerServer /** 停止服务器一般是直接 kill 就可以了* 此处这个单独的方法主要是为了后续的单元测试*/public void stop() throws IOException {runnable false;//放弃线程池中的任务并销毁线程executorService.shutdown();serverSocket.close();}1.3.4、处理每一个客户端连接 private void processConnection(Socket clientSocket) {try (InputStream inputStream clientSocket.getInputStream();OutputStream outputStream clientSocket.getOutputStream()) {// 这里需要按照特定格式来读取并解析. 此时就需要用到 DataInputStream 和 DataOutputStreamtry (DataInputStream dataInputStream new DataInputStream(inputStream);DataOutputStream dataOutputStream new DataOutputStream(outputStream)) {while (true) {// 1. 读取请求并解析.Request request readRequest(dataInputStream);// 2. 根据请求计算响应Response response process(request, clientSocket);// 3. 把响应写回给客户端writeResponse(dataOutputStream, response);}}} catch (EOFException | SocketException e) {// 对于这个代码, DataInputStream 如果读到 EOF , 就会抛出一个 EOFException 异常.// 需要借助这个异常来结束循环System.out.println([BrokerServer] connection 关闭! 客户端的地址: clientSocket.getInetAddress().toString() : clientSocket.getPort());} catch (IOException | ClassNotFoundException | MqException e) {System.out.println([BrokerServer] connection 出现异常!);e.printStackTrace();} finally {try {// 当连接处理完了, 就需要记得关闭 socketclientSocket.close();// 一个 TCP 连接中, 可能包含多个 channel. 需要把当前这个 socket 对应的所有 channel 也顺便清理掉.clearClosedSession(clientSocket);} catch (IOException e) {e.printStackTrace();}}} 1.3.5、读取请求和写响应 private Request readRequest(DataInputStream dataInputStream) throws IOException {Request request new Request();request.setType(dataInputStream.readInt());request.setLength(dataInputStream.readInt());byte[] body new byte[request.getLength()];int n dataInputStream.read(body);if(n ! request.getLength()) {throw new IOException(读出请求格式出错);}request.setPayload(body);return request;}private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {dataOutputStream.write(response.getType());dataOutputStream.write(response.getLength());dataOutputStream.write(response.getPayload());dataOutputStream.flush();}1.3.6、根据请求计算响应 这里就是根据不同的 type 类型来远程调用 VirtualHost 中不同的核心 API需要特别注意订阅消息功能的回调函数 private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {//1.将 request 初步解析成 BasicArgumentsBasicArguments basicArguments (BasicArguments) BinaryTool.fromBytes(request.getPayload());System.out.println([Request] rid basicArguments.getRid() , channelId basicArguments.getChannelId() , type request.getType() , length request.getLength());//2.根据 type 的值进一步区分接下来要干什么boolean ok true;if (request.getType() 0x1) {//创建 channelsessions.put(basicArguments.getChannelId(), clientSocket);System.out.println([BrokerServer] 创建 channel 完成channelId basicArguments.getChannelId());} else if(request.getType() 0x2) {//销毁 channelsessions.remove(basicArguments.getChannelId());System.out.println([BrokerServer] 销毁 channel 完成channelId basicArguments.getChannelId());} else if(request.getType() 0x3) {//创建交换机此时 payLoad 就是 ExchangDeclareArguments 了ExchangeDeclareArguments arguments (ExchangeDeclareArguments) basicArguments;ok virtualHost.exchangeDeclare(arguments.getExchangeName(), arguments.getExchangeType(),arguments.isDurable(), arguments.isAutoDelete(), arguments.getArguments());} else if(request.getType() 0x4) {ExchangeDeleteArguments arguments (ExchangeDeleteArguments) basicArguments;ok virtualHost.exchangeDelete(arguments.getExchangeName());} else if(request.getType() 0x5) {QueueDeclareArguments arguments (QueueDeclareArguments) basicArguments;ok virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable(),arguments.isExclusive(), arguments.isAutoDelete(), arguments.getArguments());} else if(request.getType() 0x6) {QueueDeleteArguments arguments (QueueDeleteArguments) basicArguments;ok virtualHost.queueDelete(arguments.getQueueName());} else if(request.getType() 0x7) {QueueBindArguments arguments (QueueBindArguments) basicArguments;ok virtualHost.queueBind(arguments.getQueueName(), arguments.getExchangeName(), arguments.getBindingKey());} else if(request.getType() 0x8) {QueueUnBindArguments arguments (QueueUnBindArguments) basicArguments;ok virtualHost.queueUnBind(arguments.getQueueName(), arguments.getExchangeName());} else if(request.getType() 0x9) {BasicPublishArguments arguments (BasicPublishArguments) basicArguments;ok virtualHost.basicPublish(arguments.getExchangeName(), arguments.getRoutingKey(), arguments.getBasicProperties(), arguments.getBody());} else if(request.getType() 0xa) {BasicConsumeArguments arguments (BasicConsumeArguments) basicArguments;ok virtualHost.basicConsume(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(), new Consumer() {//这个回调函数要做的就是把服务器收到的消息可以直接推送回对应的消费者客户端Overridepublic void handlerDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {//首先需要知道收到的消息要发给哪个客户端//此处 consumerTag 其实就是 channelId根据 channelId 去 sessions 中查询既可以得到对应的//socket 对象了从而往里面发送数据//1.根据 channelId 找到 socket 对象Socket clientSocket sessions.get(consumerTag);if(clientSocket null || clientSocket.isClosed()) {throw new MqException([BrokerServer] 订阅消息的客户端已经关闭);}//2.构造响应数据SubScribeReturns subScribeReturns new SubScribeReturns();subScribeReturns.setChannelId(consumerTag);subScribeReturns.setRid();//由于这里只有响应没有请求不需要去对应rid 暂时不需要subScribeReturns.setOk(true);subScribeReturns.setConsumerTag(consumerTag);subScribeReturns.setBasicProperties(basicProperties);subScribeReturns.setBody(body);byte[] payload BinaryTool.toBytes(subScribeReturns);Response response new Response();// 0xc 表示服务器给消费者客户端推送的消息数据response.setType(0xc);response.setLength(payload.length);response.setPayload(payload);//3.把数据写回给客户端// 注意此处的 dataOutputStream 这个对象不能 close// 如果把 dataOutputStream 关闭 就会直接把 clientSocket 里的 outputStream 也关了// 此时就无法继续往 socket 中写后续的数据了DataOutputStream dataOutputStream new DataOutputStream(clientSocket.getOutputStream());writeResponse(dataOutputStream, response);}});} else if(request.getType() 0xb) {//调用 basicAck 确认消息BasicAckArguments arguments (BasicAckArguments) basicArguments;ok virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId());} else {throw new MqException([BrokerServer] 未知 typetype request.getType());}//构造响应BasicReturns basicReturns new BasicReturns();basicReturns.setRid(basicArguments.getRid());basicReturns.setChannelId(basicArguments.getChannelId());basicReturns.setOk(ok);byte[] payload BinaryTool.toBytes(basicReturns);Response response new Response();response.setType(request.getType());response.setLength(request.getLength());response.setPayload(payload);System.out.println([Response] rid basicReturns.getRid() , channelId basicReturns.getChannelId() , type response.getType() , length response.getLength());return response;}1.3.7、清除 channel 清理 map 中对应的(clientSocket) session 信息 private void clearClosedSession(Socket clientSocket) {ListString toDeleteChannelId new ArrayList();for(Map.EntryString, Socket entry : sessions.entrySet()) {if(entry.getValue() clientSocket) { //这里一个 key 可能对应多个相同的 Socket//在集合类中不能一边用迭代器一边删除会破坏迭代器结构的!//sessions.remove(entry.getKey());//因此这里先记录下 keytoDeleteChannelId.add(entry.getKey());}}for(String channelId : toDeleteChannelId) {sessions.remove(channelId);}System.out.println([BrokerServer] 清理 session 完毕channelId toDeleteChannelId);}