0%

IPC-进程之间的通信详解

前言

面试中常问,你知道进程之间的通信方式有哪几种吗?它们各有什么优缺点?这篇博文就打算从源码角度好好分析一下。

Inter Process Communication (IPC) 有以下八种:

  • Memory Map(内存映射文件)
  • Pipe(匿名管道)
  • Named Pipe(命名管道)
  • Message Queue(消息队列)
  • Shared Memory(共享内存)
  • Semaphore(信号量)
  • Signal(信号)
  • Socket(套接字)

其中共享内存和信号量一般会搭配使用;匿名管道和命名管道也统称为管道。

那么为什么是进程之间的通信,而不是线程之间的通信呢?这是因为每个进程之间的地址空间都是独立的,通常来说一个进程肯定不能去访问另外一个进程的地址空间,否则就乱套了。但是,操作系统的内核部分的空间是每个进程都共享的,所以可以通过内核来完成彼此之间的通信。

IPC

管道和消息队列。管道本身是FIFO的,而且它还有一个天然的特性:它线程安全。而消息队列现在虽然已经不推荐使用了,但是还是简单介绍一下。命名管道本质上其实就是一个“文件”,共享内存的需要搭配信号量使用。

匿名管道

匿名管道你肯定在Linux中早就看过,就是俗称的匿名管道符:|,它可以把前面一个进程的标准输出作为后一个进程的标准输入。从这句话也可以看出,它只可以从一个进程到另外一个进程,这个过程是单向的,所以如果希望两个进程能够相互通信,那么就需要两个管道。

所以管道本身其实是非常形象的,数据真的就是从管道的一头写入,然后像水流一样到另外一端,由另一端的进程进行读取。

来一个简单的命令,猜测一下它会有什么现象:sleep 5 | echo "hello world"

屏幕上立即输出hello world,但是需要等待5秒之后程序才会返回,才会出现用户提示符。默认情况下,reader(在上面的例子中是echo进程)会一直阻塞,直到能够从管道中读到消息。而writer只需要写完自己的信息,然后在加一个EOF(这里其实是end-of-streaming)就可以结束了。匿名管道符会在reader和writer两者都退出之后才结束。

所以虽然sleep睡了5秒,但是它本身没有向管道输入任何信息,但是当结束的时候,会有一个EOF发送到reader。

来看具体的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <limits.h>

#define READEND 0
#define WRITEEND 1
int pipe_fd[2];

pid_t child_pid;

// 父子进程锁执行的任务
void parent_task();
void child_task();

// 错误处理
void report_and_exit(const char *msg);

int main(int argc, const char *argv[])
{

if (pipe(pipe_fd) == -1)
{
report_and_exit("Can not create the pipe");
}

if ((child_pid = fork()) == -1)
{
report_and_exit("fork error");
}

if (child_pid == 0)
{
child_task();
}
else
{
parent_task();
}
return 0;
}

// 父进程进行读取
void parent_task()
{
close(pipe_fd[WRITEEND]);
char buffer[128];
if (read(pipe_fd[READEND], buffer, 128) == -1)
{
report_and_exit("Cannot read pipe");
}
close(pipe_fd[READEND]);
wait(NULL);

printf("PARENT - Received: %s\n", buffer);
exit(EXIT_SUCCESS);
}

// 子进程进行写入
void child_task()
{
close(pipe_fd[READEND]);
char *buffer = "Hello World";
printf("CHILD - Sending : %s\n", buffer);
write(pipe_fd[WRITEEND], buffer, strlen(buffer) + 1);
close(pipe_fd[WRITEEND]);
exit(EXIT_SUCCESS);
}

void report_and_exit(const char *msg)
{
perror(msg);
exit(EXIT_FAILURE);
}

管道本质上还是很简单,但是有几个比较重要的点。

第一点是读端和写端,查询对应的man手册,可以看到有这样的描述:

The first descriptor connects to the read end of the pipe; the second connects to the write end.

所以上面的宏定义#define读写两端是不可以进行交换的。

第二点是记得要在父进程加入wait系统调用来等待子进程执行完毕。但是在上述的程序中,由于管道的特殊性,所以似乎也不会有什么问题。

第三点,记得在fork之前调用pipe,否则新创建的两个进程无法通过pipe创建的管道进行通信,因为相当于父子进程都独立创建了一根管道,而不是共享一根管道。

