您当前的位置: 首页 >  物联网

qianbo_insist

暂无认证

  • 0浏览

    0关注

    399博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

modbus协议的物联网tcp server

qianbo_insist 发布时间:2021-07-26 21:21:16 ,浏览量:0

定义tcp server协议的数据结构

这里使用了libuv为基础,来制作高性能的服务器,而作为服务器,我们必须支撑一种或者多种协议,这里使用类似于modbus协议的,数据结构被写死了,并不灵活。以0x69开头,0x16为结尾,如果需要适合其他协议,需要改写代码,一种方法是使用js来做脚本,或者json来做,用js更为简单。并且可以以执行脚本的方式来执行一些初始化的指令。

//协议头部和回调函数设置
typedef struct tcp_settings
{
	urls url;
	//datatype 不同类型
	//
	char datatype = 0;
	unsigned char head = 0x69; //0x69 = 105
	//如果有头部起始位置一定为0 一个字节如0x69
	char headoffset = 0;
	//起始头部为0 或者 1个字节 .2 .3 .4
	char headlength = 0;
	//从第一个字节开始是id,从0开始算
	char idoffset = 1;
	//4个字节为id长度,设备id的长度
	char idlength = 4;
	//命令长度
	char cmdlength = 1;
	//命令偏移量
	char cmdoffset = 5;
	//代表包长内容的只有一个字节
	char content_len = 1;
	//代表包长度的是第6个字节为起始位置,从0开始算
	char content_offset = 6;

	char includeht = 0;
	//是否包含crc校验和end
	char includecrcend = 0;
	// 0 是没有校验,2是2个字节crc16 
	char crclen = 2; 

	char headfieldslen = 0;
	
	char timestamp_offset = 0;
	char timestamp_len = 0;
	char type_offset = 0;
	char type_len = 0;
	char memo_offset = 0;
	char memo_len = 0;

	//一个结束字节,如果为零则忽略end
	char end_start = 1;
	//end_start 如果为零则忽略
	unsigned char end = 0x16;
	
	/*保留 
	big endian 0 
	little endian 1
	*/
	char bl = enum_big;

	/*计算出来的长度
	   初始化为-1,
	   否则大于0
	*/
	int hlen_calc = -1;
	
	uv_loop_t * uv_loop = NULL;
}tcp_settings;
主代码

要达到高性能的代码, 1、少量的拷贝 尽量少拷贝数据 2、线程池 使用线程池处理数据 3、异步 使用异步方式接收链接和接收数据 libuv支持了线程池的处理方式,以及本身就是异步方式的api,给我们提供了很大的方便性。

定义线程池数据结构
typedef struct thread_work {
	thread_work(client_t* cli, tcp_unit * unit) :
		request(),
		client(cli),
		data(unit),
		error(false) {
		//保存数据指针,传到处理线程
		request.data = this;
	}
	uint32_t id =0;
	client_t* client = NULL;
	//把数据接过来进行处理
	tcp_unit * data = NULL;
	uv_work_t request;
	bool error;
}thread_work;
code
#include "uv.h"
#include "protocol.h"
#include "util.h"
#include 
#include 

using namespace std;
#define MAX_WRITE_HANDLES 1000

//考虑quick js
class tcp_server
{
	//包头和包体的
	tcp_settings _set;
	//第二代数据结构,把解析放到脚本里,不要用c++解析,只接收数据
	tcp_settings2 _set2;
	//tcpserver
	uv_tcp_t * _server = NULL;
public:
	tcp_server(){}
	~tcp_server() {}
protected:
	//客户端断开连接
	static void on_close(uv_handle_t* handle) {
		//客户端下线
		client_t* client = (client_t*)handle->data;
		client->clean();
		delete client;
		//client_offline(client->deviceid);
	}

	

