簡(jiǎn)述
如果golang程序想監(jiān)聽(tīng)文件系統(tǒng)的變化, 那么最普遍的做法是使用fsnotify庫(kù). 起初是由Chris Howey(github account: howeyc)開發(fā)的庫(kù), 后來(lái)受到廣大開發(fā)者的喜愛(ài), 遂單獨(dú)建立倉(cāng)庫(kù). 至今為止, 其倉(cāng)庫(kù)已收到了5.9k star, 這足以證明其受歡迎程度. 想了解更多關(guān)于fsnotify的歷史, 可以查看官網(wǎng).
源碼分析
以下源碼分析基于的git commit版本為: 7f4cf4dd2b522a984eaca51d1ccee54101d3414a
1. 代碼統(tǒng)計(jì)
使用cloc工具進(jìn)行源碼統(tǒng)計(jì), cloc --by-file-by-lang --exclude-dir=.github --exclude-lang=YAML,Markdown [project-dir]
, 結(jié)果如下(省略yaml等標(biāo)記型語(yǔ)言相關(guān)統(tǒng)計(jì)):
File | blank | comment | code |
---|---|---|---|
./integration_test.go | 188 | 126 | 923 |
./inotify_test.go | 69 | 28 | 358 |
./inotify_poller_test.go | 29 | 10 | 190 |
./integration_darwin_test.go | 31 | 31 | 105 |
./fsnotify_test.go | 11 | 8 | 51 |
./windows.go | 42 | 31 | 488 |
./kqueue.go | 73 | 77 | 371 |
./inotify.go | 45 | 66 | 226 |
./inotify_poller.go | 16 | 33 | 138 |
./fsnotify.go | 10 | 12 | 46 |
./fen.go | 8 | 9 | 20 |
./open_mode_bsd.go | 4 | 4 | 3 |
./open_mode_darwin.go | 4 | 5 | 3 |
SUM: | 530 | 440 | 2922 |
fsnotify的go代碼總行數(shù)為2922行, 其中測(cè)試類代碼占1627(=923+358+190+105+51)行, 實(shí)際有效代碼只有1295行. 如此少的代碼還支持了windows/linux/mac平臺(tái), 由此可見(jiàn), 算是一個(gè)比較精簡(jiǎn)的庫(kù)了.
2. 使用示例
為了先對(duì)代碼有一個(gè)感性的認(rèn)識(shí), 我們以官方的示例作為開頭, 代碼如下:
package main
import (
"log"
"github.com/fsnotify/fsnotify"
)
func main() {
watcher, err := fsnotify.NewWatcher() // 初始化一個(gè)空的watcher
if err != nil {
log.Fatal(err)
}
defer watcher.Close() // 最后結(jié)束程序時(shí)關(guān)閉watcher
done := make(chan bool)
go func() { // 啟動(dòng)一個(gè)協(xié)程來(lái)單獨(dú)處理watcher發(fā)來(lái)的事件
for {
select {
case event, ok := <-watcher.Events: // 正常的事件的處理邏輯
if !ok {
return
}
log.Println("event:", event)
if event.Op&fsnotify.Write == fsnotify.Write {
log.Println("modified file:", event.Name)
}
case err, ok := <-watcher.Errors: // 發(fā)生錯(cuò)誤時(shí)的處理邏輯
if !ok {
return
}
log.Println("error:", err)
}
}
}()
err = watcher.Add("/tmp/foo") // 使watcher監(jiān)控/tmp/foo
if err != nil {
log.Fatal(err)
}
<-done // 使主協(xié)程不退出
}
用法非常的簡(jiǎn)單:
- 初始化一個(gè)空的fsnotify watcher
- 寫一個(gè)協(xié)程用來(lái)處理watcher推送的事件
- 告訴watcher需要監(jiān)聽(tīng)的文件或目錄
3. 構(gòu)建約束
fsnotify是一個(gè)跨平臺(tái)的庫(kù), 源碼中既包含了linux平臺(tái)的實(shí)現(xiàn)邏輯, 也包含了mac平臺(tái)和windows平臺(tái)的實(shí)現(xiàn)邏輯, 此時(shí)問(wèn)題就來(lái)了:
開發(fā)者在引用了此庫(kù)后, 如何才能保證編譯出來(lái)的可執(zhí)行文件, 只包含對(duì)應(yīng)的目標(biāo)平臺(tái)的實(shí)現(xiàn), 而不包含無(wú)關(guān)平臺(tái)的實(shí)現(xiàn)呢? 比如開發(fā)者的編譯目標(biāo)平臺(tái)是linux, 編譯時(shí)如何去除mac和windows等無(wú)關(guān)平臺(tái)的實(shí)現(xiàn)代碼呢?
好在golang為我們提供了構(gòu)建約束(Build Constraints), 大概使用方法如下:
// +build linux,386 darwin,!cgo
上面這條注釋不是普通的注釋, 而是構(gòu)建約束, 把它寫在代碼文件的頂部(package聲明的上面), 會(huì)被編譯器在編譯時(shí)按照目標(biāo)平臺(tái)來(lái)判斷是否編譯進(jìn)可執(zhí)行文件中. 上面這行構(gòu)建約束的意思是(linux AND 386) OR (darwin AND (NOT cgo)).
好了, 了解了構(gòu)建約束的用法, 我們看fsnotify的源碼時(shí)就可以根據(jù)自己所關(guān)心的平臺(tái)來(lái)詳細(xì)閱讀其實(shí)現(xiàn).
4. 詳細(xì)解讀--linux部分
用的最多的當(dāng)屬linux實(shí)現(xiàn)部分了, 其底層是基于linux的inotify機(jī)制, 相關(guān)邏輯就在庫(kù)中的inotify.go文件中.
a. 總體思路
按照前面使用示例的步驟, 第一步是watcher, err := fsnotify.NewWatcher()
, 那么我們就來(lái)看看這里new的watcher都包含什么, 代碼如下:
// NewWatcher establishes a new watcher with the underlying OS and begins waiting for events.
func NewWatcher() (*Watcher, error) {
// Create inotify fd
fd, errno := unix.InotifyInit1(unix.IN_CLOEXEC)
if fd == -1 {
return nil, errno
}
// Create epoll
poller, err := newFdPoller(fd)
if err != nil {
unix.Close(fd)
return nil, err
}
w := &Watcher{
fd: fd,
poller: poller,
watches: make(map[string]*watch),
paths: make(map[int]string),
Events: make(chan Event),
Errors: make(chan error),
done: make(chan struct{}),
doneResp: make(chan struct{}),
}
go w.readEvents()
return w, nil
}
上面代碼的總體思路:
-
建立一個(gè)inotify實(shí)例
inotify實(shí)例會(huì)以一個(gè)文件描述符(fd)的形式返回給調(diào)用者, 一旦有我們watch的文件發(fā)生變化, 就能從這個(gè)fd里讀到相應(yīng)的事件. 但是問(wèn)題是這個(gè)文件描述符需要我們自己去讀取, 所以我們就需要有某種輪訓(xùn)機(jī)制, 就引出下面的epoll注冊(cè)的用處.
-
使用epoll監(jiān)聽(tīng)實(shí)例上的事件
把這個(gè)fd注冊(cè)到epoll上, 在fd上有數(shù)據(jù)到達(dá)時(shí), epoll就能立刻收到并返回給我們.
初始化各種的狀態(tài)上下文, 如: watches用來(lái)存放watch對(duì)象, event用來(lái)推送事件
啟動(dòng)監(jiān)聽(tīng)協(xié)程
b. 事件監(jiān)聽(tīng)協(xié)程
上面的代碼最后啟動(dòng)了一個(gè)監(jiān)聽(tīng)協(xié)程go w.readEvents()
, 我們就來(lái)看看這里發(fā)生了什么, 代碼如下:
為使篇幅簡(jiǎn)練, 省略冗余代碼
func (w *Watcher) readEvents() {
var (...) // 各種變量
defer close(...) // 關(guān)閉上下文的各種資源
for {
if w.isClosed() { return }
ok, errno = w.poller.wait() // 程序阻塞在這行, 直到epoll監(jiān)聽(tīng)到相關(guān)事件為止
if ... { continue } // 各種error處理邏輯
n, errno = unix.Read(w.fd, buf[:]) // 走到這里的話就是有事件發(fā)生, 所以這里讀出事件到buffer里, 放到下面處理
if ... { continue } // 各種error處理邏輯
if n < unix.SizeofInotifyEvent { // 當(dāng)讀到的事件小于16字節(jié)(一個(gè)事件結(jié)構(gòu)體的單位大小), 異常處理邏輯
...
continue
}
var offset uint32
// 此時(shí)我們也不知道讀了幾個(gè)事件到buffer里
// 所以我們就用offset記錄下當(dāng)前所讀到的位置偏移量, 直到讀完為止
// 這個(gè)for循環(huán)結(jié)束條件是: offset累加到了某個(gè)值, 以至于剩余字節(jié)數(shù)不夠讀取出一整個(gè)inotify event結(jié)構(gòu)體
for offset <= uint32(n-unix.SizeofInotifyEvent) {
// 強(qiáng)制把地址值轉(zhuǎn)換成inotify結(jié)構(gòu)體
raw := (*unix.InotifyEvent)(unsafe.Pointer(&buf[offset]))
mask := uint32(raw.Mask) // 所發(fā)生的事件以掩碼形式表示
nameLen := uint32(raw.Len) // 當(dāng)監(jiān)聽(tīng)的是個(gè)目錄時(shí), 目錄中發(fā)生事件的文件名會(huì)包含在結(jié)構(gòu)體中, 這里的len就是文件名的長(zhǎng)度
if mask&unix.IN_Q_OVERFLOW != 0 { ... } // mask格式錯(cuò)誤, 向Errors chan發(fā)送事件
w.mu.Lock() // 由于可能會(huì)對(duì)上下文進(jìn)行刪除操作, 所以鎖住
// Wd是我們所watch的, 并且此次發(fā)生事件了的文件描述符
// 我們可以從構(gòu)建好的上下文中取出這個(gè)文件描述符所對(duì)應(yīng)的文件名
name, ok := w.paths[int(raw.Wd)]
// 如果發(fā)生刪除事件, 也一并在上下文中刪掉這個(gè)文件名
if ok && mask&unix.IN_DELETE_SELF == unix.IN_DELETE_SELF {
delete(w.paths, int(raw.Wd))
delete(w.watches, name)
}
w.mu.Unlock() // 解鎖
if nameLen > 0 { // 當(dāng)我們watch是一個(gè)目錄的時(shí)候, 其下面的文件發(fā)生事件時(shí), 就會(huì)導(dǎo)致這個(gè)nameLen大于0
// 此時(shí)讀取文件名字(文件名就在inotify event結(jié)構(gòu)體的后面), 強(qiáng)制把地址值轉(zhuǎn)換成長(zhǎng)度4096的byte數(shù)組
bytes := (*[unix.PathMax]byte)(unsafe.Pointer(&buf[offset+unix.SizeofInotifyEvent]))[:nameLen:nameLen]
// 拼接路徑(文件名會(huì)以\000為結(jié)尾表示, 所以要去掉)
name += "/" + strings.TrimRight(string(bytes[0:nameLen]), "\000")
}
event := newEvent(name, mask) // 生成一個(gè)event
if !event.ignoreLinux(mask) { // 如果這個(gè)事件沒(méi)有被忽略, 那么發(fā)送到Events chan
select {
case w.Events <- event:
case <-w.done:
return
}
}
// 移動(dòng)offset偏移量到下個(gè)inotify event結(jié)構(gòu)體
offset += unix.SizeofInotifyEvent + nameLen
}
}
}
c. 添加watch路徑
我們通過(guò)err = watcher.Add("/tmp/foo")
來(lái)讓watcher去watch路徑/tmp/foo, Add方法就是在inotify里注冊(cè)路徑, 代碼如下:
// Add starts watching the named file or directory (non-recursively).
func (w *Watcher) Add(name string) error {
name = filepath.Clean(name) // 獲取標(biāo)準(zhǔn)路徑, 如/tmp//////too經(jīng)過(guò)Clean后就成了/tmp/too
if w.isClosed() {
return errors.New("inotify instance already closed")
}
const agnosticEvents = unix.IN_MOVED_TO | unix.IN_MOVED_FROM |
unix.IN_CREATE | unix.IN_ATTRIB | unix.IN_MODIFY |
unix.IN_MOVE_SELF | unix.IN_DELETE | unix.IN_DELETE_SELF
var flags uint32 = agnosticEvents
w.mu.Lock()
defer w.mu.Unlock()
watchEntry := w.watches[name] // 取出上下文里的watch路徑(如果存在的話)
if watchEntry != nil {
flags |= watchEntry.flags | unix.IN_MASK_ADD
}
wd, errno := unix.InotifyAddWatch(w.fd, name, flags) // 添加watch路徑
if wd == -1 {
return errno
}
if watchEntry == nil { // 如果上下文里不存在此路徑, 表明這是一個(gè)新的watch, 添加到上下文
w.watches[name] = &watch{wd: uint32(wd), flags: flags}
w.paths[wd] = name
} else { // 如果在上下文中存在, 則更新上下文
watchEntry.wd = uint32(wd)
watchEntry.flags = flags
}
return nil
}
d. 刪除watch路徑
// Remove stops watching the named file or directory (non-recursively).
func (w *Watcher) Remove(name string) error {
name = filepath.Clean(name) // 獲取標(biāo)準(zhǔn)路徑
// 涉及到多協(xié)程可能同時(shí)對(duì)同一個(gè)watch項(xiàng)寫, 所以鎖住
w.mu.Lock()
defer w.mu.Unlock() // 最后解鎖
watch, ok := w.watches[name]
if ... { ... } // 錯(cuò)誤處理
// 刪除上下文里的相應(yīng)watch項(xiàng)
delete(w.paths, int(watch.wd))
delete(w.watches, name)
// 刪除inotify中watch的fd
success, errno := unix.InotifyRmWatch(w.fd, watch.wd)
if ... { ... } // 錯(cuò)誤處理
return nil
}
e. poller部分(基于epoll)
我們上面看到在func NewWatcher() (*Watcher, error)
函數(shù)中調(diào)用了poller, err := newFdPoller(fd)
, 這是將inotify的fd注冊(cè)在epoll上, 以實(shí)現(xiàn)高效的fs監(jiān)聽(tīng), 代碼如下:
為使篇幅簡(jiǎn)練, 省略冗余代碼
func newFdPoller(fd int) (*fdPoller, error) {
var errno error
poller := emptyPoller(fd)
defer func() {
if errno != nil {
poller.close()
}
}()
poller.fd = fd
// 要使用epoll的話, 需要使用EpollCreate函數(shù)為其單獨(dú)創(chuàng)建一個(gè)fd
poller.epfd, errno = unix.EpollCreate1(unix.EPOLL_CLOEXEC)
if ... { return ... } // error處理
// 為實(shí)現(xiàn)優(yōu)雅退出, 需要?jiǎng)?chuàng)建一個(gè)管道, pipe[0]用來(lái)讀, pipe[1]用來(lái)寫
// 在介紹watcher的Close函數(shù)時(shí)會(huì)分析這部分的邏輯
errno = unix.Pipe2(poller.pipe[:], unix.O_NONBLOCK|unix.O_CLOEXEC)
if ... { return ... } // error處理
// 注冊(cè)inotify的fd到epoll
event := unix.EpollEvent{
Fd: int32(poller.fd),
Events: unix.EPOLLIN,
}
errno = unix.EpollCtl(poller.epfd, unix.EPOLL_CTL_ADD, poller.fd, &event)
if ... { return ... } // error處理
// 注冊(cè)管道的fd到epoll
event = unix.EpollEvent{
Fd: int32(poller.pipe[0]),
Events: unix.EPOLLIN,
}
errno = unix.EpollCtl(poller.epfd, unix.EPOLL_CTL_ADD, poller.pipe[0], &event)
if ... { return ... } // error處理
return poller, nil
}
函數(shù)func newFdPoller(fd int) (*fdPoller, error)
在epoll的fd上注冊(cè)了兩個(gè)文件, 一個(gè)是inotify的, 另一個(gè)是其用來(lái)實(shí)現(xiàn)優(yōu)雅退出的pipe[0].
我們?cè)谏厦娴?事件監(jiān)聽(tīng)協(xié)程 func (w Watcher) readEvents()小節(jié)中提到的ok, errno = w.poller.wait()
語(yǔ)句阻塞直到收到事件才會(huì)返回, 來(lái)看看具體poller(也就是上面的epoll)對(duì)事件的處理邏輯, 代碼如下:
為使篇幅簡(jiǎn)練, 省略冗余代碼
func (poller *fdPoller) wait() (bool, error) {
// 總共監(jiān)聽(tīng)兩個(gè)fd: 1.inotify 2.優(yōu)雅退出所需的pipe[0]
// 每個(gè)fd有三種可能的事件, 所以最多可以觸發(fā)6個(gè)事件
// 準(zhǔn)備一個(gè)大于6的slice
events := make([]unix.EpollEvent, 7)
for {
// 阻塞wait在epoll的fd上, 參數(shù)-1表示不會(huì)超時(shí)
// 一旦有事件產(chǎn)生, 就會(huì)發(fā)到events里
n, errno := unix.EpollWait(poller.epfd, events, -1)
if ... { ... } // 各種異常處理
// 以下就是收到正常事件的處理
ready := events[:n]
epollhup := false
epollerr := false
epollin := false
for _, event := range ready {
if event.Fd == int32(poller.fd) {
if event.Events&unix.EPOLLHUP != 0 {
// This should not happen, but if it does, treat it as a wakeup.
epollhup = true
}
if event.Events&unix.EPOLLERR != 0 {
// If an error is waiting on the file descriptor, we should pretend
// something is ready to read, and let unix.Read pick up the error.
epollerr = true
}
if event.Events&unix.EPOLLIN != 0 {
// inotify有事件
epollin = true
}
}
if event.Fd == int32(poller.pipe[0]) {
if event.Events&unix.EPOLLHUP != 0 {
// Write pipe descriptor was closed, by us. This means we're closing down the
// watcher, and we should wake up.
}
if event.Events&unix.EPOLLERR != 0 {
// If an error is waiting on the pipe file descriptor.
// This is an absolute mystery, and should never ever happen.
return false, errors.New("Error on the pipe descriptor.")
}
if event.Events&unix.EPOLLIN != 0 {
// 收到程序發(fā)來(lái)的優(yōu)雅退出事件, 將調(diào)用clearWake以使管道排空
err := poller.clearWake()
if err != nil {
return false, err
}
}
}
}
if epollhup || epollerr || epollin {
return true, nil
}
return false, nil
}
}
clearWake函數(shù), 代碼如下
func (poller *fdPoller) clearWake() error {
// You have to be woken up a LOT in order to get to 100!
buf := make([]byte, 100)
n, errno := unix.Read(poller.pipe[0], buf) // 讀取pipe[0]中的退出信號(hào)
if ... { ... } // 錯(cuò)誤處理
return nil
}
那么pipe[0]中的信號(hào)是怎么來(lái)的呢? 也就是說(shuō)必須有一個(gè)地方往pipe[1]中寫數(shù)據(jù). 其實(shí), 我們示例代碼中采用defer方式調(diào)用了watcher.Close()
函數(shù), 而其最重要的一步就是調(diào)用w.poller.wake()
函數(shù), 代碼如下:
為使篇幅簡(jiǎn)練, 省略冗余代碼
// Close the write end of the poller.
func (poller *fdPoller) wake() error {
buf := make([]byte, 1)
// 這里在pipe[1]寫入了一個(gè)字符當(dāng)做退出信號(hào)
n, errno := unix.Write(poller.pipe[1], buf)
if ... { ... } // 錯(cuò)誤處理
return nil
}
題外話: 關(guān)于這個(gè)優(yōu)雅退出的早期設(shè)計(jì)其實(shí)不是這樣的, 但是思路差不多. 有興趣可以去看看fsnotify的早期提交
至此, 關(guān)于fsnotify對(duì)linux的實(shí)現(xiàn)就分析完了.