Эффективное программирование TCP-IP

       

Подумайте, не сделать ли приложение событийно-управляемым (2)


| | |

Здесь будет продолжено обсуждение, начатое в совете 20, а также проиллюстрировано использование функции tselect в приложениях и рассмотрены некоторые другие аспекты событийно-управляемого программирования. Вернемся к архи­тектуре с двумя соединениями из совета 19.

Взглянув на программу xout2 (листинг 3.13), вы увидите, что она не управляется событиями. Отправив сообщение удаленному хосту, вы не возвращаетесь к чтению новых данных из стандартного ввода, пока не придет подтверждение Причина в том, что таймер может сбросить новое сообщение. Если бы вы взвели таймер для следующего сообщения, не дождавшись подтверждения, то никогда не узнали бы, подтверждено старое сообщение или нет.

Проблема, конечно, в том, что в программе xout2 только один таймер и поэтому она не может ждать более одного сообщения в каждый момент. Воспользовавшись t select, вы сможете получить несколько таймеров из одного, предоставляемого select.

Представьте, что внешняя система из совета 19 - это шлюз, отправляющий сообщение третьей системе по ненадежному протоколу. Например, он мог бы по­сылать датаграммы в радиорелейную сеть. Предположим, что сам шлюз не дает информации о том, было ли сообщение успешно доставлено. Он просто переправ­ляет сообщение и возвращает подтверждение, полученное от третьей системы.

Чтобы в какой-то мере обеспечить надежность, новый писатель xout3 повторно посылает сообщение (но только один раз), если в течение определенного времени не получает подтверждения. Если и второе сообщение не подтверждено, xout3 протоколирует этот факт и отбрасывает сообщение. Чтобы ассоциировать подтверждение с сообщением, на которое оно поступило, xout 3 включает в каждое сообщение не­кий признак. Конечный получатель сообщения возвращает этот признак в составе подтверждения. Начнем с рассмотрения секции объявлений xout3 (листинг 3.18)

Листинг 3.18. Объявления для программы xout3

1    #define ACK 0x6 /* Символ  подтверждения  АСК.   */

2    #define MRSZ 128 /* Максимальное число неподтвержденных сообщений.*/


3    #define T1 3000  /* Ждать 3 с до первого АСК */
4    #define T2 5000  /* и 5 с до второго АСК. */
5    #define ACKSZ ( sizeof ( u_int32_t ) + 1 )


