午安,打工人! 最近终于终于有点闲空了,就翻了翻之前写的学习笔记,就发现了上一篇文章。依稀记得当时还是在学校里费了不少功夫才完成的,就突然想拿出来重新回顾一下顺便改进改进,如若您发现有错误或者不合理的地方,恳请指点一二,在此感谢!
前言这一类文章暂时没有提供可视化界面,我会在基础功能差不多具备的情况下进行设计和开发。 本篇文章思路可以参考上一篇,主要是对代码进行修改,总体还是不难的。
目录 common : 设计主要是用来存储用户信息的以及后期一些公共类。 socketclient:客户端 websocket: 服务端
目前就存放一个用户,大伙就随便看看就行。
@Data
public class User implements Serializable {
private String username;
private String message;
private String ip;
public User(String username,String message,String ip){
this.username = username;
this.message = message;
this.ip = ip;
}
public User(){
}
}
websocket
1、CoreServer
CoreServer 服务端主要实现处理,在这里进行线程的启动以及初始化,基本实现思路大体一致: 1.开启阻塞连接后,等待连接。 2.每当一个客户端进行连接,就创建一个线程来单独对此客户端进行交互,同时在相互连接成功时将线程信息存放到一个集合中,用于后期消息发送以及退出连接等操作。 3.广播消息线程:该线程一开始是存放于上述线程步骤中的,然后发现每一个连接都会创建一个该线程,虽然功能目前不受影响但是总感觉不合理,所以就将该线程放在此处,因为博主觉得该广播消息处理器应该只能有一个也就是单例的,如有不合理还请指出!
public class CoreServer {
//广播消息发送线程,该处理器只能有一个,应该设计成单例
private static final SendMessage sendActive = new SendMessage();
private ServerSocket server;
//用于存放当前连接的客户端信息。
public static List connectThread = new ArrayList();
CoreServer() throws IOException {
server = new ServerSocket(9999);
}
//初始化服务器
@SneakyThrows
public void start() {
System.err.println("正在启动加载服务器.....");
//启动定时器,按时刷新在线人数
// timer();
//启动 发送广播消息线程
sendActive.start();
System.err.println("服务器启动成功!");
while (true) {
Socket socket = server.accept(); //等待连接
//每一个socket交给一个processorHandler处理,用来处理客户端发来的消息
ProcessorHandler processorHandler = new ProcessorHandler(socket);
processorHandler.start();
}
}
// @Scheduled(cron = "0 */1 * * * ?")
// private void timer() {
// new Timer().schedule(new TimerTask() {
// @Override
// public void run() {
// try {
// //do Something
// System.err.println("系统消息:当前在线人数 ["+connectThread.size()+"] 人!");
// } catch (Exception e) {
// e.printStackTrace();
// }
// }
// },0,1000*5);
//
// }
//在客户端连接和退出时进行提醒
public static void onLineClient() {
System.err.println("系统消息:当前在线人数 [" + connectThread.size() + "] 人!");
}
public static void main(String[] args) throws IOException {
CoreServer coreServer = new CoreServer();
coreServer.start();
}
}
2、SendMessage 广播消息线程
这块就比较简单,就是进行数据传输,大伙依旧看看就好
package com.chenxh.socket.handler;
import com.chenxh.socket.server.CoreServer;
import java.io.BufferedWriter;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.Scanner;
public class SendMessage extends Thread {
@Override
public void run() {
Scanner scan = new Scanner(System.in);
while (true) {
String line = scan.nextLine();
//将服务器消息分发给每个客户端
CoreServer.connectThread.forEach(item -> {
sendActive(item.getSocket(), line);
});
}
}
public static void sendActive(Socket socket, String str) {
/*
* 串联一个流连接,提升效率
*/
try {
if (!socket.isOutputShutdown()) {
OutputStream out = socket.getOutputStream();
OutputStreamWriter osw = new OutputStreamWriter(out, "UTF-8");
BufferedWriter bw = new BufferedWriter(osw);
//pw.print以及 bw.writer 都可以。两个选一个就行
// PrintWriter pw = new PrintWriter(bw, true);//此处注意参数true,若不添加此方法我们需要频繁调用flush方法来保证消息的及时性。
// pw.println(str);
bw.write(str);
bw.newLine();
bw.flush();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
3、与客户端交互线程
该线程主要用于接收和处理客户端发来的指令,目前程序控制的都是通过Scanner进行数据动态传输。 这块也不难,代码都有注释。 主要功能: 1.当客户端进行连接时会提示用户登录并显示在线人数 2.客户端进行键盘输入,在服务器控制台进行查看。 3.当客户端输入"end"时退出连接,并终止数据传输以及清除该连接数据。
package com.chenxh.socket.handler;
import com.chenxh.entity.User;
import com.chenxh.socket.server.CoreServer;
import lombok.Getter;
import org.apache.commons.lang3.ObjectUtils;
import java.io.*;
import java.net.Socket;
public class ProcessorHandler extends Thread {
private final ProcessorHandler processorHandler = this;
@Getter
private Socket socket;
private User user;
public ProcessorHandler(Socket socket) throws IOException, ClassNotFoundException {
this.socket = socket;
//在第一次连接时会调用该方法,并且第一连接时客户端传来的对象流我们再次进行读取以及存储
user = (User) new ObjectInputStream(socket.getInputStream()).readObject();
if (ObjectUtils.isEmpty(user)) {
throw new RuntimeException("deserializer exception please check params");
}
System.err.println("系统提示:" + "[" + user.getIp() + "]" + user.getUsername() + " 进入聊天室");
//将连接信息添加到集合中
CoreServer.connectThread.add(processorHandler);
CoreServer.onLineClient();
}
@Override
public void run() {
//接收处理客户发送来的消息线程
new ReceiveMessage().start();
}
//退出连接操作
public void exitServer() throws IOException {
//清楚存储的连接信息
CoreServer.connectThread.remove(processorHandler);
//停止对该socket的消息接收和发送。
socket.shutdownOutput();
socket.shutdownInput();
System.err.println("用户:" + user.getUsername() + " 退出聊天室!");
//显示在线人数
CoreServer.onLineClient();
}
class ReceiveMessage extends Thread {
@Override
public void run() {
try {
InputStream in = socket.getInputStream();
InputStreamReader isr = new InputStreamReader(in, "utf-8");
BufferedReader br = new BufferedReader(isr);
String str = null;
while ((str = br.readLine()) != null) {
if ("end".equals(str)) { //当客户端输入 end 认为客户端退出连接
exitServer();
} else {
System.out.println("[" + user.getIp() + "]" + user.getUsername() + "说:" + str);
}
}
} catch (Exception e) {
//用于处理非正常退出操作(end)
try {
exitServer();
} catch (IOException ex) {
System.err.println("系统异常:用户退出异常!");
}
}
}
}
}
socketclient
客户端
1、Client启动类 建立连接,并交由线程单独处理。
public class Client{
private Socket socket;
//缓存线程池
ExecutorService executorService = Executors.newCachedThreadPool();
public void connect() throws IOException {
socket = new Socket("127.0.0.1",9999);
User user = new User();
user.setUsername("Ccc");
executorService.execute(new UserThread(socket,user));
}
public static void main(String[] args) throws IOException {
Client client = new Client();
client.connect();
}
}
2、UserThread
用户线程 用于处理将消息发送到客户端以及收客户端发送来的广播消息。
package com.chenxh.socket.handler;
import com.chenxh.entity.User;
import lombok.SneakyThrows;
import java.io.*;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
public class UserThread implements Runnable {
private User user;
private Socket socket;
public UserThread(Socket socket, User user) throws IOException {
this.socket = socket;
this.user = user;
//第一次连接时将登陆进行传到服务端。
new ObjectOutputStream(socket.getOutputStream()).writeObject(new User("Ccc", "", socket.getInetAddress().getHostAddress()));
}
@SneakyThrows
@Override
public void run() {
//发送到服务器
new SendMessage().start();
//接收广播消息
new ReceiveMessage().start();
}
//发送线程
class SendMessage extends Thread{
@Override
public void run() {
try {
/*
* 串联一个流连接,提升效率
*/
OutputStream out = socket.getOutputStream();
OutputStreamWriter osw = new OutputStreamWriter(out, "UTF-8");
BufferedWriter bw = new BufferedWriter(osw);
PrintWriter pw = new PrintWriter(bw, true);//此处注意参数true,若不添加此方法我们需要频繁调用flush方法来保证消息的及时性。
Scanner scan = new Scanner(System.in);
while (true) {
String line = scan.nextLine();
bw.write(line);
bw.newLine();
bw.flush();
}
} catch (Exception e) {
System.err.println("消息提示:您已从服务器断开连接!");
}
}
}
//接收广播消息线程
class ReceiveMessage extends Thread{
@Override
public void run() {
try {
InputStream in = socket.getInputStream();
InputStreamReader isr = new InputStreamReader(in, "utf-8");
BufferedReader br = new BufferedReader(isr);
String str = null;
while ((str = br.readLine()) != null) {
System.out.println("[广播消息]:"+ str);
}
} catch (Exception e) {
System.err.println("消息提示:您已从服务器断开连接!");
}
}
}
}
代码效果实例: 服务器启动连接: 消息发送:
广播消息
正常退出:
点击终止异常退出
服务器异常:
没法上传视频 , – !大伙就将就看看。若发现不正确不合理的地方,还请能指点一二!