Как я могу предотвратить синхронное продолжение задачи?

83

У меня есть код библиотеки (сети сокетов), который предоставляет TaskAPI-интерфейс для ожидающих ответов на запросы на основе TaskCompletionSource<T>. Однако в TPL есть недостаток, заключающийся в невозможности предотвратить синхронное продолжение. Я бы хотел либо:

  • сообщить, TaskCompletionSource<T>что это не должно позволять вызывающим абонентам подключаться TaskContinuationOptions.ExecuteSynchronously, или
  • установите результат ( SetResult/ TrySetResult) таким образом, чтобы указать, что его TaskContinuationOptions.ExecuteSynchronouslyследует игнорировать, используя вместо этого пул

В частности, у меня есть проблема, заключающаяся в том, что входящие данные обрабатываются выделенным читателем, и если вызывающий абонент может подключиться, TaskContinuationOptions.ExecuteSynchronouslyон может остановить читателя (что влияет не только на них). Раньше я работал над этим с помощью некоторого хакера, который обнаруживал, присутствуют ли какие-либо продолжения, и если они есть, он подталкивает завершение к ThreadPool, однако это оказывает значительное влияние, если вызывающий объект насыщает свою рабочую очередь, поскольку завершение не будет обработано своевременно. Если они используют Task.Wait()(или что-то подобное), они по существу зайдут в тупик. Точно так же именно поэтому читатель находится в выделенном потоке, а не использует рабочие.

Так; прежде, чем я попытаюсь придирать команду TPL: я упускаю вариант?

Ключевые моменты:

  • Я не хочу, чтобы внешние абоненты могли захватить мою беседу
  • Я не могу использовать его ThreadPoolкак реализацию, так как он должен работать, когда пул насыщен

В приведенном ниже примере производится вывод (порядок может варьироваться в зависимости от времени):

Continuation on: Main thread
Press [return]
Continuation on: Thread pool

Проблема заключается в том, что случайному вызывающему абоненту удалось получить продолжение в «Основном потоке». В реальном коде это прервало бы работу основного читателя; плохие вещи!

Код:

using System;
using System.Threading;
using System.Threading.Tasks;

static class Program
{
    static void Identify()
    {
        var thread = Thread.CurrentThread;
        string name = thread.IsThreadPoolThread
            ? "Thread pool" : thread.Name;
        if (string.IsNullOrEmpty(name))
            name = "#" + thread.ManagedThreadId;
        Console.WriteLine("Continuation on: " + name);
    }
    static void Main()
    {
        Thread.CurrentThread.Name = "Main thread";
        var source = new TaskCompletionSource<int>();
        var task = source.Task;
        task.ContinueWith(delegate {
            Identify();
        });
        task.ContinueWith(delegate {
            Identify();
        }, TaskContinuationOptions.ExecuteSynchronously);
        source.TrySetResult(123);
        Console.WriteLine("Press [return]");
        Console.ReadLine();
    }
}
Марк Гравелл
источник
2
Я хотел бы попробовать завернуть TaskCompletionSourceс моим собственным API , чтобы предотвратить прямой вызов ContinueWith, так как ни TaskCompletionSource, ни Taskне хорошо подходят для наследования от них.
Деннис
1
@Dennis, чтобы прояснить, на самом деле Taskраскрывается то, что выставлено, а не TaskCompletionSource. Это (предоставление другого API) технически вариант, но это довольно экстремальная вещь, которую нужно делать только для этого ... Я не уверен, что это оправдывает это
Марк Гравелл
2
@MattH не совсем - он просто перефразирует вопрос: либо вы используете ThreadPoolдля этого (о чем я уже упоминал - это вызывает проблемы), либо у вас есть выделенный поток «ожидающих продолжения», и тогда они (продолжения с ExecuteSynchronouslyуказанным) могут захватить это один вместо этого - что вызывает точно такую ​​же проблему, потому что это означает, что продолжение для других сообщений может быть остановлено, что снова влияет на несколько абонентов
Марк Грейвелл
3
@Andrey, который (он работает так, как будто все вызывающие абоненты использовали ContinueWith без exec-sync) - это именно то, чего я хочу достичь. Проблема в том, что если моя библиотека передаст кому-то Задачу, они могут сделать что-то очень нежелательное: они могут прервать мой читатель (нежелательно) с помощью exec-sync. Это очень опасно, поэтому я хотел бы предотвратить его попадание в библиотеку .
Марк Грейвелл
2
@Andrey, потому что а: многие задачи вообще никогда не получают продолжения (особенно при пакетной работе) - это заставит каждую задачу иметь одно, и б: даже те, у которых было бы продолжение, теперь намного сложнее, накладные расходы и рабочие операции. Это важно.
Marc Gravell

