進程間共享存儲編程

#include <iostream>
#include <libgen.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/mman.h>
#include <sys/stat.h>

using namespace std;

#define USER_LIMIT 5
#define BUFFER_SIZE 1024
#define FD_LIMIT 65535
#define MAX_EVENT_NUMBER 1024
#define PROCESS_LIMIT 65536

struct client_data
{
    sockaddr_in address;
    int connfd;
    pid_t pid;
    int pipefd[2];
};

static const char *shm_name = "/my_shm";
int sig_pipefd[2];
int epollfd;
int listenfd;
int shmfd;
char *share_mem = 0;
client_data *users = 0;
int *sub_process = 0;
int user_count = 0;
bool stop_child = false;

int setnonblocking(int);
void addfd(int, int);
void sig_handler(int);
void addsig(int sig, void(*handler)(int), bool restart = true);
void del_resource();
void child_term_handler(int);
int run_child(int, client_data*, char*);

int main(int argc, char* argv[])
{
    if(argc <= 2)
    {
        cout << "usage: " << basename(argv[0]) << " ip_address port_number" << endl;
    }
    const char* ip = argv[1];
    int port = atoi(argv[2]);

    int ret = 0;
    struct sockaddr_in address;
    bzero(&address, sizeof(address));
    address.sin_family = AF_INET;
    inet_pton(AF_INET, ip, &address.sin_addr);
    address.sin_port = htons(port);

    listenfd = socket(AF_INET, SOCK_STREAM, 0);
    assert(listenfd >= 0);

    ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
    assert(ret != -1);

    ret = listen(listenfd, 5);
    assert(ret != -1);

    user_count = 0;
    users = new client_data[USER_LIMIT + 1];
    sub_process = new int[PROCESS_LIMIT];
    for(int i = 0; i < PROCESS_LIMIT; i++)
    {
        sub_process[i] = -1;
    }

    epoll_event events[MAX_EVENT_NUMBER];
    epollfd = epoll_create(5);
    assert(epollfd != -1);
    addfd(epollfd, listenfd);

    ret = socketpair(AF_UNIX, SOCK_STREAM, 0, sig_pipefd);
    assert(ret != -1);
    setnonblocking(sig_pipefd[1]);
    addfd(epollfd, sig_pipefd[0]);

    addsig(SIGCHLD, sig_handler);
    addsig(SIGTERM, sig_handler);
    addsig(SIGINT, sig_handler);
    addsig(SIGPIPE, SIG_IGN);
    bool stop_server = false;
    bool terminate = false;

    shmfd = shm_open(shm_name, O_CREAT | O_RDWR, 0666);
    assert(shmfd != -1);
    ret = ftruncate(shmfd, USER_LIMIT*BUFFER_SIZE);
    assert(ret != -1);

    share_mem = (char*)mmap(NULL, USER_LIMIT*BUFFER_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shmfd, 0);
    assert(share_mem != MAP_FAILED);
    close(shmfd);

    while(!stop_server)
    {
        int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
        /*
        EBADF  epfd is not a valid file descriptor.

        EFAULT The  memory  area  pointed  to  by events is not accessible with write permissions.

        EINTR  The call was interrupted by a signal handler before  either  (1)
              any of the requested events occurred or (2) the timeout expired;
              see signal(7).

        EINVAL epfd is not an epoll file descriptor, or maxevents is less  than
              or equal to zero.
        */
        if((number < 0) && (errno != EINTR))
        {
            cout << "epoll failed" << endl;
            break;
        }
        for(int i = 0; i < number; i++)
        {
            int sockfd = events[i].data.fd;
            /*
            *typedef union epoll_data {
            *   void    *ptr;
            *   int      fd;
            *   uint32_t u32;
            *   uint64_t u64;
            *} epoll_data_t;
            *
            * truct epoll_event {
            *   uint32_t     events;    
            *   epoll_data_t data;      
            *};
            */
            if(sockfd == listenfd)
            {
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof(client_address);
                int connfd = accept(listenfd, (struct sockaddr *)&client_address, &client_addrlength);
                if(connfd < 0)
                {
                    cout << "errno is" << errno << endl;
                    continue;
                }
                if(user_count >= USER_LIMIT)
                {
                    const char *info = "too many users";
                    cout << info << endl;
                    send(connfd, info, strlen(info), 0);
                    close(connfd);
                    continue;
                }

                users[user_count].address = client_address;
                users[user_count].connfd = connfd;

                ret = socketpair(AF_UNIX, SOCK_STREAM, 0, users[user_count].pipefd);
                assert(ret != -1);
                pid_t pid = fork();
                if(pid < 0)
                {
                    close(connfd);
                    continue;
                }
                else if(pid == 0)
                {
                    close(epollfd);
                    close(listenfd);
                    close(users[user_count].pipefd[0]);
                    close(sig_pipefd[0]);
                    close(sig_pipefd[1]);
                    run_child(user_count, users, share_mem);
                    munmap((void*)share_mem, USER_LIMIT*BUFFER_SIZE);
                    exit(0);
                }
                else
                {
                    close(connfd);
                    close(users[user_count].pipefd[1]);
                    addfd(epollfd, users[user_count].pipefd[0]);
                    users[user_count].pid = pid;

                    sub_process[pid] = user_count;
                    user_count++;
                }
            }
            /*handle signal*/
            else if((sockfd == sig_pipefd[0]) && (events[i].events & EPOLLIN))
            {
                int sig;
                char signals[1024];
                ret = recv(sig_pipefd[0], signals, sizeof(signals), 0);
                /*
                These  calls  return  the  number  of bytes received, or -1 if an error
                occurred.  In the event of an error,  errno  is  set  to  indicate  the
                error.
                When a stream socket peer has performed an orderly shutdown, the return
                value will be 0 (the traditional "end-of-file" return).
                Datagram sockets in  various  domains  (e.g.,  the  UNIX  and  Internet
                domains)  permit  zero-length  datagrams.   When  such  a  datagram  is
                received, the return value is 0.
                The value 0 may also be returned if the requested number  of  bytes  to
                receive from a stream socket was 0.
                ERRORS:
                EAGAIN or EWOULDBLOCK
                The socket is marked nonblocking and the receive operation would
                block, or a receive timeout had been set and the timeout expired
                before  data  was  received.   POSIX.1 allows either error to be
                returned for this case, and does not require these constants  to
                have  the same value, so a portable application should check for
                both possibilities.
                ECONNREFUSED
                A remote host refused to allow the network connection (typically
                because it is not running the requested service).
                EBADF EFAULT EINTR
                ENOTCONN
                The socket is associated with a connection-oriented protocol and
                has not been connected
                */
                if(ret == -1)
                {
                    continue;
                }
                else if(ret == 0)
                {
                    continue;

                }
                else
                {
                    for(int i = 0; i < ret; i++)
                    {
                        switch(signals[i])
                        {
                            case SIGCHLD:
                            {
                                pid_t pid;
                                int stat;
                                while((pid = waitpid(-1, &stat, WNOHANG)) > 0)
                                {/*WNOHANG     return immediately if no child has exited.*/
                                    int del_user = sub_process[pid];
                                    sub_process[pid] = -1;
                                    if(del_user < 0 || del_user > USER_LIMIT)
                                        continue;
                                    epoll_ctl(epollfd, EPOLL_CTL_DEL, users[del_user].pipefd[0], 0);
                                    close(users[del_user].pipefd[0]);
                                    users[del_user] = users[--user_count];
                                    sub_process[users[del_user].pid] = del_user;
                                }
                                if(terminate && user_count == 0)
                                {
                                    stop_server = true;
                                }
                                break;
                            case SIGTERM:
                            case SIGINT:
                            {
                                cout << "kill all the child now" << endl;
                                if(user_count == 0)
                                {
                                    stop_server = true;
                                    break;
                                }
                                for(int i = 0; i < user_count; i++)
                                {
                                    int pid = users[i].pid;
                                    kill(pid, SIGTERM);
                                }
                                terminate = true;
                                break;
                            }
                            default:
                                break;
                            }
                        }
                    }
                }
            }
            else if(events[i].events & EPOLLIN)
            {
                int child = 0;
                ret = recv(sockfd, (char *)&child, sizeof(child), 0);
                cout << "read data from child accross pipe" << endl;
                if(ret == -1)
                {
                    continue;
                }
                else if(ret == 0)
                {
                    continue;
                }
                else
                {
                    for(int j = 0; j < user_count; j++)
                    {
                        if(users[j].pipefd[0] != sockfd)
                        {
                            cout << "send data to child accross pipe" << endl;
                            send(users[i].pipefd[0], (char *)&child, sizeof(child), 0);
                        }
                    }
                }
            }
        }
    }

    del_resource();
    return 0;
}

