DPDK 無鎖ring

本文整理下之前的學習筆記蝙场,基于DPDK17.11版本源碼甚疟,主要分析無鎖隊列ring的實現(xiàn)哈误。

rte_ring_tailq保存rte_ring鏈表

創(chuàng)建ring后會將其插入共享內(nèi)存鏈表rte_ring_tailq颤绕,以便主從進程都可以訪問。

//定義隊列頭結(jié)構(gòu) struct rte_tailq_elem_head
TAILQ_HEAD(rte_tailq_elem_head, rte_tailq_elem);

//聲明全局變量rte_tailq_elem_head慨丐,類型為struct rte_tailq_elem_head坡脐,
//相當于是鏈表頭,用來保存本進程注冊的隊列
/* local tailq list */
static struct rte_tailq_elem_head rte_tailq_elem_head =
    TAILQ_HEAD_INITIALIZER(rte_tailq_elem_head);

//調(diào)用EAL_REGISTER_TAILQ在main函數(shù)前注冊rte_ring_tailq到全局變量rte_tailq_elem_head房揭。
#define RTE_TAILQ_RING_NAME "RTE_RING"
static struct rte_tailq_elem rte_ring_tailq = {
    .name = RTE_TAILQ_RING_NAME,
};
EAL_REGISTER_TAILQ(rte_ring_tailq)

調(diào)用rte_eal_tailq_update遍歷鏈表rte_tailq_elem_head上的節(jié)點备闲,將節(jié)點中的head指向 struct rte_mem_ring->tailq_head[]數(shù)組中的一個tailq_head晌端,此head又作為另一個鏈表頭。比如注冊的rte_ring_tailq節(jié)點恬砂,其head專門用來保存創(chuàng)建的rte_ring(將rte_ring作為struct rte_tailq_entry的data咧纠,將struct rte_tailq_entry插入head)。前面說過struct rte_mem_ring->tailq_head存放在共享內(nèi)存中泻骤,主從進程都可以訪問惧盹,這樣對于rte_ring來說,主從進程都可以創(chuàng)建/訪問ring瞪讼。

相關(guān)的數(shù)據(jù)結(jié)構(gòu)如下圖所示


image.png

創(chuàng)建ring

調(diào)用函數(shù)rte_ring_create創(chuàng)建ring,它會申請一塊memzone的內(nèi)存粹断,大小為struct rte_ring結(jié)構(gòu)加上count個void類型指針符欠,內(nèi)存結(jié)構(gòu)如下


image.png

然后將ring中生產(chǎn)者和消費者的頭尾指向0,最后將ring作為struct rte_tailq_entry的data插入共享內(nèi)存鏈表瓶埋,這樣主從進程都可以訪問此ring希柿。

/**
 * An RTE ring structure.
 *
 * The producer and the consumer have a head and a tail index. The particularity
 * of these index is that they are not between 0 and size(ring). These indexes
 * are between 0 and 2^32, and we mask their value when we access the ring[]
 * field. Thanks to this assumption, we can do subtractions between 2 index
 * values in a modulo-32bit base: that's why the overflow of the indexes is not
 * a problem.
 */
struct rte_ring {
    /*
     * Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI
     * compatibility requirements, it could be changed to RTE_RING_NAMESIZE
     * next time the ABI changes
     */
    char name[RTE_MEMZONE_NAMESIZE] __rte_cache_aligned; /**< Name of the ring. */
    //flags有如下三個值:
    //RING_F_SP_ENQ創(chuàng)建單生產(chǎn)者,
    //RING_F_SC_DEQ創(chuàng)建單消費者养筒,
    //RING_F_EXACT_SZ
    int flags;               /**< Flags supplied at creation. */
    //memzone內(nèi)存管理的底層結(jié)構(gòu)曾撤,用來分配內(nèi)存
    const struct rte_memzone *memzone;
            /**< Memzone, if any, containing the rte_ring */
    //size為ring大小,值和RING_F_EXACT_SZ有關(guān)晕粪,如果指定了flag     
    //RING_F_EXACT_SZ挤悉,則size為rte_ring_create的參數(shù)count的
    //向上取2次方,比如count為15巫湘,則size就為16装悲。如果沒有指定
    //flag,則count必須是2的次方尚氛,此時size等于count
    uint32_t size;           /**< Size of ring. */
    //mask值為size-1
    uint32_t mask;           /**< Mask (size-1) of ring. */
    //capacity的值也和RING_F_EXACT_SZ有關(guān)诀诊,如果指定了,
    //則capacity為rte_ring_create的參數(shù)count阅嘶,如果沒指定属瓣,
    //則capacity為size-1
    uint32_t capacity;       /**< Usable size of ring */

