Как написать масштабируемый сервер на базе Tcp / Ip

148

Я нахожусь в стадии разработки, когда пишу новое приложение-службу Windows, которое принимает соединения TCP / IP для длительных соединений (то есть это не то же самое, что HTTP, где много коротких соединений, а клиент подключается и остается подключенным часами или днями или ещё недели).

Я ищу идеи для лучшего способа проектирования сетевой архитектуры. Мне нужно будет запустить хотя бы один поток для сервиса. Я рассматриваю возможность использования Asynch API (BeginRecieve и т. Д.), Поскольку я не знаю, сколько клиентов я подключу в любой момент времени (возможно, сотни). Я определенно не хочу, чтобы начать поток для каждого соединения.

Данные будут в основном поступать на клиенты с моего сервера, но иногда от клиентов будут отправляться некоторые команды. Это в первую очередь приложение для мониторинга, в котором мой сервер периодически отправляет данные о состоянии клиентам.

Любые предложения о том, как сделать это максимально масштабируемым? Основной рабочий процесс? Спасибо.

РЕДАКТИРОВАТЬ: Чтобы быть ясным, я ищу решения на основе .net (C #, если это возможно, но любой язык .net будет работать)

ЗАМЕТКА: Чтобы получить награду, я ожидаю большего, чем простой ответ. Мне нужен рабочий пример решения, либо как указатель на что-то, что я могу загрузить, либо как короткий пример в строке. И это должно быть на основе .net и Windows (любой язык .net приемлем)

РЕДАКТИРОВАТЬ: Я хочу поблагодарить всех, кто дал хорошие ответы. К сожалению, я мог принять только один, и я решил принять более известный метод Begin / End. Решение Esac вполне может быть лучше, но оно все еще достаточно новое, и я не знаю точно, как оно будет работать.

Я проголосовал за все ответы, которые я считаю хорошими, я хотел бы сделать больше для вас, ребята. Еще раз спасибо.

Эрик Фанкенбуш
источник
1
Вы абсолютно уверены, что это должно быть длительное соединение? Трудно сказать по предоставленной ограниченной информации, но я сделал бы это только в случае крайней необходимости.
Markt
Да, это должно быть долго. Данные должны обновляться в режиме реального времени, поэтому я не могу проводить периодический опрос, данные должны передаваться клиенту по мере его появления, что означает постоянное соединение.
Эрик Фанкенбуш
1
Это не веская причина. Http поддерживает долго работающие соединения просто отлично. Вы просто открываете соединение и ждете отклика (остановленный опрос). Это прекрасно работает для многих приложений в стиле AJAX и т. Д. Как вы думаете, работает gmail :-)
TFD
2
Gmail периодически опрашивает почту, не поддерживает долго работающее соединение. Это хорошо для электронной почты, где ответ в режиме реального времени не требуется.
Эрик Фанкенбуш
2
Опрос или вытягивание хорошо масштабируется, но быстро развивается задержка. Нажатие также не масштабируется, но помогает уменьшить или устранить задержку.
andrewbadera

Ответы:

92

Я написал нечто похожее на это в прошлом. Из моего исследования, проведенного несколько лет назад, я понял, что лучше всего написать собственную реализацию сокетов, используя асинхронные сокеты. Это означало, что клиенты, которые на самом деле ничего не делали, требовали относительно небольших ресурсов. Все, что происходит, обрабатывается пулом потоков .net.

Я написал это как класс, который управляет всеми соединениями для серверов.

Я просто использовал список для хранения всех клиентских подключений, но если вам нужен более быстрый поиск для больших списков, вы можете написать его так, как хотите.

private List<xConnection> _sockets;

Также вам нужен сокет, который прослушивает входящие соединения.

private System.Net.Sockets.Socket _serverSocket;

Метод start фактически запускает сокет сервера и начинает прослушивать любые входящие соединения.

public bool Start()
{
  System.Net.IPHostEntry localhost = System.Net.Dns.GetHostEntry(System.Net.Dns.GetHostName());
  System.Net.IPEndPoint serverEndPoint;
  try
  {
     serverEndPoint = new System.Net.IPEndPoint(localhost.AddressList[0], _port);
  }
  catch (System.ArgumentOutOfRangeException e)
  {
    throw new ArgumentOutOfRangeException("Port number entered would seem to be invalid, should be between 1024 and 65000", e);
  }
  try
  {
    _serverSocket = new System.Net.Sockets.Socket(serverEndPoint.Address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
   }
   catch (System.Net.Sockets.SocketException e)
   {
      throw new ApplicationException("Could not create socket, check to make sure not duplicating port", e);
    }
    try
    {
      _serverSocket.Bind(serverEndPoint);
      _serverSocket.Listen(_backlog);
    }
    catch (Exception e)
    {
       throw new ApplicationException("Error occured while binding socket, check inner exception", e);
    }
    try
    {
       //warning, only call this once, this is a bug in .net 2.0 that breaks if 
       // you're running multiple asynch accepts, this bug may be fixed, but
       // it was a major pain in the ass previously, so make sure there is only one
       //BeginAccept running
       _serverSocket.BeginAccept(new AsyncCallback(acceptCallback), _serverSocket);
    }
    catch (Exception e)
    {
       throw new ApplicationException("Error occured starting listeners, check inner exception", e);
    }
    return true;
 }

Хотелось бы отметить, что код обработки исключений выглядит плохо, но причина этого в том, что у меня был код подавления исключений, поэтому любые исключения будут подавлены и возвращены false если была установлена ​​опция конфигурации, но я хотел удалить ее для ради краткости.

_ServerSocket.BeginAccept (новый AsyncCallback (acceptCallback)), _serverSocket), приведенный выше, по существу устанавливает сокет нашего сервера для вызова метода acceptCallback при каждом подключении пользователя. Этот метод запускается из пула потоков .Net, который автоматически обрабатывает создание дополнительных рабочих потоков, если у вас много операций блокировки. Это должно оптимально обрабатывать любую нагрузку на сервер.

    private void acceptCallback(IAsyncResult result)
    {
       xConnection conn = new xConnection();
       try
       {
         //Finish accepting the connection
         System.Net.Sockets.Socket s = (System.Net.Sockets.Socket)result.AsyncState;
         conn = new xConnection();
         conn.socket = s.EndAccept(result);
         conn.buffer = new byte[_bufferSize];
         lock (_sockets)
         {
           _sockets.Add(conn);
         }
         //Queue recieving of data from the connection
         conn.socket.BeginReceive(conn.buffer, 0, conn.buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), conn);
         //Queue the accept of the next incomming connection
         _serverSocket.BeginAccept(new AsyncCallback(acceptCallback), _serverSocket);
       }
       catch (SocketException e)
       {
         if (conn.socket != null)
         {
           conn.socket.Close();
           lock (_sockets)
           {
             _sockets.Remove(conn);
           }
         }
         //Queue the next accept, think this should be here, stop attacks based on killing the waiting listeners
         _serverSocket.BeginAccept(new AsyncCallback(acceptCallback), _serverSocket);
       }
       catch (Exception e)
       {
         if (conn.socket != null)
         {
           conn.socket.Close();
           lock (_sockets)
           {
             _sockets.Remove(conn);
           }
         }
         //Queue the next accept, think this should be here, stop attacks based on killing the waiting listeners
         _serverSocket.BeginAccept(new AsyncCallback(acceptCallback), _serverSocket);
       }
     }

Приведенный выше код, по сути, только что завершил прием входящего соединения, очереди, BeginReceiveкоторая является обратным вызовом, который будет выполняться, когда клиент отправляет данные, а затем ставит в очередь следующееacceptCallback которое примет следующее входящее соединение клиента.

BeginReceiveВызов метода является то , что говорит сокет , что делать , когда он получает данные от клиента. Для BeginReceive, вам нужно дать ему байтовый массив, куда он будет копировать данные, когда клиент отправляет данные. ReceiveCallbackМетод будет вызван, который , как мы обрабатываем прием данных.

