RabbitMq - ConversationId 与 CorrelationId - 哪个更适合跟踪特定请求?

时间:2023-04-26
本文介绍了RabbitMq - ConversationId 与 CorrelationId - 哪个更适合跟踪特定请求?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着跟版网的小编来一起学习吧!

问题描述

RabbitMQ 似乎有两个非常相似的属性,我并不完全理解其中的区别.ConversationIdCorrelationId.

RabbitMQ seems to have two properties that are very similar, and I don't entirely understand the difference. ConversationId and CorrelationId.

我的用例如下.我有一个生成 Guid 的网站.该网站调用一个 API,将该唯一标识符添加到 HttpRequest 标头中.这反过来将消息发布到 RabbitMQ.该消息由第一个消费者处理并在其他地方传递给另一个消费者,依此类推.

My use case is as follows. I have a website that generates a Guid. The website calls an API, adding that unique identifier to the HttpRequest headers. This in turn publishes a message to RabbitMQ. That message is processed by the first consumer and passed off elsewhere to another consumer, and so on.

出于记录目的,我想记录一个将初始请求与所有后续操作联系在一起的标识符.这对于整个应用程序不同部分的旅程来说应该是独一无二的.因此.当登录到像 Serilog/ElasticSearch 这样的东西时,就可以很容易地看到哪个请求触发了初始请求,并且整个应用程序中该请求的所有日志条目都可以关联在一起.

For logging purposes I want to log an identifier that ties the initial request together with all of the subsequent actions. This should be unique for that journey throughout the different parts of the application. Hence. When logged to something like Serilog/ElasticSearch, this then becomes easy to see which request triggered the initial request, and all of the log entries for that request throughout the application can be correlated together.

我创建了一个提供程序,它查看传入的 HttpRequest 以获取标识符.我将其称为CorrelationId",但我开始怀疑是否真的应该将其命名为ConversationId".就 RabbitMQ 而言,ConversationId"的概念更适合这个模型,还是CorrelationId"更好?

I have created a provider that looks at the incoming HttpRequest for an identifier. I've called this a "CorrelationId", but I'm starting to wonder if this should really be named a "ConversationId". In terms of RabbitMQ, does the idea of a "ConversationId" fit better to this model, or is "CorrelationId" better?

这两个概念有什么区别?

What is the difference between the two concepts?

在代码方面,我希望执行以下操作.首先在我的 API 中注册总线并配置 SendPublish 以使用提供者提供的 CorrelationId.

In terms of code, I've looked to do the following. Firstly register the bus in my API and configure the SendPublish to use the CorrelationId from the provider.

// bus registration in the API
var busSettings = context.Resolve<BusSettings>();
// using AspNetCoreCorrelationIdProvider
var correlationIdProvider = context.Resolve<ICorrelationIdProvider>();

var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    cfg.Host(
        new Uri(busSettings.HostAddress),
        h =>
        {
            h.Username(busSettings.Username);
            h.Password(busSettings.Password);
        });
    cfg.ConfigurePublish(x => x.UseSendExecute(sendContext =>
    {
        // which one is more appropriate
        //sendContext.ConversationId = correlationIdProvider.GetCorrelationId();
        sendContext.CorrelationId = correlationIdProvider.GetCorrelationId();
    }));
});

作为参考,这是我的简单提供者界面

For reference, this is my simple provider interface

// define the interface
public interface ICorrelationIdProvider
{
    Guid GetCorrelationId();
}

还有 AspNetCore 实现,它提取调用客户端(即网站)设置的唯一 ID.

And the AspNetCore implementation, which extracts the unique ID set by the calling client (i.e. a website).

public class AspNetCoreCorrelationIdProvider : ICorrelationIdProvider
{
    private IHttpContextAccessor _httpContextAccessor;

    public AspNetCoreCorrelationIdProvider(IHttpContextAccessor httpContextAccessor)
    {
        _httpContextAccessor = httpContextAccessor;
    }

    public Guid GetCorrelationId()
    {
        if (_httpContextAccessor.HttpContext.Request.Headers.TryGetValue("correlation-Id", out StringValues headers))
        {
            var header = headers.FirstOrDefault();
            if (Guid.TryParse(header, out Guid headerCorrelationId))
            {
                return headerCorrelationId;
            }
        }

        return Guid.NewGuid();
    }
}

最后,我的服务主机是简单的 Windows 服务应用程序,它们坐下来使用已发布的消息.他们使用以下内容来获取 CorrelationId,并且很可能会在其他服务主机中发布给其他消费者.

Finally, my Service hosts are simple windows service applications that sit and consume published messages. They use the following to grab the CorrelationId and might well publish to other consumers as well in other service hosts.

public class MessageContextCorrelationIdProvider : ICorrelationIdProvider
{
    /// <summary>
    /// The consume context
    /// </summary>
    private readonly ConsumeContext _consumeContext;

    /// <summary>
    /// Initializes a new instance of the <see cref="MessageContextCorrelationIdProvider"/> class.
    /// </summary>
    /// <param name="consumeContext">The consume context.</param>
    public MessageContextCorrelationIdProvider(ConsumeContext consumeContext)
    {
        _consumeContext = consumeContext;
    }

