为什么需要进程间通信

进程间通信(IPC)好比 RPG 游戏中的事件,主角需要与 NPC 对话,从他们口中得知重要情报以推进游戏。多进程的程序,进程间几乎都会涉及数据的共享、事件的到来、消息的通知等。

考虑进程间如果有重叠的空间,进程 A 将通信数据放在重叠部分,进程 B 从重叠部分直接取走数据,那么进程间通信就会非常轻松。但是这样做可能会带来一些安全隐患,比如进程 A 对重叠部分的修改导致了对进程 B 控制流的劫持、或者对 B 的数据区域造成了误修改,于是引进了虚拟内存这项技术

有虚存的存在,每个进程享有自己独立的进程空间,进程间不会有交叠部分,但是他们之间的通信就变得困难起来。为了解决这个问题,操作系统就作为进程间通信的第三方,进程 A 要与进程 B 通信,需先将数据交由 OS,再由 OS 来通知进程 B,从而实现进程间的间接交流


Linux 进程间通信方式

  • 管道:
    • 无名管道
    • 有名管道
  • system V IPC
    • 消息队列
    • 共享内存
    • 信号量
  • 域套接字

本文只对管道与 system V IPC 两个通信类别进行探讨


无名管道

什么是管道及其读写方式

管道是内核在自己所在的内核空间里开辟的一块缓存空间,其以文件的方式读写,读写时会用到 write,read 等文件 IO 函数,也会有文件读写用的文件描述符。

无名管道正是因为其没有文件名,所以才叫做无名管道。那么没有文件名,自然无法用 open 函数来打开管道文件并获取文件描述符。针对这种情况,内核提供了 pipe 系统 API,其函数原型为:

1
2
3
#include <unistd.h>

int pipe(int pipefd[2]);
  • 功能:创建一个用于亲缘进程之间通信的无名管道,并将管道与两个读写文件描述符关联起来
  • 参数:
    • pipefd[0]:用于存放读管道的读文件描述符
    • pipefd[1]:用于存放写管道的写文件描述符
  • 返回值:
    • 成功返回 0
    • 失败返回 -1,并设置 errno

亲缘进程就是存在继承关系的进程,分为两种:

  • 直接继承关系:父进程 -> 子进程
  • 间接继承关系:父进程 -> 子进程 -> 孙进程 -> …

也就是说,无名管道通过父进程调用 pipe 创建,该管道的读写描述符放在 pipefd 数组中,父进程再调用 fork 创建子进程。子进程由于继承父进程的数据空间,也会有 pipefd 数组,由此获知无名管道的文件描述符。于是,对于有继承关系的进程,都可以与这条继承链上的其他进程进行通信。


单向通信

父子进程之间单向通信的图示如下:

单向通信

要么父进程通过蓝色的线路与子进程通信,要么子进程通过红色的线路与父进程通信,一个管道不能满足父子进程之间的双向通信,因为进程写到管道里的东西会被自己抢先读出来

需要注意的是,如果管道中没有数据,读操作会使进程休眠(阻塞),如同 scanf 函数等待输入


双向通信

若想实现父子进程之间双向通信,就需要两个管道

图示如下:

双向通信

管道 1 用于父进程向子进程通信,管道 2 用于子进程向父进程通信。不需要的管道(图中用黑x去掉的描述符)通过 close 函数关闭掉,避免其他程序误操作这些描述符对进程通信造成影响,简化后:

双向通信

下面通过一个程序来实现父子进程的双向通信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

int main()
{
int pipefd1[2] = {0};
int pipefd2[2] = {0};
char msg[100];
// 使用 pipe 创建两个管道
if(pipe(pipefd1) == -1 || pipe(pipefd2) == -1)
{
perror("Unamed Pipe Create Failed");
exit(-1);
}

if(!fork())
{ // 子进程程序部分
close(pipefd1[1]);
close(pipefd2[0]); // 关闭子进程管道 1 写与管道 2 读
while(1)
{
read(pipefd1[0], msg, sizeof(msg)); // 从管道 1 中读入数据并打印
printf("Child recv: %s", msg);
memset(msg, 0, sizeof(msg));
read(0, msg, sizeof(msg)); // 从标准输入读入字符串
write(pipefd2[1], msg, sizeof(msg)); // 写到管道 2 中
}
}
else
{ // 父进程程序部分
close(pipefd1[0]);
close(pipefd2[1]); // 关闭父进程管道 1 读与管道 2 写
while(1)
{
memset(msg, 0, sizeof(msg));
read(0, msg, sizeof(msg)); // 从标准输入读入字符串
write(pipefd1[1], msg, sizeof(msg)); // 写到管道 1 中
read(pipefd2[0], msg, sizeof(msg)); // 从管道 2 读入数据并打印
printf("Father recv: %s", msg);
}
}
return 0;
}