    //生產(chǎn)者位置,包含head和tail讯柔,head代表著下一次生產(chǎn)時的起
    //始位置抡蛙。tail代表消費者可以消費的位置界限,到達tail后就無  
    //法繼續(xù)消費磷杏,通常情況下生產(chǎn)完成后tail = head溜畅,意味著剛生
    //產(chǎn)的元素皆可以被消費
    /** Ring producer status. */
    struct rte_ring_headtail prod __rte_aligned(PROD_ALIGN);

    // 消費者位置,也包含head和tail极祸,head代表著下一次消費時的
    //起始位置慈格。tail代表生產(chǎn)者可以生產(chǎn)的位置界限怠晴,到達tail后就
    //無法繼續(xù)生產(chǎn),通常情況下消費完成后浴捆,tail =head蒜田,意味著
    //剛消費的位置皆可以被生產(chǎn)
    /** Ring consumer status. */
    struct rte_ring_headtail cons __rte_aligned(CONS_ALIGN);
};

下面看一下在函數(shù)rte_ring_create中ring是如何被創(chuàng)建的。

/* create the ring */
struct rte_ring *
rte_ring_create(const char *name, unsigned count, int socket_id, unsigned flags)
{
    char mz_name[RTE_MEMZONE_NAMESIZE];
    struct rte_ring *r;
    struct rte_tailq_entry *te;
    const struct rte_memzone *mz;
    ssize_t ring_size;
    int mz_flags = 0;
    struct rte_ring_list* ring_list = NULL;
    const unsigned int requested_count = count;
    int ret;

    //(tailq_entry)->tailq_head 的類型應(yīng)該是 struct rte_tailq_entry_head选泻,
    //但是返回的卻是 struct rte_ring_list冲粤,因為 rte_tailq_entry_head 和 rte_ring_list 定義都是一樣的,
    //可以認為是等同的页眯。
    #define RTE_TAILQ_CAST(tailq_entry, struct_name) \
        (struct struct_name *)&(tailq_entry)->tailq_head
    ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);

    /* for an exact size ring, round up from count to a power of two */
    if (flags & RING_F_EXACT_SZ)
        count = rte_align32pow2(count + 1);

    //獲取需要的內(nèi)存大小梯捕,包括結(jié)構(gòu)體 struct rte_ring 和 count 個指針
    ring_size = rte_ring_get_memsize(count);
        ssize_t sz;
        sz = sizeof(struct rte_ring) + count * sizeof(void *);
        sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
    
    #define RTE_RING_MZ_PREFIX "RG_"
    snprintf(mz_name, sizeof(mz_name), "%s%s", RTE_RING_MZ_PREFIX, name);

    //分配 struct rte_tailq_entry,用來將申請的ring掛到共享鏈表ring_list中
    te = rte_zmalloc("RING_TAILQ_ENTRY", sizeof(*te), 0);

    rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);

    //申請memzone窝撵,
    /* reserve a memory zone for this ring. If we can't get rte_config or
     * we are secondary process, the memzone_reserve function will set
     * rte_errno for us appropriately - hence no check in this this function */
    mz = rte_memzone_reserve_aligned(mz_name, ring_size, socket_id, mz_flags, __alignof__(*r));
    if (mz != NULL) {
        //memzone的的addr指向分配的內(nèi)存傀顾,ring也從此內(nèi)存開始
        r = mz->addr;
        /* no need to check return value here, we already checked the
         * arguments above */
        rte_ring_init(r, name, requested_count, flags);

        //將ring保存到鏈表entry中
        te->data = (void *) r;
        r->memzone = mz;

        //將鏈表entry插入鏈表ring_list
        TAILQ_INSERT_TAIL(ring_list, te, next);
    } else {
        r = NULL;
        RTE_LOG(ERR, RING, "Cannot reserve memory\n");
        rte_free(te);
    }
    rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);

    return r;
}

