在物联网项目中,我们的软件与车牌识别通讯,通常使用MQTT通讯更简单。
MQTT配制
测试工具
C# MqttHelper
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using System;
using System.Collections.Generic;
using YUTU.DAL.Common;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Forms;
using YUTU.BLL.Global;
using System.Threading;
namespace YUTU.BLL.Common.ClientMidle
{
public class MqttHelper
{
public IMqttClient _mqttClient;
string serverIP, serverPort, user, pwd;
public MqttHelper(string ipAddr, string port, string userName, string userPwd)
{
serverIP = ipAddr;
serverPort = port;
user = userName;
pwd = userPwd;
}
public bool blconntion = false;
MqttClientOptions options = null;
public Action GetActionSubscribe;
public async void ConntionServer() //async
{
try
{
LogMessage("serverIP[" + serverIP + "]serverPort[" + serverPort + "]user[" + user + "]pwd[" + pwd + "]");
options = new MqttClientOptions() { ClientId = Guid.NewGuid().ToString("D") };
options.ChannelOptions = new MqttClientTcpOptions()
{
Server = serverIP,
Port = Convert.ToInt32(serverPort)
};
options.Credentials = new MqttClientCredentials()
{
Username = user,
Password = pwd
};
options.CleanSession = true;
options.KeepAlivePeriod = TimeSpan.FromSeconds(100.5);
options.KeepAliveSendInterval = TimeSpan.FromSeconds(20000);
if (null != _mqttClient)
{
await _mqttClient.DisconnectAsync();//await
_mqttClient = null;
}
_mqttClient = new MqttFactory().CreateMqttClient();
_mqttClient.ApplicationMessageReceived += (sender, args) =>
{
GetActionSubscribe?.Invoke(Encoding.UTF8.GetString(args.ApplicationMessage.Payload));
//ActionSubscribe?.Invoke(Encoding.UTF8.GetString(args.ApplicationMessage.Payload));
LogMessage($"ApplicationMessageReceived--ClientID:{args.ClientId} | TOPIC:{args.ApplicationMessage.Topic} | Payload:{Encoding.UTF8.GetString(args.ApplicationMessage.Payload)} | QoS:{args.ApplicationMessage.QualityOfServiceLevel} | Retain:{args.ApplicationMessage.Retain}");
};
_mqttClient.Connected += (sender, args) =>
{
LogMessage($"Connected--Client is Connected: IsSessionPresent:{args.IsSessionPresent}");
};
_mqttClient.Disconnected += (sender, args) =>
{
LogMessage($"Client is DisConnected ClientWasConnected:{args.ClientWasConnected}");
};
await _mqttClient.ConnectAsync(options);//await
}
catch (Exception ex)
{
LogMessage("ConntionServer异常:" + ex.Message);
}
}
public async void DisConntionServer()
{
try
{
LogMessage("开始执行DisConntionServer");
if (null != _mqttClient && _mqttClient.IsConnected)
{
await _mqttClient.DisconnectAsync();
_mqttClient.Dispose();
_mqttClient = null;
}
LogMessage("执行DisConntionServer成功...");
}
catch (Exception ex)
{
LogMessage("DisConntionServer异常:" + ex.Message);
}
}
public async void HeartConntionServer()
{
try
{
LogMessage("开始执行HeartConntionServer");
if (null != _mqttClient && options != null)
{
await _mqttClient.ConnectAsync(options);
}
LogMessage("执行HeartConntionServer成功...");
}
catch (Exception ex)
{
LogMessage("HeartConntionServer异常:" + ex.Message);
}
}
public void SendMessage(string Topic, string Payload)
{
try
{
LogMessage("SendMessage---Topic[" + Topic + "]Payload[" + Payload + "]");
Task.Factory.StartNew(async () =>
{
var msg = new MqttApplicationMessage()
{
Topic = Topic,
Payload = Encoding.UTF8.GetBytes(Payload),
QualityOfServiceLevel =
(MqttQualityOfServiceLevel)
Enum.Parse(typeof(MqttQualityOfServiceLevel), "1"),//Level
Retain = false
};
if (null != _mqttClient)
{
await _mqttClient.PublishAsync(msg);
}
else
{
LogMessage("SendMessage_mqttClient为null");
}
LogMessage("SendMessage成功:" + Payload);
});
}
catch (Exception ex)
{
LogMessage("SendMessage异常:" + ex.Message);
}
}
public async void SubscribeAsync(string Topic)
{
try
{
LogMessage("SubscribeAsync---Topic[" + Topic + "]");
if (_mqttClient == null) return;
Thread.Sleep(1000);
await _mqttClient.SubscribeAsync(
new List
{
new TopicFilter(
Topic,
(MqttQualityOfServiceLevel)
System.Enum.Parse(typeof (MqttQualityOfServiceLevel), "1"))
});
}
catch (Exception ex)
{
LogMessage("SubscribeAsync异常:" + ex.Message);
}
}
}
}
订阅消息
private MqttHelper _mqttHelper = null;
private static string mqttIP = ConfigurationManager.AppSettings["MqttIP"];
private static string mqttTCPPort = ConfigurationManager.AppSettings["MqttTCPPort"];
private static string mqttUser = ConfigurationManager.AppSettings["MqttUser"];
private static string mqttPwd = ConfigurationManager.AppSettings["MqttPwd"];
public void Init()
{
try
{
//MQTT参数配置
string ipAddr = mqttIP;
string port = mqttTCPPort;
string userName = mqttUser;
string userPwd = mqttPwd;
//mqtt消息队列
_mqttHelper = new MqttHelper(ipAddr, port, userName, userPwd);
}
catch (Exception e)
{
throw new Exception($"Init异常,{e.Message}");
}
}
public void TurnOn()
{
try
{
if (_mqttHelper == null)
throw new Exception("Init异常");
_mqttHelper.ConntionServer();
//接收订阅消息
_mqttHelper.GetActionSubscribe = (obj) =>
{
var msgObj = JsonConvert.DeserializeObject(obj);
if (msgObj != null)
{
ReceiveMsgCallback?.Invoke(msgObj);
}
};
//订阅消息
_mqttHelper.SubscribeAsync($"071c55ad-f1a5a757/device/message/up/ivs_result");
//MQTT心跳
MessageListenerTimer = new System.Timers.Timer(240000);
MessageListenerTimer.Elapsed += new ElapsedEventHandler(MessageListenerTimer_Elapsed);
MessageListenerTimer.Enabled = true;
}
catch (Exception e)
{
throw new Exception($"TurnOn异常,{e.Message}");
}
}
public void TurnOff()
{
try
{
MessageListenerTimer.Enabled = false;
if (_mqttHelper != null)
_mqttHelper.DisConntionServer();
}
catch (Exception e)
{
throw new Exception($"TurnOff异常,{e.Message}");
}
}
解析车号
var license = msgObj.payload.AlarmInfoPlate.result.PlateResult.license;
// 解码车号
string base64License = System.Text.Encoding.UTF8.GetString(Convert.FromBase64String(license));