Ответы:

50

Новое в .NET 4.6:

.NET 4.6 содержит новый TaskCreationOptions: RunContinuationsAsynchronously.


Поскольку вы хотите использовать Reflection для доступа к закрытым полям ...

Вы можете пометить Задачу TCS с помощью TASK_STATE_THREAD_WAS_ABORTEDфлага, что приведет к тому, что все продолжения не будут встроены.

const int TASK_STATE_THREAD_WAS_ABORTED = 134217728;

var stateField = typeof(Task).GetField("m_stateFlags", BindingFlags.NonPublic | BindingFlags.Instance);
stateField.SetValue(task, (int) stateField.GetValue(task) | TASK_STATE_THREAD_WAS_ABORTED);

Редактировать:

Вместо использования Reflection emit я предлагаю вам использовать выражения. Это гораздо более читабельно и имеет то преимущество, что он совместим с PCL:

var taskParameter = Expression.Parameter(typeof (Task));
const string stateFlagsFieldName = "m_stateFlags";
var setter =
    Expression.Lambda<Action<Task>>(
        Expression.Assign(Expression.Field(taskParameter, stateFlagsFieldName),
            Expression.Or(Expression.Field(taskParameter, stateFlagsFieldName),
                Expression.Constant(TASK_STATE_THREAD_WAS_ABORTED))), taskParameter).Compile();

Без использования Reflection:

Если кому-то интересно, я нашел способ сделать это без Reflection, но он тоже немного «грязный» и, конечно, несет в себе немалый штраф за производительность:

try
{
    Thread.CurrentThread.Abort();
}
catch (ThreadAbortException)
{
    source.TrySetResult(123);
    Thread.ResetAbort();
}
Эли Арбель
источник
3
@MarcGravell Используйте это, чтобы создать некоторый псевдо-образец для команды TPL и сделать запрос на изменение о возможности сделать это с помощью параметров конструктора или чего-то еще.
Адам Хоулдсворт
1
@ Адам, да, если бы вам пришлось назвать этот флаг «что он делает», а не «что его вызывает», это было бы что-то вроде TaskCreationOptions.DoNotInline- и даже не потребовалось бы изменение подписи ctor наTaskCompletionSource
Marc Gravell
2
@AdamHouldsworth и не волнуйтесь, я уже отправляю им то же самое по электронной почте; p
Марк Грейвелл
1
Для вашего интереса: вот он, оптимизированный с помощью ILGeneratoretc: github.com/StackExchange/StackExchange.Redis/blob/master/…
Марк Грейвелл
1
@Noseratio ага, проверил - спасибо; они все в порядке ИМО; Я согласен, что это чистый обходной путь, но он дает точно правильные результаты.
Марк Грейвелл
9

Я не думаю, что в TPL есть что-то, что могло бы явный API-контроль над TaskCompletionSource.SetResultпродолжениями. Я решил оставить свой первоначальный ответ для контроля этого поведения наasync/await сценариев.

Вот еще одно решение, которое требует асинхронности ContinueWith, если tcs.SetResult-triggered продолжение происходит в том же потоке, который SetResultбыл вызван:

public static class TaskExt
{
    static readonly ConcurrentDictionary<Task, Thread> s_tcsTasks =
        new ConcurrentDictionary<Task, Thread>();

    // SetResultAsync
    static public void SetResultAsync<TResult>(
        this TaskCompletionSource<TResult> @this,
        TResult result)
    {
        s_tcsTasks.TryAdd(@this.Task, Thread.CurrentThread);
        try
        {
            @this.SetResult(result);
        }
        finally
        {
            Thread thread;
            s_tcsTasks.TryRemove(@this.Task, out thread);
        }
    }