第四点,如果有两个进程同时写入了一个匿名管道,会发生什么?POSIX标准保证,只要你写入的字节不大于PIPE_BUF(默认是4096个)字节,那么就不会出现你中有我,我中有你的情况。

pipe系统调用

仅当pipe的所有与之相关的描述符都关闭之后,pipe本身才会被关闭。

命名管道

匿名管道,这根管道本质上其实是系统维护的一个内存中的Buffer而已,一旦writer和reader进程结束,那么这篇内存区域就销毁了,即匿名管道就没了。

那命名管道就很简单,其实就是一个文件,只是一个比较特殊的文件。

首先还是一样,先来体验一样,利用mkfifo pipe来创建一个名字叫pipe的命名管道。你会发现在当前的目录中创建了一个叫pipe的文件,如果用ll命令查看,会发现这个文件的类型是p,意味着这是一个管道。然后就可以利用cat pipe来查看对应的管道的内容,或者是利用cat > pipe往里面写内容了。

由于命名管道本质上其实是一个文件,所以可以用mknod这个系统调用创建,也可以mkfifo这个更为特殊的创建文件的系统调用。

reader的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#include <errno.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <fcntl.h>

#define PIPE_NAME "./pipe"

pid_t child_pid;

int main(int argc, const char *argv[])
{

umask(0);
if (mknod(PIPE_NAME, S_IFIFO | 0660, 0) == -1)
{
if (errno != EEXIST)
{
perror("Cannot create the pipe");
exit(EXIT_FAILURE);
}
else
printf("Using existing file\n");
}

// Reading
char buffer[128];
int fd = open(PIPE_NAME, O_RDONLY);
read(fd, buffer, 128);
close(fd);
printf("Received: %s\n", buffer);

return 0;
}

writer的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include <errno.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <fcntl.h>
#include <string.h>

#define PIPE_NAME "./pipe"

pid_t child_pid;

int main(int argc, const char *argv[])
{

if (argc < 2)
{
printf("./writer <message>");
exit(EXIT_FAILURE);
}

umask(0);
if (mkfifo(PIPE_NAME, 0660) == -1)
{
if (errno != EEXIST)
{
perror("Cannot create the pipe");
exit(EXIT_FAILURE);
}
else
{
printf("Using existing file\n");
}
}

int fd = open(PIPE_NAME, O_WRONLY);
write(fd, argv[1], strlen(argv[1]));
close(fd);
printf("Sent: %s\n", argv[1]);

return 0;
}

可以看到,其实本质上就是利用mknod或者是mkfifo系统调用创建了特殊的管道文件,然后两个进程分别去文件里进行读取罢了。

显然命名管道的好处就是,它可以不像匿名管道一样,要求两个进程必须具有“血缘关系”,而是只要能读写文件就可以。

消息队列

管道,包括匿名管道和命名管道,都有一个问题,就是它们只支持FIFO,消息队列就可以稍微灵活一点,不一定要求FIFO了。

每个消息至少要包括两部分,分别是payload(就是消息本体),id(这样便于灵活取用)。下面是man page给出的参考:

1
2
3
4
struct msgbuf {
long mtype; /* message type, must be > 0 */
char mtext[1]; /* message data */
};

ftok()

首先摆在消息队列中的第一个问题是,如何产生一个独一无二的标识符,很自然想到的是文件,inode自然而然是独一无二的。

此时就可以使用ftok()这个系统调用来生成IPC的唯一标识符。它的描述就是通过文件路径来获得独一无二的标识符。

create IPC identifier from path name

只需要给定一个文件的路径,然后给定一个小数字,就可以生成一个唯一的标识符。

简单说,这就是一个hash函数。

msgget

获得一个消息队列的标识符。函数原型:int msgget(key_t key, int msgflg);,参数中的key就是上一步生成的key,然后返回一个ID。这个返回值就可以用来做对应的消息队列的标识符。

msgsnd && msgrcv

1
2
3
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);

其中的msqid就是上面中得到的消息队列,msgp就是对应的结构体(msgbuf),msgsz是消息的长度(就是上面的mtext的长度),

可以看到,接受消息有了一个叫msgtyp的参数,就是这个参数可以控制指定的类型的数据。

小结

所以使用消息队列的顺序如下:

  1. 首先根据某个文件获得对应的id,再根据id获得对应的消息队列。
  2. 根据需要进行消息的发送/接受。

