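FastDFS Source Code Analysis

Concepts:

FastDFS is an open-source, lightweight distributed file system developed by Yu Qing (formerly an architect at Alibaba, now an architect at Yidao Yongche). It performs especially well when storing small files and suits online services built around files. I won't go over the use cases again; there is plenty of material online. Yet even though many large companies use FastDFS, openly or quietly, there are surprisingly few articles analyzing its code. My knowledge is limited, but I'll attempt an analysis anyway; if I get something wrong, I sincerely welcome corrections.

Getting started:

The source is available on both SourceForge and GitHub. I'm using FastDFS v5.01 here. Notably, this version got rid of the dreaded libevent and uses epoll and kqueue directly, which makes the code much more readable and leaves it with zero dependencies. Thumbs up.
The source tree contains common, test, client, storage, and tracker.
I'll go through them folder by folder, and alphabetically within each folder: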

common folder:

common_define.h:

I skip the files starting with "a" and cover this one first because it defines system-wide basics, including the bool type, global variables, and so on. Any variable or macro below that you haven't seen and I haven't explained comes from here.

avl_tree.c/avl_tree.h:

The definition and implementation of an AVL tree, the data structure that FastDFS's trunk feature and single-disk recovery feature rely on.

typedef struct tagAVLTreeNode {
        void *data;
        struct tagAVLTreeNode *left;
        struct tagAVLTreeNode *right;
        byte balance;
} AVLTreeNode;

typedef struct tagAVLTreeInfo {
        AVLTreeNode *root;
        FreeDataFunc free_data_func;
        CompareFunc compare_func;
} AVLTreeInfo;

A classic data structure, unmodified and true to the original.

base64.c/base64.h:

Once FastDFS has gathered the information a file carries, it encodes it with Base64 to generate the file ID.

chain.c/chain.h:

A linked-list implementation.

typedef struct tagChainNode
{
        void *data;
        struct tagChainNode *next;
} ChainNode;

typedef struct
{
        int type;
        ChainNode *head;
        ChainNode *tail;
        FreeDataFunc freeDataFunc;
        CompareFunc compareFunc;
} ChainList;

The type field defines how the list is used:

CHAIN_TYPE_INSERT: insert new node before head

CHAIN_TYPE_APPEND: insert new node after tail

CHAIN_TYPE_SORTED: sorted chain

fast_mblock #includes it but never uses it; with the include commented out, the code still compiles without errors. Maybe it will be used later? Noted.
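
To make the three modes concrete, here is a minimal sketch of how one add routine could behave under each type. The names (DemoList, demo_add, DEMO_INSERT and so on) are hypothetical and stand in for the real chain.h API, which I am not reproducing here; the payload is a plain int instead of void *data with a CompareFunc.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-ins for CHAIN_TYPE_INSERT / APPEND / SORTED. */
enum { DEMO_INSERT, DEMO_APPEND, DEMO_SORTED };

typedef struct DemoNode {
    int value;
    struct DemoNode *next;
} DemoNode;

typedef struct {
    int type;
    DemoNode *head;
    DemoNode *tail;
} DemoList;

/* Add value according to the list's type; returns 0 on success, -1 on OOM. */
static int demo_add(DemoList *list, int value)
{
    DemoNode *node = malloc(sizeof(*node));
    DemoNode **link;

    assert(list != NULL);
    if (node == NULL) return -1;
    node->value = value;

    if (list->type == DEMO_INSERT) {
        link = &list->head;                     /* before head */
    } else if (list->type == DEMO_APPEND) {
        link = list->tail ? &list->tail->next : &list->head;  /* after tail */
    } else {
        /* sorted: stop at the first node holding a larger value */
        link = &list->head;
        while (*link != NULL && (*link)->value <= value) {
            link = &(*link)->next;
        }
    }

    node->next = *link;
    *link = node;
    if (node->next == NULL) list->tail = node;  /* node is the new last one */
    return 0;
}
```

Using a pointer to the link being rewritten keeps the three cases down to one splice at the end.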

connect_pool.c/connect_pool.h:

The definition and implementation of the connection pool.

typedef struct
{
        int sock;
        int port;
        char ip_addr[IP_ADDRESS_SIZE];
} ConnectionInfo;

struct tagConnectionManager;

typedef struct tagConnectionNode {
        ConnectionInfo *conn;
        struct tagConnectionManager *manager;
        struct tagConnectionNode *next;
        time_t atime;  //last access time
} ConnectionNode;

typedef struct tagConnectionManager {
        ConnectionNode *head;
        int total_count;  //total connections
        int free_count;   //free connections
        pthread_mutex_t lock;
} ConnectionManager;

typedef struct tagConnectionPool {
        HashArray hash_array;  //key is ip:port, value is ConnectionManager
        pthread_mutex_t lock;
        int connect_timeout;
        int max_count_per_entry;  //0 means no limit

        /*
        connections whose the idle time exceeds this time will be closed
        */
        int max_idle_time;
} ConnectionPool;

Well, the comments already make everything clear.

Three-level structure:

pool->manager->node

The pool uses a hash to locate the manager, since the ip:port key is unique, and then a linked list manages all connections to that node.
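
As a small illustration of the first level, the lookup key can be built like this. demo_make_key is a hypothetical helper of mine, not a function from connect_pool.c, and the exact key format used there is an assumption based on the "key is ip:port" comment.

```c
#include <assert.h>
#include <stdio.h>

/* Build the "ip:port" key used to find a manager.
 * Returns the key length, or -1 if the buffer is too small. */
static int demo_make_key(const char *ip, int port, char *key, size_t size)
{
    int n;

    assert(ip != NULL && key != NULL);
    n = snprintf(key, size, "%s:%d", ip, port);
    return (n < 0 || (size_t)n >= size) ? -1 : n;
}
```

With such a key, every connection to the same server lands on the same manager, and the manager's list only has to track idle connections for that one endpoint.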

fast_mblock.c/fast_mblock.h:

A linked-list variant that keeps track of both allocated and released objects, roughly equivalent to an object pool. It is used by the trunk feature.

/* free node chain */ 
struct fast_mblock_node
{
        struct fast_mblock_node *next;
        char data[0];   //the data buffer
};

/* malloc chain */
struct fast_mblock_malloc
{
        struct fast_mblock_malloc *next;
};

struct fast_mblock_man
{
        struct fast_mblock_node *free_chain_head;     //free node chain
        struct fast_mblock_malloc *malloc_chain_head; //malloc chain to be freed
        int element_size;         //element size
        int alloc_elements_once;  //alloc elements once
        pthread_mutex_t lock;     //the lock for read / write free node chain
};
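
The two chains work together roughly as in the sketch below: chunks are malloc'd in batches and remembered on one chain so they can be freed later, while individual elements circulate on the free chain. This is a simplified model under my own names (DemoPool, demo_pool_alloc, ...), without the mutex the real structure carries, and it only aligns elements to pointer size.

```c
#include <assert.h>
#include <stdlib.h>

typedef struct DemoFreeNode {
    struct DemoFreeNode *next;
} DemoFreeNode;

typedef struct DemoChunk {
    struct DemoChunk *next;
} DemoChunk;

typedef struct {
    DemoFreeNode *free_head;   /* elements ready for reuse */
    DemoChunk *chunks;         /* every malloc'd chunk, freed on destroy */
    size_t element_size;
    int elements_per_chunk;
} DemoPool;

static void demo_pool_init(DemoPool *pool, size_t element_size, int per_chunk)
{
    const size_t align = sizeof(void *);

    assert(pool != NULL && per_chunk > 0);
    /* an element must be able to hold the free-chain link */
    if (element_size < sizeof(DemoFreeNode)) element_size = sizeof(DemoFreeNode);
    pool->element_size = (element_size + align - 1) / align * align;
    pool->elements_per_chunk = per_chunk;
    pool->free_head = NULL;
    pool->chunks = NULL;
}

static void *demo_pool_alloc(DemoPool *pool)
{
    DemoFreeNode *node;

    if (pool->free_head == NULL) {
        /* free chain is empty: malloc one chunk and thread its elements */
        char *base;
        int i;
        DemoChunk *chunk = malloc(sizeof(DemoChunk) +
                pool->element_size * (size_t)pool->elements_per_chunk);
        if (chunk == NULL) return NULL;
        chunk->next = pool->chunks;
        pool->chunks = chunk;
        base = (char *)(chunk + 1);
        for (i = 0; i < pool->elements_per_chunk; i++) {
            node = (DemoFreeNode *)(base + pool->element_size * (size_t)i);
            node->next = pool->free_head;
            pool->free_head = node;
        }
    }
    node = pool->free_head;
    pool->free_head = node->next;
    return node;
}

/* Releasing an element just pushes it back onto the free chain. */
static void demo_pool_free(DemoPool *pool, void *ptr)
{
    DemoFreeNode *node = ptr;

    node->next = pool->free_head;
    pool->free_head = node;
}

static void demo_pool_destroy(DemoPool *pool)
{
    while (pool->chunks != NULL) {
        DemoChunk *next = pool->chunks->next;
        free(pool->chunks);
        pool->chunks = next;
    }
    pool->free_head = NULL;
}
```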

fast_task_queue.c/fast_task_queue.h:

The task queue, a rather important data structure.

typedef struct ioevent_entry
{
        int fd;
        FastTimerEntry timer;
        IOEventCallback callback;
} IOEventEntry;

struct nio_thread_data
{
        struct ioevent_puller ev_puller;
        struct fast_timer timer;
        int pipe_fds[2];
        struct fast_task_info *deleted_list;        //links to tasks that have been deleted, reusing memory that is already allocated
};

struct fast_task_info
{
        IOEventEntry event;
        char client_ip[IP_ADDRESS_SIZE];
        void *arg;  //extra argument pointer
        char *data; //buffer for write or recv
        int size;   //alloc size
        int length; //data length
        int offset; //current offset
        int req_count; //request count
        TaskFinishCallBack finish_callback;     //callback invoked when the task finishes
        struct nio_thread_data *thread_data;
        struct fast_task_info *next;
};

struct fast_task_queue
{
        struct fast_task_info *head;            //both head and tail pointers are kept, for dequeue and enqueue respectively
        struct fast_task_info *tail;
        pthread_mutex_t lock;
        int max_connections;
        int min_buff_size;
        int max_buff_size;
        int arg_size;
        bool malloc_whole_block;
};
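
The head/tail pair gives constant-time enqueue and dequeue. Below is a stripped-down sketch of that mechanic with hypothetical names (DemoQueue, demo_push, demo_pop); the real queue additionally takes the mutex around these operations and manages the task buffers.

```c
#include <assert.h>
#include <stddef.h>

typedef struct DemoTask {
    int id;
    struct DemoTask *next;
} DemoTask;

typedef struct {
    DemoTask *head;   /* dequeue side */
    DemoTask *tail;   /* enqueue side */
} DemoQueue;

/* Append a task at the tail. */
static void demo_push(DemoQueue *q, DemoTask *task)
{
    assert(q != NULL && task != NULL);
    task->next = NULL;
    if (q->tail == NULL) {
        q->head = task;          /* queue was empty */
    } else {
        q->tail->next = task;
    }
    q->tail = task;
}

/* Remove and return the task at the head, or NULL when the queue is empty. */
static DemoTask *demo_pop(DemoQueue *q)
{
    DemoTask *task = q->head;

    if (task != NULL) {
        q->head = task->next;
        if (q->head == NULL) q->tail = NULL;   /* queue became empty */
        task->next = NULL;
    }
    return task;
}
```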

fast_timer.c/fast_timer.h:

A time hash table keyed by Unix timestamp, with doubly linked lists to resolve collisions; it can rehash and so on depending on current usage.

It is used by the fast_task_queue described above.

typedef struct fast_timer_entry {
  int64_t expires;
  void *data;
  struct fast_timer_entry *prev;
  struct fast_timer_entry *next;
  bool rehash;
} FastTimerEntry;

typedef struct fast_timer_slot {
  struct fast_timer_entry head;
} FastTimerSlot;

typedef struct fast_timer {
  int slot_count;    //time wheel slot count
  int64_t base_time; //base time for slot 0
  int64_t current_time;
  FastTimerSlot *slots;
} FastTimer;
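
A rough sketch of the idea follows. I am assuming the slot is chosen as (expires - base_time) % slot_count, which matches the "base time for slot 0" comment but is not copied from fast_timer.c; all Demo names are mine.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define DEMO_SLOTS 8

typedef struct DemoEntry {
    int64_t expires;
    struct DemoEntry *prev;
    struct DemoEntry *next;
} DemoEntry;

typedef struct {
    int64_t base_time;
    DemoEntry slots[DEMO_SLOTS];   /* each slot is a sentinel list head */
} DemoTimer;

static void demo_timer_init(DemoTimer *t, int64_t base_time)
{
    int i;

    t->base_time = base_time;
    for (i = 0; i < DEMO_SLOTS; i++) {
        t->slots[i].prev = NULL;
        t->slots[i].next = NULL;
    }
}

static int demo_slot_index(const DemoTimer *t, int64_t expires)
{
    assert(expires >= t->base_time);
    return (int)((expires - t->base_time) % DEMO_SLOTS);
}

/* Link the entry right after the sentinel of its slot. */
static void demo_timer_add(DemoTimer *t, DemoEntry *e)
{
    DemoEntry *head = &t->slots[demo_slot_index(t, e->expires)];

    e->next = head->next;
    e->prev = head;
    if (head->next != NULL) head->next->prev = e;
    head->next = e;
}

/* The doubly linked list makes removal O(1), given only the entry. */
static void demo_timer_remove(DemoEntry *e)
{
    e->prev->next = e->next;
    if (e->next != NULL) e->next->prev = e->prev;
    e->prev = e->next = NULL;
}

/* Count entries in the slot for `now` that have really expired. */
static int demo_timer_count_expired(const DemoTimer *t, int64_t now)
{
    const DemoEntry *e = t->slots[demo_slot_index(t, now)].next;
    int n = 0;

    for (; e != NULL; e = e->next) {
        if (e->expires <= now) n++;   /* a slot may also hold a later lap */
    }
    return n;
}
```

Two timestamps that differ by a multiple of the slot count share a slot, which is why the expiry check still compares expires against the current time.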

fdfs_global.c/fdfs_global.h:

Defines the global variables used by the fdfs system, including timeouts, the version number, and so on.

int g_fdfs_connect_timeout = DEFAULT_CONNECT_TIMEOUT;
int g_fdfs_network_timeout = DEFAULT_NETWORK_TIMEOUT;
char g_fdfs_base_path[MAX_PATH_SIZE] = {'/', 't', 'm', 'p', '\0'};
Version g_fdfs_version = {5, 1};
bool g_use_connection_pool = false;
ConnectionPool g_connection_pool;
int g_connection_pool_max_idle_time = 3600;

fdfs_http_shared.c/fdfs_http_shared.h:

FastDFS uses tokens for hotlink protection and for sharing images. I'm not sure about this part yet; I'll come back to it later.

hash.c/hash.h:

A classic hash structure, used widely throughout FastDFS.

The hash locates the bucket, and a linked list then resolves collisions.

typedef struct tagHashData
{
        int key_len;
        int value_len;
        int malloc_value_size;

#ifdef HASH_STORE_HASH_CODE
        unsigned int hash_code;
#endif

        char *value;
        struct tagHashData *next;       //resolves collisions
        char key[0];
} HashData;

typedef struct tagHashArray
{
        HashData **buckets;
        HashFunc hash_func;
        int item_count;
        unsigned int *capacity;
        double load_factor;         //hash load factor; FastDFS rehashes when it exceeds 1.0
        int64_t max_bytes;          //maximum bytes allowed, used to compute the load factor
        int64_t bytes_used;         //bytes already used, used to compute the load factor
        bool is_malloc_capacity;
        bool is_malloc_value;
        unsigned int lock_count;        //total number of locks, for thread safety
        pthread_mutex_t *locks;
} HashArray;

typedef struct tagHashStat          //overall statistics for the hash
{
        unsigned int capacity;
        int item_count;
        int bucket_used;
        double bucket_avg_length;
        int bucket_max_length;
} HashStat;
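
For readers who want to see the bucket-plus-chain layout in motion, here is a minimal sketch. It uses a djb2-style string hash that I swapped in for illustration (hash.c ships its own hash functions), fixed-size keys, int values, and no locking; the Demo names are hypothetical.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

typedef struct DemoItem {
    struct DemoItem *next;   /* collision chain */
    int value;
    char key[32];
} DemoItem;

typedef struct {
    DemoItem **buckets;
    unsigned int capacity;
    int item_count;
} DemoHash;

static unsigned int demo_hash_code(const char *key)
{
    unsigned int h = 5381;   /* djb2-style, chosen only for this sketch */

    while (*key != '\0') h = h * 33 + (unsigned char)*key++;
    return h;
}

static int demo_hash_init(DemoHash *h, unsigned int capacity)
{
    assert(h != NULL && capacity > 0);
    h->buckets = calloc(capacity, sizeof(DemoItem *));
    h->capacity = capacity;
    h->item_count = 0;
    return h->buckets != NULL ? 0 : -1;
}

/* Hash to a bucket, then walk its chain. */
static DemoItem *demo_hash_find(const DemoHash *h, const char *key)
{
    DemoItem *it = h->buckets[demo_hash_code(key) % h->capacity];

    while (it != NULL && strcmp(it->key, key) != 0) it = it->next;
    return it;
}

static int demo_hash_set(DemoHash *h, const char *key, int value)
{
    DemoItem *it = demo_hash_find(h, key);

    if (it == NULL) {
        DemoItem **bucket = &h->buckets[demo_hash_code(key) % h->capacity];
        if (strlen(key) >= sizeof(it->key)) return -1;
        it = malloc(sizeof(*it));
        if (it == NULL) return -1;
        strcpy(it->key, key);
        it->next = *bucket;      /* push onto the front of the chain */
        *bucket = it;
        h->item_count++;
    }
    it->value = value;
    return 0;
}

/* Items per bucket; a value above 1.0 is the rehash trigger mentioned above. */
static double demo_load_factor(const DemoHash *h)
{
    return (double)h->item_count / (double)h->capacity;
}
```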

http_func.c/http_func.h:

The HTTP feature has been cut, so I'll come back to this one later as well.

ini_file_reader.c/ini_file_reader.h:

The functions FastDFS uses to load configuration files during initialization.

ioevent.c/ioevent.h && ioevent_loop.c/ioevent_loop.h:

A thin wrapper around epoll and kqueue that forms an event library handling timer and network events. This logic deserves a chapter of its own.

linux_stack_trace.c/linux_stack_trace.h:

/**
 * This source file is used to print out a stack-trace when your program
 * segfaults. It is relatively reliable and spot-on accurate.
 */

local_ip_func.c/local_ip_func.h:

Obtains the local IP addresses by calling getifaddrs.

logger.c/logger.h:

This one is obvious: the log module.

md5.c/md5.h:

Called from fdfs_http_shared.c: the fdfs_http_gen_token function computes the MD5 of secret_key, file_id, and timestamp to produce the token.

mime_file_parser.c/mime_file_parser.h:

Loads the MIME type recognition settings from a configuration file. As for what MIME actually is... I don't know either; I'll ask the experts.

_os_bits.h:

Defines the word size (number of bits) of the OS.

process_ctrl.c/process_ctrl.h:

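Loads the pid file path from the configuration file, defines create, delete, query, and update operations on the pid file, and provides methods for stopping and restarting the process.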

pthread_func.c/pthread_func.h:

Thread-related operations, including initializing, creating, and killing threads.

sched_thread.c/sched_thread.h:

The scheduled-task thread module, which runs tasks on an hour:minute schedule.

typedef struct tagScheduleEntry
{
        int id;  //the task id

        /* the time base to execute task, such as 00:00, interval is 3600,
           means execute the task every hour as 1:00, 2:00, 3:00 etc. */
        TimeInfo time_base;

        int interval;   //the interval for execute task, unit is second

        TaskFunc task_func; //callback function
        void *func_args;    //arguments pass to callback function

        /* following are internal fields, do not set manually! */
        time_t next_call_time;
        struct tagScheduleEntry *next;
} ScheduleEntry;

typedef struct
{
        ScheduleEntry *entries;
        int count;
} ScheduleArray;

typedef struct
{
        ScheduleArray scheduleArray;
        ScheduleEntry *head;  //schedule chain head
        ScheduleEntry *tail;  //schedule chain tail
        bool *pcontinue_flag;
} ScheduleContext;

I skimmed the algorithm: it is a linked-list variant that implements a queue variant.

All the data, however, lives in the scheduleArray array; each time new tasks are inserted, the array is sorted by time.

This guarantees that the head pointer refers to the task that has to run first.

After that, each run dequeues the head, resets its next field, and re-enqueues it at the tail.

Overall it is very simple and efficient.
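
A sketch of the two ingredients, under my own assumptions: the next run time sits on the grid time_base + k * interval (as the struct comment about 00:00 and 3600 suggests), and entries are ordered by next_call_time with qsort. I have not checked how sched_thread.c treats edge cases, and the Demo names are hypothetical.

```c
#include <assert.h>
#include <stdlib.h>

typedef struct {
    int id;
    long next_call_time;   /* seconds, on the same clock as `now` */
} DemoSchedEntry;

/* First run strictly after `now` on the grid base + k * interval. */
static long demo_next_call(long base, long interval, long now)
{
    long k;

    assert(interval > 0);
    if (now < base) {
        /* step back from base as far as possible while staying after now */
        k = (base - now - 1) / interval;
        return base - k * interval;
    }
    k = (now - base) / interval + 1;
    return base + k * interval;
}

static int demo_cmp_next_call(const void *a, const void *b)
{
    long x = ((const DemoSchedEntry *)a)->next_call_time;
    long y = ((const DemoSchedEntry *)b)->next_call_time;

    return (x > y) - (x < y);
}

/* After sorting, entries[0] is the task that has to run first. */
static void demo_sort_entries(DemoSchedEntry *entries, size_t count)
{
    qsort(entries, count, sizeof(entries[0]), demo_cmp_next_call);
}
```

With a time base of 00:00 and an interval of 3600, a call at second 5000 of the day yields 7200, that is, 02:00.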

shared_func.c/shared_func.h:

Assorted utility functions, such as setting the random seed; none of them justifies a file of its own, so they are grouped together.

sockopt.c/sockopt.h:

Socket utility functions, lightly wrapped.

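tracker folder:

I analyze the tracker first because it only involves the network part, whereas storage also handles disk I/O and is somewhat more complex.

fdfs_shared_func.c/fdfs_shared_func.h:

Utility functions shared by tracker and storage, such as looking up a tracker's ID from its IP and port.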

fdfs_trackerd.c:

The tracker's entry point.

tracker_dump.c/tracker_dump.h:

Implements the function fdfs_dump_tracker_global_vars_to_file.

When the tracker receives SIGUSR1 or SIGUSR2, sigDumpHandler runs and calls this function, dumping the tracker's current state to logs/tracker_dump.log under the FastDFS base directory.

I haven't found out yet how to recover from this dump file; I'll add that later.

tracker_func.c/tracker_func.h:

Implements the function tracker_load_from_conf_file.

It loads the tracker's basic required settings from the conf file.

tracker_global.c/tracker_global.h:

Holds the global variables used by the tracker.

tracker_http_check.c/tracker_http_check.h:

This module checks the available storage servers in every group the tracker manages, testing whether each HTTP port is reachable.

tracker_mem.c/tracker_mem.h:

This module maintains all in-memory data, including the cluster's running state, and provides save, change, and load interfaces for modifying the overall cluster state.

tracker_nio.c/tracker_nio.h:

The nio module, built on top of common/ioevent and common/ioevent_loop.

tracker_proto.c/tracker_proto.h:

Defines the tracker communication protocol; worth analyzing when time permits.

tracker_relationship.c/tracker_relationship.h:

Defines how trackers communicate with each other, along with leader election, pinging the leader, and similar features; worth analyzing when time permits.

tracker_service.c/tracker_service.h:

The tracker's logic layer: after nio, each request enters a work thread and is then dispatched to the relevant module.

tracker_status.c/tracker_status.h:

The save and load module for tracker status.

tracker_types.h:

Defines all the types used by the tracker.

storage folder:

fdfs_storaged.c: the storage entry point

storage_dio.c/storage_dio.h:

Implements asynchronous disk I/O on top of common/fast_task_queue; new tasks are enqueued through storage_dio_queue_push.

It also contains the trunk handling; the trunk module is covered later.

storage_disk_recovery.c/storage_disk_recovery.h:

The storage single-disk recovery algorithm, used for failure recovery.

storage_dump.c/storage_dump.h:

Works on the same principle as tracker_dump.

storage_func.c/storage_func.h:

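The storage_func_init function corresponds to the tracker's tracker_load_from_conf_file.

Besides that, it provides functions that decide, from a storage_id or an IP, whether a server is the local machine.

It also provides some data persistence interfaces.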

storage_global.c/storage_global.h:

Defines the global variables used by storage.

storage_ip_changed_dealer.c/storage_ip_changed_dealer.h:

The module that handles changes to the storage server's IP address

int storage_get_my_tracker_client_ip();         //get storage's IP as a client of the tracker

int storage_changelog_req();                //request the changelog from the tracker
int storage_check_ip_changed();             //check whether the IP has changed

storage_nio.c/storage_nio.h:

The nio module, built on top of common/ioevent and common/ioevent_loop.

storage_param_getter.c/storage_param_getter.h:

The storage_get_params_from_tracker function: as the name suggests, it fetches the server's own parameters from the tracker.

storage_service.c/storage_service.h:

The storage logic layer: after nio, each request enters a work thread and is then dispatched to the relevant module.

storage_sync.c/storage_sync.h:

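The storage sync module. As is well known, FastDFS sync is a timestamp-based, weakly consistent synchronization.

tracker_client_thread.c/tracker_client_thread.h:

As the tracker_report prefix makes clear, this is where storage acts as a client of the tracker, sending heartbeats, reporting its own status, and so on.

The full interface is as follows: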

int tracker_report_init();
int tracker_report_destroy();
int tracker_report_thread_start();
int kill_tracker_report_threads();

int tracker_report_join(ConnectionInfo *pTrackerServer, \
                const int tracker_index, const bool sync_old_done);
int tracker_report_storage_status(ConnectionInfo *pTrackerServer, \
                FDFSStorageBrief *briefServer);
int tracker_sync_src_req(ConnectionInfo *pTrackerServer, \
                StorageBinLogReader *pReader);
int tracker_sync_diff_servers(ConnectionInfo *pTrackerServer, \
                FDFSStorageBrief *briefServers, const int server_count);
int tracker_deal_changelog_response(ConnectionInfo *pTrackerServer);

trunk_mgr:

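This is a subdirectory of the storage folder that implements the trunk feature.

The trunk feature is rather scattered and I haven't fully figured it out yet, for example why storage interacts with the trunk module as a client instead of calling trunk directly.

This part deserves a chapter of its own.

FastDFS Source Code Analysis (2): The trunk Module

The trunk feature stores large numbers of small files merged together. A huge number of small files consumes many inodes in the Linux file system and makes the tree overly large, which lowers read/write efficiency.

Merged storage of small files therefore relieves this pressure significantly.

I'll trace the trunk module's behavior by walking through the upload and download flows.

In the storage_service module, storage_service.c/storage_deal_task dispatches each request to its own handling logic according to cmd.

The upload logic is handled in storage_upload_file: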

/**
1 byte: store path index
8 bytes: file size 
FDFS_FILE_EXT_NAME_MAX_LEN bytes: file ext name, do not include dot (.)
file size bytes: file content
**/
static int storage_upload_file(struct fast_task_info *pTask, bool bAppenderFile)
{
    StorageClientInfo *pClientInfo;
    StorageFileContext *pFileContext;
    DisconnectCleanFunc clean_func;
    char *p;
    char filename[128];
    char file_ext_name[FDFS_FILE_PREFIX_MAX_LEN + 1];
    int64_t nInPackLen;
    int64_t file_offset;
    int64_t file_bytes;
    int crc32;
    int store_path_index;
    int result;
    int filename_len;

    pClientInfo = (StorageClientInfo *)pTask->arg;
    pFileContext =  &(pClientInfo->file_context);
    nInPackLen = pClientInfo->total_length - sizeof(TrackerHeader);

    //validate the packet size
    
    if (nInPackLen < 1 + FDFS_PROTO_PKG_LEN_SIZE + 
            FDFS_FILE_EXT_NAME_MAX_LEN)
    {
        logError("file: "__FILE__", line: %d, " \
            "cmd=%d, client ip: %s, package size " \
            INT64_PRINTF_FORMAT" is not correct, " \
            "expect length >= %d", __LINE__, \
            STORAGE_PROTO_CMD_UPLOAD_FILE, \
            pTask->client_ip,  nInPackLen, \
            1 + FDFS_PROTO_PKG_LEN_SIZE + \
            FDFS_FILE_EXT_NAME_MAX_LEN);
        pClientInfo->total_length = sizeof(TrackerHeader);
        return EINVAL;
    }

    //skip the header and read the store path index
    p = pTask->data + sizeof(TrackerHeader);
    store_path_index = *p++;

    if (store_path_index == -1)
    {
        if ((result=storage_get_storage_path_index( \
            &store_path_index)) != 0)
        {
            logError("file: "__FILE__", line: %d, " \
                "get_storage_path_index fail, " \
                "errno: %d, error info: %s", __LINE__, \
                result, STRERROR(result));
            pClientInfo->total_length = sizeof(TrackerHeader);
            return result;
        }
    }
    else if (store_path_index < 0 || store_path_index >= \
        g_fdfs_store_paths.count)
    {
        logError("file: "__FILE__", line: %d, " \
            "client ip: %s, store_path_index: %d " \
            "is invalid", __LINE__, \
            pTask->client_ip, store_path_index);
        pClientInfo->total_length = sizeof(TrackerHeader);
        return EINVAL;
    }

    //read the file size
    file_bytes = buff2long(p);
    p += FDFS_PROTO_PKG_LEN_SIZE;
    if (file_bytes < 0 || file_bytes != nInPackLen - \
            (1 + FDFS_PROTO_PKG_LEN_SIZE + \
             FDFS_FILE_EXT_NAME_MAX_LEN))
    {
        logError("file: "__FILE__", line: %d, " \
            "client ip: %s, pkg length is not correct, " \
            "invalid file bytes: "INT64_PRINTF_FORMAT \
            ", total body length: "INT64_PRINTF_FORMAT, \
            __LINE__, pTask->client_ip, file_bytes, nInPackLen);
        pClientInfo->total_length = sizeof(TrackerHeader);
        return EINVAL;
    }

    //read the file extension name
    memcpy(file_ext_name, p, FDFS_FILE_EXT_NAME_MAX_LEN);
    *(file_ext_name + FDFS_FILE_EXT_NAME_MAX_LEN) = '\0';
    p += FDFS_FILE_EXT_NAME_MAX_LEN;
    if ((result=fdfs_validate_filename(file_ext_name)) != 0)
    {
        logError("file: "__FILE__", line: %d, " \
            "client ip: %s, file_ext_name: %s " \
            "is invalid!", __LINE__, \
            pTask->client_ip, file_ext_name);
        pClientInfo->total_length = sizeof(TrackerHeader);
        return result;
    }

    pFileContext->calc_crc32 = true;
    pFileContext->calc_file_hash = g_check_file_duplicate;
    pFileContext->extra_info.upload.start_time = g_current_time;

    strcpy(pFileContext->extra_info.upload.file_ext_name, file_ext_name);
    storage_format_ext_name(file_ext_name, \
            pFileContext->extra_info.upload.formatted_ext_name);
    pFileContext->extra_info.upload.trunk_info.path. \
                store_path_index = store_path_index;
    pFileContext->extra_info.upload.file_type = _FILE_TYPE_REGULAR;
    pFileContext->sync_flag = STORAGE_OP_TYPE_SOURCE_CREATE_FILE;
    pFileContext->timestamp2log = pFileContext->extra_info.upload.start_time;
    pFileContext->op = FDFS_STORAGE_FILE_OP_WRITE;
    
    //for an appender file, add the extra appender flag to the file type
    if (bAppenderFile)
    {
        pFileContext->extra_info.upload.file_type |= \
                    _FILE_TYPE_APPENDER;
    }
    else
    {
        //check whether trunk_file is enabled and, based on the size, whether trunk merged storage is needed
        if (g_if_use_trunk_file && trunk_check_size( \
            TRUNK_CALC_SIZE(file_bytes)))
        {
            pFileContext->extra_info.upload.file_type |= \
                        _FILE_TYPE_TRUNK;
        }
    }

    //the previous check decided that trunk merged storage is needed
    if (pFileContext->extra_info.upload.file_type & _FILE_TYPE_TRUNK)
    {
        FDFSTrunkFullInfo *pTrunkInfo;

        pFileContext->extra_info.upload.if_sub_path_alloced = true;
        pTrunkInfo = &(pFileContext->extra_info.upload.trunk_info);
        //allocate space for the trunk file and add it to the cache
        if ((result=trunk_client_trunk_alloc_space( \
            TRUNK_CALC_SIZE(file_bytes), pTrunkInfo)) != 0)
        {
            pClientInfo->total_length = sizeof(TrackerHeader);
            return result;
        }

        clean_func = dio_trunk_write_finish_clean_up;
        file_offset = TRUNK_FILE_START_OFFSET((*pTrunkInfo));
        pFileContext->extra_info.upload.if_gen_filename = true;
        trunk_get_full_filename(pTrunkInfo, pFileContext->filename, \
                sizeof(pFileContext->filename));
        //register the callbacks for the trunk operation
        pFileContext->extra_info.upload.before_open_callback = \
                    dio_check_trunk_file_when_upload;
        pFileContext->extra_info.upload.before_close_callback = \
                    dio_write_chunk_header;
        pFileContext->open_flags = O_RDWR | g_extra_open_file_flags;
    }
    else
    {
        //the regular-file path, omitted
        ...
    }

    return storage_write_to_file(pTask, file_offset, file_bytes, \
            p - pTask->data, dio_write_file, \
            storage_upload_file_done_callback, \
            clean_func, store_path_index);
}
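
The body layout documented at the top of storage_upload_file can be decoded as in the sketch below. Two things are assumptions on my part: that FDFS_PROTO_PKG_LEN_SIZE is 8 and FDFS_FILE_EXT_NAME_MAX_LEN is 6, and that buff2long reads the 8 bytes most significant first. The Demo names are hypothetical, and unlike the original (which reads the index through a char, so 0xFF can show up as -1) the sketch reads it as an unsigned byte.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define DEMO_PKG_LEN_SIZE 8       /* assumed FDFS_PROTO_PKG_LEN_SIZE */
#define DEMO_EXT_NAME_MAX_LEN 6   /* assumed FDFS_FILE_EXT_NAME_MAX_LEN */

typedef struct {
    int store_path_index;
    int64_t file_bytes;
    char ext_name[DEMO_EXT_NAME_MAX_LEN + 1];
    const unsigned char *content;
} DemoUpload;

/* Decode 8 bytes, most significant byte first (assumed buff2long behavior). */
static int64_t demo_buff2long(const unsigned char *p)
{
    uint64_t v = 0;
    int i;

    for (i = 0; i < 8; i++) v = (v << 8) | p[i];
    return (int64_t)v;
}

/* Parse the body that follows the TrackerHeader.
 * Returns 0 on success, -1 when the lengths are inconsistent. */
static int demo_parse_upload(const unsigned char *body, int64_t body_len,
        DemoUpload *out)
{
    const int64_t fixed = 1 + DEMO_PKG_LEN_SIZE + DEMO_EXT_NAME_MAX_LEN;

    assert(body != NULL && out != NULL);
    if (body_len < fixed) return -1;

    out->store_path_index = body[0];
    out->file_bytes = demo_buff2long(body + 1);
    /* the declared size must equal what is left after the fixed fields */
    if (out->file_bytes < 0 || out->file_bytes != body_len - fixed) return -1;

    memcpy(out->ext_name, body + 1 + DEMO_PKG_LEN_SIZE, DEMO_EXT_NAME_MAX_LEN);
    out->ext_name[DEMO_EXT_NAME_MAX_LEN] = '\0';
    out->content = body + fixed;
    return 0;
}
```

The length check is the same consistency rule the original enforces before it touches the file content.
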
  • Next, trace the implementation of trunk_client_trunk_alloc_space
int trunk_client_trunk_alloc_space(const int file_size, \
                FDFSTrunkFullInfo *pTrunkInfo)
{
        int result;
        ConnectionInfo trunk_server;
        ConnectionInfo *pTrunkServer;
        
        //if this server is the trunker itself, operate directly
        if (g_if_trunker_self)
        {       
                return trunk_alloc_space(file_size, pTrunkInfo);
        }               
                
        //otherwise connect using the trunk_server's ip and port
        if (*(g_trunk_server.ip_addr) == '\0')
        {               
                logError("file: "__FILE__", line: %d, " \
                        "no trunk server", __LINE__);
                return EAGAIN;
        }       
                
        memcpy(&trunk_server, &g_trunk_server, sizeof(ConnectionInfo));
        if ((pTrunkServer=tracker_connect_server(&trunk_server, &result)) == NULL)
        {       
                logError("file: "__FILE__", line: %d, " \
                        "can't alloc trunk space because connect to trunk " \
                        "server %s:%d fail, errno: %d", __LINE__, \
                        trunk_server.ip_addr, trunk_server.port, result);
                return result;
        }
        
        //perform the operation through the client API
        result = trunk_client_trunk_do_alloc_space(pTrunkServer, \
                        file_size, pTrunkInfo);
                
        tracker_disconnect_server_ex(pTrunkServer, result != 0);
        return result;  
}
  • Trace the direct call and the client API path separately
int trunk_alloc_space(const int size, FDFSTrunkFullInfo *pResult)
{
        FDFSTrunkSlot target_slot;
        FDFSTrunkSlot *pSlot;
        FDFSTrunkNode *pPreviousNode;
        FDFSTrunkNode *pTrunkNode;
        int result;

        STORAGE_TRUNK_CHECK_STATUS();

        target_slot.size = (size > g_slot_min_size) ? size : g_slot_min_size;
        target_slot.head = NULL;

        pPreviousNode = NULL;
        pTrunkNode = NULL;
        //allocating trunk space requires the lock
        pthread_mutex_lock(&trunk_mem_lock);
        //look for a place where this file fits
        while (1)
        {
                pSlot = (FDFSTrunkSlot *)avl_tree_find_ge(tree_info_by_sizes \
                         + pResult->path.store_path_index, &target_slot);
                if (pSlot == NULL)
                {
                        break;
                }

                pPreviousNode = NULL;
                pTrunkNode = pSlot->head;
                while (pTrunkNode != NULL && \
                        pTrunkNode->trunk.status == FDFS_TRUNK_STATUS_HOLD)
                {
                        pPreviousNode = pTrunkNode;
                        pTrunkNode = pTrunkNode->next;
                }

                if (pTrunkNode != NULL)
                {
                        break;
                }

                target_slot.size = pSlot->size + 1;
        }

        //found one: unlink it from the slot's free list
        if (pTrunkNode != NULL)
        {
                if (pPreviousNode == NULL)
                {
                        pSlot->head = pTrunkNode->next;
                        if (pSlot->head == NULL)
                        {
                                trunk_delete_size_tree_entry(pResult->path. \
                store_path_index, pSlot);
                        }
                }
                else
                {
                        pPreviousNode->next = pTrunkNode->next;
                }

                trunk_free_block_delete(&(pTrunkNode->trunk));
        }
        else
        {
                //not found: create a separate trunk_file for it
                pTrunkNode = trunk_create_trunk_file(pResult->path. \
                                        store_path_index, &result);
                if (pTrunkNode == NULL)
                {
                        pthread_mutex_unlock(&trunk_mem_lock);
                        return result;
                }
        }
        pthread_mutex_unlock(&trunk_mem_lock);

        result = trunk_split(pTrunkNode, size);
        if (result != 0)
        {
                return result;
        }

        pTrunkNode->trunk.status = FDFS_TRUNK_STATUS_HOLD;
        result = trunk_add_free_block(pTrunkNode, true);
        if (result == 0)
        {
                memcpy(pResult, &(pTrunkNode->trunk), \
                        sizeof(FDFSTrunkFullInfo));
        }

        return result;
}
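
The core of trunk_alloc_space is "find the smallest free block that is at least as large as requested, skip blocks on hold, then split off what is needed". The sketch below shows that idea on a size-sorted array with binary search, which I swapped in for the AVL tree to keep it short. DemoBlock, demo_find_ge, and demo_split are hypothetical names, and the split rule here is the simplest possible one, not necessarily what trunk_split does.

```c
#include <assert.h>
#include <stddef.h>

typedef struct {
    int offset;
    int size;
    int held;      /* plays the role of FDFS_TRUNK_STATUS_HOLD */
} DemoBlock;

/* Index of the first usable block with size >= want in an array sorted
 * by size, or -1 when nothing fits. */
static int demo_find_ge(const DemoBlock *blocks, int count, int want)
{
    int lo = 0;
    int hi = count;

    assert(blocks != NULL && count >= 0);
    while (lo < hi) {                    /* lower bound on size */
        int mid = lo + (hi - lo) / 2;
        if (blocks[mid].size < want) lo = mid + 1;
        else hi = mid;
    }
    while (lo < count && blocks[lo].held) lo++;   /* skip blocks on hold */
    return lo < count ? lo : -1;
}

/* Take `want` bytes from the front of the block and mark it held.
 * The leftover is written to *rest; returns 1 if a leftover exists. */
static int demo_split(DemoBlock *block, int want, DemoBlock *rest)
{
    assert(block != NULL && rest != NULL && block->size >= want);
    rest->offset = block->offset + want;
    rest->size = block->size - want;
    rest->held = 0;
    block->size = want;
    block->held = 1;
    return rest->size > 0;
}
```

When demo_find_ge returns -1, the caller is in the same position as the original code when no slot is found: a fresh trunk file has to be created.
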
static int trunk_client_trunk_do_alloc_space(ConnectionInfo *pTrunkServer, \
                const int file_size, FDFSTrunkFullInfo *pTrunkInfo)
{
        TrackerHeader *pHeader;

        //initialize the request packet and other data, omitted
    ...

        pHeader->cmd = STORAGE_PROTO_CMD_TRUNK_ALLOC_SPACE;

        if ((result=tcpsenddata_nb(pTrunkServer->sock, out_buff, \
                        sizeof(out_buff), g_fdfs_network_timeout)) != 0)
        {
                logError("file: "__FILE__", line: %d, " \
                        "send data to storage server %s:%d fail, " \
                        "errno: %d, error info: %s", __LINE__, \
                        pTrunkServer->ip_addr, pTrunkServer->port, \
                        result, STRERROR(result));

                return result;
        }

        p = (char *)&trunkBuff;
        if ((result=fdfs_recv_response(pTrunkServer, \
                &p, sizeof(FDFSTrunkInfoBuff), &in_bytes)) != 0)
        {
                return result;
        }

        //fill in pTrunkInfo, omitted
    ...

        return 0;
}
  • Trace the server-side function that handles STORAGE_PROTO_CMD_TRUNK_ALLOC_SPACE
    storage_service.c hands it to storage_server_trunk_alloc_space for parsing
/**
 * request package format:
 * FDFS_GROUP_NAME_MAX_LEN bytes: group_name
 * 4 bytes: file size
 * 1 bytes: store_path_index
 *
 * response package format:
 * 1 byte: store_path_index
 * 1 byte: sub_path_high
 * 1 byte: sub_path_low
 * 4 bytes: trunk file id
 * 4 bytes: trunk offset
 * 4 bytes: trunk size
 * **/
static int storage_server_trunk_alloc_space(struct fast_task_info *pTask)
{
        StorageClientInfo *pClientInfo;
        FDFSTrunkInfoBuff *pApplyBody;
        char *in_buff;
        char group_name[FDFS_GROUP_NAME_MAX_LEN + 1];
        FDFSTrunkFullInfo trunkInfo;
        int64_t nInPackLen;
        int file_size;
        int result;

        pClientInfo = (StorageClientInfo *)pTask->arg;
        nInPackLen = pClientInfo->total_length - sizeof(TrackerHeader);
        pClientInfo->total_length = sizeof(TrackerHeader);

        CHECK_TRUNK_SERVER(pTask)

        if (nInPackLen != FDFS_GROUP_NAME_MAX_LEN + 5)
        {
                logError("file: "__FILE__", line: %d, " \
                        "cmd=%d, client ip: %s, package size " \
                        INT64_PRINTF_FORMAT" is not correct, " \
                        "expect length: %d", __LINE__, \
                        STORAGE_PROTO_CMD_TRUNK_ALLOC_SPACE, \
                        pTask->client_ip,  nInPackLen, \
                        FDFS_GROUP_NAME_MAX_LEN + 5);
                return EINVAL;
        }

        in_buff = pTask->data + sizeof(TrackerHeader);
        memcpy(group_name, in_buff, FDFS_GROUP_NAME_MAX_LEN);
        *(group_name + FDFS_GROUP_NAME_MAX_LEN) = '\0';
        if (strcmp(group_name, g_group_name) != 0)
        {
                logError("file: "__FILE__", line: %d, " \
                        "client ip:%s, group_name: %s " \
        "not correct, should be: %s", \
                        __LINE__, pTask->client_ip, \
                        group_name, g_group_name);
                return EINVAL;
        }

        file_size = buff2int(in_buff + FDFS_GROUP_NAME_MAX_LEN);
        if (file_size < 0 || !trunk_check_size(file_size))
        {
                logError("file: "__FILE__", line: %d, " \
                        "client ip:%s, invalid file size: %d", \
                        __LINE__, pTask->client_ip, file_size);
                return EINVAL;
        }

        trunkInfo.path.store_path_index = *(in_buff+FDFS_GROUP_NAME_MAX_LEN+4);
        //in essence this still calls trunk_alloc_space
        if ((result=trunk_alloc_space(file_size, &trunkInfo)) != 0)
        {
                return result;
        }

        pApplyBody = (FDFSTrunkInfoBuff *)(pTask->data+sizeof(TrackerHeader));
        pApplyBody->store_path_index = trunkInfo.path.store_path_index;
        pApplyBody->sub_path_high = trunkInfo.path.sub_path_high;
        pApplyBody->sub_path_low = trunkInfo.path.sub_path_low;
        int2buff(trunkInfo.file.id, pApplyBody->id);
        int2buff(trunkInfo.file.offset, pApplyBody->offset);
        int2buff(trunkInfo.file.size, pApplyBody->size);

        pClientInfo->total_length = sizeof(TrackerHeader) + \
                                sizeof(FDFSTrunkInfoBuff);
        return 0;
}
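
The request body that storage_server_trunk_alloc_space expects (group name, 4-byte size, 1-byte store path index) can be packed as in this sketch. I am assuming FDFS_GROUP_NAME_MAX_LEN is 16 and that int2buff/buff2int use most-significant-byte-first order; the Demo helpers are hypothetical and only mirror the documented layout.

```c
#include <assert.h>
#include <string.h>

#define DEMO_GROUP_NAME_MAX_LEN 16                       /* assumed value */
#define DEMO_ALLOC_REQ_LEN (DEMO_GROUP_NAME_MAX_LEN + 5)

/* Write a 32-bit value, most significant byte first. */
static void demo_int2buff(int n, unsigned char *buf)
{
    unsigned int u = (unsigned int)n;

    buf[0] = (unsigned char)(u >> 24);
    buf[1] = (unsigned char)(u >> 16);
    buf[2] = (unsigned char)(u >> 8);
    buf[3] = (unsigned char)u;
}

static int demo_buff2int(const unsigned char *buf)
{
    return (int)(((unsigned int)buf[0] << 24) | ((unsigned int)buf[1] << 16) |
                 ((unsigned int)buf[2] << 8) | (unsigned int)buf[3]);
}

/* Pack group name (zero padded), file size, and store path index.
 * `out` must hold DEMO_ALLOC_REQ_LEN bytes. Returns the body length,
 * or -1 when the group name is too long. */
static int demo_pack_alloc_req(const char *group, int file_size,
        int store_path_index, unsigned char *out)
{
    size_t len;

    assert(group != NULL && out != NULL);
    len = strlen(group);
    if (len > DEMO_GROUP_NAME_MAX_LEN) return -1;

    memset(out, 0, DEMO_ALLOC_REQ_LEN);
    memcpy(out, group, len);
    demo_int2buff(file_size, out + DEMO_GROUP_NAME_MAX_LEN);
    out[DEMO_GROUP_NAME_MAX_LEN + 4] = (unsigned char)store_path_index;
    return DEMO_ALLOC_REQ_LEN;
}
```

The fixed total of group name length plus 5 is exactly what the server-side length check compares against.
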

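trunk_client_trunk_alloc_space requests space from the single trunk_server in the group.

The final implementation is still the trunk_alloc_space function.

The trunk server acts roughly like a KV store. Could it become a single point of failure? How does failover work once this trunk server goes down? Let's keep analyzing.

The following function lives in tracker_client_thread and is roughly an exchange between storage and tracker; if there is failover, the mechanism should be here.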
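
Before reading the function, it may help to see the role decision it makes as a small table. This is my own summary of the branches in the trunk-server part of tracker_check_response; the enum and function names are hypothetical.

```c
#include <assert.h>

typedef enum {
    DEMO_KEEP_TRUNK,     /* already the trunk server, only log a warning */
    DEMO_BECOME_TRUNK,   /* fetch trunk fid, init trunk, start sync threads */
    DEMO_STEP_DOWN,      /* report trunk fid, wait for sync threads, destroy */
    DEMO_FOLLOW          /* just remember who the trunk server is */
} DemoTrunkAction;

/* new_is_me:   the announced trunk server's ip:port matches this server
 * was_trunker: g_if_trunker_self before the announcement */
static DemoTrunkAction demo_trunk_transition(int new_is_me, int was_trunker)
{
    if (new_is_me) {
        return was_trunker ? DEMO_KEEP_TRUNK : DEMO_BECOME_TRUNK;
    }
    return was_trunker ? DEMO_STEP_DOWN : DEMO_FOLLOW;
}
```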

static int tracker_check_response(ConnectionInfo *pTrackerServer, \
    bool *bServerPortChanged)
{
    int64_t nInPackLen;
    TrackerHeader resp;
    int server_count;
    int result;
    char in_buff[1 + (2 + FDFS_MAX_SERVERS_EACH_GROUP) * \
            sizeof(FDFSStorageBrief)];
    FDFSStorageBrief *pBriefServers;
    char *pFlags;

    //parse the packet
    ...
    
    //the tracker leader changed
    if ((*pFlags) & FDFS_CHANGE_FLAG_TRACKER_LEADER)
    {
        ...
    }

    //the trunk server changed
    if ((*pFlags) & FDFS_CHANGE_FLAG_TRUNK_SERVER)
    {
        if (server_count < 1)
        {
            logError("file: "__FILE__", line: %d, " \
                "tracker server %s:%d, reponse server " \
                "count: %d < 1", __LINE__, \
                pTrackerServer->ip_addr, \
                pTrackerServer->port, server_count);
            return EINVAL;
        }

        //trunk is not enabled locally: reload the parameters from the tracker
        if (!g_if_use_trunk_file)
        {
            logInfo("file: "__FILE__", line: %d, " \
                "reload parameters from tracker server", \
                __LINE__);
            storage_get_params_from_tracker();
        }

        //trunk is still not enabled: log a warning
        if (!g_if_use_trunk_file)
        {
            logWarning("file: "__FILE__", line: %d, " \
                "tracker server %s:%d, " \
                "my g_if_use_trunk_file is false, " \
                "can't support trunk server!", \
                __LINE__, pTrackerServer->ip_addr, \
                pTrackerServer->port);
        }
        else
        {
        memcpy(g_trunk_server.ip_addr, pBriefServers->ip_addr, \
            IP_ADDRESS_SIZE - 1);
        *(g_trunk_server.ip_addr + (IP_ADDRESS_SIZE - 1)) = '\0';
        g_trunk_server.port = buff2int(pBriefServers->port);
        //if the local ip and port match the trunk_server
        if (is_local_host_ip(g_trunk_server.ip_addr) && \
            g_trunk_server.port == g_server_port)
        {
            //I am already the trunk server; the tracker restarted and selected me again
            if (g_if_trunker_self)
            {
            logWarning("file: "__FILE__", line: %d, " \
                "I am already the trunk server %s:%d, " \
                "may be the tracker server restart", \
                __LINE__, g_trunk_server.ip_addr, \
                g_trunk_server.port);
            }
            else
            {
            //I have become the new trunk server
            logInfo("file: "__FILE__", line: %d, " \
                "I am the the trunk server %s:%d", __LINE__, \
                g_trunk_server.ip_addr, g_trunk_server.port);

            tracker_fetch_trunk_fid(pTrackerServer);
            g_if_trunker_self = true;

            if ((result=storage_trunk_init()) != 0)
            {
                return result;
            }

            if (g_trunk_create_file_advance && \
                g_trunk_create_file_interval > 0)
            {
            ScheduleArray scheduleArray;
            ScheduleEntry entries[1];

            entries[0].id = TRUNK_FILE_CREATOR_TASK_ID;
            entries[0].time_base = g_trunk_create_file_time_base;
            entries[0].interval = g_trunk_create_file_interval;
            entries[0].task_func = trunk_create_trunk_file_advance;
            entries[0].func_args = NULL;

            scheduleArray.count = 1;
            scheduleArray.entries = entries;
            sched_add_entries(&scheduleArray);
            }

            trunk_sync_thread_start_all();
            }
        }
        else
        {
            logInfo("file: "__FILE__", line: %d, " \
                "the trunk server is %s:%d", __LINE__, \
                g_trunk_server.ip_addr, g_trunk_server.port);

            // I used to be the trunk server, so I step down
            if (g_if_trunker_self)
            {
                int saved_trunk_sync_thread_count;

                logWarning("file: "__FILE__", line: %d, " \
                    "I am the old trunk server, " \
                    "the new trunk server is %s:%d", \
                    __LINE__, g_trunk_server.ip_addr, \
                    g_trunk_server.port);

                tracker_report_trunk_fid(pTrackerServer);
                g_if_trunker_self = false;

                saved_trunk_sync_thread_count = \
                        g_trunk_sync_thread_count;
                if (saved_trunk_sync_thread_count > 0)
                {
                    logInfo("file: "__FILE__", line: %d, "\
                        "waiting %d trunk sync " \
                        "threads exit ...", __LINE__, \
                        saved_trunk_sync_thread_count);
                }

                while (g_trunk_sync_thread_count > 0)
                {
                    usleep(50000);
                }

                if (saved_trunk_sync_thread_count > 0)
                {
                    logInfo("file: "__FILE__", line: %d, " \
                        "%d trunk sync threads exited",\
                        __LINE__, \
                        saved_trunk_sync_thread_count);
                }
                
                storage_trunk_destroy_ex(true);
                if (g_trunk_create_file_advance && \
                    g_trunk_create_file_interval > 0)
                {
                sched_del_entry(TRUNK_FILE_CREATOR_TASK_ID);
                }
            }
        }
        }

        pBriefServers += 1;
        server_count -= 1;
    }

    if (!((*pFlags) & FDFS_CHANGE_FLAG_GROUP_SERVER))
    {
        return 0;
    }

    /*
    //printf("resp server count=%d\n", server_count);
    {
        int i;
        for (i=0; i<server_count; i++)
        {   
            //printf("%d. %d:%s\n", i+1, pBriefServers[i].status, \
                pBriefServers[i].ip_addr);
        }
    }
    */

    if (*bServerPortChanged)
    {
        if (!g_use_storage_id)
        {
            FDFSStorageBrief *pStorageEnd;
            FDFSStorageBrief *pStorage;

            *bServerPortChanged = false;
            pStorageEnd = pBriefServers + server_count;
            for (pStorage=pBriefServers; pStorage<pStorageEnd; 
                pStorage++)
            {
                if (strcmp(pStorage->id, g_my_server_id_str) == 0)
                {
                    continue;
                }

                tracker_rename_mark_files(pStorage->ip_addr, \
                    g_last_server_port, pStorage->ip_addr, \
                    g_server_port);
            }
        }

        if (g_server_port != g_last_server_port)
        {
            g_last_server_port = g_server_port;
            if ((result=storage_write_to_sync_ini_file()) != 0)
            {
                return result;
            }
        }
    }

    return tracker_merge_servers(pTrackerServer, \
                pBriefServers, server_count);
}

As the code shows, there really is a failover mechanism for a failed trunk server: the tracker elects the trunk server.
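The nested branches above boil down to a four-way decision on two facts: whether the elected trunk server is this machine, and whether this machine already held the role. A minimal sketch of that decision, assuming nothing beyond the listing; the enum and the function name are hypothetical and do not exist in FastDFS:

```c
#include <stdbool.h>

/* Hypothetical names for illustration; FastDFS itself uses plain if/else. */
typedef enum {
    TRUNK_ACTION_NONE,     /* someone else is trunk and I never was: just log it */
    TRUNK_ACTION_KEEP,     /* re-elected (e.g. tracker restart): log a warning only */
    TRUNK_ACTION_PROMOTE,  /* newly elected: fetch trunk fid, init trunk, start sync threads */
    TRUNK_ACTION_DEMOTE    /* lost the role: report trunk fid, wait for sync threads, destroy */
} TrunkAction;

/* elected_is_me: the announced trunk ip/port is local (is_local_host_ip && port match)
 * was_trunk:     the value of g_if_trunker_self before the announcement */
TrunkAction trunk_action(bool elected_is_me, bool was_trunk)
{
    if (elected_is_me) {
        return was_trunk ? TRUNK_ACTION_KEEP : TRUNK_ACTION_PROMOTE;
    }
    return was_trunk ? TRUNK_ACTION_DEMOTE : TRUNK_ACTION_NONE;
}
```

Only the PROMOTE and DEMOTE cases change g_if_trunker_self in the real code; the other two just write a log line.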

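That wraps up the trunk analysis for now. Whether deleting files leaves holes in the trunk files, and how well those holes get reused, can only be answered with real measurements.

Summary:

Every group has exactly one trunk leader. The information about all trunk files in the group is kept in an AVL tree that the trunk leader maintains internally.

After a file is uploaded, the storage server asks the trunk leader for space. The trunk leader takes a global lock while allocating. Once the storage server has the trunk location, it writes to its local disk.

When a file is downloaded, the trunk information is already contained in the file name, so the file can simply be read directly.

The trunk approach mainly addresses the read/write slowdown caused by too many inodes (a very large number of small files), but introducing trunk also costs some performance of its own.

For now I do not feel I have a firm grip on the trunk feature, including how to recover trunk file data if something goes wrong. Since no official tool is provided for that, I am hesitant to use it.

I will follow up if the need arises; that is it for now.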

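FastDFS Source Code Analysis (3): Communication Protocol Analysis

Only upload and download are analyzed here; the other commands are skipped for now.

Upload:

1 Connect to the tracker using its ip and port

2 Send a 10-byte packet whose 9th byte is TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE, i.e. 101

3 Receive a 10-byte packet whose 10th byte is the return status; 0 means everything is fine

4 In the received packet, bytes 1-8 hold the size of the packet that follows; the following routines convert between the 8 bytes and a number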

int64_t buff2long(const char *buff)
{                       
        unsigned char *p;
        p = (unsigned char *)buff;
        return  (((int64_t)(*p)) << 56) | \
                (((int64_t)(*(p+1))) << 48) |  \
                (((int64_t)(*(p+2))) << 40) |  \
                (((int64_t)(*(p+3))) << 32) |  \
                (((int64_t)(*(p+4))) << 24) |  \
                (((int64_t)(*(p+5))) << 16) |  \
                (((int64_t)(*(p+6))) << 8) | \
                ((int64_t)(*(p+7)));
}   

void long2buff(int64_t n, char *buff)
{                       
        unsigned char *p;
        p = (unsigned char *)buff;
        *p++ = (n >> 56) & 0xFF;
        *p++ = (n >> 48) & 0xFF;
        *p++ = (n >> 40) & 0xFF;
        *p++ = (n >> 32) & 0xFF;
        *p++ = (n >> 24) & 0xFF;
        *p++ = (n >> 16) & 0xFF;
        *p++ = (n >> 8) & 0xFF;
        *p++ = n & 0xFF;
}

5 Read that many bytes. The number must equal TRACKER_QUERY_STORAGE_STORE_BODY_LEN, otherwise it is an error

#define TRACKER_QUERY_STORAGE_STORE_BODY_LEN (FDFS_GROUP_NAME_MAX_LEN \
        + IP_ADDRESS_SIZE - 1 + FDFS_PROTO_PKG_LEN_SIZE + 1)

That is 16+16-1+8+1 = 40

6 Of these 40 bytes, the first 16 are the group name, the next 15 are the IP address, the next 8 are the port number (again decoded with buff2long), and the last byte is store_path_index

The exchange with the tracker is finished; the storage operations follow
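Steps 2 to 6 can be sketched as two small helpers: one that fills in the 10-byte query header and one that picks the 40-byte reply body apart. This is only a sketch under the byte layout described above. StoreTarget, build_query_store_header and parse_store_body are hypothetical names, not FastDFS APIs, and the constants are redefined locally with the values quoted in the text.

```c
#include <stdint.h>
#include <string.h>

#define GROUP_NAME_MAX_LEN 16   /* FDFS_GROUP_NAME_MAX_LEN, value from the text */
#define IP_ADDR_SIZE       16   /* IP_ADDRESS_SIZE, value from the text */
#define PKG_LEN_SIZE        8   /* FDFS_PROTO_PKG_LEN_SIZE, value from the text */
#define STORE_BODY_LEN (GROUP_NAME_MAX_LEN + IP_ADDR_SIZE - 1 + PKG_LEN_SIZE + 1)

/* Hypothetical result type for this sketch. */
typedef struct {
    char group_name[GROUP_NAME_MAX_LEN + 1];
    char ip_addr[IP_ADDR_SIZE];
    int port;
    int store_path_index;
} StoreTarget;

/* Big-endian decode of 8 bytes, the same idea as buff2long. */
static int64_t decode_be64(const unsigned char *p)
{
    uint64_t n = 0;
    int i;
    for (i = 0; i < 8; i++) {
        n = (n << 8) | p[i];
    }
    return (int64_t)n;
}

/* Step 2: body length 0 in bytes 1-8, command 101 in byte 9, status 0 in byte 10. */
void build_query_store_header(unsigned char header[10])
{
    memset(header, 0, 10);
    header[8] = 101;  /* TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE */
}

/* Steps 5-6: returns 0 on success, -1 if the announced length is not 40. */
int parse_store_body(const unsigned char *body, int64_t body_len, StoreTarget *out)
{
    if (body_len != STORE_BODY_LEN) {
        return -1;
    }
    memcpy(out->group_name, body, GROUP_NAME_MAX_LEN);
    out->group_name[GROUP_NAME_MAX_LEN] = '\0';
    memcpy(out->ip_addr, body + GROUP_NAME_MAX_LEN, IP_ADDR_SIZE - 1);
    out->ip_addr[IP_ADDR_SIZE - 1] = '\0';
    out->port = (int)decode_be64(body + GROUP_NAME_MAX_LEN + IP_ADDR_SIZE - 1);
    out->store_path_index = body[STORE_BODY_LEN - 1];
    return 0;
}
```

The socket calls are left out on purpose; the point is only which byte goes where.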

7 Connect to the storage server using the ip and port

8 Send a 25-byte packet

The first 10 bytes have the same structure as TrackerHeader. Bytes 1-8 hold file_size + the size of this packet (25) - the header size (10), i.e. file_size+15, converted to an 8-byte string with long2buff. Byte 9 is STORAGE_PROTO_CMD_UPLOAD_FILE, i.e. 11

Byte 11 is the store_path_index just received

Bytes 12-19 are file_size, converted to an 8-byte string with long2buff

Bytes 20-25 are for ext_name; setting them to 0 is enough here

9 Send file_size bytes of content, i.e. the file data

10 Receive a 10-byte packet whose 10th byte is the return status; 0 means everything is fine

11 In the received packet, bytes 1-8 hold the size of the packet that follows, decoded with buff2long

12 This number must be greater than FDFS_GROUP_NAME_MAX_LEN, i.e. 16 bytes, otherwise it is an error

13 The first 16 bytes are the group name, all the remaining bytes are remote_filename

14 The upload flow is complete
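The 25-byte packet of step 8 is the easiest part to get wrong by one byte, so here is a minimal sketch that lays it out. It assumes exactly the layout described above (10-byte header, 1 byte store_path_index, 8 bytes file_size, 6 bytes ext_name). build_upload_header and encode_be64 are hypothetical helper names, with encode_be64 doing the same job as long2buff.

```c
#include <stdint.h>
#include <string.h>

#define HEADER_LEN         10  /* size of TrackerHeader as described in the text */
#define UPLOAD_HEADER_LEN  25  /* 10 + 1 + 8 + 6 */

/* Big-endian encode into 8 bytes, the same idea as long2buff. */
static void encode_be64(int64_t n, unsigned char *p)
{
    uint64_t u = (uint64_t)n;
    int i;
    for (i = 7; i >= 0; i--) {
        p[i] = (unsigned char)(u & 0xFF);
        u >>= 8;
    }
}

/* Fill the 25 bytes sent to the storage server before the file content. */
void build_upload_header(unsigned char pkt[UPLOAD_HEADER_LEN],
        int store_path_index, int64_t file_size)
{
    memset(pkt, 0, UPLOAD_HEADER_LEN);
    /* bytes 1-8: everything after the 10-byte header, i.e. file_size + 15 */
    encode_be64(file_size + UPLOAD_HEADER_LEN - HEADER_LEN, pkt);
    pkt[8] = 11;                                /* byte 9: STORAGE_PROTO_CMD_UPLOAD_FILE */
    /* byte 10 (status) stays 0 */
    pkt[10] = (unsigned char)store_path_index;  /* byte 11 */
    encode_be64(file_size, pkt + 11);           /* bytes 12-19 */
    /* bytes 20-25: ext_name, left as zeros */
}
```

After this packet the client sends the file_size bytes of file content (step 9) on the same connection.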

Download:

Downloading needs the file ID returned in the upload response, called file_id here

1 Connect to the tracker

2 Split file_id: everything before the first / is group_name, everything after it is remote_filename

3 Send a 10-byte pHeader in which bytes 1-8 hold FDFS_GROUP_NAME_MAX_LEN (value 16) plus the length of remote_filename, converted with long2buff

Byte 9 is the command TRACKER_PROTO_CMD_SERVICE_QUERY_FETCH_ONE, i.e. 102

4 Send the 16-byte group_name

5 Send the remote_filename string

6 Receive a 10-byte packet whose 10th byte is the return status; 0 means everything is fine

7 In the received packet, bytes 1-8 hold the size of the packet that follows, decoded with buff2long

8 Read that many bytes. The number must equal TRACKER_QUERY_STORAGE_FETCH_BODY_LEN (TRACKER_QUERY_STORAGE_STORE_BODY_LEN - 1, i.e. 39), otherwise it is an error

9 Of these 39 bytes, the first 16 are the group name (can be ignored for download), the next 15 are the IP address, and the next 8 are the port number, again decoded with buff2long

10 The exchange with the tracker is finished; the storage server comes next

11 Connect to the storage server using the ip and port

12 Send a packet made of pHeader+file_offset+download_bytes+group_name (padded to 16 bytes)+filename

That is 10+8+8+16+filename_size bytes

Bytes 1-8 are 8+8+16+filename_size converted with long2buff

Byte 9 is STORAGE_PROTO_CMD_DOWNLOAD_FILE, i.e. 14

Bytes 11-18 are file_offset as a long2buff string

Bytes 19-26 are download_bytes as a long2buff string

Bytes 27-42 are group_name

Everything after that is the filename

13 Receive a 10-byte packet whose 10th byte is the return status; 0 means everything is fine

14 In the received packet, bytes 1-8 hold the size of the packet that follows, decoded with buff2long

15 Write the received data to a file; one download is complete
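Steps 2 and 12 of the download flow can be sketched the same way: split the file ID at the first slash, then lay out the request for the storage server. Again this is only a sketch of the byte layout described above. split_file_id, build_download_request and encode_be64 are hypothetical names, and the file ID used in the usage note is made up.

```c
#include <stdint.h>
#include <string.h>

#define GROUP_NAME_MAX_LEN 16  /* FDFS_GROUP_NAME_MAX_LEN, value from the text */

/* Big-endian encode into 8 bytes, the same idea as long2buff. */
static void encode_be64(int64_t n, unsigned char *p)
{
    uint64_t u = (uint64_t)n;
    int i;
    for (i = 7; i >= 0; i--) {
        p[i] = (unsigned char)(u & 0xFF);
        u >>= 8;
    }
}

/* Step 2: copy the part before the first '/' into group_name and point
 * *remote_filename at the rest. Returns 0 on success, -1 on a malformed id. */
int split_file_id(const char *file_id, char group_name[GROUP_NAME_MAX_LEN + 1],
        const char **remote_filename)
{
    const char *slash = strchr(file_id, '/');
    size_t group_len;

    if (slash == NULL) {
        return -1;
    }
    group_len = (size_t)(slash - file_id);
    if (group_len == 0 || group_len > GROUP_NAME_MAX_LEN) {
        return -1;
    }
    memset(group_name, 0, GROUP_NAME_MAX_LEN + 1);
    memcpy(group_name, file_id, group_len);
    *remote_filename = slash + 1;
    return 0;
}

/* Step 12: returns the packet size (10+8+8+16+filename_size), or -1 if it does not fit. */
int build_download_request(unsigned char *buf, size_t buf_size,
        const char *group_name, const char *remote_filename,
        int64_t file_offset, int64_t download_bytes)
{
    size_t name_len = strlen(remote_filename);
    size_t group_len = strlen(group_name);
    size_t body_len = 8 + 8 + GROUP_NAME_MAX_LEN + name_len;

    if (group_len > GROUP_NAME_MAX_LEN || 10 + body_len > buf_size) {
        return -1;
    }
    memset(buf, 0, 10 + 8 + 8 + GROUP_NAME_MAX_LEN);
    encode_be64((int64_t)body_len, buf);           /* bytes 1-8 */
    buf[8] = 14;                                   /* byte 9: STORAGE_PROTO_CMD_DOWNLOAD_FILE */
    encode_be64(file_offset, buf + 10);            /* bytes 11-18 */
    encode_be64(download_bytes, buf + 18);         /* bytes 19-26 */
    memcpy(buf + 26, group_name, group_len);       /* bytes 27-42, zero padded */
    memcpy(buf + 42, remote_filename, name_len);   /* the rest */
    return (int)(10 + body_len);
}
```

For a made-up ID such as group1/M00/00/00/abc.jpg the remote_filename is 17 bytes long, so the body length is 8+8+16+17 = 49 and the whole packet is 59 bytes.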

Upload and download are the most typical flows. The other commands can be derived from them, so they are not covered in detail

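FastDFS Source Code Analysis (4): Storage Server Run Flow Analysis

A rough look at how the fdfs storage server provides its service, using file upload as the example.

Start from the storage initialization function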

int storage_service_init()
{
    int result;
    int bytes;
    struct storage_nio_thread_data *pThreadData;
    struct storage_nio_thread_data *pDataEnd;
    pthread_t tid;
    pthread_attr_t thread_attr;

    // lock for the storage task threads
    if ((result=init_pthread_lock(&g_storage_thread_lock)) != 0)
    {
        return result;
    }

    // path index lock
    if ((result=init_pthread_lock(&path_index_thread_lock)) != 0)
    {
        return result;
    }

    // stat counter lock
    if ((result=init_pthread_lock(&stat_count_thread_lock)) != 0)
    {
        return result;
    }

    // initialize the thread attributes with the configured stack size
    if ((result=init_pthread_attr(&thread_attr, g_thread_stack_size)) != 0)
    {
        logError("file: "__FILE__", line: %d, " \
            "init_pthread_attr fail, program exit!", __LINE__);
        return result;
    }

    // create the task object pool so that task objects are reused
    if ((result=free_queue_init(g_max_connections, g_buff_size, \
                g_buff_size, sizeof(StorageClientInfo))) != 0)
    {
        return result;
    }

    bytes = sizeof(struct storage_nio_thread_data) * g_work_threads;
    g_nio_thread_data = (struct storage_nio_thread_data *)malloc(bytes);
    if (g_nio_thread_data == NULL)
    {
        logError("file: "__FILE__", line: %d, " \
            "malloc %d bytes fail, errno: %d, error info: %s", \
            __LINE__, bytes, errno, STRERROR(errno));
        return errno != 0 ? errno : ENOMEM;
    }
    memset(g_nio_thread_data, 0, bytes);

    g_storage_thread_count = 0;
    pDataEnd = g_nio_thread_data + g_work_threads;
    for (pThreadData=g_nio_thread_data; pThreadData<pDataEnd; pThreadData++)
    {
        if (ioevent_init(&pThreadData->thread_data.ev_puller,
            g_max_connections + 2, 1000, 0) != 0)
        {
            result  = errno != 0 ? errno : ENOMEM;
            logError("file: "__FILE__", line: %d, " \
                "ioevent_init fail, " \
                "errno: %d, error info: %s", \
                __LINE__, result, STRERROR(result));
            return result;
        }
        result = fast_timer_init(&pThreadData->thread_data.timer,
                2 * g_fdfs_network_timeout, g_current_time);
        if (result != 0)
        {
            logError("file: "__FILE__", line: %d, " \
                "fast_timer_init fail, " \
                "errno: %d, error info: %s", \
                __LINE__, result, STRERROR(result));
            return result;
        }

        if (pipe(pThreadData->thread_data.pipe_fds) != 0)
        {
            result = errno != 0 ? errno : EPERM;
            logError("file: "__FILE__", line: %d, " \
                "call pipe fail, " \
                "errno: %d, error info: %s", \
                __LINE__, result, STRERROR(result));
            break;
        }

#if defined(OS_LINUX)
        if ((result=fd_add_flags(pThreadData->thread_data.pipe_fds[0], \
                O_NONBLOCK | O_NOATIME)) != 0)
        {
            break;
        }
#else
        if ((result=fd_add_flags(pThreadData->thread_data.pipe_fds[0], \
                O_NONBLOCK)) != 0)
        {
            break;
        }
#endif

        // create the worker thread
        if ((result=pthread_create(&tid, &thread_attr, \
            work_thread_entrance, pThreadData)) != 0)
        {
            logError("file: "__FILE__", line: %d, " \
                "create thread failed, startup threads: %d, " \
                "errno: %d, error info: %s", \
                __LINE__, g_storage_thread_count, \
                result, STRERROR(result));
            break;
        }
        else
        {
            if ((result=pthread_mutex_lock(&g_storage_thread_lock)) != 0)
            {
                logError("file: "__FILE__", line: %d, " \
                    "call pthread_mutex_lock fail, " \
                    "errno: %d, error info: %s", \
                    __LINE__, result, STRERROR(result));
            }
            g_storage_thread_count++;
            if ((result=pthread_mutex_unlock(&g_storage_thread_lock)) != 0)
            {
                logError("file: "__FILE__", line: %d, " \
                    "call pthread_mutex_lock fail, " \
                    "errno: %d, error info: %s", \
                    __LINE__, result, STRERROR(result));
            }
        }
    }

    pthread_attr_destroy(&thread_attr);

    last_stat_change_count = g_stat_change_count;

    //DO NOT support direct IO !!!
    //g_extra_open_file_flags = g_disk_rw_direct ? O_DIRECT : 0;
    
    if (result != 0)
    {
        return result;
    }

    return result;
}

Next, follow the worker thread

static void *work_thread_entrance(void* arg)
{
    int result;
    struct storage_nio_thread_data *pThreadData;

    pThreadData = (struct storage_nio_thread_data *)arg;
    if (g_check_file_duplicate)
    {
        if ((result=fdht_copy_group_array(&(pThreadData->group_array),\
                &g_group_array)) != 0)
        {
            pthread_mutex_lock(&g_storage_thread_lock);
            g_storage_thread_count--;
            pthread_mutex_unlock(&g_storage_thread_lock);
            return NULL;
        }
    }
    
    // start the main io loop and register the callback storage_recv_notify_read
    // for the pipe_fd that belongs to pThreadData->thread_data
    ioevent_loop(&pThreadData->thread_data, storage_recv_notify_read,
        task_finish_clean_up, &g_continue_flag);
    // the loop has exited: destroy the corresponding data structures
    ioevent_destroy(&pThreadData->thread_data.ev_puller);

    if (g_check_file_duplicate)
    {
        if (g_keep_alive)
        {
            fdht_disconnect_all_servers(&(pThreadData->group_array));
        }

        fdht_free_group_array(&(pThreadData->group_array));
    }

    // decrement the total thread count
    if ((result=pthread_mutex_lock(&g_storage_thread_lock)) != 0)
    {
        logError("file: "__FILE__", line: %d, " \
            "call pthread_mutex_lock fail, " \
            "errno: %d, error info: %s", \
            __LINE__, result, STRERROR(result));
    }
    g_storage_thread_count--;
    if ((result=pthread_mutex_unlock(&g_storage_thread_lock)) != 0)
    {
        logError("file: "__FILE__", line: %d, " \
            "call pthread_mutex_lock fail, " \
            "errno: %d, error info: %s", \
            __LINE__, result, STRERROR(result));
    }

    logDebug("file: "__FILE__", line: %d, " \
        "nio thread exited, thread count: %d", \
        __LINE__, g_storage_thread_count);

    return NULL;
}

Besides the work_thread_entrance threads there is one more thread, accept_thread_entrance, dedicated to accepting connections, so that heavy request processing cannot hold up accept

static void *accept_thread_entrance(void* arg)
{
    int server_sock;
    int incomesock;
    struct sockaddr_in inaddr;
    socklen_t sockaddr_len;
    in_addr_t client_addr;
    char szClientIp[IP_ADDRESS_SIZE];
    long task_addr;
    struct fast_task_info *pTask;
    StorageClientInfo *pClientInfo;
    struct storage_nio_thread_data *pThreadData;

    server_sock = (long)arg;
    while (g_continue_flag)
    {
        sockaddr_len = sizeof(inaddr);
        incomesock = accept(server_sock, (struct sockaddr*)&inaddr, \
                    &sockaddr_len);
        if (incomesock < 0) //error
        {
            if (!(errno == EINTR || errno == EAGAIN))
            {
                logError("file: "__FILE__", line: %d, " \
                    "accept failed, " \
                    "errno: %d, error info: %s", \
                    __LINE__, errno, STRERROR(errno));
            }

            continue;
        }

        client_addr = getPeerIpaddr(incomesock, \
                szClientIp, IP_ADDRESS_SIZE);
        if (g_allow_ip_count >= 0)
        {
            if (bsearch(&client_addr, g_allow_ip_addrs, \
                    g_allow_ip_count, sizeof(in_addr_t), \
                    cmp_by_ip_addr_t) == NULL)
            {
                logError("file: "__FILE__", line: %d, " \
                    "ip addr %s is not allowed to access", \
                    __LINE__, szClientIp);

                close(incomesock);
                continue;
            }
        }

        if (tcpsetnonblockopt(incomesock) != 0)
        {
            close(incomesock);
            continue;
        }

        pTask = free_queue_pop();
        if (pTask == NULL)
        {
            logError("file: "__FILE__", line: %d, " \
                "malloc task buff failed", \
                __LINE__);
            close(incomesock);
            continue;
        }

        pClientInfo = (StorageClientInfo *)pTask->arg;
        
        // a task was taken from the task object pool; set its fd field to incomesock
        pTask->event.fd = incomesock;
        pClientInfo->stage = FDFS_STORAGE_STAGE_NIO_INIT;
        pClientInfo->nio_thread_index = pTask->event.fd % g_work_threads;
        pThreadData = g_nio_thread_data + pClientInfo->nio_thread_index;

        strcpy(pTask->client_ip, szClientIp);

        task_addr = (long)pTask;

        // hand the task to the work thread through pThreadData->thread_data.pipe_fds[1]
        // the work thread watches pThreadData->thread_data.pipe_fds[0]
        // so storage_recv_notify_read will be called
        if (write(pThreadData->thread_data.pipe_fds[1], &task_addr, \
            sizeof(task_addr)) != sizeof(task_addr))
        {
            close(incomesock);
            free_queue_push(pTask);
            logError("file: "__FILE__", line: %d, " \
                "call write failed, " \
                "errno: %d, error info: %s", \
                __LINE__, errno, STRERROR(errno));
        }
    }

    return NULL;
}
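The handoff between the accept thread and a work thread is worth isolating: the accept thread picks a worker by fd modulo thread count and writes the task's address into that worker's pipe, and the worker reads the address back on the other end. A minimal single-process sketch of the trick follows. DemoTask, pick_worker, send_task and recv_task are hypothetical names, and intptr_t is used where the original code uses long.

```c
#include <stdint.h>
#include <unistd.h>

/* Hypothetical stand-in for struct fast_task_info. */
typedef struct {
    int fd;
} DemoTask;

/* Same rule as in accept_thread_entrance: pTask->event.fd % g_work_threads. */
int pick_worker(int fd, int worker_count)
{
    return fd % worker_count;
}

/* Producer side: write the pointer value itself into the pipe.
 * Returns 0 on success, -1 if the write failed or was short. */
int send_task(int write_fd, DemoTask *task)
{
    intptr_t task_addr = (intptr_t)task;
    ssize_t n = write(write_fd, &task_addr, sizeof(task_addr));
    return n == (ssize_t)sizeof(task_addr) ? 0 : -1;
}

/* Consumer side: read one pointer value back and turn it into a task again.
 * Returns NULL if the read failed or was short. */
DemoTask *recv_task(int read_fd)
{
    intptr_t task_addr;
    ssize_t n = read(read_fd, &task_addr, sizeof(task_addr));
    if (n != (ssize_t)sizeof(task_addr)) {
        return NULL;
    }
    return (DemoTask *)task_addr;
}
```

This only works because both ends live in the same process and share one address space. The pipe carries no data, only a wake-up plus the address, which is why the read end can sit in the same epoll set as the client sockets.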

Now look at the storage_recv_notify_read function

void storage_recv_notify_read(int sock, short event, void *arg)
{
    struct fast_task_info *pTask;
    StorageClientInfo *pClientInfo;
    long task_addr;
    int64_t remain_bytes;
    int bytes;
    int result;

    while (1)
    {
        // read the address of the task structure
        if ((bytes=read(sock, &task_addr, sizeof(task_addr))) < 0)
        {
            if (!(errno == EAGAIN || errno == EWOULDBLOCK))
            {
                logError("file: "__FILE__", line: %d, " \
                    "call read failed, " \
                    "errno: %d, error info: %s", \
                    __LINE__, errno, STRERROR(errno));
            }

            break;
        }
        else if (bytes == 0)
        {
            logError("file: "__FILE__", line: %d, " \
                "call read failed, end of file", __LINE__);
            break;
        }

        pTask = (struct fast_task_info *)task_addr;
        pClientInfo = (StorageClientInfo *)pTask->arg;

        if (pTask->event.fd < 0)  //quit flag
        {
            return;
        }

        /* //logInfo("=====thread index: %d, pTask->event.fd=%d", \
            pClientInfo->nio_thread_index, pTask->event.fd);
        */

        if (pClientInfo->stage & FDFS_STORAGE_STAGE_DIO_THREAD)
        {
            pClientInfo->stage &= ~FDFS_STORAGE_STAGE_DIO_THREAD;
        }
        switch (pClientInfo->stage)
        {
            // init stage: initialize the data
            case FDFS_STORAGE_STAGE_NIO_INIT:
                result = storage_nio_init(pTask);
                break;
            // skip this for now and look at storage_nio_init first
            case FDFS_STORAGE_STAGE_NIO_RECV:
                pTask->offset = 0;
                remain_bytes = pClientInfo->total_length - \
                           pClientInfo->total_offset;
                if (remain_bytes > pTask->size)
                {
                    pTask->length = pTask->size;
                }
                else
                {
                    pTask->length = remain_bytes;
                }

                if (set_recv_event(pTask) == 0)
                {
                    client_sock_read(pTask->event.fd,
                        IOEVENT_READ, pTask);
                }
                result = 0;
                break;
            case FDFS_STORAGE_STAGE_NIO_SEND:
                result = storage_send_add_event(pTask);
                break;
            case FDFS_STORAGE_STAGE_NIO_CLOSE:
                result = EIO;   //close this socket
                break;
            default:
                logError("file: "__FILE__", line: %d, " \
                    "invalid stage: %d", __LINE__, \
                    pClientInfo->stage);
                result = EINVAL;
                break;
        }

        if (result != 0)
        {
            add_to_deleted_list(pTask);
        }
    }
}

Initialization essentially registers client_sock_read on the fd of the task and sets the task stage to FDFS_STORAGE_STAGE_NIO_RECV

static int storage_nio_init(struct fast_task_info *pTask)
{
    StorageClientInfo *pClientInfo;
    struct storage_nio_thread_data *pThreadData;

    pClientInfo = (StorageClientInfo *)pTask->arg;
    pThreadData = g_nio_thread_data + pClientInfo->nio_thread_index;

    pClientInfo->stage = FDFS_STORAGE_STAGE_NIO_RECV;
    return ioevent_set(pTask, &pThreadData->thread_data,
            pTask->event.fd, IOEVENT_READ, client_sock_read,
            g_fdfs_network_timeout);
}

Now look at the client_sock_read function

static void client_sock_read(int sock, short event, void *arg)
{
    int bytes;
    int recv_bytes;
    struct fast_task_info *pTask;
        StorageClientInfo *pClientInfo;

    pTask = (struct fast_task_info *)arg;
        pClientInfo = (StorageClientInfo *)pTask->arg;
    if (pClientInfo->canceled)
    {
        return;
    }

    if (pClientInfo->stage != FDFS_STORAGE_STAGE_NIO_RECV)
    {
        if (event & IOEVENT_TIMEOUT) {
            pTask->event.timer.expires = g_current_time +
                g_fdfs_network_timeout;
            fast_timer_add(&pTask->thread_data->timer,
                &pTask->event.timer);
        }

        return;
    }
    
    // timeout: delete this task (an idle connection between requests only gets its timer renewed)
    if (event & IOEVENT_TIMEOUT)
    {
        if (pClientInfo->total_offset == 0 && pTask->req_count > 0)
        {
            pTask->event.timer.expires = g_current_time +
                g_fdfs_network_timeout;
            fast_timer_add(&pTask->thread_data->timer,
                &pTask->event.timer);
        }
        else
        {
            logError("file: "__FILE__", line: %d, " \
                "client ip: %s, recv timeout, " \
                "recv offset: %d, expect length: %d", \
                __LINE__, pTask->client_ip, \
                pTask->offset, pTask->length);

            task_finish_clean_up(pTask);
        }

        return;
    }

    // io error: delete the task as well
    if (event & IOEVENT_ERROR)
    {
        logError("file: "__FILE__", line: %d, " \
            "client ip: %s, recv error event: %d, "
            "close connection", __LINE__, pTask->client_ip, event);

        task_finish_clean_up(pTask);
        return;
    }

    fast_timer_modify(&pTask->thread_data->timer,
        &pTask->event.timer, g_current_time +
        g_fdfs_network_timeout);
    while (1)
    {
        // total_length of pClientInfo is 0, so the header has not been received yet: receive a header
        if (pClientInfo->total_length == 0) //recv header
        {
            recv_bytes = sizeof(TrackerHeader) - pTask->offset;
        }
        else
        {
            recv_bytes = pTask->length - pTask->offset;
        }

        /*
        logInfo("total_length="INT64_PRINTF_FORMAT", recv_bytes=%d, "
            "pTask->length=%d, pTask->offset=%d",
            pClientInfo->total_length, recv_bytes, 
            pTask->length, pTask->offset);
        */

        bytes = recv(sock, pTask->data + pTask->offset, recv_bytes, 0);
        if (bytes < 0)
        {
            if (errno == EAGAIN || errno == EWOULDBLOCK)
            {
            }
            else
            {
                logError("file: "__FILE__", line: %d, " \
                    "client ip: %s, recv failed, " \
                    "errno: %d, error info: %s", \
                    __LINE__, pTask->client_ip, \
                    errno, STRERROR(errno));

                task_finish_clean_up(pTask);
            }

            return;
        }
        else if (bytes == 0)
        {
            logDebug("file: "__FILE__", line: %d, " \
                "client ip: %s, recv failed, " \
                "connection disconnected.", \
                __LINE__, pTask->client_ip);

            task_finish_clean_up(pTask);
            return;
        }

        // initialize pClientInfo from the packet header
        if (pClientInfo->total_length == 0) //header
        {
            if (pTask->offset + bytes < sizeof(TrackerHeader))
            {
                pTask->offset += bytes;
                return;
            }

            pClientInfo->total_length=buff2long(((TrackerHeader *) \
                        pTask->data)->pkg_len);
            if (pClientInfo->total_length < 0)
            {
                logError("file: "__FILE__", line: %d, " \
                    "client ip: %s, pkg length: " \
                    INT64_PRINTF_FORMAT" < 0", \
                    __LINE__, pTask->client_ip, \
                    pClientInfo->total_length);

                task_finish_clean_up(pTask);
                return;
            }

            pClientInfo->total_length += sizeof(TrackerHeader);

            // if the total length to receive exceeds the fixed buffer size of pTask, receive only that much for now
            if (pClientInfo->total_length > pTask->size)
            {
                pTask->length = pTask->size;
            }
            else
            {
                pTask->length = pClientInfo->total_length;
            }
        }

        pTask->offset += bytes;

        // the current packet has been received completely
        if (pTask->offset >= pTask->length) //recv current pkg done
        {
            // skip this for now and read on
            if (pClientInfo->total_offset + pTask->length >= \
                    pClientInfo->total_length)
            {
                /* current req recv done */
                pClientInfo->stage = FDFS_STORAGE_STAGE_NIO_SEND;
                pTask->req_count++;
            }
            
            // the first chunk (the one with the header) was just received, so storage_deal_task dispatches the task
            if (pClientInfo->total_offset == 0)
            {
                pClientInfo->total_offset = pTask->length;
                storage_deal_task(pTask);
            }
            else
            {
                // skip this for now and read on
                pClientInfo->total_offset += pTask->length;

                /* continue write to file */
                storage_dio_queue_push(pTask);
            }

            return;
        }
    }

    return;
}
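Both client_sock_read and the FDFS_STORAGE_STAGE_NIO_RECV branch of storage_recv_notify_read cap the next receive at the size of the task buffer, which is how an upload larger than the buffer ends up as a series of buffer-sized chunks. A small sketch of that arithmetic, with hypothetical function names and the assumption that buff_size is positive:

```c
#include <stdint.h>

/* How many bytes to receive into the task buffer next: the remaining
 * bytes of the request, capped at the buffer size (pTask->size). */
int next_chunk_length(int64_t total_length, int64_t total_offset, int buff_size)
{
    int64_t remain_bytes = total_length - total_offset;
    if (remain_bytes > buff_size) {
        return buff_size;
    }
    return (int)remain_bytes;
}

/* Number of buffer fills needed for a request of total_length bytes
 * (header included); each fill is handed off before the next one starts. */
int count_chunks(int64_t total_length, int buff_size)
{
    int64_t total_offset = 0;
    int chunks = 0;

    while (total_offset < total_length) {
        total_offset += next_chunk_length(total_length, total_offset, buff_size);
        chunks++;
    }
    return chunks;
}
```

In the real code the first chunk goes to storage_deal_task and every later chunk goes straight to storage_dio_queue_push, as the two branches at the end of client_sock_read show.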

storage_deal_task dispatches the upload request to storage_upload_file

storage_upload_file sets up a few basic callbacks and then calls storage_write_to_file

static int storage_upload_file(struct fast_task_info *pTask, bool bAppenderFile)
{
    // omitted
    ...

    return storage_write_to_file(pTask, file_offset, file_bytes, \
            p - pTask->data, dio_write_file, \
            storage_upload_file_done_callback, \
            clean_func, store_path_index);
}
static int storage_write_to_file(struct fast_task_info *pTask, \
        const int64_t file_offset, const int64_t upload_bytes, \
        const int buff_offset, TaskDealFunc deal_func, \
        FileDealDoneCallback done_callback, \
        DisconnectCleanFunc clean_func, const int store_path_index)
{
    StorageClientInfo *pClientInfo;
    StorageFileContext *pFileContext;
    int result;

    pClientInfo = (StorageClientInfo *)pTask->arg;
    pFileContext =  &(pClientInfo->file_context);

    pClientInfo->deal_func = deal_func;
    pClientInfo->clean_func = clean_func;

    pFileContext->fd = -1;
    pFileContext->buff_offset = buff_offset;
    pFileContext->offset = file_offset;
    pFileContext->start = file_offset;
    pFileContext->end = file_offset + upload_bytes;
    pFileContext->dio_thread_index = storage_dio_get_thread_index( \
        pTask, store_path_index, pFileContext->op);
    pFileContext->done_callback = done_callback;

    if (pFileContext->calc_crc32)
    {
        pFileContext->crc32 = CRC32_XINIT;
    }

    if (pFileContext->calc_file_hash)
    {
        if (g_file_signature_method == STORAGE_FILE_SIGNATURE_METHOD_HASH)
        {
            INIT_HASH_CODES4(pFileContext->file_hash_codes)
        }
        else
        {
            my_md5_init(&pFileContext->md5_context);
        }
    }

    // push the task onto the disk queue
    if ((result=storage_dio_queue_push(pTask)) != 0)
    {
        pClientInfo->total_length = sizeof(TrackerHeader);
        return result;
    }

    return STORAGE_STATUE_DEAL_FILE;
}

The function that pushes a task onto the disk queue

int storage_dio_queue_push(struct fast_task_info *pTask)
{                       
        StorageClientInfo *pClientInfo;
        StorageFileContext *pFileContext;
        struct storage_dio_context *pContext;
        int result;

        pClientInfo = (StorageClientInfo *)pTask->arg;
        pFileContext = &(pClientInfo->file_context);
        pContext = g_dio_contexts + pFileContext->dio_thread_index;

    // Why OR in this flag? In LT (level-triggered) mode client_sock_read keeps being triggered,
    // which would overwrite the data in pTask. Changing the stage away from FDFS_STORAGE_STAGE_NIO_RECV makes client_sock_read return immediately
        pClientInfo->stage |= FDFS_STORAGE_STAGE_DIO_THREAD;
        if ((result=task_queue_push(&(pContext->queue), pTask)) != 0)
        {
                add_to_deleted_list(pTask);
                return result;
        }
        
        if ((result=pthread_cond_signal(&(pContext->cond))) != 0)
        {
                logError("file: "__FILE__", line: %d, " \
                        "pthread_cond_signal fail, " \
                        "errno: %d, error info: %s", \
                        __LINE__, result, STRERROR(result));
        
                add_to_deleted_list(pTask);
                return result;
        }

        return 0;
}
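The flag trick in the comment above is easy to check in isolation: client_sock_read only proceeds when the stage is exactly FDFS_STORAGE_STAGE_NIO_RECV, so OR-ing in one extra bit is enough to make it back off, and storage_recv_notify_read masks the bit out again when the disk thread hands the task back. A minimal sketch follows; the numeric values are assumptions for illustration (the real constants live in the FastDFS headers) and the function names are hypothetical.

```c
#include <stdbool.h>

/* Assumed values for illustration only; check the FastDFS headers for the real ones. */
#define STAGE_NIO_RECV    1
#define STAGE_NIO_SEND    2
#define STAGE_DIO_THREAD  8

/* The guard at the top of client_sock_read: read only in the plain RECV stage. */
bool nio_may_read(int stage)
{
    return stage == STAGE_NIO_RECV;
}

/* What storage_dio_queue_push does before queueing the task. */
int hand_to_dio(int stage)
{
    return stage | STAGE_DIO_THREAD;
}

/* What storage_recv_notify_read does when the task comes back. */
int back_to_nio(int stage)
{
    return stage & ~STAGE_DIO_THREAD;
}
```

The underlying stage survives the round trip, so a task that was receiving before the disk write is receiving again afterwards, without any extra saved state.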

Next, the disk thread picks up the task

static void *dio_thread_entrance(void* arg) 
{
    int result;
    struct storage_dio_context *pContext; 
    struct fast_task_info *pTask;

    pContext = (struct storage_dio_context *)arg; 

    pthread_mutex_lock(&(pContext->lock));
    while (g_continue_flag)
    {
        if ((result=pthread_cond_wait(&(pContext->cond), \
            &(pContext->lock))) != 0)
        {
        logError("file: "__FILE__", line: %d, " \
            "call pthread_cond_wait fail, " \
            "errno: %d, error info: %s", \
            __LINE__, result, STRERROR(result));
        }

        // keep popping tasks from the queue and run the deal_func of each one
        while ((pTask=task_queue_pop(&(pContext->queue))) != NULL)
        {
            ((StorageClientInfo *)pTask->arg)->deal_func(pTask);
        }
    }
    pthread_mutex_unlock(&(pContext->lock));

    if ((result=pthread_mutex_lock(&g_dio_thread_lock)) != 0)
    {
        logError("file: "__FILE__", line: %d, " \
            "call pthread_mutex_lock fail, " \
            "errno: %d, error info: %s", \
            __LINE__, result, STRERROR(result));
    }
    g_dio_thread_count--;
    if ((result=pthread_mutex_unlock(&g_dio_thread_lock)) != 0)
    {
        logError("file: "__FILE__", line: %d, " \
            "call pthread_mutex_lock fail, " \
            "errno: %d, error info: %s", \
            __LINE__, result, STRERROR(result));
    }

    logDebug("file: "__FILE__", line: %d, " \
        "dio thread exited, thread count: %d", \
        __LINE__, g_dio_thread_count);

    return NULL;
}

For an upload task, deal_func is actually dio_write_file

int dio_write_file(struct fast_task_info *pTask)
{
    StorageClientInfo *pClientInfo;
    StorageFileContext *pFileContext;
    int result;
    int write_bytes;
    char *pDataBuff;

    pClientInfo = (StorageClientInfo *)pTask->arg;
    pFileContext = &(pClientInfo->file_context);
    result = 0;
    do
    {
    if (pFileContext->fd < 0)
    {
        if (pFileContext->extra_info.upload.before_open_callback!=NULL)
        {
            result = pFileContext->extra_info.upload. \
                    before_open_callback(pTask);
            if (result != 0)
            {
                break;
            }
        }

        if ((result=dio_open_file(pFileContext)) != 0)
        {
            break;
        }
    }

    pDataBuff = pTask->data + pFileContext->buff_offset;
    write_bytes = pTask->length - pFileContext->buff_offset;
    if (write(pFileContext->fd, pDataBuff, write_bytes) != write_bytes)
    {
        result = errno != 0 ? errno : EIO;
        logError("file: "__FILE__", line: %d, " \
            "write to file: %s fail, fd=%d, write_bytes=%d, " \
            "errno: %d, error info: %s", \
            __LINE__, pFileContext->filename, \
            pFileContext->fd, write_bytes, \
            result, STRERROR(result));
    }

    pthread_mutex_lock(&g_dio_thread_lock);
    g_storage_stat.total_file_write_count++;
    if (result == 0)
    {
        g_storage_stat.success_file_write_count++;
    }
    pthread_mutex_unlock(&g_dio_thread_lock);

    if (result != 0)
    {
        break;
    }

    if (pFileContext->calc_crc32)
    {
        pFileContext->crc32 = CRC32_ex(pDataBuff, write_bytes, \
                    pFileContext->crc32);
    }

    if (pFileContext->calc_file_hash)
    {
        if (g_file_signature_method == STORAGE_FILE_SIGNATURE_METHOD_HASH)
        {
            CALC_HASH_CODES4(pDataBuff, write_bytes, \
                    pFileContext->file_hash_codes)
        }
        else
        {
            my_md5_update(&pFileContext->md5_context, \
                (unsigned char *)pDataBuff, write_bytes);
        }
    }

    /*
    logInfo("###dio write bytes: %d, pTask->length=%d, buff_offset=%d", \
        write_bytes, pTask->length, pFileContext->buff_offset);
    */

    pFileContext->offset += write_bytes;
    if (pFileContext->offset < pFileContext->end)
    {
        pFileContext->buff_offset = 0;
        storage_nio_notify(pTask);  //notify nio to deal
    }
    else
    {
        if (pFileContext->calc_crc32)
        {
            pFileContext->crc32 = CRC32_FINAL( \
                        pFileContext->crc32);
        }

        if (pFileContext->calc_file_hash)
        {
            if (g_file_signature_method == STORAGE_FILE_SIGNATURE_METHOD_HASH)
            {
                FINISH_HASH_CODES4(pFileContext->file_hash_codes)
            }
            else
            {
                my_md5_final((unsigned char *)(pFileContext-> \
                file_hash_codes), &pFileContext->md5_context);
            }
        }

        if (pFileContext->extra_info.upload.before_close_callback != NULL)
        {
            result = pFileContext->extra_info.upload. \
                    before_close_callback(pTask);
        }

        /* file write done, close it */
        close(pFileContext->fd);
        pFileContext->fd = -1;

        if (pFileContext->done_callback != NULL)
        {
            pFileContext->done_callback(pTask, result);
        }
    }

    return 0;
    } while (0);

    pClientInfo->clean_func(pTask);

    if (pFileContext->done_callback != NULL)
    {
        pFileContext->done_callback(pTask, result);
    }
    return result;
}
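
One detail worth a closer look is how the checksum is built up: every chunk is fed into CRC32_ex together with the previous value, and CRC32_FINAL is applied only after the last chunk. The sketch below shows that incremental idea with a plain bitwise CRC-32 (reflected polynomial 0xEDB88320). The `demo_` functions are hypothetical stand-ins, and I am assuming, without having checked the macros, that FastDFS follows the same init/update/final convention.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define DEMO_CRC32_INIT 0xFFFFFFFFu

/* Fold one chunk into a running CRC; the result feeds the next call. */
static uint32_t demo_crc32_update(uint32_t crc, const void *data, size_t len)
{
    const unsigned char *p = (const unsigned char *)data;
    size_t i;
    int bit;

    assert(data != NULL || len == 0);
    for (i = 0; i < len; i++) {
        crc ^= p[i];
        for (bit = 0; bit < 8; bit++) {
            /* shift right; XOR in the polynomial when the low bit was set */
            crc = (crc >> 1) ^ (0xEDB88320u & (0u - (crc & 1u)));
        }
    }
    return crc;
}

/* Applied once, after the last chunk. */
static uint32_t demo_crc32_final(uint32_t crc)
{
    return crc ^ 0xFFFFFFFFu;
}

/* Checksum a buffer in pieces of at most `chunk` bytes, like a chunked upload. */
static uint32_t demo_crc32_chunked(const char *data, size_t len, size_t chunk)
{
    uint32_t crc = DEMO_CRC32_INIT;
    size_t off = 0;

    assert(chunk > 0);
    while (off < len) {
        size_t n = (len - off < chunk) ? (len - off) : chunk;
        crc = demo_crc32_update(crc, data + off, n);
        off += n;
    }
    return demo_crc32_final(crc);
}
```

The chunk size does not affect the result, which is why the storage server can checksum a file while it arrives piece by piece instead of rereading it afterwards.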
For an upload, pFileContext->done_callback points to storage_upload_file_done_callback:

static void storage_upload_file_done_callback(struct fast_task_info *pTask, \
            const int err_no)
{
    StorageClientInfo *pClientInfo;
    StorageFileContext *pFileContext;
    TrackerHeader *pHeader;
    int result;

    pClientInfo = (StorageClientInfo *)pTask->arg;
    pFileContext =  &(pClientInfo->file_context);

    if (pFileContext->extra_info.upload.file_type & _FILE_TYPE_TRUNK)
    {
        result = trunk_client_trunk_alloc_confirm( \
            &(pFileContext->extra_info.upload.trunk_info), err_no);
        if (err_no != 0)
        {
            result = err_no;
        }
    }
    else
    {
        result = err_no;
    }

    if (result == 0)
    {
        result = storage_service_upload_file_done(pTask);
        if (result == 0)
        {
            if (pFileContext->create_flag & STORAGE_CREATE_FLAG_FILE)
            {
                result = storage_binlog_write(\
                    pFileContext->timestamp2log, \
                    STORAGE_OP_TYPE_SOURCE_CREATE_FILE, \
                    pFileContext->fname2log);
            }
        }
    }

    if (result == 0)
    {
        int filename_len;
        char *p;

        if (pFileContext->create_flag & STORAGE_CREATE_FLAG_FILE)
        {
            CHECK_AND_WRITE_TO_STAT_FILE3_WITH_BYTES( \
                g_storage_stat.total_upload_count, \
                g_storage_stat.success_upload_count, \
                g_storage_stat.last_source_update, \
                g_storage_stat.total_upload_bytes, \
                g_storage_stat.success_upload_bytes, \
                pFileContext->end - pFileContext->start)
        }

        filename_len = strlen(pFileContext->fname2log);
        pClientInfo->total_length = sizeof(TrackerHeader) + \
                    FDFS_GROUP_NAME_MAX_LEN + filename_len;
        p = pTask->data + sizeof(TrackerHeader);
        memcpy(p, pFileContext->extra_info.upload.group_name, \
            FDFS_GROUP_NAME_MAX_LEN);
        p += FDFS_GROUP_NAME_MAX_LEN;
        memcpy(p, pFileContext->fname2log, filename_len);
    }
    else
    {
        pthread_mutex_lock(&stat_count_thread_lock);
        if (pFileContext->create_flag & STORAGE_CREATE_FLAG_FILE)
        {
            g_storage_stat.total_upload_count++;
            g_storage_stat.total_upload_bytes += \
                pClientInfo->total_offset;
        }
        pthread_mutex_unlock(&stat_count_thread_lock);

        pClientInfo->total_length = sizeof(TrackerHeader);
    }

    STORAGE_ACCESS_LOG(pTask, ACCESS_LOG_ACTION_UPLOAD_FILE, result);

    pClientInfo->total_offset = 0;
    pTask->length = pClientInfo->total_length;

    pHeader = (TrackerHeader *)pTask->data;
    pHeader->status = result;
    pHeader->cmd = STORAGE_PROTO_CMD_RESP;
    long2buff(pClientInfo->total_length - sizeof(TrackerHeader), \
            pHeader->pkg_len);

    //the familiar function again: once the write is done, pTask is handed from the disk thread back to the work thread
    //the work thread calls storage_recv_notify_read to do the next step
    storage_nio_notify(pTask);
}
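
The tail of this callback builds the response packet: a header, then the group name and file name on success, or the header alone on failure. Here is a rough sketch of the header packing. I am assuming a 10-byte header (8-byte big-endian body length, 1-byte command, 1-byte status) and a response command value of 100. `DemoHeader`, `demo_long2buff`, `demo_buff2long` and `demo_build_response` are hypothetical stand-ins, not the FastDFS definitions.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define DEMO_PKG_LEN_SIZE 8
#define DEMO_CMD_RESP 100          /* assumed value of the response command */

typedef struct {
    char pkg_len[DEMO_PKG_LEN_SIZE];   /* body length, big-endian */
    char cmd;
    char status;
} DemoHeader;

/* Store n as 8 big-endian bytes. */
static void demo_long2buff(int64_t n, char *buff)
{
    uint64_t v = (uint64_t)n;
    int i;

    for (i = DEMO_PKG_LEN_SIZE - 1; i >= 0; i--) {
        buff[i] = (char)(v & 0xFF);
        v >>= 8;
    }
}

/* Inverse of demo_long2buff. */
static int64_t demo_buff2long(const char *buff)
{
    uint64_t v = 0;
    int i;

    for (i = 0; i < DEMO_PKG_LEN_SIZE; i++) {
        v = (v << 8) | (unsigned char)buff[i];
    }
    return (int64_t)v;
}

/*
 * Build a response in buf. On error (status != 0) only the header is sent.
 * Returns the total packet length, or -1 if buf is too small.
 */
static int demo_build_response(char *buf, size_t buf_size, int status,
        const char *body, size_t body_len)
{
    DemoHeader *header;

    assert(buf != NULL);
    if (status != 0) {
        body_len = 0;
    }
    if (buf_size < sizeof(DemoHeader) + body_len) {
        return -1;
    }
    header = (DemoHeader *)buf;
    demo_long2buff((int64_t)body_len, header->pkg_len);
    header->cmd = DEMO_CMD_RESP;
    header->status = (char)status;
    if (body_len > 0) {
        memcpy(buf + sizeof(DemoHeader), body, body_len);
    }
    return (int)(sizeof(DemoHeader) + body_len);
}
```

Note that the length field counts only the body, which mirrors `total_length - sizeof(TrackerHeader)` in the callback above.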
void storage_recv_notify_read(int sock, short event, void *arg)
{
    //covered earlier, omitted
    ...
        //a task fresh out of the disk thread still carries the dio_thread flag; clear it
        if (pClientInfo->stage & FDFS_STORAGE_STAGE_DIO_THREAD)
        {
            pClientInfo->stage &= ~FDFS_STORAGE_STAGE_DIO_THREAD;
        }
        switch (pClientInfo->stage)
        {
            //covered earlier, omitted
            ...
            case FDFS_STORAGE_STAGE_NIO_RECV:
                pTask->offset = 0;
                remain_bytes = pClientInfo->total_length - \
                           pClientInfo->total_offset;
                if (remain_bytes > pTask->size)
                {
                    pTask->length = pTask->size;
                }
                else
                {
                    pTask->length = remain_bytes;
                }

                if (set_recv_event(pTask) == 0)
                {
                    client_sock_read(pTask->event.fd,
                        IOEVENT_READ, pTask);
                }
                result = 0;
                break;
            case FDFS_STORAGE_STAGE_NIO_SEND:
                result = storage_send_add_event(pTask);
                break;
            case FDFS_STORAGE_STAGE_NIO_CLOSE:
                result = EIO;   //close this socket
                break;
            default:
                logError("file: "__FILE__", line: %d, " \
                    "invalid stage: %d", __LINE__, \
                    pClientInfo->stage);
                result = EINVAL;
                break;
        }

        if (result != 0)
        {
            add_to_deleted_list(pTask);
        }
}
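
The stage field is used as a bit field: the disk-thread flag is OR-ed on top of the underlying network stage and cleared again when the task comes back. A small sketch of that bookkeeping follows. The numeric values are my assumption (chosen so that the disk flag does not overlap the other stages) and every `demo_` name is hypothetical.

```c
#include <assert.h>

enum {
    DEMO_STAGE_NIO_INIT   = 0,
    DEMO_STAGE_NIO_RECV   = 1,
    DEMO_STAGE_NIO_SEND   = 2,
    DEMO_STAGE_NIO_CLOSE  = 4,
    DEMO_STAGE_DIO_THREAD = 8    /* flag: the task is owned by a disk thread */
};

/* Mark the task as handed to the disk thread, keeping the network stage. */
static int demo_enter_dio(int stage)
{
    assert((stage & DEMO_STAGE_DIO_THREAD) == 0);
    return stage | DEMO_STAGE_DIO_THREAD;
}

/* Clear the flag when the task returns to the work thread. */
static int demo_leave_dio(int stage)
{
    return stage & ~DEMO_STAGE_DIO_THREAD;
}

/* A socket callback should do nothing while the disk thread owns the task. */
static int demo_should_ignore_socket_event(int stage)
{
    return (stage & DEMO_STAGE_DIO_THREAD) != 0;
}

/* Returns the stage to dispatch on after the hand-back, or -1 if invalid. */
static int demo_dispatch_stage(int stage)
{
    stage = demo_leave_dio(stage);
    switch (stage) {
        case DEMO_STAGE_NIO_INIT:
        case DEMO_STAGE_NIO_RECV:
        case DEMO_STAGE_NIO_SEND:
        case DEMO_STAGE_NIO_CLOSE:
            return stage;
        default:
            return -1;
    }
}
```

Because the flag sits in its own bit, the network stage survives the round trip through the disk thread, and the switch can pick up exactly where the task left off.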
The FDFS_STORAGE_STAGE_NIO_RECV branch calls client_sock_read to keep receiving:

static void client_sock_read(int sock, short event, void *arg)
{
    //covered earlier, omitted
    ...
        pTask->offset += bytes;
        if (pTask->offset >= pTask->length) //recv current pkg done
        {
            //this request has been fully received; get ready to send the response
            if (pClientInfo->total_offset + pTask->length >= \
                    pClientInfo->total_length)
            {
                /* current req recv done */
                pClientInfo->stage = FDFS_STORAGE_STAGE_NIO_SEND;
                pTask->req_count++;
            }

            if (pClientInfo->total_offset == 0)
            {
                pClientInfo->total_offset = pTask->length;
                storage_deal_task(pTask);
            }
            else
            {
                //this is a data packet; push it to the disk thread
                pClientInfo->total_offset += pTask->length;

                /* continue write to file */
                storage_dio_queue_push(pTask);
            }

            return;
        }
    
    return;
}

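Network receiving and disk writing form a loop. Once a chunk has been received, the task is pushed onto the disk queue; once the disk thread has written it, the thread writes to the work thread's pipe fd, which triggers the network thread to read the task again. The data keeps flowing this way until the whole file has arrived.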
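
To get a feel for how many trips around this loop an upload takes, here is a small simulation of the chunking arithmetic, modelled on the remain_bytes logic in storage_recv_notify_read. It is a sketch under simplifying assumptions: the protocol header is ignored, and `demo_upload_trace` and `demo_simulate_upload` are hypothetical names.

```c
#include <assert.h>
#include <stdint.h>

/* Result of simulating one upload. */
struct demo_upload_trace {
    int rounds;            /* receive-then-write round trips */
    int64_t last_chunk;    /* size of the final chunk in bytes */
};

/*
 * Each round receives min(remaining, buff_size) bytes into the single task
 * buffer and hands that chunk to the disk thread before receiving more.
 */
static struct demo_upload_trace demo_simulate_upload(int64_t total_length,
        int64_t buff_size)
{
    struct demo_upload_trace trace = {0, 0};
    int64_t total_offset = 0;

    assert(total_length > 0 && buff_size > 0);
    while (total_offset < total_length) {
        int64_t remain_bytes = total_length - total_offset;
        int64_t length = remain_bytes > buff_size ? buff_size : remain_bytes;

        total_offset += length;
        trace.rounds++;
        trace.last_chunk = length;
    }
    return trace;
}
```

For example, `demo_simulate_upload(1000, 256)` walks through three full 256-byte chunks and one final 232-byte chunk.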

Summary:
A diagram explains it best; the whole processing flow is shown in the figure below.

[Figure: FastDFS storage flow analysis]

1 The client sends a request; the accept thread catches the descriptor, initializes a pTask structure, fills in the descriptor, and passes pTask through the pipe to work_entrance
2 storage_recv_notify_read is entered
3 Because the task's stage equals FDFS_STORAGE_STAGE_NIO_INIT, a read event is created for the fd and bound to client_sock_read
4 storage_upload_file is called
5 storage_upload_file calls storage_write_to_file
6 storage_write_to_file calls storage_dio_queue_push, which pushes the task onto the disk queue
7 storage_dio_queue_push sets the FDFS_STORAGE_STAGE_DIO_THREAD flag in the task's stage
8 Driven by the event mechanism, client_sock_read keeps being called, but because the stage is not FDFS_STORAGE_STAGE_NIO_RECV it returns immediately
9 The disk thread pops pTask from the queue and calls its handler, dio_write_file
10 After writing a chunk, dio_write_file calls storage_nio_notify (for the last chunk, via storage_upload_file_done_callback), which hands pTask back to the work thread through the pipe
11 storage_recv_notify_read is triggered and clears the FDFS_STORAGE_STAGE_DIO_THREAD flag from the task's stage
12 Since the stage is now FDFS_STORAGE_STAGE_NIO_RECV, client_sock_read is called
13 When client_sock_read has finished reading, it calls storage_dio_queue_push
14 Repeat from step 7
15 Until the upload finishes
That completes the analysis of one upload.
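
Steps 1 and 10 both hand pTask to another thread "through the pipe". A minimal sketch of that trick follows, assuming that what travels through the pipe is simply the address of the task; `demo_task`, `demo_notify`, `demo_receive` and `demo_roundtrip` are hypothetical names, and the sketch runs in a single thread for brevity.

```c
#include <assert.h>
#include <stddef.h>
#include <unistd.h>

struct demo_task {
    int id;
};

/* Sender side: write the task's address into the pipe. */
static int demo_notify(int write_fd, struct demo_task *task)
{
    ssize_t n;

    assert(task != NULL);
    n = write(write_fd, &task, sizeof(task));
    return n == (ssize_t)sizeof(task) ? 0 : -1;
}

/* Receiver side: read one address back; NULL on a failed or short read. */
static struct demo_task *demo_receive(int read_fd)
{
    struct demo_task *task = NULL;
    ssize_t n = read(read_fd, &task, sizeof(task));

    return n == (ssize_t)sizeof(task) ? task : NULL;
}

/* Send one task through a pipe and return its id, or -1 on failure. */
static int demo_roundtrip(void)
{
    int fds[2];
    struct demo_task task = { 42 };
    struct demo_task *got;

    if (pipe(fds) != 0) {
        return -1;
    }
    if (demo_notify(fds[1], &task) != 0) {
        close(fds[0]);
        close(fds[1]);
        return -1;
    }
    got = demo_receive(fds[0]);
    close(fds[0]);
    close(fds[1]);
    return got == &task ? got->id : -1;
}
```

The appeal of this design is that the read end of the pipe is just another fd in the event loop, so a hand-off from another thread wakes the work thread the same way socket data does. Writes of a pointer-sized value are far below PIPE_BUF, so POSIX guarantees they are not interleaved with writes from other threads.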
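One more note: the size of the pTask buffer is set in the configuration file and defaults to 256KB.
Each connection gets only one pTask for both receiving and writing data; my guess is that this keeps heavy concurrency from eating too much system memory.
For example, with 10,000 concurrent connections there are roughly 10,000 pTask buffers of 256KB each, which is about 2.5GB of memory.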
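
The estimate is easy to check. The helper below is a hypothetical one-liner, not anything from FastDFS, and it counts only the data buffers, not the task structures around them.

```c
#include <assert.h>
#include <stdint.h>

/* Total buffer memory in MiB for a number of connections and a buffer size in KiB. */
static int64_t demo_buffer_mib(int64_t connections, int64_t buff_size_kib)
{
    assert(connections >= 0 && buff_size_kib >= 0);
    return connections * buff_size_kib / 1024;
}
```

`demo_buffer_mib(10000, 256)` gives 2500 MiB, about 2.44 GiB, which agrees with the rough 2.5GB figure.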
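The distributed file system I wrote myself some time ago used the same serialized logic, because it is simple to develop and it works.
Mine had one weakness, though: after pushing data to disk I/O I deleted the socket event, and only re-created it once the disk thread had finished reading or writing.
FastDFS instead keeps the event registered and temporarily ignores the callback by checking the task's stage. That is logically cleaner: when an event fires it should be handled, and deleting the event is never a great solution.
To be continued.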