int
rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
    unsigned flags)
{
    int ret;

    /* compilation-time checks */
    RTE_BUILD_BUG_ON((sizeof(struct rte_ring) &
              RTE_CACHE_LINE_MASK) != 0);
    RTE_BUILD_BUG_ON((offsetof(struct rte_ring, cons) &
              RTE_CACHE_LINE_MASK) != 0);
    RTE_BUILD_BUG_ON((offsetof(struct rte_ring, prod) &
              RTE_CACHE_LINE_MASK) != 0);

    /* init the ring structure */
    memset(r, 0, sizeof(*r));
    ret = snprintf(r->name, sizeof(r->name), "%s", name);
    if (ret < 0 || ret >= (int)sizeof(r->name))
        return -ENAMETOOLONG;
    r->flags = flags;
    r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
    r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;

    if (flags & RING_F_EXACT_SZ) {
        r->size = rte_align32pow2(count + 1);
        r->mask = r->size - 1;
        r->capacity = count;
    } else {
        if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK)) {
            RTE_LOG(ERR, RING,
                "Requested size is invalid, must be power of 2, and not exceed the size limit %u\n",
                RTE_RING_SZ_MASK);
            return -EINVAL;
        }
        r->size = count;
        r->mask = count - 1;
        r->capacity = r->mask;
    }
    //初始時,生產(chǎn)者和消費者的首尾都為0
    r->prod.head = r->cons.head = 0;
    r->prod.tail = r->cons.tail = 0;

    return 0;
}

入隊操作

DPDK提供了如下幾個api用來執(zhí)行入隊操作碌奉,它們最終都會調(diào)用__rte_ring_do_enqueue來實現(xiàn)短曾,所以重點分析函數(shù)__rte_ring_do_enqueue。

//多生產(chǎn)者批量入隊赐劣。入隊個數(shù)n必須全部成功嫉拐,否則入隊失敗。調(diào)用者明確知道是多生產(chǎn)者
rte_ring_mp_enqueue_bulk
//單生產(chǎn)者批量入隊魁兼。入隊個數(shù)n必須全部成功婉徘,否則入隊失敗。調(diào)用者明確知道是單生產(chǎn)者
rte_ring_sp_enqueue_bulk
//批量入隊咐汞。入隊個數(shù)n必須全部成功判哥,否則入隊失敗。調(diào)用者不用關(guān)心是不是單生產(chǎn)者
rte_ring_enqueue_bulk
//多生產(chǎn)者批量入隊碉考。入隊個數(shù)n不一定全部成功塌计。調(diào)用者明確知道是多生產(chǎn)者
rte_ring_mp_enqueue_burst
//單生產(chǎn)者批量入隊。入隊個數(shù)n不一定全部成功侯谁。調(diào)用者明確知道是單生產(chǎn)者
rte_ring_sp_enqueue_burst
//批量入隊锌仅。入隊個數(shù)n不一定全部成功。調(diào)用者不用關(guān)心是不是單生產(chǎn)者
rte_ring_enqueue_burst

__rte_ring_do_enqueue主要做了三個事情:
a. 移動生產(chǎn)者head墙贱,此處在多生產(chǎn)者下可能會有沖突热芹,需要使用cas操作循環(huán)檢測,只有自己能移動head時才行惨撇。
b. 執(zhí)行入隊操作伊脓,將obj插入ring,從老的head開始魁衙,直到新head結(jié)束报腔。
c. 更新生產(chǎn)者tail株搔,只有這樣消費者才能看到最新的消費對象。

其參數(shù)r指定了目標ring纯蛾。
參數(shù)obj_table指定了入隊對象纤房。
參數(shù)n指定了入隊對象個數(shù)。
參數(shù)behavior指定了入隊行為翻诉,有兩個值RTE_RING_QUEUE_FIXED和RTE_RING_QUEUE_VARIABLE炮姨,前者表示入隊對象必須一次性全部成功,后者表示盡可能多的入隊碰煌。
參數(shù)is_sp指定了是否為單生產(chǎn)者模式舒岸,默認為多生產(chǎn)者模式。