	static void alloc_cb(uv_handle_t * handle, size_t suggested_size, uv_buf_t* buf) {
		int buflen = 0;
		int headlen = 0;
		int bodylen = 0;
		client_t* client = (client_t*)handle->data;
		char *head = &(client->head[0]);     //数据接收的头部
		char *pos = head + client->recvlen; //位置指向数据已经接收的下一个字节
		//得到头部长度
#ifdef _USE_SETTING1
		headlen = get_headlen(client->config);
#else
		headlen = get_headlen2(client->config2);
#endif
		if (client->status == enum_head) //接收头部字节
		{
			buflen = headlen - client->recvlen;
			*buf = uv_buf_init(pos, buflen);
		}
		else  //接收数据部分字节
		{
			//得到包体长度
#ifdef _USE_SETTING1
			bodylen = get_bodylen(client->config, head); 
#else
			bodylen = get_bodylen2(client->config2, head); 
#endif
			//printf("the body len is %d\n", bodylen);
			if (bodylen > 0) {
				if (client->buffer_data == NULL) {
					//总长度
					//printf("create memory\n");
					client->buffer_data = new tcp_unit();
					//加上头部长度
					client->buffer_data->headlen = headlen;
					client->buffer_data->data = new char[bodylen + headlen];
					client->buffer_data->tlen = bodylen + headlen;
					//数据接收的长度加上头部的长度,开始接收数据体
					client->buffer_data->recvlen = headlen;
					//拷贝头部
					memcpy(client->buffer_data->data, head, headlen);
					*buf = uv_buf_init(client->buffer_data->data + headlen, bodylen);
				}
				else {
					//前面加了头部
					char * pos = client->buffer_data->data + client->buffer_data->recvlen;
					buflen = client->buffer_data->tlen - client->buffer_data->recvlen;
					*buf = uv_buf_init(pos, buflen);
				}
			}
			else { //否则没有包体,只有包头
				client->buffer_data->tlen = bodylen;
				client->buffer_data->recvlen = 0;
			}
		}

	}
	//


	
	static void worker(uv_work_t* req) {		
		thread_work * rb = (thread_work *)req->data;
		tcp_unit *tu = rb->data;

		tcp_server *server =(tcp_server*)rb->client->data;
		server->on_data(tu);
	}
	static void after_worker(uv_work_t* req,int status) {

		thread_work *work = static_cast(req->data);
		tcp_unit * tu = work->data;
		free(tu->data);
		free(tu);
		free(work);
		
	}
	static int tcp_parser_execute(client_t* client, char *data, int size)
	{
		int headlen = 0;
#ifdef _USE_SETTING1
		headlen = get_headlen(client->config);
#else
		headlen = get_headlen2(client->config2);
#endif
		if (client->status == enum_head)
		{
			client->recvlen += size;
			//如果头部字节已经接收完毕
			if (headlen == client->recvlen)
			{
				client->status = enum_body; //开始接收包体数据
				//头部已经接收完毕则发生事件
				//继承加入可以发生事件,如加入列表
				tcp_server * server = (tcp_server *)client->data;
				server->on_headers_complete(client);
			}
		}
		else if (client->status == enum_body)
		{
			client->buffer_data->recvlen += size;
			if (client->buffer_data->tlen == client->buffer_data->recvlen) {//数据已经接收完毕
																			//包头数据初始化
				client->recvlen = 0;
				client->status = enum_head; //开始重新接收包头
				if (client->buffer_data->tlen > 0) {
					//发生事件
					tcp_server * server = (tcp_server *)client->data;
					server->on_message_complete(client);

					thread_work * work = new thread_work(client,client->buffer_data);
					work->id = client->deviceid;
					client->buffer_data = NULL;

					//消息体接收结束,交给线程池处理
					int status = uv_queue_work(client->config->uv_loop,
						&work->request,
						worker,
						after_worker);
					CHECK(status, "uv_queue_work");
				}
			}
		}
		return client->recvlen;

	}

	static void on_read(uv_stream_t* tcp, ssize_t nread, const uv_buf_t * buf) {
		ssize_t parsed;
		client_t* client = (client_t*)tcp->data;
		if (nread >= 0) {
			parsed = (ssize_t)tcp_parser_execute(
				client, buf->base, nread);
			if (parsed parser;
				
				uv_close((uv_handle_t*)&client->tcp, on_close);
			}
		}
		else {
			if (nread != UV_EOF) {
				UVERR(nread, "read");
			}
			uv_close((uv_handle_t*)&client->tcp, on_close);
		}

	}

	static void on_connect(uv_stream_t* server_handle, int status) {
		CHECK(status, "connect");
		printf("connected!\n");
		tcp_server * server  = (tcp_server *)server_handle->data;
		client_t* client = (client_t*)calloc(1, sizeof(client_t));
		client->config   = &(server->_set);
		//这里还没有改成读json文件,后面停用脚本文件
		client->config2  = &(server->_set2);
		client->data     = server;
		uv_loop_t * uv_loop = client->config->uv_loop;

		uv_tcp_init(uv_loop, &client->tcp);

		client->tcp.data = client;

		int r = uv_accept(server_handle, (uv_stream_t*)&client->tcp);
		if (r == 0) {
			uv_read_start((uv_stream_t*)&client->tcp, alloc_cb, on_read);
		}
		else {
			CHECK(r, "accept");
			uv_close((uv_handle_t*)(&client->tcp), on_close);
		}
	}



public:
	//头部接收完毕
	virtual int on_headers_complete(void *param) {
		return 0;
	}

	virtual int on_message_complete(void *param) {
		return 0;
	}
	//收到完整一帧数据
	virtual int on_data(tcp_unit * data) {
		return 0;
	}
	int start(const char * ip, uint16_t port) {
		int r = uv_tcp_init(_set.uv_loop, _server);
		//保存用户数据,是tcp server 本身指针
		_server->data = this; // &_set;
		CHECK(r, "tcp_init");
		r = uv_tcp_keepalive(_server, 1, 60);
		CHECK(r, "tcp_keepalive");
		struct sockaddr_in address;
		r = uv_ip4_addr(ip, port, &address);
		CHECK(r, "ip4_addr");
		r = uv_tcp_bind(_server, (const struct sockaddr*)&address, 0);
		CHECK(r, "tcp_bind");
		r = uv_listen((uv_stream_t*)_server, MAX_WRITE_HANDLES, on_connect);
		CHECK(r, "uv_listen");

		//tcp_servers.push(port, server);
		return 0;

	}
	int  tcp_init(const char * configfile,uv_loop_t * uv_loop)
	{
	//fix me ,here is not use configfile
		_server = new uv_tcp_t();
		_set.uv_loop = uv_loop;
#if  0
		if (read_config_protocol(configfile, _set) != 0)
		{
			cout             
关注
打赏
1663161521
查看更多评论
0.0387s