#include <iostream>
#include <libgen.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/mman.h>
#include <sys/stat.h>
using namespace std;
/* Maximum number of simultaneously connected clients; also the number of
 * BUFFER_SIZE slots in the shared-memory segment. */
#define USER_LIMIT 5
/* Size in bytes of one client's slot in the shared-memory segment. */
#define BUFFER_SIZE 1024
/* Not referenced anywhere in the visible part of this file. */
#define FD_LIMIT 65535
/* Capacity of the event arrays handed to epoll_wait(). */
#define MAX_EVENT_NUMBER 1024
/* Number of entries in the sub_process table, which is indexed by child pid. */
#define PROCESS_LIMIT 65536
/* Per-client bookkeeping kept by the parent process, one entry per
 * connected client in the users array. */
struct client_data
{
sockaddr_in address;   /* peer address as returned by accept() */
int connfd;            /* connected client socket */
pid_t pid;             /* pid of the child process serving this client */
int pipefd[2];         /* socketpair to the child: [0] is kept by the parent, [1] by the child */
};
/* Name of the POSIX shared-memory object passed to shm_open()/shm_unlink(). */
static const char *shm_name = "/my_shm";
/* Socketpair that carries signal numbers from sig_handler (writes to [1])
 * to the parent's epoll loop (reads from [0]). */
int sig_pipefd[2];
/* epoll instance used by the parent's main loop. */
int epollfd;
/* Listening TCP socket. */
int listenfd;
/* Descriptor returned by shm_open(); closed again right after mmap(). */
int shmfd;
/* Start of the mapped shared-memory segment (USER_LIMIT * BUFFER_SIZE bytes). */
char *share_mem = 0;
/* Array of client records, allocated in main(). */
client_data *users = 0;
/* Table indexed by child pid giving that child's index in users, or -1. */
int *sub_process = 0;
/* Number of entries of users currently in use. */
int user_count = 0;
/* Set by child_term_handler to make a child's event loop exit. */
bool stop_child = false;
int setnonblocking(int);
void addfd(int, int);
void sig_handler(int);
void addsig(int sig, void(*handler)(int), bool restart = true);
void del_resource();
void child_term_handler(int);
int run_child(int, client_data*, char*);
/*
 * Parent process of the chat server.
 *
 * Binds a TCP listening socket to argv[1]:argv[2], maps a shared-memory
 * segment with one BUFFER_SIZE slot per user, and then runs an epoll loop
 * that
 *   - accepts clients and forks one child per client (see run_child),
 *   - reacts to SIGCHLD / SIGTERM / SIGINT forwarded through sig_pipefd,
 *   - relays "slot N has new data" notifications from one child to all
 *     other children over their per-child socketpairs.
 *
 * Returns 0 on normal shutdown, 1 on bad command-line arguments.
 */