执行结果:

双向通信

无名管道的特性及缺点

特性:

  • 当进程写一个所有读端都被关闭的管道时,内核会向该进程发送 SIGPIPE 信号,默认会将该进程直接终止

缺点:

  • 无法在非亲缘进程间实现通信
  • 无法实现进程间网状通信

有名管道

读写方式

与无名管道相反,有名管道拥有文件名,由此不论是否为亲缘进程,两个进程间也可以通信。创建有名管道的 API 为 mkfifo,函数原型如下:

1
2
3
4
#include <sys/types.h>
#include <sys/stat.h>

int mkfifo(const char *pathname, mode_t mode);
  • 功能:创建一个有名管道,当创建的文件名在路径下已经存在时,会报文件已存在的错
  • 参数:
    • pathname:文件路径名
    • mode:被创建时的原始权限,一般为 0664,需至少包含读写权限
  • 返回值:
    • 成功返回 0
    • 失败返回 -1,并设置 errno

有名管道文件是特殊文件,不能用 open 函数直接创建,只能用 mkfifo 创建。创建完成后,相应目录下会生成这么一个管道文件,之后多个进程便可通过 open 函数对该文件进行读写,即进程间通信了


单向通信与双向通信

单向通信基本与无名管道相同,有名管道双向通信中同样存在抢数据的问题,同样需要两个有名管道

下面以服务器-客户端的模式来呈现有名管道双向通信,先是服务器端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// 服务器端代码
// gcc namedPipe1.c -o n1
#include <stdio.h>
#include <fcntl.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <sys/stat.h>
#include <sys/types.h>

#define FILENAME1 "./tmp1"
#define FILENAME2 "./tmp2"

// 当按下 Ctrl+C 终止时删除有名管道文件
void removePipeFile(int signo)
{
remove(FILENAME1);
remove(FILENAME2);
puts("");
exit(0);
}

// 创建/打开有名管道
int createNamedPipe(char *filename, int mode)
{
int ret, file;

ret = mkfifo(filename, 0664);
if(ret == -1 && errno != EEXIST) // 忽略文件已存在时的报错
{
perror("Create Named Pipe Failed");
exit(-1);
}

file = open(filename, mode); // 将有名管道以 mode 方式打开并返回文件描述符
if(file == -1)
{
perror("Open File Failed");
exit(-1);
}
return file;
}

int main()
{
char msg[100];
int file1 = createNamedPipe(FILENAME1, O_WRONLY); // 管道 1 用于服务端发信息
int file2 = createNamedPipe(FILENAME2, O_RDONLY); // 管道 2 用于服务端收信息

if(!fork())
{ // 服务端子进程用于从客户端接收信息
while(1)
{
memset(msg, 0, sizeof(msg));
read(file2, msg, sizeof(msg));
printf("Server recv: %s", msg);
}
}
else
{ // 服务端父进程用于向客户端发送信息
signal(SIGINT, removePipeFile);
while(1)
{
memset(msg, 0, sizeof(msg));
read(0, msg, sizeof(msg));
write(file1, msg, sizeof(msg));
}
}
return 0;
}

然后是客户端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// 客户端代码
// gcc namedPipe2.c -o n2
#include <stdio.h>
#include <fcntl.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <sys/stat.h>
#include <sys/types.h>

#define FILENAME1 "./tmp1"
#define FILENAME2 "./tmp2"

int createNamedPipe(char *filename, int mode)
{
int ret, file;

ret = mkfifo(filename, 0664);
if(ret == -1 && errno != EEXIST)
{
perror("Create Named Pipe Failed");
exit(-1);
}
file = open(filename, mode);
if(file == -1)
{
perror("Open File Failed");
exit(-1);
}
return file;
}

int main()
{
char msg[100];
int file1 = createNamedPipe(FILENAME1, O_RDONLY); // 管道 1 用于客户端收信息
int file2 = createNamedPipe(FILENAME2, O_WRONLY); // 管道 2 用于客户端发信息

if(!fork())
{ // 客户端子进程用于向服务器发送信息
while(1)
{
memset(msg, 0, sizeof(msg));
read(0, msg, sizeof(msg));
write(file2, msg, sizeof(msg));
}
}
else
{ // 客户端父进程用于从服务器接收信息
while(1)
{
memset(msg, 0, sizeof(msg));
read(file1, msg, sizeof(msg));
printf("Client recv: %s", msg);
}
}
return 0;
}

运行结果:

双向通信

使用场合

  • 两个不论是否为亲缘关系进程间通信时,可以使用有名管道
  • 实现多进程间网状通信也可以使用,不过进程数量一多,实现起来就会很复杂

消息队列

原理

system V IPC 类的通信方式与管道通信实现机制不同,就消息队列来说,其本质是由内核创建的用于存放消息的双向链表,进程通过操作同一个消息队列就能实现通信。链表上的每个结点就是一个消息,除了形成双向链表所用的结构体指针外,结点还包含一个消息包,定义如下:

1
2
3
4
struct msgbuf {
long mtype; // 消息类型
char mtext[msgsz]; // 消息正文,容量为 msgsz
};

多进程在使用消息队列通信时,需要发消息的进程会把消息挂到链表上,收取指定消息类型的进程就会把消息取下来从而达到通信目的。

消息类型好比邮箱地址,假设进程 A,B,C 在通信(三者都使用 A 创建的消息队列),A 的邮箱地址(消息类型)是 0,B 与 C 的邮箱分别是 1 和 2。现在假设 A 要给 B 和 C 发信息通知他们该吃饭了,那么 A 会产生两个消息包,一个消息包的消息类型填上 B 的邮箱 1,另一个消息包的消息类型填上 2,并且两个消息包的消息正文都填上 “该吃饭了”。之后,A 通过 API 函数将消息挂在消息队列上,B 和 C 同样通过 API 函数分别从消息队列上接收类型为 1 和 2 的信息,这样 B 和 C 就知道该吃饭了。C 想告诉 A 他吃过饭了,C 也要产生一个消息类型是 0(A 的邮箱)、正文是 “我吃过了” 的消息包通过 API 挂上消息队列,A 一收取 0 类信息也就知晓 C 吃过了。

由以上的描述,可以得知使用消息队列来实现网状交叉通信会比有名管道容易得多。需要通信的进程通过消息队列标识符找到消息队列的位置,然后封装好消息包,挂上消息队列,就可以与任意知晓该消息队列的进程通信了。


创建消息队列

使用 msgget 函数可以创建或者获取已存在的消息队列,其函数原型为:

1
2
3
4
5
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgget(key_t key, int msgflg);
  • 参数:

    • key:该函数通过 key 值来创建一个具有唯一标识符的消息队列,该值可以为:
      • IPC_PRIVATE:每次调用 msgget 都会创建新的消息队列
      • 指定一个整型数:容易重复
      • 使用 ftok 函数生成
    • msgflg:创建消息队列时的原始权限,需至少具有读写权限。创建新队列时,还需指定 IPC_CREAT 选项
  • 返回值:

    • 成功返回消息队列唯一标识符 msgid
    • 失败返回 -1,并设置 errno

一般 key 都是通过 ftok 生成的,其原型为:

1
2
3
4
#include <sys/types.h>
#include <sys/ipc.h>

key_t ftok(const char *pathname, int proj_id);
  • 功能:通过 pathname 与 proj_id 生成唯一的 key,只要两个参数的值不变,key 值就不会变

  • 参数:

    • pathname:文件路径名

    • proj_id:只会使用该参数的低八位,故常被指定为一个字符

  • 返回值:

    • 成功返回生成的 key 值
    • 失败返回 -1,并设置 errno

收发消息

  1. 发送消息即将消息挂在消息队列中,通过 msgsnd 函数可以实现:
1
2
3
4
5
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
  • 参数:

    • msqid:消息队列唯一标识符
    • msgp:消息包地址,即上面提及的 struct msgbuf 的一个填写好的实例
    • msgsz:消息正文的大小
    • msgflg:发送模式,常用的有两种:
      • 0:一直阻塞等直到发送成功
      • IPC_NOWAIT:无论发送是否成功都不会阻塞
  • 返回值:

    • 成功返回 0
    • 失败返回 -1,并设置 errno
  1. 接收消息使用 msgrcv:
1
2
3
4
5
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
  • 参数:

    • msqid:消息队列唯一标识符
    • msgp:消息包地址
    • msgsz:消息正文的大小
    • msgtyp:要接收的消息类型(相当于你的邮箱地址)
    • msgflg:接收模式,常用的有两种:
      • 0:没有消息时,接收会休眠(阻塞)
      • IPC_NOWAIT:无论是否接收到消息都不会阻塞
  • 返回值:

    • 成功返回消息正文的字节数
    • 失败返回 -1,并设置 errno

销毁消息队列

消息队列一旦创建不会随着进程运行的结束而自动销毁,需要使用 msgctl 函数来手动销毁