private void ReceiveCallback(IAsyncResult result)
{
  //get our connection from the callback
  xConnection conn = (xConnection)result.AsyncState;
  //catch any errors, we'd better not have any
  try
  {
    //Grab our buffer and count the number of bytes receives
    int bytesRead = conn.socket.EndReceive(result);
    //make sure we've read something, if we haven't it supposadly means that the client disconnected
    if (bytesRead > 0)
    {
      //put whatever you want to do when you receive data here

      //Queue the next receive
      conn.socket.BeginReceive(conn.buffer, 0, conn.buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), conn);
     }
     else
     {
       //Callback run but no data, close the connection
       //supposadly means a disconnect
       //and we still have to close the socket, even though we throw the event later
       conn.socket.Close();
       lock (_sockets)
       {
         _sockets.Remove(conn);
       }
     }
   }
   catch (SocketException e)
   {
     //Something went terribly wrong
     //which shouldn't have happened
     if (conn.socket != null)
     {
       conn.socket.Close();
       lock (_sockets)
       {
         _sockets.Remove(conn);
       }
     }
   }
 }

РЕДАКТИРОВАТЬ: В этом шаблоне я забыл упомянуть, что в этой области кода:

//put whatever you want to do when you receive data here

//Queue the next receive
conn.socket.BeginReceive(conn.buffer, 0, conn.buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), conn);

В общем, я бы сделал в коде все, что вы хотите, - это сделать повторную сборку пакетов в сообщения, а затем создать их как задания в пуле потоков. Таким образом, BeginReceive следующего блока от клиента не задерживается во время выполнения кода обработки сообщений.

Обратный вызов accept завершает чтение сокета данных путем вызова end receive. Это заполняет буфер, предоставленный в функции начала приема. Как только вы сделаете все, что хотите, там, где я оставил комментарий, мы вызываем следующий BeginReceiveметод, который снова запустит обратный вызов, если клиент отправит больше данных. Теперь вот действительно сложная часть: когда клиент отправляет данные, ваш обратный вызов приема может быть вызван только с частью сообщения. Сборка может стать очень и очень сложной. Я использовал свой собственный метод и создал для этого своего рода собственный протокол. Я оставил это, но если вы попросите, я могу добавить его. Этот обработчик был на самом деле самым сложным фрагментом кода, который я когда-либо писал.

public bool Send(byte[] message, xConnection conn)
{
  if (conn != null && conn.socket.Connected)
  {
    lock (conn.socket)
    {
    //we use a blocking mode send, no async on the outgoing
    //since this is primarily a multithreaded application, shouldn't cause problems to send in blocking mode
       conn.socket.Send(bytes, bytes.Length, SocketFlags.None);
     }
   }
   else
     return false;
   return true;
 }

Приведенный выше метод send фактически использует синхронный Sendвызов, для меня это было хорошо из-за размеров сообщений и многопоточной природы моего приложения. Если вы хотите отправить каждому клиенту, вам просто нужно пройтись по списку _sockets.

Класс xConnection, на который вы ссылаетесь выше, в основном является простой оболочкой для сокета, который включает в себя байтовый буфер, и в моей реализации некоторые дополнения.

public class xConnection : xBase
{
  public byte[] buffer;
  public System.Net.Sockets.Socket socket;
}

Также для справки вот те, которые usingя включаю, так как я всегда раздражаюсь, когда они не включены.

using System.Net.Sockets;

Я надеюсь, что это полезно, это может быть не самый чистый код, но это работает. Есть также некоторые нюансы в коде, которые вы должны быть утомлены при изменении. Для одного, только один BeginAcceptзвонил в любое время. Раньше там была очень досадная ошибка .net, которая была много лет назад, поэтому я не вспоминаю подробности.

Кроме того, в ReceiveCallbackкоде мы обрабатываем все, что получено из сокета, прежде чем ставить в очередь следующий прием. Это означает, что для одного сокета мы на самом деле только ReceiveCallbackодин раз в любой момент времени, и нам не нужно использовать синхронизацию потоков. Однако, если вы измените этот порядок для вызова следующего приема сразу после извлечения данных, что может быть немного быстрее, вам необходимо убедиться, что вы правильно синхронизировали потоки.

Кроме того, я много взломал мой код, но оставил суть происходящего на месте. Это должно стать хорошим началом для вашего дизайна. Оставьте комментарий, если у вас есть еще вопросы по этому поводу.

Кевин Нисбет
источник
1
Это хороший ответ, Кевин .. похоже, вы на пути к получению награды. :)
Эрик Фанкенбуш
6
Я не знаю, почему это самый высокий голос. Начало * Конец * - это не самый быстрый способ создания сетей в C # и не самый масштабируемый способ. Это быстрее, чем синхронно, но в Windows происходит множество операций, которые действительно замедляют этот сетевой путь.
esac
6
Имейте в виду, что esac написал в предыдущем комментарии. Шаблон begin-end, вероятно, будет работать для вас до определенного момента, черт возьми, мой код в настоящее время использует begin-end, но в .net 3.5 есть улучшения по сравнению с его ограничениями. Мне плевать на награду, но я бы порекомендовал вам прочитать ссылку в моем ответе, даже если вы реализуете этот подход. «Улучшения производительности сокетов в версии 3.5»
jvanderh
1
Я просто хотел добавить их, так как, возможно, я не был достаточно ясен, это код эпохи .net 2.0, где я считаю, что это был очень жизнеспособный образец. Тем не менее, ответ esac выглядит несколько более современным, если он нацелен на .net 3.5, единственное, что у меня есть, это бросок событий :), но это легко можно изменить. Кроме того, я провел тестирование пропускной способности с этим кодом, и на двухъядерном операторе 2Ghz смог максимально использовать 100 Мбит / с Ethernet, и это добавило слой шифрования поверх этого кода.
Кевин Нисбет
1
@KevinNisbet Я знаю, что уже довольно поздно, но для любого, кто использует этот ответ для разработки своих собственных серверов, отправка также должна быть асинхронной, потому что в противном случае вы открываете себя для возможности тупика. Если обе стороны записывают данные, которые заполняют их соответствующие буферы, Sendметоды будут блокироваться бесконечно в обе стороны, потому что никто не читает входные данные.
Luaan
83

Есть много способов выполнения сетевых операций в C #. Все они используют разные механизмы под капотом и, таким образом, страдают от серьезных проблем с производительностью с высоким параллелизмом. Операции Begin * являются одними из тех, которые многие ошибочно принимают за то, что они являются самым быстрым / быстрым способом создания сетей.

Для решения этих проблем они представили набор * Async методов: от MSDN http://msdn.microsoft.com/en-us/library/system.net.sockets.socketasynceventargs.aspx

Класс SocketAsyncEventArgs является частью набора улучшений класса System.Net.Sockets .. ::. Socket, которые предоставляют альтернативный асинхронный шаблон, который может использоваться специализированными высокопроизводительными приложениями сокетов. Этот класс был специально разработан для приложений сетевого сервера, которые требуют высокой производительности. Приложение может использовать расширенный асинхронный шаблон исключительно или только в целевых горячих областях (например, при получении больших объемов данных).

Главной особенностью этих улучшений является предотвращение повторного выделения и синхронизации объектов во время асинхронного ввода-вывода с большим объемом. Шаблон проектирования Begin / End, в настоящее время реализуемый классом System.Net.Sockets .. ::. Socket, требует, чтобы объект System .. ::. IAsyncResult был выделен для каждой асинхронной операции сокета.

Под прикрытием * Async API использует порты завершения ввода-вывода, которые являются самым быстрым способом выполнения сетевых операций, см. Http://msdn.microsoft.com/en-us/magazine/cc302334.aspx

И просто чтобы помочь вам, я включаю исходный код для сервера Telnet, который я написал с помощью * Async API. Я включаю только соответствующие части. Также следует отметить, что вместо обработки данных в строке я предпочитаю помещать их в очередь без блокировки (без ожидания), которая обрабатывается в отдельном потоке. Обратите внимание, что я не включаю соответствующий класс Pool, который является простым пулом, который создаст новый объект, если он пуст, и класс Buffer, который является просто саморасширяющимся буфером, который на самом деле не нужен, если вы не получаете неопределенный количество данных. Если вам нужна дополнительная информация, не стесняйтесь, присылайте мне в личку.

 public class Telnet
{
    private readonly Pool<SocketAsyncEventArgs> m_EventArgsPool;
    private Socket m_ListenSocket;

    /// <summary>
    /// This event fires when a connection has been established.
    /// </summary>
    public event EventHandler<SocketAsyncEventArgs> Connected;

    /// <summary>
    /// This event fires when a connection has been shutdown.
    /// </summary>
    public event EventHandler<SocketAsyncEventArgs> Disconnected;

    /// <summary>
    /// This event fires when data is received on the socket.
    /// </summary>
    public event EventHandler<SocketAsyncEventArgs> DataReceived;

    /// <summary>
    /// This event fires when data is finished sending on the socket.
    /// </summary>
    public event EventHandler<SocketAsyncEventArgs> DataSent;

    /// <summary>
    /// This event fires when a line has been received.
    /// </summary>
    public event EventHandler<LineReceivedEventArgs> LineReceived;

    /// <summary>
    /// Specifies the port to listen on.
    /// </summary>
    [DefaultValue(23)]
    public int ListenPort { get; set; }

    /// <summary>
    /// Constructor for Telnet class.
    /// </summary>
    public Telnet()
    {           
        m_EventArgsPool = new Pool<SocketAsyncEventArgs>();
        ListenPort = 23;
    }

    /// <summary>
    /// Starts the telnet server listening and accepting data.
    /// </summary>
    public void Start()
    {
        IPEndPoint endpoint = new IPEndPoint(0, ListenPort);
        m_ListenSocket = new Socket(endpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);

        m_ListenSocket.Bind(endpoint);
        m_ListenSocket.Listen(100);

        //
        // Post Accept
        //
        StartAccept(null);
    }

    /// <summary>
    /// Not Yet Implemented. Should shutdown all connections gracefully.
    /// </summary>
    public void Stop()
    {
        //throw (new NotImplementedException());
    }

    //
    // ACCEPT
    //

    /// <summary>
    /// Posts a requests for Accepting a connection. If it is being called from the completion of
    /// an AcceptAsync call, then the AcceptSocket is cleared since it will create a new one for
    /// the new user.
    /// </summary>
    /// <param name="e">null if posted from startup, otherwise a <b>SocketAsyncEventArgs</b> for reuse.</param>
    private void StartAccept(SocketAsyncEventArgs e)
    {
        if (e == null)
        {
            e = m_EventArgsPool.Pop();
            e.Completed += Accept_Completed;
        }
        else
        {
            e.AcceptSocket = null;
        }

        if (m_ListenSocket.AcceptAsync(e) == false)
        {
            Accept_Completed(this, e);
        }
    }

    /// <summary>
    /// Completion callback routine for the AcceptAsync post. This will verify that the Accept occured
    /// and then setup a Receive chain to begin receiving data.
    /// </summary>
    /// <param name="sender">object which posted the AcceptAsync</param>
    /// <param name="e">Information about the Accept call.</param>
    private void Accept_Completed(object sender, SocketAsyncEventArgs e)
    {
        //
        // Socket Options
        //
        e.AcceptSocket.NoDelay = true;

        //
        // Create and setup a new connection object for this user
        //
        Connection connection = new Connection(this, e.AcceptSocket);

        //
        // Tell the client that we will be echo'ing data sent
        //
        DisableEcho(connection);

        //
        // Post the first receive
        //
        SocketAsyncEventArgs args = m_EventArgsPool.Pop();
        args.UserToken = connection;

        //
        // Connect Event
        //
        if (Connected != null)
        {
            Connected(this, args);
        }

        args.Completed += Receive_Completed;
        PostReceive(args);

        //
        // Post another accept
        //
        StartAccept(e);
    }

    //
    // RECEIVE
    //    

    /// <summary>
    /// Post an asynchronous receive on the socket.
    /// </summary>
    /// <param name="e">Used to store information about the Receive call.</param>
    private void PostReceive(SocketAsyncEventArgs e)
    {
        Connection connection = e.UserToken as Connection;

        if (connection != null)
        {
            connection.ReceiveBuffer.EnsureCapacity(64);
            e.SetBuffer(connection.ReceiveBuffer.DataBuffer, connection.ReceiveBuffer.Count, connection.ReceiveBuffer.Remaining);

            if (connection.Socket.ReceiveAsync(e) == false)
            {
                Receive_Completed(this, e);
            }              
        }
    }

    /// <summary>
    /// Receive completion callback. Should verify the connection, and then notify any event listeners
    /// that data has been received. For now it is always expected that the data will be handled by the
    /// listeners and thus the buffer is cleared after every call.
    /// </summary>
    /// <param name="sender">object which posted the ReceiveAsync</param>
    /// <param name="e">Information about the Receive call.</param>
    private void Receive_Completed(object sender, SocketAsyncEventArgs e)
    {
        Connection connection = e.UserToken as Connection;

        if (e.BytesTransferred == 0 || e.SocketError != SocketError.Success || connection == null)
        {
            Disconnect(e);
            return;
        }

        connection.ReceiveBuffer.UpdateCount(e.BytesTransferred);

        OnDataReceived(e);

        HandleCommand(e);
        Echo(e);

        OnLineReceived(connection);

        PostReceive(e);
    }

    /// <summary>
    /// Handles Event of Data being Received.
    /// </summary>
    /// <param name="e">Information about the received data.</param>
    protected void OnDataReceived(SocketAsyncEventArgs e)
    {
        if (DataReceived != null)
        {                
            DataReceived(this, e);
        }
    }

    /// <summary>
    /// Handles Event of a Line being Received.
    /// </summary>
    /// <param name="connection">User connection.</param>
    protected void OnLineReceived(Connection connection)
    {
        if (LineReceived != null)
        {
            int index = 0;
            int start = 0;

            while ((index = connection.ReceiveBuffer.IndexOf('\n', index)) != -1)
            {
                string s = connection.ReceiveBuffer.GetString(start, index - start - 1);
                s = s.Backspace();

                LineReceivedEventArgs args = new LineReceivedEventArgs(connection, s);
                Delegate[] delegates = LineReceived.GetInvocationList();

                foreach (Delegate d in delegates)
                {
                    d.DynamicInvoke(new object[] { this, args });

                    if (args.Handled == true)
                    {
                        break;
                    }
                }

                if (args.Handled == false)
                {
                    connection.CommandBuffer.Enqueue(s);
                }

                start = index;
                index++;
            }

            if (start > 0)
            {
                connection.ReceiveBuffer.Reset(0, start + 1);
            }
        }
    }

    //
    // SEND
    //

    /// <summary>
    /// Overloaded. Sends a string over the telnet socket.
    /// </summary>
    /// <param name="connection">Connection to send data on.</param>
    /// <param name="s">Data to send.</param>
    /// <returns>true if the data was sent successfully.</returns>
    public bool Send(Connection connection, string s)
    {
        if (String.IsNullOrEmpty(s) == false)
        {
            return Send(connection, Encoding.Default.GetBytes(s));
        }

        return false;
    }

    /// <summary>
    /// Overloaded. Sends an array of data to the client.
    /// </summary>
    /// <param name="connection">Connection to send data on.</param>
    /// <param name="data">Data to send.</param>
    /// <returns>true if the data was sent successfully.</returns>
    public bool Send(Connection connection, byte[] data)
    {
        return Send(connection, data, 0, data.Length);
    }

    public bool Send(Connection connection, char c)
    {
        return Send(connection, new byte[] { (byte)c }, 0, 1);
    }

    /// <summary>
    /// Sends an array of data to the client.
    /// </summary>
    /// <param name="connection">Connection to send data on.</param>
    /// <param name="data">Data to send.</param>
    /// <param name="offset">Starting offset of date in the buffer.</param>
    /// <param name="length">Amount of data in bytes to send.</param>
    /// <returns></returns>
    public bool Send(Connection connection, byte[] data, int offset, int length)
    {
        bool status = true;

        if (connection.Socket == null || connection.Socket.Connected == false)
        {
            return false;
        }

        SocketAsyncEventArgs args = m_EventArgsPool.Pop();
        args.UserToken = connection;
        args.Completed += Send_Completed;
        args.SetBuffer(data, offset, length);

        try
        {
            if (connection.Socket.SendAsync(args) == false)
            {
                Send_Completed(this, args);
            }
        }
        catch (ObjectDisposedException)
        {                
            //
            // return the SocketAsyncEventArgs back to the pool and return as the
            // socket has been shutdown and disposed of
            //
            m_EventArgsPool.Push(args);
            status = false;
        }

        return status;
    }

    /// <summary>
    /// Sends a command telling the client that the server WILL echo data.
    /// </summary>
    /// <param name="connection">Connection to disable echo on.</param>
    public void DisableEcho(Connection connection)
    {
        byte[] b = new byte[] { 255, 251, 1 };
        Send(connection, b);
    }

    /// <summary>
    /// Completion callback for SendAsync.
    /// </summary>
    /// <param name="sender">object which initiated the SendAsync</param>
    /// <param name="e">Information about the SendAsync call.</param>
    private void Send_Completed(object sender, SocketAsyncEventArgs e)
    {
        e.Completed -= Send_Completed;              
        m_EventArgsPool.Push(e);
    }        

    /// <summary>
    /// Handles a Telnet command.
    /// </summary>
    /// <param name="e">Information about the data received.</param>
    private void HandleCommand(SocketAsyncEventArgs e)
    {
        Connection c = e.UserToken as Connection;

        if (c == null || e.BytesTransferred < 3)
        {
            return;
        }

        for (int i = 0; i < e.BytesTransferred; i += 3)
        {
            if (e.BytesTransferred - i < 3)
            {
                break;
            }

            if (e.Buffer[i] == (int)TelnetCommand.IAC)
            {
                TelnetCommand command = (TelnetCommand)e.Buffer[i + 1];
                TelnetOption option = (TelnetOption)e.Buffer[i + 2];

                switch (command)
                {
                    case TelnetCommand.DO:
                        if (option == TelnetOption.Echo)
                        {
                            // ECHO
                        }
                        break;
                    case TelnetCommand.WILL:
                        if (option == TelnetOption.Echo)
                        {
                            // ECHO
                        }
                        break;
                }

                c.ReceiveBuffer.Remove(i, 3);
            }
        }          
    }

    /// <summary>
    /// Echoes data back to the client.
    /// </summary>
    /// <param name="e">Information about the received data to be echoed.</param>
    private void Echo(SocketAsyncEventArgs e)
    {
        Connection connection = e.UserToken as Connection;

        if (connection == null)
        {
            return;
        }

        //
        // backspacing would cause the cursor to proceed beyond the beginning of the input line
        // so prevent this
        //
        string bs = connection.ReceiveBuffer.ToString();

        if (bs.CountAfterBackspace() < 0)
        {
            return;
        }

        //
        // find the starting offset (first non-backspace character)
        //
        int i = 0;

        for (i = 0; i < connection.ReceiveBuffer.Count; i++)
        {
            if (connection.ReceiveBuffer[i] != '\b')
            {
                break;
            }
        }

        string s = Encoding.Default.GetString(e.Buffer, Math.Max(e.Offset, i), e.BytesTransferred);

        if (connection.Secure)
        {
            s = s.ReplaceNot("\r\n\b".ToCharArray(), '*');
        }

        s = s.Replace("\b", "\b \b");

        Send(connection, s);
    }

    //
    // DISCONNECT
    //

    /// <summary>
    /// Disconnects a socket.
    /// </summary>
    /// <remarks>
    /// It is expected that this disconnect is always posted by a failed receive call. Calling the public
    /// version of this method will cause the next posted receive to fail and this will cleanup properly.
    /// It is not advised to call this method directly.
    /// </remarks>
    /// <param name="e">Information about the socket to be disconnected.</param>
    private void Disconnect(SocketAsyncEventArgs e)
    {
        Connection connection = e.UserToken as Connection;

        if (connection == null)
        {
            throw (new ArgumentNullException("e.UserToken"));
        }

        try
        {
            connection.Socket.Shutdown(SocketShutdown.Both);
        }
        catch
        {
        }

        connection.Socket.Close();

        if (Disconnected != null)
        {
            Disconnected(this, e);
        }

        e.Completed -= Receive_Completed;
        m_EventArgsPool.Push(e);
    }

    /// <summary>
    /// Marks a specific connection for graceful shutdown. The next receive or send to be posted
    /// will fail and close the connection.
    /// </summary>
    /// <param name="connection"></param>
    public void Disconnect(Connection connection)
    {
        try
        {
            connection.Socket.Shutdown(SocketShutdown.Both);
        }
        catch (Exception)
        {
        }            
    }

    /// <summary>
    /// Telnet command codes.
    /// </summary>
    internal enum TelnetCommand
    {
        SE = 240,
        NOP = 241,
        DM = 242,
        BRK = 243,
        IP = 244,
        AO = 245,
        AYT = 246,
        EC = 247,
        EL = 248,
        GA = 249,
        SB = 250,
        WILL = 251,
        WONT = 252,
        DO = 253,
        DONT = 254,
        IAC = 255
    }

    /// <summary>
    /// Telnet command options.
    /// </summary>
    internal enum TelnetOption
    {
        Echo = 1,
        SuppressGoAhead = 3,
        Status = 5,
        TimingMark = 6,
        TerminalType = 24,
        WindowSize = 31,
        TerminalSpeed = 32,
        RemoteFlowControl = 33,
        LineMode = 34,
        EnvironmentVariables = 36
    }
}
ESAC
источник
Это довольно простой и простой пример. Спасибо. Я собираюсь оценить плюсы и минусы каждого метода.
Эрик Фанкенбуш
У меня не было возможности проверить это, но по какой-то причине у меня возникает смутное ощущение состояния гонки. Во-первых, если вы получаете много сообщений, я не знаю, что события будут обрабатываться по порядку (может быть, это не важно для приложения пользователя, но следует отметить), или я могу ошибаться, и события будут обрабатываться по порядку. Во-вторых, возможно, я его пропустил, но разве нет риска, что буфер будет перезаписан очищен, пока DataReceived все еще работает, если это занимает много времени? Если эти возможно неоправданные проблемы будут решены, я думаю, что это очень хорошее современное решение.
Кевин Нисбет
1
В моем случае для моего сервера telnet 100%, ДА, они в порядке. Ключевым моментом является установка правильного метода обратного вызова перед вызовом AcceptAsync, ReceiveAsync и т. Д. В моем случае я выполняю SendAsync в отдельном потоке, поэтому, если он изменяется для выполнения шаблона Accept / Send / Receive / Send / Receive / Disconnect, тогда это нужно будет изменить.
Esac
1
Точка № 2 также должна учитываться. Я храню свой объект «Connection» в контексте SocketAsyncEventArgs. Это означает, что у меня есть только один буфер приема на соединение. Я не публикую еще одно получение с этим SocketAsyncEventArgs до тех пор, пока DataReceived не будет завершен, поэтому никакие дополнительные данные не могут быть прочитаны до тех пор, пока он не будет завершен. Я СОВЕТУЮ, что над этими данными не будет длительных операций. Я фактически перемещаю весь буфер всех полученных данных в очередь без блокировки и затем обрабатываю их в отдельном потоке. Это обеспечивает низкую задержку в сетевой части.
Esac
1
В дополнение к этому, я написал модульные тесты и нагрузочные тесты для этого кода, и, поскольку я увеличил пользовательскую нагрузку с 1 пользователя до 250 пользователей (в одноядерной системе, 4 ГБ ОЗУ), время отклика для 100 байтов (1 пакет) и 10000 байт (3 пакета) остались неизменными на протяжении всей кривой пользовательской нагрузки.
Esac
46

Раньше было действительно хорошее обсуждение масштабируемого TCP / IP с использованием .NET, написанное Крисом Маллинсом из Coversant, к сожалению, похоже, что его блог исчез из прежнего местоположения, поэтому я постараюсь собрать воедино его советы из памяти (некоторые полезные комментарии его появляются в этой теме: C ++ vs. C #: разработка сервера IOCP с высокой степенью масштабируемости )

Прежде всего, обратите внимание, что и при использовании, Begin/Endи в Asyncметодах Socketкласса используются порты завершения ввода-вывода (IOCP) для обеспечения масштабируемости. Это делает гораздо большую разницу (при правильном использовании; см. Ниже) в масштабируемости, чем какой из двух методов вы фактически выбираете для реализации своего решения.

Посты Криса Маллинса были основаны на использовании Begin/End, с которым я лично сталкивался. Обратите внимание, что Крис создал решение, основанное на этом, которое масштабировало до 10 000 одновременных клиентских подключений на 32-разрядной машине с 2 ГБ памяти и до 100 000 на 64-разрядной платформе с достаточным объемом памяти. Исходя из собственного опыта работы с этой техникой (хотя это далеко не такая нагрузка), у меня нет оснований сомневаться в этих показательных показателях.

IOCP против потоковых соединений или примитивов select

Причина, по которой вы хотите использовать механизм, который использует IOCP, заключается в том, что он использует очень низкоуровневый пул потоков Windows, который не пробуждает какие-либо потоки, пока на канале ввода-вывода не появятся фактические данные, которые вы пытаетесь прочитать ( обратите внимание, что IOCP можно использовать и для файлового ввода-вывода). Преимущество этого состоит в том, что Windows не нужно переключаться на поток только для того, чтобы обнаружить, что в любом случае еще нет данных, так что это уменьшает количество переключений контекста, которые ваш сервер должен будет сделать, до минимально необходимого.

Переключение контекста - это то, что определенно убьет механизм «поток на соединение», хотя это жизнеспособное решение, если вы имеете дело только с несколькими дюжинами соединений. Этот механизм, однако, ни в коем случае не является «масштабируемым».

Важные соображения при использовании IOCP

объем памяти

Прежде всего, важно понимать, что IOCP может легко привести к проблемам с памятью в .NET, если ваша реализация слишком наивна. Каждый BeginReceiveвызов IOCP приведет к «закреплению» буфера, в который вы читаете. Хорошее объяснение причин этой проблемы см. В блоге Юнь Джина: OutOfMemoryException и Pinning .

К счастью, этой проблемы можно избежать, но она требует некоторого компромисса. Предлагаемое решение состоит в том, чтобы выделить большой byte[]буфер при запуске приложения (или закрыть его) размером не менее 90 КБ или около того (по состоянию на .NET 2 требуемый размер может быть больше в более поздних версиях). Причина для этого заключается в том, что большие выделения памяти автоматически оказываются в некомпактном сегменте памяти (куча больших объектов), который эффективно автоматически закрепляется. Выделив один большой буфер при запуске, вы убедитесь, что этот блок неподвижной памяти находится по относительно «низкому адресу», где он не будет мешать и вызывать фрагментацию.

Затем вы можете использовать смещения для сегментирования этого большого буфера в отдельные области для каждого соединения, которое должно считывать некоторые данные. Это где компромисс вступает в игру; так как этот буфер должен быть предварительно выделен, вам нужно будет решить, сколько буферного пространства вам нужно для каждого соединения, и какой верхний предел вы хотите установить для числа соединений, которые вы хотите масштабировать (или вы можете реализовать абстракцию). который может выделить дополнительные закрепленные буферы, когда они вам понадобятся).

