我一直在为一些异步等待的东西苦苦挣扎.我正在使用 RabbitMQ 在某些程序之间发送/接收消息.
I have been struggling a bit with some async await stuff. I am using RabbitMQ for sending/receiving messages between some programs.
作为背景知识,RabbitMQ 客户端使用了 3 个左右的线程,我可以看到:一个连接线程和两个心跳线程.每当通过 TCP 接收到消息时,连接线程都会处理它并调用我通过接口提供的回调.文档说最好避免在此调用期间做大量工作,因为它在与连接相同的线程上完成,并且需要继续进行.他们提供了一个 QueueingBasicConsumer
,它有一个阻塞的 'Dequeue' 方法,用于等待接收消息.
As a bit of background, the RabbitMQ client uses 3 or so threads that I can see: A connection thread and two heartbeat threads. Whenever a message is received via TCP, the connection thread handles it and calls a callback which I have supplied via an interface. The documentation says that it is best to avoid doing lots of work during this call since its done on the same thread as the connection and things need to continue on. They supply a QueueingBasicConsumer
which has a blocking 'Dequeue' method which is used to wait for a message to be received.
我希望我的消费者能够在这段等待时间内真正释放他们的线程上下文,以便其他人可以做一些工作,所以我决定使用 async/await 任务.我编写了一个 AwaitableBasicConsumer
类,它以下列方式使用 TaskCompletionSource
:
I wanted my consumers to be able to actually release their thread context during this waiting time so somebody else could do some work, so I decided to use async/await tasks. I wrote an AwaitableBasicConsumer
class which uses TaskCompletionSource
s in the following fashion:
我有一个等待的出队方法:
public Task<RabbitMQ.Client.Events.BasicDeliverEventArgs> DequeueAsync(CancellationToken cancellationToken)
{
//we are enqueueing a TCS. This is a "read"
rwLock.EnterReadLock();
try
{
TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs = new TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs>();
//if we are cancelled before we finish, this will cause the tcs to become cancelled
cancellationToken.Register(() =>
{
tcs.TrySetCanceled();
});
//if there is something in the undelivered queue, the task will be immediately completed
//otherwise, we queue the task into deliveryTCS
if (!TryDeliverUndelivered(tcs))
deliveryTCS.Enqueue(tcs);
}
return tcs.Task;
}
finally
{
rwLock.ExitReadLock();
}
}
rabbitmq 客户端调用的回调完成任务: 这是从 AMQP 连接线程的上下文中调用的
The callback which the rabbitmq client calls fulfills the tasks: This is called from the context of the AMQP Connection thread
public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IBasicProperties properties, byte[] body)
{
//we want nothing added while we remove. We also block until everybody is done.
rwLock.EnterWriteLock();
try
{
RabbitMQ.Client.Events.BasicDeliverEventArgs e = new RabbitMQ.Client.Events.BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
bool sent = false;
TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs;
while (deliveryTCS.TryDequeue(out tcs))
{
//once we manage to actually set somebody's result, we are done with handling this
if (tcs.TrySetResult(e))
{
sent = true;
break;
}
}
//if nothing was sent, we queue up what we got so that somebody can get it later.
/**
* Without the rwlock, this logic would cause concurrency problems in the case where after the while block completes without sending, somebody enqueues themselves. They would get the
* next message and the person who enqueues after them would get the message received now. Locking prevents that from happening since nobody can add to the queue while we are
* doing our thing here.
*/
if (!sent)
{
undelivered.Enqueue(e);
}
}
finally
{
rwLock.ExitWriteLock();
}
}
rwLock
是一个 ReaderWriterLockSlim
.这两个队列(deliveryTCS
和 undelivered
)是 ConcurrentQueues.
rwLock
is a ReaderWriterLockSlim
. The two queues (deliveryTCS
and undelivered
) are ConcurrentQueues.
问题:
每隔一段时间,等待出队方法的方法就会抛出一个异常.这通常不是问题,因为该方法也是 async
,因此它进入任务进入的异常"完成状态.问题出现在调用 DequeueAsync
的任务在 RabbitMQ 客户端创建的 AMQP 连接线程等待之后恢复的情况.通常我已经看到任务恢复到主线程或浮动的工作线程之一.但是,当它恢复到 AMQP 线程并引发异常时,一切都停止了.任务没有进入其异常状态",AMQP 连接线程留在说它正在执行发生异常的方法.
Every once in a while, the method that awaits the dequeue method throws an exception. This would not normally be an issue since that method is also async
and so it enters the "Exception" completion state that tasks enter. The problem comes in the situation where the task that calls DequeueAsync
is resumed after the await on the AMQP Connection thread that the RabbitMQ client creates. Normally I have seen tasks resume onto the main thread or one of the worker threads floating around. However, when it resumes onto the AMQP thread and an exception is thrown, everything stalls. The task does not enter its "Exception state" and the AMQP Connection thread is left saying that it is executing the method that had the exception occur.
我的主要困惑是为什么这不起作用:
My main confusion here is why this doesn't work:
var task = c.RunAsync(); //<-- This method awaits the DequeueAsync and throws an exception afterwards
ConsumerTaskState state = new ConsumerTaskState()
{
Connection = connection,
CancellationToken = cancellationToken
};
//if there is a problem, we execute our faulted method
//PROBLEM: If task fails when its resumed onto the AMQP thread, this method is never called
task.ContinueWith(this.OnFaulted, state, TaskContinuationOptions.OnlyOnFaulted);
这是 RunAsync
方法,为测试设置:
Here is the RunAsync
method, set up for the test:
public async Task RunAsync()
{
using (var channel = this.Connection.CreateModel())
{
...
AwaitableBasicConsumer consumer = new AwaitableBasicConsumer(channel);
var result = consumer.DequeueAsync(this.CancellationToken);
//wait until we find something to eat
await result;
throw new NotImplementeException(); //<-- the test exception. Normally this causes OnFaulted to be called, but sometimes, it stalls
...
} //<-- This is where the debugger says the thread is sitting at when I find it in the stalled state
}
阅读我所写的内容,我发现我可能没有很好地解释我的问题.如果需要澄清,请询问.
Reading what I have written, I see that I may not have explained my problem very well. If clarification is needed, just ask.
我想出的解决方案如下:
My solutions that I have come up with are as follows:
TaskScheduler
决定使用它们.如果我能找到一种方法来告诉任务调度程序这些线程是不受限制的,那就太好了.TaskScheduler
decided to use them. If I could find a way to tell the task scheduler that those threads are off limits, that would be great.有没有人解释为什么会发生这种情况或有任何解决这个问题的建议?现在我正在删除异步代码,只是为了让程序可靠,但我真的很想了解是什么正在这里进行.
Does anyone have an explanation for why this is happening or any suggestions to solving this? Right now I am removing the async code just so that the program is reliable, but I really want to understand what is going on here.
我首先建议你阅读我的async
intro,它准确地解释了 await
将如何捕获上下文并使用它来恢复执行.简而言之,它将捕获当前的SynchronizationContext
(如果SynchronizationContext.Current
为null
,则捕获当前的TaskScheduler
).
I first recommend that you read my async
intro, which explains in precise terms how await
will capture a context and use that to resume execution. In short, it will capture the current SynchronizationContext
(or the current TaskScheduler
if SynchronizationContext.Current
is null
).
另一个重要的细节是 async
延续是用 TaskContinuationOptions.ExecuteSynchronously
安排的(正如@svick 在评论中指出的那样).我有一篇关于此的博文,但 AFAIK它在任何地方都没有正式记录.这个细节确实让编写async
生产者/消费者队列变得困难.
The other important detail is that async
continuations are scheduled with TaskContinuationOptions.ExecuteSynchronously
(as @svick pointed out in a comment). I have a blog post about this but AFAIK it is not officially documented anywhere. This detail does make writing an async
producer/consumer queue difficult.
await
不是切换回原始上下文"的原因(可能)是因为 RabbitMQ 线程没有 SynchronizationContext
或 TaskScheduler
- 因此,当您调用 TrySetResult
时直接执行延续,因为这些线程看起来就像常规线程池线程.
The reason await
isn't "switching back to the original context" is (probably) because the RabbitMQ threads don't have a SynchronizationContext
or TaskScheduler
- thus, the continuation is executed directly when you call TrySetResult
because those threads look just like regular thread pool threads.
顺便说一句,通读您的代码,我怀疑您使用读取器/写入器锁和并发队列不正确.如果没有看到整个代码,我无法确定,但这是我的印象.
BTW, reading through your code, I suspect your use of a reader/writer lock and concurrent queues are incorrect. I can't be sure without seeing the whole code, but that's my impression.
我强烈建议您使用现有的 async
队列并围绕它构建一个使用者(换句话说,让其他人来做最困难的部分 :).BufferBlock<T>
类型输入 TPL 数据流 可以充当 async
队列;如果您的平台上有可用的 Dataflow,那将是我的第一个建议.否则,我的 AsyncEx 库中有一个 AsyncProducerConsumerQueue
类型,或者你可以自己写(正如我在博客中描述的那样).
I strongly recommend you use an existing async
queue and build a consumer around that (in other words, let someone else do the hard part :). The BufferBlock<T>
type in TPL Dataflow can act as an async
queue; that would be my first recommendation if you have Dataflow available on your platform. Otherwise, I have an AsyncProducerConsumerQueue
type in my AsyncEx library, or you could write your own (as I describe on my blog).
这是一个使用 BufferBlock<T>
的示例:
Here's an example using BufferBlock<T>
:
private readonly BufferBlock<RabbitMQ.Client.Events.BasicDeliverEventArgs> _queue = new BufferBlock<RabbitMQ.Client.Events.BasicDeliverEventArgs>();
public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IBasicProperties properties, byte[] body)
{
RabbitMQ.Client.Events.BasicDeliverEventArgs e = new RabbitMQ.Client.Events.BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
_queue.Post(e);
}
public Task<RabbitMQ.Client.Events.BasicDeliverEventArgs> DequeueAsync(CancellationToken cancellationToken)
{
return _queue.ReceiveAsync(cancellationToken);
}
在此示例中,我将保留您的 DequeueAsync
API.但是,一旦您开始使用 TPL 数据流,请考虑在其他地方也使用它.当您需要这样的队列时,通常会发现代码的其他部分也将从数据流方法中受益.例如,您可以将 BufferBlock
链接到 ActionBlock
,而不是使用一堆方法调用 DequeueAsync
.
In this example, I'm keeping your DequeueAsync
API. However, once you start using TPL Dataflow, consider using it elsewhere as well. When you need a queue like this, it's common to find other parts of your code that would also benefit from a dataflow approach. E.g., instead of having a bunch of methods calling DequeueAsync
, you could link your BufferBlock
to an ActionBlock
.
这篇关于阻止任务在某个线程上运行的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持跟版网!