int main(int argc, char* argv[])
{
    if(argc <= 2)
    {
        cout << "usage: " << basename(argv[0]) << " ip_address port_number" << endl;
        /* Without this return argv[1]/argv[2] would be dereferenced below. */
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi(argv[2]);
    int ret = 0;

    struct sockaddr_in address;
    memset(&address, 0, sizeof(address));
    address.sin_family = AF_INET;
    if(inet_pton(AF_INET, ip, &address.sin_addr) != 1)
    {
        cout << "invalid ip address: " << ip << endl;
        return 1;
    }
    address.sin_port = htons(port);

    listenfd = socket(AF_INET, SOCK_STREAM, 0);
    assert(listenfd >= 0);
    ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
    assert(ret != -1);
    ret = listen(listenfd, 5);
    assert(ret != -1);

    user_count = 0;
    users = new client_data[USER_LIMIT + 1];
    sub_process = new int[PROCESS_LIMIT];
    for(int i = 0; i < PROCESS_LIMIT; i++)
    {
        sub_process[i] = -1;
    }

    epoll_event events[MAX_EVENT_NUMBER];
    epollfd = epoll_create(5);
    assert(epollfd != -1);
    addfd(epollfd, listenfd);

    /* Signals are turned into bytes on this socketpair so that they can be
     * handled synchronously inside the epoll loop. */
    ret = socketpair(AF_UNIX, SOCK_STREAM, 0, sig_pipefd);
    assert(ret != -1);
    setnonblocking(sig_pipefd[1]);
    addfd(epollfd, sig_pipefd[0]);
    addsig(SIGCHLD, sig_handler);
    addsig(SIGTERM, sig_handler);
    addsig(SIGINT, sig_handler);
    addsig(SIGPIPE, SIG_IGN);

    bool stop_server = false;
    /* True once a termination signal was received and we are waiting for
     * the remaining children to exit. */
    bool terminating = false;

    shmfd = shm_open(shm_name, O_CREAT | O_RDWR, 0666);
    assert(shmfd != -1);
    ret = ftruncate(shmfd, USER_LIMIT*BUFFER_SIZE);
    assert(ret != -1);
    share_mem = (char*)mmap(NULL, USER_LIMIT*BUFFER_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shmfd, 0);
    assert(share_mem != MAP_FAILED);
    close(shmfd);

    while(!stop_server)
    {
        int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
        /* EINTR just means a signal handler ran; the loop below is skipped
         * because number is negative, and we wait again. */
        if((number < 0) && (errno != EINTR))
        {
            cout << "epoll failed" << endl;
            break;
        }
        for(int i = 0; i < number; i++)
        {
            int sockfd = events[i].data.fd;
            if(sockfd == listenfd)
            {
                /* New client connection. */
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof(client_address);
                int connfd = accept(listenfd, (struct sockaddr *)&client_address, &client_addrlength);
                if(connfd < 0)
                {
                    cout << "errno is" << errno << endl;
                    continue;
                }
                if(user_count >= USER_LIMIT)
                {
                    const char *info = "too many users";
                    cout << info << endl;
                    send(connfd, info, strlen(info), 0);
                    close(connfd);
                    continue;
                }
                users[user_count].address = client_address;
                users[user_count].connfd = connfd;
                ret = socketpair(AF_UNIX, SOCK_STREAM, 0, users[user_count].pipefd);
                if(ret == -1)
                {
                    close(connfd);
                    continue;
                }
                pid_t pid = fork();
                if(pid < 0)
                {
                    /* Release everything acquired for this client. */
                    close(connfd);
                    close(users[user_count].pipefd[0]);
                    close(users[user_count].pipefd[1]);
                    continue;
                }
                else if(pid == 0)
                {
                    /* Child: drop the parent's descriptors and serve the client. */
                    close(epollfd);
                    close(listenfd);
                    close(users[user_count].pipefd[0]);
                    close(sig_pipefd[0]);
                    close(sig_pipefd[1]);
                    run_child(user_count, users, share_mem);
                    munmap((void*)share_mem, USER_LIMIT*BUFFER_SIZE);
                    exit(0);
                }
                else
                {
                    close(connfd);
                    close(users[user_count].pipefd[1]);
                    if(pid >= PROCESS_LIMIT)
                    {
                        /* sub_process is indexed by pid and has only
                         * PROCESS_LIMIT entries; a larger pid cannot be
                         * tracked, so the child is dismissed instead of
                         * writing out of bounds. */
                        cout << "child pid out of range" << endl;
                        kill(pid, SIGTERM);
                        close(users[user_count].pipefd[0]);
                        continue;
                    }
                    addfd(epollfd, users[user_count].pipefd[0]);
                    users[user_count].pid = pid;
                    sub_process[pid] = user_count;
                    user_count++;
                }
            }
            /* handle signal */
            else if((sockfd == sig_pipefd[0]) && (events[i].events & EPOLLIN))
            {
                char signals[1024];
                ret = recv(sig_pipefd[0], signals, sizeof(signals), 0);
                if(ret <= 0)
                {
                    continue;
                }
                /* Each received byte is one signal number. */
                for(int k = 0; k < ret; k++)
                {
                    switch(signals[k])
                    {
                        case SIGCHLD:
                        {
                            pid_t pid;
                            int stat;
                            /* WNOHANG: return immediately if no child has exited. */
                            while((pid = waitpid(-1, &stat, WNOHANG)) > 0)
                            {
                                if(pid >= PROCESS_LIMIT)
                                {
                                    /* Never registered in sub_process. */
                                    continue;
                                }
                                int del_user = sub_process[pid];
                                sub_process[pid] = -1;
                                if(del_user < 0 || del_user >= user_count)
                                {
                                    continue;
                                }
                                epoll_ctl(epollfd, EPOLL_CTL_DEL, users[del_user].pipefd[0], 0);
                                close(users[del_user].pipefd[0]);
                                /* Keep users[] dense: move the last entry into
                                 * the freed slot, unless the freed slot was the
                                 * last one (then there is nothing to move and
                                 * the dead pid must stay unmapped). */
                                --user_count;
                                if(del_user != user_count)
                                {
                                    users[del_user] = users[user_count];
                                    sub_process[users[del_user].pid] = del_user;
                                }
                            }
                            if(terminating && user_count == 0)
                            {
                                stop_server = true;
                            }
                            break;
                        }
                        case SIGTERM:
                        case SIGINT:
                        {
                            cout << "kill all the child now" << endl;
                            if(user_count == 0)
                            {
                                stop_server = true;
                                break;
                            }
                            for(int u = 0; u < user_count; u++)
                            {
                                int pid = users[u].pid;
                                kill(pid, SIGTERM);
                            }
                            /* The server stops once the SIGCHLD branch has
                             * reaped the last child. */
                            terminating = true;
                            break;
                        }
                        default:
                            break;
                    }
                }
            }
            else if(events[i].events & EPOLLIN)
            {
                /* A child reports the index of the slot it just filled. */
                int child = 0;
                ret = recv(sockfd, (char *)&child, sizeof(child), 0);
                cout << "read data from child accross pipe" << endl;
                if(ret <= 0)
                {
                    continue;
                }
                /* Forward the notification to every other child. */
                for(int j = 0; j < user_count; j++)
                {
                    if(users[j].pipefd[0] != sockfd)
                    {
                        cout << "send data to child accross pipe" << endl;
                        send(users[j].pipefd[0], (char *)&child, sizeof(child), 0);
                    }
                }
            }
        }
    }
    del_resource();
    return 0;
}
/*
 * Adds O_NONBLOCK to the file status flags of fd.
 * Returns the flags as they were before the change.
 */
int setnonblocking(int fd)
{
    const int previous_flags = fcntl(fd, F_GETFL);
    fcntl(fd, F_SETFL, previous_flags | O_NONBLOCK);
    return previous_flags;
}
/*
 * Registers fd with the epoll instance epollfd for edge-triggered read
 * readiness (EPOLLIN | EPOLLET) and then switches fd to non-blocking mode.
 */
void addfd(int epollfd, int fd)
{
    epoll_event registration;
    registration.events = EPOLLIN | EPOLLET;
    registration.data.fd = fd;

    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &registration);
    setnonblocking(fd);
}
/*
 * Signal handler shared by SIGCHLD / SIGTERM / SIGINT in the parent.
 * Writes the signal number as a single byte to sig_pipefd[1] so the epoll
 * loop can process it; errno is saved and restored around the call.
 */
