概述
??套接字(socket)是一個(gè)抽象層屈雄,應(yīng)用程序可以通過它發(fā)送或接收數(shù)據(jù)源梭,可對其進(jìn)行像對文件一樣的打開、讀寫和關(guān)閉等操作裹驰。套接字允許應(yīng)用程序?qū)/O插入到網(wǎng)絡(luò)中,并與網(wǎng)絡(luò)中的其他應(yīng)用程序進(jìn)行通信片挂。網(wǎng)絡(luò)套接字是IP地址與端口的組合幻林。
一、Web服務(wù)器
??Socket最初是加利福尼亞大學(xué)Berkeley分校為Unix系統(tǒng)開發(fā)的網(wǎng)絡(luò)通信接口音念。后來隨著TCP/IP網(wǎng)絡(luò)的發(fā)展沪饺,Socket成為最為通用的應(yīng)用程序接口,也是在Internet上進(jìn)行應(yīng)用開發(fā)最為通用的API闷愤。
下面開始創(chuàng)建一個(gè)簡陋的Web服務(wù)器
創(chuàng)建Main類監(jiān)聽端口等待請求:
/**
* 簡陋的Web服務(wù)器
*/
public class Main {
// 靜態(tài)文件位置
private final String classPath = this.getClass().getClassLoader().getResource("./").getPath();
// 靜態(tài)文件類型
public static Set<String> fileType;
static {
fileType = new HashSet<>();
fileType.add("png");
fileType.add("ico");
}
public static void main(String[] args) {
new Main(80);
}
public Main(int port) {
try {
// 開啟服務(wù)監(jiān)聽端口
ServerSocket server = new ServerSocket(port);
while (true) {
// 阻塞整葡,等待請求
Socket client = server.accept();
// TODO 使用線程池
// 創(chuàng)建線程處理請求
new Thread(new Handle(client, classPath)).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
創(chuàng)建請求處理類Handle:
/**
* web請求處理
*/
public class Handle implements Runnable {
private java.net.Socket socket;
private String classPath;
public Handle(java.net.Socket socket, String classPath) {
this.socket = socket;
this.classPath = classPath;
}
@Override
public void run() {
try (InputStream inputStream = socket.getInputStream();
OutputStream outputStream = socket.getOutputStream()) {
BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
PrintWriter out = new PrintWriter(outputStream, true);
// 請求類型
String method = null;
// 請求頭屬性
String[] headers;
// 請求地址
String path = null;
// 接收請求頭屬性
String line;
while ((line = br.readLine()) != null && !"".equals(line)) {
if (method == null) {
headers = line.split(" ");
method = headers[0];
path = headers[1];
}
}
// 處理路徑
if (path != null) {
// 請求地址處理
String[] pathVariable = path.split("/");
if (pathVariable.length == 0) {
out.println("HTTP/1.1 200 OK");
out.println();
out.println("<font style=\"color:red;font-size:50\">Hello world!</font>");
return;
}
// 請求靜態(tài)文件處理
String lastRoute = pathVariable[pathVariable.length - 1];
if (lastRoute.contains(".")) {
// 處理靜態(tài)文件
if (Main.fileType.contains(lastRoute.split("[.]")[1])) {
File file = new File(classPath + lastRoute);
if (file.exists()) {
try (FileInputStream fis = new FileInputStream(file)) {
this.socket.getOutputStream().write(fis.readAllBytes());
} catch (IOException e) {
e.printStackTrace();
}
return;
}
}
}
// 404
out.println("<font style=\"font-size:50\">404</font>");
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (this.socket != null) {
this.socket.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
以上一個(gè)非常簡陋的Web服務(wù)器就完成了,接下來對本地80端口進(jìn)行測試
二讥脐、WebSocket
??WebSocket是一種在單個(gè)TCP連接上進(jìn)行全雙工通信的協(xié)議遭居。WebSocket通信協(xié)議于2011年被IETF定為標(biāo)準(zhǔn)RFC 6455,并由RFC7936補(bǔ)充規(guī)范旬渠。WebSocket API也被W3C定為標(biāo)準(zhǔn)俱萍。
??WebSocket使得客戶端和服務(wù)器之間的數(shù)據(jù)交換變得更加簡單,允許服務(wù)端主動(dòng)向客戶端推送數(shù)據(jù)告丢。在WebSocket API中枪蘑,瀏覽器和服務(wù)器只需要完成一次握手,兩者之間就直接可以創(chuàng)建持久性的連接岖免,并進(jìn)行雙向數(shù)據(jù)傳輸腥寇。
下面開始創(chuàng)建WebSocket服務(wù)器:
創(chuàng)建后續(xù)需要用到的工具類CodingUtil:
public class CodingUtil {
/**
* 字節(jié)數(shù)組轉(zhuǎn)長整型
*
* @param bytes 字節(jié)數(shù)組
* @return Long
*/
public static long bytesToLong(byte[] bytes) {
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
buffer.put(bytes);
buffer.flip();
return buffer.getLong();
}
/**
* 對Sec-WebSocket-Key密鑰進(jìn)行加密
* 生成Sec-WebSocket-Accept值
*
* @param key 密鑰
* @return String
* @throws NoSuchAlgorithmException
*/
public static String encryption(String key) throws NoSuchAlgorithmException {
MessageDigest md = MessageDigest.getInstance("SHA1");
md.update((key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11").getBytes());
byte[] bytes = md.digest();
return new String(Base64.getEncoder().encode(bytes));
}
/**
* 取指定位置bit
*
* @param b 字節(jié)
* @param index 位置
* @return
*/
public static int getByteBit(byte b, int index) {
return (b & 0x80 >> index) >> (7 - index);
}
/**
* 取指定區(qū)間bit
*
* @param b 字節(jié)
* @param start 開始位置
* @param end 結(jié)束位置
* @return int
*/
public static int getByteBits(byte b, int start, int end) {
return (b << start & 0xff) >> (7 - end + start);
}
}
CodingUtil類中encryption()方法是根據(jù)WebSocket協(xié)議https://tools.ietf.org/html/rfc6455#section-1.3,對接收到的Sec-WebSocket-Key值與258EAFA5-E914-47DA-95CA-C5AB0DC85B11連接進(jìn)行SHA1加密后再通過Base64編碼得到Sec-WebSocket-Accept值再返回給客戶端觅捆。
創(chuàng)建WebSocketService類作為服務(wù)器啟動(dòng)入口:
public class WebSocketService {
private SocketManager socketManager;
public WebSocketService() {
this.socketManager = new SocketManager();
}
/**
* 監(jiān)聽指定端口 默認(rèn)80
* 等待客戶端連接
* 連接后創(chuàng)建一個(gè)線程進(jìn)行處理
*
* @return WebSocketService
*/
public WebSocketService start(int port) {
new Thread(new ManageCore(socketManager)).start();
try {
ServerSocket server = new ServerSocket(port);
while (true) {
Socket client = server.accept();
new Thread(new SocketHandle(client, socketManager)).start();
}
} catch (IOException e) {
e.printStackTrace();
}
return this;
}
}
該類中創(chuàng)建了SocketManager類來管理客戶端連接,實(shí)例化ManageCore類作為控制中心交給一個(gè)線程進(jìn)行管理麻敌。開啟Socket服務(wù)監(jiān)聽指定端口栅炒,使用socket的accept()方法阻塞線程等待請求。當(dāng)請求到來開啟一個(gè)線程進(jìn)行處理(SocketHandle類)术羔。
創(chuàng)建SocketManager類:
public class ManageCore implements Runnable {
private SocketManager socketManager;
public ManageCore(SocketManager socketManager) {
this.socketManager = socketManager;
}
@Override
public void run() {
Scanner scanner = new Scanner(System.in);
Map<String, String> message = new HashMap<>();
message.put("name", "admin");
// 廣播消息
while (true) {
message.put("content", scanner.nextLine());
socketManager.broadcast(JSON.toJSONString(message));
}
}
}
創(chuàng)建SocketManager類:
public class SocketManager {
private LinkedList<OutputStream> outputStreams;
public SocketManager() {
outputStreams = new LinkedList<>();
}
/**
* 添加緩存
*
* @param outputStream
*/
public void add(OutputStream outputStream) {
outputStreams.add(outputStream);
}
/**
* 刪除緩存
*
* @param outputStream
*/
public void remove(OutputStream outputStream) {
outputStreams.remove(outputStream);
}
/**
* 廣播
*
* @param message 消息
*/
public void broadcast(String message) {
Iterator<OutputStream> iterator = outputStreams.listIterator();
while (iterator.hasNext()) {
try {
OutputStream outputStream = iterator.next();
push(message.getBytes(), outputStream);
} catch (IOException e) {
iterator.remove();
}
}
}
/**
* 通知
*
* @param message 消息
*/
public void notice(String message, OutputStream outputStream) {
try {
push(message.getBytes(), outputStream);
} catch (IOException e) {
remove(outputStream);
}
}
/**
* 推消息
*
* @param bytes
* @param outputStream
* @throws IOException
*/
public void push(byte[] bytes, OutputStream outputStream) throws IOException {
outputStream.write(new byte[]{(byte) 0x81, (byte) bytes.length});
outputStream.write(bytes);
}
/**
* 返回當(dāng)前緩存的連接數(shù)
*
* @return int
*/
public int getCurrentConnNum() {
return outputStreams.size();
}
}
接下來創(chuàng)建SocketHandle類來對請求進(jìn)行處理:
public class SocketHandle implements Runnable {
private Socket socket;
private SocketManager socketManager;
/**
* @param socket 與客戶端之間的連接
*/
public SocketHandle(Socket socket, SocketManager socketManager) {
this.socket = socket;
this.socketManager = socketManager;
}
@Override
public void run() {
try (InputStream inputStream = socket.getInputStream();
OutputStream outputStream = socket.getOutputStream()) {
// 處理報(bào)文,過濾請求
MessageFilter messageFilter = new MessageFilter();
messageFilter.doFilter(inputStream, outputStream);
String path = messageFilter.getPath();
// 緩存與客戶端的消息發(fā)送通道
socketManager.add(outputStream);
// 開啟接收消息
Thread receiveThread = new Thread(new WebSocketReceive(inputStream, socketManager));
receiveThread.start();
// 發(fā)送心跳包
while (receiveThread.isAlive()) {
Thread.sleep(15000);
socketManager.notice("h", outputStream);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (this.socket != null) {
this.socket.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
當(dāng)瀏覽器使用websocket請求時(shí)服務(wù)端接收到的報(bào)文如下:
GET / HTTP/1.1
Host: localhost
Connection: Upgrade
Pragma: no-cache
Cache-Control: no-cache
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36
Upgrade: websocket
Origin: file://
Sec-WebSocket-Version: 13
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9
Cookie: _ga=GA1.1.420931640.1569149707
Sec-WebSocket-Key: q+RL7D/fr9/WQHlF2OK/Nw==
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
我們可以創(chuàng)建一個(gè)過濾器來對報(bào)文進(jìn)行過濾(根據(jù)自己的需求進(jìn)行過濾)赢赊,判斷是WebSocket請求還是其他請求,若修改請求處理線程類SocketHandle级历,可根據(jù)過濾結(jié)果使用對應(yīng)的處理器释移,使WebSocket服務(wù)器與上面的Web服務(wù)器結(jié)合使用:
public class MessageFilter {
private String path;
public void doFilter(InputStream inputStream, OutputStream outputStream) throws Exception {
// 讀取請求頭屬性
BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
String line;
HashMap<String, String> headers = new HashMap<>();
while ((line = br.readLine()) != null && !"".equals(line)) {
String[] header = line.split(": ");
if (header.length == 2) {
headers.put(header[0], header[1]);
} else {
String[] parame = line.split(" ");
if (parame.length > 2) {
this.path = parame[1];
}
}
}
// 驗(yàn)證請求
if (whether(headers)) {
throw new Exception("缺少必要的header");
}
// 返回請求頭
PrintWriter out = new PrintWriter(outputStream);
out.println("HTTP/1.1 101 Switching Protocols");
out.println("Connection: Upgrade");
out.println("Sec-WebSocket-Accept: " + CodingUtil.encryption(headers.get("Sec-WebSocket-Key")));
out.println("Upgrade: websocket");
out.println();
out.flush();
}
/**
* 確定是否存在對應(yīng)屬性
*
* @param headers 請求頭屬性
* @return boolean
*/
public boolean whether(HashMap<String, String> headers) {
if (!"Upgrade".equals(headers.get("Connection"))) {
return false;
}
if (headers.get("Sec-WebSocket-Accept") == null) {
return false;
}
if (!"websocket".equals(headers.get("Upgrade"))) {
return false;
}
return true;
}
public String getPath() {
return path;
}
}
過濾請求,確定服務(wù)后緩存與客戶端的消息發(fā)送通道寥殖,開啟接收客戶端消息的線程并發(fā)送心跳玩讳。
創(chuàng)建WebSocketReceive類接收客戶端消息:
public class WebSocketReceive implements Runnable {
private InputStream inputStream;
private SocketManager socketManager;
public WebSocketReceive(InputStream inputStream, SocketManager socketManager) {
this.inputStream = inputStream;
this.socketManager = socketManager;
}
@Override
public void run() {
while (true) {
try {
receive();
} catch (Exception e) {
break;
}![![connect.png](https://upload-images.jianshu.io/upload_images/18713780-fe629e48150c46ce.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
](https://upload-images.jianshu.io/upload_images/18713780-9b5d9abfc607e017.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
}
}
public void receive() throws IOException {
byte[] frameReader = inputStream.readNBytes(1);
// FIN表示這是消息中的最后一個(gè)片段
// 當(dāng)FIN為0時(shí)表示不是最后一個(gè)片段涩蜘,為1時(shí)表示最后一個(gè)片段
int fin = getByteBit(frameReader[0], 0);
// RSV 是保留的擴(kuò)展定義位,沒有擴(kuò)展的情況下為0
int rsv1 = getByteBit(frameReader[0], 1);
int rsv2 = getByteBit(frameReader[0], 2);
int rsv3 = getByteBit(frameReader[0], 3);
// opcode操作碼
int opcode = getByteBits(frameReader[0], 4, 7);
switch (opcode) {
case 0:
// 連續(xù)幀
break;
case 1:
// 文本幀
break;
case 2:
// 二進(jìn)制幀
break;
case 8:
// 連接關(guān)閉
throw new IOException("socket close");
case 9:
// ping
break;
case 10:
// pone
break;
}
// mask掩碼
frameReader = inputStream.readNBytes(1);
int mask = getByteBit(frameReader[0], 0);
// payload len 有效載荷
int payloadLen = getByteBits(frameReader[0], 1, 7);
// extended payload length 有效載荷長度延長
long extendedPayloadLen = payloadLen;
if (payloadLen == 126) {
// 讀2個(gè)字節(jié)
extendedPayloadLen = CodingUtil.bytesToLong(inputStream.readNBytes(2));
} else if (payloadLen == 127) {
// 讀8個(gè)字節(jié)
extendedPayloadLen = CodingUtil.bytesToLong(inputStream.readNBytes(8));
}
// 獲得屏蔽鍵
byte[] maskingKey = null;
if (mask == 1) {
maskingKey = inputStream.readNBytes(4);
}
// 解碼
frameReader = inputStream.readNBytes(Long.valueOf(extendedPayloadLen).intValue());
if (maskingKey != null) {
byte[] encodeBytes = new byte[frameReader.length];
for (int i = 0; i < encodeBytes.length; i++) {
encodeBytes[i] = (byte) (frameReader[i] ^ maskingKey[i % 4]);
}
String message = new String(encodeBytes);
socketManager.broadcast(message);
System.out.println(message);
// 自定義的消息格式 JSON解析使用了alibaba的fastjson
// JSONObject messager = JSON.parseObject(message);
// System.out.println(messager.get("name") + " : " + messager.get("content"));
}
}
/**
* 取指定位置bit
*
* @param b 字節(jié)
* @param index 位置
* @return
*/
private int getByteBit(byte b, int index) {
return (b & 0x80 >> index) >> (7 - index);
}
/**
* 取指定區(qū)間bit
*
* @param b 字節(jié)
* @param start 開始位置
* @param end 結(jié)束位置
* @return int
*/
private int getByteBits(byte b, int start, int end) {
return (b << start & 0xff) >> (7 - end + start);
}
}
在WebSocket協(xié)議中熏纯,使用幀傳輸數(shù)據(jù)同诫,以上類只完成了對幀數(shù)據(jù)的獲取和解碼,為了簡單性對于其他機(jī)制的處理并沒有按照協(xié)議(可仔細(xì)閱讀協(xié)議自行補(bǔ)充)樟澜,但是足夠完成與客戶端的通信要求误窖。
最后可在WebSocketReceive類中使用SocketManager類進(jìn)行消息廣播或者自定義一對一的消息通信。
完成以上步驟后就可以創(chuàng)建一個(gè)WebSocket類指定端口開啟WebSocket服務(wù)了:
public class WebSocket {
public static void main(String[] args) {
new WebSocketService().start(80);
}
}
接下來編寫html頁面來請求WebSocket服務(wù)器:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>群聊</title>
<script src="https://cdn.bootcss.com/jquery/3.4.0/jquery.slim.min.js"></script>
</head>
<body>
<div>
<label>請輸入用戶名:</label>
<input type="text" id="name" placeholder="用戶名">
</div>
<div>
<button type="button" id="connect">連接</button>
<button type="button" id="disconnect">斷開連接</button>
</div>
<div id="chat">
</div>
<div>
<input type="text" id="content" placeholder="內(nèi)容" style="display: inline-block;">
<button type="button" id="send" style="display: inline-block;">發(fā)送</button>
</div>
<ul id="info" style="list-style: none">
</ul>
<script>
function connect() {
var ws = new WebSocket("ws://localhost");
ws.onopen = function(){
$('#chat').text('連接成功');
};
ws.onmessage = function (evt){
try{
update(JSON.parse(evt.data));
}catch(err){
}
};
ws.onclose = function(){
$('#chat').text('');
};
$('#send').click(function () {
ws.send(JSON.stringify({'name': $('#name').val(), 'content': $('#content').val()}));
});
$('#disconnect').click(function () {
ws.close();
});
}
function update(message) {
$('#info').append('<li><b>' + message.name + ' : </b>' + message.content + '</li>')
}
$(function () {
$('#connect').click(function () {
connect();
});
})
</script>
</body>
</html>
測試:
連接WebSocket服務(wù)器:
服務(wù)端接收秩贰、發(fā)送消息:
客戶端發(fā)送消息: