为什么需要进程间通信 进程间通信(IPC)好比 RPG 游戏中的事件,主角需要与 NPC 对话,从他们口中得知重要情报以推进游戏。多进程的程序,进程间几乎都会涉及数据的共享、事件的到来、消息的通知等。
考虑进程间如果有重叠的空间,进程 A 将通信数据放在重叠部分,进程 B 从重叠部分直接取走数据,那么进程间通信就会非常轻松。但是这样做可能会带来一些安全隐患,比如进程 A 对重叠部分的修改导致了对进程 B 控制流的劫持、或者对 B 的数据区域造成了误修改,于是引进了虚拟内存这项技术
有虚存的存在,每个进程享有自己独立的进程空间,进程间不会有交叠部分,但是他们之间的通信就变得困难起来。为了解决这个问题,操作系统就作为进程间通信的第三方,进程 A 要与进程 B 通信,需先将数据交由 OS,再由 OS 来通知进程 B,从而实现进程间的间接交流
Linux 进程间通信方式
本文只对管道与 system V IPC 两个通信类别进行探讨
无名管道
什么是管道及其读写方式
管道是内核在自己所在的内核空间里开辟的一块缓存空间 ,其以文件的方式读写,读写时会用到 write,read 等文件 IO 函数,也会有文件读写用的文件描述符。
无名管道正是因为其没有文件名,所以才叫做无名管道。那么没有文件名,自然无法用 open 函数来打开管道文件并获取文件描述符。针对这种情况,内核提供了 pipe 系统 API,其函数原型为:
1 2 3 #include <unistd.h> int pipe (int pipefd[2 ]) ;
功能:创建一个用于亲缘进程 之间通信的无名管道,并将管道与两个读写文件描述符关联起来
参数:
pipefd[0]:用于存放读管道的读文件描述符
pipefd[1]:用于存放写管道的写文件描述符
返回值:
亲缘进程就是存在继承关系的进程,分为两种:
直接继承关系:父进程 -> 子进程
间接继承关系:父进程 -> 子进程 -> 孙进程 -> …
也就是说,无名管道通过父进程调用 pipe 创建,该管道的读写描述符放在 pipefd 数组中,父进程再调用 fork 创建子进程。子进程由于继承父进程的数据空间,也会有 pipefd 数组,由此获知无名管道的文件描述符。于是,对于有继承关系的进程,都可以与这条继承链上的其他进程进行通信。
单向通信
父子进程之间单向通信的图示如下:
要么父进程通过蓝色的线路与子进程通信,要么子进程通过红色的线路与父进程通信,一个管道不能满足父子进程之间的双向通信,因为进程写到管道里的东西会被自己抢先读出来
需要注意的是,如果管道中没有数据,读操作会使进程休眠(阻塞),如同 scanf 函数等待输入
双向通信
若想实现父子进程之间双向通信,就需要两个管道
图示如下:
管道 1 用于父进程向子进程通信,管道 2 用于子进程向父进程通信。不需要的管道(图中用黑x去掉的描述符)通过 close 函数关闭掉,避免其他程序误操作这些描述符对进程通信造成影响,简化后:
下面通过一个程序来实现父子进程的双向通信
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> int main () { int pipefd1[2 ] = {0 }; int pipefd2[2 ] = {0 }; char msg[100 ]; if (pipe(pipefd1) == -1 || pipe(pipefd2) == -1 ) { perror("Unamed Pipe Create Failed" ); exit (-1 ); } if (!fork()) { close(pipefd1[1 ]); close(pipefd2[0 ]); while (1 ) { read(pipefd1[0 ], msg, sizeof (msg)); printf ("Child recv: %s" , msg); memset (msg, 0 , sizeof (msg)); read(0 , msg, sizeof (msg)); write(pipefd2[1 ], msg, sizeof (msg)); } } else { close(pipefd1[0 ]); close(pipefd2[1 ]); while (1 ) { memset (msg, 0 , sizeof (msg)); read(0 , msg, sizeof (msg)); write(pipefd1[1 ], msg, sizeof (msg)); read(pipefd2[0 ], msg, sizeof (msg)); printf ("Father recv: %s" , msg); } } return 0 ; }
执行结果:
无名管道的特性及缺点
特性:
当进程写一个所有读端都被关闭的管道时,内核会向该进程发送 SIGPIPE 信号,默认会将该进程直接终止
缺点:
无法在非亲缘进程间实现通信
无法实现进程间网状通信
有名管道
读写方式
与无名管道相反,有名管道拥有文件名,由此不论是否为亲缘进程,两个进程间也可以通信。创建有名管道的 API 为 mkfifo,函数原型如下:
1 2 3 4 #include <sys/types.h> #include <sys/stat.h> int mkfifo (const char *pathname, mode_t mode) ;
功能:创建一个有名管道,当创建的文件名在路径下已经存在时,会报文件已存在的错
参数:
pathname:文件路径名
mode:被创建时的原始权限,一般为 0664,需至少包含读写权限
返回值:
有名管道文件是特殊文件,不能用 open 函数直接创建,只能用 mkfifo 创建。创建完成后,相应目录下会生成这么一个管道文件,之后多个进程便可通过 open 函数对该文件进行读写,即进程间通信了
单向通信与双向通信
单向通信基本与无名管道相同,有名管道双向通信中同样存在抢数据的问题,同样需要两个有名管道
下面以服务器-客户端的模式来呈现有名管道双向通信,先是服务器端代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 #include <stdio.h> #include <fcntl.h> #include <errno.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <signal.h> #include <sys/stat.h> #include <sys/types.h> #define FILENAME1 "./tmp1" #define FILENAME2 "./tmp2" void removePipeFile (int signo) { remove(FILENAME1); remove(FILENAME2); puts ("" ); exit (0 ); } int createNamedPipe (char *filename, int mode) { int ret, file; ret = mkfifo(filename, 0664 ); if (ret == -1 && errno != EEXIST) { perror("Create Named Pipe Failed" ); exit (-1 ); } file = open(filename, mode); if (file == -1 ) { perror("Open File Failed" ); exit (-1 ); } return file; } int main () { char msg[100 ]; int file1 = createNamedPipe(FILENAME1, O_WRONLY); int file2 = createNamedPipe(FILENAME2, O_RDONLY); if (!fork()) { while (1 ) { memset (msg, 0 , sizeof (msg)); read(file2, msg, sizeof (msg)); printf ("Server recv: %s" , msg); } } else { signal(SIGINT, removePipeFile); while (1 ) { memset (msg, 0 , sizeof (msg)); read(0 , msg, sizeof (msg)); write(file1, msg, sizeof (msg)); } } return 0 ; }
然后是客户端代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 #include <stdio.h> #include <fcntl.h> #include <errno.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <signal.h> #include <sys/stat.h> #include <sys/types.h> #define FILENAME1 "./tmp1" #define FILENAME2 "./tmp2" int createNamedPipe (char *filename, int mode) { int ret, file; ret = mkfifo(filename, 0664 ); if (ret == -1 && errno != EEXIST) { perror("Create Named Pipe Failed" ); exit (-1 ); } file = open(filename, mode); if (file == -1 ) { perror("Open File Failed" ); exit (-1 ); } return file; } int main () { char msg[100 ]; int file1 = createNamedPipe(FILENAME1, O_RDONLY); int file2 = createNamedPipe(FILENAME2, O_WRONLY); if (!fork()) { while (1 ) { memset (msg, 0 , sizeof (msg)); read(0 , msg, sizeof (msg)); write(file2, msg, sizeof (msg)); } } else { while (1 ) { memset (msg, 0 , sizeof (msg)); read(file1, msg, sizeof (msg)); printf ("Client recv: %s" , msg); } } return 0 ; }
运行结果:
使用场合
两个不论是否为亲缘关系进程间通信时,可以使用有名管道
实现多进程间网状通信也可以使用,不过进程数量一多,实现起来就会很复杂
消息队列
原理
system V IPC 类的通信方式与管道通信实现机制不同,就消息队列来说,其本质是由内核创建的用于存放消息的双向链表, 进程通过操作同一个消息队列就能实现通信。链表上的每个结点就是一个消息,除了形成双向链表所用的结构体指针外,结点还包含一个消息包,定义如下:
1 2 3 4 struct msgbuf { long mtype; char mtext[msgsz]; };
多进程在使用消息队列通信时,需要发消息的进程会把消息挂到链表上,收取指定消息类型的进程就会把消息取下来从而达到通信目的。
消息类型好比邮箱地址,假设进程 A,B,C 在通信(三者都使用 A 创建的消息队列),A 的邮箱地址(消息类型)是 0,B 与 C 的邮箱分别是 1 和 2。现在假设 A 要给 B 和 C 发信息通知他们该吃饭了,那么 A 会产生两个消息包,一个消息包的消息类型填上 B 的邮箱 1,另一个消息包的消息类型填上 2,并且两个消息包的消息正文都填上 “该吃饭了”。之后,A 通过 API 函数将消息挂在消息队列上,B 和 C 同样通过 API 函数分别从消息队列上接收类型为 1 和 2 的信息,这样 B 和 C 就知道该吃饭了。C 想告诉 A 他吃过饭了,C 也要产生一个消息类型是 0(A 的邮箱)、正文是 “我吃过了” 的消息包通过 API 挂上消息队列,A 一收取 0 类信息也就知晓 C 吃过了。
由以上的描述,可以得知使用消息队列来实现网状交叉通信 会比有名管道容易得多。需要通信的进程通过消息队列标识符 找到消息队列的位置,然后封装好消息包,挂上消息队列,就可以与任意知晓该消息队列的进程通信了。
创建消息队列
使用 msgget 函数可以创建或者获取已存在 的消息队列,其函数原型为:
1 2 3 4 5 #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> int msgget (key_t key, int msgflg) ;
参数:
key:该函数通过 key 值来创建一个具有唯一标识符的消息队列,该值可以为:
IPC_PRIVATE:每次调用 msgget 都会创建新的消息队列
指定一个整型数:容易重复
使用 ftok 函数生成
msgflg:创建消息队列时的原始权限,需至少具有读写权限。创建新队列时,还需指定 IPC_CREAT 选项
返回值:
成功返回消息队列唯一标识符 msgid
失败返回 -1,并设置 errno
一般 key 都是通过 ftok 生成的,其原型为:
1 2 3 4 #include <sys/types.h> #include <sys/ipc.h> key_t ftok (const char *pathname, int proj_id) ;
收发消息
发送消息即将消息挂在消息队列中,通过 msgsnd 函数可以实现:
1 2 3 4 5 #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> int msgsnd (int msqid, const void *msgp, size_t msgsz, int msgflg) ;
参数:
msqid:消息队列唯一标识符
msgp:消息包地址,即上面提及的 struct msgbuf 的一个填写好的实例
msgsz:消息正文的大小
msgflg:发送模式,常用的有两种:
0:一直阻塞等直到发送成功
IPC_NOWAIT:无论发送是否成功都不会阻塞
返回值:
接收消息使用 msgrcv:
1 2 3 4 5 #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> ssize_t msgrcv (int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg) ;
参数:
msqid:消息队列唯一标识符
msgp:消息包地址
msgsz:消息正文的大小
msgtyp:要接收的消息类型(相当于你的邮箱地址)
msgflg:接收模式,常用的有两种:
0:没有消息时,接收会休眠(阻塞)
IPC_NOWAIT:无论是否接收到消息都不会阻塞
返回值:
成功返回消息正文的字节数
失败返回 -1,并设置 errno
销毁消息队列
消息队列一旦创建不会随着进程运行的结束而自动销毁,需要使用 msgctl 函数来手动销毁
1 2 3 4 5 #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> int msgctl (int msqid, int cmd, struct msqid_ds *buf) ;
功能:该函数根据参数 cmd 指定的功能来控制消息队列,可以做到:
参数:
msqid:消息队列唯一标识符
cmd:控制选项,常用的有:
IPC_STAT:将 msqid 指向的消息队列属性读到第三个参数 buf 中
IPC_SET:用 buf 中的消息队列属性修改消息队列属性
IPC_RMID:销毁消息队列,第三个参数为 NULL
buf:根据 cmd 的不同有不同的解释方式
消息队列的属性用一个结构体 msqid_ds 记录,里面记录了消息队列当前消息条数、所有消息总字节数等信息,这里不再展开
终端中查看消息队列
在终端中可以通过 ipcs -q 指令查看当前存在的消息队列
也可以通过 ipcrm -q + 对应 msqid 来删除指定的消息队列
代码实现
下面实现了一个多进程间通过消息队列来通信的程序,父进程发送消息,子进程接收消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 #include <stdio.h> #include <fcntl.h> #include <stdlib.h> #include <string.h> #include <signal.h> #include <unistd.h> #include <sys/ipc.h> #include <sys/msg.h> #include <sys/types.h> #define MSGSIZE 1024 #define PATHNAME "./tmp" int msgId; struct msgbuf { long mtype; char mtext[MSGSIZE]; } msgSendPackage, msgRecvPackage; typedef struct msgbuf msgBuf ;void printError (char *error) { perror(error); exit (-1 ); } void removeMessageQueue (int signo) { remove(PATHNAME); msgctl(msgId, IPC_RMID, NULL ); puts ("" ); exit (0 ); } int createMessageQueue () { int fd = open(PATHNAME, O_RDWR|O_CREAT, 0664 ); if (fd == -1 ) printError("Create Tmp File Failed" ); close(fd); key_t key = ftok(PATHNAME, 'a' ); int msgId = msgget(key, 0664 |IPC_CREAT); if (msgId == -1 ) printError("Create/Get Queue Failed" ); return msgId; } int main (int argc, char **argv) { if (argc < 2 ) printError("Please bring the type you want to receive" ); long mtype = atol(argv[1 ]); msgId = createMessageQueue(); if (!fork()) { while (1 ) { memset (&msgRecvPackage, 0 , sizeof (msgBuf)); int recvSize = msgrcv(msgId, &msgRecvPackage, MSGSIZE, mtype, 0 ); write(1 , msgRecvPackage.mtext, recvSize); } } else { signal(SIGINT, removeMessageQueue); while (1 ) { memset (&msgSendPackage, 0 , sizeof (msgBuf)); scanf ("%ld" , &msgSendPackage.mtype); read(0 , msgSendPackage.mtext, sizeof (msgSendPackage.mtext)); msgsnd(msgId, &msgSendPackage, MSGSIZE, 0 ); } } return 0 ; }
运行结果:
共享内存
原理
之前提到虚拟内存的出现使得进程空间独立,各进程间没有交叠的空间。共享内存并不是使得进程间有交叠的空间,而是 OS 在物理内存中划出一块区域,进程将自己空闲的逻辑空间地址一一映射过去,进程在操作该逻辑地址时,相当于直接操作共享的物理内存。多个进程空闲地址都可以映射到同一块共享物理内存,从而实现进程间通信。如图:
从图中可以看到,进程 A 和 B 都拿出空闲的虚拟内存地址去映射相同的一块物理内存地址,这样 A 在该区域放数据,B 可以取,反之亦然
创建并映射共享内存
与消息队列相似,使用 shmget 函数可以创建或获取已存在的共享内存
1 2 3 4 #include <sys/ipc.h> #include <sys/shm.h> int shmget (key_t key, size_t size, int shmflg) ;
参数:
key:该函数通过 key 值来创建一个具有唯一标识符的共享内存,该值可以为:
IPC_PRIVATE:每次调用 msgget 都会创建新的共享内存
指定一个整型数:容易重复
使用 ftok 函数生成
size:指定共享内存的大小,一般是虚拟页的大小(4k)的整数倍
shmflg:创建消息队列时的原始权限,需至少具有读写权限。创建新队列时,还需指定 IPC_CREAT 选项
返回:
成功返回共享内存的 shmid
失败返回 -1,并设置 errno
光创建/获取到共享内存还不够,进程需调用 shmat 函数将自己空闲的虚拟内存地址映射过去:
1 2 3 4 #include <sys/types.h> #include <sys/shm.h> void *shmat (int shmid, const void *shmaddr, int shmflg) ;
参数:
shmid:共享内存标识符
shmaddr:指定虚拟内存映射的起始地址,有两种设置方式:
自己指定
NULL:由 OS 自己选择,一般使用这种方式
shmflg:指定映射条件:
0:可读可写的方式映射
SHM_RDONLY:只读方式映射
返回:
成功返回映射起始地址
失败返回 (void *)-1,并设置 errno
取消映射及销毁共享内存
进程结束时会自动取消映射但不会销毁共享内存 ,取消映射的函数是 shmdt
1 2 3 4 #include <sys/types.h> #include <sys/shm.h> int shmdt (const void *shmaddr) ;
参数就是映射的起始地址,成功返回 0,失败返回 -1,并设置 errno
销毁共享内存的函数是 shmctl,与消息队列的 msgctl 相似,该函数具有控制共享内存的功能,函数原型为:
1 2 3 4 #include <sys/ipc.h> #include <sys/shm.h> int shmctl (int shmid, int cmd, struct shmid_ds *buf) ;
功能:该函数根据参数 cmd 指定的功能来控制共享内存,可以做到:
参数:
msqid:共享内存唯一标识符
cmd:控制选项,常用的有:
IPC_STAT:将 shmid 指向的共享内存属性读到第三个参数 buf 中
IPC_SET:用 buf 中的共享内存属性修改共享内存属性
IPC_RMID:销毁共享内存,第三个参数为 NULL
buf:根据 cmd 的不同有不同的解释方式
共享内存的属性用一个结构体 shmid_ds 记录,里面记录了共享内存当前映射数量,将其创建的进程 PID 等信息,这里同样不作展开
此外,当多个进程同时有地址映射在同一片共享内存上时,只有当所有进程都取消映射 后,调用 shmctl 销毁共享内存才有效。
终端中查看共享内存
在终端中可以通过 ipcs -m 指令查看当前存在的共享内存
也可以通过 ipcrm -m + 对应 shmid 来删除指定的共享内存
代码实现
同样通过服务器-客户端模式来呈现,可以同有名管道一样使用两块共享内存来防止双向通信时进程抢数据。不过这里我打算用一块共享内存 + 同步手段(信号)来实现双向通信。为了使进程间读写数据同步,考虑首先通过两个有名管道将两个进程的 pid 值互相交换。读进程休眠直到写进程完成写操作后通过 kill 函数发送 SIGUSR1 信号唤醒,将数据读出。
因为两端代码大同小异,可以把重复的代码写进头文件里
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 #include <stdio.h> #include <errno.h> #include <fcntl.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <signal.h> #include <sys/ipc.h> #include <sys/shm.h> #include <sys/stat.h> #include <sys/types.h> #define SHMSIZE 4096 #define PATHNAME "./shmTmp" #define FILENAME1 "./pipeTmp1" #define FILENAME2 "./pipeTmp2" int shmId; void *shmAddr;void printError (char *error) { perror(error); exit (-1 ); } void wakeUp (int signo) {}void serverRemoveSharedMemory (int signo) { remove(PATHNAME); remove(FILENAME1); remove(FILENAME2); shmdt(shmAddr); shmctl(shmId, IPC_RMID, NULL ); puts ("" ); exit (0 ); } void clientRemoveSharedMemory (int signo) { shmdt(shmAddr); shmctl(shmId, IPC_RMID, NULL ); puts ("" ); exit (0 ); } int createNamedPipe (char *filename, int mode) { int ret, file; ret = mkfifo(filename, 0664 ); if (ret == -1 && errno != EEXIST) printError("Create Named Pipe Failed" ); file = open(filename, mode); if (file == -1 ) printError("Open File Failed" ); return file; } int createSharedMemory () { int fd = open(PATHNAME, O_RDWR|O_CREAT, 0664 ); if (fd == -1 ) printError("Create Tmp File Failed" ); close(fd); key_t key = ftok(PATHNAME, 'b' ); int shmId = shmget(key, SHMSIZE, 0664 |IPC_CREAT); if (shmId == -1 ) printError("Create/Get Shared Memory Failed" ); return shmId; }
服务器端代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 #include "shareMemory.h" int main () { char buf[SHMSIZE]; shmId = createSharedMemory(); shmAddr = shmat(shmId, NULL , 0 ); int pipeFile1 = createNamedPipe(FILENAME1, O_WRONLY); int pipeFile2 = createNamedPipe(FILENAME2, O_RDONLY); if (!fork()) { pid_t selfPid = getpid(); signal(SIGUSR1, wakeUp); write(pipeFile1, &selfPid, sizeof (pid_t )); while (1 ) { pause(); printf ("Server recv: %s" , (char *)shmAddr); memset (shmAddr, 0 , SHMSIZE); } } else { pid_t otherPid = 0 ; signal(SIGINT, serverRemoveSharedMemory); read(pipeFile2, &otherPid, sizeof (pid_t )); while (1 ) { memset (buf, 0 , sizeof (buf)); read(0 , buf, sizeof (buf)); memcpy (shmAddr, buf, SHMSIZE); kill(otherPid, SIGUSR1); } } return 0 ; }
客户端代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 #include "shareMemory.h" int main () { char buf[SHMSIZE]; shmId = createSharedMemory(); shmAddr = shmat(shmId, NULL , 0 ); int pipeFile1 = createNamedPipe(FILENAME1, O_RDONLY); int pipeFile2 = createNamedPipe(FILENAME2, O_WRONLY); if (!fork()) { pid_t otherPid = 0 ; read(pipeFile1, &otherPid, sizeof (pid_t )); while (1 ) { memset (buf, 0 , sizeof (buf)); read(0 , buf, sizeof (buf)); memcpy (shmAddr, buf, SHMSIZE); kill(otherPid, SIGUSR1); } } else { pid_t selfPid = getpid(); signal(SIGUSR1, wakeUp); signal(SIGINT, clientRemoveSharedMemory); write(pipeFile2, &selfPid, sizeof (pid_t )); while (1 ) { pause(); printf ("Client recv: %s" , (char *)shmAddr); memset (shmAddr, 0 , SHMSIZE); } } return 0 ; }
运行结果:
信号量
原理
设想现有 A,B 两个进程想要写同一个文件,这两个进程要写的数据很大,在他们各自的 1 个时间片内都写不完,如果对文件没有资源保护的手段,那么 A 进程的数据写了一半,B 进程又拿过去写,就会破坏 A 进程数据的连续性。为了解决这个问题,A 在自己数据没写完的情况下将文件上锁。A 时间片用尽,切换到 B 时,B 一看文件上有把锁,就无法往文件里写东西了。再切换回 A 时,A 又可以继续往文件里写剩下的数据,写完后将锁解开。这样下次切换到 B 时,B 就可以对文件进行操作了,A 上的锁就是所谓的信号量。
以上是对信号量直观的感受,具体实现时,一个信号量就相当于 OS 声明的一个全局变量 ,所有进程都可以访问,上述例子就可以转化为:假设信号量初始值是 1(表示文件还未被任何进程占用),A 进程在开始写文件之前先将信号量值减 1(P 操作),表示 A 占用了这个文件资源(上锁)。A 写完所有数据后,将信号量值加 1(V 操作),表示文件可以被其他进程占用了(释放锁)。所以不论 A 要写多少个时间片,只要没写完,信号量的值一直都会是 0。切换到 B 进程时,B 进程访问信号量的值,值为 0,说明文件上了锁,不该我操作,我就休眠;值为 1,说明别的进程操作完了,我可以操作了,于是 B 进程同样在开始写数据前将信号量减 1,在写完数据后加 1。这样一来,不论多少个进程要写同一个文件,只要他们共享同一个信号量,就可以保证他们对文件操作的互斥性
在保证互斥 的基础上,如果再对使用文件的进程顺序有要求(比如必须 A 先写,B 再读),也可以用信号量实现,这就是所谓的同步
综上所述,信号量可以满足多进程对资源访问的互斥 和同步 要求
创建信号量集并为其中信号量赋初值
Linux 中不能单独创建一个信号量,但可以创建一个信号量集,使其中仅包含一个信号量,创建的 API 是 semget
1 2 3 4 5 #include <sys/types.h> #include <sys/ipc.h> #include <sys/sem.h> int semget (key_t key, int nsems, int semflg) ;
参数:
key:同消息队列与共享内存
nsems:指定信号集中信号量的数量
semflg:同消息队列与共享内存
返回值:
成功返回信号量集 id
失败返回 -1,并设置 errno
为信号量集中的信号量赋初值使用 semctl 函数
1 2 3 4 5 #include <sys/types.h> #include <sys/ipc.h> #include <sys/sem.h> int semctl (int semid, int semnum, int cmd, ...) ;
参数:
semid:信号量集 id
semnum:要操作的信号量在信号量集中的编号(从 0 开始递增)
cmd:设置为 SETVAL 表示为信号量赋值
可选:第四个参数为联合体 semun
返回值:
成功返回非 -1 值
失败返回 -1,并设置 errno
以下是 semun 联合体的定义,头文件中没有对这个联合体定义,编程时需要自己定义
1 2 3 4 5 6 union semun { int val; struct semid_ds *buf ; unsigned short *array ; struct seminfo *__buf ; };
因为是赋值,所以只会用到 val 字段,下面给出一个赋值的例子
1 2 3 union semun tmp ; tmp.val = 2 ; semctl(semid, 0 , SETVAL, tmp);
P、V 操作
对信号量的值进行修改(+1 或 -1)使用 semop 函数
1 2 3 4 5 #include <sys/types.h> #include <sys/ipc.h> #include <sys/sem.h> int semop (int semid, struct sembuf *sops, unsigned nsops) ;
参数:
semid:信号量集 id
sops:要操作的信号量结构体 sembuf 数组
nsops:结构体数组的元素个数
返回值:
sembuf 结构体无需自己定义,semop 头文件中已经定义
1 2 3 4 5 struct sembuf { unsigned short sem_num; short sem_op; short sem_flg; };
各字段含义:
sem_num:信号量编号
sem_op:设置为 1 表示 V 操作,设置为 -1 表示 P 操作
sem_flg:
SEM_UNDO:防止死锁(一般设置为这个)
IPC_NOWAIT:信号量值为 0 时,P 操作不会阻塞
销毁信号集
与赋初值一样使用 semctl,不过需要将 cmd 参数的值设置为 IPC_RMID,且第二个参数无意义,一般设置为 0,形如:semctl(semid, 0, IPC_RMID)
终端中查看信号量集
在终端中可以通过 ipcs -s 指令查看当前存在的信号量集
也可以通过 ipcrm -s + 对应 semid 来删除指定的信号量集
代码实现
使用共享内存的代码框架,这次用信号量实现读写同步,头文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 #include <stdio.h> #include <errno.h> #include <fcntl.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <signal.h> #include <sys/ipc.h> #include <sys/shm.h> #include <sys/sem.h> #include <sys/stat.h> #include <sys/types.h> #define NSEMS 2 #define SHMSIZE 4096 #define PATHNAME "./shmTmp" int semId; int shmId;void *shmAddr;void deleteSemaphore () ;union semun { int val; struct semid_ds *buf ; unsigned short *array ; struct seminfo *__buf ; }; typedef union semun semUn ;void printError (char *error) { perror(error); exit (-1 ); } void serverRemoveSharedMemory (int signo) { remove(PATHNAME); shmdt(shmAddr); shmctl(shmId, IPC_RMID, NULL ); semctl(semId, 0 , IPC_RMID); puts ("" ); exit (0 ); } void clientRemoveSharedMemory (int signo) { shmdt(shmAddr); shmctl(shmId, IPC_RMID, NULL ); puts ("" ); exit (0 ); } int createSharedMemory () { int fd = open(PATHNAME, O_RDWR|O_CREAT, 0664 ); if (fd == -1 ) printError("Create Tmp File Failed" ); close(fd); key_t key = ftok(PATHNAME, 'b' ); int shmId = shmget(key, SHMSIZE, 0664 |IPC_CREAT); if (shmId == -1 ) printError("Create/Get Shared Memory Failed" ); return shmId; } int createSemaphore () { key_t key = ftok(PATHNAME, 'c' ); int semId = semget(key, NSEMS, 0664 |IPC_CREAT); if (semId == -1 ) printError("Create/Get Semaphore Failed" ); return semId; } void initSemaphore (int buf[]) { int i, ret; semUn tmp; for (i = 0 ; i < NSEMS ; i++) { tmp.val = buf[i]; ret = semctl(semId, i, SETVAL, tmp); if (ret == -1 ) printError("Init Semaphore Failed" ); } } void P (int buf[], int length) { int i, ret; struct sembuf semBuf [length ]; for (i = 0 ; i < length ; i++) { semBuf[i].sem_num = buf[i]; semBuf[i].sem_op = -1 ; semBuf[i].sem_flg = SEM_UNDO; } ret = semop(semId, semBuf, length); if (ret == -1 ) printError("Operation P Failed" ); } void V (int buf[], int length) { int i, ret; struct sembuf semBuf [length ]; for (i = 0 ; i < length ; i++) { semBuf[i].sem_num = buf[i]; semBuf[i].sem_op = 1 ; semBuf[i].sem_flg = SEM_UNDO; } ret = semop(semId, semBuf, length); if (ret == -1 ) printError("Operation V Failed" ); }
服务器端使用信号量 0 控制写操作的完成,客户端使用信号量 1 控制写操作的完成
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 #include "shmWithSemaphore.h" int main () { char buf[SHMSIZE]; int initBuf[NSEMS] = {0 , 0 }; shmId = createSharedMemory(); shmAddr = shmat(shmId, NULL , 0 ); semId = createSemaphore(); initSemaphore(initBuf); if (!fork()) { int pBuf[1 ] = {1 }; while (1 ) { P(pBuf, 1 ); printf ("Server recv: %s" , (char *)shmAddr); memset (shmAddr, 0 , SHMSIZE); } } else { int vBuf[1 ] = {0 }; signal(SIGINT, serverRemoveSharedMemory); while (1 ) { memset (buf, 0 , sizeof (buf)); read(0 , buf, sizeof (buf)); memcpy (shmAddr, buf, SHMSIZE); V(vBuf, 1 ); } } return 0 ; }
客户端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 #include "shmWithSemaphore.h" int main () { char buf[SHMSIZE]; shmId = createSharedMemory(); shmAddr = shmat(shmId, NULL , 0 ); semId = createSemaphore(); if (!fork()) { int pBuf[1 ] = {0 }; while (1 ) { P(pBuf, 1 ); printf ("Client recv: %s" , (char *)shmAddr); memset (shmAddr, 0 , SHMSIZE); } } else { int vBuf[1 ] = {1 }; signal(SIGINT, clientRemoveSharedMemory); while (1 ) { memset (buf, 0 , sizeof (buf)); read(0 , buf, sizeof (buf)); memcpy (shmAddr, buf, SHMSIZE); V(vBuf, 1 ); } } return 0 ; }