首页/文章列表/文章详情

物联网之使用Vertx实现TCP最佳实践【响应式】

编程知识612025-05-19评论

小伙伴们,你们好呀,我是老寇,跟我一起学习使用Vertx实现TCP-Server

实现TCP-Server【响应式】

Vertx-Core地址

实现过程

查看源码

代码比较简单,懒得讲解啦
代码比较简单,懒得讲解啦
代码比较简单,懒得讲解啦
tcp-server【响应式】
<dependency> <groupId>io.vertx</groupId> <artifactId>vertx-core</artifactId> <version>5.0.0</version></dependency>

VertxConfig

/** * @author laokou */@Configurationpublic class VertxConfig { @Bean(destroyMethod ="close") public Vertx vertx() { VertxOptions vertxOptions = new VertxOptions(); vertxOptions.setMaxEventLoopExecuteTime(30); vertxOptions.setWorkerPoolSize(40); vertxOptions.setMaxWorkerExecuteTime(30); vertxOptions.setMaxEventLoopExecuteTimeUnit(TimeUnit.SECONDS); vertxOptions.setMaxWorkerExecuteTimeUnit(TimeUnit.SECONDS); vertxOptions.setPreferNativeTransport(true); vertxOptions.setInternalBlockingPoolSize(40); vertxOptions.setEventLoopPoolSize(Math.max(32, 2 * CpuCoreSensor.availableProcessors())); return Vertx.vertx(vertxOptions); }}

TcpServerProperties

/** * @author laokou */@Data@Component@ConfigurationProperties(prefix ="spring.tcp-server")public class TcpServerProperties { private String host ="0.0.0.0"; private Set<Integer> ports = new HashSet<>(0); private int acceptBacklog = -1; private ClientAuth clientAuth = ClientAuth.NONE; private boolean sni = false; private boolean useProxyProtocol = false; private long proxyProtocolTimeout = 30L; private TimeUnit proxyProtocolTimeoutUnit = TimeUnit.SECONDS; private boolean registerWriteHandler = false;}

VertxTcpServer

/** * @author laokou */@Slf4jfinal class VertxTcpServer extends AbstractVerticle { private final TcpServerProperties properties; private final Vertx vertx; private volatile Flux<NetServer> netServer; private boolean isClosed = false; VertxTcpServer(Vertx vertx, TcpServerProperties properties) { this.vertx = vertx; this.properties = properties; } @Override public synchronized void start() { netServer = getTcpServerOptions().map(vertx::createNetServer) .doOnNext(server -> server.connectHandler(socket -> { socket.handler(buffer -> log.info("【Vertx-Tcp-Server】 => 接收数据:{}", buffer.toString())) .closeHandler(close -> log.info("【Vertx-Tcp-Server】 => 连接关闭")); }).listen().onComplete(result -> { if (isClosed) { return; } if (result.succeeded()) { log.info("【Vertx-Tcp-Server】 => TCP服务启动成功,端口:{}", result.result().actualPort()); } else { Throwable ex = result.cause(); log.error("【Vertx-Tcp-Server】 => TCP服务启动失败,错误信息:{}", ex.getMessage(), ex); } })); netServer.subscribeOn(Schedulers.boundedElastic()).subscribe(); } @Override public synchronized void stop() { isClosed = true; netServer.doOnNext(server -> server.close().onComplete(result -> { if (result.succeeded()) { log.info("【Vertx-Tcp-Server】 => HTTP服务停止成功,端口:{}", server.actualPort()); } else { Throwable ex = result.cause(); log.error("【Vertx-Tcp-Server】 => HTTP服务停止失败,错误信息:{}", ex.getMessage(), ex); } })).subscribeOn(Schedulers.boundedElastic()).subscribe(); } public void deploy() { // 部署服务 vertx.deployVerticle(this); // 停止服务 Runtime.getRuntime().addShutdownHook(new Thread(this::stop)); } private Flux<NetServerOptions> getTcpServerOptions() { return Flux.fromIterable(properties.getPorts()).map(this::getTcpServerOption); } private NetServerOptions getTcpServerOption(int port) { NetServerOptions options = new NetServerOptions(); options.setHost(properties.getHost()); options.setPort(port); options.setClientAuth(properties.getClientAuth()); options.setSni(properties.isSni()); options.setUseProxyProtocol(properties.isUseProxyProtocol()); options.setProxyProtocolTimeout(properties.getProxyProtocolTimeout()); options.setProxyProtocolTimeoutUnit(properties.getProxyProtocolTimeoutUnit()); options.setRegisterWriteHandler(properties.isRegisterWriteHandler()); options.setAcceptBacklog(properties.getAcceptBacklog()); return options; }}

这个只是一个demo,实际生产中,比较复杂,会出现粘包和拆包,需要自定义相关规则

我是老寇,我们下次再见啦!

神弓

k↑

这个人很懒...

用户评论 (0)

发表评论

captcha