Простейшим решением было бы назначить каждому соединению один байт с уникальным смещением в этом буфере. Затем вы можете сделать BeginReceiveвызов для считывания одного байта и выполнить остальное чтение в результате полученного вами обратного вызова.

обработка

Когда вы получаете обратный вызов от Beginсделанного вами вызова, очень важно понимать, что код обратного вызова будет выполняться в низкоуровневом потоке IOCP. Абсолютно необходимо избегать длительных операций в этом обратном вызове. Использование этих потоков для сложной обработки убьет вашу масштабируемость так же эффективно, как и использование потокового соединения.

Предлагаемое решение состоит в том, чтобы использовать обратный вызов только для постановки в очередь рабочего элемента для обработки входящих данных, которые будут выполнены в каком-то другом потоке. Избегайте любых потенциально блокирующих операций внутри обратного вызова, чтобы поток IOCP мог вернуться в свой пул как можно быстрее. В .NET 4.0 я бы предложил самое простое решение - создать a Task, указав ему ссылку на сокет клиента и копию первого байта, который уже был прочитан BeginReceiveвызовом. Затем эта задача отвечает за чтение всех данных из сокета, представляющих обрабатываемый вами запрос, его выполнение, а затем BeginReceiveповторный вызов, чтобы снова поставить в очередь сокет для IOCP. До .NET 4.0 вы можете использовать ThreadPool или создать собственную реализацию многопоточной очереди.

Резюме

По сути, я бы предложил использовать пример кода Кевина для этого решения со следующими добавленными предупреждениями:

  • Убедитесь, что буфер, который вы передаете, BeginReceiveуже «закреплен»
  • Убедитесь, что обратный вызов, который вы передаете, BeginReceiveне делает ничего больше, чем ставит в очередь задачу для обработки фактической обработки входящих данных.

Когда вы сделаете это, я не сомневаюсь, что вы сможете повторить результаты Криса, увеличив число потенциальных клиентов до сотен тысяч одновременно (при условии правильного оборудования и эффективной реализации вашего собственного кода обработки, конечно);

jerryjvl
источник
1
Чтобы закрепить меньший блок памяти, можно использовать метод Alloc объекта GCHandle для закрепления буфера. Как только это будет сделано, UnsafeAddrOfPinnedArrayElement объекта Marshal может быть использован для получения указателя на буфер. Например: GCHandle gchTheCards = GCHandle.Alloc (TheData, GCHandleType.Pinned); IntPtr pAddr = Marshal.UnsafeAddrOfPinnedArrayElement (TheData, 0); (sbyte *) pTheData = (sbyte *) pAddr.ToPointer ();
Боб Брайан
@BobBryan Если я не пропущу тонкое замечание, которое вы пытаетесь высказать, этот подход на самом деле не помогает решить проблему, которую мое решение пытается решить путем выделения больших блоков, а именно потенциальной возможности существенной фрагментации памяти, присущей повторному выделению небольших закрепленных блоков. памяти.
Jerryjvl
Ну, дело в том, что вам не нужно выделять большой блок, чтобы сохранить его в памяти. Вы можете выделить меньшие блоки и использовать описанную выше технику, чтобы закрепить их в памяти, чтобы избежать перемещения их gc. Вы можете сохранить ссылку на каждый из меньших блоков, так же, как вы сохраняете ссылку на один больший блок и использовать их по мере необходимости. Любой подход верен - я просто указал, что вам не нужно использовать очень большой буфер. Но, сказав, что иногда использование очень большого буфера является лучшим способом, так как gc будет обрабатывать его более эффективно.
Боб Брайан
@BobBryan, поскольку закрепление буфера происходит автоматически, когда вы вызываете BeginReceive, закрепление здесь не является существенной точкой; эффективность была;) ... и это особенно важно при попытке написать масштабируемый сервер, поэтому необходимо выделять большие блоки для использования в буферном пространстве.
Jerryjvl
@jerryjvl Извините, что поднял действительно старый вопрос, однако недавно я обнаружил эту проблему с асинхронными методами BeginXXX / EndXXX. Это отличный пост, но на поиски ушло много копаний. Мне нравится ваше предлагаемое решение, но я не понимаю его части: «Затем вы можете сделать вызов BeginReceive для считывания одного байта и выполнить остальное чтение в результате полученного вами обратного вызова». Что вы подразумеваете под выполнением остальной части подготовки в результате обратного вызова, который вы получаете?
Mausimo
22

Вы уже получили большую часть ответа через примеры кода выше. Использование асинхронной операции ввода-вывода - абсолютно верный путь. Асинхронный ввод-вывод - это способ, которым Win32 спроектирован для внутреннего масштабирования. Максимально возможная производительность, которую вы можете получить, достигается с помощью портов завершения, привязки сокетов к портам завершения и наличию пула потоков, ожидающих завершения порта завершения. Общепринятым является наличие 2-4 потоков на процессор (ядро), ожидающих завершения. Я настоятельно рекомендую ознакомиться с этими тремя статьями Рика Викика из команды Windows Performance:

  1. Разработка приложений для производительности - часть 1
  2. Разработка приложений для производительности - часть 2
  3. Разработка приложений для производительности - часть 3

Указанные статьи посвящены в основном нативному Windows API, но их обязательно нужно прочесть всем, кто пытается освоить масштабируемость и производительность. У них тоже есть некоторые заметки по управляемой стороне вещей.

Второе, что вам нужно сделать, - это прочитать книгу « Повышение производительности и масштабируемости приложений .NET» , доступную онлайн. В главе 5 вы найдете полезные и полезные советы по использованию потоков, асинхронных вызовов и блокировок. Но настоящие жемчужины есть в главе 17, где вы найдете такие полезные вещи, как практическое руководство по настройке пула потоков. У моих приложений были серьезные проблемы, пока я не настроил maxIothreads / maxWorkerThreads согласно рекомендациям в этой главе.

Вы говорите, что хотите создать чистый TCP-сервер, поэтому мой следующий пункт является ложным. Однако , если вы оказались загнаны в угол и используете класс WebRequest и его производные, имейте в виду, что за этой дверью стоит дракон: ServicePointManager . Это класс конфигурации, у которого есть одна цель в жизни: подорвать вашу производительность. Убедитесь, что вы освобождаете свой сервер от искусственно навязанного ServicePoint.ConnectionLimit, иначе ваше приложение никогда не масштабируется (я позволю вам узнать, какое значение по умолчанию ...). Вы также можете пересмотреть политику по умолчанию для отправки заголовка Expect100Continue в запросах http.

