Эффективное программирование TCP-IP

       

Пульсация


Задача проверки наличия соединения, неразрешимая с помощью механизма контролеров, легко решается путем реализации аналогичного механизма в самом приложении. Оптимальный метод зависит от приложения. Здесь вы можете полнее оценить гибкость, которая может быть достигнута при реализации на прикладном уровне. В качестве примеров рассмотрим два крайних случая:

  • клиент и сервер обмениваются сообщениями разных типов, каждое из которых имеет заголовок, идентифицирующий тип сообщения;
  • приложение передает данные в виде потока байтов без разбиения на записи.
  • Первый случай сравнительно несложен. Вводится новый тип сообщения MSG_HEARTBEAT. Получив такое сообщение, приложение возвращает его отправителю. Такой способ предоставляет большую свободу. Проверять наличие связи могут одна или обе стороны, причем только одна действительно посылает контрольное сообщение-пульс.

    Сначала рассмотрим заголовочный файл (листинг 2.23), который используют как клиент, так и сервер.

    Листинг 2.23. Заголовочный файл для реализации механизма пульсации

    1          #ifndef _HEARTBEAT

    2          #define _HEARTBEAT

    3          #efine MSG_TYPE1 1 /* Сообщение прикладного уровня. */

    4          #efine MSG_TYPE2 2/* Еще одно. */

    5          #efine MSG_HEARTBEAT3 /* Сообщение-пульс. */

    6          typedef struct/* Структура сообщения. */

    7          {



    8          u_int32_t type; /* MSG_TYPE1, ... */

    9          char data[ 2000 ] ;

    10        } msg_t;

    11        #define Tl 60 /* Время простоя перед отправкой пульса. */

    12        #define T2 10 /* Время ожидания ответа. */

    13        #endif /* _HEARTBEAT_H_ */

    3-5 С помощью этих констант определяются различные типы сообщений, которыми обмениваются клиент и сервер. Для данного примера нужно только сообщение MSG_HEARTBEAT.

    6-10 Здесь определяется структура сообщений, которыми обмениваются клиент и сервер. Здесь представляет интерес только поле type. Реальное приложение могло бы подстроить эту структуру под свои возможности. Подробнее это рассматривается в замечаниях к листингу 2.15 о смысле типа u_int32_t и об опасности предположений о способе упаковки структур.


    11 Данная константа определяет, сколько времени может простаивать соединение, прежде чем приложение начнет посылать контрольные сообщения-пульсы. Здесь произвольно выбрано 60 с, реальное же приложение должно подобрать значение, наиболее соответствующее потребностям и виду сети.

    12 Эта константа определяет, сколько времени клиент будет ждать ответа на контрольное сообщение.

    В листинге 2.24 приведен текст клиента, который инициирует посылку контрольных сообщений. Такой выбор абсолютно произволен, в качестве инициатора можно было выбрать сервер.

    Листинг 2.24. Клиент, посылающий контрольные сообщения-пульсы

    1    #include "etcp.h"

    2    #include "heartbeat.h"

    3    int main( int  argc, char **argv )

    4    {

    5    fd_set allfd;

    6    fd_set readfd;

    7    msg_t msg;

    8    struct timeval tv;

    9    SOCKET s;

    10   int rc;

    11   int heartbeats =0;

    12   int cnt = sizeof( msg );

    13   INIT();

    14   s = tcp_client( argv[ 1 ], argv[ 2 ] );

    15   FD_ZERO( &allfd } ;

    16   FD_SET( s, uallfd );

    17   tv.tv_sec = T1;

    18   tv.tv_usec =0;

    19   for ( ;; )

    20   {

    21     readfd = allfd;

    22     rc = select( s + 1, &readfd, NULL, NULL, &tv );

    23     if ( rc < 0 )

    24      error( 1, errno, "ошибка вызова select" );

    25     if ( rc == 0 )  /* Произошел тайм-аут. */

    26     {

    27      if ( ++ heartbeats > 3 )

    28       error( 1, 0, "соединения нет\n" );

    29      error( 0, 0, "посылаю пульс #%d\n" , heartbeats ) ;

    30      msg.type = htonl( MSG_HEARTBEAT );

    3!     rc = send( s, ( char * )&msg, sizeofl msg ), 0 );

    32      if ( rc < 0 )

    33       error( 1, errno, "ошибка вызова send" ) ;

    34      tv.tv_sec = T2;

    35      continue;

    36     )

    37     if ( !FD_ISSET( s, &readfd ) )

    38      error( 1, 0, "select вернул некорректный сокет\n" );

    39     rc = recv( s, ( char * )&msg + sizeof( msg ) - cnt,

    40      cnt, 0 ) ;

    41     if ( rc == 0 )

    42      error ( 1, 0, "сервер закончил работу\n" ) ,-



    43     if ( rc < 0 )

    44      error( 1, errno, "ошибка вызова recv" );

    45     heartbeats = 0;

    46     tv.tv_sec = T1;

    47     cnt -= rc; /* Встроенный readn. */

    48     if ( cnt > 0 )

    49      continue;

    50     cnt = sizeof( msg );

    51     /* Обработка сообщения. */

    52   }

    53   }

    Инициализация

    13-14 Выполняем стандартную инициализацию и соединяемся с сервером, адрес и номер порта которого заданы в командной строке.

    15-16 Задаем маску для системного вызова select, в которой выбран ваш сокет.

    17-18 Взводим таймер на Т1 секунд. Если за это время не было получено никакого сообщения, то select вернет управление с индикацией срабатывания таймера.

    21-22 Устанавливаем маску, выбирающую сокет, из которого читаем, после чего система блокирует программу в вызове select, пока не поступят данные либо не сработает таймер.

    Обработка тайм-аута

    27-28 Если послано подряд более трех контрольных пульсов и не получено ответа, то считается, что соединение «мертво». В этом примере просто завершаем работу, но реальное приложение могло бы предпринять более осмысленные действия.

    29-33 Если максимальное число последовательных контрольных пульсов не достигнуто, посылается новый пульс.

    34 -35 Устанавливаем таймер на Т2 секунд. Если за это время не получен ответ, то либо отправляется новый пульс, либо соединение признается «мертвым» в зависимости от значения переменной heartbeats.

    Обработка сообщения

    37-38 Если select вернул сокет, отличный от соединенного с сервером, to завершаемся с сообщением о фатальной ошибке.

    39-40 Вызываем recv для чтения одного сообщения. Эти строки, а также следующий за ними код, изменяющий значение переменной cnt, - не что иное, как встроенная версия функции readn. Она не может быть вызвана напрямую, поскольку заблокировала бы весь процесс на неопределенное время, нарушив тем самым работу механизма пульсации

    41-44 Если получаем признак конца файла или ошибку чтения, выводим диагностическое сообщение и завершаем сеанс.

    45-46 Поскольку только что получен ответ от сервера, сбрасывается счетчик пульсов в 0 и переустанавливается таймер на Т1 секунд.



    47- 50 Эти строки завершают встроенный вариант readn. Уменьшаем переменную cnt на число, равное количеству только что прочитанных байт. Если прочитано не все, то следует повторить цикл с вызова select. В противном случае заносится в cnt полная длина сообщения и завершается обработка только что принятого сообщения.

    Листинг 2.25 содержит текст сервера для этого примера. Здесь предполагается, что сервер также будет следить за состоянием соединения, но это не обязательно.

    Листинг 2.25. Сервер, отвечающий на контрольные сообщения-пульсы

    1    #include "etcp.h"

    2    #include "heartbeat.h"

    3    int main( int argc, char **argv )

    4    {

    5    fd_set allfd;

    6    fd_set readfd;

    7    msg_t msg;

    8    struct timeval tv;

    9    SOCKET s;

    10   SOCKET s1;

    11   int rc;

    12   int missed_heartbeats = 0;

    13   int cnt = sizeof( msg );

    14   INIT();

    15   s = tcp_server( NULL, argv[ 1 ] );

    16   s1 = accept( s, NULL, NULL ) ;

    17   if ( !isvalidsock( s1 ) )

    18     error( 1, errno, "ошибка вызова accept" );

    19   tv.tv_sec = T1 + T2;

    20   tv.tv_usec = 0;

    21   FD_ZERO( fcallfd );

    22   FD_SET( si, fiallfd ) ;

    23   for ( ;; )

    24   {

    25     readfd = allfd;

    26     rc = select( s1 + 1, &readfd, NULL, NULL, &tv );

    2.7    if ( rc < 0 }

    28      error( 1, errno, "ошибка вызова select" );

    29     if ( rc == 0 )  /* Произошел тайм-аут. */

    30     {

    31      if ( ++missed_heartbeats > 3 )

    32       errorf 1, 0, "соединение умерло\n" );

    33      error( 0, 0, "пропущен пульс #%d\n",

    34       missed_heartbeats );

    35      tv.tv_sec = T2;

    35      continue;

    37     }

    38     if ( !FD_ISSET( s1, &readfd ) )

    39      error( 1, 0, "select вернул некорректный сокет\n" );

    40     rc = recv( si, ( char * )&msg + sizeof( msg ) - cnt,

    41      cnt, 0 );

    42     if ( rc == 0 )

    43      errorf 1, 0, "клиент завершил работу\n" );

    44     if { rc < 0 )

    45      error( 1, errno, "ошибка вызова recv" );



    46     missed_heartbeats = 0;

    47     tv.tv_sec = T1 + T2;

    48     cnt -= rc;  /* Встроенный readn. */

    49     if ( cnt > 0 )

    50      continue;

    51     cnt = sizeof ( msg );

    52     switch ( ntohl( msg.type ) )

    53     {

    54      case MSG_TYPE1 :

    55       /* обработать сообщение типа TYPE1. */

    56       break;

    57      case MSG_TYPE2 :

    58       /* Обработать сообщение типа TYPE2. */

    59       break;

    60      case MSG_HEARTBEAT :

    61       rc = send( si, ( char * )&msg, sizeof( msg ), 0 );

    62       if ( rc < 0 )

    63        error( 1, errno, "ошибка вызова send" );

    64       break;

    65      default :

    66       error ( 1, 0, "неизвестный тип сообщения (%d)\n"',

    67        ntohl( msg.type ) );

    68     }

    69   }

    70   EXIT( 0 ) ;

    71   }

    Инициализация

    14-18 Выполняем стандартную инициализацию и принимаем соединение от клиента.

    19- 20 Взводим таймер на Т1 + Т2 секунд. Поскольку клиент посылает пульс после Т1 секунд неактивности, следует подождать немного больше- на Т2 секунд.

    21-22 Инициализируем маску для select, указывая в ней соединенный сокет, из которого происходит чтение.

    25-28 Вызываем select и проверяем возвращенное значение.

    Обработка тайм-аута

    31-32 Если пропущено более трех пульсов подряд, то соединение считаете «мертвым» - работа завершается. Как и клиент, реальный сервер мог бы предпринять в этом случае более осмысленные действия.

    35 Взводим таймер на Т2 секунд. К этому моменту клиент должен был бы посылать пульсы каждые Т2 секунд, так что если за это время ничего не получено, то необходимо увеличить счетчик пропущенных пульсов.

    Обработка сообщения

    38-39 Производим ту же проверку корректности сокета, что и в клиенте.

    40-41 Как и в клиенте, встраиваем код функции readn.

    42-45 Если recv возвращает признак конца файла или код ошибки, то печатаем диагностическое сообщение и выходим.

    46-47 Поскольку только что получено сообщение от клиента, соединение все еще живо, так что сбрасываем счетчик пропущенных пульсов в нуль и взводим таймер на Т1 + Т2 секунд.



    48- 51 Этот код, такой же, как в клиенте, завершает встроенную версию readn.

    60-64 Если это сообщение-пульс, то возвращаем его клиенту. Когда клиент получит сообщение, обе стороны будут знать, что соединение еще есть.

    Для тестирования этих программ запустим программу hb_server на машине spare, а программу hb_client - на машине bsd. После того как клиент соединится с сервером, отключим spare от сети. Вот что при этом будет напечатано.

    spare: $ hb_server 9000

    hb_server: пропущен пульс #1

    hb_server: пропущен пульс #2

    hb_server: пропущен пульс #3

    hb_server: соединения нет

    spare: $

    bsd: $ hb_client spare 9000

    hb_client: посылаю пульс #1

    hb_client: посылаю пульс #2

    hb_client: посылаю пульс #3

    hb_client: соединения нет

    bsd: $


    Содержание раздела