QNX相關(guān)歷史文章:
The Bones of a Resource Manager
這篇文章將從服務(wù)器端和客戶端兩側(cè)來描述大體的框架和分層实苞,并會(huì)給出實(shí)例谤绳。
1. Under the covers
1.1 Under the client's covers
當(dāng)一個(gè)客戶端調(diào)用需要路徑名解析的函數(shù)時(shí)(比如open()/rename()/stat()/unlink()
)抛姑,它會(huì)同時(shí)向進(jìn)程管理器和對(duì)應(yīng)的資源管理器發(fā)送消息屯耸,進(jìn)而去獲取文件描述符跨释,并通過這個(gè)文件描述符向與路徑名關(guān)聯(lián)的設(shè)備發(fā)送消息钱贯。
/*
* In this stage, the client talks
* to the process manager and the resource manager.
*/
fd = open("/dev/ser1", O_RDWR);
/*
* In this stage, the client talks directly to the
* resource manager.
*/
for (packet = 0; packet < npackets; packet++)
{
write(fd, packets[packet], PACKET_SIZE);
}
close(fd);
上邊的這個(gè)代碼窗宇,背后的邏輯是怎么實(shí)現(xiàn)的呢措伐?我們假設(shè)這是名為devc-ser8250
資源管理器管理的串口設(shè)備,注冊(cè)的路徑名為dev/ser1
:
客戶端發(fā)送
query
消息军俊,open()
函數(shù)會(huì)向進(jìn)程管理器發(fā)送消息侥加,請(qǐng)求查找名字,比如/dev/ser1
蝇完;進(jìn)程管理器會(huì)返回與路徑名關(guān)聯(lián)的
nd/pid/child/handle
官硝。當(dāng)devc-ser8250
資源管理器使用/dev/ser1
注冊(cè)時(shí),進(jìn)程管理器會(huì)維護(hù)一個(gè)類似于下邊的條目信息:
0, 47167, 1, 0, 0, /dev/ser1
其中條目的各項(xiàng)分別代表:
-
0
短蜕, Node descriptor(nd
)氢架,用于描述在網(wǎng)絡(luò)中的節(jié)點(diǎn); -
47167
朋魔, Process ID(pid
)岖研,資源管理器的進(jìn)程ID號(hào); -
1
警检, Channel ID(chid
)孙援,資源管理器用于接收消息的通道; -
0
扇雕, Handle的索引號(hào)拓售,由資源管理器注冊(cè)的Handle; -
0
镶奉, 在名稱注冊(cè)期間傳遞的打開類型础淤; -
/dev/ser1
崭放,注冊(cè)的路徑名;
在進(jìn)程管理器中會(huì)維護(hù)一個(gè)條目表鸽凶,用于記錄各個(gè)資源管理器的信息币砂。在名字匹配的時(shí)候會(huì)選擇最長(zhǎng)匹配。
- 客戶端需要向資源管理器發(fā)送一個(gè)
connect
消息玻侥,首先它需要?jiǎng)?chuàng)建一個(gè)連接的通道:
fd = ConnectAttach(nd, pid, chid, 0, 0);
客戶端也是使用這個(gè)fd
向資源管理器發(fā)送連接消息决摧,請(qǐng)求打開/dev/ser1
。當(dāng)資源管理器收到連接消息后凑兰,會(huì)進(jìn)行權(quán)限檢查掌桩。
資源管理器通常會(huì)回應(yīng)通過或失敗。
當(dāng)客戶端獲取到文件描述符后票摇,可以直接通過它向設(shè)備發(fā)送消息拘鞋。示例代碼看起來像是客戶端直接向設(shè)備寫入,實(shí)際上在調(diào)用
write()
時(shí)矢门,會(huì)先發(fā)送一個(gè)_IO_WRITE
消息給資源管理器,請(qǐng)求數(shù)據(jù)寫入灰蛙,客戶端調(diào)用close()
時(shí)祟剔,會(huì)發(fā)送_IO_CLOSE_DUP
消息給資源管理器,完成最后資源的回收清理工作摩梧。
1.2 Under the resource manager's covers
資源管理器就是一個(gè)服務(wù)器物延,通過send/receive/reply
消息協(xié)議來接收和回復(fù)消息,偽代碼如下:
initialize the resource manager
register the name with the process manager
DO forever
receive a message
SWITCH on the type of message
CASE _IO_CONNECT:
call io_open handler
ENDCASE
CASE _IO_READ:
call io_read handler
ENDCASE
CASE _IO_WRITE:
call io_write handler
ENDCASE
. /* etc. handle all other messages */
. /* that may occur, performing */
. /* processing as appropriate */
ENDSWITCH
ENDDO
事實(shí)上仅父,上述代碼中的很多細(xì)節(jié)在實(shí)現(xiàn)一個(gè)資源管理器中可能用不到叛薯,因?yàn)橛幸恍?kù)進(jìn)行了封裝,但是自己還是需要去實(shí)現(xiàn)對(duì)消息的回復(fù)笙纤。
2. Layers in a resource manager
資源管理器由四層組成耗溜,從下到上分別為:
- iofunc layer
- resmgr layer
- dispatch layer
- thread pool layer
2.1 iofunc layer
這一層的主要功能是提供POSIX特性,用戶編寫資源管理器時(shí)不需要太關(guān)心向外界提供POSIX文件系統(tǒng)所涉及的細(xì)節(jié)省容。
這一層由一組函數(shù)iofunc_*
組成抖拴,包含了默認(rèn)處理程序,如果沒有提供自己的處理程序的話腥椒,就會(huì)使用默認(rèn)提供的程序阿宅。比如,如果沒有提供io_open
處理程序的話笼蛛,就會(huì)調(diào)用iofunc_open_default()
洒放。同時(shí)也包含了默認(rèn)處理程序調(diào)用的幫助函數(shù),當(dāng)自己實(shí)現(xiàn)處理程序時(shí)滨砍,也可以調(diào)用這些幫助函數(shù)往湿。
2.2 resmgr layer
這一層負(fù)責(zé)管理資源管理器庫(kù)的大部分細(xì)節(jié):
- 檢查傳入消息
-
調(diào)用合適的處理程序來處理消息
如果不使用這一層的話榨为,則需要用戶自己解析消息,大部分資源管理器都會(huì)用到這一層煌茴。
The resmgr layer to handle _IO_* message
2.3 dispatch layer
這一層充當(dāng)多種事件類型的阻塞點(diǎn)随闺,使用這一層,可以處理:
- `IO* 消息蔓腐, 使用resmgr層矩乐;
-
select()
,注冊(cè)一個(gè)處理函數(shù)回论,當(dāng)數(shù)據(jù)包到達(dá)時(shí)調(diào)用散罕,這里的函數(shù)是select_*()
函數(shù); -
Pulses
傀蓉,注冊(cè)一個(gè)處理函數(shù)欧漱,當(dāng)pulses
來的時(shí)候調(diào)用,這里的函數(shù)是pulse_*()
函數(shù)葬燎; - 其他消息误甚,可以提供自己創(chuàng)建的一系列消息類型和處理函數(shù),當(dāng)消息到達(dá)時(shí)調(diào)用對(duì)應(yīng)的處理函數(shù)谱净,這里的函數(shù)是
message_*()
函數(shù)窑邦;
use the dispatch layer to handle _IO_* messages, select, pulses, and other messages
消息進(jìn)行分發(fā)時(shí),會(huì)進(jìn)行查找和匹配壕探,然后會(huì)調(diào)用到匹配的Handler函數(shù)冈钦,如果沒有找到匹配的,則返回MsgError
李请。
2.4 thread pool layer
這一層允許用戶創(chuàng)建單線程或多線程資源管理器瞧筛,這意味著可以一個(gè)線程讀,一個(gè)線程寫导盅。需要為線程提供阻塞函數(shù)较幌,以及阻塞函數(shù)返回時(shí)需要調(diào)用的處理函數(shù)。大多數(shù)情況下认轨,可以將dispatch layer
的任務(wù)分配到線程上來绅络,當(dāng)然,也可以是resmgr layer
任務(wù)或自己實(shí)現(xiàn)的功能嘁字。
3. Simple examples of device resource managers
3.1 Single-threaded device resource managers
先來看一份完整的單線程設(shè)備資源管理器代碼吧:
#include <errno.h>
#include <stdio.h>
#include <stddef.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/iofunc.h>
#include <sys/dispatch.h>
static resmgr_connect_funcs_t connect_funcs;
static resmgr_io_funcs_t io_funcs;
static iofunc_attr_t attr;
main(int argc, char **argv)
{
/* declare variables we'll be using */
resmgr_attr_t resmgr_attr;
dispatch_t *dpp;
dispatch_context_t *ctp;
int id;
/* initialize dispatch interface */
if((dpp = dispatch_create()) == NULL) {
fprintf(stderr,
"%s: Unable to allocate dispatch handle.\n",
argv[0]);
return EXIT_FAILURE;
}
/* initialize resource manager attributes */
memset(&resmgr_attr, 0, sizeof resmgr_attr);
resmgr_attr.nparts_max = 1;
resmgr_attr.msg_max_size = 2048;
/* initialize functions for handling messages */
iofunc_func_init(_RESMGR_CONNECT_NFUNCS, &connect_funcs,
_RESMGR_IO_NFUNCS, &io_funcs);
/* initialize attribute structure used by the device */
iofunc_attr_init(&attr, S_IFNAM | 0666, 0, 0);
/* attach our device name */
id = resmgr_attach(
dpp, /* dispatch handle */
&resmgr_attr, /* resource manager attrs */
"/dev/sample", /* device name */
_FTYPE_ANY, /* open type */
0, /* flags */
&connect_funcs, /* connect routines */
&io_funcs, /* I/O routines */
&attr); /* handle */
if(id == -1) {
fprintf(stderr, "%s: Unable to attach name.\n", argv[0]);
return EXIT_FAILURE;
}
/* allocate a context structure */
ctp = dispatch_context_alloc(dpp);
/* start the resource manager message loop */
while(1) {
if((ctp = dispatch_block(ctp)) == NULL) {
fprintf(stderr, "block error\n");
return EXIT_FAILURE;
}
dispatch_handler(ctp);
}
}
上述代碼完成以下功能:
初始化dispatch接口
通過調(diào)用dispatch_create()
接口創(chuàng)建dispatch_t
結(jié)構(gòu)恩急,這個(gè)結(jié)構(gòu)中包含了channel ID
,但channel ID
實(shí)際的創(chuàng)建是在附加其他內(nèi)容的時(shí)候纪蜒,比如調(diào)用resmgr_attach()/message_attach()/pulse_attach()
等衷恭。初始化資源管理器屬性
當(dāng)調(diào)用resmgr_attach()
時(shí)會(huì)傳入resmgr_attr_t
結(jié)構(gòu),在這個(gè)示例中主要配置:
-
nparts_max
纯续,可供服務(wù)器回復(fù)的IOV的個(gè)數(shù)随珠; -
msg_max_size
灭袁,最大接收緩沖大小窗看;
- 初始化消息處理函數(shù)
在這個(gè)例子中提供了兩張表茸歧,用于指定在特定消息到達(dá)時(shí)調(diào)用哪個(gè)函數(shù):
- 連接函數(shù)表;
- I/O函數(shù)表显沈;
可以調(diào)用iofunc_func_init()
接口來配置默認(rèn)操作函數(shù)软瞎。
- 初始化設(shè)備使用的屬性結(jié)構(gòu)
調(diào)用iofunc_attr_init()
接口來設(shè)置,屬性結(jié)構(gòu)中至少應(yīng)該包括以下信息:
- 權(quán)限和設(shè)備類型
- 組ID和屬主ID
在名字空間中注冊(cè)一個(gè)名字
調(diào)用resmgr_attach()
來注冊(cè)資源管理器的路徑拉讯。在資源管理器能接收到其他程序的消息之前涤浇,需要通過進(jìn)程管理器通知其他程序它與路徑名的綁定關(guān)系。分配context結(jié)構(gòu)體
調(diào)用dispatch_context_alloc()
接口來分配魔慷,context
結(jié)構(gòu)體包含了用于接收消息的緩沖只锭,也包含了可用于回復(fù)消息的IOVs
緩沖。開始資源管理器消息循環(huán)
當(dāng)進(jìn)入循環(huán)后院尔,資源管理器便在dispatch_block()
中接收消息蜻展,并調(diào)用dispatch_handler()
進(jìn)行分發(fā)處理,選擇連接函數(shù)表和I/O函數(shù)表中的合適函數(shù)進(jìn)行執(zhí)行召边。當(dāng)執(zhí)行完畢后铺呵,會(huì)再進(jìn)入dispatch_block()
來等待接收其他消息。
3.2 Multi-threaded device resource managers
下邊是多線程設(shè)備資源管理器示例代碼:
#include <errno.h>
#include <stdio.h>
#include <stddef.h>
#include <stdlib.h>
#include <unistd.h>
/*
* Define THREAD_POOL_PARAM_T such that we can avoid a compiler
* warning when we use the dispatch_*() functions below
*/
#define THREAD_POOL_PARAM_T dispatch_context_t
#include <sys/iofunc.h>
#include <sys/dispatch.h>
static resmgr_connect_funcs_t connect_funcs;
static resmgr_io_funcs_t io_funcs;
static iofunc_attr_t attr;
main(int argc, char **argv)
{
/* declare variables we'll be using */
thread_pool_attr_t pool_attr;
resmgr_attr_t resmgr_attr;
dispatch_t *dpp;
thread_pool_t *tpp;
dispatch_context_t *ctp;
int id;
/* initialize dispatch interface */
if((dpp = dispatch_create()) == NULL) {
fprintf(stderr, "%s: Unable to allocate dispatch handle.\n",
argv[0]);
return EXIT_FAILURE;
}
/* initialize resource manager attributes */
memset(&resmgr_attr, 0, sizeof resmgr_attr);
resmgr_attr.nparts_max = 1;
resmgr_attr.msg_max_size = 2048;
/* initialize functions for handling messages */
iofunc_func_init(_RESMGR_CONNECT_NFUNCS, &connect_funcs,
_RESMGR_IO_NFUNCS, &io_funcs);
/* initialize attribute structure used by the device */
iofunc_attr_init(&attr, S_IFNAM | 0666, 0, 0);
/* attach our device name */
id = resmgr_attach(dpp, /* dispatch handle */
&resmgr_attr, /* resource manager attrs */
"/dev/sample", /* device name */
_FTYPE_ANY, /* open type */
0, /* flags */
&connect_funcs, /* connect routines */
&io_funcs, /* I/O routines */
&attr); /* handle */
if(id == -1) {
fprintf(stderr, "%s: Unable to attach name.\n", argv[0]);
return EXIT_FAILURE;
}
/* initialize thread pool attributes */
memset(&pool_attr, 0, sizeof pool_attr);
pool_attr.handle = dpp;
pool_attr.context_alloc = dispatch_context_alloc;
pool_attr.block_func = dispatch_block;
pool_attr.unblock_func = dispatch_unblock;
pool_attr.handler_func = dispatch_handler;
pool_attr.context_free = dispatch_context_free;
pool_attr.lo_water = 2;
pool_attr.hi_water = 4;
pool_attr.increment = 1;
pool_attr.maximum = 50;
/* allocate a thread pool handle */
if((tpp = thread_pool_create(&pool_attr,
POOL_FLAG_EXIT_SELF)) == NULL) {
fprintf(stderr, "%s: Unable to initialize thread pool.\n",
argv[0]);
return EXIT_FAILURE;
}
/* start the threads; will not return */
thread_pool_start(tpp);
}
從代碼中可以看出隧熙,大部分代碼與單線程示例一樣。在這個(gè)代碼中幻林,線程在阻塞循環(huán)中用到了dispatch_*()
函數(shù)(dispatch layer
)贞盯。
初始化線程池的屬性
給pool_attr
的各個(gè)字段賦值,用于告知線程在阻塞循環(huán)時(shí)調(diào)用哪些函數(shù)沪饺,以及線程池需要維護(hù)多少線程躏敢。分配一個(gè)線程池的
handle
調(diào)用thread_pool_create()
接口來分配,這個(gè)handle
用于控制整個(gè)線程池整葡。開啟線程
調(diào)用thread_pool_start()
接口件余,啟動(dòng)線程池。每個(gè)新創(chuàng)建的線程都會(huì)使用在屬性結(jié)構(gòu)中給出的context_alloc()
函數(shù)來分配THREAD_POOL_PARAM_T
定義類型的context
結(jié)構(gòu)遭居。
3.3 Using MsgSend() and MsgReply()
可以不使用read()
和write()
接口來與資源管理器交互啼器,可以調(diào)用MsgSend()
來發(fā)送消息,下邊的示例將分兩部分:服務(wù)器和客戶端俱萍。服務(wù)器端需要使能PROCMGR_AID_PATHSPACE
端壳,用于確保能調(diào)用resmgr_attach()
函數(shù)。
- 服務(wù)器代碼:
/*
* ResMgr and Message Server Process
*/
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <sys/neutrino.h>
#include <sys/iofunc.h>
#include <sys/dispatch.h>
resmgr_connect_funcs_t ConnectFuncs;
resmgr_io_funcs_t IoFuncs;
iofunc_attr_t IoFuncAttr;
typedef struct
{
uint16_t msg_no;
char msg_data[255];
} server_msg_t;
int message_callback( message_context_t *ctp, int type, unsigned flags,
void *handle )
{
server_msg_t *msg;
int num;
char msg_reply[255];
/* Cast a pointer to the message data */
msg = (server_msg_t *)ctp->msg;
/* Print some useful information about the message */
printf( "\n\nServer Got Message:\n" );
printf( " type: %d\n" , type );
printf( " data: %s\n\n", msg->msg_data );
/* Build the reply message */
num = type - _IO_MAX;
snprintf( msg_reply, 254, "Server Got Message Code: _IO_MAX + %d", num );
/* Send a reply to the waiting (blocked) client */
MsgReply( ctp->rcvid, EOK, msg_reply, strlen( msg_reply ) + 1 );
return 0;
}
int main( int argc, char **argv )
{
resmgr_attr_t resmgr_attr;
message_attr_t message_attr;
dispatch_t *dpp;
dispatch_context_t *ctp, *ctp_ret;
int resmgr_id, message_id;
/* Create the dispatch interface */
dpp = dispatch_create();
if( dpp == NULL )
{
fprintf( stderr, "dispatch_create() failed: %s\n",
strerror( errno ) );
return EXIT_FAILURE;
}
memset( &resmgr_attr, 0, sizeof( resmgr_attr ) );
resmgr_attr.nparts_max = 1;
resmgr_attr.msg_max_size = 2048;
/* Setup the default I/O functions to handle open/read/write/... */
iofunc_func_init( _RESMGR_CONNECT_NFUNCS, &ConnectFuncs,
_RESMGR_IO_NFUNCS, &IoFuncs );
/* Setup the attribute for the entry in the filesystem */
iofunc_attr_init( &IoFuncAttr, S_IFNAM | 0666, 0, 0 );
resmgr_id = resmgr_attach( dpp, &resmgr_attr, "serv", _FTYPE_ANY,
0, &ConnectFuncs, &IoFuncs, &IoFuncAttr );
if( resmgr_id == -1 )
{
fprintf( stderr, "resmgr_attach() failed: %s\n", strerror( errno ) );
return EXIT_FAILURE;
}
/* Setup our message callback */
memset( &message_attr, 0, sizeof( message_attr ) );
message_attr.nparts_max = 1;
message_attr.msg_max_size = 4096;
/* Attach a callback (handler) for two message types */
message_id = message_attach( dpp, &message_attr, _IO_MAX + 1,
_IO_MAX + 2, message_callback, NULL );
if( message_id == -1 )
{
fprintf( stderr, "message_attach() failed: %s\n", strerror( errno ) );
return EXIT_FAILURE;
}
/* Setup a context for the dispatch layer to use */
ctp = dispatch_context_alloc( dpp );
if( ctp == NULL )
{
fprintf( stderr, "dispatch_context_alloc() failed: %s\n",
strerror( errno ) );
return EXIT_FAILURE;
}
/* The "Data Pump" - get and process messages */
while( 1 )
{
ctp_ret = dispatch_block( ctp );
if( ctp_ret )
{
dispatch_handler( ctp );
}
else
{
fprintf( stderr, "dispatch_block() failed: %s\n",
strerror( errno ) );
return EXIT_FAILURE;
}
}
return EXIT_SUCCESS;
}
- 首先是調(diào)用
dispatch_create()
接口來創(chuàng)建dpp
枪蘑,通過這個(gè)handle
將接收到的消息轉(zhuǎn)發(fā)到合適的層(resmgr, message, pulse
)损谦; - 設(shè)置調(diào)用
resmgr_attach()
所需要的變量岖免; - 調(diào)用
iofunc_func_init()
來設(shè)置默認(rèn)處理函數(shù),調(diào)用iofunc_attr_init()
來設(shè)置屬性結(jié)構(gòu)照捡; - 調(diào)用
resmgr_attach()
颅湘,注意這時(shí)候注冊(cè)的路徑不是絕對(duì)路徑,因此默認(rèn)在它的執(zhí)行路徑下栗精。. - 告訴
dispatch layer
闯参,除了由resmgr layer
處理的標(biāo)準(zhǔn)I/O和連接消息之外,還需要處理自己的消息术羔。 - 調(diào)用
message_attach()
接口來注冊(cè)自己的消息處理函數(shù)赢赊。 - 當(dāng)調(diào)用
dispatch_block()
接收消息后,調(diào)用dispatch_handler()
來處理级历,實(shí)際上會(huì)在dispatch_handler()
中調(diào)用message_callback()
函數(shù)释移。當(dāng)消息類型為_IO_MAX + 1
或_IO_MAX + 2
時(shí),就會(huì)回調(diào)函數(shù)寥殖。
- 客戶端代碼:
/*
* Message Client Process
*/
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <fcntl.h>
#include <errno.h>
#include <string.h>
#include <sys/neutrino.h>
#include <sys/iofunc.h>
#include <sys/dispatch.h>
typedef struct
{
uint16_t msg_no;
char msg_data[255];
} client_msg_t;
int main( int argc, char **argv )
{
int fd;
int c;
client_msg_t msg;
int ret;
int num;
char msg_reply[255];
num = 1;
/* Process any command line arguments */
while( ( c = getopt( argc, argv, "n:" ) ) != -1 )
{
if( c == 'n' )
{
num = strtol( optarg, 0, 0 );
}
}
/* Open a connection to the server (fd == coid) */
fd = open( "serv", O_RDWR );
if( fd == -1 )
{
fprintf( stderr, "Unable to open server connection: %s\n",
strerror( errno ) );
return EXIT_FAILURE;
}
/* Clear the memory for the msg and the reply */
memset( &msg, 0, sizeof( msg ) );
memset( &msg_reply, 0, sizeof( msg_reply ) );
/* Set up the message data to send to the server */
msg.msg_no = _IO_MAX + num;
snprintf( msg.msg_data, 254, "client %d requesting reply.", getpid() );
printf( "client: msg_no: _IO_MAX + %d\n", num );
fflush( stdout );
/* Send the data to the server and get a reply */
ret = MsgSend( fd, &msg, sizeof( msg ), msg_reply, 255 );
if( ret == -1 )
{
fprintf( stderr, "Unable to MsgSend() to server: %s\n", strerror( errno ) );
return EXIT_FAILURE;
}
/* Print out the reply data */
printf( "client: server replied: %s\n", msg_reply );
close( fd );
return EXIT_SUCCESS;
}
客戶端通過open()
接口獲取connection id
玩讳,調(diào)用MsgSend()
接口去往服務(wù)器上發(fā)送消息,并且等待回復(fù)嚼贡。
上述服務(wù)器和客戶端的例子非常簡(jiǎn)單熏纯,但覆蓋了大部分的內(nèi)容,基于這個(gè)基礎(chǔ)的框架粤策,還可以做其他的事情:
- 基于不同的消息類型樟澜,注冊(cè)不同的消息回調(diào)函數(shù);
- 除了消息之外叮盘,使用
pulse_attach()
來接收pulse
秩贰; - 重寫默認(rèn)的I/O消息處理函數(shù)以便客戶端也能使用
read()
和write()
來與服務(wù)器交互; - 使用線程池來編寫多線程服務(wù)器柔吼;