int setnonblocking(int fd)
{
    int old_option = fcntl(fd, F_GETFL);
    int new_option = old_option | O_NONBLOCK;
    fcntl(fd, F_SETFL, new_option);
    return old_option;
}
void addfd(int epollfd, int fd)
{
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET;
    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
    setnonblocking(fd);
}

void sig_handler(int sig)
{
    int save_errno = errno;
    int msg = sig;
    send(sig_pipefd[1], (char*)&msg, 1, 0);
    errno = save_errno;
}

void addsig(int sig, void(*handler)(int), bool restart)
{
    struct sigaction sa;
    memset(&sa, '\0', sizeof(sa));
    sa.sa_handler = handler;
    if(restart)
    {
        sa.sa_flags |= SA_RESTART;
    }
    sigfillset(&sa.sa_mask);
    assert(sigaction(sig, &sa, NULL) != 1);
}

void del_resource()
{
    close(sig_pipefd[0]);
    close(sig_pipefd[1]);
    close(listenfd);
    close(epollfd);
    shm_unlink(shm_name);
    delete[] users;
    delete[] sub_process;
}

void child_term_handler(int sig)
{
    stop_child = true;
}

int run_child(int idx, client_data* users, char *share_mem)
{
    epoll_event events[MAX_EVENT_NUMBER];
    int child_epollfd = epoll_create(5);
    assert(child_epollfd != -1);
    int connfd = users[idx].connfd;
    addfd(child_epollfd, connfd);
    int pipefd = users[idx].pipefd[1];
    addfd(child_epollfd, pipefd);
    int ret;
    addsig(SIGTERM, child_term_handler, false);

    while(!stop_child)
    {
        int number = epoll_wait(child_epollfd, events, MAX_EVENT_NUMBER, -1);
        if(number < 0 && errno != EINTR)
        {
            cout << "epoll failure" << endl;
            break;
        }

        for(int i = 0; i < number; i++)
        {
            int sockfd = events[i].data.fd;
            if(sockfd == connfd && events[i].events & EPOLLIN)
            {
                memset(share_mem + idx*BUFFER_SIZE, '\0', BUFFER_SIZE);
                ret = recv(connfd, share_mem + idx*BUFFER_SIZE, BUFFER_SIZE - 1, 0);
                if(ret < 0)
                {
                    if(errno != EAGAIN)
                        stop_child = true;
                }
                else if(ret == 0)
                {
                    stop_child = true;
                }
                else
                {
                    send(pipefd, (char *)&idx, sizeof(idx), 0);
                }
            }
            else if(sockfd == pipefd && (events[i].events & EPOLLIN))
            {
                int client = 0;
                ret = recv(sockfd, (char *)&client, sizeof(client), 0);
                if(ret < 0)
                {
                    if(errno != EAGAIN)
                        stop_child = true;
                }
                else if(ret == 0)
                {
                    stop_child = true;
                }
                else
                {
                    send(connfd, share_mem+client*BUFFER_SIZE, BUFFER_SIZE, 0);
                }
            }
            else
            {
                continue;
            }
        }
    }

    close(connfd);
    close(pipefd);
    close(child_epollfd);
    return 0;
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市准给,隨后出現(xiàn)的幾起案子狼钮,更是在濱河造成了極大的恐慌元莫,老刑警劉巖妈拌,帶你破解...
    沈念sama閱讀 216,496評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異埃儿,居然都是意外死亡截亦,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,407評論 3 392
  • 文/潘曉璐 我一進店門晶姊,熙熙樓的掌柜王于貴愁眉苦臉地迎上來扒接,“玉大人,你說我怎么就攤上這事们衙〖卣” “怎么了?”我有些...
    開封第一講書人閱讀 162,632評論 0 353
  • 文/不壞的土叔 我叫張陵蒙挑,是天一觀的道長宗侦。 經(jīng)常有香客問我,道長忆蚀,這世上最難降的妖魔是什么矾利? 我笑而不...
    開封第一講書人閱讀 58,180評論 1 292
  • 正文 為了忘掉前任姑裂,我火速辦了婚禮,結(jié)果婚禮上男旗,老公的妹妹穿的比我還像新娘舶斧。我一直安慰自己,他們只是感情好察皇,可當我...
    茶點故事閱讀 67,198評論 6 388
  • 文/花漫 我一把揭開白布茴厉。 她就那樣靜靜地躺著,像睡著了一般什荣。 火紅的嫁衣襯著肌膚如雪矾缓。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,165評論 1 299
  • 那天溃睹,我揣著相機與錄音而账,去河邊找鬼。 笑死因篇,一個胖子當著我的面吹牛泞辐,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播竞滓,決...
    沈念sama閱讀 40,052評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼咐吼,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了商佑?” 一聲冷哼從身側(cè)響起锯茄,我...
    開封第一講書人閱讀 38,910評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎茶没,沒想到半個月后肌幽,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,324評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡抓半,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,542評論 2 332
  • 正文 我和宋清朗相戀三年喂急,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片笛求。...
    茶點故事閱讀 39,711評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡廊移,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出探入,到底是詐尸還是另有隱情狡孔,我是刑警寧澤,帶...
    沈念sama閱讀 35,424評論 5 343
  • 正文 年R本政府宣布蜂嗽,位于F島的核電站苗膝,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏植旧。R本人自食惡果不足惜荚醒,卻給世界環(huán)境...
    茶點故事閱讀 41,017評論 3 326
  • 文/蒙蒙 一芋类、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧界阁,春花似錦侯繁、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,668評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至较剃,卻和暖如春咕别,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背写穴。 一陣腳步聲響...
    開封第一講書人閱讀 32,823評論 1 269
  • 我被黑心中介騙來泰國打工惰拱, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人啊送。 一個月前我還...
    沈念sama閱讀 47,722評論 2 368
  • 正文 我出身青樓偿短,卻偏偏與公主長得像,于是被迫代替她去往敵國和親馋没。 傳聞我的和親對象是個殘疾皇子昔逗,可洞房花燭夜當晚...
    茶點故事閱讀 44,611評論 2 353

推薦閱讀更多精彩內(nèi)容