[LinuxFocus-icon]
<--  | Домой  | Карта  | Индекс  | Поиск

Новости | Архивы | Ссылки | Про LF
[an error occurred while processing this directive]
[Leonardo]
автор Leonardo Giordani
<leo.giordani(at)libero.it>

Об авторе:

Недавно я получил диплом факультета телекоммуникационных технологий Политехнического Университета Милана. Интересуюсь программированием ( в основном на Ассемблере и C/C++ ). С 1999 практически постоянно работаю в Linux/Unix



Перевод на Русский:
Пухляков Кирилл <kirill(at)linuxfocus.org>

Содержание:

 

Параллельное программирование - очереди сообщений (3)

[run in paralell]

Резюме:

Это последняя заметка из серии о параллельном программировании: в ней мы рассмотрим второй уровень нашего протокола, создадим функции, реализующие поведение пользователя на основе первого уровня нашего протокола, реализованного в прошлой заметке.

Неплохо было бы также вам прочитать сначала предыдущие заметки из этой серии:


_________________ _________________ _________________

 

Реализация протокола - Уровень 2 - Общие вопросы

Приложение ipcdemo было разработано для простой реализации коммутатора, позволяющего пользователям обмениваться сообщениями. Чтобы немного разнообразить приложение я добавил так называемый "сервис" - сообщение, задача которого дать пользователям возможность сообщать коммутатору о своем статусе - готовы ли они принимать сообщения, каким образом доставить им их ( послав id очереди IPC ) или они собираются отключиться. Также были добавлены еще два сервиса: Termination и Timing: первый используется для сообщения пользователю, что коммутатор выключается, второй - для измерения времени отклика пользователя. Ниже мы рассмотрим эти моменты подробнее, в разделах о "пользователе" и "коммутаторе" соответственно.

Во второй реализации протокола реализованы функции высокого уровня для отправки и получения сообщений, для взаимодействия с "сервисами", для инициализации: эти функции реализованы на основе функций первого уровня протокола, что делает их достаточно понятными. Обратите внимание на некоторые объявления типов сообщений и сервисов в файле layer2.h.

Приложение ipcdemo демонстрационное, неоптимизированное - вы можете заметить много глобальных переменных, хочу обратить ваше внимание на то, что главной задачей является объяснение читателю вопросов IPC, но не оптимизации кода. Тем не менее, если вы обнаружите что-то действительно странное - пишите мне и мы обсудим.

 

Реализация пользовательского процесса

"Пользователь" это на самом деле порожденный процесс коммутатора ( или лучше сказать - родительского процесса моделирующего коммутатор ). Это значит, что "пользователь" обладает теми же переменными, что и коммутатор: ему известен идентификатор очереди коммутатора, потому что он был сохранен коммутатором в локальной переменной до порождения других процессов.

При возникновении "пользовательского" процесса первое что ему необходимо сделать - это создать очередь и сообщить коммутатору как обратиться к ней: чтобы сделать это надо послать два сообщения - SERV_BIRTH и SERV_QID.

/* инициализация очереди  */
qid = init_queue(i);

/* сообщение коммутатору о возникновении "пользователя" */
child_send_birth(i, sw);

/* сообщение коммутатору способа обращения к "пользователю" */
child_send_qid(i, qid, sw);
Затем начинается рутинная работа: послать сообщение, проверить наличие сообщений от других "пользователей", проверить наличие запроса коммутатора и обратно по циклу.

Решение о посылке сообщения принимается на вероятностной основе: функция myrand() возвращает случайное число в дипазоне от 0 до значения переданного аргумента, в нашем случае 100, мы посылаем сообщение только если это число меньше указанной вероятности. Так как "пользователь" "спит" 1 секунду между последовательными выполнениями тела цикла, то, значит, за каждые 100 секунд он будет посылать сообщение примерно столько раз, чему равно значение вероятности отсылки; тут мы предполагаем, что выборка из 100 элементов достаточна, чтобы превратить вероятность в действительность, на самом деле 100 элементов достаточно мало для этого... Однако просто заметьте, что в программе не надо указывать слишком маленькие вероятности, иначе ваша симуляция будет длиться веками.

