Очереди сообщений

Очереди сообщений являются одним из трех механизмов IPC (от англ. Inter Process Communication -- межпроцессное взаимодействие). Другие два -- это семафоры и разделяемая память. Очереди сообщений появились в UNIX system V release III и предназначались для асинхронной передачи сообщений между процессами.

[Raghu J Menon. Перевод Андрей Киселев (kis_an@mail.ru)]

Очереди сообщений

Автор: Raghu J Menon
Перевод: Андрей Киселев

Очереди сообщений являются одним из трех механизмов IPC (от англ. Inter Process Communication -- межпроцессное взаимодействие). Другие два -- это семафоры и разделяемая память. Очереди сообщений появились в UNIX system V release III и предназначались для асинхронной передачи сообщений между процессами.

В общих чертах обмен сообщениями выглядит примерно так: один процесс помещает сообщение в очередь посредством неких системных вызовов, а любой другой процесс может прочитать его оттуда, при условии, что и процесс-источник сообщения и процесс-приемник сообщения используют один и тот же ключ для получения доступа к очереди.

О команде ipcs

Команда ipcs выводит информацию о текущем состоянии IPC операционной системы. Если попробовать ввести ее из командной строки, в результате должно получиться нечто подобное:

   ------ Shared Memory Segments --------
   key shmid owner perms bytes nattch status
   ------ Semaphore Arrays --------
   key semid owner perms nsems status
   ------ Message Queues --------
   key msqid owner perms used-bytes messages
Нас интересует последний раздел. Кратко описание полей в этом разделе следует ниже:
  • key - ключ (имя), который присваивается очереди при ее создании вызовом функции msgget().
  • msqid - значение, возвращаемое функцией msgget(). Является дескриптором очереди. Любые операции над очередью сообщений осуществляются с указанием этого дескриптора.
  • owner - пользователь, создавший очередь.
  • perms - восьмеричное число, определяющее права доступа к очереди. Очень напоминает права доступа к файлам.
  • used-bytes - текущее количество байт, хранящихся в очереди.
  • messages - количество сообщений в очереди.

Создание очереди сообщений

Создание любого элемента IPC производится вызовом соответствующей функции ipcget() (для семафоров -- semget(), для разделяемой памяти -- shmget(), прим перев.). Для очереди сообщений -- это msgget(). Функция принимает 2 параметра: key - ключ, идентифицирующий очередь, и msgflg -- набор флагов. Набор флагов может включать в себя комбинацию значений IPC_CREAT и IPC_EXCL. Первое из них указывает на необходимость создания новой очереди с ключом key (если таковой еще не существует). Если очередь уже существует, то этот флаг просто игнорируется, а функция msgget() возвращает существующую очередь.

Второе -- при использовании совместно с флагом IPC_CREAT приводит к неудаче, если очередь с заданным key уже существует. Что возвращает msgget()? Я полагаю вы уже догадались! Она возвращает id очереди (обычный файловый дескриптор). А теперь попробуйте собрать и запустить у себя: mesg1.c.. Эта программа создает очередь с ключом 10 (передается в функцию msgget() первым параметром). Пусть вас не смущает тип key_t - это обычный int. После запуска программы можно убедиться в том, что очередь в действительности была создана.

Используйте для этого команду ipcs. Просмотрев раздел Message Queues, вы обнаружите запись, в которой поле key имеет значение 10 (0x0000000a в шестнадцатеричном виде) и поле msqid имеет значение 0 (как правило).

Эта очередь и была создана программой. Если у вас еще остались какие либо сомнения, то можете сравнить вывод команды ipcs до и после запуска этой программы, отличия вы увидите сразу.

------ Shared Memory Segments --------
key shmid owner perms bytes nattch status

------ Semaphore Arrays --------
key semid owner perms nsems status

------ Message Queues --------
key       msqid  owner perms  used-bytes  messages
0x0000000a    0   root   666      0          0
Теперь, в качестве примера, попробуйте заменить в нашей программе флаг IPC_CREAT на комбинацию флагов IPC_CREAT | IPC_EXCL. Пересоберите программу и запустите ее. Результат очевиден -- поскольку очередь с заданным ключом уже существует, то функция msgget() возвратит управление с кодом ошибки (отрицательное значение). Созданную программой очередь можно удалить выполнив из командной строки: ipcrm msg <id-number>

Права доступа к очередям

Очередь сообщений -- это не церковный колокол, в который может позвонить любой. Ее можно сравнить с файлом, который имеет права на чтение и на запись. Права доступа к очереди указываются во втором параметре функции msgget() в восьмеричном виде, объединяя их по ИЛИ (OR) с флагами IPC_CREAT и IPC_EXCL. Рассмотрим второй пример mesg.c, который отличается от первого лишь тем, что в вызове msgget() устанавливаются права доступа к очереди, равные 0644, объединенные по ИЛИ (OR) с флагом IPC_CREAT.

/*-------------------------------------------*/
/* mesg.c */
#include<sys/types.h>
#include<sys/ipc.h>
#include<sys/msg.h>

main()
{
    int msgid;
    key_t key=10;
    msgid=msgget(key,IPC_CREAT);
    printf("Message queue created with id %d\n",msgid);

}   
/*-------------------------------------------*/
Таким образом, очередь создается с правами "чтение-запись" для владельца, и "только-чтение" для всех остальных. Следует отметить, что выдача прав "на-исполнение" для очередей не имеет смысла, поскольку они (очереди) не являются исполняемым кодом. Если необходимо выдать права на "чтение-запись" для всех, то следует указать значение 0666, вместо приведенного выше 0644. (Следует отметить тот факт, что в случае с примером mesg1.c вы не сможете просмотреть информацию о вновь созданной очереди, если пытаетесь всё проделать от имени непривилегированого пользователя. Причина? В правах доступа. Процесс создаётся с правами доступа 0 и только root может получить информацию о такой очереди сообщений. Поэтому будьте внимательны, назначая права доступа. Прим.ред.)

Где хранится информация об очередях?

Для каждой, вновь создаваемой очереди, в области ядра отводится пространство со следующей структурой:

Эта структура определена в файле bits/msg.h. Этот заголовочный файл обязательно должен подключаться к вашим программам, использующим очереди сообщений через подключение файла sys/msg.h.

/* Структура записи для одного сообщения в области ядра
Тип __time_t соответствует типу long int.
Все данные типы определены в заголовочном файле types.h*/
struct msqid_ds
{
struct ipc_perm msg_perm; /* структура описывает права доступа */
__time_t msg_stime;       /* время последней команды msgsnd (см. ниже) */
unsigned long int __unused1;
__time_t msg_rtime;       /* время последней команды msgrcv (см. ниже) */
unsigned long int __unused2;
__time_t msg_ctime;       /* время последнего изменения */
unsigned long int __unused3;
unsigned long int __msg_cbytes; /* текущее число байт в очереди */
msgqnum_t msg_qnum;      /* текущее число сообщений в очереди */
msglen_t msg_qbytes;     /* максимальный размер очереди в байтах */
__pid_t msg_lspid;       /* pid последнего процесса вызвавшего msgsnd() */
__pid_t msg_lrpid;       /* pid последнего процесса вызвавшего msgrcv() */
unsigned long int __unused4;
unsigned long int __unused5;
};
Первый элемент структуры - это ссылка на другую структуру, которая имеет следующее определение в файле bits/ipc.h. подключение которого производится через файл sys/ipc.h.
/* Структура используется для передачи информации о правах доступа в операциях
IPC. */
struct ipc_perm
{
__key_t __key;              /* Ключ. */
__uid_t uid;                /* UID владельца. */
__gid_t gid;                /* GID владельца. */
__uid_t cuid;               /* UID создателя. */
__gid_t cgid;               /* GID создателя. */
unsigned short int mode;    /* Права доступа. */
unsigned short int __pad1;
unsigned short int __seq;   /* Порядковый номер. */
unsigned short int __pad2;
unsigned long int __unused1;
unsigned long int __unused2;
};
Структура ipc_perm хранит UID, GID и права доступа к очереди.

Управление очередью сообщений

После того как очередь будет создана, она может быть изменена. Это означает, что пользователь создавший очередь или авторизованный пользователь, может изменить права доступа к очереди и другие ее характеристики. Для внесения изменений используется функция msgctl(). Она имеет следующее определение:

       int msgctl(int msqid, int cmd, struct msqid_ds *queuestat )
Где первый параметр msqid -- это дескриптор очереди. Это должен быть дескриптор существующей очереди!

Параметр cmd может быть одним из следующих:

  • IPC_STAT -- в буфер queuestat копируется информация о заданной очереди. Процесс, вызвавший msgctl() должен иметь, по крайней мере, права "на чтение". По сути, эта операция производит копирование структуры msqid_ds, описанной выше.

  • IPC_SET -- Устанавливает UID и GID владельца, права доступа и размер (в байтах) очереди сообщений. Процесс, вызвавший msgctl(), должен иметь эффективный UID владельца, создателя или суперпользователя. Эта операция позволяет изменять поля msqid_ds.msg_perm.uid, msqid_ds.msg_perm.gid, msqid_ds.msg_perm.mode и msg_qbytes структуры msqid_ds очереди сообщений. В данном случае данные копируются из нашего буфера queuestat в область ядра.

  • IPC_RMID -- Операция удаления очереди сообщений с дескриптором msqid.
Следующий пример выводит содержимое элементов структуры: qinfo.c.
/*-------------------------------------------*/
#include<sys/types.h>
#include<sys/ipc.h>
#include<sys/msg.h>
#include<stdio.h>

main(int argc, char *argv[])
{
    int qid;
    struct msqid_ds qstatus;
    qid=msgget((key_t)atoi(argv[1]),IPC_CREAT);
    if(qid==-1){
	perror("msgget failed");
	exit(1);
    }
    if(msgctl(qid,IPC_STAT,&qstatus)<0){
	perror("msgctl failed");
	exit(1);
    }
    printf("Real user id of the queue creator: %d\n",qstatus.msg_perm.cuid);
    printf("Real group id of the queue creator: %d\n",qstatus.msg_perm.cgid);

    printf("Effective user id of the queue creator: %d\n",qstatus.msg_perm.uid);
    printf("Effective group id of the queue creator: %d\n",qstatus.msg_perm.gid);
    printf("Permissions: %d\n",qstatus.msg_perm.mode);
    printf("Message queue id: %d\n",qid);
    printf("%d message(s) on queue\n",qstatus.msg_qnum);
    printf("Last message sent by process :%3d at %s \n",qstatus.msg_lspid,ctime(& (qstatus.msg_stime)));

    printf("Last message received by process :%3d at %s \n",qstatus.msg_lrpid,ctime(& (qstatus.msg_rtime)));
    printf("Current number of bytes on queue %d\n",qstatus.msg_cbytes);
    printf("Maximum number of bytes allowed on the queue%d\n",qstatus.msg_qbytes);
}   
/*-------------------------------------------*/
В качестве параметра командной строки, программе передается дескриптор очереди (msqid). (Неувязочка. Автор ошибся -- в качестве параметра передаётся не дескриптор (msqid), а ключ (key) очереди сообщения. Прим.ред.) Таким образом, перед вызовом программы, необходимо создать очередь сообщений. Можно выбрать одну из очередей, которые выводятся командой ipcs. Функция msgctl() заполняет структуру, на которую ссылается указатель qstatus. После чего отдельные характеристики очереди выводятся на дисплей. Было бы неплохо попробовать запустить программу qinfo после того, как вы попробуете скомпилировать и запустить пример send.c, рассматриваемый ниже

Итак, мы продвинулись достаточно далеко, чтобы перейти к рассмотрению вопросов, связанных с передачей и приемом сообщений.

Передача и прием сообщений

