Socket(套接字)網(wǎng)絡(luò)編程是在網(wǎng)絡(luò)中實(shí)現(xiàn)兩個(gè)進(jìn)程間通信的一種方式滴肿,其中由 ip 和端口組成 Socket 的作用就是唯一標(biāo)識(shí)網(wǎng)絡(luò)中端點(diǎn),以實(shí)現(xiàn)端與端之間的通信佃迄。其中最常用的傳輸層通信協(xié)議包括 TCP 和 UDP泼差,與具體語言無關(guān)。很多語言都提供了基于 TCP 和 UDP 的 Socket API呵俏,本文使用 Java 實(shí)現(xiàn)基于 Socket 的 C/S 通信模型堆缘。
TCP
- 通過建立連接實(shí)現(xiàn)端到端的通信,一對(duì)基于 TCP 通信的 Socket 由客戶端 ip普碎、客戶端端口吼肥、服務(wù)端 ip、服務(wù)端端口描述麻车;
- Java 的 io 包和 net 包分別提供了輸入輸出流和套接字相關(guān)的 API缀皱,基于這些 API 可以輕易構(gòu)建 TCP 服務(wù)端和客戶端。
Server.java
package tcp;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.ServerSocket;
import java.net.Socket;
/**
* TCP 服務(wù)端類
*
* @author ywh
* @since 3/4/2019
*/
public class Server {
/** 服務(wù)端端口 */
private final static int SERVER_PORT = 5_000;
/** 客戶端請(qǐng)求處理類 */
private static class RequestHandler extends Thread {
/** 每個(gè)客戶端請(qǐng)求處理線程對(duì)應(yīng)一個(gè) socket */
private Socket socket;
/** 請(qǐng)求處理標(biāo)記动猬,默認(rèn)為 true啤斗,false 表示結(jié)束處理 */
private boolean flag = true;
/** 結(jié)束標(biāo)記 */
private static final String END_TAG = "bye";
RequestHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
super.run();
// 打印客戶端的 ip 和端口
System.out.println(String.format("新的客戶端接入:%s:%d", socket.getInetAddress(), socket.getPort()));
try {
// 發(fā)送到客戶端:打印流
PrintStream output = new PrintStream(socket.getOutputStream());
// 從客戶端接收:輸入流緩沖
BufferedReader input = new BufferedReader(
new InputStreamReader(socket.getInputStream())
);
// 循環(huán)處理客戶端消息,直到接收到結(jié)束標(biāo)記
while (flag) {
// 阻塞赁咙,等待客戶端消息
String msg = input.readLine();
if (END_TAG.equalsIgnoreCase(msg)) {
flag = false;
}
System.out.println(String.format("接收到客戶端 [%s:%d] 消息:%s", socket.getInetAddress(), socket.getPort(), msg));
output.println("返回消息長(zhǎng)度:" + msg.length());
}
// 資源釋放
input.close();
output.close();
} catch (IOException ex) {
ex.printStackTrace();
} finally {
try {
socket.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
System.out.println(String.format("客戶端 [%s:%d] 已經(jīng)斷開連接", socket.getInetAddress(), socket.getPort()));
}
}
public static void main(String[] args) throws IOException {
ServerSocket server = new ServerSocket(SERVER_PORT);
System.out.println(String.format("服務(wù)端 [%s:%d] 啟動(dòng)...", server.getInetAddress(), server.getLocalPort()));
while (true) {
// 阻塞钮莲,等待客戶端接入
Socket client = server.accept();
// 創(chuàng)建線程處理接入的客戶端 Socket(不建議手動(dòng)創(chuàng)建線程執(zhí)行,而應(yīng)使用線程池)
RequestHandler requestHandler = new RequestHandler(client);
requestHandler.start();
}
}
}
Client.java
package tcp;
import java.io.*;
import java.net.Inet4Address;
import java.net.InetSocketAddress;
import java.net.Socket;
/**
* TCP 客戶端
*
* @author ywh
* @since 3/4/2019
*/
public class Client {
/** 服務(wù)端端口 */
private final static int SERVER_PORT = 5_000;
/** 連接超時(shí)時(shí)間 */
private final static int TIMEOUT = 3_000;
/** 結(jié)束標(biāo)記 */
private static final String END_TAG = "bye";
/**
* 連接服務(wù)端后發(fā)送數(shù)據(jù)
*
* @param client
* @throws IOException
*/
private static void process(Socket client) throws IOException {
// 從標(biāo)準(zhǔn)輸入流接收數(shù)據(jù)
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
// 發(fā)送到服務(wù)端:發(fā)送從 reader 接收的數(shù)據(jù)
PrintStream output = new PrintStream(client.getOutputStream());
// 從服務(wù)端接收
BufferedReader input = new BufferedReader(
new InputStreamReader(client.getInputStream())
);
boolean flag = true;
while (flag) {
// 從標(biāo)準(zhǔn)輸入流讀入一行彼水,發(fā)送到服務(wù)端
System.out.print("發(fā)送到服務(wù)端:");
String msg = reader.readLine();
output.println(msg);
// 從服務(wù)端接收(阻塞)
String response = input.readLine();
if (END_TAG.equalsIgnoreCase(msg)) {
flag = false;
}
System.out.println(response);
}
// 資源釋放
input.close();
output.close();
}
public static void main(String[] args) throws IOException {
// 創(chuàng)建客戶端 Socket崔拥,設(shè)置超時(shí)時(shí)間、連接的服務(wù)端 ip 和端口
Socket client = new Socket();
client.setSoTimeout(3000);
client.connect(
new InetSocketAddress(Inet4Address.getLocalHost(), SERVER_PORT), 3000
);
System.out.println(
String.format("客戶端 [%s:%d] 連接到服務(wù)端 [%s:%d] ",
client.getLocalAddress(), client.getLocalPort(),
client.getInetAddress(), client.getPort()
)
);
try {
process(client);
} catch (IOException ex) {
ex.printStackTrace();
}
client.close();
System.out.println(String.format("客戶端 [%s:%d] 已斷開連接", client.getLocalAddress(), client.getLocalPort()));
}
}
- 服務(wù)端進(jìn)程需要指定一個(gè)端口創(chuàng)建
ServerSocket
用于監(jiān)聽來自客戶端的接入請(qǐng)求(此時(shí)在循環(huán)中阻塞)猿涨,每有新的客戶端接入即創(chuàng)建一個(gè)處理請(qǐng)求的線程(每個(gè)線程對(duì)應(yīng)一個(gè)客戶端Socket
)握童; - 請(qǐng)求處理線程啟動(dòng)后,創(chuàng)建輸入和輸出流叛赚,在循環(huán)中阻塞等待客戶端消息和返回消息給客戶端澡绩,直到接收到結(jié)束標(biāo)記才退出循環(huán)、釋放資源俺附;
- 客戶端進(jìn)程創(chuàng)建
Socket
的同時(shí)需要指定接入服務(wù)端的 ip 和端口(InetSocketAddress
)肥卡、連接超時(shí)時(shí)間(而自身的端口則隨機(jī)分配); - 成功接入服務(wù)端后事镣,消息交互與服務(wù)端類似(阻塞等待接收消息并打印步鉴、發(fā)送結(jié)束標(biāo)記后斷開連接和釋放資源);
- 分別執(zhí)行 Server.java 和 Client.java,輸出結(jié)果:
Client
Server
UDP
- 區(qū)別于 TCP氛琢,UDP 沒有建立連接的過程喊递,而且任何一端都可以作為消息的發(fā)送者和接收者,Socket 由源端口和目的端口描述阳似;根據(jù)接收者的數(shù)量骚勘,通信方式還可以分為單播、廣播和組播撮奏;
- UDP 通信不區(qū)分服務(wù)端與客戶端俏讹,但在示例中為了方便標(biāo)識(shí)通信的兩端,還是定義為 Server(接收消息畜吊、返回特定格式的響應(yīng)報(bào)文)和 Client(搜索并發(fā)送消息給網(wǎng)絡(luò)中的 Server)泽疆。
Server.java
package udp;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.util.UUID;
/**
* UDP 消息服務(wù)端類
* 根據(jù)接收的消息提供服務(wù)(提供的服務(wù)為返回消息長(zhǎng)度)
*
* @author ywh
* @since 3/4/2019
*/
public class Server {
/** 服務(wù)端端端口 */
private final static int SERVER_PORT = 5_000;
private static final String EXIT_CMD = "exit";
private static class Worker extends Thread {
/** 消息提供者唯一標(biāo)識(shí) */
private final String id;
/** 每個(gè)消息提供者對(duì)應(yīng)一個(gè) Socket */
private DatagramSocket datagramSocket = null;
/** 請(qǐng)求處理標(biāo)記,默認(rèn)為 true玲献,false 表示結(jié)束處理 */
private boolean flag = true;
public Worker() {
super();
this.id = UUID.randomUUID().toString();
}
/**
* 處理接收到的消息殉疼,返回消息長(zhǎng)度到指定的 ip 和端口
*/
@Override
public void run() {
super.run();
System.out.println("消息接收端啟動(dòng)...");
try {
datagramSocket = new DatagramSocket(SERVER_PORT);
while(flag) {
// 構(gòu)建 DatagramPacket 緩沖,用于存放從 Searcher 接收的消息
final byte[] buffer = new byte[1024];
DatagramPacket receivePacket = new DatagramPacket(buffer, buffer.length);
datagramSocket.receive(receivePacket);
String[] data = new String(receivePacket.getData()).split("&");
// 從接收消息解析出響應(yīng)的目標(biāo)端口捌年,默認(rèn)格式為:port&id&msg
int responsePort = Integer.parseInt(data[0]);
String clientId = data[1];
String msg = data[2].trim();
System.out.println(
String.format(
"從 (%s)[%s:%d] 接收到消息:%s",
clientId,
receivePacket.getAddress().getHostAddress(),
receivePacket.getPort(),
msg
)
);
byte[] responseData = String.format(
"%d&%s&消息長(zhǎng)度:%d", datagramSocket.getPort(), this.id, msg.length()
).getBytes();
DatagramPacket responsePacket = new DatagramPacket(
responseData, responseData.length, receivePacket.getAddress(), responsePort
);
datagramSocket.send(responsePacket);
}
} catch (IOException ex) {
ex.printStackTrace();
} finally {
if (datagramSocket != null) {
datagramSocket.close();
datagramSocket = null;
}
}
}
public void exit() {
flag = false;
}
}
public static void main(String[] args) throws IOException {
Worker worker = new Worker();
worker.start();
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
while(!EXIT_CMD.equalsIgnoreCase(reader.readLine())) {}
worker.exit();
}
}
Client.java
package udp;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
/**
* UDP 消息客戶類株依,
* 模擬服務(wù)請(qǐng)求方,自動(dòng)搜索消息接收者
*
* @author ywh
* @since 3/4/2019
*/
public class Client {
/** 服務(wù)端端端口 */
private final static int SERVER_PORT = 5_000;
/** 監(jiān)聽端口 */
private static final int LISTEN_PORT = 30_000;
/**
* 服務(wù)端類
*/
private static class Server {
final int port;
final String ip;
final String id;
private Server(String ip, int port, String id) {
this.port = port;
this.ip = ip;
this.id = id;
}
@Override
public String toString() {
return "Server{" +
"port=" + port +
", ip='" + ip + '\'' +
", id='" + id + '\'' +
'}';
}
}
private static class Worker extends Thread {
private final int listenPort = LISTEN_PORT;
private final List<Server> serverList = new ArrayList<>();
private boolean flag = true;
private DatagramSocket datagramSocket = null;
public Worker() {
super();
}
@Override
public void run() {
super.run();
try {
// 監(jiān)聽消息回復(fù)端口
datagramSocket = new DatagramSocket(listenPort);
while (flag) {
final byte[] buffer = new byte[1024];
DatagramPacket receivePacket = new DatagramPacket(buffer, buffer.length);
datagramSocket.receive(receivePacket);
String[] data = new String(receivePacket.getData()).split("&");
// 從接收消息解析出響應(yīng)的目標(biāo)端口延窜,默認(rèn)格式為:port&id&msg
int responsePort = Integer.parseInt(data[0]);
String serverId = data[1];
String msg = data[2];
String serverIp = receivePacket.getAddress().getHostAddress();
int serverPort = receivePacket.getPort();
System.out.println(
String.format(
"從 (%s)[%s:%d] 接收到處理結(jié)果:%s", serverId, serverIp, serverPort, msg
)
);
// 添加到設(shè)備列表
serverList.add(new Server(serverIp, serverPort, serverId));
}
} catch (IOException ex) {
ex.printStackTrace();
} finally {
if (datagramSocket != null) {
datagramSocket.close();
datagramSocket = null;
}
}
}
List<Server> exit() {
flag = false;
return serverList;
}
}
/**
* 監(jiān)聽廣播消息
*
* @return
* @throws InterruptedException
*/
private static Worker listen() throws InterruptedException {
Worker worker = new Worker();
worker.start();
return worker;
}
/**
* 發(fā)送廣播消息
*
* @throws IOException
*/
private static void send() throws IOException {
System.out.println("客戶端發(fā)送廣播消息...");
DatagramSocket datagramSocket = new DatagramSocket();
byte[] requestData = String.format("%d&%s&%s", LISTEN_PORT, UUID.randomUUID().toString(), "求本消息的長(zhǎng)度").getBytes();
DatagramPacket requestPacket = new DatagramPacket(requestData, requestData.length);
requestPacket.setAddress(InetAddress.getByName("255.255.255.255"));
requestPacket.setPort(SERVER_PORT);
datagramSocket.send(requestPacket);
datagramSocket.close();
System.out.println("客戶端發(fā)送廣播消息結(jié)束...");
}
public static void main(String[] args) throws IOException, InterruptedException{
System.out.println("客戶端啟動(dòng)...");
Worker worker = listen();
// 發(fā)送消息請(qǐng)求服務(wù)
send();
// 輸入任意內(nèi)容即退出
System.in.read();
List<Server> serverList = worker.exit();
for(Server server: serverList) {
System.out.println("Server: " + server.toString());
}
}
}
- 雖然前面已經(jīng)提到 UDP 通信不區(qū)分客戶端和服務(wù)端,但在本場(chǎng)景中 Server 在網(wǎng)絡(luò)中可建立多個(gè)逆瑞,其作用都是接收來自來自 Client 的廣播消息荠藤、返回該消息的長(zhǎng)度获高;
- 區(qū)別于 TCP 直接通過 Socket 的輸入輸出流收發(fā)消息亿汞,UDP 的消息都是以
byte
數(shù)組的形式封裝在DatagramPacket
中嘁圈,其中DatagramPacket
攜帶著 ip 和端口等信息魏身; - Client 發(fā)送廣播報(bào)文到網(wǎng)絡(luò)中所有主機(jī)的指定端口止吐,并在接收到響應(yīng)的同時(shí)記錄可為其提供服務(wù)的主機(jī)以供最終輸出:
requestPacket.setAddress(InetAddress.getByName("255.255.255.255"));
requestPacket.setPort(SERVER_PORT);
- 客戶端在向服務(wù)端發(fā)送消息的同時(shí)監(jiān)聽一個(gè)回傳端口宝踪,并在消息攜帶上該端口(格式為
port&id&msg
),使得服務(wù)端可以把響應(yīng)消息發(fā)送到客戶端的指定端口碍扔; - 分別執(zhí)行 Server.java 和 Client.java瘩燥,輸出結(jié)果:
Server
Client