Теперь о стороне API, управляемой с помощью сокетов ядра, довольно легко на стороне отправки, но они значительно сложнее на стороне приема. Для достижения высокой пропускной способности и масштаба вы должны убедиться, что сокет не контролируется потоком, потому что у вас нет буфера, размещенного для приема. В идеале для высокой производительности вы должны отправлять вперед 3-4 буфера и размещать новые буферы, как только вы получите один обратно ( до того, как вы обработаете тот, который был возвращен), чтобы вы всегда были уверены, что в сокете всегда есть место для хранения данных, поступающих из сети. Вы поймете, почему вы, вероятно, не сможете достичь этого в ближайшее время.

Когда вы закончите играть с API BeginRead / BeginWrite и начнете серьезную работу, вы поймете, что вам нужна безопасность вашего трафика, т.е. Проверка подлинности NTLM / Kerberos и шифрование трафика или, по крайней мере, защита от вмешательства в трафик. Для этого вы используете встроенный System.Net.Security.NegotiateStream (или SslStream, если вам нужно пересечь разрозненные домены). Это означает, что вместо того, чтобы полагаться на асинхронные операции с прямым сокетом, вы будете полагаться на асинхронные операции AuthenticatedStream. Как только вы получаете сокет (либо из connect на клиенте, либо из accept на сервере), вы создаете поток в сокете и отправляете его для аутентификации, вызывая BeginAuthenticateAsClient или BeginAuthenticateAsServer. После завершения аутентификации (по крайней мере, ваш сейф от родного сумасшествия InitiateSecurityContext / AcceptSecurityContext ...) вы выполните авторизацию, проверив свойство RemoteIdentity вашего потока Authenticated и выполнив любую проверку ACL, которую должен поддерживать ваш продукт. После этого вы будете отправлять сообщения с помощью BeginWrite и получать их с помощью BeginRead. Эта проблема, о которой я говорил ранее, заключается в том, что вы не сможете разместить несколько буферов приема, потому что классы AuthenticateStream не поддерживают это. Операция BeginRead внутренне управляет всеми операциями ввода-вывода до тех пор, пока вы не получите весь кадр, в противном случае она не сможет обработать аутентификацию сообщения (расшифровать кадр и проверить подпись на кадре). Хотя по моему опыту работа, выполняемая классами AuthenticatedStream, довольно хороша и не должна иметь никаких проблем с этим. То есть. Вы должны быть в состоянии насыщать сеть ГБ только 4-5% процессором. Классы AuthenticatedStream также наложат на вас ограничения размера кадра для протокола (16k для SSL, 12k для Kerberos).

Это должно заставить вас начать на правильном пути. Я не собираюсь публиковать здесь код, есть отличный пример для MSDN . Я сделал много подобных проектов и смог масштабировать до 1000 пользователей, подключенных без проблем. Кроме того, вам нужно изменить разделы реестра, чтобы ядру было доступно больше дескрипторов сокетов. и убедитесь, что вы развертываете на серверной ОС, то есть W2K3, а не XP или Vista (т.е. клиентская ОС), это имеет большое значение.

Кстати, убедитесь, что если у вас есть операции с базами данных на сервере или файловый ввод-вывод, вы также используете асинхронную версию для них, или вы быстро истощите пул потоков. Для соединений с SQL Server убедитесь, что вы добавили «Asyncronous Processing = true» в строку соединения.

Ремус Русану
источник
Здесь есть отличная информация. Я хотел бы наградить награду за несколько человек. Тем не менее, я проголосовал против вас. Хорошие вещи здесь, спасибо.
Эрик Фанкенбуш
11

У меня такой сервер работает в некоторых моих решениях. Вот очень подробное объяснение различных способов сделать это в .net: стать ближе к проводу с высокопроизводительными сокетами в .NET

В последнее время я искал способы улучшить наш код и буду искать это: « Улучшения производительности сокетов в версии 3.5 », которые были специально включены «для использования приложениями, которые используют асинхронный сетевой ввод-вывод для достижения максимальной производительности».

«Главной особенностью этих улучшений является предотвращение повторного выделения и синхронизации объектов во время асинхронного ввода-вывода с большим объемом. В настоящее время шаблон проектирования Begin / End, реализованный классом Socket для асинхронного ввода-вывода через сокет, требует наличия системы. Объект IAsyncResult должен быть выделен для каждой асинхронной операции с сокетом. "

Вы можете продолжить чтение, если перейдете по ссылке. Я лично буду тестировать их пример кода завтра, чтобы сравнить его с тем, что у меня есть.

Изменить: Здесь вы можете найти рабочий код для клиента и сервера, используя новый 3.5 SocketAsyncEventArgs, так что вы можете проверить его в течение нескольких минут и перейти к коду. Это простой подход, но он является основой для начала гораздо более широкой реализации. Кроме того, эта статья, опубликованная почти два года назад в журнале MSDN, была интересной для чтения.

jvanderh
источник
9

Рассматривали ли вы только использование TCP-привязки WCF net и шаблон публикации / подписки? WCF позволит вам сосредоточиться [в основном] на вашем домене, а не на сантехнике.

В разделе загрузок IDesign есть много примеров WCF и даже инфраструктуры публикации / подписки, которая может быть полезна: http://www.idesign.net

Markt
источник
8

Меня интересует одна вещь:

Я определенно не хочу, чтобы начать поток для каждого соединения.

Это почему? Windows может обрабатывать сотни потоков в приложении, начиная хотя бы с Windows 2000. Я сделал это, с ним действительно легко работать, если потоки не нужно синхронизировать. Особенно учитывая, что вы выполняете много операций ввода-вывода (так что вы не привязаны к процессору, и многие потоки будут заблокированы на диске или в сети), я не понимаю этого ограничения.

Вы тестировали многопоточный способ и обнаружили, что в нем чего-то не хватает? Намереваетесь ли вы также иметь соединение с базой данных для каждого потока (это убьет сервер баз данных, так что это плохая идея, но ее легко решить с помощью 3-уровневого проекта). Вы беспокоитесь, что у вас будут тысячи клиентов вместо сотен, и тогда у вас действительно будут проблемы? (Хотя я бы попробовал тысячу потоков или даже десять тысяч, если бы у меня было 32+ ГБ ОЗУ - опять же, учитывая, что вы не привязаны к процессору, время переключения потоков должно быть абсолютно неважно.)

Вот код - чтобы увидеть, как это работает, перейдите на http://mdpopescu.blogspot.com/2009/05/multi-threaded-server.html и нажмите на картинку.

Класс сервера:

  public class Server
  {
    private static readonly TcpListener listener = new TcpListener(IPAddress.Any, 9999);

    public Server()
    {
      listener.Start();
      Console.WriteLine("Started.");

      while (true)
      {
        Console.WriteLine("Waiting for connection...");

        var client = listener.AcceptTcpClient();
        Console.WriteLine("Connected!");

        // each connection has its own thread
        new Thread(ServeData).Start(client);
      }
    }

    private static void ServeData(object clientSocket)
    {
      Console.WriteLine("Started thread " + Thread.CurrentThread.ManagedThreadId);

      var rnd = new Random();
      try
      {
        var client = (TcpClient) clientSocket;
        var stream = client.GetStream();
        while (true)
        {
          if (rnd.NextDouble() < 0.1)
          {
            var msg = Encoding.ASCII.GetBytes("Status update from thread " + Thread.CurrentThread.ManagedThreadId);
            stream.Write(msg, 0, msg.Length);

            Console.WriteLine("Status update from thread " + Thread.CurrentThread.ManagedThreadId);
          }

          // wait until the next update - I made the wait time so small 'cause I was bored :)
          Thread.Sleep(new TimeSpan(0, 0, rnd.Next(1, 5)));
        }
      }
      catch (SocketException e)
      {
        Console.WriteLine("Socket exception in thread {0}: {1}", Thread.CurrentThread.ManagedThreadId, e);
      }
    }
  }

Основная программа сервера:

namespace ManyThreadsServer
{
  internal class Program
  {
    private static void Main(string[] args)
    {
      new Server();
    }
  }
}

Класс клиента:

  public class Client
  {
    public Client()
    {
      var client = new TcpClient();
      client.Connect(IPAddress.Loopback, 9999);

      var msg = new byte[1024];

      var stream = client.GetStream();
      try
      {
        while (true)
        {
          int i;
          while ((i = stream.Read(msg, 0, msg.Length)) != 0)
          {
            var data = Encoding.ASCII.GetString(msg, 0, i);
            Console.WriteLine("Received: {0}", data);
          }
        }
      }
      catch (SocketException e)
      {
        Console.WriteLine("Socket exception in thread {0}: {1}", Thread.CurrentThread.ManagedThreadId, e);
      }
    }
  }