    // ContinueWithAsync, TODO: more overrides
    static public Task ContinueWithAsync<TResult>(
        this Task<TResult> @this,
        Action<Task<TResult>> action,
        TaskContinuationOptions continuationOptions = TaskContinuationOptions.None)
    {
        return @this.ContinueWith((Func<Task<TResult>, Task>)(t =>
        {
            Thread thread = null;
            s_tcsTasks.TryGetValue(t, out thread);
            if (Thread.CurrentThread == thread)
            {
                // same thread which called SetResultAsync, avoid potential deadlocks

                // using thread pool
                return Task.Run(() => action(t));

                // not using thread pool (TaskCreationOptions.LongRunning creates a normal thread)
                // return Task.Factory.StartNew(() => action(t), TaskCreationOptions.LongRunning);
            }
            else
            {
                // continue on the same thread
                var task = new Task(() => action(t));
                task.RunSynchronously();
                return Task.FromResult(task);
            }
        }), continuationOptions).Unwrap();
    }
}

Обновлено с учетом комментария:

Я не контролирую вызывающего абонента - я не могу заставить его использовать конкретный вариант продолжения работы: если бы я мог, проблемы бы вообще не существовало

Я не знал, что вы не контролируете звонящего. Тем не менее, если вы не контролируете его, вы, вероятно, также не передаете TaskCompletionSourceобъект непосредственно вызывающей стороне. По логике вещей, вы должны передать его токен , т.е.tcs.Task . В этом случае решение может быть еще проще, если добавить еще один метод расширения к приведенному выше:

// ImposeAsync, TODO: more overrides
static public Task<TResult> ImposeAsync<TResult>(this Task<TResult> @this)
{
    return @this.ContinueWith(new Func<Task<TResult>, Task<TResult>>(antecedent =>
    {
        Thread thread = null;
        s_tcsTasks.TryGetValue(antecedent, out thread);
        if (Thread.CurrentThread == thread)
        {
            // continue on a pool thread
            return antecedent.ContinueWith(t => t, 
                TaskContinuationOptions.None).Unwrap();
        }
        else
        {
            return antecedent;
        }
    }), TaskContinuationOptions.ExecuteSynchronously).Unwrap();
}

Использование:

// library code
var source = new TaskCompletionSource<int>();
var task = source.Task.ImposeAsync();
// ... 

// client code
task.ContinueWith(delegate
{
    Identify();
}, TaskContinuationOptions.ExecuteSynchronously);

// ...
// library code
source.SetResultAsync(123);

На самом деле это работает как для awaitandContinueWith ( fiddle ), так и без рефлексии.

нос
источник
1
Я не контролирую вызывающего абонента - я не могу заставить его использовать конкретный вариант продолжения работы: если бы я мог, проблема бы вообще не существовала
Марк Грейвелл
@MarcGravell, я не знал, что вы не можете контролировать звонящего. Я опубликовал обновление о том, как я с этим справлюсь.
Носератио
дилемма автора библиотеки; p Обратите внимание, что кто-то нашел гораздо более простой и прямой способ достижения желаемого результата
Марк Гравелл
4

А как насчет того, чтобы делать

var task = source.Task;

ты делаешь это вместо

var task = source.Task.ContinueWith<Int32>( x => x.Result );

Таким образом, вы всегда добавляете одно продолжение, которое будет выполняться асинхронно, и тогда не имеет значения, хотят ли подписчики продолжение в том же контексте. Это вроде как выполнение задачи, не так ли?

Иван Златанов
источник
1
Об этом говорилось в комментариях (см. Андрей); проблема есть в том , что она заставляет все задачи иметь продолжение , когда они не имели бы, что это то , что как ContinueWithи awaitобычно стараются избегать (путем проверки уже завершены и т.д.) - и так как это заставит все на рабочих, это действительно обострит ситуацию. Это позитивная идея, и я благодарю вас за нее, но в этом сценарии она не поможет.
Марк Грейвелл
3

если вы можете и готовы использовать отражение, это должно сработать;