1
2
3
4
5
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgctl(int msqid, int cmd, struct msqid_ds *buf);
  • 功能:该函数根据参数 cmd 指定的功能来控制消息队列,可以做到:
    • 获取、修改消息队列属性信息
    • 销毁消息队列等
  • 参数:
    • msqid:消息队列唯一标识符
    • cmd:控制选项,常用的有:
      • IPC_STAT:将 msqid 指向的消息队列属性读到第三个参数 buf 中
      • IPC_SET:用 buf 中的消息队列属性修改消息队列属性
      • IPC_RMID:销毁消息队列,第三个参数为 NULL
    • buf:根据 cmd 的不同有不同的解释方式

消息队列的属性用一个结构体 msqid_ds 记录,里面记录了消息队列当前消息条数、所有消息总字节数等信息,这里不再展开


终端中查看消息队列

在终端中可以通过 ipcs -q 指令查看当前存在的消息队列

也可以通过 ipcrm -q + 对应 msqid 来删除指定的消息队列


代码实现

下面实现了一个多进程间通过消息队列来通信的程序,父进程发送消息,子进程接收消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
#include <stdio.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/types.h>

#define MSGSIZE 1024 // 定义数据正文大小
#define PATHNAME "./tmp" // 生成 key 用到的路径名

int msgId; // 将 msgid 定义为全局变量方便通信结束销毁消息队列

struct msgbuf{
long mtype;
char mtext[MSGSIZE];
} msgSendPackage, msgRecvPackage; // 声明用于发送和接收消息的消息包
typedef struct msgbuf msgBuf;

// 打印出错的函数
void printError(char *error)
{
perror(error);
exit(-1);
}

// 按下 Ctrl+C 时删除临时文件并销毁消息队列
void removeMessageQueue(int signo)
{
remove(PATHNAME);
msgctl(msgId, IPC_RMID, NULL);
puts("");
exit(0);
}

// 创建/获取消息队列 id
int createMessageQueue()
{
int fd = open(PATHNAME, O_RDWR|O_CREAT, 0664); // 创建一个临时文件
if(fd == -1)
printError("Create Tmp File Failed");
close(fd);

key_t key = ftok(PATHNAME, 'a'); // 根据临时文件路径生成 key
int msgId = msgget(key, 0664|IPC_CREAT); // 根据 key 生成/获取消息队列 id
if(msgId == -1)
printError("Create/Get Queue Failed");
return msgId;
}

int main(int argc, char **argv)
{
if(argc < 2)
printError("Please bring the type you want to receive");
long mtype = atol(argv[1]); // 通过命令行参数指定自己的邮箱地址
msgId = createMessageQueue();

if(!fork())
{ // 子进程接收在自己邮箱中的消息
while(1)
{
memset(&msgRecvPackage, 0, sizeof(msgBuf));
int recvSize = msgrcv(msgId, &msgRecvPackage, MSGSIZE, mtype, 0);
write(1, msgRecvPackage.mtext, recvSize);
}
}
else
{ // 父进程向其他进程发送消息
signal(SIGINT, removeMessageQueue);
while(1)
{
memset(&msgSendPackage, 0, sizeof(msgBuf));
scanf("%ld", &msgSendPackage.mtype); // 目的邮箱与消息正文由终端输入
read(0, msgSendPackage.mtext, sizeof(msgSendPackage.mtext));
msgsnd(msgId, &msgSendPackage, MSGSIZE, 0);
}
}
return 0;
}

运行结果:

消息队列


共享内存

原理

之前提到虚拟内存的出现使得进程空间独立,各进程间没有交叠的空间。共享内存并不是使得进程间有交叠的空间,而是 OS 在物理内存中划出一块区域,进程将自己空闲的逻辑空间地址一一映射过去,进程在操作该逻辑地址时,相当于直接操作共享的物理内存。多个进程空闲地址都可以映射到同一块共享物理内存,从而实现进程间通信。如图:

共享内存

从图中可以看到,进程 A 和 B 都拿出空闲的虚拟内存地址去映射相同的一块物理内存地址,这样 A 在该区域放数据,B 可以取,反之亦然


创建并映射共享内存

与消息队列相似,使用 shmget 函数可以创建或获取已存在的共享内存

1
2
3
4
#include <sys/ipc.h>
#include <sys/shm.h>

int shmget(key_t key, size_t size, int shmflg);
  • 参数:
    • key:该函数通过 key 值来创建一个具有唯一标识符的共享内存,该值可以为:
      • IPC_PRIVATE:每次调用 msgget 都会创建新的共享内存
      • 指定一个整型数:容易重复
      • 使用 ftok 函数生成
    • size:指定共享内存的大小,一般是虚拟页的大小(4k)的整数倍
    • shmflg:创建消息队列时的原始权限,需至少具有读写权限。创建新队列时,还需指定 IPC_CREAT 选项
  • 返回:
    • 成功返回共享内存的 shmid
    • 失败返回 -1,并设置 errno