6    typedef struct /* Пакет данных. */
7    {
8    u_int32_t len; /* Длина признака и данных. */
9    u_int32_t cookie; /* Признак сообщения. */
10   char buf[ 128 ]; /* Сообщение. */
11   } packet_t;
12   typedef struct /* Структура сообщения. */
13   {
14   packet_t pkt;  /* Указатель на сохраненное сообщение.*/
15   int id; /* Идентификатор таймера. */
16   } msgrec_t;
17   static msgrec_t  mr[ MRSZ ];
18   static SOCKET s;
Объявления
5 Признак, включаемый в каждое сообщение, — это 32- разрядный порядковый номер сообщения. Подтверждение от удаленного хоста определяется как ASCII-символ АСК, за которым следует признак подтверждаемого сообщения. Поэтому константа ASCZ вычисляется как длина признака плюс 1.
6-11 Тип packet_t определяет структуру посылаемого пакета. Поскольку сообщения могут быть переменной длины, в каждый пакет включена длина сообщения. Удаленное приложение может использовать это поле для разбиения потока данных на отдельные записи (об этом шла речь в совете 6). Поле len - это общая длина самого сообщения и признака. Проблемы, связанные с упаковкой структур, рассматриваются в заме­чаниях после листинга 2.15.
12-16 Структура msgrec_t содержит структуру packet_t, посланную удаленному хосту. Пакет сохраняется на случай, если придется послать его повторно. Поле id - это идентификатор таймера, выступающего в роли таймера ретрансмиссии для этого сообщения.
17 С каждым неподтвержденным сообщением связана структура msgrec_t. Все они хранятся в массиве mr.
Теперь обратимся к функции main программы xout3 (листинг 3.19).
Листинг 3.19. Функция main программы xout3
1    int main( int argc, char **argv )
2    {
3    fd_set allreads;
4    fd_set readmask;
5    msgrec_t *mp;
6    int rc;
7    int mid;
8    int cnt = 0;
9    u_int32_t msgid = 0;
10   char ack[ ACKSZ ];
11  INIT();


12  s = tcp_client( argv[ 1 ], argv[ 2 ] );
13  FD_ZERO( &allreads );
14  FD_SET( s, &allreads );
15  FD_SET( 0, &allreads );
16  for ( mp = mr; mp < mr + MRSZ; mp++ )
17    mp->pkt.len = -1;
18  for ( ; ; )
19  {
20     readmask = allreads;
21     rc-= tselectf s + 1, &readmask, NULL, NULL );
22     if ( rc < 0 )
23      error( 1, errno, "ошибка вызова tselect" );
24     if ( rc == 0 )
25      error( 1, 0, "tselect сказала, что нет событий\n")
26     if ( FD_ISSET( s, &readmask ) )
27     {
28      rc = recv( s, ack + cnt, ACKSZ - cnt, 0 );
29      if ( rc == 0 )
30       error( 1, 0, "сервер отсоединился\n");
31      else if ( rc < 0 )
32       error( 1, errno, "ошибка вызова recv" );
33      if ( ( cnt += rc ) < ACKSZ ) /* Целое сообщение? */
34       continue;   /* Нет, еще подождем. */
35      cnt =0;    /* В следующий раз новое сообщение. */
36      if  ( ack[   0   ] != ACK)
37      {
38       error (  0,0," предупреждение: неверное подтверждение\n");
39       continue;
40      }
41      memcpy( &mid, ack + 1, sizeof( u_int32_t ) );
42      mp = findmsgrec( mid );
43      if (  mp   != NULL)
44      {
45       untimeout(  mp->id  ); /* Отменить таймер.*/
46       freemsgrecf mp  );   /* Удалить сохраненное сообщение.  */
47      }
48     }
49     if ( FD_ISSET( 0, &readmask ) )
50     {
51      mp = getfreerec ();
52      rc = read( 0, mp->pkt.buf, sizeoft mp->pkt.buf )
53      if ( rc < 0 )
54      error( 1, errno, "ошибка вызова read" );
55      mp->pkt.buf[ rc ] = '\0';
56      mp->pkt.cookie = msgid++;
57      mp->pkt.len = htonl( sizeof( u_int32_t ) + rc );
58      if ( send( s, &mp->pkt,
59       2 * sizeof( u_int32_t ) + rc, 0 ) < 0 )
60       error( 1, errno, "ошибка вызова send" );
61      mp->id = timeout( ( tofunc_t )lost_ACK, mp, Tl );
62     }
63   }


64   }
Инициализация
11-15 Так же, как и в программе xout2, соединяемся с удаленным хостои и инициализируем маски событий для tselect, устанавливая в них                                                              биты для дескрипторов stdin и сокета, который возвратилa tcp_client
16-17 Помечаем все структуры msgrec_t как свободные, записывая в поле длины пакета
18-25 Вызываем tselect точно так же, как select, только не передаем последний параметр (времени ожидания). Если tselect возвращает ошибку или нуль, то выводим диагностическое сообщение и заверша­ем программу. В отличие от select возврат нуля из tselect - свиде­тельство ошибки, так как все тайм-ауты обрабатываются внутри.
Обработка входных данных из сокета
26-32 При получении события чтения из сокета ожидаем подтверждение. В совете 6 говорилось о том, что нельзя применить recv в считывании ASCZ байт, поскольку, возможно, пришли еще не все данные. Нельзя воспользоваться и функцией типа readn, которая не возвращает управления до получения указанного числа байт, так как это противоречило бы событийно-управляемой архитектуре приложения, - ни одно собы­тие не может быть обработано, пока readn не вернет управления. Поэтому пытаемся прочесть столько данных, сколько необходимо для завер­шения обработки текущего подтверждения. В переменной cnt хранится число ранее прочитанных байт, поэтому ASCZ - cnt - это число недостающих байт.
33-35 Если общее число прочитанных байт меньше ASCZ, то возвращаемся к началу цикла и назначаем tselect ожидание прихода следующей партии данных или иного события. Если после только что сделанного вызова recv подтверждение получено, то сбрасываем cnt в нуль в ожидании следующего подтверждения (к этому моменту не было прочитано еще ни одного байта следующего подтверждения).
36-40 Далее, в соответствии с советом 11, выполняем проверку правильности полученных данных. Если сообщение - некорректное подтверждение, печатаем диагностическое сообщение и продолжаем работу. Возможно, здесь было бы правильнее завершить программу, так как удаленный хост послал неожиданные данные.


