select() для очереди сообщений

Самое неудобное, при работе с файлами или сокетами и одновременно с очередями сообщений, заключается в отсутствии поддержки системным вызовом select() очередей сообщений. В данной статье я хотел бы продемонстрировать вам не совсем обычный подход к этой проблеме, суть которого сводится к реализации нового системного вызова с именем msgqToFd(). Эта статья может оказаться полезной для любителей поиграть с исходными текстами ядра GNU/Linux.

[Hyouck "Hawk" Kim. Перевод: Андрей Киселев]

select() для очереди сообщений



Автор: Hyouck "Hawk" Kim
Перевод: Андрей Киселев

Введение

Самое неудобное, при работе с файлами или сокетами и одновременно с очередями сообщений, заключается в отсутствии поддержки системным вызовом select() очередей сообщений. Поэтому, Unix-программисты обычно решают проблему примерно таким, довольно "корявым", способом

while(1)

{

select для сокета с таймаутом;

...

wait для очереди сообщений с флагом IPC_NOWAIT

}

Безусловно, такая реализация выглядит довольно уродливо. И мне она не нравится. Как один из вариантов решения проблемы можно было бы рассмотреть многопоточность. Но в данной статье я хотел бы продемонстрировать вам не совсем обычный подход, суть которого сводится к реализации нового системного вызова с именем msgqToFd(). Я вовсе не пытаюсь продемонстрировать вам завершенный и безошибочный код ядра. Я лишь хочу представить на ваш суд результаты моих экспериментов. Эта статья могла бы быть полезной для любителей поиграть с исходными текстами ядра GNU/Linux.


msgqToFd() -- Новый, нестандартный системный вызов

Сигнатура вызова:

int msgqToFd(int msgq_id)

Возвращает файловый дескриптор, соответствующий очереди сообщений, который может быть использован системным вызовом select().

При возникновении какой либо ошибки -- возвращается значение -1.

Приложение может использовать этот вызов примерно таким образом:

...

q_fd = msgqToFd(msgq_id);

while(1)

{

FD_ZERO(&rset);

FD_SET(0, &rset);

FD_SET(q_fd, &rset);

select(q_fd + 1, &rset, NULL, NULL, NULL);

if(FD_ISSET(0, &rset))

{

...

}

if(FD_ISSET(q_fd, &rset))

{

r = msgrcv(msgq_id, &msg, sizeof(msg.buffer), 0, 0);

...

}

}


Как работает select()

Дескриптор файла связан с соответствующей структурой типа file (include/linux/fs.h). В структуре file имеется поле struct file_operations, которое определяет набор операций, которые могут выполняться над данным файлом. В структуре file_operations имеется поле с именем poll. Которое, в общем случае, содержит ссылку на системный вызов select(), который в свою очередь вызывает функцию poll(), чтобы получить состояние файла (или сокета, или чего-то еще).

В общих чертах системный вызов select() работает примерно так:

while(1)

{

для каждого файлового дескриптора из данного набора

{

вызов функции poll(), чтобы получить маску -- mask.

if(mask & can_read or mask & can_write or mask & exception)

{

установить бит для этого fd т.к. этот файл доступен на чтение/запись либо возникло исключение.

retval++;

}

}

if(retval != 0)

break;

schedule_timeout(__timeout);

}

Саму реализацию системного вызова select() вы найдете в виде функций sys_select() и do_select() в файле fs/select.c. в исходном коде ядра.

Еще необходимо упомянуть о функции poll_wait(). Она помещает текущий процесс в очередь ожидания, которая предусмотрена теми или иными средствами ядра, такими как файлы, каналы, сокеты или, как в нашем случае, очередью сообщений.

Обратите внимание -- текущий процесс может быть размещен в нескольких очередях ожидания вызовом select()


long sys_msgqToFd(long msqid)

Системный вызов должен возвращать файловый дескриптор, соответствующий очереди сообщений. Файловый дескриптор должен указывать на структуру file, которая содержит file_operations -- список операций для манипуляций с очередью сообщений.