光创建/获取到共享内存还不够,进程需调用 shmat 函数将自己空闲的虚拟内存地址映射过去:

1
2
3
4
#include <sys/types.h>
#include <sys/shm.h>

void *shmat(int shmid, const void *shmaddr, int shmflg);
  • 参数:
    • shmid:共享内存标识符
    • shmaddr:指定虚拟内存映射的起始地址,有两种设置方式:
      • 自己指定
      • NULL:由 OS 自己选择,一般使用这种方式
    • shmflg:指定映射条件:
      • 0:可读可写的方式映射
      • SHM_RDONLY:只读方式映射
  • 返回:
    • 成功返回映射起始地址
    • 失败返回 (void *)-1,并设置 errno

取消映射及销毁共享内存

进程结束时会自动取消映射但不会销毁共享内存,取消映射的函数是 shmdt

1
2
3
4
#include <sys/types.h>
#include <sys/shm.h>

int shmdt(const void *shmaddr);

参数就是映射的起始地址,成功返回 0,失败返回 -1,并设置 errno

销毁共享内存的函数是 shmctl,与消息队列的 msgctl 相似,该函数具有控制共享内存的功能,函数原型为:

1
2
3
4
#include <sys/ipc.h>
#include <sys/shm.h>

int shmctl(int shmid, int cmd, struct shmid_ds *buf);
  • 功能:该函数根据参数 cmd 指定的功能来控制共享内存,可以做到:
    • 获取、修改共享内存属性信息
    • 销毁共享内存等
  • 参数:
    • msqid:共享内存唯一标识符
    • cmd:控制选项,常用的有:
      • IPC_STAT:将 shmid 指向的共享内存属性读到第三个参数 buf 中
      • IPC_SET:用 buf 中的共享内存属性修改共享内存属性
      • IPC_RMID:销毁共享内存,第三个参数为 NULL
    • buf:根据 cmd 的不同有不同的解释方式

共享内存的属性用一个结构体 shmid_ds 记录,里面记录了共享内存当前映射数量,将其创建的进程 PID 等信息,这里同样不作展开

此外,当多个进程同时有地址映射在同一片共享内存上时,只有当所有进程都取消映射后,调用 shmctl 销毁共享内存才有效。


终端中查看共享内存

在终端中可以通过 ipcs -m 指令查看当前存在的共享内存

也可以通过 ipcrm -m + 对应 shmid 来删除指定的共享内存


代码实现

同样通过服务器-客户端模式来呈现,可以同有名管道一样使用两块共享内存来防止双向通信时进程抢数据。不过这里我打算用一块共享内存 + 同步手段(信号)来实现双向通信。为了使进程间读写数据同步,考虑首先通过两个有名管道将两个进程的 pid 值互相交换。读进程休眠直到写进程完成写操作后通过 kill 函数发送 SIGUSR1 信号唤醒,将数据读出。

因为两端代码大同小异,可以把重复的代码写进头文件里

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
// shareMemory.h
#include <stdio.h>
#include <errno.h>
#include <fcntl.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <signal.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/stat.h>
#include <sys/types.h>

#define SHMSIZE 4096 // 共享内存大小 4k
#define PATHNAME "./shmTmp" // 用于产生 key 值
#define FILENAME1 "./pipeTmp1" // 两个有名管道
#define FILENAME2 "./pipeTmp2"

int shmId; // shmid 与 映射的虚拟地址定义为全局变量方便销毁
void *shmAddr;

// 打印错误的函数
void printError(char *error)
{
perror(error);
exit(-1);
}

// 处理 SIGUSR1 的函数,什么都不做
void wakeUp(int signo) {}

// 服务器端按下 Ctrl+C 时删除临时文件、关闭映射、销毁共享内存
void serverRemoveSharedMemory(int signo)
{
remove(PATHNAME);
remove(FILENAME1);
remove(FILENAME2);
shmdt(shmAddr);
shmctl(shmId, IPC_RMID, NULL);
puts("");
exit(0);
}

// 客户端按下 Ctrl+C 时关闭映射、销毁共享内存
void clientRemoveSharedMemory(int signo)
{
shmdt(shmAddr);
shmctl(shmId, IPC_RMID, NULL);
puts("");
exit(0);
}

