一 、 概述
为了弥补代码的遗失,木舟IOT平台正在加班加点进行研发,后面不只是针对于IOT设备接入上报,告警,视频管理,组态数据可视化大屏,后面还会有快速搭建微服务平台,利用surging.cli工具根据数据库表生成微服务,中间服务,能让程序员快速完成BOSS交给的任务,从而在这个内卷的社会能占有一席之地。这些都是没有完成任务的空话,现在发此篇的目的是作者有能力开发出优秀的IOT平台,先介绍一个比较突出的功能,就是可以基于共享或者独立配置添加网络组件, 下面来介绍一下如何添加网络组件。
一键运行打包成品下载:https://pan.baidu.com/s/11hcf9ieCkJxlGrzvIuxeQA?pwd=ajsr
测试用户:fanly
测试密码:123456
为了让大家节约时间,能尽快运行产品看到效果,上面有 一键运行打包成品可以进行下载测试运行。
二、如何测试运行
以下是目录结构,
IDE:consul 注册中心
kayak.client: 网关
kayak.server:微服务
apache-skywalking-apm:skywalking链路跟踪
以上是目录结构,大家不需要一个个运行,只需要打开运行startup.bat,如果需要测试skywalking ,只需要apache-skywalking-apm\bin\startup.bat 文件就可以了,以下是运行的界面
三、如何添加组件
1.添加http服务组件,
打开平台界面,然后点击设备接入->网络组件,然后可以看到如下界面
再点击新增组件或者编辑组件,完成后注意启动状态是关闭状态,此时并不能对于该组件功能进行访问调用,只有把启动状态打开,才能访问调用
以上是http服务组件,启动完成后,如果设置了webservice和swagger,你可以访问webservice和swagger,看是否可以访问
2.添加/编辑Tcp服务组件
当添加/编辑Tcp组件时,设置Host:127.0.0.1 ,port:248并且还有解析方式选项,选项里面有不处理,固定长度,分隔符,自定义脚本,下面我们就来看自定义脚本
添加脚本如下:
parser.Fixed(4).Handler( function(buffer){ var buf = BytesUtils.Slice(buffer,1,4); parser.Fixed(buffer.ReadableBytes).Result(buf); }).Handler( function(buffer){parser.Fixed(8).Result(buffer);} ).Handler(function(buffer){ parser.Result('处理完成','gb2312').Complete(); } )
而基于TCP服务代码如下,需要继承于TcpBehavior
internal class TcpDeviceDataService : TcpBehavior, ITcpDeviceDataService { private readonly IDeviceProvider _deviceProvider; public TcpDeviceDataService(IDeviceProvider deviceProvider) { _deviceProvider = deviceProvider; } public override void Load(string clientId, NetworkProperties tcpServerProperties) { var deviceStatus =_deviceProvider.IsConnected(clientId);this.Parser.HandlePayload().Subscribe(async buffer => await ParserBuffer(buffer)); } public override void DeviceStatusProcess(DeviceStatus status, string clientId, NetworkProperties tcpServerProperties) { //throw new NotImplementedException(); } public async Task ParserBuffer(IByteBuffer buffer) { List<string> result = newList<string>();while (buffer.ReadableBytes > 0) { result.Add(buffer.ReadString(this.Parser.GetNextFixedRecordLength(), Encoding.GetEncoding("gb2312"))); } // var str= buffer.ReadString(buffer.ReadableBytes, Encoding.UTF8);var byteBuffer = Unpooled.Buffer(); byteBuffer.WriteString("\r\n", Encoding.UTF8); byteBuffer.WriteString("处理完成", Encoding.GetEncoding("gb2312")); await Sender.SendAndFlushAsync(byteBuffer); // await Sender.SendAndFlushAsync("消息已接收",Encoding.GetEncoding("gb2312"));this.Parser.Close(); } public Task<bool> ChangeDeviceStage(string deviceId) { thrownew NotImplementedException(); } }
用测试Tcp调试工具结果如下
3.添加/编辑UDP服务组件
当添加/编辑UDP组件时, 设置Host:127.0.0.1 ,port:267 并且可以是否开启组播
而基于udp服务代码如下,需要继承于UdpBehavior
internal class UdpDeviceDataService : UdpBehavior, IUdpDeviceDataService { public Task<bool> ChangeDeviceStage(string deviceId) { thrownew NotImplementedException(); } public override async Task Dispatch(IEnumerable<byte> bytes) { await Sender.SendAndFlushAsync("\r\n", Encoding.UTF8); await Sender.SendAndFlushAsync("处理完成", Encoding.GetEncoding("gb2312")); } }
测试结果如下:
4.添加/编辑WebSocket服务组件
当添加/编辑WebSocket组件时, 设置Host:127.0.0.1 ,port:55
而基于websocket服务代码如下,需要继承于WSBehavior
internalclass WSDeviceDataService : WSBehavior, IWSDeviceDataService{ protectedoverridevoid OnMessage(MessageEventArgs e) { this.Client.Value.SendTo($"send:{e.Data},\r\n reply:hello,welcome to you!",ID); } protectedoverridevoid OnOpen() { }}
测试结果如下:
5.添加/编辑UDP服务组件
当添加/编辑WebSocket组件时, 设置Host:127.0.0.1 ,port:345
添加greet.proto文件,脚本如下:
syntax = "proto3"; package Greet; service Greeter { // Sends a greeting rpc ChangeDeviceStage (DeviceRequest) returns (DeviceReply) {}} message DeviceRequest { string deviceId = 1;} message DeviceReply { bool message = 1;}
然后再创建GreeterBehavior,继承Greeter.GreeterBase, IServiceBehavior,代码如下
publicpartialclass GreeterBehavior : Greeter.GreeterBase, IServiceBehavior{ private ServerReceivedDelegate received; publicevent ServerReceivedDelegate Received { add { if (value == null) { received += value; } } remove { received -= value; } } publicstring MessageId { get; } = Guid.NewGuid().ToString("N");publicasync Task Write(objectresult,int statusCode = 200,string exceptionMessage = "") { if (received == null)return;var message = newTransportMessage(MessageId,new ReactiveResultMessage { ExceptionMessage = exceptionMessage, StatusCode = statusCode, Result = result }); await received(message); } public T CreateProxy<T>(stringkey)where T : class{returnServiceLocator.GetService<IServiceProxyFactory>().CreateProxy<T>(key); } publicobject CreateProxy(Type type) { returnServiceLocator.GetService<IServiceProxyFactory>().CreateProxy(type); } publicobjectCreateProxy(string key, Type type) { returnServiceLocator.GetService<IServiceProxyFactory>().CreateProxy(key, type); } public T CreateProxy<T>() where T : class{returnServiceLocator.GetService<IServiceProxyFactory>().CreateProxy<T>(); } public T GetService<T>(stringkey)where T : class{if(ServiceLocator.Current.IsRegisteredWithKey<T>(key))returnServiceLocator.GetService<T>(key);elsereturnServiceLocator.GetService<IServiceProxyFactory>().CreateProxy<T>(key); } public T GetService<T>() where T : class{if(ServiceLocator.Current.IsRegistered<T>())returnServiceLocator.GetService<T>();elsereturnServiceLocator.GetService<IServiceProxyFactory>().CreateProxy<T>(); } publicobject GetService(Type type) { if(ServiceLocator.Current.IsRegistered(type))returnServiceLocator.GetService(type);elsereturnServiceLocator.GetService<IServiceProxyFactory>().CreateProxy(type); } publicobjectGetService(string key, Type type) { if (ServiceLocator.Current.IsRegisteredWithKey(key, type)) return ServiceLocator.GetService(key, type); elsereturnServiceLocator.GetService<IServiceProxyFactory>().CreateProxy(key, type); } publicvoid Publish(IntegrationEvent @event) { GetService<IEventBus>().Publish(@event); }}
而基于grpc服务代码如下,需要继承于刚刚创建的GreeterBehavior
publicclass GrpcDeviceDataService : GreeterBehavior, IGrpcDeviceDataService { publicoverrideTask<DeviceReply> ChangeDeviceStage(DeviceRequest request, ServerCallContext context) { returnTask.FromResult(new DeviceReply { Message =true }) ; } }
以下是测试结果:
6.添加/编辑MQTT服务组件
当添加/编辑MQTT组件时, 设置Host:127.0.0.1 ,port:425
而基于mqtt服务代码如下,需要继承于MqttBehavior
publicclass MQTTDeviceDataService : MqttBehavior, IMQTTDeviceDataService { publicoverrideasyncTask<bool> Authorized(stringusername,string password) { bool result = false;if (username == "admin" && password == "123456") result =true;returnawait Task.FromResult(result); } publicasyncTask<bool> IsOnline(string deviceId) { returnawaitbase.GetDeviceIsOnine(deviceId); } publicasync Task Publish(string deviceId, WillMessage message) { var willMessage = new MqttWillMessage { WillMessage = message.Message, Qos = message.Qos, Topic = message.Topic, WillRetain = message.WillRetain }; await Publish(deviceId, willMessage); await RemotePublish(deviceId, willMessage); } }
以下是测试结果:
三、总结
木舟IOT平台会在github开源社区版本,可以自由更改代码,用于商业项目,但不能自营平台,如低代码平台,IOT平台等,如有违反,后果自负,还有最好不要更改命名空间,然后跟公司说是自己研发的,如果知道后,我在博客全网通报此人,以前surging相关的事件就算了,就当没发生过。,如果碰到困难,比较紧急的话,可以联系作者,加群:744677125