- Author: 五速梦信息网
- Date: 2026-03-21 09:54
Background
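Because the project schedule was tight at the time, we brought in the PLC4X component, which is quick to get started with. These are the problems we then ran into while using it:
- After a connection is closed, the NioEventLoop is not released, which eventually causes an OOM.
- It is designed around one connection per device rather than one connection per gateway.
- When a connection drops, the client has no way of noticing.
Solutions to the first two problems are covered in the previous article. As for the last one, the state can be read through the isConnect() method, but that state is not updated after the connection drops, so reconnect-on-failure has to be implemented in application code. To solve these problems I decided to wrap a Modbus component of my own.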
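Steps
The code is shown below. For now only the modbus-core code is shared.
- modbus-core sends read/write commands to devices and handles their replies.
- modbus-app is a generic, flexibly configurable Modbus device access layer: a new device can be onboarded by updating configuration, with no hand-written code and no application restart.
To get the Modbus component wrapped quickly, the Vert.x framework (event-driven and asynchronous; see the official site) is used here instead of raw Netty.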
Adding the dependency
<!-- The latest version at the time of writing (4.4.4) is used here -->
<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-core</artifactId>
    <version>${vertx.version}</version>
</dependency>
Utility classes
ByteUtil
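package com.bho.modbus.utils;

import java.nio.ByteBuffer;

public class ByteUtil {

    /**
     * Converts a byte array to an uppercase hex string.
     * @param bytes the bytes to format
     * @return two hex digits per byte, no separators
     */
    public static String bytesToHexString(byte[] bytes) {
        StringBuilder sb = new StringBuilder(bytes.length * 2);
        String sTemp;
        for (int i = 0; i < bytes.length; i++) {
            sTemp = Integer.toHexString(0xFF & bytes[i]);
            if (sTemp.length() < 2) {
                sb.append(0);
            }
            sb.append(sTemp.toUpperCase());
        }
        return sb.toString();
    }

    /**
     * Converts an int to bytes (big-endian) and returns a slice of them.
     * @param data   the value to convert
     * @param offset start index within the 4-byte big-endian representation
     * @param len    number of bytes to return
     * @return the selected bytes
     */
    public static byte[] intToBytes(int data, int offset, int len) {
        ByteBuffer buffer = ByteBuffer.allocate(4);
        buffer.putInt(data);
        byte[] bytes = buffer.array();
        if (len - offset == 4) {
            return bytes;
        }
        byte[] dest = new byte[len];
        System.arraycopy(bytes, offset, dest, 0, len);
        return dest;
    }

    /**
     * Converts up to 4 big-endian bytes to an int (the value is zero-extended, so it is treated as unsigned).
     * @param bytes  source array
     * @param offset index of the first byte to read
     * @param len    number of bytes to read (1 to 4)
     * @return the decoded value
     */
    public static int bytesToInt(byte[] bytes, int offset, int len) {
        ByteBuffer buffer = ByteBuffer.allocate(4);
        // Left-pad with zeros so that exactly 4 bytes end up in the buffer.
        for (int i = len; i < 4; i++) {
            buffer.put((byte) 0x00);
        }
        for (int i = offset; i < offset + len; i++) {
            buffer.put(bytes[i]);
        }
        buffer.flip();
        return buffer.getInt();
    }
}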
Crc16
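package com.bho.modbus.utils;

public class Crc16 {

    /**
     * Computes the CRC16 checksum.
     * @param arr_buff the bytes to checksum
     * @return two bytes, low byte first (the order used on the wire by Modbus RTU)
     */
    public static byte[] getCrc16(byte[] arr_buff) {
        int len = arr_buff.length;
        // Preset a 16-bit register to 0xFFFF; this is the CRC register.
        int crc = 0xFFFF;
        int i, j;
        for (i = 0; i < len; i++) {
            // XOR the next 8-bit data byte into the low 8 bits of the CRC register.
            crc = ((crc & 0xFF00) | (crc & 0x00FF) ^ (arr_buff[i] & 0xFF));
            for (j = 0; j < 8; j++) {
                // Shift the CRC register right by one bit (zero-filling the top bit) and inspect the bit shifted out.
                if ((crc & 0x0001) > 0) {
                    // The bit shifted out is 1: XOR the register with the polynomial 0xA001.
                    crc = crc >> 1;
                    crc = crc ^ 0xA001;
                } else {
                    // The bit shifted out is 0: just shift.
                    crc = crc >> 1;
                }
            }
        }
        return intToBytes(crc);
    }

    private static byte[] intToBytes(int value) {
        byte[] src = new byte[2];
        src[1] = (byte) ((value >> 8) & 0xFF);
        src[0] = (byte) (value & 0xFF);
        return src;
    }
}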
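As a quick sanity check of the checksum routine, the following self-contained sketch recomputes the CRC for a commonly quoted read-holding-registers request (slave 1, start address 0, one register). The class name Crc16Check and its methods are made up for illustration; the loop is the standard bitwise CRC-16/MODBUS algorithm, written independently of the class above.

```java
import java.util.Arrays;

class Crc16Check {

    /** Bitwise CRC-16/MODBUS: initial value 0xFFFF, reflected polynomial 0xA001. */
    static int crc16(byte[] data) {
        int crc = 0xFFFF;
        for (byte b : data) {
            crc ^= (b & 0xFF);
            for (int i = 0; i < 8; i++) {
                if ((crc & 1) != 0) {
                    crc = (crc >> 1) ^ 0xA001;
                } else {
                    crc >>= 1;
                }
            }
        }
        return crc & 0xFFFF;
    }

    /** Appends the CRC to an RTU frame body, low byte first. */
    static byte[] appendCrc(byte[] body) {
        int crc = crc16(body);
        byte[] frame = Arrays.copyOf(body, body.length + 2);
        frame[body.length] = (byte) (crc & 0xFF);
        frame[body.length + 1] = (byte) ((crc >> 8) & 0xFF);
        return frame;
    }

    public static void main(String[] args) {
        byte[] request = {0x01, 0x03, 0x00, 0x00, 0x00, 0x01};
        StringBuilder hex = new StringBuilder();
        for (byte b : appendCrc(request)) {
            hex.append(String.format("%02X ", b & 0xFF));
        }
        System.out.println(hex.toString().trim()); // 01 03 00 00 00 01 84 0A
    }
}
```

The two trailing bytes should match what Crc16.getCrc16 returns for the same six input bytes, which makes this a convenient regression check when touching the checksum code.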
Model classes
ModbusMode
Only the following two transport modes are implemented so far; add others as your own needs require.
package com.bho.modbus.model;

import com.bho.modbus.utils.ByteUtil;
import com.bho.modbus.utils.Crc16;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import lombok.extern.log4j.Log4j2;

import java.nio.ByteOrder;

@Log4j2
public enum ModbusMode {

    /**
     * [transaction ID (2) + protocol identifier (2) + data length (2)] slave address (1) + function code (1) + data (N)
     */
    TCP,

    /**
     * slave address (1) + function code (1) + data (N) + [checksum (2)]
     */
    RTU,
    ;

    public ByteToMessageDecoder getDecoder() {
        if (this == ModbusMode.TCP) {
            // Length field at offset 4, 2 bytes long; strip the 6-byte header.
            return new LengthFieldBasedFrameDecoder(ByteOrder.BIG_ENDIAN, 65536, 4, 2, 0, 6, true);
        }
        if (this == ModbusMode.RTU) {
            // Byte-count field at offset 2, 1 byte long, plus 2 bytes of CRC after the data.
            return new LengthFieldBasedFrameDecoder(ByteOrder.BIG_ENDIAN, 65536, 2, 1, 2, 0, true);
        }
        return null;
    }

    public byte[] readData(byte[] bytes) {
        int len = bytes.length;
        if (this == ModbusMode.RTU) {
            // Verify and strip the trailing CRC.
            byte[] tempArr = new byte[len - 2];
            System.arraycopy(bytes, 0, tempArr, 0, tempArr.length);
            byte[] crc16 = Crc16.getCrc16(tempArr);
            if (crc16[0] != bytes[len - 2] || crc16[1] != bytes[len - 1]) {
                log.error("Modbus receive illegal data:{}", ByteUtil.bytesToHexString(bytes));
                return null;
            }
            if (log.isDebugEnabled()) {
                log.debug("read data:{}", ByteUtil.bytesToHexString(tempArr));
            }
            return tempArr;
        }
        if (this == ModbusMode.TCP) {
            if (log.isDebugEnabled()) {
                log.debug("read data:{}", ByteUtil.bytesToHexString(bytes));
            }
            return bytes;
        }
        return null;
    }

    public byte[] writeData(byte[] bytes) {
        if (log.isDebugEnabled()) {
            log.debug("write data:{}", ByteUtil.bytesToHexString(bytes));
        }
        int len = bytes.length;
        if (this == ModbusMode.RTU) {
            // Append the CRC after the payload.
            byte[] crc16 = Crc16.getCrc16(bytes);
            byte[] tempArr = new byte[len + 2];
            System.arraycopy(bytes, 0, tempArr, 0, len);
            tempArr[len] = crc16[0];
            tempArr[len + 1] = crc16[1];
            return tempArr;
        }
        if (this == ModbusMode.TCP) {
            // Prepend the 6-byte header: fixed transaction ID 0x0001, protocol ID 0, then the payload length.
            byte[] tempArr = new byte[len + 6];
            tempArr[1] = 0x01;
            byte[] lenBytes = ByteUtil.intToBytes(len, 2, 2);
            tempArr[4] = lenBytes[0];
            tempArr[5] = lenBytes[1];
            System.arraycopy(bytes, 0, tempArr, 6, len);
            return tempArr;
        }
        return null;
    }
}
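To make the TCP frame layout concrete, here is a minimal JDK-only sketch that wraps and unwraps a request body with the 6-byte header (transaction ID, protocol identifier, length). MbapFrameSketch, wrapTcp and unwrapTcp are hypothetical names used only for this illustration, and "body" follows the article's convention of slave address + function code + data.

```java
import java.nio.ByteBuffer;

class MbapFrameSketch {

    /** Prepends transaction ID (2), protocol identifier (2) and length (2) to the body. */
    static byte[] wrapTcp(int transactionId, byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(6 + body.length); // ByteBuffer is big-endian by default
        buf.putShort((short) transactionId);
        buf.putShort((short) 0);            // protocol identifier is 0 for Modbus
        buf.putShort((short) body.length);  // number of bytes that follow the header
        buf.put(body);
        return buf.array();
    }

    /** Validates the length field and returns the body without the header. */
    static byte[] unwrapTcp(byte[] frame) {
        ByteBuffer buf = ByteBuffer.wrap(frame);
        buf.position(4);
        int length = buf.getShort() & 0xFFFF;
        if (length != frame.length - 6) {
            throw new IllegalArgumentException("Length field does not match frame size");
        }
        byte[] body = new byte[length];
        buf.get(body);
        return body;
    }

    public static void main(String[] args) {
        byte[] body = {0x01, 0x03, 0x00, 0x00, 0x00, 0x01};
        byte[] frame = wrapTcp(1, body);
        System.out.println(frame.length);            // 12
        System.out.println(unwrapTcp(frame).length); // 6
    }
}
```

In the enum above, the decoder's "strip 6 bytes" setting plays the role of unwrapTcp, and writeData plays the role of wrapTcp with a fixed transaction ID.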
ModbusFunc
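Function codes
package com.bho.modbus.model;

/**
 * Common Modbus function codes.
 */
public enum ModbusFunc {

    /*
     * Exception codes:
     *   01  illegal function code
     *   02  illegal register address
     *   03  illegal data value
     *   04  slave device failure
     */

    /**
     * Request:
     *   function code    1 byte    0x01
     *   start address    2 bytes   0x0000-0xffff
     *   coil count       2 bytes   0x0001-0x07d0 (2000)
     *
     * Normal response:
     *   function code    1 byte    0x01
     *   byte count       1 byte    N (coil count / 8, plus 1 if the remainder is non-zero)
     *   coil status      N bytes
     *
     * Error response:
     *   function code    1 byte    0x81
     *   exception code   1 byte    0x01-0x04
     */
    READ_COILS((byte) 0x01), // read contiguous coil status
    READ_DISCRETE_COILS((byte) 0x02), // read discrete input status; same layout as above

    /**
     * Request:
     *   function code    1 byte    0x03
     *   start address    2 bytes   0x0000-0xffff
     *   register count   2 bytes   0x0001-0x007d (125)
     *
     * Normal response:
     *   function code    1 byte    0x03
     *   byte count       1 byte    2N (N is the register count)
     *   register values  2N bytes
     *
     * Error response:
     *   function code    1 byte    0x83
     *   exception code   1 byte    0x01-0x04
     */
    READ_HOLDING_REGISTERS((byte) 0x03), // read holding register values
    READ_INPUT_REGISTERS((byte) 0x04), // read input register values; same layout as above

    /**
     * Request:
     *   function code    1 byte    0x05
     *   start address    2 bytes   0x0000-0xffff
     *   coil state       2 bytes   0x0000 / 0xff00
     *
     * Normal response:
     *   function code    1 byte    0x05
     *   start address    2 bytes   0x0000-0xffff
     *   coil state       2 bytes   0x0000 / 0xff00
     *
     * Error response:
     *   function code    1 byte    0x85
     *   exception code   1 byte    0x01-0x04
     */
    WRITE_SINGLE_COILS((byte) 0x05), // write a single coil

    /**
     * Request:
     *   function code    1 byte    0x06
     *   start address    2 bytes   0x0000-0xffff
     *   register value   2 bytes   0x0000-0xffff
     *
     * Normal response:
     *   function code    1 byte    0x06
     *   start address    2 bytes   0x0000-0xffff
     *   register value   2 bytes   0x0000-0xffff
     *
     * Error response:
     *   function code    1 byte    0x86
     *   exception code   1 byte    0x01-0x04
     */
    WRITE_SINGLE_HOLDING_REGISTERS((byte) 0x06), // write a single holding register

    /**
     * Request:
     *   function code    1 byte    0x10
     *   start address    2 bytes   0x0000-0xffff
     *   register count   2 bytes   0x0001-0x007b (123)
     *   byte count       1 byte    2N (N is the register count)
     *   register values  2N bytes  0x0000-0xffff each
     *
     * Normal response:
     *   function code    1 byte    0x10
     *   start address    2 bytes   0x0000-0xffff
     *   register count   2 bytes   0x0001-0x007b (123)
     *
     * Error response:
     *   function code    1 byte    0x90
     *   exception code   1 byte    0x01-0x04
     */
    WRITE_MULTI_HOLDING_REGISTERS((byte) 0x10), // write multiple holding registers

    /**
     * Request:
     *   function code    1 byte    0x0F
     *   start address    2 bytes   0x0000-0xffff
     *   coil count       2 bytes   0x0001-0x07b0 (1968)
     *   byte count       1 byte    N (coil count / 8, plus 1 if the remainder is non-zero)
     *   coil status      N bytes
     *
     * Normal response:
     *   function code    1 byte    0x0F
     *   start address    2 bytes   0x0000-0xffff
     *   coil count       2 bytes   0x0001-0x07b0 (1968)
     *
     * Error response:
     *   function code    1 byte    0x8F
     *   exception code   1 byte    0x01-0x04
     */
    WRITE_MULTI_COILS((byte) 0x0F), // write multiple coils
    ;

    private byte func;

    ModbusFunc(byte func) {
        this.func = func;
    }

    public byte getFunc() {
        return func;
    }
}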
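The error responses listed in the comments all follow one rule: the function code comes back with its high bit set (0x03 becomes 0x83), followed by a one-byte exception code. The sketch below decodes such a reply using only the JDK; ModbusExceptionSketch and its method names are invented for this example, and the input is assumed to be slave address + function code + data with any header or CRC already removed.

```java
class ModbusExceptionSketch {

    /** An exception response echoes the request's function code with the high bit set. */
    static boolean isException(int functionCode) {
        return (functionCode & 0x80) != 0;
    }

    /** Maps the four exception codes listed in the enum comments to messages. */
    static String describe(int exceptionCode) {
        switch (exceptionCode) {
            case 1: return "Illegal function code";
            case 2: return "Illegal register address";
            case 3: return "Illegal data value";
            case 4: return "Slave fault";
            default: return "Unknown exception code " + exceptionCode;
        }
    }

    /** body = slave address (1) + function code (1) + data. */
    static String decode(byte[] body) {
        int function = body[1] & 0xFF;
        if (!isException(function)) {
            return "OK";
        }
        int original = function & 0x7F; // clear the high bit to recover the request's function code
        return String.format("Function 0x%02X failed: %s", original, describe(body[2] & 0xFF));
    }

    public static void main(String[] args) {
        System.out.println(decode(new byte[]{0x01, (byte) 0x83, 0x02}));
        // prints: Function 0x03 failed: Illegal register address
    }
}
```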
ModbusParamConfig
Parameter configuration for the commands to send.
package com.bho.modbus.model;

import lombok.Data;

@Data
public class ModbusParamConfig {

    private RegisterType registerType; // register type
    private int registerAddress;       // register address
    private String name;               // metric name
    private DataType dataType;         // metric data type
    private int numberSplit;           // divisor applied to the raw value

    public enum RegisterType {
        COIL,
        HOLDING_REGISTER,
        INPUT_REGISTER;
    }

    public enum DataType {
        BOOL,
        FLOAT,
        INT;
    }
}
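The numberSplit field is easiest to understand with numbers: a device that reports 23.5 as the raw register value 235 would be configured with numberSplit = 10. The following sketch shows that arithmetic in isolation; RegisterScalingSketch and its methods are hypothetical helpers written for this example, not part of modbus-core.

```java
class RegisterScalingSketch {

    /** Combines the two bytes of one register (big-endian) into an unsigned 16-bit value. */
    static int toUnsigned16(byte high, byte low) {
        return ((high & 0xFF) << 8) | (low & 0xFF);
    }

    /** Divides the raw register value by numberSplit; a divisor of 1 leaves the value unchanged. */
    static float scale(int raw, int numberSplit) {
        return numberSplit == 1 ? raw : raw / (float) numberSplit;
    }

    public static void main(String[] args) {
        int raw = toUnsigned16((byte) 0x00, (byte) 0xEB);
        System.out.println(raw);            // 235
        System.out.println(scale(raw, 10)); // 23.5
    }
}
```

Note that this treats registers as unsigned; a device that reports negative values as signed 16-bit numbers would need a cast to short before scaling.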
SendCmdTask
A task that sends a command.
package com.bho.modbus.model;

import com.alibaba.fastjson.JSONObject;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import lombok.Data;

import java.util.List;

@Data
public class SendCmdTask {

    private List<ModbusParamConfig> paramConfigs; // parameter list
    private JSONObject reqParam;                  // request parameters; required when writing
    private Boolean isWrite;                      // whether this task writes data
    private Integer slaveId;                      // slave ID
    private Integer reqTimeout;                   // request timeout (seconds)
    private Promise<JSONObject> promise;
    private Long timerId;

    public SendCmdTask(Vertx vertx, List<ModbusParamConfig> paramConfigs, JSONObject reqParam,
                       Boolean isWrite, Integer slaveId, Integer reqTimeout) {
        this.paramConfigs = paramConfigs;
        this.reqParam = reqParam;
        this.isWrite = isWrite;
        this.slaveId = slaveId;
        this.reqTimeout = Math.max(reqTimeout, 5);
        Promise<JSONObject> promise = Promise.promise();
        this.promise = promise;
        // Use the clamped value so the 5-second minimum also applies to the timer.
        this.timerId = vertx.setTimer(this.reqTimeout * 1000L, hh -> promise.tryFail("Request timeout"));
    }
}
Core class
package com.bho.modbus.core;

import com.alibaba.fastjson.JSONObject;
import com.bho.modbus.model.SendCmdTask;
import com.bho.modbus.model.ModbusFunc;
import com.bho.modbus.model.ModbusMode;
import com.bho.modbus.model.ModbusParamConfig;
import com.bho.modbus.utils.ByteUtil;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.impl.NetSocketImpl;
import lombok.extern.log4j.Log4j2;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

@Log4j2
public class ModbusConnection {

    private String ip;                        // slave IP
    private Integer port;                     // slave port
    private AtomicBoolean isAlive;            // whether the slave is online
    private ModbusMode mode;                  // transport mode
    private NetSocket netSocket;              // client connection
    private boolean isInitiativeClose;        // whether the connection was closed deliberately
    private Long failRetryTimerId;            // ID of the retry-on-failure timer
    private Integer failRetryIntervalSecond;  // delay before reconnecting after a disconnect
    private Integer reqTimeoutSecond = 1;     // request timeout
    private Long queueTimerId;                // ID of the queue polling timer
    private ConcurrentLinkedQueue<SendCmdTask> writeQueue; // write queue; writes take priority
    private ConcurrentLinkedQueue<SendCmdTask> readQueue;  // read queue
    private Map<String, Promise<byte[]>> promiseMap;
    private Vertx vertx;

    public ModbusConnection(Vertx vertx, String ip, Integer port, Integer failRetryIntervalSecond, ModbusMode mode) {
        this.vertx = vertx;
        this.ip = ip;
        this.port = port;
        this.failRetryIntervalSecond = failRetryIntervalSecond;
        this.mode = mode;
        this.isAlive = new AtomicBoolean(false);
        this.writeQueue = new ConcurrentLinkedQueue<>();
        this.readQueue = new ConcurrentLinkedQueue<>();
        this.promiseMap = new ConcurrentHashMap<>();
        consumerTaskQueue(true);
    }

    /**
     * Establishes the connection.
     * @return a future that completes once the first connection attempt succeeds or fails
     */
    public Future<Boolean> connect() {
        NetClient netClient = vertx.createNetClient();
        return vertx.executeBlocking(b -> {
            netClient.connect(port, ip).onSuccess(socket -> {
                log.info("Modbus connect success, ip:{}, port:{}", ip, port);
                netSocket = socket;
                isAlive.set(true);
                b.tryComplete(true);
                // Install the frame decoder at the head of the underlying Netty pipeline.
                NetSocketImpl netSocketImpl = (NetSocketImpl) socket;
                netSocketImpl.channelHandlerContext().pipeline().addFirst(mode.getDecoder());
                socket.handler(buf -> {
                    byte[] bytes = mode.readData(buf.getBytes());
                    if (bytes == null) {
                        return;
                    }
                    int slaveId = ByteUtil.bytesToInt(bytes, 0, 1);
                    int funcNo = ByteUtil.bytesToInt(bytes, 1, 1);
                    // An error response carries the request's function code + 0x80.
                    int errFuncNo = funcNo - 128;
                    String key = String.format("%s_%s", slaveId, funcNo);
                    String errKey = String.format("%s_%s", slaveId, errFuncNo);
                    if (promiseMap.containsKey(key)) {
                        Promise<byte[]> promise = promiseMap.get(key);
                        // Hand over everything after slave address and function code.
                        byte[] content = new byte[bytes.length - 2];
                        System.arraycopy(bytes, 2, content, 0, content.length);
                        promise.tryComplete(content);
                    } else if (promiseMap.containsKey(errKey)) {
                        Promise<byte[]> promise = promiseMap.get(errKey);
                        int data = ByteUtil.bytesToInt(bytes, 2, 1);
                        switch (data) {
                            case 1:
                                promise.tryFail("Illegal function code");
                                break;
                            case 2:
                                promise.tryFail("Illegal register address");
                                break;
                            case 3:
                                promise.tryFail("Illegal data value");
                                break;
                            case 4:
                                promise.tryFail("Slave fault");
                                break;
                        }
                    }
                });
                socket.closeHandler(h -> {
                    // Mark the connection as offline so isActive() reflects the disconnect.
                    isAlive.set(false);
                    if (!isInitiativeClose) {
                        log.error("Modbus connect close, ip:{}, port:{}", ip, port);
                        failRetryTimerId = vertx.setTimer(failRetryIntervalSecond * 1000, hh -> connect());
                    } else {
                        log.info("Modbus connect close, ip:{}, port:{}", ip, port);
                    }
                });
            }).onFailure(err -> {
                log.error("Modbus connect fail, ip:{}, port:{}, msg:{}", ip, port, err.getMessage());
                isAlive.set(false);
                b.fail(err.getMessage());
                failRetryTimerId = vertx.setTimer(failRetryIntervalSecond * 1000, h -> connect());
            });
        });
    }

    /**
     * Whether the slave is online.
     */
    public boolean isActive() {
        return isAlive.get();
    }

    /**
     * Closes the connection and cancels the timers.
     */
    public void close() {
        isInitiativeClose = true;
        if (failRetryTimerId != null) {
            vertx.cancelTimer(failRetryTimerId);
        }
        if (queueTimerId != null) {
            vertx.cancelTimer(queueTimerId);
        }
        if (netSocket != null) {
            netSocket.close();
        }
    }

    /**
     * Submits a read or write task. Tasks run serially, write tasks first.
     * For parallel execution, executeTask could be called directly instead of queueing tasks one by one.
     * @param task the read/write task
     * @return the promise that will carry the task's result
     */
    public Promise<JSONObject> offerTask(SendCmdTask task) {
        if (task.getIsWrite()) {
            writeQueue.offer(task);
        } else {
            readQueue.offer(task);
        }
        return task.getPromise();
    }

    /**
     * Consumes the task queues, polling every 500 ms when idle; write tasks are consumed first.
     * @param delayFlag whether to wait 500 ms before checking the queues
     */
    private void consumerTaskQueue(boolean delayFlag) {
        if (delayFlag) {
            queueTimerId = vertx.setTimer(500, id -> {
                consumerTaskQueue(false);
            });
            return;
        }
        if (writeQueue.isEmpty() && readQueue.isEmpty()) {
            consumerTaskQueue(true);
            return;
        }
        if (!writeQueue.isEmpty()) {
            SendCmdTask sendCmdTask = writeQueue.poll();
            // Move on to the next task only after this one has completed.
            sendCmdTask.getPromise().future().onComplete(h -> {
                consumerTaskQueue(false);
            });
            executeTask(sendCmdTask);
            return;
        }
        if (!readQueue.isEmpty()) {
            SendCmdTask sendCmdTask = readQueue.poll();
            sendCmdTask.getPromise().future().onComplete(h -> {
                consumerTaskQueue(false);
            });
            executeTask(sendCmdTask);
        }
    }

    private Future<Void> executeTask(SendCmdTask sendCmdTask) {
        // The queueing timeout no longer applies once execution starts.
        vertx.cancelTimer(sendCmdTask.getTimerId());
        Future<JSONObject> future;
        if (sendCmdTask.getIsWrite()) {
            future = executeWrite(sendCmdTask.getReqParam(), sendCmdTask.getParamConfigs(), sendCmdTask.getSlaveId());
        } else {
            future = executeQuery(sendCmdTask.getParamConfigs(), sendCmdTask.getSlaveId());
        }
        return future.onSuccess(res -> sendCmdTask.getPromise().tryComplete(res))
                .onFailure(err -> sendCmdTask.getPromise().tryFail(err))
                .map(o -> null);
    }

    /**
     * Writes data.
     * @param reqParam     values to write
     * @param paramConfigs parameter configuration list
     * @param slaveId      slave ID
     */
    private Future<JSONObject> executeWrite(JSONObject reqParam, List<ModbusParamConfig> paramConfigs, Integer slaveId) {
        if (!isActive()) {
            return Future.failedFuture("Gateway offline");
        }
        boolean isMerge = isMergeSendCmd(paramConfigs);
        if (isMerge) {
            int registerAddress = paramConfigs.get(0).getRegisterAddress();
            ModbusParamConfig.RegisterType registerType = paramConfigs.get(0).getRegisterType();
            Promise<byte[]> promise = Promise.promise();
            List<String> keyList = paramConfigs.stream().map(ModbusParamConfig::getName).collect(Collectors.toList());
            return vertx.executeBlocking(h -> {
                Buffer buffer = getWriteCmd(registerAddress, slaveId, reqParam, keyList, registerType, promise);
                netSocket.write(buffer);
                promise.future().onSuccess(buf -> {
                    h.complete(reqParam);
                }).onFailure(err -> {
                    log.error("Modbus executeWrite fail, ip:{}, port:{}, slaveId:{}, msg:{}", ip, port, slaveId, err.getMessage());
                    h.tryFail(err.getMessage());
                });
            });
        }
        // Not mergeable: send one command per parameter, chained so they run one after another.
        List<Future<Object>> futures = new ArrayList<>();
        Future<Object> blockingFuture = Future.succeededFuture();
        for (int i = 0; i < paramConfigs.size(); i++) {
            ModbusParamConfig paramConfig = paramConfigs.get(i);
            ModbusParamConfig.RegisterType registerType = paramConfig.getRegisterType();
            Promise<byte[]> promise = Promise.promise();
            blockingFuture = blockingFuture.compose(
                    suc -> singleExecuteWrite(slaveId, reqParam, promise, registerType, paramConfig),
                    err -> singleExecuteWrite(slaveId, reqParam, promise, registerType, paramConfig));
            futures.add(blockingFuture);
        }
        return commonReplyResult(futures, paramConfigs);
    }

    private Future<Object> singleExecuteWrite(int slaveId, JSONObject reqParam, Promise<byte[]> promise,
                                              ModbusParamConfig.RegisterType registerType, ModbusParamConfig paramConfig) {
        return vertx.executeBlocking(h -> {
            Buffer buffer = getWriteCmd(paramConfig.getRegisterAddress(), slaveId, reqParam,
                    Arrays.asList(paramConfig.getName()), registerType, promise);
            netSocket.write(buffer);
            promise.future().onSuccess(buf -> {
                h.tryComplete(reqParam.get(paramConfig.getName()));
            }).onFailure(err -> {
                log.error("Modbus executeWrite fail, ip:{}, port:{}, slaveId:{}, key:{}, msg:{}",
                        ip, port, slaveId, paramConfig.getName(), err.getMessage());
                h.tryFail(err.getMessage());
            });
        });
    }

    /**
     * Reads data.
     * @param paramConfigs parameter configuration list
     * @param slaveId      slave ID
     */
    private Future<JSONObject> executeQuery(List<ModbusParamConfig> paramConfigs, Integer slaveId) {
        if (!isActive()) {
            return Future.failedFuture("Gateway offline");
        }
        boolean isMerge = isMergeSendCmd(paramConfigs);
        if (isMerge) {
            int registerAddress = paramConfigs.get(0).getRegisterAddress();
            ModbusParamConfig.RegisterType registerType = paramConfigs.get(0).getRegisterType();
            int num = paramConfigs.size();
            Promise<byte[]> promise = Promise.promise();
            Buffer buffer = getQueryCmd(registerAddress, num, slaveId, registerType, promise);
            return vertx.executeBlocking(h -> {
                netSocket.write(buffer);
                promise.future().onSuccess(buf -> {
                    // buf[0] is the byte count; the data starts at index 1.
                    JSONObject jsonObject = new JSONObject();
                    for (int i = 0; i < paramConfigs.size(); i++) {
                        ModbusParamConfig paramConfig = paramConfigs.get(i);
                        switch (registerType) {
                            case COIL:
                                // Coil i lives in data byte i / 8, bit i % 8 (least significant bit first).
                                int mask = 1 << (i % 8);
                                jsonObject.put(paramConfig.getName(), (mask & buf[i / 8 + 1]) == mask);
                                break;
                            case INPUT_REGISTER:
                            case HOLDING_REGISTER:
                                jsonObject.put(paramConfig.getName(), getValue(ByteUtil.bytesToInt(buf, i * 2 + 1, 2),
                                        paramConfig.getNumberSplit(), paramConfig.getDataType()));
                                break;
                        }
                    }
                    h.complete(jsonObject);
                }).onFailure(err -> {
                    log.error("Modbus executeQuery fail, ip:{}, port:{}, slaveId:{}, msg:{}", ip, port, slaveId, err.getMessage());
                    h.tryFail(err.getMessage());
                });
            });
        }
        // Not mergeable: query one parameter at a time, chained so they run one after another.
        List<Future<Object>> futures = new ArrayList<>();
        Future<Object> blockingFuture = Future.succeededFuture();
        for (int i = 0; i < paramConfigs.size(); i++) {
            ModbusParamConfig paramConfig = paramConfigs.get(i);
            ModbusParamConfig.RegisterType registerType = paramConfig.getRegisterType();
            Promise<byte[]> promise = Promise.promise();
            blockingFuture = blockingFuture.compose(
                    suc -> singleExecuteQuery(slaveId, promise, registerType, paramConfig),
                    err -> singleExecuteQuery(slaveId, promise, registerType, paramConfig));
            futures.add(blockingFuture);
        }
        return commonReplyResult(futures, paramConfigs);
    }

    private Future<Object> singleExecuteQuery(int slaveId, Promise<byte[]> promise,
                                              ModbusParamConfig.RegisterType registerType, ModbusParamConfig paramConfig) {
        return vertx.executeBlocking(h -> {
            Buffer buffer = getQueryCmd(paramConfig.getRegisterAddress(), 1, slaveId, paramConfig.getRegisterType(), promise);
            netSocket.write(buffer);
            promise.future().onSuccess(buf -> {
                switch (registerType) {
                    case COIL:
                        h.complete((buf[1] & 0x01) == 1);
                        break;
                    case INPUT_REGISTER:
                    case HOLDING_REGISTER:
                        h.complete(getValue(ByteUtil.bytesToInt(buf, 1, 2),
                                paramConfig.getNumberSplit(), paramConfig.getDataType()));
                        break;
                }
            }).onFailure(err -> {
                log.error("Modbus executeQuery fail, ip:{}, port:{}, slaveId:{}, key:{}, msg:{}",
                        ip, port, slaveId, paramConfig.getName(), err.getMessage());
                h.tryFail(err.getMessage());
            });
        });
    }

    /**
     * If all parameters share one register type and their addresses are contiguous,
     * they are merged into a single command.
     * @param paramConfigs parameter configuration list
     * @return whether the commands can be merged
     */
    private boolean isMergeSendCmd(List<ModbusParamConfig> paramConfigs) {
        if (paramConfigs.size() <= 1) {
            return false;
        }
        int lastPos = paramConfigs.get(0).getRegisterAddress();
        ModbusParamConfig.RegisterType registerType = paramConfigs.get(0).getRegisterType();
        for (int i = 1; i < paramConfigs.size(); i++) {
            int curPos = paramConfigs.get(i).getRegisterAddress();
            if (curPos - lastPos != 1) {
                return false;
            }
            ModbusParamConfig.RegisterType curRegisterType = paramConfigs.get(i).getRegisterType();
            if (registerType != curRegisterType) {
                return false;
            }
            lastPos = curPos;
        }
        return true;
    }

    /**
     * Builds a read command.
     * @param startPos     start address
     * @param num          number of coils/registers to read
     * @param slaveId      slave ID
     * @param registerType register type
     * @param promise      completed with the reply payload, or failed on error/timeout
     */
    private Buffer getQueryCmd(int startPos, int num, int slaveId, ModbusParamConfig.RegisterType registerType, Promise<byte[]> promise) {
        byte[] bytes = new byte[6];
        bytes[0] = ByteUtil.intToBytes(slaveId, 3, 1)[0];
        switch (registerType) {
            case COIL:
                bytes[1] = ModbusFunc.READ_COILS.getFunc();
                break;
            case HOLDING_REGISTER:
                bytes[1] = ModbusFunc.READ_HOLDING_REGISTERS.getFunc();
                break;
            case INPUT_REGISTER:
                bytes[1] = ModbusFunc.READ_INPUT_REGISTERS.getFunc();
                break;
        }
        Integer func = ByteUtil.bytesToInt(bytes, 1, 1);
        String key = String.format("%s_%s", slaveId, func);
        byte[] startPosBytes = ByteUtil.intToBytes(startPos, 0, 4);
        bytes[2] = startPosBytes[2];
        bytes[3] = startPosBytes[3];
        byte[] numBytes = ByteUtil.intToBytes(num, 0, 4);
        bytes[4] = numBytes[2];
        bytes[5] = numBytes[3];
        Buffer buffer = Buffer.buffer(mode.writeData(bytes));
        // Register the promise so the socket handler can match the reply to this request.
        promiseMap.put(key, promise);
        long timeId = vertx.setTimer(reqTimeoutSecond * 1000, h -> promise.tryFail("Request timeout"));
        promise.future().onComplete(res -> {
            promiseMap.remove(key);
            vertx.cancelTimer(timeId);
        });
        return buffer;
    }

    /**
     * Builds a write command.
     * @param startPos     start address
     * @param slaveId      slave ID
     * @param reqParam     values to write
     * @param keys         parameter names, in address order
     * @param registerType register type
     * @param promise      completed with the reply payload, or failed on error/timeout
     */
    private Buffer getWriteCmd(int startPos, int slaveId, JSONObject reqParam, List<String> keys,
                               ModbusParamConfig.RegisterType registerType, Promise<byte[]> promise) {
        int len = keys.size() == 1 ? 6 : (registerType == ModbusParamConfig.RegisterType.HOLDING_REGISTER
                ? 7 + keys.size() * 2
                : 7 + Double.valueOf(Math.ceil(keys.size() / 8.0)).intValue());
        byte[] bytes = new byte[len];
        bytes[0] = ByteUtil.intToBytes(slaveId, 3, 1)[0];
        byte[] startPosBytes = ByteUtil.intToBytes(startPos, 0, 4);
        bytes[2] = startPosBytes[2];
        bytes[3] = startPosBytes[3];
        if (keys.size() == 1) {
            switch (registerType) {
                case COIL:
                    bytes[1] = ModbusFunc.WRITE_SINGLE_COILS.getFunc();
                    boolean value = reqParam.getBoolean(keys.get(0));
                    // 0xFF00 switches the coil on, 0x0000 switches it off.
                    if (value) {
                        bytes[4] = (byte) 0xFF;
                    } else {
                        bytes[4] = 0x00;
                    }
                    bytes[5] = 0x00;
                    break;
                case HOLDING_REGISTER:
                    bytes[1] = ModbusFunc.WRITE_SINGLE_HOLDING_REGISTERS.getFunc();
                    byte[] dataArr = ByteUtil.intToBytes(reqParam.getInteger(keys.get(0)), 2, 2);
                    bytes[4] = dataArr[0];
                    bytes[5] = dataArr[1];
                    break;
            }
        } else {
            byte[] dataNum = ByteUtil.intToBytes(keys.size(), 2, 2);
            bytes[4] = dataNum[0];
            bytes[5] = dataNum[1];
            switch (registerType) {
                case COIL:
                    bytes[1] = ModbusFunc.WRITE_MULTI_COILS.getFunc();
                    int dataSize = Double.valueOf(Math.ceil(keys.size() / 8.0)).intValue();
                    bytes[6] = ByteUtil.intToBytes(dataSize, 3, 1)[0];
                    // Pack coil i into data byte i / 8, bit i % 8 (least significant bit first).
                    // This replaces the original 16-bit summing loop, which was garbled in the
                    // source and could not be recovered verbatim.
                    for (int i = 0; i < keys.size(); i++) {
                        if (reqParam.getBoolean(keys.get(i))) {
                            bytes[7 + i / 8] |= (byte) (1 << (i % 8));
                        }
                    }
                    break;
                case HOLDING_REGISTER:
                    bytes[1] = ModbusFunc.WRITE_MULTI_HOLDING_REGISTERS.getFunc();
                    bytes[6] = ByteUtil.intToBytes(keys.size() * 2, 3, 1)[0];
                    for (int i = 0; i < keys.size(); i++) {
                        String paramKey = keys.get(i);
                        Integer value = reqParam.getInteger(paramKey);
                        byte[] dataArr = ByteUtil.intToBytes(value, 2, 2);
                        bytes[i * 2 + 7] = dataArr[0];
                        bytes[i * 2 + 8] = dataArr[1];
                    }
                    break;
            }
        }
        Integer func = ByteUtil.bytesToInt(bytes, 1, 1);
        String key = String.format("%s_%s", slaveId, func);
        Buffer buffer = Buffer.buffer(mode.writeData(bytes));
        promiseMap.put(key, promise);
        long timeId = vertx.setTimer(reqTimeoutSecond * 1000, h -> promise.tryFail("Request timeout"));
        promise.future().onComplete(res -> {
            promiseMap.remove(key);
            vertx.cancelTimer(timeId);
        });
        return buffer;
    }

    private Future<JSONObject> commonReplyResult(List<Future<Object>> futures, List<ModbusParamConfig> paramConfigs) {
        return vertx.executeBlocking(b -> {
            // Wait for every per-parameter future, whether it succeeded or failed.
            Future.join(futures).onComplete(h -> {
                JSONObject okJson = new JSONObject();
                JSONObject errJson = new JSONObject();
                for (int i = 0; i < paramConfigs.size(); i++) {
                    ModbusParamConfig paramConfig = paramConfigs.get(i);
                    Future<Object> objectFuture = futures.get(i);
                    if (objectFuture.succeeded()) {
                        okJson.put(paramConfig.getName(), objectFuture.result());
                    } else {
                        errJson.put(paramConfig.getName(), objectFuture.cause().getMessage());
                    }
                }
                // Succeed if at least one parameter succeeded; otherwise report the first error.
                if (okJson.size() > 0) {
                    b.tryComplete(okJson);
                } else {
                    b.tryFail(errJson.getString(paramConfigs.get(0).getName()));
                }
            });
        });
    }

    private Object getValue(int value, int numberSplit, ModbusParamConfig.DataType dataType) {
        if (numberSplit == 1) {
            return value;
        }
        Float temp = value * 1f / numberSplit;
        switch (dataType) {
            case INT:
                return Math.round(temp);
            case FLOAT:
                return temp;
        }
        return temp;
    }
}
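Two small pieces of logic in the core class are easier to follow in isolation: deciding whether a group of addresses can be merged into one command, and mapping coil indexes to bits. The sketch below is a JDK-only illustration under the usual Modbus convention that coil i sits in data byte i / 8 at bit i % 8; MergeAndCoilSketch and its method names are hypothetical and not part of modbus-core.

```java
class MergeAndCoilSketch {

    /** True when there are at least two addresses and each one follows the previous by exactly 1. */
    static boolean canMerge(int[] addresses) {
        if (addresses.length <= 1) {
            return false;
        }
        for (int i = 1; i < addresses.length; i++) {
            if (addresses[i] - addresses[i - 1] != 1) {
                return false;
            }
        }
        return true;
    }

    /** Packs coil states into bytes, least significant bit first. */
    static byte[] packCoils(boolean[] states) {
        byte[] out = new byte[(states.length + 7) / 8];
        for (int i = 0; i < states.length; i++) {
            if (states[i]) {
                out[i / 8] |= (byte) (1 << (i % 8));
            }
        }
        return out;
    }

    /** Reads the state of one coil back out of the packed bytes. */
    static boolean coilAt(byte[] coilBytes, int index) {
        return (coilBytes[index / 8] & (1 << (index % 8))) != 0;
    }

    public static void main(String[] args) {
        System.out.println(canMerge(new int[]{504, 505, 506, 507, 508})); // true
        System.out.println(canMerge(new int[]{20, 22}));                  // false
        byte[] packed = packCoils(new boolean[]{true, false, true});
        System.out.println(packed[0]);                                    // 5
        System.out.println(coilAt(packed, 2));                            // true
    }
}
```

The real isMergeSendCmd additionally requires every parameter to have the same register type, which is omitted here to keep the example short.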
Testing
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.bho.modbus.model.ModbusMode;
import com.bho.modbus.core.ModbusConnection;
import com.bho.modbus.model.ModbusParamConfig;
import com.bho.modbus.model.SendCmdTask;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import lombok.extern.log4j.Log4j2;

import java.util.List;

@Log4j2
public class TestModbus {

    public static final String READ_DATA = "[ "
            + "{ \"name\": \"a\", \"registerType\": \"HOLDING_REGISTER\", \"registerAddress\": 504, \"dataType\": \"FLOAT\", \"numberSplit\": 10 }, "
            + "{ \"name\": \"b\", \"registerType\": \"HOLDING_REGISTER\", \"registerAddress\": 505, \"dataType\": \"FLOAT\", \"numberSplit\": 10 }, "
            + "{ \"name\": \"c\", \"registerType\": \"HOLDING_REGISTER\", \"registerAddress\": 506, \"dataType\": \"FLOAT\", \"numberSplit\": 10 }, "
            + "{ \"name\": \"d\", \"registerType\": \"HOLDING_REGISTER\", \"registerAddress\": 507, \"dataType\": \"INT\", \"numberSplit\": 1 }, "
            + "{ \"name\": \"e\", \"registerType\": \"HOLDING_REGISTER\", \"registerAddress\": 508, \"dataType\": \"INT\", \"numberSplit\": 1 }"
            + "]";

    private static final String WRITE_DATA = "[ "
            + "{ \"name\": \"do0\", \"registerType\": \"COIL\", \"registerAddress\": 20, \"dataType\": \"BOOL\", \"numberSplit\": 1 }, "
            + "{ \"name\": \"do1\", \"registerType\": \"COIL\", \"registerAddress\": 21, \"dataType\": \"BOOL\", \"numberSplit\": 1 } "
            + "]";

    public static void main(String[] args) {
        testReadData();
        // testWriteData();
    }

    private static void testWriteData() {
        Vertx vertx = Vertx.vertx();
        ModbusConnection connection = new ModbusConnection(vertx, "127.0.0.1", 502, 30, ModbusMode.TCP);
        Future<Boolean> connectFuture = connection.connect();
        JSONObject reqParam = new JSONObject();
        reqParam.put("do0", false);
        reqParam.put("do1", false);
        List<ModbusParamConfig> modbusParamConfigs = JSONArray.parseArray(WRITE_DATA, ModbusParamConfig.class);
        connectFuture.onComplete(con -> {
            if (connectFuture.succeeded()) {
                // Read the two coils first, then write them; the queue runs the write task first.
                SendCmdTask task = new SendCmdTask(vertx, modbusParamConfigs, null, false, 21, 10);
                Promise<JSONObject> promise = connection.offerTask(task);
                promise.future().onSuccess(suc -> {
                    log.info("read:" + suc);
                }).onFailure(err -> System.err.println(err.getMessage()));
                SendCmdTask task2 = new SendCmdTask(vertx, modbusParamConfigs, reqParam, true, 21, 10);
                Promise<JSONObject> promise2 = connection.offerTask(task2);
                promise2.future().onSuccess(suc -> {
                    log.info("write:" + suc);
                }).onFailure(err -> System.err.println(err.getMessage()));
            } else {
                System.err.println("gateway offline");
            }
        });
    }

    private static void testReadData() {
        Vertx vertx = Vertx.vertx();
        ModbusConnection connection = new ModbusConnection(vertx, "127.0.0.1", 502, 30, ModbusMode.TCP);
        Future<Boolean> connectFuture = connection.connect();
        List<ModbusParamConfig> modbusParamConfigs = JSONArray.parseArray(READ_DATA, ModbusParamConfig.class);
        connectFuture.onComplete(con -> {
            if (connection.isActive()) {
                SendCmdTask task = new SendCmdTask(vertx, modbusParamConfigs, null, false, 2, 10);
                Promise<JSONObject> promise = connection.offerTask(task);
                promise.future().onSuccess(suc -> {
                    log.info(suc);
                }).onFailure(err -> System.err.println(err.getMessage()));
            } else {
                System.err.println("gateway offline");
            }
        });
    }
}
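The run results are as follows.
In fact, if the read and write examples above target the same gateway, they can share a single Modbus connection.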
modbus-app configuration parameters
The format is as follows:
{
    "readable": {
        "devType01": {
            "ReportData": [
                {"name": "xxx", "registerType": "COIL", "registerAddress": 1, "dataType": "BOOL", "numberSplit": 1}
            ]
        },
        "devType02": {
            "ReportData": [
                {"name": "a", "registerType": "HOLDING_REGISTER", "registerAddress": 1, "dataType": "INT", "numberSplit": 1},
                {"name": "b", "registerType": "HOLDING_REGISTER", "registerAddress": 2, "dataType": "INT", "numberSplit": 10},
                {"name": "c", "registerType": "", "dataType": "FLOAT", "mbScript": "(a*10000+b)/10"}
            ]
        }
    },
    "writable": {
        "devType01": {
            "Control": [
                {"name": "operation", "registerType": "COIL", "registerAddress": 21, "dataType": "BOOL", "numberSplit": 1}
            ]
        }
    },
    "readDataPeriods": [
        {"period": 60, "deviceTypes": ["devType01"]},
        {"period": 600, "deviceTypes": ["devType02", "devType03"]}
    ]
}
I won't go into how this is implemented here…
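The sample configuration includes a derived metric c with no register of its own, computed from a and b through an mbScript expression. The article does not say how mbScript is evaluated, so the sketch below does not interpret the string; it reads the sample expression as (a*10000+b)/10 (an assumption, since the operator is not legible in the source) and hard-codes it as a Java lambda just to show the data flow. DerivedMetricSketch and withDerived are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.ToDoubleFunction;

class DerivedMetricSketch {

    /** Hard-coded stand-in for the sample expression, read here as "(a*10000+b)/10". */
    static final ToDoubleFunction<Map<String, Integer>> C_FORMULA =
            values -> (values.get("a") * 10000 + values.get("b")) / 10.0;

    /** Returns the raw readings plus the derived metric "c". */
    static Map<String, Object> withDerived(Map<String, Integer> raw) {
        Map<String, Object> out = new HashMap<>(raw);
        out.put("c", C_FORMULA.applyAsDouble(raw));
        return out;
    }

    public static void main(String[] args) {
        Map<String, Integer> raw = new HashMap<>();
        raw.put("a", 12);   // e.g. the high part of a counter, read from register 1
        raw.put("b", 3456); // e.g. the low part, read from register 2
        System.out.println(withDerived(raw).get("c")); // 12345.6
    }
}
```

A real access layer would presumably evaluate the expression from configuration at runtime so that new device types need no code change, which is the stated goal of modbus-app.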
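Conclusion
I don't guarantee that the code is correct; this is only a rough implementation, offered for reference. If you find problems, please point them out. I will take the criticism on board and fix them.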