Ведение журнала асинхронно - как это сделать?

11

Во многих службах, над которыми я работаю, ведется много регистрации. Сервисы - это сервисы WCF (в основном), которые используют класс .NET EventLogger.

Я нахожусь в процессе улучшения производительности этих сервисов, и я подумал, что асинхронное ведение журнала повысит производительность.

Я не знаю, что происходит, когда несколько потоков просят войти в систему, и если это создает узкое место, но даже если это не так, я все еще думаю, что это не должно мешать действительному процессу, который выполняется.

Я думаю, что я должен вызвать тот же метод журнала, который я сейчас вызываю, но сделать это с использованием нового потока, продолжая при этом сам процесс.

Несколько вопросов об этом:

Это нормально?

Есть ли минусы?

Должно ли это быть сделано по-другому?

Может быть, это так быстро, что это даже не стоит усилий?

Mithir
источник
1
Профилировали ли вы среду (ы) выполнения, чтобы знать, что ведение журналов оказывает ощутимое влияние на производительность? Компьютеры слишком сложны, чтобы просто думать, что что-то может быть медленным, измерять дважды и один раз резать - это хороший совет для любой профессии =)
Патрик Хьюз
@PatrickHughes - некоторые статистические данные из моих тестов по одному конкретному запросу: 61 (!!) сообщение журнала, 150 мс до выполнения какой-то простой обработки, 90 мс после. так что это на 40% быстрее.
Mithir

Ответы:

14

Отдельная тема для операции ввода-вывода звучит разумно.

Например, было бы не хорошо регистрировать, какие кнопки пользователь нажал в том же потоке пользовательского интерфейса. Такой пользовательский интерфейс будет зависать наугад и иметь медленную воспринимаемую производительность .

Решение состоит в том, чтобы отделить событие от его обработки.

Здесь много информации о проблемах производителей и потребителей и очереди событий из мира разработки игр

Часто есть такой код

///Never do this!!!
public void WriteLog_Like_Bastard(string msg)
{
    lock (_lockBecauseILoveThreadContention)
    {
        File.WriteAllText("c:\\superApp.log", msg);
    }
}

Этот подход приведет к конфликту потоков. Все обрабатывающие потоки будут бороться, чтобы иметь возможность получить блокировку и запись в один и тот же файл одновременно.

Некоторые могут попытаться снять замки.

public void Log_Like_Dumbass(string msg)
{
      try 
      {  File.Append("c:\\superApp.log", msg); }
        catch (Exception ex) 
        {
            MessageBox.Show("Log file may be locked by other process...")
        }
      }    
}

Невозможно предсказать результат, если 2 потока будут вводить метод одновременно.

Так что со временем разработчики вообще отключат ведение журнала ...

Можно ли это исправить?

Да.

Допустим, у нас есть интерфейс:

 public interface ILogger
 {
    void Debug(string message);
    // ... etc
    void Fatal(string message);
 }

Вместо того, чтобы ждать блокировки и выполнять блокировку файловой операции при каждом ILoggerвызове, мы добавим новое сообщение LogMessage в очередь сообщений Penging и вернемся к более важным вещам:

public class AsyncLogger : ILogger
{
    private readonly BlockingCollection<LogMessage> _pendingMessages;
    private readonly Type _loggerFor;
    private readonly IThreadAdapter _threadAdapter;

    public AsyncLogger(BlockingCollection<LogMessage> pendingMessages, Type loggerFor, IThreadAdapter threadAdapter)
    {
        _pendingMessages = pendingMessages;
        _loggerFor = loggerFor;
        _threadAdapter = threadAdapter;
    }

    public void Debug(string message)
    {
        Push(LoggingLevel.Debug, message);
    }

    public void Fatal(string message)
    {
        Push(LoggingLevel.Fatal, message);
    }

    private void Push(LoggingLevel importance, string message)
    {
        // since we do not know when our log entry will be written to disk, remember current time
        var timestamp = DateTime.Now;
        var threadId = _threadAdapter.GetCurrentThreadId();

        // adds message to the queue in lock-free manner and immediately returns control to caller
        _pendingMessages.Add(LogMessage.Create(timestamp, importance, message, _loggerFor, threadId));
    }
}

Мы сделали с этим простым Asynchronous Logger .

Следующим шагом является обработка входящих сообщений.

Для простоты, давайте запустим новый поток и будем ждать до тех пор, пока приложение не закроется, или Asynchronous Logger добавит новое сообщение в очередь ожидания .

public class LoggingQueueDispatcher : IQueueDispatcher
{
    private readonly BlockingCollection<LogMessage> _pendingMessages;
    private readonly IEnumerable<ILogListener> _listeners;
    private readonly IThreadAdapter _threadAdapter;
    private readonly ILogger _logger;
    private Thread _dispatcherThread;