static __rte_always_inline unsigned int
__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
         unsigned int n, enum rte_ring_queue_behavior behavior,
         int is_sp, unsigned int *free_space)
{
    uint32_t prod_head, prod_next;
    uint32_t free_entries;

    //先移動生產(chǎn)者的頭指針芦圾,prod_head保存移動前的head吁津,prod_next保存移動后的head
    n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
            &prod_head, &prod_next, &free_entries);
    if (n == 0)
        goto end;

    //&r[1]指向存放對象的內(nèi)存。
    //從prod_head開始堕扶,將n個對象obj_table插入ring的prod_head位置
    ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
    rte_smp_wmb();

    //更新生產(chǎn)者tail
    update_tail(&r->prod, prod_head, prod_next, is_sp);
end:
    if (free_space != NULL)
        *free_space = free_entries - n;
    return n;
}

__rte_ring_move_prod_head用來使用cas操作更新生產(chǎn)者head。

static __rte_always_inline unsigned int
__rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
        unsigned int n, enum rte_ring_queue_behavior behavior,
        uint32_t *old_head, uint32_t *new_head,
        uint32_t *free_entries)
{
    const uint32_t capacity = r->capacity;
    unsigned int max = n;
    int success;

    do {
        /* Reset n to the initial burst count */
        n = max;

        //獲取生產(chǎn)者當前的head位置
        *old_head = r->prod.head;

        /* add rmb barrier to avoid load/load reorder in weak
         * memory model. It is noop on x86
         */
        rte_smp_rmb();

        const uint32_t cons_tail = r->cons.tail;
        /*
         *  The subtraction is done between two unsigned 32bits value
         * (the result is always modulo 32 bits even if we have
         * *old_head > cons_tail). So 'free_entries' is always between 0
         * and capacity (which is < size).
         */
        //獲取空閑 entry 個數(shù)
        *free_entries = (capacity + cons_tail - *old_head);

        //如果入隊的對象個數(shù)大于空閑entry個數(shù)梭依,則如果入隊要求固定大小稍算,則入隊失敗,返回0役拴,否則
        //只入隊空閑entry個數(shù)的對象
        /* check that we have enough room in ring */
        if (unlikely(n > *free_entries))
            n = (behavior == RTE_RING_QUEUE_FIXED) ?
                    0 : *free_entries;

        if (n == 0)
            return 0;

        //當前head位置加上入隊對象個數(shù)獲取新的生產(chǎn)者head
        *new_head = *old_head + n;
        //如果是單生產(chǎn)者糊探,直接更新生產(chǎn)者head,并返回1
        if (is_sp)
            r->prod.head = *new_head, success = 1;
        else //如果是多生產(chǎn)者河闰,需要借助函數(shù)rte_atomic32_cmpset科平,比較old_head和r->prod.head是否相同,
             //如果相同姜性,則將r->prod.head更新為new_head瞪慧,并返回1,退出循環(huán)部念,
             //如果不相同說明有其他生產(chǎn)者更新head了弃酌,返回0,繼續(xù)循環(huán)儡炼。
            success = rte_atomic32_cmpset(&r->prod.head,
                    *old_head, *new_head);
    } while (unlikely(success == 0));
    return n;
}

ENQUEUE_PTRS定義了入隊操作妓湘。

/* the actual enqueue of pointers on the ring.
 * Placed here since identical code needed in both
 * single and multi producer enqueue functions */