代码

由于消息队列支持自己定义消息,所以这里比较好的实践就是把对应的消息定义放到头文件中。

message.h:

1
2
3
4
5
6
#define MSG_LEN 128

struct msgbuf {
long mtype; /* message type, must be > 0 */
char mtext[MSG_LEN]; /* message data */
};

然后是发送端的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/ipc.h>
#include <stdlib.h>
#include <string.h>

#include "message.h"

#define KEY 0x01

// 错误处理
void report_and_exit(const char *msg);

int get_queue_id(char key);

int main(int argc, char const *argv[], char *envp[])
{
if (argc < 2)
{
report_and_exit("Usage:./sender <message>");
}

// prepare message
struct msgbuf msg;
msg.mtype = 1;
strcpy(msg.mtext, argv[1]);

int queue_id = get_queue_id(KEY);

// send message
if (msgsnd(queue_id, &msg, sizeof(msg) - sizeof(msg.mtype), 0) == -1)
{
report_and_exit("message send error!");
}
printf("Sent OK!\n");
return 0;
}

int get_queue_id(char key)
{
key_t tmp = ftok(".", key);
int queue_id = 0;

queue_id = msgget(tmp, IPC_CREAT | 0660);
if (queue_id == -1)
{
report_and_exit("error when using msgget");
}
return queue_id;
}

void report_and_exit(const char *msg)
{
perror(msg);
exit(EXIT_FAILURE);
}

最后是接收端的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/ipc.h>
#include <stdlib.h>
#include <string.h>

#include "message.h"

#define KEY 0x01

void report_and_exit(const char *msg);
int get_queue_id(char key);

int main(int argc, char const *argv[])
{
struct msgbuf msg;
msg.mtype = 1;

int queue_id = get_queue_id(KEY);
if (msgrcv(queue_id, &msg, sizeof(msg) - sizeof(msg.mtype), 1, 0) == -1)
{
report_and_exit("msgrcv error");
}
printf("msg = %s\n", msg.mtext);
return 0;
}

int get_queue_id(char key)
{
key_t tmp = ftok(".", key);
int queue_id = 0;

queue_id = msgget(tmp, IPC_CREAT | 0660);
if (queue_id == -1)
{
report_and_exit("error when using msgget");
}
return queue_id;
}

void report_and_exit(const char *msg)
{
perror(msg);
exit(EXIT_FAILURE);
}

共享内存

共享内存本质上就是把多个进程的虚拟内存空间给映射到同一个物理内存中,这样每个进程只需要访问自己的内存空间就可以了。

shmget

int shmget(key_t key, size_t size, int shmflg); 这个函数中的参数第一个key和之前的消息队列是一样的,第二个内存的段的大小,第三个可以参考Man手册。

shmat && shmdt

1
2
3
void *shmat(int shmid, const void *shmaddr, int shmflg);

int shmdt(const void *shmaddr);

首先是shmat这个函数,第一个参数自然就是上面返回的id啦,第二个是共享内存的地址,如果是NULL,则让操作系统来进行指定未使用的内存来attach segment。

shmdt这个函数就是对立面,就是把shmaddr指定的内存和segment进行分离。

小结

其实共享内存也很简单,主要就是生成id,然后attach对应的地址,然后就可以直接使用memcpy进行操作了。

当然这里都是理想状态,并没有考虑并发的问题,实际中需要配合信号量进行控制。

信号量

信号量最有名的就是它的PV操作了。把信号量的初值设置成多少,就允许有多少个进程同时进入关键区。PV操作就是最典型的例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void
P(struct semaphore *s)
{
acquire(&s->lock);
while(s->count == 0)
sleep(s,&s->lock);
s->count-=1;
release(&s->lock);
}

void
V(struct semaphore *s)
{
acquire(&s->lock);
s->count+=1;
wakeup(s);
release(&s->lock);
}

可以看到信号量的本质上还是依赖于锁的。

信号

信号本质上就是对中断的一种模拟。

内核接收到信号之后,会把信号放到进程的一个队列中,然后向进程发送一个中断,这样这个进程就进到内核态了。一般来说,进程在返回内核态之前,会去检查一下对应的队列中有没有信号,如果有就进行相应的处理;但是由于此时进程处于内核态,这样是很危险的,所以需要暂时切换回用户态执行,执行完成之后再回到内核态,检查一下是否还有别的信号需要处理。