    public LoggingQueueDispatcher(BlockingCollection<LogMessage> pendingMessages, IEnumerable<ILogListener> listeners, IThreadAdapter threadAdapter, ILogger logger)
    {
        _pendingMessages = pendingMessages;
        _listeners = listeners;
        _threadAdapter = threadAdapter;
        _logger = logger;
    }

    public void Start()
    {
        //  Here I use 'new' operator, only to simplify example. Should be using interface  '_threadAdapter.CreateBackgroundThread' to allow unit testing
        Thread thread = new Thread(MessageLoop);
        thread.Name = "LoggingQueueDispatcher Thread";
        thread.IsBackground = true;

        thread.Start();
        _logger.Debug("Asked to start log message Dispatcher ");

        _dispatcherThread = thread;
    }

    public bool WaitForCompletion(TimeSpan timeout)
    {
        return _dispatcherThread.Join(timeout);
    }

    private void MessageLoop()
    {
        _logger.Debug("Entering dispatcher message loop...");
        var cancellationToken = new CancellationTokenSource();
        LogMessage message;

        while (_pendingMessages.TryTake(out message, Timeout.Infinite, cancellationToken.Token))
        {
            // !!!!! Now it is safe to use File.AppendAllText("c:\\my.log") without ever using lock or forcing important threads to wait.
            // this is example, do not use in production
            foreach (var listener in _listeners)
            {
                listener.Log(message);
            }
        }

    }
}

Я прохожу цепочку заказных слушателей. Возможно, вы захотите просто отправить каркас регистрации вызовов ( log4netи т. Д.)

Вот остаток кода:

public enum LoggingLevel
{
    Debug,
    // ... etc
    Fatal,
}


public class LogMessage
{
    public DateTime Timestamp { get; private set; }
    public LoggingLevel Importance { get; private set; }
    public string Message { get; private set; }
    public Type Source { get; private set; }
    public int ThreadId { get; private set; }

    private LogMessage(DateTime timestamp, LoggingLevel importance, string message, Type source, int threadId)
    {
        Timestamp = timestamp;
        Message = message;
        Source = source;
        ThreadId = threadId;
        Importance = importance;
    }

    public static LogMessage Create(DateTime timestamp, LoggingLevel importance, string message, Type source, int threadId)
    {
        return  new LogMessage(timestamp, importance, message, source, threadId);
    }

    public override string ToString()
    {
        return string.Format("{0}  [TID:{4}] {1:h:mm:ss} ({2})\t{3}", Importance, Timestamp, Source, Message, ThreadId);
    }
}

public class LoggerFactory : ILoggerFactory
{
    private readonly BlockingCollection<LogMessage> _pendingMessages;
    private readonly IThreadAdapter _threadAdapter;

    private readonly ConcurrentDictionary<Type, ILogger> _loggersCache = new ConcurrentDictionary<Type, ILogger>();


    public LoggerFactory(BlockingCollection<LogMessage> pendingMessages, IThreadAdapter threadAdapter)
    {
        _pendingMessages = pendingMessages;
        _threadAdapter = threadAdapter;
    }

    public ILogger For(Type loggerFor)
    {
        return _loggersCache.GetOrAdd(loggerFor, new AsyncLogger(_pendingMessages, loggerFor, _threadAdapter));
    }
}

public class ThreadAdapter : IThreadAdapter
{
    public int GetCurrentThreadId()
    {
        return Thread.CurrentThread.ManagedThreadId;
    }
}

public class ConsoleLogListener : ILogListener
{
    public void Log(LogMessage message)
    {
        Console.WriteLine(message.ToString());
        Debug.WriteLine(message.ToString());
    }
}

public class SimpleTextFileLogger : ILogListener
{
    private readonly IFileSystem _fileSystem;
    private readonly string _userRoamingPath;
    private readonly string _logFileName;
    private FileStream _fileStream;

    public SimpleTextFileLogger(IFileSystem fileSystem, string userRoamingPath, string logFileName)
    {
        _fileSystem = fileSystem;
        _userRoamingPath = userRoamingPath;
        _logFileName = logFileName;
    }

    public void Start()
    {
        _fileStream = new FileStream(_fileSystem.Path.Combine(_userRoamingPath, _logFileName), FileMode.Append);
    }

    public void Stop()
    {
        if (_fileStream != null)
        {
            _fileStream.Dispose();
        }
    }

    public void Log(LogMessage message)
    {
        var bytes = Encoding.UTF8.GetBytes(message.ToString() + Environment.NewLine);
        _fileStream.Write(bytes, 0, bytes.Length);
    }
}

public interface ILoggerFactory
{
    ILogger For(Type loggerFor);
}

public interface ILogListener
{
    void Log(LogMessage message);
}

public interface IThreadAdapter
{
    int GetCurrentThreadId();
}

public interface IQueueDispatcher
{
    void Start();
}

Точка входа:

public static class Program
{
    public static void Main()
    {
        Debug.WriteLine("[Program] Entering Main ...");

        var pendingLogQueue = new BlockingCollection<LogMessage>();


        var threadAdapter = new ThreadAdapter();
        var loggerFactory = new LoggerFactory(pendingLogQueue, threadAdapter);


        var fileSystem = new FileSystem();
        var userRoamingPath = GetUserDataDirectory(fileSystem);

        var simpleTextFileLogger = new SimpleTextFileLogger(fileSystem, userRoamingPath, "log.txt");
        simpleTextFileLogger.Start();
        ILogListener consoleListener = new ConsoleLogListener();
        ILogListener[] listeners = new [] { simpleTextFileLogger , consoleListener};

        var loggingQueueDispatcher = new LoggingQueueDispatcher(pendingLogQueue, listeners, threadAdapter, loggerFactory.For(typeof(LoggingQueueDispatcher)));
        loggingQueueDispatcher.Start();

        var logger = loggerFactory.For(typeof(Console));

        string line;
        while ((line = Console.ReadLine()) != "exit")
        {
            logger.Debug("you have entered: " + line);
        }

        logger.Fatal("Exiting...");

        Debug.WriteLine("[Program] pending LogQueue will be stopped now...");
        pendingLogQueue.CompleteAdding();
        var logQueueCompleted = loggingQueueDispatcher.WaitForCompletion(TimeSpan.FromSeconds(5));

        simpleTextFileLogger.Stop();
        Debug.WriteLine("[Program] Exiting... logQueueCompleted: " + logQueueCompleted);

    }



    private static string GetUserDataDirectory(FileSystem fileSystem)
    {
        var roamingDirectory = Environment.GetFolderPath(Environment.SpecialFolder.ApplicationData);
        var userDataDirectory = fileSystem.Path.Combine(roamingDirectory, "Async Logging Sample");
        if (!fileSystem.Directory.Exists(userDataDirectory))
            fileSystem.Directory.CreateDirectory(userDataDirectory);
        return userDataDirectory;
    }
}
Глеб Севрук
источник
1

Ключевыми факторами, которые необходимо учитывать, являются ваша потребность в надежности файлов журналов и потребность в производительности. Обратитесь к минусам. Я думаю, что это отличная стратегия для ситуаций с высокими показателями.

Это нормально - да

Существуют ли какие-либо недостатки - да, - в зависимости от критичности вашего ведения журнала и вашей реализации может произойти любое из следующих действий - журналы записываются не по порядку, действия потока журнала не завершаются до завершения действий события. (Представьте себе сценарий, в котором вы регистрируете «начало соединения с БД», а затем сбиваете сервер, событие журнала может никогда не записаться, даже если событие произошло (!))

Если это будет сделано по-другому - вы можете посмотреть на модель Disruptor, так как она почти идеальна для этого сценария

Может быть, это так быстро, что это даже не стоит усилий - не согласен. Если у вас логика «приложения», и единственное, что вы делаете, - это записывает логи активности - тогда вы будете на несколько порядков меньше задерживать за счет разгрузки журналов. Однако, если вы полагаетесь на 5-секундный вызов БД SQL для возврата перед регистрацией 1-2 операторов, преимущества будут неоднозначными.

jasonk
источник
1

Я думаю, что регистрация - вообще синхронная операция по своей природе. Вы хотите регистрировать вещи, если они происходят, или если они не зависят от вашей логики, поэтому, чтобы что-то регистрировать, эта вещь должна быть оценена в первую очередь.

Сказав это, вы можете улучшить производительность своего приложения, кэшируя логи, а затем создавая поток и сохраняя их в файлы, когда у вас есть связанная с процессором операция.

Вам нужно грамотно идентифицировать свои контрольные точки, чтобы вы не потеряли важную информацию для регистрации в течение этого периода кэширования.

Если вы хотите повысить производительность своих потоков, вам необходимо сбалансировать операции ввода-вывода и загрузки процессора.

Если вы создадите 10 потоков, которые все выполняют IO, то вы не получите повышения производительности.

Мерт Акчакая
источник
Как бы вы предложили кэширование журналов? в большинстве сообщений журнала есть специфичные для запроса элементы, чтобы идентифицировать их, в моем сервисе такие же запросы встречаются редко.
Mithir
0

Ведение журнала асинхронно - единственный путь, если вам нужны низкие задержки в потоках журналирования. То, как это делается для максимальной производительности, заключается в использовании схемы прерывателя для обмена потоками без блокировок и без мусора. Теперь, если вы хотите разрешить нескольким потокам одновременно регистрировать один и тот же файл, вы должны либо синхронизировать вызовы журнала и заплатить цену за конфликт блокировки, либо использовать мультиплексор без блокировки. Например, CoralQueue предоставляет простую очередь мультиплексирования, как описано ниже:

введите описание изображения здесь

Вы можете взглянуть на CoralLog, который использует эти стратегии для асинхронной регистрации.

Отказ от ответственности: я являюсь одним из разработчиков CoralQueue и CoralLog.

rdalmeida
источник