#define ENQUEUE_PTRS(r, ring_start, prod_head, obj_table, n, obj_type) do { \
    unsigned int i; \
    const uint32_t size = (r)->size; \
    uint32_t idx = prod_head & (r)->mask; \
    obj_type *ring = (obj_type *)ring_start; \
    //idx+n 大于 size,說明入隊n個對象后乌询,ring還沒滿榜贴,還沒翻轉(zhuǎn)
    if (likely(idx + n < size)) { \
        //一次循環(huán)入隊四個對象
        for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \
            ring[idx] = obj_table[i]; \
            ring[idx+1] = obj_table[i+1]; \
            ring[idx+2] = obj_table[i+2]; \
            ring[idx+3] = obj_table[i+3]; \
        } \
        //還有剩余不滿四個對象,則在switch里入隊
        switch (n & 0x3) { \
        case 3: \
            ring[idx++] = obj_table[i++]; /* fallthrough */ \
        case 2: \
            ring[idx++] = obj_table[i++]; /* fallthrough */ \
        case 1: \
            ring[idx++] = obj_table[i++]; \
        } \
    } else { \
        //入隊n個對象妹田,會導致ring滿唬党,發(fā)生翻轉(zhuǎn)鹃共,
        //則先入隊idx到size的位置,
        for (i = 0; idx < size; i++, idx++)\
            ring[idx] = obj_table[i]; \
        //再翻轉(zhuǎn)回到ring起始位置初嘹,入隊剩余的對象
        for (idx = 0; i < n; i++, idx++) \
            ring[idx] = obj_table[i]; \
    } \
} while (0)

最后更新生產(chǎn)者tail及汉。

static __rte_always_inline void
update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
        uint32_t single)
{
    /*
     * If there are other enqueues/dequeues in progress that preceded us,
     * we need to wait for them to complete
     */
    if (!single)
        //多生產(chǎn)者時,必須等到其他生產(chǎn)者入隊成功屯烦,再更新自己的tail
        while (unlikely(ht->tail != old_val))
            rte_pause();

    ht->tail = new_val;
}

出隊操作

DPDK提供了如下幾個api用來執(zhí)行出隊操作坷随,它們最終都會調(diào)用__rte_ring_do_dequeue來實現(xiàn),所以重點分析函數(shù)__rte_ring_do_dequeue驻龟。

//多消費者批量出隊温眉。出隊個數(shù)n必須全部成功,否則出隊失敗翁狐。調(diào)用者明確知道是多消費者
rte_ring_mc_dequeue_bulk
//單消費者批量出隊类溢。出隊個數(shù)n必須全部成功,否則出隊失敗露懒。調(diào)用者明確知道是單消費者
rte_ring_sc_dequeue_bulk
//批量出隊闯冷。出隊個數(shù)n必須全部成功,否則出隊失敗懈词。調(diào)用者不用關(guān)心是不是單消費者
rte_ring_dequeue_bulk
//多消費者批量出隊蛇耀。出隊個數(shù)n不一定全部成功。調(diào)用者明確知道是多消費者
rte_ring_mc_dequeue_burst
//單消費者批量出隊坎弯。出隊個數(shù)n不一定全部成功纺涤。調(diào)用者明確知道是單消費者
rte_ring_sc_dequeue_burst
//批量出隊。出隊個數(shù)n不一定全部成功抠忘。調(diào)用者不用關(guān)心是不是單消費者
rte_ring_dequeue_burst

__rte_ring_do_dequeue主要做了三個事情:
a. 移動消費者head撩炊,此處在多消費者下可能會有沖突,需要使用cas操作循環(huán)檢測崎脉,只有自己能移動head時才行拧咳。
b. 執(zhí)行出隊操作,將ring中的obj插入obj_table囚灼,從老的head開始呛踊,直到新head結(jié)束。
c. 更新消費者tail啦撮,只有這樣生成者才能進行生產(chǎn)谭网。

其參數(shù)r指定了目標ring。
參數(shù)obj_table指定了出隊對象出隊后存放位置赃春。
參數(shù)n指定了入隊對象個數(shù)愉择。
參數(shù)behavior指定了出隊行為,有兩個值RTE_RING_QUEUE_FIXED和RTE_RING_QUEUE_VARIABLE,前者表示出隊對象必須一次性全部成功锥涕,后者表示盡可能多的出隊衷戈。
參數(shù)is_sp指定了是否為單消費者模式,默認為多消費者模式层坠。

