VictoriaMetrics 官方文檔對 vmbackup 和 vmrestore 的介紹比較少。近期我整理了相關文檔,現在將閱讀源碼的內容分享給大家。
vmbackup 簡介
vmbackup 從即時快照(instant snapshot)創建 VictoriaMetrics 的數據備份。
vmbackup 支持增量備份和完整備份:如果目標路徑(-dst 參數)包含先前備份的數據,則會自動創建增量備份。還可以通過 -origin 指向同一遠端存儲上已有的備份來加速備份,在這種情況下,vmbackup 會執行服務器端複製,以節省數據傳輸的成本和時間。
vmbackup 備份進程可以隨時中斷。當使用相同參數重新啟動 vmbackup 時,它會自動從中斷點恢復。
支持的存儲類型
-
GCS. Example:
gs://<bucket>/<path/to/backup>
-
S3. Example:
s3://<bucket>/<path/to/backup>
-
Azure Blob Storage. Example:
azblob://<container>/<path/to/backup>
- 任何兼容 S3 的存儲,如 MinIO、Ceph 或 Swift,詳見 VictoriaMetrics 官方文檔。
- 本地文件系統。Example: fs://</absolute/path/to/backup>
可以使用以下命令進行備份(定期執行該命令即可實現定期備份):
./vmbackup -storageDataPath=</path/to/victoria-metrics-data> -snapshot.createURL=http://localhost:8428/snapshot/create -dst=gs://<bucket>/<path/to/new/backup>
vmbackup源碼解析
入口文件:VictoriaMetrics/app/vmbackup/main.go
// ...
func main() {
	// ...
	// (Elided) initialization of flags, usage, environment variables, logging, etc.
	// ...

	// No-op unless this run creates its own snapshot below.
	deleteSnapshot := func() {}
	// When -snapshot.createURL is set, vmbackup creates a fresh snapshot through
	// that HTTP endpoint and deletes it again after the backup.
	if len(*snapshotCreateURL) > 0 {
		// Validate the snapshot-creation URL.
		createURL, err := url.Parse(*snapshotCreateURL)
		if err != nil {
			logger.Fatalf("cannot parse snapshotCreateURL: %s", err)
		}
		// -snapshotName and -snapshot.createURL are mutually exclusive: in this
		// mode the snapshot name comes from the snapshot created below.
		if len(*snapshotName) > 0 {
			logger.Fatalf("-snapshotName shouldn't be set if -snapshot.createURL is set, since snapshots are created automatically in this case")
		}
		logger.Infof("Snapshot create url %s", createURL.Redacted())
		// Default -snapshot.deleteURL: the create URL with its first "/create"
		// replaced by "/delete".
		if len(*snapshotDeleteURL) <= 0 {
			err := flag.Set("snapshot.deleteURL", strings.Replace(*snapshotCreateURL, "/create", "/delete", 1))
			if err != nil {
				logger.Fatalf("Failed to set snapshot.deleteURL flag: %v", err)
			}
		}
		// Parse the snapshot-deletion URL (explicitly given or derived above).
		deleteURL, err := url.Parse(*snapshotDeleteURL)
		if err != nil {
			logger.Fatalf("cannot parse snapshotDeleteURL: %s", err)
		}
		logger.Infof("Snapshot delete url %s", deleteURL.Redacted())
		// Create the snapshot; the endpoint returns its name.
		name, err := snapshot.Create(createURL.String())
		if err != nil {
			logger.Fatalf("cannot create snapshot: %s", err)
		}
		// Publish the name through the -snapshotName flag so that makeBackup
		// (which checks *snapshotName) backs up this snapshot.
		err = flag.Set("snapshotName", name)
		if err != nil {
			logger.Fatalf("cannot set snapshotName flag: %v", err)
		}
		// Replace the no-op with a real deletion; a deletion failure is fatal.
		deleteSnapshot = func() {
			err := snapshot.Delete(deleteURL.String(), name)
			if err != nil {
				logger.Fatalf("cannot delete snapshot: %s", err)
			}
		}
	}
	// Serve vmbackup's own HTTP listener in the background (per the original
	// notes, used to expose vmbackup metrics for scraping).
	listenAddrs := []string{*httpListenAddr}
	go httpserver.Serve(listenAddrs, nil, nil)
	pushmetrics.Init()
	// makeBackup contains the core backup logic.
	err := makeBackup()
	// Called unconditionally right after makeBackup, so a snapshot created above
	// is removed whether or not the backup succeeded; err is handled in the
	// elided code below. This calls the snapshot-deletion endpoint, not vmbackup.
	deleteSnapshot()
	// ... remaining shutdown/cleanup (elided)
}
// makeBackup performs a single backup run and returns the first error hit.
//
// The destination filesystem is built by newDstFS (per the original notes it
// is derived from -dst, optionally using -customS3Endpoint and -credsFilePath;
// not visible here). Two modes exist:
//   - *snapshotName is empty: server-side copy of an existing remote backup
//     (newRemoteOriginFS, i.e. -origin) into dst via actions.RemoteBackupCopy.
//   - *snapshotName is set: back up the local snapshot (newSrcFS) into dst via
//     actions.Backup, letting it server-side copy parts already present at
//     origin (newOriginFS; the original notes say it is empty when -origin is).
//
// Every filesystem that was successfully created is stopped before returning,
// on success and on every error path alike.
func makeBackup() error {
	dstFS, err := newDstFS()
	if err != nil {
		return err
	}
	// Deferred so dstFS is released even when a later step fails.
	defer dstFS.MustStop()

	if *snapshotName == "" {
		// Remote-to-remote copy: -origin -> -dst, done server-side.
		originFS, err := newRemoteOriginFS()
		if err != nil {
			return err
		}
		defer originFS.MustStop()
		a := &actions.RemoteBackupCopy{
			Concurrency: *concurrency,
			Src:         originFS,
			Dst:         dstFS,
		}
		return a.Run()
	}

	// Local snapshot -> -dst, optionally accelerated by -origin.
	srcFS, err := newSrcFS()
	if err != nil {
		return err
	}
	defer srcFS.MustStop()
	originFS, err := newOriginFS()
	if err != nil {
		return err
	}
	defer originFS.MustStop()
	a := &actions.Backup{
		Concurrency: *concurrency,
		Src:         srcFS,
		Dst:         dstFS,
		Origin:      originFS,
	}
	// The core backup logic lives in (*actions.Backup).Run.
	return a.Run()
}
// ...
下面對 Run() 方法進行解析。
VictoriaMetrics/lib/backup/actions/backup.go
// Run runs b with the provided settings.
//
// Sequence:
//  1. Normalize the origin: a nil origin, or one whose String() equals dst's,
//     is replaced by an empty fsnil.FS so nothing is copied from it.
//  2. Delete the backupnames.BackupCompleteFilename marker at dst, so an
//     interrupted run leaves dst without the marker (vmrestore refuses such a
//     backup unless told to skip that check).
//  3. Transfer the data with runBackup.
//  4. Store backup metadata with storeMetadata, then recreate the marker.
func (b *Backup) Run() error {
	dst := b.Dst

	origin := b.Origin
	if origin == nil || origin.String() == dst.String() {
		// Using dst as its own origin is pointless; fall back to "no origin".
		origin = &fsnil.FS{}
	}

	// Drop the completion marker first; it is recreated only at the very end.
	if err := dst.DeleteFile(backupnames.BackupCompleteFilename); err != nil {
		return fmt.Errorf("cannot delete `backup complete` file at %s: %w", dst, err)
	}
	if err := runBackup(b.Src, dst, origin, b.Concurrency); err != nil {
		return err
	}
	if err := storeMetadata(b.Src, dst); err != nil {
		return fmt.Errorf("cannot store backup metadata: %w", err)
	}
	// Marker present again == the backup at dst is complete.
	if err := dst.CreateFile(backupnames.BackupCompleteFilename, nil); err != nil {
		return fmt.Errorf("cannot create `backup complete` file at %s: %w", dst, err)
	}
	return nil
}
// ...
// runBackup copies the snapshot at src into dst, reusing parts that already
// exist at dst or at origin.
//
// Parts are matched with common.PartsDifference / common.PartsIntersect:
//   - parts at dst that are absent from src are deleted from dst;
//   - parts missing from dst but present at origin are server-side copied
//     from origin to dst (copySrcParts);
//   - the remaining missing parts are uploaded from src (runParallel with the
//     given concurrency; presumably it bounds parallelism — not visible here).
//
// Upload progress is logged through runParallel's progress callback and a
// summary is logged on success. The first error aborts the run and is returned.
func runBackup(src *fslocal.FS, dst common.RemoteFS, origin common.OriginFS, concurrency int) error {
	startTime := time.Now()
	logger.Infof("starting backup from %s to %s using origin %s", src, dst, origin)

	// List parts at all three locations.
	srcParts, err := src.ListParts()
	if err != nil {
		return fmt.Errorf("cannot list src parts: %w", err)
	}
	logger.Infof("obtained %d parts from src %s", len(srcParts), src)
	dstParts, err := dst.ListParts()
	if err != nil {
		return fmt.Errorf("cannot list dst parts: %w", err)
	}
	logger.Infof("obtained %d parts from dst %s", len(dstParts), dst)
	originParts, err := origin.ListParts()
	if err != nil {
		return fmt.Errorf("cannot list origin parts: %w", err)
	}
	logger.Infof("obtained %d parts from origin %s", len(originParts), origin)

	backupSize := getPartsSize(srcParts)

	// Parts stored at dst that the snapshot no longer has are removed from dst
	// (presumably parts merged away locally since the previous backup).
	partsToDelete := common.PartsDifference(dstParts, srcParts)
	deleteSize := getPartsSize(partsToDelete)
	if err := deleteDstParts(dst, partsToDelete, concurrency); err != nil {
		return fmt.Errorf("cannot delete unneeded parts at dst: %w", err)
	}

	// Parts the snapshot has but dst lacks must be transferred.
	partsToCopy := common.PartsDifference(srcParts, dstParts)

	// Those already present at origin are copied server-side instead of uploaded.
	originPartsToCopy := common.PartsIntersect(originParts, partsToCopy)
	copySize := getPartsSize(originPartsToCopy)
	if err := copySrcParts(origin, dst, originPartsToCopy, concurrency); err != nil {
		return fmt.Errorf("cannot server-side copy origin parts to dst: %w", err)
	}

	// Everything else is uploaded from the local snapshot.
	srcCopyParts := common.PartsDifference(partsToCopy, originParts)
	uploadSize := getPartsSize(srcCopyParts)
	if len(srcCopyParts) > 0 {
		logger.Infof("uploading %d parts from %s to %s", len(srcCopyParts), src, dst)
		var bytesUploaded atomic.Uint64
		err = runParallel(concurrency, srcCopyParts, func(p common.Part) error {
			logger.Infof("uploading %s from %s to %s", &p, src, dst)
			rc, err := src.NewReadCloser(p)
			if err != nil {
				return fmt.Errorf("cannot create reader for %s from %s: %w", &p, src, err)
			}
			sr := &statReader{
				r:         rc,
				bytesRead: &bytesUploaded,
			}
			if err := dst.UploadPart(p, sr); err != nil {
				// Release the reader before bailing out; the upload error is the
				// one worth reporting, so a close error here is ignored.
				_ = rc.Close()
				return fmt.Errorf("cannot upload %s to %s: %w", &p, dst, err)
			}
			if err := rc.Close(); err != nil {
				return fmt.Errorf("cannot close reader for %s from %s: %w", &p, src, err)
			}
			return nil
		}, func(elapsed time.Duration) {
			n := bytesUploaded.Load()
			// Guard against a zero total (e.g. only empty parts left) so the
			// progress line never prints NaN.
			prc := 100.0
			if uploadSize > 0 {
				prc = 100 * float64(n) / float64(uploadSize)
			}
			logger.Infof("uploaded %d out of %d bytes (%.2f%%) from %s to %s in %s", n, uploadSize, prc, src, dst, elapsed)
		})
		if err != nil {
			return err
		}
	}
	logger.Infof("backup from %s to %s with origin %s is complete; backed up %d bytes in %.3f seconds; server-side deleted %d bytes; "+
		"server-side copied %d bytes; uploaded %d bytes",
		src, dst, origin, backupSize, time.Since(startTime).Seconds(), deleteSize, copySize, uploadSize)
	return nil
}
下面是對part的操作方法解析
VictoriaMetrics/lib/backup/common/part.go
// PartsDifference returns the parts of a whose key (Part.key) does not occur
// in b, i.e. the set difference a \ b.
//
// The order of a is preserved, duplicates in a are kept, and nil is returned
// when no part remains.
func PartsDifference(a, b []Part) []Part {
	exclude := make(map[string]struct{}, len(b))
	for i := range b {
		exclude[b[i].key()] = struct{}{}
	}
	var rest []Part
	for _, p := range a {
		if _, found := exclude[p.key()]; !found {
			rest = append(rest, p)
		}
	}
	return rest
}
// PartsIntersect returns the parts of b whose key (Part.key) also occurs in a.
//
// Note the asymmetry: the returned elements come from b, in b's order
// (duplicates in b are kept); a only supplies the keys. nil is returned when
// nothing matches.
func PartsIntersect(a, b []Part) []Part {
	known := make(map[string]struct{}, len(a))
	for i := range a {
		known[a[i].key()] = struct{}{}
	}
	var shared []Part
	for _, p := range b {
		if _, ok := known[p.key()]; ok {
			shared = append(shared, p)
		}
	}
	return shared
}
vmbackup 的備份邏輯非常清晰簡單:如果指定了 -snapshot.createURL,則每次備份前先在 vmstorage 中創建快照;然後從 src(本地快照)向 dst 備份;如果存在 origin,則通過 origin 做服務端複製來加速備份。
vmrestore 介紹
在執行 vmrestore 時,必須停止 VictoriaMetrics。
Run the following command to restore backup from the given -src into the given -storageDataPath:
./vmrestore -src=<storageType>://<path/to/backup> -storageDataPath=<local/path/to/restore>
支持的遠端存儲類型與 vmbackup 一致。
vmrestore源碼解析
入口文件:VictoriaMetrics/app/vmrestore/main.go
func main() {
	// Flags and the help message go to stdout, which is easier to grep or pipe.
	flag.CommandLine.SetOutput(os.Stdout)
	flag.Usage = usage
	envflag.Parse()
	buildinfo.Init()
	logger.Init()

	// vmrestore's own HTTP server for metrics, running in the background.
	addrs := []string{*httpListenAddr}
	go httpserver.Serve(addrs, nil, nil)

	// Source filesystem: where the backup lives (-src).
	src, err := newSrcFS()
	if err != nil {
		logger.Fatalf("%s", err)
	}
	// Destination filesystem: the local data directory (-storageDataPath).
	dst, err := newDstFS()
	if err != nil {
		logger.Fatalf("%s", err)
	}

	restore := &actions.Restore{
		Concurrency:             *concurrency,
		Src:                     src,
		Dst:                     dst,
		SkipBackupCompleteCheck: *skipBackupCompleteCheck,
	}
	pushmetrics.Init()
	// All of the restore work happens inside (*actions.Restore).Run.
	if err := restore.Run(); err != nil {
		logger.Fatalf("cannot restore from backup: %s", err)
	}
	pushmetrics.Stop()
	src.MustStop()
	dst.MustStop()

	// Shut the metrics server down and report how long that took.
	shutdownStart := time.Now()
	logger.Infof("gracefully shutting down http server for metrics at %q", addrs)
	if err := httpserver.Stop(addrs); err != nil {
		logger.Fatalf("cannot stop http server for metrics: %s", err)
	}
	logger.Infof("successfully shut down http server for metrics in %.3f seconds", time.Since(shutdownStart).Seconds())
}
下面對 restore 的 Run() 方法進行解析。
VictoriaMetrics/lib/backup/actions/restore.go
// Run restores the backup at r.Src into the local directory r.Dst.Dir.
//
// Steps:
//  1. Create r.Dst.Dir if needed and hold its flock file until Run returns
//     (VictoriaMetrics must not run during the restore).
//  2. Create the restore-in-progress lock (createRestoreLock); it is removed
//     only by the final removeRestoreLock call, i.e. after a successful restore.
//  3. Unless r.SkipBackupCompleteCheck is set, require the
//     backupnames.BackupCompleteFilename marker at r.Src.
//  4. Validate that, for every file, the src parts are contiguous,
//     non-overlapping, fully present (ActualSize == Size) and add up to the
//     file's FileSize.
//  5. Delete local files that do not belong to the backup, keeping files that
//     look partially downloaded so they can be resumed.
//  6. Download the missing parts, per file in offset order, with files
//     processed in parallel by runParallelPerPath.
//
// Any error aborts the restore and is returned.
func (r *Restore) Run() error {
	startTime := time.Now()

	// Make sure VictoriaMetrics doesn't run during the restore process.
	fs.MustMkdirIfNotExist(r.Dst.Dir)
	flockF := fs.MustCreateFlockFile(r.Dst.Dir)
	defer fs.MustClose(flockF)

	if err := createRestoreLock(r.Dst.Dir); err != nil {
		return err
	}

	concurrency := r.Concurrency
	src := r.Src
	dst := r.Dst

	// Refuse incomplete (or pre-marker) backups unless explicitly told otherwise.
	if !r.SkipBackupCompleteCheck {
		ok, err := src.HasFile(backupnames.BackupCompleteFilename)
		if err != nil {
			return err
		}
		if !ok {
			return fmt.Errorf("cannot find %s file in %s; this means either incomplete backup or old backup; "+
				"pass -skipBackupCompleteCheck command-line flag if you still need restoring from this backup", backupnames.BackupCompleteFilename, src)
		}
	}

	logger.Infof("starting restore from %s to %s", src, dst)
	logger.Infof("obtaining list of parts at %s", src)
	srcParts, err := src.ListParts()
	if err != nil {
		return fmt.Errorf("cannot list src parts: %w", err)
	}
	logger.Infof("obtaining list of parts at %s", dst)
	dstParts, err := dst.ListParts()
	if err != nil {
		return fmt.Errorf("cannot list dst parts: %w", err)
	}
	backupSize := getPartsSize(srcParts)

	// Validate srcParts. They must cover the whole files.
	// The checks below rely on common.SortParts grouping parts by Path and
	// ordering them by Offset within a path (TODO confirm against SortParts).
	common.SortParts(srcParts)
	offset := uint64(0)
	var pOld common.Part // first part of the file currently being checked
	var path string
	for _, p := range srcParts {
		if p.Path != path {
			// A new file starts: the previous one must have been covered exactly.
			if offset != pOld.FileSize {
				return fmt.Errorf("invalid size for %q; got %d; want %d", path, offset, pOld.FileSize)
			}
			pOld = p
			path = p.Path
			offset = 0
		}
		if p.Offset < offset {
			return fmt.Errorf("there is an overlap in %d bytes between %s and %s", offset-p.Offset, &pOld, &p)
		}
		if p.Offset > offset {
			if offset == 0 {
				return fmt.Errorf("there is a gap in %d bytes from file start to %s", p.Offset, &p)
			}
			return fmt.Errorf("there is a gap in %d bytes between %s and %s", p.Offset-offset, &pOld, &p)
		}
		if p.Size != p.ActualSize {
			return fmt.Errorf("invalid size for %s; got %d; want %d", &p, p.ActualSize, p.Size)
		}
		offset += p.Size
	}
	// The loop checks a file's total only when the next file starts, so the last
	// file must be checked here; otherwise a backup missing the tail parts of its
	// last file would pass validation. With no parts at all, 0 == 0 holds.
	if offset != pOld.FileSize {
		return fmt.Errorf("invalid size for %q; got %d; want %d", path, offset, pOld.FileSize)
	}

	// Parts present locally (dst) but absent from the backup (src).
	partsToDelete := common.PartsDifference(dstParts, srcParts)
	deleteSize := uint64(0)
	if len(partsToDelete) > 0 {
		// Remove only files with the missing part at offset 0.
		// Assume other files are partially downloaded during the previous Restore.Run call,
		// so only the last part in them may be incomplete.
		// The last part for partially downloaded files will be re-downloaded later.
		// This addresses https://github.com/VictoriaMetrics/VictoriaMetrics/issues/487 .
		pathsToDelete := make(map[string]bool)
		for _, p := range partsToDelete {
			if p.Offset == 0 {
				pathsToDelete[p.Path] = true
			}
		}
		logger.Infof("deleting %d files from %s", len(pathsToDelete), dst)
		for path := range pathsToDelete {
			logger.Infof("deleting %s from %s", path, dst)
			size, err := dst.DeletePath(path)
			if err != nil {
				return fmt.Errorf("cannot delete %s from %s: %w", path, dst, err)
			}
			deleteSize += size
		}
		if err := dst.RemoveEmptyDirs(); err != nil {
			return fmt.Errorf("cannot remove empty directories at %s: %w", dst, err)
		}
	}

	// Re-read dstParts, since additional parts may be removed on the previous step.
	dstParts, err = dst.ListParts()
	if err != nil {
		return fmt.Errorf("cannot list dst parts after the deletion: %w", err)
	}

	// Parts present in the backup but missing locally must be downloaded.
	partsToCopy := common.PartsDifference(srcParts, dstParts)
	downloadSize := getPartsSize(partsToCopy)
	if len(partsToCopy) > 0 {
		// Group by file so each file is written by a single worker.
		perPath := make(map[string][]common.Part)
		for _, p := range partsToCopy {
			perPath[p.Path] = append(perPath[p.Path], p)
		}
		logger.Infof("downloading %d parts from %s to %s", len(partsToCopy), src, dst)
		var bytesDownloaded atomic.Uint64
		err = runParallelPerPath(concurrency, perPath, func(parts []common.Part) error {
			// Sort partsToCopy in order to properly grow file size during downloading
			// and to properly resume downloading of incomplete files on the next Restore.Run call.
			// Resuming works together with the deletion step above: a partially
			// written file keeps its complete leading parts, so only the parts
			// missing or mismatching locally end up in partsToCopy again.
			common.SortParts(parts)
			for _, p := range parts {
				logger.Infof("downloading %s from %s to %s", &p, src, dst)
				wc, err := dst.NewWriteCloser(p)
				if err != nil {
					return fmt.Errorf("cannot create writer for %q to %s: %w", &p, dst, err)
				}
				sw := &statWriter{
					w:            wc,
					bytesWritten: &bytesDownloaded,
				}
				if err := src.DownloadPart(p, sw); err != nil {
					// Release the writer before bailing out; the download error is
					// the one worth reporting, so a close error here is ignored.
					_ = wc.Close()
					return fmt.Errorf("cannot download %s to %s: %w", &p, dst, err)
				}
				if err := wc.Close(); err != nil {
					return fmt.Errorf("cannot close writer for %s to %s: %w", &p, dst, err)
				}
			}
			return nil
		}, func(elapsed time.Duration) {
			n := bytesDownloaded.Load()
			// Guard against a zero total (e.g. only empty parts left) so the
			// progress line never prints NaN.
			prc := 100.0
			if downloadSize > 0 {
				prc = 100 * float64(n) / float64(downloadSize)
			}
			logger.Infof("downloaded %d out of %d bytes (%.2f%%) from %s to %s in %s", n, downloadSize, prc, src, dst, elapsed)
		})
		if err != nil {
			return err
		}
	}

	logger.Infof("restored %d bytes from backup in %.3f seconds; deleted %d bytes; downloaded %d bytes",
		backupSize, time.Since(startTime).Seconds(), deleteSize, downloadSize)
	// Restore finished successfully: drop the restore-in-progress lock.
	return removeRestoreLock(r.Dst.Dir)
}
以上就是 vmbackup/vmrestore 的源碼執行流程,比較簡單。
總結
- vmbackup 需要與 vmstorage 部署在同一個實例上(能共享 storageDataPath 目錄),並定期執行 vmbackup 進行備份。在 k8s 環境中,如果通過 sidecar 集成,還需要做一些額外工作,比如定時執行。
- 官方提供了 vmbackupmanager 對 vmbackup 的支持,並且在 helm chart 部署中提供了直接支持。
- vmrestore 同樣需要與 vmstorage 部署在同一個實例上,並且在 vmrestore 執行過程中必須停止 vmstorage(不僅是停止寫入:vmrestore 會在數據目錄上創建 flock 鎖文件)。