public static class MakeItAsync
{
    static public void TrySetAsync<T>(this TaskCompletionSource<T> source, T result)
    {
        var continuation = typeof(Task).GetField("m_continuationObject", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance);
        var continuations = (List<object>)continuation.GetValue(source.Task);

        foreach (object c in continuations)
        {
            var option = c.GetType().GetField("m_options", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance);
            var options = (TaskContinuationOptions)option.GetValue(c);

            options &= ~TaskContinuationOptions.ExecuteSynchronously;
            option.SetValue(c, options);
        }

        source.TrySetResult(result);
    }        
}
Фреду
источник
Этот взлом может просто перестать работать в следующей версии Framework.
Носератио
@Noseratio, правда, но сейчас это работает, и они могут также реализовать правильный способ сделать это в следующей версии
Фреду
Но зачем вам это нужно, если вы просто можете это сделать Task.Run(() => tcs.SetResult(result))?
Носератио
@Noseratio, я не знаю, задайте этот вопрос Марку :-), я просто снимаю флаг TaskContinuationOptions.ExecuteSynchronously во всех задачах, связанных с TaskCompletionSource, чтобы убедиться, что все они используют пул потоков вместо основного потока
Фреду
Взлом m_continuationObject на самом деле является читом, который я уже использую для выявления потенциально проблемных задач, так что это не остается без внимания. Интересно, спасибо. На данный момент это наиболее полезный вариант.
Marc Gravell
3

Обновлено , я опубликовал отдельный ответ для решения, ContinueWithа не await(потому ContinueWithчто не заботится о текущем контексте синхронизации).

Вы можете использовать тупой контекст синхронизации ввести асинхронность на продолжении инициируется вызов SetResult/SetCancelled/SetExceptionна TaskCompletionSource. Я считаю, что текущий контекст синхронизации (на данный момент await tcs.Task) - это критерии, которые TPL использует, чтобы решить, делать ли такое продолжение синхронным или асинхронным.

Для меня работает следующее:

if (notifyAsync)
{
    tcs.SetResultAsync(null);
}
else
{
    tcs.SetResult(null);
}

SetResultAsync реализуется так:

public static class TaskExt
{
    static public void SetResultAsync<T>(this TaskCompletionSource<T> tcs, T result)
    {
        FakeSynchronizationContext.Execute(() => tcs.SetResult(result));
    }

    // FakeSynchronizationContext
    class FakeSynchronizationContext : SynchronizationContext
    {
        private static readonly ThreadLocal<FakeSynchronizationContext> s_context =
            new ThreadLocal<FakeSynchronizationContext>(() => new FakeSynchronizationContext());

        private FakeSynchronizationContext() { }

        public static FakeSynchronizationContext Instance { get { return s_context.Value; } }

        public static void Execute(Action action)
        {
            var savedContext = SynchronizationContext.Current;
            SynchronizationContext.SetSynchronizationContext(FakeSynchronizationContext.Instance);
            try
            {
                action();
            }
            finally
            {
                SynchronizationContext.SetSynchronizationContext(savedContext);
            }
        }

        // SynchronizationContext methods

        public override SynchronizationContext CreateCopy()
        {
            return this;
        }

        public override void OperationStarted()
        {
            throw new NotImplementedException("OperationStarted");
        }

        public override void OperationCompleted()
        {
            throw new NotImplementedException("OperationCompleted");
        }

        public override void Post(SendOrPostCallback d, object state)
        {
            throw new NotImplementedException("Post");
        }

        public override void Send(SendOrPostCallback d, object state)
        {
            throw new NotImplementedException("Send");
        }
    }
}

SynchronizationContext.SetSynchronizationContext очень дешево с точки зрения накладных расходов. Фактически, очень похожий подход используется при реализации WPFDispatcher.BeginInvoke .

TPL сравнивает целевой контекст синхронизации в точке с контекстом awaitточки tcs.SetResult. Если контекст синхронизации одинаков (или контекст синхронизации отсутствует в обоих местах), продолжение вызывается напрямую, синхронно. В противном случае он помещается в очередь с использованием SynchronizationContext.Postцелевого контекста синхронизации, т. Е. Нормального awaitповедения. Этот подход всегда навязывает SynchronizationContext.Postповедение (или продолжение потока пула, если нет целевого контекста синхронизации).

Обновлено , это не сработает task.ContinueWith, потому ContinueWithчто не заботится о текущем контексте синхронизации. Однако он работает для await task( скрипки ). Это также работает для await task.ConfigureAwait(false).

OTOH, этот подход работает для ContinueWith.

