这里以两个客户端为例,一个客户端发广播,一个客户端收到广播后回消息
由于UDP默认是在主线程收发消息的,这会阻塞主程序,所以这里做了一点封装,将通信功能放到子线程
代码中线程和异常处理,用到了一些工具类,替换成自己的代码就行了
import com.easing.commons.java.code.Console;
import com.easing.commons.java.thread.Threads;
@SuppressWarnings("all")
public class ZZZ {
public static void main(String[] args) {
startClient2();
Threads.sleep(3000);
startClient1();
}
//先发消息
public static void startClient1() {
UdpSocketClient client = UdpSocketClient.init("客户端2", null, 18006);
client.eventHandler((packet, timestamp, ip, port) -> {
Console.info(client.name, "read", new String(packet).trim());
});
client.read();
Console.info(client.name, "write", "Hello World");
client.sendBroadcast("Hello World".getBytes(), 18005);
}
//收到消息后回消息
public static void startClient2() {
UdpSocketClient client = UdpSocketClient.init("客户端1", null, 18005);
client.eventHandler((packet, timestamp, ip, port) -> {
Console.info(client.name, "read", new String(packet).trim());
Console.info(client.name, "write", "Me Hello Too");
client.sendUnicast("Me Hello Too".getBytes(), ip, port);
});
client.read();
}
}
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.easing.commons.java.code.Console;
import com.easing.commons.java.helper.exception.BizException;
import com.easing.commons.java.thread.WorkThread;
import lombok.SneakyThrows;
@SuppressWarnings("all")
public class UdpSocketClient {
public String name;
DatagramSocket socket;
EventHandler eventHandler;
ExecutorService sender = Executors.newSingleThreadExecutor();
boolean close = false;
@SneakyThrows
public static UdpSocketClient init(String name, String ip, Integer port) {
UdpSocketClient client = new UdpSocketClient();
if (ip != null && port != null)
client.socket = new DatagramSocket(port, InetAddress.getByName(ip));
else if (port != null)
client.socket = new DatagramSocket(port);
else
client.socket = new DatagramSocket();
client.name = name;
return client;
}
//开始读数据
public UdpSocketClient read() {
WorkThread.postByLoop("#UDP读数据线程", () -> {
if (close)
throw BizException.THREAD_NORMAL_EXIT;
byte[] buffer = new byte[1024 * 1024];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
socket.receive(packet);
byte[] receiveBytes = Arrays.copyOf(buffer, packet.getLength());
long timestamp = System.currentTimeMillis();
String ip = packet.getAddress().getHostAddress();
int port = packet.getPort();
if (eventHandler != null)
eventHandler.onReceive(receiveBytes, timestamp, ip, port);
});
return this;
}
//设置数据接收监听器
public UdpSocketClient eventHandler(EventHandler eventHandler) {
this.eventHandler = eventHandler;
return this;
}
//关闭
public void close() {
this.close = true;
this.socket = null;
this.eventHandler = null;
this.sender.shutdown();
this.sender = null;
}
//发送单播消息
@SneakyThrows
public UdpSocketClient sendUnicast(byte[] data, String ip, int port) {
sender.submit(() -> {
try {
InetAddress address = InetAddress.getByName(ip);
DatagramPacket packet = new DatagramPacket(data, data.length, address, port);
socket.send(packet);
} catch (Exception e) {
Console.error(e);
}
});
return this;
}
//发送组播消息
//组播地址范围为224.0.0.0~239.255.255.255,发送端和接收端地址一样即可接收
@SneakyThrows
public UdpSocketClient sendMulticast(byte[] data, String ip, int port) {
sender.submit(() -> {
try {
InetAddress address = InetAddress.getByName(ip);
DatagramPacket packet = new DatagramPacket(data, data.length, address, port);
socket.send(packet);
} catch (Exception e) {
Console.error(e);
}
});
return this;
}
//发送广播消息
@SneakyThrows
public UdpSocketClient sendBroadcast(byte[] data, int port) {
sender.submit(() -> {
try {
InetAddress address = InetAddress.getByName("255.255.255.255");
DatagramPacket packet = new DatagramPacket(data, data.length, address, port);
socket.send(packet);
} catch (Exception e) {
Console.error(e);
}
});
return this;
}
//Socket回调
public interface EventHandler {
void onReceive(byte[] packet, long timestamp, String ip, int port);
}
}