static __rte_always_inline unsigned int
__rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
         unsigned int n, enum rte_ring_queue_behavior behavior,
         int is_sc, unsigned int *available)
{
    uint32_t cons_head, cons_next;
    uint32_t entries;

    //先移動消費者head殖妇,成功后,cons_head為老的head破花,cons_next為新的head谦趣,
    //兩者之間的部分為此次可消費的對象
    n = __rte_ring_move_cons_head(r, is_sc, n, behavior,
            &cons_head, &cons_next, &entries);
    if (n == 0)
        goto end;

    //執(zhí)行出隊操作,從老的cons_head開始出隊n個對象
    DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
    rte_smp_rmb();

    //更新消費者tail座每,和前面更新生產(chǎn)者head代碼相同
    update_tail(&r->cons, cons_head, cons_next, is_sc);

end:
    if (available != NULL)
        *available = entries - n;
    return n;
}

__rte_ring_move_cons_head用來使用cas操作更新消費者head前鹅。

static __rte_always_inline unsigned int
__rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
        unsigned int n, enum rte_ring_queue_behavior behavior,
        uint32_t *old_head, uint32_t *new_head,
        uint32_t *entries)
{
    unsigned int max = n;
    int success;

    /* move cons.head atomically */
    do {
        /* Restore n as it may change every loop */
        n = max;

        //取出當前head位置
        *old_head = r->cons.head;

        /* add rmb barrier to avoid load/load reorder in weak
         * memory model. It is noop on x86
         */
        rte_smp_rmb();

        //生產(chǎn)者tail減去消費者head為可消費的對象個數(shù)。
        //因為head和tail都是無符號32位類型峭梳,即使生產(chǎn)者tail比消費者head
        //小舰绘,也能正確得出結(jié)果,不用擔心溢出葱椭。
        const uint32_t prod_tail = r->prod.tail;
        /* The subtraction is done between two unsigned 32bits value
         * (the result is always modulo 32 bits even if we have
         * cons_head > prod_tail). So 'entries' is always between 0
         * and size(ring)-1. */
        *entries = (prod_tail - *old_head);

        //要求出隊對象個數(shù)大于實際可消費對象個數(shù)
        /* Set the actual entries for dequeue */
        if (n > *entries)
            //此時如果behavior為RTE_RING_QUEUE_FIXED捂寿,表示必須滿足n,滿足不了就一個都不出隊孵运,返回0秦陋,
            //如果不為RTE_RING_QUEUE_FIXED,則盡可能多的出隊
            n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;

        if (unlikely(n == 0))
            return 0;
        
        //當前head加上n即為新的消費者head
        *new_head = *old_head + n;
        if (is_sc)
            //如果單消費者掐松,直接更新head即可,返回1
            r->cons.head = *new_head, success = 1;
        else
            //多消費者粪小,需要借用rte_atomic32_cmpset更新head
            success = rte_atomic32_cmpset(&r->cons.head, *old_head,
                    *new_head);
    } while (unlikely(success == 0));
    return n;
}

ring是否滿或者是否為空

函數(shù)rte_ring_full用來判斷ring是否滿
static inline int
rte_ring_full(const struct rte_ring *r)
{
    return rte_ring_free_count(r) == 0;
}

static inline unsigned
rte_ring_free_count(const struct rte_ring *r)
{
    return r->capacity - rte_ring_count(r);
}

函數(shù)rte_ring_empty用來判斷ring是否為空

static inline int
rte_ring_empty(const struct rte_ring *r)
{
    return rte_ring_count(r) == 0;
}

判斷ring是否為空或者是否滿都需要調(diào)用rte_ring_count獲取當前ring中已使用的個數(shù)大磺。

static inline unsigned
rte_ring_count(const struct rte_ring *r)
{
    uint32_t prod_tail = r->prod.tail;
    uint32_t cons_tail = r->cons.tail;
    uint32_t count = (prod_tail - cons_tail) & r->mask;
    return (count > r->capacity) ? r->capacity : count;
}

參考

