|
|
автор Leonardo Giordani <leo.giordani(at)libero.it> Об авторе: Недавно я получил диплом факультета телекоммуникационных технологий Политехнического Университета Милана. Интересуюсь программированием ( в основном на Ассемблере и C/C++ ). С 1999 практически постоянно работаю в Linux/Unix Перевод на Русский: Пухляков Кирилл <kirill(at)linuxfocus.org> Содержание: |
Параллельное программирование - очереди сообщений (3)Резюме: Это последняя заметка из серии о параллельном программировании: в ней мы рассмотрим второй уровень нашего протокола, создадим функции, реализующие поведение пользователя на основе первого уровня нашего протокола, реализованного в прошлой заметке. Неплохо было бы также вам прочитать сначала предыдущие заметки из этой серии:
|
Во второй реализации протокола реализованы функции высокого уровня для отправки и получения сообщений, для взаимодействия с "сервисами", для инициализации: эти функции реализованы на основе функций первого уровня протокола, что делает их достаточно понятными. Обратите внимание на некоторые объявления типов сообщений и сервисов в файле layer2.h.
Приложение ipcdemo демонстрационное, неоптимизированное - вы можете заметить много глобальных переменных, хочу обратить ваше внимание на то, что главной задачей является объяснение читателю вопросов IPC, но не оптимизации кода. Тем не менее, если вы обнаружите что-то действительно странное - пишите мне и мы обсудим.
При возникновении "пользовательского" процесса первое что ему необходимо сделать - это создать очередь и сообщить коммутатору как обратиться к ней: чтобы сделать это надо послать два сообщения - SERV_BIRTH и SERV_QID.
/* инициализация очереди */ qid = init_queue(i); /* сообщение коммутатору о возникновении "пользователя" */ child_send_birth(i, sw); /* сообщение коммутатору способа обращения к "пользователю" */ child_send_qid(i, qid, sw);Затем начинается рутинная работа: послать сообщение, проверить наличие сообщений от других "пользователей", проверить наличие запроса коммутатора и обратно по циклу.
Решение о посылке сообщения принимается на вероятностной основе: функция myrand() возвращает случайное число в дипазоне от 0 до значения переданного аргумента, в нашем случае 100, мы посылаем сообщение только если это число меньше указанной вероятности. Так как "пользователь" "спит" 1 секунду между последовательными выполнениями тела цикла, то, значит, за каждые 100 секунд он будет посылать сообщение примерно столько раз, чему равно значение вероятности отсылки; тут мы предполагаем, что выборка из 100 элементов достаточна, чтобы превратить вероятность в действительность, на самом деле 100 элементов достаточно мало для этого... Однако просто заметьте, что в программе не надо указывать слишком маленькие вероятности, иначе ваша симуляция будет длиться веками.
if(myrand(100) < send_prob){ dest = 0; /* не посылать сообщения коммутатору, самому себе и */ /* уже получившему */ while((dest == 0) || (dest == i) || (dest == olddest)){ dest = myrand(childs + 1); } olddest = dest; printf("%d -- U %d -- Message to user %d\n", (int) time(NULL), i, dest); child_send_msg(i, dest, 0, sw); }Сообщения посылаемые пользователями коммутатору и затем пересылаемые коммутатором нам - мы отметим как TYPE_CONN (от CONNECTION).
/* проверить наличие простых входящих сообщений */ if(child_get_msg(TYPE_CONN, &in)){ msg_sender = get_sender(&in); msg_data = get_data(&in); printf("%d -- U %d -- Message from user %d: %d\n", (int) time(NULL), i, msg_sender, msg_data); }Для запроса сервиса коммутатора будем использовать сообщения типа TYPE_SERV. В случае получения сообщения о прекращении работы - "пользователю" необходимо отправить подтверждающее сообщение, чтобы коммутатор отметил "пользователя" как недоступного и прекратил посылать ему сообщения; затем " пользователь" должен прочитать все сообщения, предназначенные ему ( чтобы выглядеть вежливым, мы можем пропустить этот момент ), удалить очередь и сказать "до свидания" коммутатору. В случае запроса сервиса проверки времени мы посылаем коммутатору сообщение с текущим временем, получив его коммутатор вычисляет разницу между временем получения и отправления сообщения, чтобы знать сколько времени сообщение провело в очередях и заносит это значение в лог.
/* проверка наличия запроса коммутатора */ if(child_get_msg(TYPE_SERV, &in)){ msg_service = get_service(&in); switch(msg_service){ case SERV_TERM: /* извините, необходимо прекратить работу */ /* послать подтверждение коммутатору */ child_send_death(i, getpid(), sw); /* прочитать сообщения из очереди */ while(child_get_msg(TYPE_CONN, &in)){ msg_sender = get_sender(&in); msg_data = get_data(&in); printf("%d -- U %d -- Message from user %d: %d\n", (int) time(NULL), i, msg_sender, msg_data); } /* удалить очередь */ close_queue(qid); printf("%d -- U %d -- Termination\n", (int) time(NULL), i); exit(0); break; case SERV_TIME: /* необходимо провести замер времени пребывания сообщения в очередях */ child_send_time(i, sw); printf("%d -- U %d -- Timing\n", (int) time(NULL), i); break; } }
Во второй части своей работы родительский процесс также как и "пользовательский" ходит по циклу до тех пор пока не отключатся все "пользователи". Коммутатор принимает сообщения от "пользователей" и перенаправляет их по назначению.
/* проверить попытку подключения "пользователя" */ if(switch_get_msg(TYPE_CONN, &in)){ msg_receiver = get_receiver(&in); msg_sender = get_sender(&in); msg_data = get_data(&in); /* если адресат доступен */ if(queues[msg_receiver] != sw){ /* послать сообщение адресату */ switch_send_msg(msg_sender, msg_data, queues[msg_receiver]); printf("%d -- S -- Sender: %d -- Destination: %d\n", (int) time(NULL), msg_sender, msg_receiver); } else{ /* адресат недоступен */ printf("%d -- S -- Unreachable destination (Sender: %d - Destination: %d)\n", (int) time(NULL), msg_sender, msg_receiver); }Если "пользователь" посылает сообщение через коммутатор, ему может быть послан запрос одного из двух видов. Решение о посылке запроса и выбор типа запроса производится на вероятностной основе (принцип аналогичен ранее описаному). Первый тип запроса, который может быть послан - запрос на завершение работы "пользователя", второй - запрос на замер времени: мы фиксируем текущее время и помечаем пользователя, чтобы в последующем не пытаться заново замерять время у пользователя, который уже это делает. Если мы не приняли сообщение, возможно все пользователи уже завершили работу. В этом случае мы выжидаем, чтобы порожденные процессы завершили работу до конца (последний пользователь может проверять оставшиеся сообщение в очереди), уничтожаем нашу очередь и выходим.
/* случайный запрос сервиса инициатора последнего сообщения */ if((myrand(100) < death_prob) && (queues[msg_sender] != sw)){ switch(myrand(2)) { case 0: /* пользователь должен отключиться */ printf("%d -- S -- User %d chosen for termination\n", (int) time(NULL), msg_sender); switch_send_term(i, queues[msg_sender]); break; case 1: /* проверка наличия замера пользователя */ if(!timing[msg_sender][0]){ timing[msg_sender][0] = 1; timing[msg_sender][1] = (int) time(NULL); printf("%d -- S -- User %d chosen for timing...\n", timing[msg_sender][1], msg_sender); switch_send_time(queues[msg_sender]); } break; } } } else{ if(deadproc == childs){ /* все порожденные процессы завершили работу, ожидание окончания последним своих задач */ waitpid(pid, &status, 0); /* удаление очереди коммутатора */ remove_queue(sw); /* завершение программы */ exit(0); } }Затем мы проверяем, не получили ли мы сервисное сообщение: мы можем получить сообщения о начале и завершении работы пользователя, id очереди и ответы на запрос замера времени.
if(switch_get_msg(TYPE_SERV, &in)){ msg_service = get_service(&in); msg_sender = get_sender(&in); switch(msg_service) { case SERV_BIRTH: /* подключение нового пользователя */ printf("%d -- S -- Activation of user %d\n", (int) time(NULL), msg_sender); break; case SERV_DEATH: /* завершение работы пользователя */ printf("%d -- S -- User %d is terminating\n", (int) time(NULL), msg_sender); /* удаление очереди пользователя из списка */ queues[msg_sender] = sw; /* контроль количество пользователей, завершивших работу */ deadproc++; break; case SERV_QID: /* посылка пользователем идентификатора своей очереди */ msg_data = get_data(&in); printf("%d -- S -- Got queue id of user %d: %d\n", (int) time(NULL), msg_sender, msg_data); queues[msg_sender] = msg_data; break; case SERV_TIME: msg_data = get_data(&in); /* информация о времени */ timing[msg_sender][1] = msg_data - timing[msg_sender][1]; printf("%d -- S -- Timing of user %d: %d seconds\n", (int) time(NULL), msg_sender, timing[msg_sender][1]); /* The user is no more under time control */ timing[msg_sender][0] = 0; break; } }
Небольшой совет касающийся IPC экспериментов. Представьте, что вы запустили несколько раз программу, которая работает не так как вы хотите, в этом случае простое нажатие клавиш Ctrl-C не уничтожит все порожденные процессы. Ранее я не упоминал об утилите "kill", но теперь вы знаете немного о процессах и я уверен, что вы разберетесь с man страницей. Но есть еще одна вещь, которую оставляют за собой процессы - IPC структуры. В приведенном выше примере уничтоженные процессы не освободят выделенную память; чтобы сделать это - мы можем использовать программы ipcs и ipcrm: ipcs показывает список выделенных IPC ресурсов (будьте внимательны - она покажет вам все ресурсы, не только вашего приложения), а ipcrm даст вам возможность удалить некоторые из них; если вы запустите ipcrm без аргументов - вы получите всю интересующую вас информацию: предлагаемые цифры для первых экспериментов - "5 70 70".
Разархивируйте командой "tar xvzf ipcdemo-0.1.tar.gz". Чтобы собрать ipcdemo выполните команду "make" внутри каталога с проектом; "make clean" - убирает backup файлы, а "make cleanall" убирает также object файлы.
Отладчики верные друзья разработчика, по крайней мере в момент разработки: научитесь сначала пользоваться gdb, а потом уже ddd потому, что графическое приложение это конечно хорошо, но необходимо знать и основы.
Наверняка вы когда-нибудь получали такое сообщение - "Segmentation fault" и размышляли где вы совершили ошибку в коде. Могу посоветовать вам кроме изучения файла с дампом, обратить внимание на valgrind и наблюдать за памятью. Также для чтения core dumped файла с помощью gdb вы можете использовать valgrind.
Вообщем-то создавать IPC приложения на языке 'C' занятие интересное, но непростое. Возможным решением может стать выбор языка Python: в нем полностью поддерживается fork и другое, связанное с этим. Обратите внимание на этот язык.
|
Webpages maintained by the LinuxFocus Editor team
© Leonardo Giordani, FDL LinuxFocus.org |
Translation information:
|
2004-01-27, generated by lfparser version 2.43