void sig_handler(int sig)
{
    int save_errno = errno;
    /* Send the value as a char: taking the first byte of an int would
     * transmit 0 on big-endian machines. */
    char msg = static_cast<char>(sig);
    send(sig_pipefd[1], &msg, 1, 0);
    errno = save_errno;
}
/*
 * Installs handler for signal sig via sigaction().
 *
 * sig     - signal number
 * handler - handler function (or SIG_IGN / SIG_DFL)
 * restart - when true, SA_RESTART is set so interrupted system calls are
 *           restarted automatically
 *
 * All signals are blocked while the handler runs (sa_mask is filled).
 */
void addsig(int sig, void(*handler)(int), bool restart)
{
    struct sigaction sa;
    memset(&sa, '\0', sizeof(sa));
    sa.sa_handler = handler;
    if(restart)
    {
        sa.sa_flags |= SA_RESTART;
    }
    sigfillset(&sa.sa_mask);
    /* The call is made outside assert() so it still happens with NDEBUG;
     * sigaction() reports failure with -1. */
    int rc = sigaction(sig, &sa, NULL);
    assert(rc != -1);
    (void)rc;
}
void del_resource()
{
close(sig_pipefd[0]);
close(sig_pipefd[1]);
close(listenfd);
close(epollfd);
shm_unlink(shm_name);
delete[] users;
delete[] sub_process;
}
/*
 * Signal handler installed in child processes (for SIGTERM by run_child):
 * raises the stop_child flag so the child's event loop terminates.
 */