нос
источник
Заманчиво, но изменение контекста синхронизации почти наверняка повлияет на вызывающее приложение - например, веб-приложение или приложение Windows, которое просто использует мою библиотеку, не должно обнаруживать, что контекст синхронизации меняется сотни раз в секунду.
Marc Gravell
@MarcGravell, я меняю только под объем tcs.SetResultзвонка. Таким образом, он становится атомарным и потокобезопасным, потому что само продолжение будет происходить либо в другом потоке пула, либо в исходной синхронизации. контекст записан в await tcs.Task. А SynchronizationContext.SetSynchronizationContextсам по себе очень дешевый, намного дешевле самого переключателя потоков.
Носератио
Однако это может не удовлетворить ваше второе требование: не использовать ThreadPool. С этим решением TPL действительно будет использовать ThreadPool, если не было синхронизации. context (или он был основным по умолчанию) в await tcs.Task. Но это стандартное поведение TPL.
Nosratio
Хммм ... поскольку контекст синхронизации предназначен для каждого потока, это действительно может быть жизнеспособным - и мне не нужно было бы постоянно переключать ctx - просто установите его один раз для рабочего потока. Мне нужно будет поиграть с этим
Марк Грейвелл
1
@Noseration ах, верно: было непонятно, что главное в том, что они разные . Посмотрю. Благодарю.
Марк Гравелл
3

Симулировать прерывания подход выглядел очень хорошо, но привело к TPL угона потоков в некоторых сценариях .

Затем у меня была реализация, которая была похожа на проверку объекта продолжения , но просто проверяла любое продолжение, поскольку на самом деле существует слишком много сценариев для того, чтобы данный код работал хорошо, но это означало, что даже такие вещи, как Task.Waitприводили к поиску пула потоков.

В конце концов, после проверки большого количества IL единственным безопасным и полезным сценарием является SetOnInvokeMresсценарий (продолжение с ручным сбросом и тонким событием). Есть много других сценариев:

  • некоторые небезопасны и приводят к перехвату потоков
  • остальные бесполезны, так как в конечном итоге приводят к пулу потоков

В конце концов, я решил проверить наличие ненулевого объекта-продолжения; если он равен нулю, хорошо (без продолжения); если он не равен нулю, проверка в особом случае SetOnInvokeMres- если это так: нормально (вызывать безопасно); в противном случае позвольте пулу потоков выполнить это TrySetComplete, не сообщая задаче ничего особенного, например прерывания спуфинга. Task.Waitиспользует SetOnInvokeMresподход, который представляет собой конкретный сценарий, который мы очень стараемся не выйти из тупика.

Type taskType = typeof(Task);
FieldInfo continuationField = taskType.GetField("m_continuationObject", BindingFlags.Instance | BindingFlags.NonPublic);
Type safeScenario = taskType.GetNestedType("SetOnInvokeMres", BindingFlags.NonPublic);
if (continuationField != null && continuationField.FieldType == typeof(object) && safeScenario != null)
{
    var method = new DynamicMethod("IsSyncSafe", typeof(bool), new[] { typeof(Task) }, typeof(Task), true);
    var il = method.GetILGenerator();
    var hasContinuation = il.DefineLabel();
    il.Emit(OpCodes.Ldarg_0);
    il.Emit(OpCodes.Ldfld, continuationField);
    Label nonNull = il.DefineLabel(), goodReturn = il.DefineLabel();
    // check if null
    il.Emit(OpCodes.Brtrue_S, nonNull);
    il.MarkLabel(goodReturn);
    il.Emit(OpCodes.Ldc_I4_1);
    il.Emit(OpCodes.Ret);

    // check if is a SetOnInvokeMres - if so, we're OK
    il.MarkLabel(nonNull);
    il.Emit(OpCodes.Ldarg_0);
    il.Emit(OpCodes.Ldfld, continuationField);
    il.Emit(OpCodes.Isinst, safeScenario);
    il.Emit(OpCodes.Brtrue_S, goodReturn);

    il.Emit(OpCodes.Ldc_I4_0);
    il.Emit(OpCodes.Ret);

    IsSyncSafe = (Func<Task, bool>)method.CreateDelegate(typeof(Func<Task, bool>));
Марк Гравелл
источник