阻止任务在某个线程上运行

时间:2023-04-26
本文介绍了阻止任务在某个线程上运行的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着跟版网的小编来一起学习吧!

问题描述

我一直在为一些异步等待的东西苦苦挣扎.我正在使用 RabbitMQ 在某些程序之间发送/接收消息.

I have been struggling a bit with some async await stuff. I am using RabbitMQ for sending/receiving messages between some programs.

作为背景知识,RabbitMQ 客户端使用了 3 个左右的线程,我可以看到:一个连接线程和两个心跳线程.每当通过 TCP 接收到消息时,连接线程都会处理它并调用我通过接口提供的回调.文档说最好避免在此调用期间做大量工作,因为它在与连接相同的线程上完成,并且需要继续进行.他们提供了一个 QueueingBasicConsumer,它有一个阻塞的 'Dequeue' 方法,用于等待接收消息.

As a bit of background, the RabbitMQ client uses 3 or so threads that I can see: A connection thread and two heartbeat threads. Whenever a message is received via TCP, the connection thread handles it and calls a callback which I have supplied via an interface. The documentation says that it is best to avoid doing lots of work during this call since its done on the same thread as the connection and things need to continue on. They supply a QueueingBasicConsumer which has a blocking 'Dequeue' method which is used to wait for a message to be received.

我希望我的消费者能够在这段等待时间内真正释放他们的线程上下文,以便其他人可以做一些工作,所以我决定使用 async/await 任务.我编写了一个 AwaitableBasicConsumer 类,它以下列方式使用 TaskCompletionSource:

I wanted my consumers to be able to actually release their thread context during this waiting time so somebody else could do some work, so I decided to use async/await tasks. I wrote an AwaitableBasicConsumer class which uses TaskCompletionSources in the following fashion:

我有一个等待的出队方法:

public Task<RabbitMQ.Client.Events.BasicDeliverEventArgs> DequeueAsync(CancellationToken cancellationToken)
{
    //we are enqueueing a TCS. This is a "read"
    rwLock.EnterReadLock();

    try
    {
        TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs = new TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs>();

        //if we are cancelled before we finish, this will cause the tcs to become cancelled
        cancellationToken.Register(() =>
        {
            tcs.TrySetCanceled();
        });

        //if there is something in the undelivered queue, the task will be immediately completed
        //otherwise, we queue the task into deliveryTCS
        if (!TryDeliverUndelivered(tcs))
            deliveryTCS.Enqueue(tcs);
        }

        return tcs.Task;
    }
    finally
    {
        rwLock.ExitReadLock();
    }
}

rabbitmq 客户端调用的回调完成任务: 这是从 AMQP 连接线程的上下文中调用的

The callback which the rabbitmq client calls fulfills the tasks: This is called from the context of the AMQP Connection thread

public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IBasicProperties properties, byte[] body)
{
    //we want nothing added while we remove. We also block until everybody is done.
    rwLock.EnterWriteLock();
    try
    {
        RabbitMQ.Client.Events.BasicDeliverEventArgs e = new RabbitMQ.Client.Events.BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);

        bool sent = false;
        TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs;
        while (deliveryTCS.TryDequeue(out tcs))
        {
            //once we manage to actually set somebody's result, we are done with handling this
            if (tcs.TrySetResult(e))
            {
                sent = true;
                break;
            }
        }

        //if nothing was sent, we queue up what we got so that somebody can get it later.
        /**
         * Without the rwlock, this logic would cause concurrency problems in the case where after the while block completes without sending, somebody enqueues themselves. They would get the
         * next message and the person who enqueues after them would get the message received now. Locking prevents that from happening since nobody can add to the queue while we are
         * doing our thing here.
         */
        if (!sent)
        {
            undelivered.Enqueue(e);
        }
    }
    finally
    {
        rwLock.ExitWriteLock();
    }
}

rwLock 是一个 ReaderWriterLockSlim.这两个队列(deliveryTCSundelivered)是 ConcurrentQueues.