Что должна делать функция sys_msgqToFd()

  1. для заданного msqid, отыскать соответствующую struct msg_queue

  2. разместить новый inode вызовом get_msgq_inode()

  3. разместить новый файловый дескриптор вызовом get_unused_fd()

  4. разместить новую структуру file вызовом get_empty_filp()

  5. инициировать inode и структуру file

  6. записать ссылку на msgq_file_ops в поле file_operations

  7. записать в поле private_data, структуры file, msq->q_perm.key

  8. установить fd (дескриптор) и структуру file, вызовом fd_install()

  9. вернуть новый fd

Реализацию этого системного вызова вы найдете в файлах msg.c и msg.h, которые должны сопровождать эту статью. А так же загляните в файл sys_i386.c

msgq_poll()

реализация msgq_poll() очень проста.

Что она делает

  1. Находит очередь сообщений по ключу file->private_data.

  2. помещает текущий процесс в очередь ожидания, соответствующую данной очереди сообщений, вызовом poll_wait()

  3. если очередь сообщений пуста (msq->q_qnum == 0), установить маску, как доступную для записи (этому есть свое объяснение, но давайте пока забудем об этом). Иначе -- как доступную на чтение

  4. вернуть маску


Изменения существующего исходного кода очередей сообщений

Чтобы добавить поддержку poll() для очередей сообщений, нам необходимо внести некоторые изменения в существующий исходный код.

Эти изменения включают в себя

  1. добавление поля wait_queue_head в struct msg_queue, которое будет использоваться при размещении процесса в очереди ожидания вызовом select(). Дополнительно необходимо предусмотреть инициализацию этого поля при создании очереди сообщений. Обратите внимание на struct msg_queue и newque() в файле msg.c.

  2. Всякий раз, когда в очередь помещается новое сообщение, необходимо предусмотреть активизацию ожидающего процесса, который был размещен в очереди ожидания системным вызовом select(). Смотрите функцию sys_msgsnd() в файле msg.c.

  3. При удалении очереди сообщений или при изменении ее характеристик, все ожидающие процессы должны быть "разбужены". Смотрите функции sys_msgctl() и freeque() в файле msg.c.

  4. Чтобы иметь возможность размещать новые inode и структуры file, необходимо установить соответствующую файловую систему

  5. VFS подходит для этого как нельзя лучше. Нам необходимо лишь добавить код, который будет регистрировать новую файловую систему и выполнять некоторые настройки. Смотрите функцию msg_init() в файле msg.c.

Все изменения заключены в команду препроцессора "ifdef MSGQ_POLL_SUPPORT". Так что вы достаточно легко их найдете.


Сведения, касающиеся файловой системы

Прежде чем разместить новую структуру file, необходимо прежде инициировать поля f_vfsmnt и f_dentry. В противном случае вы получите сообщение OOPS на консоли. Для того, чтобы VFS могла корректно работать с этой новой структурой file, необходимо выполнить дополнительные настройки, которые уже упоминались ранее.

Так как в file_operations поддерживается только вызов poll(), для нас достаточно должным образом установить поля f_dentry и f_vfsmnt. Большая часть кода была скопирована из файла pipe.c.


Добавление нового системного вызова

Чтобы добавить новый системный вызов, необходимо сделать две вещи.

Первое -- добавить реализацию системного вызова в ядро, которая будет работать на уровне ядра, что мы уже сделали (sys_msgqToFd()).
В ядре GNU/Linux, все вызовы, имеющие отношение к system V IPC диспетчеризуются через функцию sys_ipc(), размещенную в файле arch/i386/kernel/sys_i386.c. Для идентификации системного вызова, sys_ipc() использует номер этого вызова. Прежде чем мы сможем обращаться к новому системному вызову, необходимо определиться с его номером (для sys_msgqToFd() это будет число 25) и добавить его обработку в функцию sys_ipc(). Просто загляните в файл arch/i386/kernel/entry.S (в исходных текстах ядра) и посмотрите на код функции sys_ipc() в файле sys_i386.c, который должен следовать с данной статьей.