    /// <summary>
    /// Gets the correlation identifier.
    /// </summary>
    /// <returns></returns>
    public Guid GetCorrelationId()
    {
        // correlationid or conversationIs?
        if (_consumeContext.CorrelationId.HasValue && _consumeContext.CorrelationId != Guid.Empty)
        {
            return _consumeContext.CorrelationId.Value;
        }

        return Guid.NewGuid();
    }
}

然后我的消费者中有一个记录器,它使用该提供程序来提取 CorrelationId:

I then have a logger in my consumer that uses that provider to extract the CorrelationId:

public async Task Consume(ConsumeContext<IMyEvent> context)
{
    var correlationId = _correlationProvider.GetCorrelationId();
    _logger.Info(correlationId, $"#### IMyEvent received for customer:{context.Message.CustomerId}");

    try
    {
        await _mediator.Send(new SomeOtherRequest(correlationId) { SomeObject: context.Message.SomeObject });
    }
    catch (Exception e)
    {
        _logger.Exception(e, correlationId, $"Exception:{e}");
        throw;
    }

    _logger.Info(correlationId, $"Finished processing: {DateTime.Now}");
}

阅读 docs,关于ConversationId"的内容如下:

Reading the docs, it says the following about a "ConversationId":

对话是由发送的第一条消息创建的,或者已发布,其中没有可用的现有上下文(例如,当消息通过使用 IBus.Send 或 IBus.Publish 发送或发布).如果现有上下文用于发送或发布消息,ConversationId 被复制到新消息中,确保一组同一对话中的消息具有相同的标识符.

The conversation is created by the first message that is sent or published, in which no existing context is available (such as when a message is sent or published by using IBus.Send or IBus.Publish). If an existing context is used to send or publish a message, the ConversationId is copied to the new message, ensuring that a set of messages within the same conversation have the same identifier.

现在我开始认为我的术语混淆了,从技术上讲,这是一次对话(尽管对话"就像电话游戏").

Now I'm starting to think that I've got my terminology mixed up, and technically this is a conversation (although the 'conversation' is like 'the telephone game').

那么,在这个用例中是 CorrelationId,还是 ConversationId?请帮助我正确使用我的术语!

So, CorrelationId in this use case, or ConversationId? Please help me get my terminology right!!

推荐答案

在消息对话中(提示不祥的乐谱),可以有一条消息(我告诉你做某事,或者我告诉所有正在听的人发生了某事)或多条消息(我告诉你做某事,而你告诉其他人,或者我告诉所有在听的人发生了某事,而这些听众告诉他们的朋友,等等).

In a message conversation (cue foreboding musical score), there can be a single message (I told you to do something, or I told everyone who is listening that something happened) or multiple messages (I told you to do something, and you told someone else, or I told everyone who is listening that something happened and those listeners told their friends, and so on, and so on).

使用 MassTransit,从第一条消息到最后一条消息,如果使用得当,这些消息中的每一条都将具有相同的 ConversationId.MassTransit 将属性从 ConsumeContext 复制到消息消费期间的每条传出消息,未经修改.这使得所有内容都成为同一 trace 的一部分 - 一次对话.

Using MassTransit, from the first message to the final message, used properly, every single one of those messages would have the same ConversationId. MassTransit copies the property from ConsumeContext, unmodified, to every outgoing message during the consumption of a message. This makes everything part of the same trace - a conversation.

但是,MassTransit 默认不设置 CorrelationId.如果消息属性名为 CorrelationId(或 CommandId,或 EventId),则可以自动设置它,或者您也可以添加自己的名称.

The CorrelationId, however, is not set by default by MassTransit. It can be automatically set if a message property is named CorrelationId (or CommandId, or EventId), or you can add your own names too.

如果 CorrelationId 出现在消费消息中,则任何传出消息都会将该 CorrelationId 属性复制到 InitiatorId 属性(因果关系 - 消费消息启动了后续消息的创建).这形成了一个链(或跟踪术语中的跨度),可以跟踪该链以显示从初始消息开始的消息传播.

If the CorrelationId is present on a consumed message, any outgoing messages will have that CorrelationId property copied to the InitiatorId property (cause, and effect -- the consumed message initiated the creation of the subsequent messages). This forms a chain (or span, in trace terminology) that can be followed to show the spread of messages from the initial message.

应该将 CorrelationId 视为命令或事件的标识符,以便可以在整个系统日志中看到该命令的效果.

The CorrelationId should be thought of as the identifier for a command or event, such that the effects of that command can be seen throughout the system logs.

在我看来,您来自 HTTP 的输入可能是 Initiator,因此将该标识符复制到 InitiatorId 并为消息创建一个新的 CorrelationId,或者您可能只想对初始 CorrelationId 使用相同的标识符并让后续消息使用它作为发起者.

It sounds to me like your input from HTTP might be the Initiator, and thus copy that identifier into InitiatorId and create a new CorrelationId for the message, or you may want to just use the same identifier for the initial CorrelationId and let the subsequent messages use it as the initiator.

这篇关于RabbitMq - ConversationId 与 CorrelationId - 哪个更适合跟踪特定请求?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持跟版网!

上一篇:阻止任务在某个线程上运行 下一篇:RabbitMQ 连接错误没有一个指定的端点是可达的"

相关文章