rwLock is a ReaderWriterLockSlim. The two queues (deliveryTCS and undelivered) are ConcurrentQueues.

问题:

每隔一段时间,等待出队方法的方法就会抛出一个异常.这通常不是问题,因为该方法也是 async ,因此它进入任务进入的异常"完成状态.问题出现在调用 DequeueAsync 的任务在 RabbitMQ 客户端创建的 AMQP 连接线程等待之后恢复的情况.通常我已经看到任务恢复到主线程或浮动的工作线程之一.但是,当它恢复到 AMQP 线程并引发异常时,一切都停止了.任务没有进入其异常状态",AMQP 连接线程留在说它正在执行发生异常的方法.

Every once in a while, the method that awaits the dequeue method throws an exception. This would not normally be an issue since that method is also async and so it enters the "Exception" completion state that tasks enter. The problem comes in the situation where the task that calls DequeueAsync is resumed after the await on the AMQP Connection thread that the RabbitMQ client creates. Normally I have seen tasks resume onto the main thread or one of the worker threads floating around. However, when it resumes onto the AMQP thread and an exception is thrown, everything stalls. The task does not enter its "Exception state" and the AMQP Connection thread is left saying that it is executing the method that had the exception occur.

我的主要困惑是为什么这不起作用:

My main confusion here is why this doesn't work:

var task = c.RunAsync(); //<-- This method awaits the DequeueAsync and throws an exception afterwards

ConsumerTaskState state = new ConsumerTaskState()
{
    Connection = connection,
    CancellationToken = cancellationToken
};

//if there is a problem, we execute our faulted method
//PROBLEM: If task fails when its resumed onto the AMQP thread, this method is never called
task.ContinueWith(this.OnFaulted, state, TaskContinuationOptions.OnlyOnFaulted);

这是 RunAsync 方法,为测试设置:

Here is the RunAsync method, set up for the test:

public async Task RunAsync()
{
    using (var channel = this.Connection.CreateModel())
    {
        ...
        AwaitableBasicConsumer consumer = new AwaitableBasicConsumer(channel);
        var result = consumer.DequeueAsync(this.CancellationToken);

        //wait until we find something to eat
        await result;

        throw new NotImplementeException(); //<-- the test exception. Normally this causes OnFaulted to be called, but sometimes, it stalls
        ...
    } //<-- This is where the debugger says the thread is sitting at when I find it in the stalled state
}

阅读我所写的内容,我发现我可能没有很好地解释我的问题.如果需要澄清,请询问.

Reading what I have written, I see that I may not have explained my problem very well. If clarification is needed, just ask.

我想出的解决方案如下:

My solutions that I have come up with are as follows:

  • 删除所有 Async/Await 代码,直接使用线程并阻塞.性能会有所下降,但至少有时不会停顿
  • 以某种方式使 AMQP 线程免于用于恢复任务.我假设他们正在睡觉或其他什么,然后默认的 TaskScheduler 决定使用它们.如果我能找到一种方法来告诉任务调度程序这些线程是不受限制的,那就太好了.
  • Remove all Async/Await code and just use straight up threads and block. Performance will be decreased, but at least it won't stall sometimes
  • Somehow exempt the AMQP threads from being used for resuming tasks. I assume that they were sleeping or something and then the default TaskScheduler decided to use them. If I could find a way to tell the task scheduler that those threads are off limits, that would be great.

有没有人解释为什么会发生这种情况或有任何解决这个问题的建议?现在我正在删除异步代码,只是为了让程序可靠,但我真的很想了解是什么正在这里进行.

Does anyone have an explanation for why this is happening or any suggestions to solving this? Right now I am removing the async code just so that the program is reliable, but I really want to understand what is going on here.

推荐答案

我首先建议你阅读我的async intro,它准确地解释了 await 将如何捕获上下文并使用它来恢复执行.简而言之,它将捕获当前的SynchronizationContext(如果SynchronizationContext.Currentnull,则捕获当前的TaskScheduler).