// 创建/打开有名管道
int createNamedPipe(char *filename, int mode)
{
int ret, file;

ret = mkfifo(filename, 0664);
if(ret == -1 && errno != EEXIST)
printError("Create Named Pipe Failed");

file = open(filename, mode);
if(file == -1)
printError("Open File Failed");
return file;
}

// 创建/获取共享内存 id
int createSharedMemory()
{
int fd = open(PATHNAME, O_RDWR|O_CREAT, 0664);
if(fd == -1)
printError("Create Tmp File Failed");
close(fd);

key_t key = ftok(PATHNAME, 'b');
int shmId = shmget(key, SHMSIZE, 0664|IPC_CREAT);
if(shmId == -1)
printError("Create/Get Shared Memory Failed");
return shmId;
}

服务器端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// gcc shareMemory1.c -o s1
#include "shareMemory.h"

int main()
{
char buf[SHMSIZE];

shmId = createSharedMemory(); // 得到共享内存 id
shmAddr = shmat(shmId, NULL, 0); // 得到用于映射的虚拟内存地址
int pipeFile1 = createNamedPipe(FILENAME1, O_WRONLY); // 两个有名管道用于交换 pid
int pipeFile2 = createNamedPipe(FILENAME2, O_RDONLY);

if(!fork())
{ // 子进程用于接收数据
pid_t selfPid = getpid();
signal(SIGUSR1, wakeUp);
write(pipeFile1, &selfPid, sizeof(pid_t)); // 告诉客户端自己接收数据进程的 pid
while(1)
{
pause(); // 首先通过 pause 休眠,直到客户端写进程发送 SIGUSR1 信号唤醒
printf("Server recv: %s", (char *)shmAddr); // 打印信息并清空共享内存
memset(shmAddr, 0, SHMSIZE);
}
}
else
{ // 父进程用于写数据
pid_t otherPid = 0;
signal(SIGINT, serverRemoveSharedMemory);
read(pipeFile2, &otherPid, sizeof(pid_t)); // 获取客户端接收数据进程的 pid
while(1)
{
memset(buf, 0, sizeof(buf));
read(0, buf, sizeof(buf)); // 从终端读入信息至 buf
memcpy(shmAddr, buf, SHMSIZE); // 拷贝至共享内存区域
kill(otherPid, SIGUSR1); // 写好数据通知客户端接收
}
}
return 0;
}

客户端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// gcc shareMemory2.c -o s2
#include "shareMemory.h"

int main()
{
char buf[SHMSIZE];

shmId = createSharedMemory();
shmAddr = shmat(shmId, NULL, 0);
int pipeFile1 = createNamedPipe(FILENAME1, O_RDONLY);
int pipeFile2 = createNamedPipe(FILENAME2, O_WRONLY);

if(!fork())
{ // 子进程用于写数据
pid_t otherPid = 0;
read(pipeFile1, &otherPid, sizeof(pid_t)); // 获取服务器端接收数据的进程 pid
while(1)
{
memset(buf, 0, sizeof(buf));
read(0, buf, sizeof(buf));
memcpy(shmAddr, buf, SHMSIZE);
kill(otherPid, SIGUSR1);
}
}
else
{ // 父进程用于接收数据
pid_t selfPid = getpid();
signal(SIGUSR1, wakeUp);
signal(SIGINT, clientRemoveSharedMemory);
write(pipeFile2, &selfPid, sizeof(pid_t)); // 告诉服务器端自己接收数据的进程 pid
while(1)
{
pause();
printf("Client recv: %s", (char *)shmAddr);
memset(shmAddr, 0, SHMSIZE);
}
}
return 0;
}

运行结果:

共享内存


信号量

原理

设想现有 A,B 两个进程想要写同一个文件,这两个进程要写的数据很大,在他们各自的 1 个时间片内都写不完,如果对文件没有资源保护的手段,那么 A 进程的数据写了一半,B 进程又拿过去写,就会破坏 A 进程数据的连续性。为了解决这个问题,A 在自己数据没写完的情况下将文件上锁。A 时间片用尽,切换到 B 时,B 一看文件上有把锁,就无法往文件里写东西了。再切换回 A 时,A 又可以继续往文件里写剩下的数据,写完后将锁解开。这样下次切换到 B 时,B 就可以对文件进行操作了,A 上的锁就是所谓的信号量。