if(myrand(100) < send_prob){
  dest = 0;

  /* не посылать сообщения коммутатору, самому себе и */
  /* уже получившему */
  while((dest == 0) || (dest == i) || (dest == olddest)){
    dest = myrand(childs + 1);
  }
  olddest = dest;

  printf("%d -- U %d -- Message to user %d\n", (int) time(NULL), i, dest);
  child_send_msg(i, dest, 0, sw);
}
Сообщения посылаемые пользователями коммутатору и затем пересылаемые коммутатором нам - мы отметим как TYPE_CONN (от CONNECTION).
/* проверить наличие простых входящих сообщений */
if(child_get_msg(TYPE_CONN, &in)){
  msg_sender = get_sender(&in);
  msg_data = get_data(&in);
  printf("%d -- U %d -- Message from user %d: %d\n",
         (int) time(NULL), i, msg_sender, msg_data);
}
Для запроса сервиса коммутатора будем использовать сообщения типа TYPE_SERV. В случае получения сообщения о прекращении работы - "пользователю" необходимо отправить подтверждающее сообщение, чтобы коммутатор отметил "пользователя" как недоступного и прекратил посылать ему сообщения; затем " пользователь" должен прочитать все сообщения, предназначенные ему ( чтобы выглядеть вежливым, мы можем пропустить этот момент ), удалить очередь и сказать "до свидания" коммутатору. В случае запроса сервиса проверки времени мы посылаем коммутатору сообщение с текущим временем, получив его коммутатор вычисляет разницу между временем получения и отправления сообщения, чтобы знать сколько времени сообщение провело в очередях и заносит это значение в лог.
/* проверка наличия запроса коммутатора */
if(child_get_msg(TYPE_SERV, &in)){
  msg_service = get_service(&in);

  switch(msg_service){
  case SERV_TERM:
    /* извините, необходимо прекратить работу */
    /* послать подтверждение коммутатору */
    child_send_death(i, getpid(), sw);

    /* прочитать сообщения из очереди */
    while(child_get_msg(TYPE_CONN, &in)){
      msg_sender = get_sender(&in);
      msg_data = get_data(&in);
      printf("%d -- U %d -- Message from user %d: %d\n",
             (int) time(NULL), i, msg_sender, msg_data);
    }

    /* удалить очередь */
    close_queue(qid);
    printf("%d -- U %d -- Termination\n", (int) time(NULL), i);
    exit(0);
    break;
  case SERV_TIME:
    /* необходимо провести замер времени пребывания сообщения в очередях */
    child_send_time(i, sw);
    printf("%d -- U %d -- Timing\n", (int) time(NULL), i);
    break;
  }
}
 

Реализация процесса коммутатора

Работу родительского процесса мы можем разделить на две части - "до" и "после" создания порожденных процессов. В первой части необходимо инициализировать массив для хранения идентификаторов порожденных процесов и создать свою очередь. Создавать массив для хранения идентификаторов не совсем верное решение для реального приложения - гораздо правильнее использовать динамические структуры и выделение памяти ( memory allocation ), но объяснение этих тем находится за пределами данной заметки. Сначала идентификаторы очереди инициализируются идентификатором коммутатора, чтобы показать что "пользователь" еще не подключен, после отключения "пользователя" идентификатору опять присваивается оригинальное значение.

Во второй части своей работы родительский процесс также как и "пользовательский" ходит по циклу до тех пор пока не отключатся все "пользователи". Коммутатор принимает сообщения от "пользователей" и перенаправляет их по назначению.

/* проверить попытку подключения "пользователя" */
if(switch_get_msg(TYPE_CONN, &in)){

  msg_receiver = get_receiver(&in);
  msg_sender = get_sender(&in);
  msg_data = get_data(&in);

  /* если адресат доступен */
  if(queues[msg_receiver] != sw){

    /* послать сообщение адресату */
    switch_send_msg(msg_sender, msg_data, queues[msg_receiver]);

    printf("%d -- S -- Sender: %d -- Destination: %d\n",
           (int) time(NULL), msg_sender, msg_receiver);
  }
  else{
    /* адресат недоступен */
    printf("%d -- S -- Unreachable destination (Sender: %d - Destination: %d)\n",
          (int) time(NULL), msg_sender, msg_receiver);
  }
Если "пользователь" посылает сообщение через коммутатор, ему может быть послан запрос одного из двух видов. Решение о посылке запроса и выбор типа запроса производится на вероятностной основе (принцип аналогичен ранее описаному). Первый тип запроса, который может быть послан - запрос на завершение работы "пользователя", второй - запрос на замер времени: мы фиксируем текущее время и помечаем пользователя, чтобы в последующем не пытаться заново замерять время у пользователя, который уже это делает. Если мы не приняли сообщение, возможно все пользователи уже завершили работу. В этом случае мы выжидаем, чтобы порожденные процессы завершили работу до конца (последний пользователь может проверять оставшиеся сообщение в очереди), уничтожаем нашу очередь и выходим.
  /* случайный запрос сервиса инициатора последнего сообщения */
  if((myrand(100)  < death_prob) && (queues[msg_sender] != sw)){
    switch(myrand(2))
      {
      case 0:
    /* пользователь должен отключиться */
    printf("%d -- S -- User %d chosen for termination\n",
          (int) time(NULL), msg_sender);
    switch_send_term(i, queues[msg_sender]);
    break;
      case 1:
    /* проверка наличия замера пользователя */
    if(!timing[msg_sender][0]){
      timing[msg_sender][0] = 1;
      timing[msg_sender][1] = (int) time(NULL);
      printf("%d -- S -- User %d chosen for timing...\n",
            timing[msg_sender][1], msg_sender);
      switch_send_time(queues[msg_sender]);
    }
    break;
      }
  }
}
else{
  if(deadproc == childs){
    /* все порожденные процессы завершили работу, ожидание окончания последним своих задач */
    waitpid(pid, &status, 0);

    /* удаление очереди коммутатора */
    remove_queue(sw);

    /* завершение программы */
    exit(0);
  }
}
Затем мы проверяем, не получили ли мы сервисное сообщение: мы можем получить сообщения о начале и завершении работы пользователя, id очереди и ответы на запрос замера времени.
if(switch_get_msg(TYPE_SERV, &in)){
  msg_service = get_service(&in);
  msg_sender = get_sender(&in);

  switch(msg_service)
    {
    case SERV_BIRTH:
      /* подключение нового пользователя */
      printf("%d -- S -- Activation of user %d\n", (int) time(NULL), msg_sender);
      break;

    case SERV_DEATH:
      /* завершение работы пользователя */
      printf("%d -- S -- User %d is terminating\n", (int) time(NULL), msg_sender);

      /* удаление очереди пользователя из списка */
      queues[msg_sender] = sw;

      /* контроль количество пользователей, завершивших работу */
      deadproc++;
    break;

    case SERV_QID:
      /* посылка пользователем идентификатора своей очереди */
      msg_data = get_data(&in);
      printf("%d -- S -- Got queue id of user %d: %d\n",
            (int) time(NULL), msg_sender, msg_data);
      queues[msg_sender] = msg_data;
      break;

    case SERV_TIME:
      msg_data = get_data(&in);

      /* информация о времени */
      timing[msg_sender][1] = msg_data - timing[msg_sender][1];

      printf("%d -- S -- Timing of user %d: %d seconds\n",
            (int) time(NULL), msg_sender, timing[msg_sender][1]);
      /* The user is no more under time control */
      timing[msg_sender][0] = 0;
      break;
    }
}
 