void child_term_handler(int sig)
{
    (void)sig;  /* the signal number is not needed */
    stop_child = true;
}
int run_child(int idx, client_data* users, char *share_mem)
{
epoll_event events[MAX_EVENT_NUMBER];
int child_epollfd = epoll_create(5);
assert(child_epollfd != -1);
int connfd = users[idx].connfd;
addfd(child_epollfd, connfd);
int pipefd = users[idx].pipefd[1];
addfd(child_epollfd, pipefd);
int ret;
addsig(SIGTERM, child_term_handler, false);
while(!stop_child)
{
int number = epoll_wait(child_epollfd, events, MAX_EVENT_NUMBER, -1);
if(number < 0 && errno != EINTR)
{
cout << "epoll failure" << endl;
break;
}
for(int i = 0; i < number; i++)
{
int sockfd = events[i].data.fd;
if(sockfd == connfd && events[i].events & EPOLLIN)
{
memset(share_mem + idx*BUFFER_SIZE, '\0', BUFFER_SIZE);
ret = recv(connfd, share_mem + idx*BUFFER_SIZE, BUFFER_SIZE - 1, 0);
if(ret < 0)
{
if(errno != EAGAIN)
stop_child = true;
}
else if(ret == 0)
{
stop_child = true;
}
else
{
send(pipefd, (char *)&idx, sizeof(idx), 0);
}
}
else if(sockfd == pipefd && (events[i].events & EPOLLIN))
{
int client = 0;
ret = recv(sockfd, (char *)&client, sizeof(client), 0);
if(ret < 0)
{
if(errno != EAGAIN)
stop_child = true;
}
else if(ret == 0)
{
stop_child = true;
}
else
{
send(connfd, share_mem+client*BUFFER_SIZE, BUFFER_SIZE, 0);
}
}
else
{
continue;
}
}
}
close(connfd);
close(pipefd);
close(child_epollfd);
return 0;
}
進程間共享存儲編程
最后編輯于 :
©著作權歸作者所有,轉載或內容合作請聯系作者
- 文/潘曉璐 我一進店門晶姊,熙熙樓的掌柜王于貴愁眉苦臉地迎上來扒接,“玉大人,你說我怎么就攤上這事们衙〖卣” “怎么了?”我有些...
- 文/不壞的土叔 我叫張陵蒙挑,是天一觀的道長宗侦。 經(jīng)常有香客問我,道長忆蚀,這世上最難降的妖魔是什么矾利? 我笑而不...
- 正文 為了忘掉前任姑裂,我火速辦了婚禮,結(jié)果婚禮上男旗,老公的妹妹穿的比我還像新娘舶斧。我一直安慰自己,他們只是感情好察皇,可當我...
- 文/花漫 我一把揭開白布茴厉。 她就那樣靜靜地躺著,像睡著了一般什荣。 火紅的嫁衣襯著肌膚如雪矾缓。 梳的紋絲不亂的頭發(fā)上,一...
- 文/蒼蘭香墨 我猛地睜開眼咐吼,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了商佑?” 一聲冷哼從身側(cè)響起锯茄,我...
- 正文 年R本政府宣布蜂嗽,位于F島的核電站苗膝,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏植旧。R本人自食惡果不足惜荚醒,卻給世界環(huán)境...
- 文/蒙蒙 一芋类、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧界阁,春花似錦侯繁、人聲如沸。這莊子的主人今日做“春日...
- 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至较剃,卻和暖如春咕别,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背写穴。 一陣腳步聲響...
推薦閱讀更多精彩內(nèi)容
- 系統與網絡編程 exec的使用 找一個函數:find /usr gedit execl 使用調用系統lsint e...
- 共享內(nèi)存 多個進程共同映射同一內(nèi)核中內(nèi)存高效率,沒有同步分配篷朵,綁定勾怒,脫離,釋放void *shmat(int sh...
- 1声旺、共享內(nèi)存在內(nèi)核中是什么樣子的笔链? 它是一塊緩存,類似于用戶空間的數(shù)組或malloc函數(shù)分配的空間一樣腮猖,只不過是在...
- //使用shmop 系列函數(shù) set_time_limit(0); $shm_key= ftok(__FILE__...