Для передачи и приема сообщений UNIX-подобные системы предоставляют две функции: msgsnd() -- для передачи и msgrcv() -- для приема. Определение обеих функций приведено ниже:

	int msgsnd (int msqid, const void *msgp, size_t msgsz, int msgflg);
	int msgrcv (int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
Сначала рассмотрим функцию msgsnd(). Она принимает 4 параметра. Первый -- дескриптор очереди (таким образом, msgsnd() может обращаться только к существующей очереди). Второй -- указатель на сообщение -- это структура, которая хранит собственно сообщение и его тип. Описание структуры приводится ниже:
struct message{
    long mtype;        // Тип сообщения
    char mesg [MSGSZ]; // Само сообщение, длиной MSGSZ.
};
Третьим параметром следует длина сообщения MSGSZ в байтах. И последний параметр msgflg -- описывает действие, которое производится, если хотя бы одно из следующих условий истинно:
  • Количество байт в очереди уже равно максимальному числу (msg_qbytes).

  • Общее число сообщений во всех очередях превышает системное ограничение.
Что происходит в таких случаях?
  • Если (msgflg & IPC_NOWAIT) != 0, то сообщение не передается и управление сразу же возвращается в вызывающий процесс (с кодом ошибки EAGAIN прим. перев.).

  • Если (msgflg & IPC_NOWAIT) == 0, то вызывающий будет приостановлен до тех пор, пока не будет выполнено одно из следующих условий:
    • Условия, вызвавшие приостановку процесса, перестали существовать, в данном случае это означает, что сообщение было передано.
    • Очередь с дескриптором msqid была удалена, в этом случае переменной errno будет присвоено значение EIDRM и функция msgsnd() вернет значение -1.
    • Вызывающему процессу поступил сигнал. В этом случае сообщение не передается и вызывающий процесс продолжает исполнение (функция msgsnd() возвращает код ошибки EINTR прим. перев.)
    В случае успеха выполняются следующие действия:
    • Значение msg_qnum увеличивается на 1 т.е. увеличивается счетчик счетчик числа сообщений в очереди.
    • Значение msg_lspid устанавливается равным PID вызвавшего процесса Поле msg_lspid содержит pid последнего процесса, выполнявшего передачу сообщения.
    • В поле msg_stime заносится текущее время. Это поле хранит время передачи последнего сообщения.
Все выше перечисленные поля являются элементами структуры msqid_ds. Функция msgrcv() принимает (по сравнению с msgsnd()) один дополнительный параметр -- msgtyp, определяющий тип принимаемого сообщения, который должен указать процесс-источник сообщения. Приняты будут только те сообщения, для которых совпадают значения параметра msgtyp и значение поля mtype структуры message (см. выше). Рассмотрим приведенное объяснение на примере recv.c

Следующие программы содержат код, достаточно четко поясняющий только что рассмотренные принципы "общения" между процессами. Программа send.c создает очередь и помещает в нее сообщение, а recv.c вынимает сообщение из очереди и выводит его на экран.

/*-------------------------------------------*/
/* send.c */
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <string.h>

#define MSGSZ     128

typedef struct msgbuf {
         long    mtype;
         char    mtext[MSGSZ];
         } message_buf;

main()
{
    int msqid;
    int msgflg = IPC_CREAT | 0666;
    key_t key;
    message_buf sbuf;
    size_t buf_length;

    key = 10;

    printf("Calling msgget with key %#lx and flag %#o\n",key,msgflg);

    if ((msqid = msgget(key, msgflg )) < 0) {
        perror("msgget");
        exit(1);
    }
    else 
     printf("msgget: msgget succeeded: msqid = %d\n", msqid);


    sbuf.mtype = 1;
    
    printf("msgget: msgget succeeded: msqid = %d\n", msqid);
    
    (void) strcpy(sbuf.mtext, "I am in the queue?");
    
    printf("msgget: msgget succeeded: msqid = %d\n", msqid);
    
    buf_length = strlen(sbuf.mtext) + 1 ;
    
    if (msgsnd(msqid, &sbuf, buf_length, IPC_NOWAIT) < 0) {
       printf ("%d, %d, %s, %d\n", msqid, sbuf.mtype, sbuf.mtext, buf_length);
        perror("msgsnd");
        exit(1);
    }

   else 
      printf("Message: \"%s\" Sent\n", sbuf.mtext);
      
    exit(0);
}
/*-------------------------------------------*/

/*-------------------------------------------*/
/* recv.c */
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>

#define MSGSZ     128



typedef struct msgbuf {
    long    mtype;
    char    mtext[MSGSZ];
} message_buf;


main()
{
    int msqid;
    key_t key;
    message_buf  rbuf;

    key = 10;

    if ((msqid = msgget(key, 0666)) < 0) {
        perror("msgget");
        exit(1);
    }

    
    if (msgrcv(msqid, &rbuf, MSGSZ, 1, 0) < 0) {
        perror("msgrcv");
        exit(1);
    }

    printf("%s\n", rbuf.mtext);
    exit(0);
}
/*-------------------------------------------*/

Как работает send.c?

Текст программы начинается с определения структуры msgbuf, которая хранит сообщение, помещаемое в очередь. Эта структура имеет два поля, которые мы рассматривали выше, это тип сообщения mtype и массив mtext, в котором хранится собственно сообщение. Вызовом msgget создается новая очередь с ключом, равным 10, и флагами IPC_CREAT|0666, посредством чего мы выдаем право "на-чтение" всем пользователям (если быть более точным, то право на "чтение-запись" всем пользователям прим. перев.). Далее, тип сообщения устанавливается равным 1. Копируется текст сообщения "I am in the queue" в массив mtext. И затем сообщение передается в очередь вызовом функции msgsnd() с флагом IPC_NOWAIT (пояснения, касающиеся флагов в функции msgsnd() приводились выше). На каждом этапе производится проверка на возникновение ошибок и вывод соответствующих сообщений вызовом функции perror().

...И recv.c?

Код достаточно прост. Начинается программа с определения структуры, в которую будет приниматься сообщение из очереди. Далее, вызовом msgget(), программа получает дескриптор очереди с ключом 10, если таковая существует. Здесь следует отметить, что процессы должны указывать один и тот же ключ, для того чтобы создать (send.c) и получить доступ к очереди (recv.c). Здесь, в качестве аналогии, можно привести двух друзей, которые имеют ключ от одной и той же двери. Если один может запереть дверь, то другой может ее открыть, и больше никто (если не учитывать возможность взлома замка!). Функция msgrcv() вынимает сообщение из очереди в rbuf, которое затем выводится на дисплей. Четвертым аргументом функции msgrcv() передается 1, сможете догадаться почему? Как уже пояснялось ранее, программа send.c передала сообщение с типом равным 1, это объясняет, почему программа recv.c принимает сообщения только данного типа. Пятым параметром (msgflag) передается 0, который просто игнорируется. Наверное более правильным было бы передать флаги IPC_NOWAIT|MSG_NOERROR, в этом случае программа-приемник будет игнорировать ошибки, возникающие из-за несовпадения фактической длины сообщения и параметра msgsz. Если опустить флаг MSG_NOERROR, то в том случае, когда фактический размер сообщения превысит параметр msgsz, функция будет возвращать управление с признаком ошибки. Попробуйте выполнить команду ipcs после выполнения программы send.c и после выполнения программы recv.c. Вы должны получить нечто подобное:

После send.c:

------ Shared Memory Segments --------
key shmid owner perms bytes nattch status
------ Semaphore Arrays --------
key semid owner perms nsems status
------ Message Queues --------
key             msqid       owner perms used-bytes messages
0x0000000a      65536       root  666   19         1
После recv.c:
------ Shared Memory Segments --------
key shmid owner perms bytes nattch status
------ Semaphore Arrays --------
key semid owner perms nsems status
------Message Queues --------
key           msqid        owner   perms used-bytes messages
0x0000000a    65536        root      666        0            0
Сразу же бросаются в глаза отличия в полях used-bytes и messages Сообщение, переданное программой send.c было получено программой recv.c.

Неплохо было бы проверить -- как изменится поведение программы при использовании отрицательного значения msgtyp в программе recv.c. Измените программу send.c таким образом, чтобы она помещала в очередь несколько сообщений (скажем 3) с различными типами (пусть это будут 1, 2 и 3). Затем измените программу recv.c так, чтобы она принимала сообщения с типом, равным -2, а затем -3. К чему это приведет?

Допустим, что в программе recv.c параметр msgtyp равен -n, тогда она сможет принять все сообщения с типом, равным 1, 2, 3, ..., n. Для чего это нужно? Если задать n достаточно большим, скажем 1000, то мы сможем вынуть все сообщения из очереди.

В следующих выпусках мы познакомимся с более сложными примерами использования очередей сообщений.

Copyright (C) 2003, Raghu J Menon. Copying license
Copying license http://www.linuxgazette.com/copying.html
Published in Issue 89 of Linux Gazette, April 2003

Статья взята с сайта OpenNet. Оригинал: http://gazette.linux.ru.net/lg89/raghu.html

[ опубликовано 29/12/2003 ]

Raghu J Menon. Перевод Андрей Киселев (kis_an@mail.ru) - Очереди сообщений   Версия для печати