文章目录
1. 引言
- 1. 引言
- 2. WebSocket使用步骤
- 2.1 引入依赖
- 2.2 创建WebSocket配置类
- 2.3 WebSocket服务类
- 2.4 前端页面
最近遇到一个生活场景,需要把消息队列里的故障消息,推送给PC客户端,并在客户端主动语音播报。 这个功能涉及语音合成和通知推送,对于通知推送使用了WebSocket
,下面来记录下。
SpringBoot
集成WebSocket
org.springframework.boot
spring-boot-starter-websocket
2.2 创建WebSocket配置类
/**
* WebSocket配置
*/
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
2.3 WebSocket服务类
@Slf4j
@ServerEndpoint("/websocket/{userId}")
@Component
public class WebSocketServer {
// 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
private static int onlineCount = 0;
// concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
private static CopyOnWriteArraySet webSocketSet =
new CopyOnWriteArraySet();
//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
//接收userId
private String userId="";
public static String baseUrl = "http://118.89.68.236:9000";
/**
* 连接建立成功调用的方法*/
@OnOpen
public void onOpen(Session session,@PathParam("userId") String userId) {
this.session = session;
webSocketSet.add(this); //加入set中
addOnlineCount(); //在线数加1
log.info("有新窗口开始监听:"+userId+",当前在线人数为" + getOnlineCount());
this.userId = userId;
try {
sendMessage("连接成功");
} catch (IOException e) {
log.error("websocket IO异常");
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
webSocketSet.remove(this); //从set中删除
subOnlineCount(); //在线数减1
log.info("有一连接关闭!当前在线人数为" + getOnlineCount());
}
/**
* 收到客户端消息后调用的方法
* @param message 客户端发送过来的消息*/
@OnMessage
public void onMessage(String message, Session session) {
log.info("收到来自窗口"+userId+"的信息:"+message);
//群发消息
for (WebSocketServer item : webSocketSet) {
try {
item.sendMessage(message);
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("发生错误");
error.printStackTrace();
}
/**
* 实现服务器主动推送-字符串
*/
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
/**
* 实现服务器主动推送- byte数组
*/
public void sendAudio(byte[] data) throws IOException {
ByteBuffer buffer = ByteBuffer.wrap(data);
this.session.getBasicRemote().sendBinary(buffer);
}
/**
* 群发自定义消息-发送音频文件
* */
public static void sendToAudio(String message){
InputStream inputStream = null;
for (WebSocketServer item : webSocketSet) {
try {
inputStream = sendGet(message);
byte[] bytes = IOUtils.toByteArray(inputStream);
item.sendAudio(bytes);
} catch (Exception e) {
}
}
}
/**
* 群发自定义消息-发送文本文件
* */
public static void sendInfo(String message,@PathParam("userId") String userId){
log.info("推送消息到窗口"+userId+",推送内容:"+message);
for (WebSocketServer item : webSocketSet) {
try {
//这里可以设定只推送给这个userId的,为null则全部推送
if(userId==null) {
item.sendMessage(message);
}else if(item.userId.equals(userId)){
item.sendMessage(message);
}
} catch (IOException e) {
continue;
}
}
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
public static InputStream sendGet(String message) {
InputStream inputStream = null;
try {
String url = encode(baseUrl,message);
URL serverUrl = new URL(url);
URLConnection connection = serverUrl.openConnection();
//设置超时时间
connection.setConnectTimeout(5000);
connection.setReadTimeout(15000);
connection.setRequestProperty("accept", "*/*");
connection.setRequestProperty("connection", "Keep-Alive");
connection.connect();
inputStream = connection.getInputStream();
} catch (Exception e){
e.printStackTrace();
} finally {
}
return inputStream;
}
public static String encode(String baseUrl, String message) throws UnsupportedEncodingException {
String title = "format: yaml\nmode: mspk\naudio: 14\nspeaker: Aida\nvocoder: melgan";
String encodeContent = URLEncoder.encode(message, "UTF-8");
String encodeTitle = URLEncoder.encode(title, "UTF-8");
String result = baseUrl + "/synthesize?text=" + encodeContent + "&kwargs=" + encodeTitle;
log.info("request path : {}",result);
return result;
}
}
对于上面根据文字产生音频文件的两个方法:sendGet()
、encode()
。返回的是文件流,对于的服务使用的是ttskit
,可以去GitHub
搜一下。
通过使用IOUtils.toByteArray(inputStream)
方法将InputStream
转为byte[]
数组,使用的commons-io
包中的工具类。
commons-io
commons-io
2.6
最后就可以在业务代码中,调用WebSocket
服务方法了。此处使用定时任务模拟消息推送。
@Slf4j
@Configuration
@EnableScheduling
public class TTSTask {
public static int i = 1;
@Scheduled(cron = "0/30 * * * * ?")
private void configureTasks() {
log.info("执行静态定时任务时间: " + LocalDateTime.now());
String message = "收到一条故障信息,南京市江北新区高新路"+ i + "号杆塔故障";
log.info("播报信息“{}",message);
WebSocketServer.sendToAudio(message);
i++;
}
}
2.4 前端页面
DOCTYPE html>
语音自动播报
语音自动播报
$(function() {
var socket;
if(typeof(WebSocket) == "undefined") {
alert("您的浏览器不支持WebSocket");
return;
}
$("#btnConnection").click(function() {
//实现化WebSocket对象,指定要连接的服务器地址与端口
socket = new WebSocket("ws://127.00.1:8890/websocket/88888");
//打开事件
socket.onopen = function() {
console.log("Socket 已打开");
};
//获得消息事件
socket.onmessage = function(msg) {
console.log("获得消息:",msg);
console.log("获得消息:",msg.data);
q('#audio').src = URL.createObjectURL(msg.data)
q('#audio').hidden = false
};
//关闭事件
socket.onclose = function() {
console.log("Socket已关闭");
};
//发生了错误事件
socket.onerror = function() {
console.log("发生了错误");
}
});
$("#btnClose").click(function() {
socket.close();
});
});
function q(selector) {
return document.querySelector(selector)
}
这里只说一点,对于 socket.onmessage
方法的回调结果,对于byte[]
数组,使用blob
接收的,对于前端audio
标签可以直接使用