41- 42 Наконец, извлекаем из подтверждения признак сообщения, вызываем findmsgrec для получения указателя на структуру msgrec_t, ассоциированную с сообщением, и используем ее для отмены таймера, после чего освобождаем msgrec_t. Функции findmsgrec и freemsgrec приведены в листинге 3.20.
Обработка данных из стандартного ввода
51-57 Когда tselect сообщает о событии ввода из stdin, получаем структуру msgrec_t и считываем сообщение в пакет данных. Присваиваем сообщению порядковый номер, пользуясь счетчиком msgid, и сохраняем его в поле cookie пакета. Обратите внимание, что вызывать htonl не нужно, так как удаленный хост не анализирует признак, а возвращает его без изменения. Записываем в поля пакета полную длину сообщения вместе с признаком. На этот раз вызываем htonl, так как удаленный хост использует это поле для чтения оставшейся части сообщения (совет 28).
55-61 Посылаем подготовленный пакет удаленному хосту и взводим таймер ретрансмиссии, обращаясь к функции timeout.
Оставшиеся функции программы xout3 приведены в листинге 3.20.
Листинг 3.20. Вспомогательные функции программы xout3
1    msgrec_t *getfreerec( void )
2    {
3    msgrec_t *mp;
4    for ( mp = mr; mp < mr + MRSZ; mp++ )
5      if ( mp->pkt.len == -1 ) /* Запись свободна? */
6       return mp;
7    error(1,0, "getfreerec: исчерпан пул записей сообщений\n" );
8    return NULL; /* "Во избежание предупреждений компилятора.*/
9    }
10   msgrec_t   *findmsgrec(   u_int32_t mid  )
11   {
12   msgrec_t *mp;
13   for ( mp = mr; mp < mr + MRSZ; mp++ )
14   if ( mp->pkt.len != -1 && mp->pkt.cookie == mid )
15     return mp;
16   error (0, 0,"findmsgrec: нет сообщения, соответствующего ACK %d\n", mid);
17   return NULL;
18   }
19   void freemsgrec(   msgrec_t   *mp   )
20   {
21   if (  mp->pkt.len  ==  -1   )
22   error(1,0, "freemsgrec: запись сообщения уже освобождена\n" };
23   mp->pkt.len  =   -1;
24   }
25   gtatic void drop( msgrec_t *mp )


