У меня есть код библиотеки (сети сокетов), который предоставляет Task
API-интерфейс для ожидающих ответов на запросы на основе TaskCompletionSource<T>
. Однако в TPL есть недостаток, заключающийся в невозможности предотвратить синхронное продолжение. Я бы хотел либо:
- сообщить,
TaskCompletionSource<T>
что это не должно позволять вызывающим абонентам подключатьсяTaskContinuationOptions.ExecuteSynchronously
, или - установите результат (
SetResult
/TrySetResult
) таким образом, чтобы указать, что егоTaskContinuationOptions.ExecuteSynchronously
следует игнорировать, используя вместо этого пул
В частности, у меня есть проблема, заключающаяся в том, что входящие данные обрабатываются выделенным читателем, и если вызывающий абонент может подключиться, TaskContinuationOptions.ExecuteSynchronously
он может остановить читателя (что влияет не только на них). Раньше я работал над этим с помощью некоторого хакера, который обнаруживал, присутствуют ли какие-либо продолжения, и если они есть, он подталкивает завершение к ThreadPool
, однако это оказывает значительное влияние, если вызывающий объект насыщает свою рабочую очередь, поскольку завершение не будет обработано своевременно. Если они используют Task.Wait()
(или что-то подобное), они по существу зайдут в тупик. Точно так же именно поэтому читатель находится в выделенном потоке, а не использует рабочие.
Так; прежде, чем я попытаюсь придирать команду TPL: я упускаю вариант?
Ключевые моменты:
- Я не хочу, чтобы внешние абоненты могли захватить мою беседу
- Я не могу использовать его
ThreadPool
как реализацию, так как он должен работать, когда пул насыщен
В приведенном ниже примере производится вывод (порядок может варьироваться в зависимости от времени):
Continuation on: Main thread
Press [return]
Continuation on: Thread pool
Проблема заключается в том, что случайному вызывающему абоненту удалось получить продолжение в «Основном потоке». В реальном коде это прервало бы работу основного читателя; плохие вещи!
Код:
using System;
using System.Threading;
using System.Threading.Tasks;
static class Program
{
static void Identify()
{
var thread = Thread.CurrentThread;
string name = thread.IsThreadPoolThread
? "Thread pool" : thread.Name;
if (string.IsNullOrEmpty(name))
name = "#" + thread.ManagedThreadId;
Console.WriteLine("Continuation on: " + name);
}
static void Main()
{
Thread.CurrentThread.Name = "Main thread";
var source = new TaskCompletionSource<int>();
var task = source.Task;
task.ContinueWith(delegate {
Identify();
});
task.ContinueWith(delegate {
Identify();
}, TaskContinuationOptions.ExecuteSynchronously);
source.TrySetResult(123);
Console.WriteLine("Press [return]");
Console.ReadLine();
}
}
источник
TaskCompletionSource
с моим собственным API , чтобы предотвратить прямой вызовContinueWith
, так как ниTaskCompletionSource
, ниTask
не хорошо подходят для наследования от них.Task
раскрывается то, что выставлено, а неTaskCompletionSource
. Это (предоставление другого API) технически вариант, но это довольно экстремальная вещь, которую нужно делать только для этого ... Я не уверен, что это оправдывает этоThreadPool
для этого (о чем я уже упоминал - это вызывает проблемы), либо у вас есть выделенный поток «ожидающих продолжения», и тогда они (продолжения сExecuteSynchronously
указанным) могут захватить это один вместо этого - что вызывает точно такую же проблему, потому что это означает, что продолжение для других сообщений может быть остановлено, что снова влияет на несколько абонентовОтветы:
Новое в .NET 4.6:
.NET 4.6 содержит новый
TaskCreationOptions
:RunContinuationsAsynchronously
.Поскольку вы хотите использовать Reflection для доступа к закрытым полям ...
Вы можете пометить Задачу TCS с помощью
TASK_STATE_THREAD_WAS_ABORTED
флага, что приведет к тому, что все продолжения не будут встроены.const int TASK_STATE_THREAD_WAS_ABORTED = 134217728; var stateField = typeof(Task).GetField("m_stateFlags", BindingFlags.NonPublic | BindingFlags.Instance); stateField.SetValue(task, (int) stateField.GetValue(task) | TASK_STATE_THREAD_WAS_ABORTED);
Редактировать:
Вместо использования Reflection emit я предлагаю вам использовать выражения. Это гораздо более читабельно и имеет то преимущество, что он совместим с PCL:
var taskParameter = Expression.Parameter(typeof (Task)); const string stateFlagsFieldName = "m_stateFlags"; var setter = Expression.Lambda<Action<Task>>( Expression.Assign(Expression.Field(taskParameter, stateFlagsFieldName), Expression.Or(Expression.Field(taskParameter, stateFlagsFieldName), Expression.Constant(TASK_STATE_THREAD_WAS_ABORTED))), taskParameter).Compile();
Без использования Reflection:
Если кому-то интересно, я нашел способ сделать это без Reflection, но он тоже немного «грязный» и, конечно, несет в себе немалый штраф за производительность:
try { Thread.CurrentThread.Abort(); } catch (ThreadAbortException) { source.TrySetResult(123); Thread.ResetAbort(); }
источник
TaskCreationOptions.DoNotInline
- и даже не потребовалось бы изменение подписи ctor наTaskCompletionSource
ILGenerator
etc: github.com/StackExchange/StackExchange.Redis/blob/master/…Я не думаю, что в TPL есть что-то, что могло бы явный API-контроль над
TaskCompletionSource.SetResult
продолжениями. Я решил оставить свой первоначальный ответ для контроля этого поведения наasync/await
сценариев.Вот еще одно решение, которое требует асинхронности
ContinueWith
, еслиtcs.SetResult
-triggered продолжение происходит в том же потоке, которыйSetResult
был вызван:public static class TaskExt { static readonly ConcurrentDictionary<Task, Thread> s_tcsTasks = new ConcurrentDictionary<Task, Thread>(); // SetResultAsync static public void SetResultAsync<TResult>( this TaskCompletionSource<TResult> @this, TResult result) { s_tcsTasks.TryAdd(@this.Task, Thread.CurrentThread); try { @this.SetResult(result); } finally { Thread thread; s_tcsTasks.TryRemove(@this.Task, out thread); } } // ContinueWithAsync, TODO: more overrides static public Task ContinueWithAsync<TResult>( this Task<TResult> @this, Action<Task<TResult>> action, TaskContinuationOptions continuationOptions = TaskContinuationOptions.None) { return @this.ContinueWith((Func<Task<TResult>, Task>)(t => { Thread thread = null; s_tcsTasks.TryGetValue(t, out thread); if (Thread.CurrentThread == thread) { // same thread which called SetResultAsync, avoid potential deadlocks // using thread pool return Task.Run(() => action(t)); // not using thread pool (TaskCreationOptions.LongRunning creates a normal thread) // return Task.Factory.StartNew(() => action(t), TaskCreationOptions.LongRunning); } else { // continue on the same thread var task = new Task(() => action(t)); task.RunSynchronously(); return Task.FromResult(task); } }), continuationOptions).Unwrap(); } }
Обновлено с учетом комментария:
Я не знал, что вы не контролируете звонящего. Тем не менее, если вы не контролируете его, вы, вероятно, также не передаете
TaskCompletionSource
объект непосредственно вызывающей стороне. По логике вещей, вы должны передать его токен , т.е.tcs.Task
. В этом случае решение может быть еще проще, если добавить еще один метод расширения к приведенному выше:// ImposeAsync, TODO: more overrides static public Task<TResult> ImposeAsync<TResult>(this Task<TResult> @this) { return @this.ContinueWith(new Func<Task<TResult>, Task<TResult>>(antecedent => { Thread thread = null; s_tcsTasks.TryGetValue(antecedent, out thread); if (Thread.CurrentThread == thread) { // continue on a pool thread return antecedent.ContinueWith(t => t, TaskContinuationOptions.None).Unwrap(); } else { return antecedent; } }), TaskContinuationOptions.ExecuteSynchronously).Unwrap(); }
Использование:
// library code var source = new TaskCompletionSource<int>(); var task = source.Task.ImposeAsync(); // ... // client code task.ContinueWith(delegate { Identify(); }, TaskContinuationOptions.ExecuteSynchronously); // ... // library code source.SetResultAsync(123);
На самом деле это работает как для
await
andContinueWith
( fiddle ), так и без рефлексии.источник
А как насчет того, чтобы делать
var task = source.Task;
ты делаешь это вместо
var task = source.Task.ContinueWith<Int32>( x => x.Result );
Таким образом, вы всегда добавляете одно продолжение, которое будет выполняться асинхронно, и тогда не имеет значения, хотят ли подписчики продолжение в том же контексте. Это вроде как выполнение задачи, не так ли?
источник
ContinueWith
иawait
обычно стараются избегать (путем проверки уже завершены и т.д.) - и так как это заставит все на рабочих, это действительно обострит ситуацию. Это позитивная идея, и я благодарю вас за нее, но в этом сценарии она не поможет.если вы можете и готовы использовать отражение, это должно сработать;
public static class MakeItAsync { static public void TrySetAsync<T>(this TaskCompletionSource<T> source, T result) { var continuation = typeof(Task).GetField("m_continuationObject", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance); var continuations = (List<object>)continuation.GetValue(source.Task); foreach (object c in continuations) { var option = c.GetType().GetField("m_options", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance); var options = (TaskContinuationOptions)option.GetValue(c); options &= ~TaskContinuationOptions.ExecuteSynchronously; option.SetValue(c, options); } source.TrySetResult(result); } }
источник
Task.Run(() => tcs.SetResult(result))
?Обновлено , я опубликовал отдельный ответ для решения,
ContinueWith
а неawait
(потомуContinueWith
что не заботится о текущем контексте синхронизации).Вы можете использовать тупой контекст синхронизации ввести асинхронность на продолжении инициируется вызов
SetResult/SetCancelled/SetException
наTaskCompletionSource
. Я считаю, что текущий контекст синхронизации (на данный моментawait tcs.Task
) - это критерии, которые TPL использует, чтобы решить, делать ли такое продолжение синхронным или асинхронным.Для меня работает следующее:
if (notifyAsync) { tcs.SetResultAsync(null); } else { tcs.SetResult(null); }
SetResultAsync
реализуется так:public static class TaskExt { static public void SetResultAsync<T>(this TaskCompletionSource<T> tcs, T result) { FakeSynchronizationContext.Execute(() => tcs.SetResult(result)); } // FakeSynchronizationContext class FakeSynchronizationContext : SynchronizationContext { private static readonly ThreadLocal<FakeSynchronizationContext> s_context = new ThreadLocal<FakeSynchronizationContext>(() => new FakeSynchronizationContext()); private FakeSynchronizationContext() { } public static FakeSynchronizationContext Instance { get { return s_context.Value; } } public static void Execute(Action action) { var savedContext = SynchronizationContext.Current; SynchronizationContext.SetSynchronizationContext(FakeSynchronizationContext.Instance); try { action(); } finally { SynchronizationContext.SetSynchronizationContext(savedContext); } } // SynchronizationContext methods public override SynchronizationContext CreateCopy() { return this; } public override void OperationStarted() { throw new NotImplementedException("OperationStarted"); } public override void OperationCompleted() { throw new NotImplementedException("OperationCompleted"); } public override void Post(SendOrPostCallback d, object state) { throw new NotImplementedException("Post"); } public override void Send(SendOrPostCallback d, object state) { throw new NotImplementedException("Send"); } } }
SynchronizationContext.SetSynchronizationContext
очень дешево с точки зрения накладных расходов. Фактически, очень похожий подход используется при реализации WPFDispatcher.BeginInvoke
.TPL сравнивает целевой контекст синхронизации в точке с контекстом
await
точкиtcs.SetResult
. Если контекст синхронизации одинаков (или контекст синхронизации отсутствует в обоих местах), продолжение вызывается напрямую, синхронно. В противном случае он помещается в очередь с использованиемSynchronizationContext.Post
целевого контекста синхронизации, т. Е. Нормальногоawait
поведения. Этот подход всегда навязываетSynchronizationContext.Post
поведение (или продолжение потока пула, если нет целевого контекста синхронизации).Обновлено , это не сработает
task.ContinueWith
, потомуContinueWith
что не заботится о текущем контексте синхронизации. Однако он работает дляawait task
( скрипки ). Это также работает дляawait task.ConfigureAwait(false)
.OTOH, этот подход работает для
ContinueWith
.источник
tcs.SetResult
звонка. Таким образом, он становится атомарным и потокобезопасным, потому что само продолжение будет происходить либо в другом потоке пула, либо в исходной синхронизации. контекст записан вawait tcs.Task
. АSynchronizationContext.SetSynchronizationContext
сам по себе очень дешевый, намного дешевле самого переключателя потоков.ThreadPool
. С этим решением TPL действительно будет использоватьThreadPool
, если не было синхронизации. context (или он был основным по умолчанию) вawait tcs.Task
. Но это стандартное поведение TPL.Симулировать прерывания подход выглядел очень хорошо, но привело к TPL угона потоков в некоторых сценариях .
Затем у меня была реализация, которая была похожа на проверку объекта продолжения , но просто проверяла любое продолжение, поскольку на самом деле существует слишком много сценариев для того, чтобы данный код работал хорошо, но это означало, что даже такие вещи, как
Task.Wait
приводили к поиску пула потоков.В конце концов, после проверки большого количества IL единственным безопасным и полезным сценарием является
SetOnInvokeMres
сценарий (продолжение с ручным сбросом и тонким событием). Есть много других сценариев:В конце концов, я решил проверить наличие ненулевого объекта-продолжения; если он равен нулю, хорошо (без продолжения); если он не равен нулю, проверка в особом случае
SetOnInvokeMres
- если это так: нормально (вызывать безопасно); в противном случае позвольте пулу потоков выполнить этоTrySetComplete
, не сообщая задаче ничего особенного, например прерывания спуфинга.Task.Wait
используетSetOnInvokeMres
подход, который представляет собой конкретный сценарий, который мы очень стараемся не выйти из тупика.Type taskType = typeof(Task); FieldInfo continuationField = taskType.GetField("m_continuationObject", BindingFlags.Instance | BindingFlags.NonPublic); Type safeScenario = taskType.GetNestedType("SetOnInvokeMres", BindingFlags.NonPublic); if (continuationField != null && continuationField.FieldType == typeof(object) && safeScenario != null) { var method = new DynamicMethod("IsSyncSafe", typeof(bool), new[] { typeof(Task) }, typeof(Task), true); var il = method.GetILGenerator(); var hasContinuation = il.DefineLabel(); il.Emit(OpCodes.Ldarg_0); il.Emit(OpCodes.Ldfld, continuationField); Label nonNull = il.DefineLabel(), goodReturn = il.DefineLabel(); // check if null il.Emit(OpCodes.Brtrue_S, nonNull); il.MarkLabel(goodReturn); il.Emit(OpCodes.Ldc_I4_1); il.Emit(OpCodes.Ret); // check if is a SetOnInvokeMres - if so, we're OK il.MarkLabel(nonNull); il.Emit(OpCodes.Ldarg_0); il.Emit(OpCodes.Ldfld, continuationField); il.Emit(OpCodes.Isinst, safeScenario); il.Emit(OpCodes.Brtrue_S, goodReturn); il.Emit(OpCodes.Ldc_I4_0); il.Emit(OpCodes.Ret); IsSyncSafe = (Func<Task, bool>)method.CreateDelegate(typeof(Func<Task, bool>));
источник