Основная программа клиента:

using System;
using System.Threading;

namespace ManyThreadsClient
{
  internal class Program
  {
    private static void Main(string[] args)
    {
      // first argument is the number of threads
      for (var i = 0; i < Int32.Parse(args[0]); i++)
        new Thread(RunClient).Start();
    }

    private static void RunClient()
    {
      new Client();
    }
  }
}
Марсель Попеску
источник
Windows может обрабатывать множество потоков, но .NET на самом деле не предназначена для их обработки. Каждый домен приложения .NET имеет пул потоков, и вы не хотите исчерпывать этот пул потоков. Я не уверен, если вы запускаете поток вручную, если он идет из пула потоков или нет. Тем не менее, сотни потоков, которые ничего не делают большую часть времени, являются огромной тратой ресурсов.
Эрик Фанкенбуш
1
Я считаю, что у вас неправильное представление потоков. Потоки приходят из пула потоков, только если вы этого действительно хотите - обычные потоки этого не делают. Сотни потоков, которые ничего не делают, ничего не теряют :) (Ну, немного памяти, но память настолько дешевая, что это больше не проблема). Я собираюсь написать пару примеров приложений для этого, я опубликую URL это когда я закончу. А пока я рекомендую вам снова прочитать то, что я написал выше, и попытаться ответить на мои вопросы.
Марсель Попеску
1
Хотя я согласен с комментарием Марселя о том, что представление потоков в созданных потоках не происходит из пула потоков, остальная часть утверждения неверна. Память не о том, сколько установлено на машине, все приложения в окнах работают в виртуальном адресном пространстве и в 32-битной системе, которые дают вам 2 ГБ данных для вашего приложения (не имеет значения, сколько оперативной памяти установлено на коробке). Они все еще должны управляться средой выполнения. Выполнение асинхронного ввода-вывода не использует поток для ожидания (он использует IOCP, который допускает перекрывающийся ввод-вывод) и является лучшим решением и гораздо лучше масштабируется.
Брайан Онейл
7
При запуске большого количества потоков проблема не в памяти, а в процессоре. Переключение контекста между потоками - это относительно дорогая операция, и чем больше у вас активных потоков, тем больше переключений контекста произойдет. Несколько лет назад я запустил тест на своем компьютере с консольным приложением C # и ок. 500 потоков моего процессора было 100%, потоки не делали ничего существенного. Для сетевых коммуникаций лучше уменьшить количество потоков.
sipwiz
1
Я бы либо пошел с решением задачи или использовать async / await. Решение задачи кажется более простым, в то время как асинхронное / ожидание, вероятно, более масштабируемо (они были специально предназначены для ситуаций, связанных с вводом-выводом).
Марсель Попеску
5

Использование встроенного в .NET Async IO ( BeginReadи т. Д.) Будет хорошей идеей, если вы сможете правильно понять все детали. Когда вы правильно настроите свои дескрипторы сокетов / файлов, он будет использовать базовую реализацию IOCP ОС, позволяя завершать ваши операции без использования каких-либо потоков (или, в худшем случае, с использованием потока, который, как я считаю, происходит из пула потоков ввода-вывода ядра) пула потоков .NET, который помогает уменьшить перегруженность пула потоков.)

Главное, что нужно сделать, это убедиться, что вы открываете свои сокеты / файлы в неблокирующем режиме. Большинство стандартных функций удобства (например File.OpenRead) не делают этого, поэтому вам нужно написать свои собственные.

Еще одна важная проблема - обработка ошибок - правильная обработка ошибок при написании асинхронного кода ввода-вывода намного сложнее, чем в синхронном коде. Также очень легко получить условия гонки и взаимоблокировки, даже если вы не используете потоки напрямую, поэтому вы должны знать об этом.

Если возможно, вы должны попытаться использовать вспомогательную библиотеку, чтобы облегчить процесс масштабируемого асинхронного ввода-вывода.

Microsoft Concurrency Coordination Runtime - один из примеров библиотеки .NET, созданной для облегчения трудностей, связанных с программированием такого рода. Это выглядит великолепно, но, поскольку я не использовал его, я не могу комментировать, насколько хорошо он будет масштабироваться.

Для моих личных проектов, в которых требуется асинхронный сетевой или дисковый ввод-вывод, я использую набор инструментов .NET для параллелизма / ввода-вывода, который я создал за последний год и который называется Squared.Task . Он вдохновлен библиотеками, такими как imvu.task и twisted , и я включил несколько рабочих примеров в репозиторий, которые выполняют сетевой ввод-вывод. Я также использовал его в нескольких написанных мною приложениях. Самым крупным из них является NDexer (который использует его для дискового ввода-вывода без потоков ). Библиотека была написана на основе моего опыта работы с imvu.task и имеет набор довольно комплексных модульных тестов, поэтому я настоятельно рекомендую вам попробовать ее. Если у вас есть какие-либо проблемы с этим, я был бы рад предложить вам некоторую помощь.

По моему мнению, основанный на моем опыте использования асинхронного / поточного ввода-вывода вместо потоков - это стоящее начинание на платформе .NET, если вы готовы справиться с кривой обучения. Это позволяет избежать проблем с масштабируемостью, налагаемых стоимостью объектов Thread, и во многих случаях вы можете полностью избежать использования блокировок и мьютексов, осторожно используя примитивы параллелизма, такие как Futures / Promises.

Кейтлин Гадд
источник
Отличная информация, я проверю ваши отзывы и посмотрю, что имеет смысл.
Эрик Фанкенбуш
3

Я использовал решение Кевина, но он говорит, что в решении отсутствует код для повторной сборки сообщений. Разработчики могут использовать этот код для повторной сборки сообщений:

private static void ReceiveCallback(IAsyncResult asyncResult )
{
    ClientInfo cInfo = (ClientInfo)asyncResult.AsyncState;

    cInfo.BytesReceived += cInfo.Soket.EndReceive(asyncResult);
    if (cInfo.RcvBuffer == null)
    {
        // First 2 byte is lenght
        if (cInfo.BytesReceived >= 2)
        {
            //this calculation depends on format which your client use for lenght info
            byte[] len = new byte[ 2 ] ;
            len[0] = cInfo.LengthBuffer[1];
            len[1] = cInfo.LengthBuffer[0];
            UInt16 length = BitConverter.ToUInt16( len , 0);

            // buffering and nulling is very important
            cInfo.RcvBuffer = new byte[length];
            cInfo.BytesReceived = 0;

        }
    }
    else
    {
        if (cInfo.BytesReceived == cInfo.RcvBuffer.Length)
        {
             //Put your code here, use bytes comes from  "cInfo.RcvBuffer"

             //Send Response but don't use async send , otherwise your code will not work ( RcvBuffer will be null prematurely and it will ruin your code)

            int sendLenghts = cInfo.Soket.Send( sendBack, sendBack.Length, SocketFlags.None);

            // buffering and nulling is very important
            //Important , set RcvBuffer to null because code will decide to get data or 2 bte lenght according to RcvBuffer's value(null or initialized)
            cInfo.RcvBuffer = null;
            cInfo.BytesReceived = 0;
        }
    }

    ContinueReading(cInfo);
 }

private static void ContinueReading(ClientInfo cInfo)
{
    try 
    {
        if (cInfo.RcvBuffer != null)
        {
            cInfo.Soket.BeginReceive(cInfo.RcvBuffer, cInfo.BytesReceived, cInfo.RcvBuffer.Length - cInfo.BytesReceived, SocketFlags.None, ReceiveCallback, cInfo);
        }
        else
        {
            cInfo.Soket.BeginReceive(cInfo.LengthBuffer, cInfo.BytesReceived, cInfo.LengthBuffer.Length - cInfo.BytesReceived, SocketFlags.None, ReceiveCallback, cInfo);
        }
    }
    catch (SocketException se)
    {
        //Handle exception and  Close socket here, use your own code 
        return;
    }
    catch (Exception ex)
    {
        //Handle exception and  Close socket here, use your own code 
        return;
    }
}

class ClientInfo
{
    private const int BUFSIZE = 1024 ; // Max size of buffer , depends on solution  
    private const int BUFLENSIZE = 2; // lenght of lenght , depends on solution
    public int BytesReceived = 0 ;
    public byte[] RcvBuffer { get; set; }
    public byte[] LengthBuffer { get; set; }

    public Socket Soket { get; set; }

    public ClientInfo(Socket clntSock)
    {
        Soket = clntSock;
        RcvBuffer = null;
        LengthBuffer = new byte[ BUFLENSIZE ];
    }   

}

