|
|
автор Leonardo Giordani <leo.giordani(at)libero.it> Об авторе: Недавно я получил диплом факультета телекоммуникационных технологий Политехнического Университета Милана. Интересуюсь программированием ( в основном на Ассемблере и C/C++ ). С 1999 практически постоянно работаю в Linux/Unix Перевод на Русский: Kirill Pukhlyakov <kirill(at)linuxfocus.org> Содержание: |
Параллельное программирование - очереди сообщений (2)Резюме: Цель этой серии заметок - знакомство читателя с идеей многозадачности и ее реализация в ОС Linux. Начав эту серию с теоретических основ многозадачности - мы закончим написанием приложения, демонстрирующего взаимодействие между процессами посредством простого, но эффективного протокола. С чем должны быть знакомы читатели для понимания заметки:
Неплохо было бы также вам прочитать сначала предыдущие заметки из этой серии:
|
Как мы уже сказали, протокол - это набор правил, позволяющих общаться людям или машинам, даже если они разные. Например, английский язык является протоколом, позволяющим мне передавать знания моим индийским читателям ( им всегда очень интересно то, что я пишу ). Говоря о чем-нибудь более приближенном к ОС Linux, мы можем упомянуть о перекомпиляции ядра ( не пугайтесь, это не так трудно ), в частности о сетевом разделе, где вы можете указать ядру о необходимости понимания разных сетевых протоколов, таких как TCP/IP.
Чтобы придумать протокол во-первых необходимо подумать о типе приложения, для которого он будет создан. Мы займемся созданием простого телефонного коммутатора. Основным процессом будет коммутатор, а потомками - пользователи: задача будет - позволить пользователям обмениваться сообщениями через этот коммутатор.
В протоколе необходимо предусмотреть три события: появление пользователя ( т.е. пользователь присутствует и подключен ), обычные действия пользователя и его исчезновение ( пользователь не подключен ). Остановимся немного подробнее на этих событиях:
При подключении пользователя к системе он создает свою собственную очередь ( не забывайте, что мы говорим о процессах ), ее идентификатор должен быть послан коммутатору, чтобы он знал как обратиться к этому пользователю. Здесь самое время создать некие структуры данных, если в них есть необходимость. От коммутатора необходимо получить идентификатор очереди, чтобы знать куда посылать сообщения другим пользователям.
Пользователь может посылать и принимать сообщения. В случае посылки сообщения другому пользователю мы должны сначала узнать - подключен он или нет, но в любом случае посылающему пользователю необходимо вернуть ответ, чтобы он знал что произошло с его сообщением. Принимающей стороне ничего не надо делать - все сделает коммутатор.
При отключении пользователя от системы ему необходимо оповестить об этом коммутатор. В этом фрагменте кода показывается как это сделать
/* Birth */ create_queue init send_alive send_queue_id get_switch_queue_id /* Work */ while(!leaving){ receive_all if(<send condition>){ send_message } if(<leave condition>){ leaving = 1 } } /* Death */ send_dead
Теперь рассмотрим поведение нашего коммутатора: при подключении пользователя он посылает сообщение с идентификатором его очереди сообщений; нам необходимо его запомнить, чтобы передавать предназначенные ему сообщения и в свою очередь передать ему идентификатор очереди куда бы он мог посылать сообщения для других пользователей. Далее, нам необходимо анализировать сообщения от пользователей и проверять подключены или нет те, кому они предназначены: в случае если пользователь подключен - передать ему сообщение, если нет - отменить сообщение, в обоих случаях необходимо дать ответ отправителю. При отключении пользователя мы удаляем идентификатор его очереди и он становится недоступным.
Это реализуется следующим образом
while(1){ /* New user */ if (<birth of a user>){ get_queue_id send switch_queue_id } /* User dies */ if (<death of a user>){ remove_user } /* Messages delivering */ check_message if (<user alive>){ send_message ack_sender_ok } else{ ack_sender_error } }
Первое что нам необходимо - определить структуру для сообщения, используя прототип уровня ядра - msgbuf
typedef struct { int service; int sender; int receiver; int data; } messg_t; typedef struct { long mtype; /* Tipo del messaggio */ messg_t messaggio; } mymsgbuf_t;
Это на самом деле общий вид, который мы дополним позже: поля получателя и отправителя содержат идентификатор пользователя, поле данных - общие данные, поле сервиса используется для запроса на обслуживание у коммутатора. Например представим, что у нас два сервиса - один для незамедлительной и один для отсроченной доставки - в каждом случае поле данных содержит количество секунд задержки. Это всего лишь пример, но нам важно понять насколько разнообразно применение поля сервиса.
Теперь создаим несколько функций для управления нашими структурами данных, в частности для заполнения и чтения полей сообщения. Эти функции более-менее схожи, поэтому я покажу только две из них, а остальные вы найдете в заголовочных файлах
.void set_sender(mymsgbuf_t * buf, int sender) { buf->message.sender = sender; } int get_sender(mymsgbuf_t * buf) { return(buf->message.sender); }
Назначение их не в уменьшении кода ( каждая состоит всего из одной строки ), а в приближении протокола к чему-то более понятному для человека, а следовательно для понимания протокола в целом.
Теперь напишем функции для создания IPC ключей, создания и удаления очередей сообщений, отправки и получения сообщений: создание IPC ключа
key_t build_key(char c) { key_t key; key = ftok(".", c); return(key); }
функция создания очереди
int create_queue(key_t key) { int qid; if((qid = msgget(key, IPC_CREAT | 0660)) == -1){ perror("msgget"); exit(1); } return(qid); }
как видите обработка ошибок в данном случае достаточно проста. Следующая функция - уничтожение очереди
int remove_queue(int qid) { if(msgctl(qid, IPC_RMID, 0) == -1) { perror("msgctl"); exit(1); } return(0); }
И наконец функции для отправки и получения сообщений - т.е. запись сообщения в определенную очередь и чтение из нее
int send_message(int qid, mymsgbuf_t *qbuf) { int result, lenght; lenght = sizeof(mymsgbuf_t) - sizeof(long); if ((result = msgsnd(qid, qbuf, lenght, 0)) == -1){ perror("msgsnd"); exit(1); } return(result); } int receive_message(int qid, long type, mymsgbuf_t *qbuf) { int result, length; length = sizeof(mymsgbuf_t) - sizeof(long); if((result = msgrcv(qid, (struct msgbuf *)qbuf, length, type, IPC_NOWAIT)) == -1){ if(errno == ENOMSG){ return(0); } else{ perror("msgrcv"); exit(1); } } return(result); }
Вот и все. Вы можете найти эти функции в файле layer1.h: попробуйте написать сами программу используя их. В следующей заметке мы поговорим о втором уровне протокола и создадим его.
Присылайте свои комментарии, вопросы на мой почтовый адрес leo.giordani(at)libero.it или пишите мне через "страницу отзывов". Вы можете писать мне на английском, немецком или итальянском языках.
|
Webpages maintained by the LinuxFocus Editor team
© Leonardo Giordani, FDL LinuxFocus.org |
Translation information:
|
2003-09-26, generated by lfparser version 2.43