Второе -- добавить функцию-заглушку, которая будет вызываться пользовательскими приложениями. Практически все такие функции-заглушки, выполняющие системные вызовы, реализованы в GLIBC. Чтобы добавить новый системный вызов, вам потребуется внести изменения в GLIBC, пересобрать и установить ее. О черт!!! Нет уж, СПАСИБО!!! Меня как то не прельщает делать все это, к тому же я не думаю что и вам эта идея понравится. Чтобы как то разрешить эту проблему, я скопировал часть кода из GLIBC. Если вы заглянете в файл user/syscall_stuff.c, поставляемый с данной статьей, то вы обнаружите функцию с именем msgqToFd(), которая и является функцией-заглушкой для системного вызова msgqToFd().

Она содержит единственную строку кода

return INLINE_SYSCALL(ipc, 5, 25, key, 0, 0, NULL);

Ниже приводится краткое описание этого макроопределения.

ipc : номер системного вызова для sys_ipc(). ipc описано как __NR_ipc и равно 117.
5 : число аргументов этого макроопределения.
25 : номер системного вызова для sys_msgqToFd()
key : аргумент функции sys_msgqToFd()

Макрос INLINE_SYSCALL подготавливает аргументы к передаче и вызывает прерывание 0x80 для перехода в режим ядра и осуществления системного вызова.

Заключение

Я не уверен в практической полезности приведенных мною изменений. Я лишь хотел убедиться в том, что такие изменения возможны.

Кроме того, я хотел бы сообщить о некоторых проблемах.

  1. Если два потока/процесса пытаются получить доступ к очереди сообщений, и один из них ожидает на msgrcv(), а другой -- на select(), то новое сообщение получит процесс/поток, который встал на ожидание раньше. Взгляните на pipelined_send() в msg.c.

  2. Маска доступности на запись, в функции msgq_poll() устанавливается только если очередь пуста. Фактически, эту маску можно устанавливать для случая, когда очередь не заполнена до конца, и тут не будет большой разницы. Но я выбрал такую реализацию для простоты.

  3. А теперь рассмотрим такой сценарий.

    1. Очередь создана
    2. Создан файловый дескриптор для очереди
    3. Очередь удалена

    Что следует сделать в этом случае? Корректно было бы закрыть файловый дескриптор во время удаления очереди. Но это невозможно, поскольку очередь сообщений может быть удалена любым процессом, который имеет права на это. Это означает, что процесс, удаляющий очередь сообщений, мог и не связывать очередь с файловым дескриптором, не смотря на то, что файловый дескриптор для очереди мог быть создан другим процессом.

    И еще, если после удаления вновь будет создана очередь с таким же ключом, то она будет обслуживать ранее размещенный файловый дескриптор.

  4. Проблема эффективности. При поступлении нового сообщения в очередь будут "разбужены" все процессы, ожидающие на вызове select(). Но сообщение получит только один из них, а остальные опять "заснут"..

  5. Отсутствует поддержка типов сообщений. Независимо от типа сообщения, если оно поступило, то select() вернет управление процессу.

Ошибки и исправления

Найдите и исправьте сами :-)

Исходный код

msg.c Измененная реализация очередей сообщений
msg.h Заголовочный файл для исходного кода с реализацией очередей сообщений
sys_i386.c Изменения, с целью добавления нового системного вызова
user/Makefile Makefile для сборки тестовой программы (переименуйте из Makefile.txt в Makefile)
user/syscall_stuff.c Функция-заглушка для msgqToFd()
user/msg_test.h Заголовочный файл для msgqToFd()
user/msgq.c Исходный код тестовой программы
user/msgq2.c Еще одна тестовая программа

В своих экспериментах я использовал ядро GNU/Linux 2-4-20 на платформе x86.
Для сборки тестового ядра просто скопируйте

msg.c в ipc/msg.c
msg.h в include/linux/msg.h
sys_i386.c в arch/i386/kernel/sys_i386.c

пересоберите ядро и установите его!!!!

Перед запуском тестовых программ необходимо создать файлы ключей:

touch .msgq_key1
touch .msgq_key2


Copyright (C) 2003, Hyouck "Hawk" Kim. Copying license http://www.linuxgazette.com/copying.html
Published in Issue 92 of Linux Gazette, July 2003

Оригинал статьи можно найти по адресу: http://gazette.linux.ru.net/lg92/hawk.html

[ опубликовано 05/05/2004 ]

Hyouck "Hawk" Kim. Перевод: Андрей Киселев - select() для очереди сообщений   Версия для печати