26   {
27   error( 0, 0, "Сообщение отбрасывается:   %s", mp->pkt.buf );
28   freemsgrec( mp );
29   }
30   static void lost_ACK( msgrec_t *mp )
31   {
32   error( 0, 0, "Повтор сообщения:   %s", mp->pkt.buf );
33   if ( send( s, &mp->pkt,
34     sizeof( u_int32_t ) + ntohl( mp->pkt.len ), 0 ) < 0 )
35     error ( 1, errno, " потерян АСК: ошибка вызова send" );
36   mp->id = timeout) ( tofunc_t )drop, mp, T2 );
37   }
getfreerec
1-9 Данная функция ищет свободную запись в таблице mr. Просматриваем последовательно весь массив, пока не найдем пакет с длиной -1. Это означает, что запись свободна. Если бы массив mr был больше, то можно было бы завести список свободных, как было сделано для записей типа tevent_t в листинге 3.15.
findmsgrec
10-18 Эта функция почти идентичная get f reerec, только на этот раз ищем запись с заданным признаком сообщения.
freemsgrec
19-24 Убедившись, что данная запись занята, устанавливаем длину пакета в -1, помечая тем самым, что теперь она свободна.
drop
25-29 Данная функция вызывается, если не пришло подтверждение на второе посланное сообщение (см. lost_ACK). Пишем в протокол диагнос­тику и отбрасываем запись, вызывая freemsgrec.
lost_ACK
30-37 Эта функция вызывается, если не пришло подтверждение на первое сообщение. Посылаем сообщение повторно и взводим новый таймер ре-трансмиссии, указывая, что при его срабатывании надо вызвать функцию drop.
Для тестирования xout3 напишем серверное приложение, которое случайным образом отбрасывает сообщения. Назовем этот сервер extsys (сокращение от external system - внешняя система). Его текст приведен в листинге 3.21.
Листинг 3.21. Внешняя система
extsys.c
1    #include "etcp.h"
2    #define COOKIESZ  4  /* Так установлено клиентом. */
3    int main ( int argc, char **argv )
4    {
5    SOCKET s;
6    SOCKET s1;
7    int rc;
8    char buf[ 128 ] ;
9    INIT();
10   s = tcp_server( NULL, argv[ 1 ] );
11   s1 = accept( s, NULL, NULL );


12   if ( !isvalidsock) s1 ) )
13     error( 1, errno, "ошибка вызова accept" );
!4   srand( 127 );
15   for ( ;; )
16   {
17     rc = readvrec( s1, buf, sizeof( buf ) );
18     if ( rc == 0 )
19      error( 1, 0, "клиент отсоединился\n" );
20     if ( rc < 0 )
21      error( 1, errno, "ошибка вызова recv" );
22     if ( rand() % 100 < 33 )
23      continue;
24     write! 1, buf + COOKIESZ, rc - COOKIESZ );
25     memmove( buf + 1, buf, COOKIESZ );
26     buf[ 0 ] = ' \006';
27     if ( send( s1, buf, 1 + COOKIESZ, 0 ) < 0 )
28      error( 1, errno, "ошибка вызова send" );
29   }
30   }
Инициализация
9- 14 Выполняем обычную инициализацию сервера и вызываем функцию srand для инициализации генератора случайных чисел.
Премечание: Функция rand из стандартной библиотеки С работает быстрои проста в применении, но имеет ряд нежелательных свойств. Хотя для демонстрации xout3 она вполне пригодна, но для серьезного моделирования нужно было бы воспользоваться более развитым генератором случайных чисел [Knuth 1998].
17-21 Вызываем функцию readvrec для чтения записи переменной длины, посланной xout3.
22-23 Случайным образом отбрасываем примерно треть получаемых сообщений.
24-28 Если сообщение не отброшено, то выводим его на stdout, сдвигаем в буфере признак на один символ вправо, добавляем в начало символ АСК и возвращаем подтверждение клиенту.
Вы тестировали xout3, запустив extsys в одном окне и воспользовавшись конвейером из совета 20 в другом (рис. 3.7).
Можно сделать следующие замечания по поводу работы xout3:
  • доставка сообщений по порядку не гарантирована. На примере сообщении 17 и 20 на рис. 3.8 вы видите, что повторно посланное сообщение нарушило порядок;

  • можно было увеличить число повторных попыток, добавив счетчик попыток в структуру msgrec_t и заставив функцию lost_ACK продолжать попытки отправить сообщение до исчерпания счетчика;

  • легко модифицировать xout3 так, чтобы она работала по протоколу UDP а не TCP. Это стало бы первым шагом на пути предоставления надежного UDP-сервиса (совет 8);



  • если бы приложение работало с большим числом сокетов (и использовало функцию tselect), то имело бы смысл вынести встроенный код геаdn в отдельную функцию. Такая функция могла бы получать на входе структуру, содержащую cnt, указатель на буфер ввода (или сам буфер) и адрес функции, которую нужно вызвать после получения полного сообщения; р в качестве примера xout3, пожалуй, выглядит чересчур искусственно, особенно в контексте совета 19, но так или иначе она иллюстрирует, как можно решить задачу, часто возникающую на практике.


  • bsd $ mp I xout3 localhost 9000
    xout3: Повтор сообщения: message 3
    xout3: Повтор сообщения: message 4
    xout3: Повтор сообщения: message 5
    xoutS: Сообщение отбрасывается: message 4
    xout3: Сообщение отбрасывается: message 5
    xout3: Повтор сообщения: message 11
    xout3: Повтор сообщения: message 14
    xout3: Сообщение отбрасывается: message 11
    xout3: Повтор сообщения: message 16
    xout3: Повтор сообщения: message 17
    xout3: Сообщение отбрасывается: message 14
    xout3: Повтор сообщения: message 19
    xout3: Повтор  сообщения: message 20
    xout3: Сообщение отбрасывается: message 16
    xout3: Сервер отсоединился
    Broken pipe
    bsd $
    bsd $ extsys 9000
    message 1
    message 2
    message 3
    message 6
    message 7
    message 8
    message 9
    message 10
    message 12
    message 13
    message 15
    message 18
    message 17
    message 21
    message 20
    message 23
    ^C сервер остановлен
    bsd $

    Рис. 3.7. Демонстрация xout 3

    Содержание раздела