進程間通信–消息隊列
- 2021 年 4 月 7 日
- 筆記
- Linux下進程間通信
消息隊列(message queue):是進程間通訊的一種常用的方式,可以傳遞多種類型的數據流,可以實現異步傳輸。
消息隊列是兩個或者多個應用程序約定好的一種交互方式,體現在彼此知曉共同的消息隊列 ID, 該收取哪種類型的消息。
下面開始介紹應用的過程,附着的代碼是在某公司開發feature時候加入的
1. 創建IPC key
key_t ftok(const char* pathname, int proj_id)
- pathname 是一個文件的路徑,需要注意的是在實際進程間通訊時,需要保證pathname一直存在且不會被修改, proj_id是一個標識符,常用一個字符表示
- 兩個進程想要實現消息隊列通信,必須有相同的key,因此pathname,proj_id是相同的
- 創建成功返回key_t值,失敗為-1,原因可查看errno
- 代碼如下:
#define NEIGBOR_INFO_GET "/tmp" #define NEIGBOR_INFO_GET_PROJ 'x' ... key = ftok(NEIGBOR_INFO_GET, NEIGBOR_INFO_GET_PROJ); if(key == -1) { printf("[%s][%d] creat key error!\n", __FUNCTION__, __LINE__); return res; }
2. 獲取消息隊列 ID
int msgget(key_t key, int msgflg)
- key是ftok創建所得,flag是對消息隊列的操作,常用IPC_CREAT,即如果消息隊列已存在,則返回消息隊列 ID,否則創建新的消息隊列, 返回 ID
- 消息隊列在創建的時候是可以設置讀寫權限的 | 0666即 設置為可讀可寫
- flag如果指定為IPC_CREAT | IPC_EXCL ,在消息隊列已存在下,會返回-1, errno為EEXIST
- 返回值為消息隊列的 ID, 否則為-1, 原因可查看errno
- 代碼如下:
...
int gs_getneigborinfo_qid; ... gs_getneigborinfo_qid = msgget(key, IPC_CREAT | 0666); if(gs_getneigborinfo_qid == -1) { printf("[%s][%d], create message deque error!\n", __FUNCTION__, __LINE__); return res; }
3. 消息類型定義 msgbuf
msgbuf定義有一定的規範,常用如下:
struct msgbuf { long type; char mtext[1]; };
每一個msg中必須含有一個 long 類型的type,用於區分消息類型,後面的數據是可以自定義的,如下:
... #define CHASSISID_MAX_SIZE 255 #define PORTID_MAX_SIZE 255 ... struct neighborInfo { long type; char chassis_id[CHASSISID_MAX_SIZE + 1]; char port_id[PORTID_MAX_SIZE + 1]; };
4. 發送消息 msgsnd
int msgsnd(int msgid, const void *msgp, size_t msgsz, int msgflg)
- msgid:指定的消息隊列 ID
- msgp:待發送的數據buff
- msgsz:待發送數據大小,常用sizeof(struct msgbuf) – sizeof(long)
- msgflg:常為0:表示阻塞發送,ICP_NOWAIT:非阻塞發送
- 成功返回0,否則為-1, 錯誤碼為errno
- 代碼如下:
... #define RESPONSE_INFO_TYPE 1 #define REQUEST_INFO_TYPE 2 struct neighborInfo temp; memset(&temp, 0, sizeof(temp)); int rs = -1; temp.type = REQUEST_INFO_TYPE; ... rs = msgsnd(gs_getneigborinfo_qid, &temp, sizeof(temp) - sizeof(long), IPC_NOWAIT); if(rs == -1) { printf("[%s][%d] msg send error!\n", __FUNCTION__, __LINE__); // pthread_mutex_unlock(&gs_pthread_lock); return rs; }
5. 接收消息 msgrcv
ssize_t msgrcv(int msgid, void *msgp, size_t msgsz, long msgtyp, int msgflg)
- msgtyp:需要接收的消息類型,如果為0, 則讀取消息隊列的第一條消息,不管其消息類型
- 讀取成功返回讀取的消息長度,否則為-1,錯誤碼查看errno
- 代碼如下:
...
// need to recv info memset(&temp, 0, sizeof(temp)); int count = 200; do { rs= msgrcv(gs_getneigborinfo_qid, &temp, sizeof(temp), RESPONSE_INFO_TYPE, IPC_NOWAIT); count--; if(rs == -1) { usleep(1000); // wait 1ms to rerecive data } } while(rs == -1 && count > 0 && errno == ENOMSG);
...
6. 消息隊列控制 msgctl
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
- msgid:消息隊列 ID
- cmd:控制命令 IPC_STAT::將消息隊列信息存放到buf中,IPC_RMID:刪除消息隊列
- buf: 消息隊列信息存放
struct msqid_ds { struct ipc_perm msg_perm; /* Ownership and permissions */ time_t msg_stime; /* Time of last msgsnd(2) */ time_t msg_rtime; /* Time of last msgrcv(2) */ time_t msg_ctime; /* Time of last change */ unsigned long __msg_cbytes; /* Current number of bytes in queue (nonstandard) */ msgqnum_t msg_qnum; /* Current number of messages in queue */ msglen_t msg_qbytes; /* Maximum number of bytes allowed in queue */ pid_t msg_lspid; /* PID of last msgsnd(2) */ pid_t msg_lrpid; /* PID of last msgrcv(2) */ }; The ipc_perm structure is defined as follows (the highlighted fields are settable using IPC_SET): struct ipc_perm { key_t __key; /* Key supplied to msgget(2) */ uid_t uid; /* Effective UID of owner */ gid_t gid; /* Effective GID of owner */ uid_t cuid; /* Effective UID of creator */ gid_t cgid; /* Effective GID of creator */ unsigned short mode; /* Permissions */ unsigned short __seq; /* Sequence number */ };