Заключение

Это последняя заметка из серии о параллельном программировании: естественно мы не рассмотрели все аспекты этого подхода, но мы можем точно сказать, что вы теперь знаете что скрывается за словом IPC и для решения каких проблем это верный выбор. Советую вам немного усложнить наше приложение, как я уже говорил - отладка таких приложений непростое занятие, но с другой стороны вы можете лучше узнать возможности отладчиков ( не забывайте что gdb ваш верный друг в процессе программирования ), обратите внимание на ссылки ниже - там вы найдете интересные приложения.

Небольшой совет касающийся IPC экспериментов. Представьте, что вы запустили несколько раз программу, которая работает не так как вы хотите, в этом случае простое нажатие клавиш Ctrl-C не уничтожит все порожденные процессы. Ранее я не упоминал об утилите "kill", но теперь вы знаете немного о процессах и я уверен, что вы разберетесь с man страницей. Но есть еще одна вещь, которую оставляют за собой процессы - IPC структуры. В приведенном выше примере уничтоженные процессы не освободят выделенную память; чтобы сделать это - мы можем использовать программы ipcs и ipcrm: ipcs показывает список выделенных IPC ресурсов (будьте внимательны - она покажет вам все ресурсы, не только вашего приложения), а ipcrm даст вам возможность удалить некоторые из них; если вы запустите ipcrm без аргументов - вы получите всю интересующую вас информацию: предлагаемые цифры для первых экспериментов - "5 70 70".

Разархивируйте командой "tar xvzf ipcdemo-0.1.tar.gz". Чтобы собрать ipcdemo выполните команду "make" внутри каталога с проектом; "make clean" - убирает backup файлы, а "make cleanall" убирает также object файлы.

 

Заключение

Прошу прощения за задержку этой заметки, программирование не единственное занятие в жизни... Как обычно жду ваших комментариев по поводу заметки и конечно предложений тем следующих заметок: что вы например думаете о threads?  

Рекомендуемые приложения, сайты и литература

Литературу советую посмотреть вам в предыдущих заметках, здесь хочу посоветовать вам несколько адресов в Интернет о программировании, отладке и т.д.

Отладчики верные друзья разработчика, по крайней мере в момент разработки: научитесь сначала пользоваться gdb, а потом уже ddd потому, что графическое приложение это конечно хорошо, но необходимо знать и основы.

Наверняка вы когда-нибудь получали такое сообщение - "Segmentation fault" и размышляли где вы совершили ошибку в коде. Могу посоветовать вам кроме изучения файла с дампом, обратить внимание на valgrind и наблюдать за памятью. Также для чтения core dumped файла с помощью gdb вы можете использовать valgrind.

Вообщем-то создавать IPC приложения на языке 'C' занятие интересное, но непростое. Возможным решением может стать выбор языка Python: в нем полностью поддерживается fork и другое, связанное с этим. Обратите внимание на этот язык.

 

Загрузить

 

Страница отзывов

У каждой заметки есть страница отзывов. На этой странице вы можете оставить свой комментарий или просмотреть комментарии других читателей
 talkback page 

<--, перейти к начальной странице выпуска

Webpages maintained by the LinuxFocus Editor team
© Leonardo Giordani, FDL
LinuxFocus.org
Translation information:
en --> -- : Leonardo Giordani <leo.giordani(at)libero.it>
en --> ru: Пухляков Кирилл <kirill(at)linuxfocus.org>

2004-01-27, generated by lfparser version 2.43