以上是对信号量直观的感受,具体实现时,一个信号量就相当于 OS 声明的一个全局变量,所有进程都可以访问,上述例子就可以转化为:假设信号量初始值是 1(表示文件还未被任何进程占用),A 进程在开始写文件之前先将信号量值减 1(P 操作),表示 A 占用了这个文件资源(上锁)。A 写完所有数据后,将信号量值加 1(V 操作),表示文件可以被其他进程占用了(释放锁)。所以不论 A 要写多少个时间片,只要没写完,信号量的值一直都会是 0。切换到 B 进程时,B 进程访问信号量的值,值为 0,说明文件上了锁,不该我操作,我就休眠;值为 1,说明别的进程操作完了,我可以操作了,于是 B 进程同样在开始写数据前将信号量减 1,在写完数据后加 1。这样一来,不论多少个进程要写同一个文件,只要他们共享同一个信号量,就可以保证他们对文件操作的互斥性

在保证互斥的基础上,如果再对使用文件的进程顺序有要求(比如必须 A 先写,B 再读),也可以用信号量实现,这就是所谓的同步

综上所述,信号量可以满足多进程对资源访问的互斥同步要求


创建信号量集并为其中信号量赋初值

Linux 中不能单独创建一个信号量,但可以创建一个信号量集,使其中仅包含一个信号量,创建的 API 是 semget

1
2
3
4
5
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>

int semget(key_t key, int nsems, int semflg);
  • 参数:
    • key:同消息队列与共享内存
    • nsems:指定信号集中信号量的数量
    • semflg:同消息队列与共享内存
  • 返回值:
    • 成功返回信号量集 id
    • 失败返回 -1,并设置 errno

为信号量集中的信号量赋初值使用 semctl 函数

1
2
3
4
5
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>

int semctl(int semid, int semnum, int cmd, ...);
  • 参数:
    • semid:信号量集 id
    • semnum:要操作的信号量在信号量集中的编号(从 0 开始递增)
    • cmd:设置为 SETVAL 表示为信号量赋值
    • 可选:第四个参数为联合体 semun
  • 返回值:
    • 成功返回非 -1 值
    • 失败返回 -1,并设置 errno

以下是 semun 联合体的定义,头文件中没有对这个联合体定义,编程时需要自己定义

1
2
3
4
5
6
union semun {
int val;
struct semid_ds *buf;
unsigned short *array;
struct seminfo *__buf;
};

因为是赋值,所以只会用到 val 字段,下面给出一个赋值的例子

1
2
3
union semun tmp;	// 声明一个 semun 联合体实例
tmp.val = 2; // 以下两句表示将信号量集 id 为 semid 的第 1 个信号量赋值为 2
semctl(semid, 0, SETVAL, tmp);

P、V 操作

对信号量的值进行修改(+1 或 -1)使用 semop 函数

1
2
3
4
5
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>

int semop(int semid, struct sembuf *sops, unsigned nsops);
  • 参数:
    • semid:信号量集 id
    • sops:要操作的信号量结构体 sembuf 数组
    • nsops:结构体数组的元素个数
  • 返回值:
    • 成功返回 0
    • 失败返回 -1,并设置 errno

sembuf 结构体无需自己定义,semop 头文件中已经定义

1
2
3
4
5
struct sembuf{
unsigned short sem_num;
short sem_op;
short sem_flg;
};

各字段含义:

  • sem_num:信号量编号
  • sem_op:设置为 1 表示 V 操作,设置为 -1 表示 P 操作
  • sem_flg:
    • SEM_UNDO:防止死锁(一般设置为这个)
    • IPC_NOWAIT:信号量值为 0 时,P 操作不会阻塞

销毁信号集

与赋初值一样使用 semctl,不过需要将 cmd 参数的值设置为 IPC_RMID,且第二个参数无意义,一般设置为 0,形如:semctl(semid, 0, IPC_RMID)


终端中查看信号量集

在终端中可以通过 ipcs -s 指令查看当前存在的信号量集

也可以通过 ipcrm -s + 对应 semid 来删除指定的信号量集


代码实现

使用共享内存的代码框架,这次用信号量实现读写同步,头文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// shmWithSemaphore.h
#include <stdio.h>
#include <errno.h>
#include <fcntl.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <signal.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/sem.h>
#include <sys/stat.h>
#include <sys/types.h>

#define NSEMS 2 // 定义信号量的数量
#define SHMSIZE 4096
#define PATHNAME "./shmTmp"

int semId; // semId 也定义为全局变量
int shmId;
void *shmAddr;
void deleteSemaphore();

// 该结构体用于初始化信号集
union semun {
int val;
struct semid_ds *buf;
unsigned short *array;
struct seminfo *__buf;
};
typedef union semun semUn;

// 打印错误的函数
void printError(char *error)
{
perror(error);
exit(-1);
}

