小伙伴们,你们好呀,我是老寇,跟我一起学习使用Vertx实现TCP-Server
实现TCP-Server【响应式】
实现过程
代码比较简单,懒得讲解啦
代码比较简单,懒得讲解啦
代码比较简单,懒得讲解啦
tcp-server【响应式】
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>5.0.0</version>
</dependency>
VertxConfig
/**
 * Spring configuration that publishes a single {@link Vertx} instance as a bean.
 *
 * @author laokou
 */
@Configuration
public class VertxConfig {

	/**
	 * Creates the {@link Vertx} instance from a fixed set of {@link VertxOptions}.
	 * {@code close} is registered as the bean's destroy method.
	 * @return the configured {@link Vertx} instance
	 */
	@Bean(destroyMethod = "close")
	public Vertx vertx() {
		// At least 32 event loops, or twice the number of available processors if larger.
		int eventLoopPoolSize = Math.max(32, 2 * CpuCoreSensor.availableProcessors());
		VertxOptions options = new VertxOptions()
			// Each value is set next to its unit: 30 seconds for both limits.
			.setMaxEventLoopExecuteTime(30)
			.setMaxEventLoopExecuteTimeUnit(TimeUnit.SECONDS)
			.setMaxWorkerExecuteTime(30)
			.setMaxWorkerExecuteTimeUnit(TimeUnit.SECONDS)
			.setWorkerPoolSize(40)
			.setInternalBlockingPoolSize(40)
			.setEventLoopPoolSize(eventLoopPoolSize)
			.setPreferNativeTransport(true);
		return Vertx.vertx(options);
	}

}
TcpServerProperties
/**
 * Settings for the TCP server, bound from properties under the
 * {@code spring.tcp-server} prefix. Each value is copied into the
 * {@code NetServerOptions} built by {@code VertxTcpServer}.
 *
 * @author laokou
 */
@Data
@Component
@ConfigurationProperties(prefix = "spring.tcp-server")
public class TcpServerProperties {

	/** Host passed to {@code NetServerOptions#setHost}. */
	private String host = "0.0.0.0";

	/** Ports to serve; one server is created per entry. Empty by default. */
	private Set<Integer> ports = new HashSet<>(0);

	/** Value passed to {@code NetServerOptions#setAcceptBacklog}. */
	private int acceptBacklog = -1;

	/** Value passed to {@code NetServerOptions#setClientAuth}. */
	private ClientAuth clientAuth = ClientAuth.NONE;

	/** Value passed to {@code NetServerOptions#setSni}; {@code false} unless configured. */
	private boolean sni;

	/** Value passed to {@code NetServerOptions#setUseProxyProtocol}; {@code false} unless configured. */
	private boolean useProxyProtocol;

	/** Value passed to {@code NetServerOptions#setProxyProtocolTimeout}. */
	private long proxyProtocolTimeout = 30L;

	/** Unit for {@link #proxyProtocolTimeout}. */
	private TimeUnit proxyProtocolTimeoutUnit = TimeUnit.SECONDS;

	/** Value passed to {@code NetServerOptions#setRegisterWriteHandler}; {@code false} unless configured. */
	private boolean registerWriteHandler;

}
VertxTcpServer
/**
* @author laokou
*/
@Slf4j
final class VertxTcpServer extends AbstractVerticle {
private final TcpServerProperties properties;
private final Vertx vertx;
private volatile Flux<NetServer> netServer;
private boolean isClosed = false;
VertxTcpServer(Vertx vertx, TcpServerProperties properties) {
this.vertx = vertx;
this.properties = properties;
}
@Override
public synchronized void start() {
netServer = getTcpServerOptions().map(vertx::createNetServer)
.doOnNext(server -> server.connectHandler(socket -> {
socket.handler(buffer -> log.info("【Vertx-Tcp-Server】 => 接收数据:{}", buffer.toString()))
.closeHandler(close -> log.info("【Vertx-Tcp-Server】 => 连接关闭"));
}).listen().onComplete(result -> {
if (isClosed) {
return;
}
if (result.succeeded()) {
log.info("【Vertx-Tcp-Server】 => TCP服务启动成功,端口:{}", result.result().actualPort());
}
else {
Throwable ex = result.cause();
log.error("【Vertx-Tcp-Server】 => TCP服务启动失败,错误信息:{}", ex.getMessage(), ex);
}
}));
netServer.subscribeOn(Schedulers.boundedElastic()).subscribe();
}
@Override
public synchronized void stop() {
isClosed = true;
netServer.doOnNext(server -> server.close().onComplete(result -> {
if (result.succeeded()) {
log.info("【Vertx-Tcp-Server】 => HTTP服务停止成功,端口:{}", server.actualPort());
}
else {
Throwable ex = result.cause();
log.error("【Vertx-Tcp-Server】 => HTTP服务停止失败,错误信息:{}", ex.getMessage(), ex);
}
})).subscribeOn(Schedulers.boundedElastic()).subscribe();
}
public void deploy() {
// 部署服务
vertx.deployVerticle(this);
// 停止服务
Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
}
private Flux<NetServerOptions> getTcpServerOptions() {
return Flux.fromIterable(properties.getPorts()).map(this::getTcpServerOption);
}
private NetServerOptions getTcpServerOption(int port) {
NetServerOptions options = new NetServerOptions();
options.setHost(properties.getHost());
options.setPort(port);
options.setClientAuth(properties.getClientAuth());
options.setSni(properties.isSni());
options.setUseProxyProtocol(properties.isUseProxyProtocol());
options.setProxyProtocolTimeout(properties.getProxyProtocolTimeout());
options.setProxyProtocolTimeoutUnit(properties.getProxyProtocolTimeoutUnit());
options.setRegisterWriteHandler(properties.isRegisterWriteHandler());
options.setAcceptBacklog(properties.getAcceptBacklog());
return options;
}
}
这只是一个demo。实际生产环境要复杂得多:TCP是面向字节流的协议,一次读取到的数据可能包含多条消息(粘包),也可能只是一条消息的一部分(拆包),因此需要自定义消息边界规则(例如定长、分隔符或长度字段),可以借助 Vert.x 的 RecordParser 来实现
我是老寇,我们下次再见啦!