I first recommend that you read my async intro, which explains in precise terms how await will capture a context and use that to resume execution. In short, it will capture the current SynchronizationContext (or the current TaskScheduler if SynchronizationContext.Current is null).

另一个重要的细节是 async 延续是用 TaskContinuationOptions.ExecuteSynchronously 安排的(正如@svick 在评论中指出的那样).我有一篇关于此的博文,但 AFAIK它在任何地方都没有正式记录.这个细节确实让编写async生产者/消费者队列变得困难.

The other important detail is that async continuations are scheduled with TaskContinuationOptions.ExecuteSynchronously (as @svick pointed out in a comment). I have a blog post about this but AFAIK it is not officially documented anywhere. This detail does make writing an async producer/consumer queue difficult.

await 不是切换回原始上下文"的原因(可能)是因为 RabbitMQ 线程没有 SynchronizationContextTaskScheduler - 因此,当您调用 TrySetResult 时直接执行延续,因为这些线程看起来就像常规线程池线程.

The reason await isn't "switching back to the original context" is (probably) because the RabbitMQ threads don't have a SynchronizationContext or TaskScheduler - thus, the continuation is executed directly when you call TrySetResult because those threads look just like regular thread pool threads.

顺便说一句,通读您的代码,我怀疑您使用读取器/写入器锁和并发队列不正确.如果没有看到整个代码,我无法确定,但这是我的印象.

BTW, reading through your code, I suspect your use of a reader/writer lock and concurrent queues are incorrect. I can't be sure without seeing the whole code, but that's my impression.

我强烈建议您使用现有的 async 队列并围绕它构建一个使用者(换句话说,让其他人来做最困难的部分 :).BufferBlock<T> 类型输入 TPL 数据流 可以充当 async 队列;如果您的平台上有可用的 Dataflow,那将是我的第一个建议.否则,我的 AsyncEx 库中有一个 AsyncProducerConsumerQueue 类型,或者你可以自己写(正如我在博客中描述的那样).

I strongly recommend you use an existing async queue and build a consumer around that (in other words, let someone else do the hard part :). The BufferBlock<T> type in TPL Dataflow can act as an async queue; that would be my first recommendation if you have Dataflow available on your platform. Otherwise, I have an AsyncProducerConsumerQueue type in my AsyncEx library, or you could write your own (as I describe on my blog).

这是一个使用 BufferBlock<T> 的示例:

Here's an example using BufferBlock<T>:

private readonly BufferBlock<RabbitMQ.Client.Events.BasicDeliverEventArgs> _queue = new BufferBlock<RabbitMQ.Client.Events.BasicDeliverEventArgs>();

public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IBasicProperties properties, byte[] body)
{
    RabbitMQ.Client.Events.BasicDeliverEventArgs e = new RabbitMQ.Client.Events.BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
    _queue.Post(e);
}

public Task<RabbitMQ.Client.Events.BasicDeliverEventArgs> DequeueAsync(CancellationToken cancellationToken)
{
    return _queue.ReceiveAsync(cancellationToken);
}

在此示例中,我将保留您的 DequeueAsync API.但是,一旦您开始使用 TPL 数据流,请考虑在其他地方也使用它.当您需要这样的队列时,通常会发现代码的其他部分也将从数据流方法中受益.例如,您可以将 BufferBlock 链接到 ActionBlock,而不是使用一堆方法调用 DequeueAsync.

In this example, I'm keeping your DequeueAsync API. However, once you start using TPL Dataflow, consider using it elsewhere as well. When you need a queue like this, it's common to find other parts of your code that would also benefit from a dataflow approach. E.g., instead of having a bunch of methods calling DequeueAsync, you could link your BufferBlock to an ActionBlock.

这篇关于阻止任务在某个线程上运行的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持跟版网!

上一篇:Masstransit 使用 RabbitMQ 性能很慢? 下一篇:RabbitMq - ConversationId 与 CorrelationId - 哪个更适合跟踪特定请求?

相关文章