// 服务器端按下 Ctrl+C 时删除临时文件、关闭映射、销毁共享内存、销毁信号集
void serverRemoveSharedMemory(int signo)
{
remove(PATHNAME);
shmdt(shmAddr);
shmctl(shmId, IPC_RMID, NULL);
semctl(semId, 0, IPC_RMID);
puts("");
exit(0);
}

// 客户端按下 Ctrl+C 时关闭映射、销毁共享内存
void clientRemoveSharedMemory(int signo)
{
shmdt(shmAddr);
shmctl(shmId, IPC_RMID, NULL);
puts("");
exit(0);
}

// 创建/获取共享内存 id
int createSharedMemory()
{
int fd = open(PATHNAME, O_RDWR|O_CREAT, 0664);
if(fd == -1)
printError("Create Tmp File Failed");
close(fd);

key_t key = ftok(PATHNAME, 'b');
int shmId = shmget(key, SHMSIZE, 0664|IPC_CREAT);
if(shmId == -1)
printError("Create/Get Shared Memory Failed");
return shmId;
}

// 创建/获取信号量集 id
int createSemaphore()
{
key_t key = ftok(PATHNAME, 'c');
int semId = semget(key, NSEMS, 0664|IPC_CREAT);
if(semId == -1)
printError("Create/Get Semaphore Failed");
return semId;
}

// 根据 buf 数组初始化信号量集
void initSemaphore(int buf[])
{
int i, ret;
semUn tmp;

for(i = 0 ; i < NSEMS ; i++)
{
tmp.val = buf[i];
ret = semctl(semId, i, SETVAL, tmp);
if(ret == -1)
printError("Init Semaphore Failed");
}
}

// P 操作,参数 buf 为需要操作的信号量编号数组,length 为数组长度
void P(int buf[], int length)
{
int i, ret;
struct sembuf semBuf[length];

for(i = 0 ; i < length ; i++)
{
semBuf[i].sem_num = buf[i];
semBuf[i].sem_op = -1;
semBuf[i].sem_flg = SEM_UNDO;
}
ret = semop(semId, semBuf, length);
if(ret == -1)
printError("Operation P Failed");
}

// V 操作,与 P 操作函数唯一不同在于 sem_op 设置的值
void V(int buf[], int length)
{
int i, ret;
struct sembuf semBuf[length];

for(i = 0 ; i < length ; i++)
{
semBuf[i].sem_num = buf[i];
semBuf[i].sem_op = 1;
semBuf[i].sem_flg = SEM_UNDO;
}
ret = semop(semId, semBuf, length);
if(ret == -1)
printError("Operation V Failed");
}

服务器端使用信号量 0 控制写操作的完成,客户端使用信号量 1 控制写操作的完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 服务器端代码
// gcc shmWithSemaphore1.c -o s1
#include "shmWithSemaphore.h"

int main()
{
char buf[SHMSIZE];
int initBuf[NSEMS] = {0, 0}; // 由服务器初始化信号量

shmId = createSharedMemory();
shmAddr = shmat(shmId, NULL, 0);
semId = createSemaphore(); // 创建/获取信号集 id
initSemaphore(initBuf); // 初始化信号量

if(!fork())
{ // 子进程用于接收数据
int pBuf[1] = {1};
while(1)
{
P(pBuf, 1); // 等待信号量 1 的释放(即客户端写完)
printf("Server recv: %s", (char *)shmAddr);
memset(shmAddr, 0, SHMSIZE);
}
}
else
{ // 父进程用于写数据
int vBuf[1] = {0};
signal(SIGINT, serverRemoveSharedMemory);
while(1)
{
memset(buf, 0, sizeof(buf));
read(0, buf, sizeof(buf));
memcpy(shmAddr, buf, SHMSIZE);
V(vBuf, 1); // 服务器端写完,释放信号量 0
}
}
return 0;
}

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// gcc shmWithSemaphore2.c -o s2
#include "shmWithSemaphore.h"

int main()
{
char buf[SHMSIZE];

shmId = createSharedMemory();
shmAddr = shmat(shmId, NULL, 0);
semId = createSemaphore();

if(!fork())
{ // 子进程用于接收数据
int pBuf[1] = {0};
while(1)
{
P(pBuf, 1); // 等待信号量 0 的释放(即服务器端写完)
printf("Client recv: %s", (char *)shmAddr);
memset(shmAddr, 0, SHMSIZE);
}
}
else
{ // 父进程用于写数据
int vBuf[1] = {1};
signal(SIGINT, clientRemoveSharedMemory);
while(1)
{
memset(buf, 0, sizeof(buf));
read(0, buf, sizeof(buf));
memcpy(shmAddr, buf, SHMSIZE);
V(vBuf, 1); // 客户端写完,释放信号量 1
}
}
return 0;
}