http://doc.dpdk.org/guides/prog_guide/ring_lib.html
https://www.cnblogs.com/jungle1996/p/12194243.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市探膊,隨后出現(xiàn)的幾起案子杠愧,更是在濱河造成了極大的恐慌,老刑警劉巖逞壁,帶你破解...
    沈念sama閱讀 216,402評論 6 499
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件流济,死亡現(xiàn)場離奇詭異,居然都是意外死亡腌闯,警方通過查閱死者的電腦和手機绳瘟,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,377評論 3 392
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來姿骏,“玉大人糖声,你說我怎么就攤上這事。” “怎么了蘸泻?”我有些...
    開封第一講書人閱讀 162,483評論 0 353
  • 文/不壞的土叔 我叫張陵琉苇,是天一觀的道長。 經(jīng)常有香客問我悦施,道長并扇,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,165評論 1 292
  • 正文 為了忘掉前任抡诞,我火速辦了婚禮穷蛹,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘沐绒。我一直安慰自己俩莽,他們只是感情好,可當我...
    茶點故事閱讀 67,176評論 6 388
  • 文/花漫 我一把揭開白布乔遮。 她就那樣靜靜地躺著扮超,像睡著了一般。 火紅的嫁衣襯著肌膚如雪蹋肮。 梳的紋絲不亂的頭發(fā)上出刷,一...
    開封第一講書人閱讀 51,146評論 1 297
  • 那天,我揣著相機與錄音坯辩,去河邊找鬼馁龟。 笑死,一個胖子當著我的面吹牛漆魔,可吹牛的內(nèi)容都是我干的坷檩。 我是一名探鬼主播,決...
    沈念sama閱讀 40,032評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼改抡,長吁一口氣:“原來是場噩夢啊……” “哼矢炼!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起阿纤,我...
    開封第一講書人閱讀 38,896評論 0 274
  • 序言:老撾萬榮一對情侶失蹤句灌,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后欠拾,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體胰锌,經(jīng)...
    沈念sama閱讀 45,311評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,536評論 2 332
  • 正文 我和宋清朗相戀三年藐窄,在試婚紗的時候發(fā)現(xiàn)自己被綠了资昧。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,696評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡荆忍,死狀恐怖榛搔,靈堂內(nèi)的尸體忽然破棺而出诺凡,到底是詐尸還是另有隱情,我是刑警寧澤践惑,帶...
    沈念sama閱讀 35,413評論 5 343
  • 正文 年R本政府宣布腹泌,位于F島的核電站,受9級特大地震影響尔觉,放射性物質(zhì)發(fā)生泄漏凉袱。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,008評論 3 325
  • 文/蒙蒙 一侦铜、第九天 我趴在偏房一處隱蔽的房頂上張望专甩。 院中可真熱鬧,春花似錦钉稍、人聲如沸涤躲。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽种樱。三九已至,卻和暖如春俊卤,著一層夾襖步出監(jiān)牢的瞬間嫩挤,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,815評論 1 269
  • 我被黑心中介騙來泰國打工消恍, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留岂昭,地道東北人。 一個月前我還...
    沈念sama閱讀 47,698評論 2 368
  • 正文 我出身青樓狠怨,卻偏偏與公主長得像约啊,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子佣赖,可洞房花燭夜當晚...
    茶點故事閱讀 44,592評論 2 353

推薦閱讀更多精彩內(nèi)容

  • 在之前分析過dpdk中的相關(guān)一些開源實現(xiàn)恰矩,然后當時的版本是16.07.2,現(xiàn)在最新的是19.11茵汰,代碼方面還是有很...
    fooboo閱讀 665評論 0 0
  • 因為最近在研究高性能方面的技術(shù)枢里,突然想起上一份工作從事抗D的項目孽鸡,在項目中使用到的dpdk組件蹂午,其中之一有無鎖相關(guān)...
    fooboo閱讀 9,222評論 2 3
  • 1. DPDK技術(shù)介紹 1) 簡介 DPDK全稱Intel Data Plane Development Kit,...
    Aubrey_de6c閱讀 71,077評論 0 7
  • 3. 環(huán)境抽象層 環(huán)境抽象層(EAL)為底層資源如硬件和存儲空間的訪問提供了接口彬碱。這些接口為上層應(yīng)用程序和庫隱藏了...
    半天妖閱讀 4,439評論 0 10
  • 3. 環(huán)境抽象層 環(huán)境抽象層(Environment Abstraction Layer豆胸,下文簡稱EAL)是對操作...
    希爾哥哥s閱讀 4,470評論 0 1