m3u8 文件是 HTTP Live Streaming(縮寫(xiě)為 HLS) 協(xié)議的部分內(nèi)容狂鞋,而 HLS 是一個(gè)由蘋(píng)果公司提出的基于 HTTP 的流媒體網(wǎng)絡(luò)傳輸協(xié)議片择。
關(guān)于m3u8 格式詳解,可以參考此文:m3u8 文件格式詳解
JS的實(shí)現(xiàn)版本可以參考這位博主的gitHub:m3u8 視頻在線提取工具
在這里骚揍,我先基于python代碼來(lái)說(shuō)解怎么去提取m3u8文件并合并成真正的視頻文件字管。
一個(gè)正常的m3u8文件格式如下:
#EXTM3U
#EXT-X-VERSION:6
#EXT-X-TARGETDURATION:3
#EXT-X-MEDIA-SEQUENCE:0
#EXT-X-PLAYLIST-TYPE:VOD
#EXT-X-MAP:URI="https://xxxx/init.mp4"
#EXTINF:3.00000000000000000000,
https://xxxx/0.m4s
#EXTINF:3.00000000000000000000,
https://xxxxx/1.m4s
#EXTINF:3.00000000000000000000,
https://xxxx/2.m4s
#EXTINF:1.00000000000000000000,
https://xxxx/3.m4s
#EXT-X-ENDLIST
最需要注意的關(guān)鍵節(jié)點(diǎn)是:EXT-X-KEY(表示視頻會(huì)經(jīng)過(guò)指定算法加密)啰挪、EXT-X-MAP(最初的視頻分片,不一定存在)嘲叔、EXTINF(按照順序的視頻分片以及這個(gè)分片的播放秒數(shù))
定義解析函數(shù):m3u8_convert
"""
_url:m3u8 的下載地址
save_path:待保存的視頻本地路徑
"""
def m3u8_convert(_url, save_path):
writer = open(save_path, 'wb')
if not writer:
print("%s 沒(méi)法成功打開(kāi)" % save_path)
return
#先下載文件
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Safari/537.36'}
rs = callAPI(_url)
list_content = rs.decode(encoding = 'utf-8').split('\n')
player_list = []
# key 以后處理加解密的操作
key = ''
#進(jìn)行文件格式解析
for index, line in enumerate(list_content):
# 判斷視頻是否經(jīng)過(guò)AES-128加密
tmp_key = checkExtKey(line)
if len(tmp_key) > 0:
key = tmp_key
else:
next_line = ""
if index < len(list_content)-1:
next_line = list_content[index + 1]
href = checkExtInf(line, next_line)
if len(href) > 0:
player_list.append(href)
#下載每一個(gè)視頻分片亡呵,并保存到本地文件里
for i, _url in enumerate(player_list):
print('正在下載文件:%s' % _url)
_bytes = callAPI(_url)
print('已下載文件大小:%d' % len(_bytes))
writer.write(_bytes)
writer.close()
print('視頻生成完成')
函數(shù)中硫戈,需要對(duì)該三個(gè)節(jié)點(diǎn)EXT-X-KEY锰什、EXT-X-MAP、EXTINF進(jìn)行處理丁逝,由于找不到比較好的已加密的視頻demo 汁胆,所以我的代碼里暫時(shí)忽略對(duì)EXT-X-KEY處理。
# 檢測(cè)EXT-X-KEY霜幼,提取key
def checkExtKey(line):
if "#EXT-X-KEY" in line:
method_pos = line.find("METHOD")
comma_pos = line.find(",")
method = line[method_pos:comma_pos].split('=')[1] # 獲取加密方式
print("Decode Method:", method)
uri_pos = line.find("URI")
quotation_mark_pos = line.rfind('"')
key_url = line[uri_pos:quotation_mark_pos].split('"')[1]
key = callAPI(key_url).decode(encoding='utf-8') # 獲取加密密鑰
print("key:", key)
return key
return ""
# 檢測(cè)EXTINF 和 EXT-X-MAP
def checkExtInf(line, next_line):
href = ""
if '#EXTINF' in line:
# 提取下一行的http 鏈接地址
if 'http' in next_line:
href = next_line
elif '#EXT-X-MAP' in line:
# 提取最初的視頻地址
uri_pos = line.find("URI=\"")
if uri_pos > -1:
href = line[uri_pos + 5:-1]
return href
完整的代碼如下:
# -*- coding: UTF-8 -*-
import requests
import os
# from Crypto.Cipher import AES
def callAPI(_url):
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/75.0.3770.100 Safari/537.36'}
return requests.get(_url, headers=headers).content
# 檢測(cè)EXT-X-KEY嫩码,提取key
def checkExtKey(line):
if "#EXT-X-KEY" in line:
method_pos = line.find("METHOD")
comma_pos = line.find(",")
method = line[method_pos:comma_pos].split('=')[1] # 獲取加密方式
print("Decode Method:", method)
uri_pos = line.find("URI")
quotation_mark_pos = line.rfind('"')
key_url = line[uri_pos:quotation_mark_pos].split('"')[1]
key = callAPI(key_url).decode(encoding='utf-8') # 獲取加密密鑰
print("key:", key)
return key
return ""
# 檢測(cè)EXTINF 和 EXT-X-MAP
def checkExtInf(line, next_line):
href = ""
if '#EXTINF' in line:
# 提取下一行的http 鏈接地址
if 'http' in next_line:
href = next_line
elif '#EXT-X-MAP' in line:
# 提取最初的視頻地址
uri_pos = line.find("URI=\"")
if uri_pos > -1:
href = line[uri_pos + 5:-1]
return href
"""
_url:m3u8 的下載地址
save_path:待保存的視頻本地路徑
"""
def m3u8_convert(_url, save_path):
writer = open(save_path, 'wb')
if not writer:
print("%s 沒(méi)法成功打開(kāi)" % save_path)
return
# 先下載文件
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Safari/537.36'}
rs = callAPI(_url).decode(encoding='utf-8')
list_content = rs.split('\n')
player_list = []
# key 以后處理加解密的操作
key = ''
# 進(jìn)行文件格式解析
for index, line in enumerate(list_content):
# 判斷視頻是否經(jīng)過(guò)AES-128加密
tmp_key = checkExtKey(line)
if len(tmp_key) > 0:
key = tmp_key
else:
next_line = ""
if index < len(list_content) - 1:
next_line = list_content[index + 1]
href = checkExtInf(line, next_line)
if len(href) > 0:
player_list.append(href)
# 下載每一個(gè)視頻分片,并保存到本地文件里
for i, _url in enumerate(player_list):
print('正在下載文件:%s' % _url)
_bytes = callAPI(_url)
print('已下載文件大凶锛取:%d' % len(_bytes))
writer.write(_bytes)
writer.close()
print('視頻生成完成')
大功告成后铸题,馬上找一個(gè)可以測(cè)試的m3u8吧。
save_data_file = '~/Desktop/player.mp4'
url = '【m3u8 URL地址】'
# 下載視頻
m3u8_convert(url, save_data_file)
下面是golang的代碼:
package main
import (
"fmt"
"io/ioutil"
"net/http"
"net/url"
"os"
"strings"
)
func callapi(durl string) []byte {
_, err := url.ParseRequestURI(durl)
if err != nil {
panic(durl + " 下載地址出錯(cuò)")
}
client := http.DefaultClient
//client.Timeout = 5000
resp, err := client.Get(durl)
if err != nil {
panic(err)
}
raw := resp.Body
fmt.Println("拿到Body :")
// fmt.Println(resp.Body)
defer raw.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
panic(err)
}
return body
}
// 檢測(cè)EXT-X-KEY琢感,提取key
func checkExtKey(line string) string {
if strings.Contains(line, "#EXT-X-KEY") {
method_pos := strings.Index(line, "METHOD")
comma_pos := strings.Index(line, ",")
method := strings.Split(line[method_pos:comma_pos], "=")[1]
fmt.Println("Decode Method:%s", method)
uri_pos := strings.Index(line, "URI")
quotation_mark_pos := strings.LastIndex(line, "\"")
key_url := strings.Split(line[uri_pos:quotation_mark_pos], "\"")[1]
key := string(callapi(key_url))
fmt.Println("Decode Key:%s", key)
return key
}
return ""
}
// 檢測(cè)EXTINF 和 EXT-X-MAP
func checkExtInf(line string, next_line string) string {
href := ""
if strings.Contains(line, "#EXTINF") {
if strings.Contains(next_line, "http") {
href = next_line
}
} else if strings.Contains(line, "#EXT-X-MAP") {
uri_pos := strings.Index(line, "URI=\"")
if uri_pos > -1 {
href = line[uri_pos+5 : len(line)-1]
}
}
return href
}
func m3u8_convert(_url string, save_file string) {
body := string(callapi(_url))
list_contents := strings.Split(body, "\n")
var player_list []string
//key := ""
for index, line := range list_contents {
tmp_key := checkExtKey(line)
if len(tmp_key) > 0 {
//key = tmp_key
} else {
next_line := ""
if index < len(list_contents)-1 {
next_line = list_contents[index+1]
}
href := checkExtInf(line, next_line)
if len(href) > 0 {
player_list = append(player_list, href)
}
}
}
saveDataWithPlayList(player_list, save_file)
fmt.Println("視頻生成完成")
}
func saveDataWithPlayList(play_list []string, save_file string) {
writer, err := os.Create(save_file)
if err != nil {
panic("生成視頻文件失敗")
}
defer writer.Close()
for _, item := range play_list {
fmt.Println("The item is :", item)
bytes := callapi(item)
writer.Write(bytes)
}
}
func main() {
play_list_url := "【m3u8 URL地址】"
save_file := "~/Desktop/player.mp4"
m3u8_convert(play_list_url, save_file)
}