- 系列文章目录
- 前言
- 一、WebSocket简介
- 二、WebSocket产生背景
- 三、WebSocket实现原理
- 四、WebSocket协议举例
- 五、WebSocket使用
- 1.WebSocket 介绍
- 2.WebSocket API
- 3.WebSocket事件
- 1.open
- 2.Message
- 3.Error
- 4.Close
- 4.WebSocket 方法
- 1.send()
- 2.close()
- 5.WebSocket 属性
- 1.readyState
- 2.bufferedAmount
- 3.protocol
- 六、WebSocket语言支持
- 七、WebSocket通信
- 1.连接握手
- 1.请求
- 2.应答
- 2.数据传输(双工)
- 3.关闭请求
- 八、WebSocket前世今生
- 1.WebSocket前世
- 1.Http keep-alive
- 2.轮询(Polling)
- 3.长轮询(long polling)
- 2.WebSocket今生
- 1.什么是Websocket?它的特点又有哪些?
- 2.WebSocket的通讯流程和协议解析
- 3.总结
- 九、WebSocket协议进一步理解
- 1.WebSocket是什么
- 2.WebSocket出现之前的实时技术
- 3.WebSocket应用场景
- 4.WebSocket协议栈
- 5.WebSocket与HTTP的区别
- 6.WebSocket握手过程
- 7.WebSocket帧格式
- 8.WebSocket分片传输
- 9.WebSocket相关扩展
- 十、WebSocket 能解决什么问题
- 十一、WebSocket工作原理
- 十二、进一步解析什么是WebSocket协议(附代码)
- 1.websocket背景
- 2.websocket概念
- 3.websocket特点
- 1.websocket优点
- 2.websocket缺点
- 4.websocket协议通信过程
- 1.handshake
- 1.客户端
- 2.服务端
- 3.data transfer
- 5.epoll反应堆模型下实现http协议
- 1.客户端结构体
- 2.int http_response(struct qsevent *ev)
- 3.int http_response(struct qsevent *ev)
- 4.总代码
- 6.epoll反应堆模型下实现websocket协议
- 7.C1000K reactor模型,epoll实现,连接并回发一段数据,测试正常
- 总结
WebSocket协议是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端。
WebSocket通信协议于2011年被IETF定为标准RFC 6455,并被RFC7936所补充规范。
- webSocket是什么:
- 1、WebSocket是一种在单个TCP连接上进行全双工通信的协议
- 2、WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据
- 3、在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输
- 4、需要安装第三方包:cmd中:go get -u -v github.com/gorilla/websocket
WebSocket 是一种标准协议,用于在客户端和服务端之间进行双向数据传输。但它跟 HTTP 没什么关系,它是一种基于 TCP 的一种独立实现。
以前客户端想知道服务端的处理进度,要不停地使用 Ajax 进行轮询,让浏览器隔个几秒就向服务器发一次请求,这对服务器压力较高。另外一种轮询就是采用 long poll 的方式,这就跟打电话差不多,没收到消息就一直不挂电话,也就是说,客户端发起连接后,如果没消息,就一直不返回 Response 给客户端,连接阶段一直是阻塞的。
而 WebSocket 解决了 HTTP 的这几个难题。首先,当服务器完成协议升级后( HTTP -> WebSocket ),服务端可以主动推送信息给客户端,解决了轮询造成的同步延迟问题。由于 WebSocket 只需要一次 HTTP 握手,服务端就能一直与客户端保持通讯,直到关闭连接,这样就解决了服务器需要反复解析 HTTP 协议,减少了资源的开销。
WebSocket协议支持(在受控环境中运行不受信任的代码的)客户端与(选择加入该代码的通信的)远程主机之间进行全双工通信。用于此的安全模型是Web浏览器常用的基于原始的安全模式。 协议包括一个开放的握手以及随后的TCP层上的消息帧。 该技术的目标是为基于浏览器的、需要和服务器进行双向通信的(服务器不能依赖于打开多个HTTP连接(例如,使用XMLHttpRequest或和长轮询))应用程序提供一种通信机制。 websocket 是一个基于应用层的网络协议,建立在tcp 协议之上,和 http 协议可以说是兄弟的关系,但是这个兄弟有点依赖 http ,为什么这么说呢?我们都知道 HTTP 实现了三次握手来建立通信连接,实际上 websocket 的创始人很聪明,他不想重复的去造轮子,反正我兄弟已经实现了握手了,我干嘛还要重写一套呢?先让它去冲锋陷阵呢,我坐收渔翁之利不是更香 吗,所以一般来说,我们会先用 HTTP 先进行三次握手,再向服务器请求升级为websocket 协议,这就好比说,嘿兄弟你先去给我排个队占个坑位建个小房子,到时候我在把这房子改造成摩天大楼。而且一般来说 80 和 443 端口一般 web 服务端都会外放出去,这样可以有效的避免防火墙的限制。当然,你创建的 websocket 服务端进程的端口也需要外放出去。
很多人会想问,web开发 使用 HTTP 协议不是已经差不多够用了吗?为什么还要我再多学一种呢?这不是搞事情嘛,仔细想想,一门新技术的产生必然有原因的,如果没有需求,我们干嘛那么蛋疼去写那么多东西,就是因为 HTTP 这个协议有些业务需求支持太过于鸡肋了,从 HTTP 0.9 到现在的 HTTP3.0 ,HTTP协议可以说说是在普通的web开发领域已经是十分完善且高效的了,说这个协议养活了全球半数的公司也不为过吧,像 2.0 服务器推送技术,3.0 采用了 UDP 而放弃了原来的 TCP ,这些改动都是为了进一步提升协议的性能,然而大家现在还是基本使用的 HTTP 1.1 这个最为经典的协议, 也是让开发者挺尴尬的。
绝大多数的web开发都是应用层开发者,大多数都是基于已有的应用层去开发应用,可以说我们最熟悉、日常打交道最多的就是应用层协议了,底下 TCP/IP 协议我们基本很少会去处理,当然大厂可能就不一样了,自己弄一套协议也是正常的,这大概也是程序员和码农的区别吧,搬砖还是创新,差别还是很大的。网络这种分层协议的好处我在之前的文章也说过了,这种隔离性很方便就可以让我们基于原来的基础去拓展,具有较好的兼容性。
总的来说,它就是一种依赖HTTP协议的,支持全双工通信的一种应用层网络协议。
二、WebSocket产生背景简单的说,WebSocket协议之前,双工通信是通过多个http链接来实现,这导致了效率低下。WebSocket解决了这个问题。下面是标准RFC6455中的产生背景概述。
长久以来, 创建实现客户端和用户端之间双工通讯的web app都会造成HTTP轮询的滥用: 客户端向主机不断发送不同的HTTP呼叫来进行询问。
-
这会导致一系列的问题:
-
1.服务器被迫为每个客户端使用许多不同的底层TCP连接:一个用于向客户端发送信息,其它用于接收每个传入消息。
-
2.有些协议有很高的开销,每一个客户端和服务器之间都有HTTP头。
-
3.客户端脚本被迫维护从传出连接到传入连接的映射来追踪回复。
一个更简单的解决方案是使用单个TCP连接双向通信。 这就是WebSocket协议所提供的功能。 结合WebSocket API ,WebSocket协议提供了一个用来替代HTTP轮询实现网页到远程主机的双向通信的方法。
WebSocket协议被设计来取代用HTTP作为传输层的双向通讯技术,这些技术只能牺牲效率和可依赖性其中一方来提高另一方,因为HTTP最初的目的不是为了双向通讯。
三、WebSocket实现原理 在实现websocket连线过程中,需要通过浏览器发出websocket连线请求,然后服务器发出回应,这个过程通常称为“握手” 。在 WebSocket API,浏览器和服务器只需要做一个握手的动作,然后,浏览器和服务器之间就形成了一条快速通道。两者之间就直接可以数据互相传送。在此WebSocket 协议中,为我们实现即时服务带来了两大好处:
- 1. Header:互相沟通的Header是很小的-大概只有 2 Bytes。
- 2. Server Push:服务器的推送,服务器不再被动的接收到浏览器的请求之后才返回数据,而是在有新数据时就主动推送给浏览器。
- 浏览器请求:
- GET /webfin/websocket/ HTTP/1.1。
- Host: localhost。
- Upgrade: websocket。
- Connection: Upgrade。
- Sec-WebSocket-Key: xqBt3ImNzJbYqRINxEFlkg==。
- Origin: http://服务器地址。
- Sec-WebSocket-Version: 13。
- 服务器回应:
- HTTP/1.1 101 Switching Protocols。
- Upgrade: websocket。
- Connection: Upgrade。
- Sec-WebSocket-Accept: K7DJLdLooIwIG/MOpvWFB3y3FE8=。
- WebSocket借用http请求进行握手,相比正常的http请求,多了一些内容。其中:
- Upgrade: websocket。
- Connection: Upgrade。 表示希望将http协议升级到Websocket协议。Sec-WebSocket-Key是浏览器随机生成的base64 encode的值,用来询问服务器是否是支持WebSocket。
- 服务器返回:
- Upgrade: websocket。
- Connection: Upgrade。 告诉浏览器即将升级的是Websocket协议
Sec-WebSocket-Accept是将请求包“Sec-WebSocket-Key”的值,与”258EAFA5-E914-47DA-95CA-C5AB0DC85B11″这个字符串进行拼接,然后对拼接后的字符串进行sha-1运算,再进行base64编码得到的。用来说明自己是WebSocket助理服务器。
Sec-WebSocket-Version是WebSocket协议版本号。RFC6455要求使用的版本是13,之前草案的版本均应当被弃用。
五、WebSocket使用 1.WebSocket 介绍WebSocket 发起单个请求,服务端不需要等待客服端,客户端在任何时候也能发消息到服务端,减少了轮询时候的延迟.经历一次连接后,服务器能给客户端发多次。下图是轮询与WebSocket的区别。 基于http的实时消息是相当的复杂,在无状态的请求中维持回话的状态增加了复杂度,跨域也很麻烦,使用ajax处理请求有序请求需要考虑更多。通过ajax进行交流也不简单。每一个延伸http功能的目的不是增加他的复杂度。websocket 可以大大简化实时通信应用中的链接。
Websocket是一种底层网络协议,可以让你在这个基础上建立别的标准协议。比如在WebSocket的客户端的基础上使用XMPP登录不同的聊天服务器,因为所有的XMPP服务理解相同的标准协议。WebSocket是web应用的一种创新。
为了与其他平台竞争,WebSocket是H5应用提供的一部分先进功能。每个操作系统都需要网络功能,能够让应用使用Sockets与别的主机进行通信,是每个大平台的核心功能。在很多方面,让Web应用表现的像操作系统平台是html5的趋势。像socket这样底层的网络协议APIs不会符合原始的安全模型,也不会有web api那样的设计风格。WebSocket给H5应用提供TCP的方式不会消弱网络安全且有现代的Api。
WebSocket是Html5平台的一个重要组件也是开发者强有力的工具。简单的说,你需要WebSocket创建世界级的web应用。它弥补了http不适合实时通信的重大缺陷。异步、双向通信模式,通过传输层协议使WebSocket具有普遍灵活性。想象一下你能用WebSocket创建正真实实时应用的所有方式。比如聊天、协作文档编辑、大规模多人在线游戏(MMO),股票交易应用等等。
WebSocket是一个协议,但也有一个WebSocket API,这让你的应用去控制WebSocket的协议去响应被服务端触发的事件。API是W3C开发,协议是IETE制定。现代浏览器支持WebSocket API,这包括使用全双工和双向链接的方法和特性。让你执行像打开关闭链接、发送接收消息、监听服务端事件等必要操作。
2.WebSocket APIWebSocket API其实就是一个使用WebSocket协议的接口,通过它来建立全双工通道来收发消息,简单易学,要连接远程服务器,只需要创建一个WebSocket对象实体,并传入一个服务端的URL。在客户端和服务端一开始握手的期间,http协议升级到WebSocket协议就建立了连接,底层都是TCP协议。一旦建立连接,通过WebSocket接口可以反复的发送消息。在你的代码里面,你可以使用异步事件监听连接生命周期的每个阶段。
WebSocket API是纯事件驱动,一旦建立全双工连接,当服务端给客户端发送数据或者资源,它能自动发送状态改变的数据和通知。所以你不需要为了状态的更新而去轮训Server,在客户端监听即可。
首先,我们需要通过调用WebSocket构造函数来创建一个WebSocket连接,构造函数会返回一个WebSocket实例,可以用来监听事件。这些事件会告诉你什么时候连接建立,什么时候消息到达,什么时候连接关闭了,以及什么时候发生了错误。WebSocket协议定义了两种URL方案,WS和WSS分别代表了客户端和服务端之间未加密和加密的通信。WS(WebSocket)类似于Http URL,而WSS(WebSocket Security)URL 表示连接是基于安全传输层(TLS/SSL)和https的连接是同样的安全机制。
WebSocket的构造函数需要一个URL参数和一个可选的协议参数(一个或者多个协议的名字),协议的参数例如XMPP(Extensible Messaging and Presence Protocol)、SOAP(Simple Object Access Protocol)或者自定义协议。而URL参数需要以WS://或者WSS://开头,例如:ws://www.websocket.org,如果URL有语法错误,构造函数会抛出异常。
// Create new WebSocket connection
var ws = new WebSocket("ws://www.websocket.org");
//测试了下链接不上。
第二个参数是协议名称,是可选的,服务端和客服端使用的协议必须一致,这样收发消息彼此才能理解,你可以定义一个或多个客户端使用的协议,服务端会选择一个来使用,一个客服端和一个服务端之间只能有一个协议。当然都得基于WebSocket,WebSocket的重大好处之一就是基于WebSocket协议的广泛使用,让你的Web能够拥有传统桌面程序那样的能力。
言归正传,我们回到构造函数,在第一次握手之后,和协议的名称一起,客户端会发送一个Sec-WebSocket-Protocol 头,服务端会选择0个或一个协议,响应会带上同样的Sec-WebSocket-Protocol 头,否则会关闭连接。通过协议协商(Protocol negotiation ),我们可以知道给定的WebSocket服务器所支持的协议和版本,然后应用选择协议使用。
// Connecting to the server with one protocol called myProtocol
var ws = new WebSocket("ws://echo.websocket.org", "myProtocol");
//myProtocol 是假设的一个定义好的且符合标准的协议。
你可以传递一个协议的数组。
var echoSocket = new WebSocket("ws://echo.websocket.org", ["com.kaazing.echo","example.imaginary.protocol"])
//服务端会选择其中一个使用
echoSocket.onopen = function(e) {
// Check the protocol chosen by the server
console.log(echoSocket.protocol);
}
输出:com.kaazing.echo
协议这个参数有三种。
- 1.注册协议:根据RFC6455(WebSocket 协议)和IANA被官方注册的标准协议。例如 微软的SOAP。 详情可以参考:http://www.iana.org/assignments/websocket/websocket.xml
看到两个华为的:
- 2.开放协议:被广泛使用的标注协议,例如XMPP和STOMP。但没有被正式注册。
- 3.自定义协议:自己编写和使用的WebSocket的协议。 协议会再后续章节给出详细介绍,下面先看事件、对象和方法以及实例。
WebSocket API是纯事件驱动,通过监听事件可以处理到来的数据和改变的链接状态。客户端不需要为了更新数据而轮训服务器。服务端发送数据后,消息和事件会异步到达。WebSocket编程遵循一个异步编程模型,只需要对WebSocket对象增加回调函数就可以监听事件。你也可以使用addEventListener()方法来监听。而一个WebSocket对象分四类不同事件。
1.open一旦服务端响应WebSocket连接请求,就会触发open事件。响应的回调函数称为onopen。
// Event handler for the WebSocket connection opening
ws.onopen = function(e) {
console.log("Connection open...");
};
open事件触发的时候,意味着协议握手结束,WebSocket已经准备好收发数据。如果你的应用收到open事件,就可以确定服务端已经处理了建立连接的请求,且同意和你的应用通信。
2.Message当消息被接受会触发消息事件,响应的回调函数叫做onmessage。如下:
// 接受文本消息的事件处理实例:
ws.onmessage = function(e) {
if(typeof e.data === "string"){
console.log("String message received", e, e.data);
} else {
console.log("Other message received", e, e.data);
}
};
除了文本消息,WebSocket消息机制还能处理二进制数据,有Blob和ArrayBuffer两种类型,在读取到数据之前需要决定好数据的类型。
// 设置二进制数据类型为blob(默认类型)
ws.binaryType = "blob";
// Event handler for receiving Blob messages
ws.onmessage = function(e) {
if(e.data instanceof Blob){
console.log("Blob message received", e.data);
var blob = new Blob(e.data);
}
};
//ArrayBuffer
ws.binaryType = "arraybuffer";
ws.onmessage = function(e) {
if(e.data instanceof ArrayBuffer){
console.log("ArrayBuffer Message Received", + e.data);
// e.data即ArrayBuffer类型
var a = new Uint8Array(e.data);
}
};
3.Error
如果发生意外的失败会触发error事件,相应的函数称为onerror,错误会导致连接关闭。如果你收到一个错误事件,那么你很快会收到一个关闭事件,在关闭事件中也许会告诉你错误的原因。而对错误事件的处理比较适合做重连的逻辑。
//异常处理
ws.onerror = function(e) {
console.log("WebSocket Error: " , e);
//Custom function for handling errors
handleErrors(e);
};
4.Close
不言而喻,当连接关闭的时候回触发这个事件,对应onclose方法,连接关闭之后,服务端和客户端就不能再收发消息。
WebSocket的规范其实还定义了ping和pong 架构(frames),可以用来做keep-alive,心跳,网络状态查询,latency instrumentation(延迟仪表?),但是目前 WebSocket API还没有公布这些特性,尽管浏览器支持了ping,但不会触发ping事件,相反,浏览器会自动响应pong,第八章会将更多关于ping和pong的细节。
当然你可以调用close方法断开与服务端的链接来触发onclose事件:
ws.onclose = function(e) {
console.log("Connection closed", e);
};
连接失败和成功的关闭握手都会触发关闭事件,WebSocket的对象的readyState属性就代表连接的状态(2代表正在关闭,3代表已经关闭)。关闭事件有三个属性可以用来做异常处理和重获: wasClean,code和reason。wasClean是一个bool值,代表连接是否干净的关闭。 如果是响应服务端的close事件,这个值为true,如果是别的原因,比如因为是底层TCP连接关闭,wasClean为false。code和reason代表关闭连接时服务端发送的状态,这两个属性和给入close方法的code和reason参数是对应的,稍后会描述细节。
4.WebSocket 方法WebSocket 对象有两个方法:send()和close()。
1.send()一旦在服务端和客户端建立了全双工的双向连接,可以使用send方法去发送消息。
//发送一个文本消息
ws.send("Hello WebSocket!");
当连接是open的时候send()方法传送数据,当连接关闭或获取不到的时候回抛出异常。一个通常的错误是人们喜欢在连接open之前发送消息。如下所示:
// 这将不会工作
var ws = new WebSocket("ws://echo.websocket.org")
ws.send("Initial data");
正确的姿势如下,应该等待open事件触发后再发送消息。
var ws = new WebSocket("ws://echo.websocket.org")
ws.onopen = function(e) {
ws.send("Initial data");
}
如果想通过响应别的事件去发送消息,可以检查readyState属性的值为open的时候来实现。
function myEventHandler(data) {
if (ws.readyState === WebSocket.OPEN) {
//open的时候即可发送
ws.send(data);
} else {
// Do something else in this case.
//Possibly ignore the data or enqueue it.
}
}
发送二进制数据:
// Send a Blob
var blob = new Blob("blob contents");
ws.send(blob);
// Send an ArrayBuffer
var a = new Uint8Array([8,6,7,5,3,0,9]);
ws.send(a.buffer);
Blob对象和JavaScript File API一起使用的时候相当有用,可以发送或接受文件,大部分的多媒体文件,图像,视频和音频文件。这一章末尾会结合File API提供读取文件内容来发送WebSocket消息的实例代码。
2.close()使用close方法来关闭连接,如果连接以及关闭,这方法将什么也不做。调用close方法只后,将不能发送数据。
ws.close();
close方法可以传入两个可选的参数,code(numerical)和reason(string),以告诉服务端为什么终止连接。第三章讲到关闭握手的时候再详细讨论这两个参数。
// 成功结束会话
ws.close(1000, "Closing normally");
//1000是状态码,代表正常结束。
5.WebSocket 属性
WebSocket对象有三个属性,readyState,bufferedAmount和Protocol。
1.readyStateWebSocket对象通过只读属性readyState来传达连接状态,它会更加连接状态自动改变。下表展示了readyState属性的四个不同的值。 了解当前连接的状态有助于我们调试。
有时候需要检查传输数据的大小,尤其是客户端传输大量数据的时候。虽然send()方法会马上执行,但数据并不是马上传输。浏览器会缓存应用流出的数据,你可以使用bufferedAmount属性检查已经进入队列但还未被传输的数据大小。这个值不包含协议框架、操作系统缓存和网络软件的开销。
下面这个例子展示了如何使用bufferedAmount属性每秒更新发送。如果网络不能处理这个频率,它会自适应。
// 10k
var THRESHOLD = 10240;
//建立连接
var ws = new WebSocket("ws://echo.websocket.org");
// Listen for the opening event
ws.onopen = function () {
setInterval( function() {
//缓存未满的时候发送
if (ws.bufferedAmount on('open', function ($ws, $request) {
var_dump($request->fd, $request->get, $request->server); //request 对象包含请求的相关信息
//$ws->push($request->fd, "hello, welcome\n");
});
//监听WebSocket消息事件
$ws->on('message', function ($ws, $frame) { // frame 是存储信息的变量,也就是传输帧
echo "Message: {$frame->data}\n";
$ws->push($frame->fd, "server: {$frame->data}");
});
//监听WebSocket连接关闭事件
$ws->on('close', function ($ws, $fd) { // fd 是客户端的标志
echo "client-{$fd} is closed\n";
});
$ws->start(); // 启动这个进程
我们可以发现,相比于 HTTP 的头部,websocket 的数据结构十分的简单小巧,没有像 HTTP 协议一样老是带着笨重的头部,这一设计让websocket的报文可以在体量上更具优势,所以传输效率来说更高 。
当然,我们传输的文本也不能在大街上裸跑啊,既然 HTTP 都有衣服穿了(HTTPS),websocket(ws) 当然也有 (wss)。
在以前的文章我们也简单聊过 HTTPS 是个什么东西,大家不了解可以去翻一下之前的文章,总的来说就是使用了非对称加密算法进行了对称加密密钥的传输,后续采用对称加密解密的方式进行数据安全处理。
如果你的业务需要支撑双全工的通信,那么 websocket 便是一个很不错的选择。网上大多数关于 websocket 的文章,大多是基于前端学习者的角度,他们使用 Chrome 的console 的调试实验,本篇文章更多是基于后端开发者的角度。希望对你有所帮助。
十二、进一步解析什么是WebSocket协议(附代码) 1.websocket背景websocket协议诞生于HTTP协议之后。在websocket协议没出现之前,当时人们发现创建需要客户端和服务器之间双向通信的web应用程序(例如,即时消息和游戏应用程序)需要滥用HTTP来轮询服务器更新,这将导致以下几个问题:
- 服务器被迫为每个客户机使用许多不同的底层TCP连接:一个用于向客户机发送信息,一个用于每个传入消息。
- 协议的开销很大,因为每个客户机到服务器的消息都有一个HTTP报头。
- 客户端脚本被迫维护从传出连接到传入连接的映射,以跟踪响应。
2.同时HTTP协议的问题也体现在数据刷新方式上,以前实现方式是以下三种:
- 客户端定时查询:比如10s产生一次,但这样势必会产生大量无效的请求,如果服务器数据没有更新的话,会产生大量的带宽浪费。
- 长轮询机制:客户端依旧发送请求给服务端,当数据更新时候,服务端再下发数据给客户端。但实际上,服务器并不是没有数据更新才响应客户端,而是等待一个超时时间才结束此次长轮询请求。当遇到数据更新比较频繁的场景,长轮询就没有优势。
- HTTP Streaming:客户端发送获取数据更新请求到服务器,服务器保持该请求的响应数据流一直打开,只有数据更新就实时发送给客户端。
- 设想很美好,但却带来新的问题: 1.违背了HTTP协议本身的语义,客户端和服务端不再是请求-响应的方式,而是二者之前直接建立的单向通信通道。 2.服务端只要的到数据更新就发送数据给客户端,所以需要协商数据更新的开始和结尾,数据容易出现错误。 3.客户端和服务端之前的网络中介可能会缓存响应数据,客户端就无法获得真正的更新数据。
面对以上问题,websocket也因此而出现。
2.websocket概念WebSocket协议允许在受控环境中运行不受信任代码的客户端与远程主机之间进行双向通信,而远程主机已经选择从该代码进行通信。为此使用的安全模型是web浏览器常用的基于起源的安全模型。该协议包括一个开放握手,然后是基本的消息帧,在TCP之上分层。该技术的目标是为基于浏览器的应用程序提供一种机制,这种应用程序需要与服务器进行双向通信,而不依赖于打开多个HTTP连接。
- 保持连接状态:websocket需要先创建连接,使其成为有状态的协议。
- 更好支持二进制:定义了二进制帧,增加安全性。
- 支持扩展:定义了扩展,可以自己实现部分部分自定义。
- 压缩效果好:可以沿用上下文的内容,有更好的压缩效果。
- 开发要求高: 前端后端都增加了一定的难度。
- 推送消息相对复杂。
- HTTP协议已经很成熟,现今websocket则太新了一点。
协议有两个部分:handshake(握手)和 data transfer(数据传输)。
1.handshake 1.客户端客户端握手报文是在HTTP的基础上发送一次HTTP协议升级请求。
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Origin: http://example.com
Sec-WebSocket-Protocol: chat, superchat
Sec-WebSocket-Version: 13
Sec-WebSocket-Key 是由浏览器随机生成的,提供基本的防护,防止恶意或者无意的连接。
Sec-WebSocket-Version 表示 WebSocket 的版本,最初 WebSocket 协议太多,不同厂商都有自己的协议版本,不过现在已经定下来了。如果服务端不支持该版本,需要返回一个 Sec-WebSocket-Versionheader,里面包含服务端支持的版本号。
2.服务端服务端响应握手也是在HTTP协议基础上回应一个Switching Protocols。
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
Sec-WebSocket-Protocol: chat
Linux下对应实现代码,注释在代码中。
int websocket_handshake(struct qsevent *ev)
{
char linebuf[128];
int index = 0;
char sec_data[128] = {0};
char sec_accept[32] = {0};
do
{
memset(linebuf, 0, sizeof(linebuf));//清空以暂存一行报文
index = readline(ev->buffer, index, linebuf);//获取一行报文
if(strstr(linebuf, "Sec-WebSocket-Key"))//如果一行报文里面包括了Sec-WebSocket-Key
{
strcat(linebuf, GUID);//和GUID连接起来
SHA1(linebuf+WEBSOCK_KEY_LENGTH, strlen(linebuf+WEBSOCK_KEY_LENGTH), sec_data);//SHA1
base64_encode(sec_data, strlen(sec_data), sec_accept);//base64编码
memset(ev->buffer, 0, MAX_BUFLEN);//清空服务端数据缓冲区
ev->length = sprintf(ev->buffer,//组装握手响应报文到数据缓冲区,下一步有进行下发
"HTTP/1.1 101 Switching Protocols\r\n"
"Upgrade: websocket\r\n"
"Connection: Upgrade\r\n"
"Sec-websocket-Accept: %s\r\n\r\n", sec_accept);
break;
}
}while(index != -1 && (ev->buffer[index] != '\r') || (ev->buffer[index] != '\n'));//遇到空行之前
return 0;
}
3.data transfer
先看数据包格式。
- FIN:指示这是消息中的最后一个片段。第一个片段也可能是最后的片段。
- RSV1, RSV2, RSV3:一般情况下全为 0。当客户端、服务端协商采用 WebSocket 扩展时,这三个标志位可以非0,且值的含义由扩展进行定义。如果出现非零的值,且并没有采用 WebSocket 扩展,连接出错。
- opcode:操作代码。
%x0:表示一个延续帧。当 Opcode 为 0 时,表示本次数据传输采用了数据分片,当前收到的数据帧为其中一个数据分片;
%x1:表示这是一个文本帧(frame);
%x2:表示这是一个二进制帧(frame);
%x3-7:保留的操作代码,用于后续定义的非控制帧;
%x8:表示连接断开;
%x9:表示这是一个 ping 操作;
%xA:表示这是一个 pong 操作;
%xB-F:保留的操作代码,用于后续定义的控制帧。
- mask:是否需要掩码。
- Payload length: 7bit or 7 + 16bit or 7 + 64bit
表示数据载荷的长度
x 为 0~126:数据的长度为 x 字节;
x 为 126:后续 2 个字节代表一个 16 位的无符号整数,该无符号整数的值为数据的长度;
x 为 127:后续 8 个字节代表一个 64 位的无符号整数(最高位为 0),该无符号整数的值为数据的长度。
- payload data:消息体。 下面是服务端的代码实现:
#define GUID "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
enum
{
WS_HANDSHAKE = 0, //握手
WS_TANSMISSION = 1, //通信
WS_END = 2, //end
};
typedef struct _ws_ophdr{
unsigned char opcode:4,
rsv3:1,
rsv2:1,
rsv1:1,
fin:1;
unsigned char pl_len:7,
mask:1;
}ws_ophdr;//协议前两个字节
typedef struct _ws_head_126{
unsigned short payload_lenght;
char mask_key[4];
}ws_head_126;//协议mask和消息体长度
/*解码*/
void websocket_umask(char *payload, int length, char *mask_key)
{
int i = 0;
for( ; ibuffer;//协议前两个自己
printf("ws_recv_data length=%d\n", ophdr->pl_len);
if(ophdr->pl_len buffer + sizeof(ws_ophdr) + 4;//获取消息地址
if(ophdr->mask)//如果消息是掩码
{
websocket_umask(payload, ophdr->pl_len, ev->buffer+2);//解码,异或
printf("payload:%s\n", payload);
}
printf("payload : %s\n", payload);//消息回显
}
else if (hdr->pl_len == 126) {
ws_head_126 *hdr126 = ev->buffer + sizeof(ws_ophdr);
} else {
ws_head_127 *hdr127 = ev->buffer + sizeof(ws_ophdr);
}
return 0;
}
int websocket_request(struct qsevent *ev)
{
if(ev->status_machine == WS_HANDSHAKE)
{
websocket_handshake(ev);//握手
ev->status_machine = WS_TANSMISSION;//设置标志位
}else if(ev->status_machine == WS_TANSMISSION){
websocket_transmission(ev);//通信
}
return 0;
}
代码是基于reactor百万并发服务器框架实现的。
5.epoll反应堆模型下实现http协议struct qsevent{
int fd; //clientfd
int events; //事件:读、写或异常
int status; //是否位于epfd红黑监听树上
void *arg; //参数
long last_active; //上次数据收发的事件
int (*callback)(int fd, int event, void *arg); //回调函数,单回调,后面修改成多回调
unsigned char buffer[MAX_BUFLEN]; //数据缓冲区
int length; //数据长度
/*http param*/
int method; //http协议请求头部
char resource[MAX_BUFLEN]; //请求的资源
int ret_code; //响应状态码
};
2.int http_response(struct qsevent *ev)
当客户端发送tcp连接时,服务端的listenfd会触发输入事件会调用ev->callback即accept_cb回调函数响应连接并获得clientfd,连接之后,http数据报文发送上来,服务端的clientfd触发输入事件会调用ev->callback即recv_cb回调函数进行数据接收,并解析http报文。
int http_request(struct qsevent *ev)
{
char linebuf[1024] = {0};//用于从buffer中获取每一行的请求报文
int idx = readline(ev->buffer, 0, linebuf);//读取第一行请求方法,readline函数,后面介绍
if(strstr(linebuf, "GET"))//strstr判断是否存在GET请求方法
{
ev->method = HTTP_METHOD_GET;//GET方法表示客户端需要获取资源
int i = 0;
while(linebuf[sizeof("GET ") + i] != ' ')i++;//跳过空格
linebuf[sizeof("GET ") + i] = '\0';
sprintf(ev->resource, "./%s/%s", HTTP_METHOD_ROOT, linebuf+sizeof("GET "));//将资源的名字以文件路径形式存储在ev->resource中
printf("resource:%s\n", ev->resource);//回显
}
else if(strstr(linebuf, "POST"))//POST的请求方法,暂时没写,方法差不多
{}
return 0;
}
3.int http_response(struct qsevent *ev)
服务器对客户端的响应报文数据进行http封装储存在buffer中,事件触发时在send_cb回调函数发送给客户端。详细解释请看代码注释。
int http_response(struct qsevent *ev)
{
if(ev == NULL)return -1;
memset(ev->buffer, 0, MAX_BUFLEN);//清空缓冲区准备储存报文
printf("resource:%s\n", ev->resource);//resource:客户端请求的资源文件,通过http_reques函数获取
int filefd = open(ev->resource, O_RDONLY);//只读方式打开获得文件句柄
if(filefd == -1)//获取失败则发送404 NOT FOUND
{
ev->ret_code = 404;//404状态码
ev->length = sprintf(ev->buffer,//将下面数据传入ev->buffer
/***状态行***/
/*版本号 状态码 状态码描述 */
"HTTP/1.1 404 NOT FOUND\r\n"
/***消息报头***/
/*获取当前时间*/
"date: Thu, 11 Nov 2021 12:28:52 GMT\r\n"
/*响应正文类型; 编码方式*/
"Content-Type: text/html;charset=ISO-8859-1\r\n"
/*响应正文长度 空行*/
"Content-Length: 85\r\n\r\n"
/***响应正文***/
"404 Not Found404\r\n\r\n");
}
else
{
struct stat stat_buf; //文件信息
fstat(filefd, &stat_buf); //fstat通过文件句柄获取文件信息
if(S_ISDIR(stat_buf.st_mode)) //如果文件是一个目录
{
printf(ev->buffer, //同上,将404放入buffer中
"HTTP/1.1 404 Not Found\r\n"
"Date: Thu, 11 Nov 2021 12:28:52 GMT\r\n"
"Content-Type: text/html;charset=ISO-8859-1\r\n"
"Content-Length: 85\r\n\r\n"
"404 Not Found404\r\n\r\n" );
}
else if (S_ISREG(stat_buf.st_mode)) //如果文件是存在
{
ev->ret_code = 200; //200状态码
ev->length = sprintf(ev->buffer, //length记录长度,buffer储存响应报文
"HTTP/1.1 200 OK\r\n"
"Date: Thu, 11 Nov 2021 12:28:52 GMT\r\n"
"Content-Type: text/html;charset=ISO-8859-1\r\n"
"Content-Length: %ld\r\n\r\n",
stat_buf.st_size );//文件长度储存在stat_buf.st_size中
}
return ev->length;//返回报文长度
}
}
4.总代码
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#define HTTP_METHOD_ROOT "html"
#define MAX_BUFLEN 4096
#define MAX_EPOLLSIZE 1024
#define MAX_EPOLL_EVENTS 1024
#define HTTP_METHOD_GET 0
#define HTTP_METHOD_POST 1
typedef int (*NCALLBACK)(int, int, void*);
struct qsevent{
int fd;
int events;
int status;
void *arg;
long last_active;
int (*callback)(int fd, int event, void *arg);
unsigned char buffer[MAX_BUFLEN];
int length;
/*http param*/
int method;
char resource[MAX_BUFLEN];
int ret_code;
};
struct qseventblock{
struct qsevent *eventsarrry;
struct qseventblock *next;
};
struct qsreactor{
int epfd;
int blkcnt;
struct qseventblock *evblk;
};
int recv_cb(int fd, int events, void *arg);
int send_cb(int fd, int events, void *arg);
struct qsevent *qsreactor_idx(struct qsreactor *reactor, int sockfd);
int readline(char *allbuf, int idx, char *linebuf)
{
int len = strlen(allbuf);
for( ; idxevents = 0;
ev->fd = fd;
ev->arg = arg;
ev->callback = callback;
ev->last_active = time(NULL);
return;
}
int qs_event_add(int epfd, int events, struct qsevent *ev)
{
struct epoll_event epv = {0, {0}};;
epv.events = ev->events = events;
epv.data.ptr = ev;
if(ev->status == 1)
{
if(epoll_ctl(epfd, EPOLL_CTL_MOD, ev->fd, &epv) status == 0)
{
if(epoll_ctl(epfd, EPOLL_CTL_ADD, ev->fd, &epv) status = 1;
}
return 0;
}
int qs_event_del(int epfd, struct qsevent *ev)
{
struct epoll_event epv = {0, {0}};
if(ev->status != 1)
return -1;
ev->status = 0;
epv.data.ptr = ev;
if((epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &epv)))
{
perror("EPOLL_CTL_DEL error\n");
return -1;
}
return 0;
}
int sock(short port)
{
int fd = socket(AF_INET, SOCK_STREAM, 0);
fcntl(fd, F_SETFL, O_NONBLOCK);
struct sockaddr_in ser_addr;
memset(&ser_addr, 0, sizeof(ser_addr));
ser_addr.sin_addr.s_addr = htonl(INADDR_ANY);
ser_addr.sin_family = AF_INET;
ser_addr.sin_port = htons(port);
bind(fd, (struct sockaddr*)&ser_addr, sizeof(ser_addr));
if(listen(fd, 20) buffer, 0, linebuf);
if(strstr(linebuf, "GET"))
{
ev->method = HTTP_METHOD_GET;
int i = 0;
while(linebuf[sizeof("GET ") + i] != ' ')i++;
linebuf[sizeof("GET ") + i] = '\0';
sprintf(ev->resource, "./%s/%s", HTTP_METHOD_ROOT, linebuf+sizeof("GET "));
printf("resource:%s\n", ev->resource);
}
else if(strstr(linebuf, "POST"))
{}
return 0;
}
int http_response(struct qsevent *ev)
{
if(ev == NULL)return -1;
memset(ev->buffer, 0, MAX_BUFLEN);
printf("resource:%s\n", ev->resource);
int filefd = open(ev->resource, O_RDONLY);
if(filefd == -1)
{
ev->ret_code = 404;
ev->length = sprintf(ev->buffer,
"HTTP/1.1 404 NOT FOUND\r\n"
"date: Thu, 11 Nov 2021 12:28:52 GMT\r\n"
"Content-Type: text/html;charset=ISO-8859-1\r\n"
"Content-Length: 85\r\n\r\n"
"404 Not Found404\r\n\r\n");
}
else
{
struct stat stat_buf;
fstat(filefd, &stat_buf);
if(S_ISDIR(stat_buf.st_mode))
{
printf(ev->buffer,
"HTTP/1.1 404 Not Found\r\n"
"Date: Thu, 11 Nov 2021 12:28:52 GMT\r\n"
"Content-Type: text/html;charset=ISO-8859-1\r\n"
"Content-Length: 85\r\n\r\n"
"404 Not Found404\r\n\r\n" );
}
else if (S_ISREG(stat_buf.st_mode))
{
ev->ret_code = 200;
ev->length = sprintf(ev->buffer,
"HTTP/1.1 200 OK\r\n"
"Date: Thu, 11 Nov 2021 12:28:52 GMT\r\n"
"Content-Type: text/html;charset=ISO-8859-1\r\n"
"Content-Length: %ld\r\n\r\n",
stat_buf.st_size );
}
return ev->length;
}
}
int qsreactor_init(struct qsreactor *reactor)
{
if(reactor == NULL)
return -1;
memset(reactor, 0, sizeof(struct qsreactor));
reactor->epfd = epoll_create(1);
if(reactor->epfd epfd);
return -2;
}
memset(block, 0, sizeof(block));
struct qsevent *evs = (struct qsevent*)malloc(MAX_EPOLLSIZE * sizeof(struct qsevent));
if(evs == NULL)
{
printf("evsnit malloc error\n");
close(reactor->epfd);
return -3;
}
memset(evs, 0, sizeof(evs));
block->next = NULL;
block->eventsarrry = evs;
reactor->blkcnt = 1;
reactor->evblk = block;
return 0;
}
int qsreactor_alloc(struct qsreactor *reactor)
{
if(reactor == NULL)return -1;
if(reactor->evblk == NULL)return -1;
struct qseventblock *tailblock = reactor->evblk;
while(tailblock->next != NULL)
tailblock = tailblock->next;
struct qseventblock *newblock = (struct qseventblock*)malloc(sizeof(struct qseventblock));
if(newblock == NULL)
{
printf("newblock alloc error\n");
return -1;
}
memset(newblock, 0, sizeof(newblock));
struct qsevent *neweventarray = (struct qsevent*)malloc(sizeof(struct qsevent) * MAX_EPOLLSIZE);
if(neweventarray == NULL)
{
printf("neweventarray malloc error\n");
return -1;
}
memset(neweventarray, 0, sizeof(neweventarray));
newblock->eventsarrry = neweventarray;
newblock->next = NULL;
tailblock->next = newblock;
reactor->blkcnt++;
return 0;
}
struct qsevent *qsreactor_idx(struct qsreactor *reactor, int sockfd)
{
int index = sockfd / MAX_EPOLLSIZE;
while(index >= reactor->blkcnt)qsreactor_alloc(reactor);
int i=0;
struct qseventblock *idxblock = reactor->evblk;
while(i++next;
return &idxblock->eventsarrry[sockfd%MAX_EPOLLSIZE];
}
int qsreactor_destory(struct qsreactor *reactor)
{
close(reactor->epfd);
free(reactor->evblk);
reactor = NULL;
return 0;
}
int qsreactor_addlistener(struct qsreactor *reactor, int sockfd, NCALLBACK acceptor)
{
if(reactor == NULL)return -1;
if(reactor->evblk == NULL)return -1;
struct qsevent *event = qsreactor_idx(reactor, sockfd);
qs_event_set(event, sockfd, acceptor, reactor);
qs_event_add(reactor->epfd, EPOLLIN, event);
return 0;
}
int send_cb(int fd, int events, void *arg)
{
struct qsreactor *reactor = (struct qsreactor*)arg;
struct qsevent *ev = qsreactor_idx(reactor, fd);
http_response(ev);
int ret = send(fd, ev->buffer, ev->length, 0);
if(ret epfd, ev);
printf("clent[%d] ", fd);
perror("send error\n");
close(fd);
}
else if(ret > 0)
{
if(ev->ret_code == 200)
{
int filefd = open(ev->resource, O_RDONLY);
struct stat stat_buf;
fstat(filefd, &stat_buf);
sendfile(fd, filefd, NULL, stat_buf.st_size);
close(filefd);
}
printf("send to client[%d]:%s", fd, ev->buffer);
qs_event_del(reactor->epfd, ev);
qs_event_set(ev, fd, recv_cb, reactor);
qs_event_add(reactor->epfd, EPOLLIN, ev);
}
return ret;
}
int recv_cb(int fd, int events, void *arg)
{
struct qsreactor *reactor = (struct qsreactor*)arg;
struct qsevent *ev = qsreactor_idx(reactor, fd);
int len = recv(fd, ev->buffer, MAX_BUFLEN, 0);
qs_event_del(reactor->epfd, ev);
if(len > 0)
{
ev->length = len;
ev->buffer[len] = '\0';
printf("client[%d]:%s", fd, ev->buffer);
http_request(ev);
qs_event_del(reactor->epfd, ev);
qs_event_set(ev, fd, send_cb, reactor);
qs_event_add(reactor->epfd, EPOLLOUT, ev);
}
else if(len == 0)
{
qs_event_del(reactor->epfd, ev);
close(fd);
printf("client[%d] close\n", fd);
}
else
{
qs_event_del(reactor->epfd, ev);
printf("client[%d]", fd);
perror("reacv error,\n");
close(fd);
}
return 0;
}
int accept_cb(int fd, int events, void *arg)
{
struct qsreactor *reactor = (struct qsreactor*)arg;
if(reactor == NULL)return -1;
struct sockaddr_in client_addr;
socklen_t len = sizeof(client_addr);
int clientfd;
if((clientfd = accept(fd, (struct sockaddr*)&client_addr, &len)) == -1)
{
if(errno != EAGAIN && errno != EINTR)
{}
perror("accept error\n");
return -1;
}
int flag = 0;
if((flag = fcntl(clientfd, F_SETFL, O_NONBLOCK)) epfd, EPOLLIN, event);
printf("new connect [%s:%d], pos[%d]\n",
inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), clientfd);
return 0;
}
int qsreactor_run(struct qsreactor *reactor)
{
if(reactor == NULL)
return -1;
if(reactor->evblk == NULL)
return -1;
if(reactor->epfd epfd, events, MAX_EPOLL_EVENTS, 1000);
if(nready callback(ev->fd, events[i].events, ev->arg);
}
if((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT))
{
ev->callback(ev->fd, events[i].events, ev->arg);
}
}
}
}
int main(int argc, char **argv)
{
unsigned short port = atoi(argv[1]);
int sockfd = sock(port);
struct qsreactor *reactor = (struct qsreactor*)malloc(sizeof(struct qsreactor));
qsreactor_init(reactor);
qsreactor_addlistener(reactor, sockfd, accept_cb);
qsreactor_run(reactor);
qsreactor_destory(reactor);
close(sockfd);
}
6.epoll反应堆模型下实现websocket协议
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#define MAX_BUFLEN 4096
#define MAX_EPOLLSIZE 1024
#define MAX_EPOLL_EVENTS 1024
#define GUID "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
enum
{
WS_HANDSHAKE = 0,
WS_TANSMISSION = 1,
WS_END = 2,
};
typedef struct _ws_ophdr{
unsigned char opcode:4,
rsv3:1,
rsv2:1,
rsv1:1,
fin:1;
unsigned char pl_len:7,
mask:1;
}ws_ophdr;
typedef struct _ws_head_126{
unsigned short payload_lenght;
char mask_key[4];
}ws_head_126;
typedef struct _ws_head_127{
long long payload_lenght;
char mask_key[4];
}ws_head_127;
typedef int (*NCALLBACK)(int, int, void*);
struct qsevent{
int fd;
int events;
int status;
void *arg;
long last_active;
int (*callback)(int fd, int event, void *arg);
unsigned char buffer[MAX_BUFLEN];
int length;
/*websocket param*/
int status_machine;
};
struct qseventblock{
struct qsevent *eventsarrry;
struct qseventblock *next;
};
struct qsreactor{
int epfd;
int blkcnt;
struct qseventblock *evblk;
};
int recv_cb(int fd, int events, void *arg);
int send_cb(int fd, int events, void *arg);
struct qsevent *qsreactor_idx(struct qsreactor *reactor, int sockfd);
int readline(char *allbuf, int idx, char *linebuf)
{
int len = strlen(allbuf);
for(;idx data, bptr->length);
out_str[bptr->length-1] = '\0';
size = bptr->length;
BIO_free_all(bio);
return size;
}
#define WEBSOCK_KEY_LENGTH 19
int websocket_handshake(struct qsevent *ev)
{
char linebuf[128];
int index = 0;
char sec_data[128] = {0};
char sec_accept[32] = {0};
do
{
memset(linebuf, 0, sizeof(linebuf));
index = readline(ev->buffer, index, linebuf);
if(strstr(linebuf, "Sec-WebSocket-Key"))
{
strcat(linebuf, GUID);
SHA1(linebuf+WEBSOCK_KEY_LENGTH, strlen(linebuf+WEBSOCK_KEY_LENGTH), sec_data);
base64_encode(sec_data, strlen(sec_data), sec_accept);
memset(ev->buffer, 0, MAX_BUFLEN);
ev->length = sprintf(ev->buffer,
"HTTP/1.1 101 Switching Protocols\r\n"
"Upgrade: websocket\r\n"
"Connection: Upgrade\r\n"
"Sec-websocket-Accept: %s\r\n\r\n", sec_accept);
break;
}
}while(index != -1 && (ev->buffer[index] != '\r') || (ev->buffer[index] != '\n'));
return 0;
}
void websocket_umask(char *payload, int length, char *mask_key)
{
int i = 0;
for( ; ibuffer;
printf("ws_recv_data length=%d\n", ophdr->pl_len);
if(ophdr->pl_len buffer + sizeof(ws_ophdr) + 4;
if(ophdr->mask)
{
websocket_umask(payload, ophdr->pl_len, ev->buffer+2);
printf("payload:%s\n", payload);
}
memset(ev->buffer, 0, ev->length);
strcpy(ev->buffer, "00ok");
}
return 0;
}
int websocket_request(struct qsevent *ev)
{
if(ev->status_machine == WS_HANDSHAKE)
{
websocket_handshake(ev);
ev->status_machine = WS_TANSMISSION;
}else if(ev->status_machine == WS_TANSMISSION){
websocket_transmission(ev);
}
return 0;
}
void qs_event_set(struct qsevent *ev, int fd, NCALLBACK callback, void *arg)
{
ev->events = 0;
ev->fd = fd;
ev->arg = arg;
ev->callback = callback;
ev->last_active = time(NULL);
return;
}
int qs_event_add(int epfd, int events, struct qsevent *ev)
{
struct epoll_event epv = {0, {0}};;
epv.events = ev->events = events;
epv.data.ptr = ev;
if(ev->status == 1)
{
if(epoll_ctl(epfd, EPOLL_CTL_MOD, ev->fd, &epv) status == 0)
{
if(epoll_ctl(epfd, EPOLL_CTL_ADD, ev->fd, &epv) status = 1;
}
return 0;
}
int qs_event_del(int epfd, struct qsevent *ev)
{
struct epoll_event epv = {0, {0}};
if(ev->status != 1)
return -1;
ev->status = 0;
epv.data.ptr = ev;
if((epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &epv)))
{
perror("EPOLL_CTL_DEL error\n");
return -1;
}
return 0;
}
int sock(short port)
{
int fd = socket(AF_INET, SOCK_STREAM, 0);
fcntl(fd, F_SETFL, O_NONBLOCK);
struct sockaddr_in ser_addr;
memset(&ser_addr, 0, sizeof(ser_addr));
ser_addr.sin_addr.s_addr = htonl(INADDR_ANY);
ser_addr.sin_family = AF_INET;
ser_addr.sin_port = htons(port);
bind(fd, (struct sockaddr*)&ser_addr, sizeof(ser_addr));
if(listen(fd, 20) epfd = epoll_create(1);
if(reactor->epfd epfd);
return -2;
}
memset(block, 0, sizeof(block));
struct qsevent *evs = (struct qsevent*)malloc(MAX_EPOLLSIZE * sizeof(struct qsevent));
if(evs == NULL)
{
printf("evsnit malloc error\n");
close(reactor->epfd);
return -3;
}
memset(evs, 0, sizeof(evs));
block->next = NULL;
block->eventsarrry = evs;
reactor->blkcnt = 1;
reactor->evblk = block;
return 0;
}
int qsreactor_alloc(struct qsreactor *reactor)
{
if(reactor == NULL)return -1;
if(reactor->evblk == NULL)return -1;
struct qseventblock *tailblock = reactor->evblk;
while(tailblock->next != NULL)
tailblock = tailblock->next;
struct qseventblock *newblock = (struct qseventblock*)malloc(sizeof(struct qseventblock));
if(newblock == NULL)
{
printf("newblock alloc error\n");
return -1;
}
memset(newblock, 0, sizeof(newblock));
struct qsevent *neweventarray = (struct qsevent*)malloc(sizeof(struct qsevent) * MAX_EPOLLSIZE);
if(neweventarray == NULL)
{
printf("neweventarray malloc error\n");
return -1;
}
memset(neweventarray, 0, sizeof(neweventarray));
newblock->eventsarrry = neweventarray;
newblock->next = NULL;
tailblock->next = newblock;
reactor->blkcnt++;
return 0;
}
struct qsevent *qsreactor_idx(struct qsreactor *reactor, int sockfd)
{
int index = sockfd / MAX_EPOLLSIZE;
while(index >= reactor->blkcnt)qsreactor_alloc(reactor);
int i=0;
struct qseventblock *idxblock = reactor->evblk;
while(i++next;
return &idxblock->eventsarrry[sockfd%MAX_EPOLLSIZE];
}
int qsreactor_destory(struct qsreactor *reactor)
{
close(reactor->epfd);
free(reactor->evblk);
reactor = NULL;
return 0;
}
int qsreactor_addlistener(struct qsreactor *reactor, int sockfd, NCALLBACK acceptor)
{
if(reactor == NULL)return -1;
if(reactor->evblk == NULL)return -1;
struct qsevent *event = qsreactor_idx(reactor, sockfd);
qs_event_set(event, sockfd, acceptor, reactor);
qs_event_add(reactor->epfd, EPOLLIN, event);
return 0;
}
int send_cb(int fd, int events, void *arg)
{
struct qsreactor *reactor = (struct qsreactor*)arg;
struct qsevent *ev = qsreactor_idx(reactor, fd);
int ret = send(fd, ev->buffer, ev->length, 0);
if(ret epfd, ev);
printf("clent[%d] ", fd);
perror("send error\n");
close(fd);
}
else if(ret > 0)
{
printf("send to client[%d]:\n%s\n", fd, ev->buffer);
qs_event_del(reactor->epfd, ev);
qs_event_set(ev, fd, recv_cb, reactor);
qs_event_add(reactor->epfd, EPOLLIN, ev);
}
return ret;
}
int recv_cb(int fd, int events, void *arg)
{
struct qsreactor *reactor = (struct qsreactor*)arg;
struct qsevent *ev = qsreactor_idx(reactor, fd);
int len = recv(fd, ev->buffer, MAX_BUFLEN, 0);
qs_event_del(reactor->epfd, ev);
if(len > 0)
{
ev->length = len;
ev->buffer[len] = '\0';
printf("client[%d]:\n%s\n", fd, ev->buffer);
websocket_request(ev);
qs_event_del(reactor->epfd, ev);
qs_event_set(ev, fd, send_cb, reactor);
qs_event_add(reactor->epfd, EPOLLOUT, ev);
}
else if(len == 0)
{
qs_event_del(reactor->epfd, ev);
close(fd);
printf("client[%d] close\n", fd);
}
else
{
qs_event_del(reactor->epfd, ev);
printf("client[%d]", fd);
perror("reacv error,\n");
close(fd);
}
return 0;
}
int accept_cb(int fd, int events, void *arg)
{
struct qsreactor *reactor = (struct qsreactor*)arg;
if(reactor == NULL)return -1;
struct sockaddr_in client_addr;
socklen_t len = sizeof(client_addr);
int clientfd;
if((clientfd = accept(fd, (struct sockaddr*)&client_addr, &len)) == -1)
{
if(errno != EAGAIN && errno != EINTR)
{}
perror("accept error\n");
return -1;
}
int flag = 0;
if((flag = fcntl(clientfd, F_SETFL, O_NONBLOCK)) status_machine = WS_HANDSHAKE;
qs_event_set(event, clientfd, recv_cb, reactor);
qs_event_add(reactor->epfd, EPOLLIN, event);
printf("new connect [%s:%d], pos[%d]\n",
inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), clientfd);
return 0;
}
int qsreactor_run(struct qsreactor *reactor)
{
if(reactor == NULL)
return -1;
if(reactor->evblk == NULL)
return -1;
if(reactor->epfd epfd, events, MAX_EPOLL_EVENTS, 1000);
if(nready callback(ev->fd, events[i].events, ev->arg);
}
if((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT))
{
ev->callback(ev->fd, events[i].events, ev->arg);
}
}
}
}
int main(int argc, char **argv)
{
unsigned short port = atoi(argv[1]);
int sockfd = sock(port);
struct qsreactor *reactor = (struct qsreactor*)malloc(sizeof(struct qsreactor));
qsreactor_init(reactor);
qsreactor_addlistener(reactor, sockfd, accept_cb);
qsreactor_run(reactor);
qsreactor_destory(reactor);
close(sockfd);
}
7.C1000K reactor模型,epoll实现,连接并回发一段数据,测试正常
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#define MAX_BUFLEN 4096
#define MAX_EPOLLSIZE 1024
#define MAX_EPOLL_EVENTS 1024
typedef int (*NCALLBACK)(int, int, void*);
struct qsevent{
int fd;
int events;
int status;
void *arg;
long last_active;
int (*callback)(int fd, int event, void *arg);
unsigned char buffer[MAX_BUFLEN];
int length;
};
struct qseventblock{
struct qsevent *eventsarrry;
struct qseventblock *next;
};
struct qsreactor{
int epfd;
int blkcnt;
struct qseventblock *evblk;
};
int recv_cb(int fd, int events, void *arg);
int send_cb(int fd, int events, void *arg);
struct qsevent *qsreactor_idx(struct qsreactor *reactor, int sockfd);
void qs_event_set(struct qsevent *ev, int fd, NCALLBACK callback, void *arg)
{
ev->events = 0;
ev->fd = fd;
ev->arg = arg;
ev->callback = callback;
ev->last_active = time(NULL);
return;
}
int qs_event_add(int epfd, int events, struct qsevent *ev)
{
struct epoll_event epv = {0, {0}};;
epv.events = ev->events = events;
epv.data.ptr = ev;
if(ev->status == 1)
{
if(epoll_ctl(epfd, EPOLL_CTL_MOD, ev->fd, &epv) status == 0)
{
if(epoll_ctl(epfd, EPOLL_CTL_ADD, ev->fd, &epv) status = 1;
}
return 0;
}
int qs_event_del(int epfd, struct qsevent *ev)
{
struct epoll_event epv = {0, {0}};
if(ev->status != 1)
return -1;
ev->status = 0;
epv.data.ptr = ev;
if((epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &epv)))
{
perror("EPOLL_CTL_DEL error\n");
return -1;
}
return 0;
}
int sock(short port)
{
int fd = socket(AF_INET, SOCK_STREAM, 0);
fcntl(fd, F_SETFL, O_NONBLOCK);
struct sockaddr_in ser_addr;
memset(&ser_addr, 0, sizeof(ser_addr));
ser_addr.sin_addr.s_addr = htonl(INADDR_ANY);
ser_addr.sin_family = AF_INET;
ser_addr.sin_port = htons(port);
bind(fd, (struct sockaddr*)&ser_addr, sizeof(ser_addr));
if(listen(fd, 20) epfd = epoll_create(1);
if(reactor->epfd epfd);
return -2;
}
memset(block, 0, sizeof(block));
struct qsevent *evs = (struct qsevent*)malloc(MAX_EPOLLSIZE * sizeof(struct qsevent));
if(evs == NULL)
{
printf("evsnit malloc error\n");
close(reactor->epfd);
return -3;
}
memset(evs, 0, sizeof(evs));
block->next = NULL;
block->eventsarrry = evs;
reactor->blkcnt = 1;
reactor->evblk = block;
return 0;
}
int qsreactor_alloc(struct qsreactor *reactor)
{
if(reactor == NULL)return -1;
if(reactor->evblk == NULL)return -1;
struct qseventblock *tailblock = reactor->evblk;
while(tailblock->next != NULL)
tailblock = tailblock->next;
struct qseventblock *newblock = (struct qseventblock*)malloc(sizeof(struct qseventblock));
if(newblock == NULL)
{
printf("newblock alloc error\n");
return -1;
}
memset(newblock, 0, sizeof(newblock));
struct qsevent *neweventarray = (struct qsevent*)malloc(sizeof(struct qsevent) * MAX_EPOLLSIZE);
if(neweventarray == NULL)
{
printf("neweventarray malloc error\n");
return -1;
}
memset(neweventarray, 0, sizeof(neweventarray));
newblock->eventsarrry = neweventarray;
newblock->next = NULL;
tailblock->next = newblock;
reactor->blkcnt++;
return 0;
}
struct qsevent *qsreactor_idx(struct qsreactor *reactor, int sockfd)
{
int index = sockfd / MAX_EPOLLSIZE;
while(index >= reactor->blkcnt)qsreactor_alloc(reactor);
int i=0;
struct qseventblock *idxblock = reactor->evblk;
while(i++next;
return &idxblock->eventsarrry[sockfd%MAX_EPOLLSIZE];
}
int qsreactor_destory(struct qsreactor *reactor)
{
close(reactor->epfd);
free(reactor->evblk);
reactor = NULL;
return 0;
}
int qsreactor_addlistener(struct qsreactor *reactor, int sockfd, NCALLBACK acceptor)
{
if(reactor == NULL)return -1;
if(reactor->evblk == NULL)return -1;
struct qsevent *event = qsreactor_idx(reactor, sockfd);
qs_event_set(event, sockfd, acceptor, reactor);
qs_event_add(reactor->epfd, EPOLLIN, event);
return 0;
}
int send_cb(int fd, int events, void *arg)
{
struct qsreactor *reactor = (struct qsreactor*)arg;
struct qsevent *ev = qsreactor_idx(reactor, fd);
int ret = send(fd, ev->buffer, ev->length, 0);
if(ret epfd, ev);
printf("clent[%d] ", fd);
perror("send error\n");
close(fd);
}
else if(ret > 0)
{
printf("send to client[%d]:%s", fd, ev->buffer);
qs_event_del(reactor->epfd, ev);
qs_event_set(ev, fd, recv_cb, reactor);
qs_event_add(reactor->epfd, EPOLLIN, ev);
}
return ret;
}
int recv_cb(int fd, int events, void *arg)
{
struct qsreactor *reactor = (struct qsreactor*)arg;
struct qsevent *ev = qsreactor_idx(reactor, fd);
int len = recv(fd, ev->buffer, MAX_BUFLEN, 0);
qs_event_del(reactor->epfd, ev);
if(len > 0)
{
ev->length = len;
ev->buffer[len] = '\0';
printf("client[%d]:%s", fd, ev->buffer);
qs_event_del(reactor->epfd, ev);
qs_event_set(ev, fd, send_cb, reactor);
qs_event_add(reactor->epfd, EPOLLOUT, ev);
}
else if(len == 0)
{
qs_event_del(reactor->epfd, ev);
close(fd);
printf("client[%d] close\n", fd);
}
else
{
qs_event_del(reactor->epfd, ev);
printf("client[%d]", fd);
perror("reacv error,\n");
close(fd);
}
return 0;
}
int accept_cb(int fd, int events, void *arg)
{
struct qsreactor *reactor = (struct qsreactor*)arg;
if(reactor == NULL)return -1;
struct sockaddr_in client_addr;
socklen_t len = sizeof(client_addr);
int clientfd;
if((clientfd = accept(fd, (struct sockaddr*)&client_addr, &len)) == -1)
{
if(errno != EAGAIN && errno != EINTR)
{}
perror("accept error\n");
return -1;
}
int flag = 0;
if((flag = fcntl(clientfd, F_SETFL, O_NONBLOCK)) epfd, EPOLLIN, event);
printf("new connect [%s:%d], pos[%d]\n",
inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), clientfd);
return 0;
}
int qsreactor_run(struct qsreactor *reactor)
{
if(reactor == NULL)
return -1;
if(reactor->evblk == NULL)
return -1;
if(reactor->epfd epfd, events, MAX_EPOLL_EVENTS, 1000);
if(nready callback(ev->fd, events[i].events, ev->arg);
}
if((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT))
{
ev->callback(ev->fd, events[i].events, ev->arg);
}
}
}
}
int main(int argc, char **argv)
{
unsigned short port = atoi(argv[1]);
int sockfd = sock(port);
struct qsreactor *reactor = (struct qsreactor*)malloc(sizeof(struct qsreactor));
qsreactor_init(reactor);
qsreactor_addlistener(reactor, sockfd, accept_cb);
qsreactor_run(reactor);
qsreactor_destory(reactor);
close(sockfd);
}
总结
以上就是今天要讲的内容,本文详细介绍了Linux网络基础第四篇之WebSocket协议的使用,网络协议提供了大量的方法供我们使用,非常的便捷,我们务必掌握。希望大家多多支持!另外如果上述有任何问题,请懂哥指教,不过没关系,主要是自己能坚持,更希望有一起学习的同学可以帮我指正,但是如果可以请温柔一点跟我讲,爱与和平是永远的主题,爱各位了。加油啊!