public static void AcceptCallback(IAsyncResult asyncResult)
{

    Socket servSock = (Socket)asyncResult.AsyncState;
    Socket clntSock = null;

    try
    {

        clntSock = servSock.EndAccept(asyncResult);

        ClientInfo cInfo = new ClientInfo(clntSock);

        Receive( cInfo );

    }
    catch (SocketException se)
    {
        clntSock.Close();
    }
}
private static void Receive(ClientInfo cInfo )
{
    try
    {
        if (cInfo.RcvBuffer == null)
        {
            cInfo.Soket.BeginReceive(cInfo.LengthBuffer, 0, 2, SocketFlags.None, ReceiveCallback, cInfo);

        }
        else
        {
            cInfo.Soket.BeginReceive(cInfo.RcvBuffer, 0, cInfo.BytesReceived, SocketFlags.None, ReceiveCallback, cInfo);

        }

    }
    catch (SocketException se)
    {
        return;
    }
    catch (Exception ex)
    {
        return;
    }

}
Ахмет Арслан
источник
1

Вы можете попробовать использовать среду под названием ACE (Adaptive Communications Environment), которая является общей платформой C ++ для сетевых серверов. Это очень солидный, зрелый продукт, разработанный для поддержки высоконадежных приложений большого объема вплоть до телекоммуникационного класса.

Фреймворк имеет дело с довольно широким спектром моделей параллелизма и, вероятно, имеет подходящую для вашего приложения из коробки. Это должно упростить отладку системы, так как большинство неприятных проблем с параллелизмом уже решено. Компромисс здесь в том, что фреймворк написан на C ++ и не самый теплый и пушистый из кодовых баз. С другой стороны, вы получаете протестированную сетевую инфраструктуру промышленного класса и масштабируемую архитектуру из коробки.

ConcernedOfTunbridgeWells
источник
2
Это хорошее предложение, но из тегов вопроса я считаю, что OP будет использовать C #
JPCosta
Я заметил, что; было высказано предположение, что это доступно для C ++, и я не знаю ничего эквивалентного для C #. Отладка подобного рода системы в лучшем случае непроста, и вы можете получить отдачу от перехода на эту среду, даже если это означает переход на C ++.
ConcernedOfTunbridgeWells
Да, это C #. Я ищу хорошие решения на основе .net. Я должен был быть более ясным, но я предполагал, что люди будут читать теги
Эрик Фанкенбуш
1

Ну, сокеты .NET, кажется, предоставляют select () - это лучше всего подходит для обработки ввода. Для вывода у меня будет пул потоков, записывающих сокеты, которые прослушивают рабочую очередь, принимая дескриптор / объект сокета как часть рабочего элемента, поэтому вам не нужен поток на сокет.

Николай Фетиссов
источник
1

Я бы использовал методы AcceptAsync / ConnectAsync / ReceiveAsync / SendAsync, которые были добавлены в .Net 3.5. Я сделал тест, и он примерно на 35% быстрее (время отклика и битрейт), при этом 100 пользователей постоянно отправляют и получают данные.


источник
1

чтобы люди скопировали и вставили принятый ответ, вы можете переписать метод acceptCallback, удалив все вызовы _serverSocket.BeginAccept (new AsyncCallback (acceptCallback), _serverSocket); и поместите его в предложение finally {} следующим образом:

private void acceptCallback(IAsyncResult result)
    {
       xConnection conn = new xConnection();
       try
       {
         //Finish accepting the connection
         System.Net.Sockets.Socket s = (System.Net.Sockets.Socket)result.AsyncState;
         conn = new xConnection();
         conn.socket = s.EndAccept(result);
         conn.buffer = new byte[_bufferSize];
         lock (_sockets)
         {
           _sockets.Add(conn);
         }
         //Queue recieving of data from the connection
         conn.socket.BeginReceive(conn.buffer, 0, conn.buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), conn);
       }
       catch (SocketException e)
       {
         if (conn.socket != null)
         {
           conn.socket.Close();
           lock (_sockets)
           {
             _sockets.Remove(conn);
           }
         }
       }
       catch (Exception e)
       {
         if (conn.socket != null)
         {
           conn.socket.Close();
           lock (_sockets)
           {
             _sockets.Remove(conn);
           }
         }
       }
       finally
       {
         //Queue the next accept, think this should be here, stop attacks based on killing the waiting listeners
         _serverSocket.BeginAccept(new AsyncCallback(acceptCallback), _serverSocket);       
       }
     }

Вы могли бы даже удалить первый улов, поскольку его содержимое такое же, но это метод шаблона, и вы должны использовать типизированное исключение, чтобы лучше обрабатывать исключения и понимать причину ошибки, поэтому просто реализуйте эти уловки с помощью некоторого полезного кода

Не важный
источник
0

Я бы порекомендовал прочитать эти книги на ACE

чтобы получить представление о шаблонах, позволяющих создать эффективный сервер.

Хотя ACE реализован на C ++, книги охватывают множество полезных шаблонов, которые можно использовать на любом языке программирования.

Лотар
источник
-1

Чтобы быть ясным, я ищу решения на основе .net (C #, если возможно, но любой язык .net будет работать)

Вы не получите наивысший уровень масштабируемости, если будете использовать только .NET. GC паузы могут препятствовать задержке.

Мне нужно будет запустить хотя бы один поток для сервиса. Я рассматриваю возможность использования Asynch API (BeginRecieve и т. Д.), Поскольку я не знаю, сколько клиентов я подключу в любой момент времени (возможно, сотни). Я определенно не хочу, чтобы начать поток для каждого соединения.

Перекрытый ввод-вывод обычно считается самым быстрым интерфейсом Windows для сетевого взаимодействия. Я не знаю, совпадает ли это с вашим Asynch API. Не используйте select, поскольку каждый вызов должен проверять каждый открытый сокет, вместо того, чтобы иметь обратные вызовы на активных сокетах.

неизвестный
источник
1
Я не понимаю ваш комментарий о паузе в GC. Я никогда не видел систему с проблемами масштабируемости, которая была бы напрямую связана с GC.
Markt
4
Гораздо более вероятно, что вы создадите приложение, которое не может масштабироваться из-за плохой архитектуры, а не потому, что существует GC. Огромные масштабируемые + производительные системы были построены как на .NET, так и на Java. В обеих приведенных вами ссылках причина была не в сборке мусора, а в свопинге кучи. Я подозреваю, что это действительно проблема с архитектурой, которой можно было бы избежать. Если вы покажете мне язык, на котором невозможно построить систему, которая не может масштабироваться, я с удовольствием ее использую;)
markt
1
Я не согласен с этим комментарием. Неизвестно, вопросы, на которые вы ссылаетесь, касаются Java, и они конкретно касаются больших выделений памяти и пытаются вручную вызвать gc. У меня не будет большого количества распределения памяти здесь. Это просто не проблема. Но спасибо. Да, модель асинхронного программирования обычно реализуется поверх перекрывающегося ввода-вывода.
Эрик Фанкенбуш
1
На самом деле, лучшая практика не должна постоянно заставлять GC вручную собирать. Это может очень сильно ухудшить работу вашего приложения. .NET GC - это поколение GC, которое будет настраиваться на использование вашего приложения. Если вы действительно думаете, что вам нужно вручную вызывать GC.Collect, я бы сказал, что ваш код, скорее всего, должен быть написан по-другому ...
markt
1
@markt, это комментарий для людей, которые ничего не знают о сборке мусора. Если у вас простой, нет ничего плохого в том, чтобы делать сбор вручную. Это не сделает ваше приложение хуже, когда оно закончится. Академические работы показывают, что генераторы ГК работают, потому что это приблизительное время жизни ваших объектов. Очевидно, это не идеальное представление. На самом деле, существует парадокс, когда «старшее» поколение часто имеет самое высокое соотношение мусора, потому что оно никогда не собирается.
неизвестно
-1

Вы можете использовать Push Framework с открытым исходным кодом для высокопроизводительной разработки серверов. Он построен на IOCP и подходит для push-сценариев и трансляции сообщений.

http://www.pushframework.com

charfeddine.ahmed
источник
1
Этот пост был отмечен C # и .net. Почему вы предложили платформу C ++?
Эрик Фанкенбуш
Вероятно, потому что он написал это. potatosoftware.com/…
quillbreaker
pushframework поддерживает несколько экземпляров сервера? если нет, то как оно масштабируется?
Эсскар