怎樣用C#進行儀器的設備改造?因為儀器控製程式是C#開發的,所以客戶端最好是c#,考慮到節省流量(伺服器是按流量收費的),檔案要壓縮,C#下要實現檔案壓縮功能。而服務端選擇java建構restful API進行上傳的的方案。
依照領導的要求,要改造一台儀器,加入點功能,將測量資料上傳到伺服器。儀器測量節拍大概是20s,數據量目前不大,每次測量大概不到2M左右,都是浮點數據和整形數據。
起初想用TCP長連接實現的,但考慮到現場環境。典型的製造業車間,電磁環境複雜,網路訊號不穩,所以不考慮TCP長連接實現。短連接也不在考慮範圍內,以後儀器數量多了之後頻繁的建立連接開銷也很大,伺服器有可能受不了(阿里雲的乞丐版)。所以選擇用restful提交,http的通訊可以多執行緒調度。
儀器控製程式是C#開發的,所以客戶端最好是c#。服務端我想用springboot,很方便。
考慮到新增的上傳功能不能影響先前的測量節拍,所以要多執行緒實作。可惜我又很懶,不想考慮線程協調問題,最後選擇訊息佇列實作。
考慮到節省流量(伺服器是按流量收費的),檔案要壓縮,C#下要實現檔案壓縮功能。
從測量檔案讀取數據,將參數存入資料庫,測量原始資料打包放在檔案伺服器上。
最後的技術方案就是C#做客戶端,java建置服務端restful API進行上傳
整體架構如下圖:
所使用的技術如下:
C#的Restful客戶端:RestSharp
#java的Restful服務端:springboot
C#端訊息佇列:NetMQ
傳輸檔案採用MultipartFile方式,因為客戶端的ResrSharp只能採用這種方式傳遞檔案
@RestController @RequestMapping(value = "upload") public class FileRestController { Logger logger = LogManager.getLogger(FileRestController.class); @RequestMapping(value = "file", method = RequestMethod.POST) public @ResponseBody RestResult getZipFile(@RequestParam("file") MultipartFile file) throws IOException, URISyntaxException { RestResult result = new RestResult(); if (!file.getName().isEmpty()) { InputStream stream = file.getInputStream(); // String directory = FileRestController.class.getProtectionDomain().getCodeSource().getLocation().toURI().getPath(); String directory = "/usr/local/haliang/files/"; try { directory = URLDecoder.decode(directory, "utf-8"); } catch (java.io.UnsupportedEncodingException e) { return null; } FileOutputStream fs = new FileOutputStream(directory + file.getOriginalFilename()); logger.info("文件所在的目录: " + directory + "/files/" + file.getOriginalFilename()); byte[] buffer = new byte[1024 * 1024]; int bytesum = 0; int byteread = 0; while ((byteread = stream.read(buffer)) != -1) { bytesum += byteread; fs.write(buffer, 0, byteread); fs.flush(); } fs.close(); stream.close(); logger.info("成功接收文件: " + directory + file.getOriginalFilename()); } return result; } }四、客戶端客戶端架構如下圖:
它對socket通訊進行了封裝,使得我們不需要寫socket函數呼叫就能完成複雜的網路通訊。
它跟Socket的區別是:普通的socket是端到端的(1:1的關係),而ZMQ卻是可以N:M的關係,人們對BSD套接字的了解較多的是點對點的連接,點對點連接需要明確地建立連接、銷毀連接、選擇協定(TCP/UDP)和處理錯誤等,而ZMQ屏蔽了這些細節,讓你的網路程式設計更為簡單。
它是一個訊息處理佇列庫,可在多個執行緒、核心和主機盒之間彈性伸縮。和一般意義上的消息隊列產品不同的是,它沒有訊息隊列伺服器,而更像是網路通訊庫。從網路通訊的角度來看,它處於會話層之上,應用層之下,屬於傳輸層。
由请求端发起请求,然后等待回应端应答。一个请求必须对应一个回应,从请求端的角度来看是发-收配对,从回应端的角度是收-发对。跟一对一结对模型的区别在于请求端可以是1~N个。
请求端和回应端都可以是1:N的模型。通常把1认为是server,N认为是Client。ZeroMQ可以很好的支持路由功能(实现路由功能的组件叫作Device),把1:N扩展为N:M(只需要加入若干路由节点)。从这个模型看,更底层的端点地址是对上层隐藏的。每个请求都隐含有回应地址,而应用则不关心它。通常把该模型主要用于远程调用及任务分配等。
(NetMQ请求响应C#调用案例)
发布端单向分发数据,且不关心是否把全部信息发送给订阅端。如果发布端开始发布信息时,订阅端尚未连接上来,则这些信息会被直接丢弃。订阅端未连接导致信息丢失的问题,可以通过与请求回应模型组合来解决。订阅端只负责接收,而不能反馈,且在订阅端消费速度慢于发布端的情况下,会在订阅端堆积数据。该模型主要用于数据分发。天气预报、微博明星粉丝可以应用这种经典模型。 (NetMQ发布订阅模式C#调用案例)
Server端作为Push端,而Client端作为Pull端,如果有多个Client端同时连接到Server端,则Server端会在内部做一个负载均衡,采用平均分配的算法,将所有消息均衡发布到Client端上。与发布订阅模型相比,推拉模型在没有消费者的情况下,发布的消息不会被消耗掉;在消费者能力不够的情况下,能够提供多消费者并行消费解决方案。该模型主要用于多任务并行。
(NetMQ推拉模式C#调用案例)
TCP:ZeroMQ基于消息,消息模式,而非字节流。
XMPP:ZeroMQ更简单、快速、更底层。Jabber可建在ZeroMQ之上。
AMQP:完成相同的工作,ZeroMQ要快100倍,而且不需要代理(规范更简洁——少278页)
IPC:ZeroMQ可以跨多个主机盒,而非单台机器。
CORBA:ZeroMQ不会将复杂到恐怖的消息格式强加于你。
RPC:ZeroMQ完全是异步的,你可以随时增加/删除参与者。
RFC 1149:ZeroMQ比它快多了!
29west LBM:ZeroMQ是自由软件!
IBM低延迟:ZeroMQ是自由软件!
Tibco:仍然是自由软件!
一般都是发布者先启动,绑定监听端口。封装了一个发送函数,主要是发送原先软件生成测量文件的路径。
public class Publisher { public int Port { get; set; } private PublisherSocket socket; /// <summary> /// 构造函数 /// </summary> /// <param name="port">绑定的端口</param> public Publisher(int port) { Port = port; } /// <summary> /// 启动发布端 /// </summary> public void Start() { NetMQContext context = NetMQContext.Create(); this.socket = context.CreatePublisherSocket(); this.socket.Bind("tcp://127.0.0.1:" + Port); } /// <summary> /// 发送数据 /// </summary> /// <param name="result"></param> public void Send(string result) { socket.SendFrame(result); } }
订阅者启动时候连接端口。防止线程阻塞,订阅者是新开一个线程运行的。
public class Subscribe { private delegate void GetDataHandler(string message); private event GetDataHandler onGetData; public int Port { get; set; } public string TempDirectory { get; set; } public bool isRunning { get; set; } public string domain { get; set; } public Subscribe(int port, string domain) { Port = port; this.domain = domain; onGetData += ProcessData; } private SubscriberSocket socket; public void Start() { this.isRunning = true; NetMQContext context = NetMQContext.Create(); socket = context.CreateSubscriberSocket(); socket.Connect("tcp://127.0.0.1:" + Port); socket.Subscribe(""); Thread t = new Thread(new ThreadStart(StartSub)); t.Start(); } private void StartSub() { while (isRunning) { Thread.Sleep(10000); string result = socket.ReceiveFrameString(Encoding.UTF8); onGetData(result); } } private void ProcessData(string path) { Console.WriteLine("收到文件:" + path); string compressedFile = Compress.CompressFile(TempDirectory, path); new RestPost(domain).Post(compressedFile); }
压缩使用DotNetZip组件,非常简单好用。
public class Compress { public static string CompressFile(string temp,string txtPath) { string txtFileName = System.IO.Path.GetFileNameWithoutExtension(txtPath); string compressedFileName = temp+"/"+txtFileName + ".zip"; ZipFile file=new ZipFile(); file.AddFile(txtPath,""); file.Save(compressedFileName); return compressedFileName; } }
使用RestSharp组件,也是非常简单。异步回调,不影响性能。
public class RestPost { public string Domain { get; set; } public RestPost(string domain) { Domain = domain; } public void Post(string path) { RestRequest request = new RestRequest(Method.POST); request.AddFile("file", path); RestClient client = new RestClient {BaseUrl = new Uri("http://" + Domain + "/upload/file")}; client.ExecuteAsync(request, (response) => { if (response.StatusCode == HttpStatusCode.OK) { Console.WriteLine("上传成功...\n" + response.Content); } else { Console.WriteLine($"出错啦:{response.Content}"); } } ); } }
写代码之前一定要搞清楚需求,设计好架构
注意消息队列启动时候的线程问题
异步执行
相关文章:
相关视频:
以上是儀器設備改造技術,實現測量數據上傳到伺服器的功能的詳細內容。更多資訊請關注PHP中文網其他相關文章!