Skip to content

Messaging & Distributed Events

This page describes how the Cargonerds solution (ABP 10.x / .NET 10) does asynchronous, message-based communication:

  • ABP's distributed event bus over RabbitMQ for cross-module integration events and ABP's own multi-tenant lifecycle events.
  • ABP's local (in-process) event bus for entity-change reactions inside a single process.
  • A direct Azure Service Bus consumer (raw Azure.Messaging.ServiceBus, not the ABP event bus) that ingests change notifications from the external "Spark" cloud system.

Scope

This page is about messaging. Background processing (Hangfire jobs and timer-based workers) is closely related but documented separately — see Background Jobs & Workers. Caching (Redis / EF second-level) is on Caching.


Concept

ABP ships two event buses:

Bus Interface(s) Transport Scope
Local event bus ILocalEventBus, ILocalEventHandler<T> in-process (method calls) one process; lost on restart
Distributed event bus IDistributedEventBus, IDistributedEventHandler<T> pluggable provider across processes / services

By default the distributed bus uses an in-memory provider that simply forwards to the local bus. Installing the Volo.Abp.EventBus.RabbitMQ integration replaces that provider so published events are serialized and routed through a RabbitMQ exchange, and handlers in any connected process receive them. See the ABP docs for the distributed event bus and the local event bus.

In this codebase:

  • The API host wires the RabbitMQ provider, so IDistributedEventBus is RabbitMQ-backed app-wide.
  • Messages are plain C# classes — by ABP convention an ETO ("Event Transfer Object"). They carry only data, no behaviour.
  • Handlers implement IDistributedEventHandler<TEto> (or ILocalEventHandler<T>) and are auto-registered via ABP's dependency injection when they also implement ITransientDependency.
flowchart LR
    subgraph host["API Host process (CargonerdsHttpApiHostModule)"]
        pub["CommentPublicAppService<br/>IDistributedEventBus.PublishAsync"]
        localpub["EF Core change tracking<br/>(EntityCreated / EntityChanged)"]
        localh1["UserRegisteredEventHandler<br/>ILocalEventHandler"]
        localh2["ApiKeyLocalEventHandler<br/>ILocalEventHandler"]
        dh1["CommentStatusChangedEventHandler<br/>(Pricing) IDistributedEventHandler"]
        tenant["CargonerdsTenantDatabaseMigrationHandler<br/>IDistributedEventHandler"]
    end

    pub -->|"CommentExtraPropertiesUpdatedEto"| rmq[("RabbitMQ exchange<br/>'Cargonerds'")]
    rmq -->|"CommentExtraPropertiesUpdatedEto"| dh1
    rmq -.->|"TenantCreatedEto / ApplyDatabaseMigrationsEto<br/>(ABP framework ETOs)"| tenant

    localpub -->|in-process| localh1
    localpub -->|in-process| localh2

    sparkcloud[["Spark cloud<br/>(external system)"]] -->|"abp-notifications topic"| sb[["Azure Service Bus"]]
    sb -->|"ServiceBusAbpNotificationEto"| consumer["ServiceBusNotificationConsumer<br/>(raw BackgroundService)"]
    consumer --> notif["NotificationSenderService"]

Three different message channels — don't conflate them

  1. RabbitMQ (ABP IDistributedEventBus) — internal app integration events.
  2. Local bus (ILocalEventBus) — in-process only, never touches RabbitMQ.
  3. Azure Service Bus (ServiceBusNotificationConsumer) — a hand-rolled consumer using the Azure SDK directly. Despite the topic being named abp-notifications and the DTO ServiceBusAbpNotificationEto, it is not ABP's event bus and does not use RabbitMQ.

RabbitMQ distributed event bus

Where it is wired

The RabbitMQ provider is enabled by depending on AbpEventBusRabbitMqModule in the API host module:

src/Cargonerds.HttpApi.Host/CargonerdsHttpApiHostModule.cs

[DependsOn(
    // ...
    typeof(AbpEventBusRabbitMqModule),
    // ...
)]
public class CargonerdsHttpApiHostModule : AbpModule
{
    public override void ConfigureServices(ServiceConfigurationContext context)
    {
        var configuration = context.Services.GetConfiguration();
        // ...
        ConfigureRabbitMQ(configuration);
        // ...
    }
}

This single [DependsOn] is what makes every IDistributedEventBus in the host RabbitMQ-backed.

Configuration keys

ABP's RabbitMQ module reads the standard RabbitMQ configuration section by convention. The base config:

src/Cargonerds.HttpApi.Host/appsettings.json

"RabbitMQ": {
  "Connections": {
    "Default": {
      "HostName": "localhost"
    }
  },
  "EventBus": {
    "ClientName": "HttpApiHost",
    "ExchangeName": "Cargonerds"
  }
}
Key Meaning
RabbitMQ:Connections:Default:HostName RabbitMQ host (or, when overridden, a full URI — see below).
RabbitMQ:Connections:Default:Override Custom flag (not standard ABP) that switches on URI parsing. Absent ⇒ literal host.
RabbitMQ:EventBus:ClientName Per-application client/queue identity. Here HttpApiHost.
RabbitMQ:EventBus:ExchangeName The RabbitMQ exchange events are published to / consumed from. Here Cargonerds.

The EventBus section (ClientName / ExchangeName) is defined only in the base appsettings.json, so the exchange name Cargonerds and client name HttpApiHost are the same in every environment.

The conditional connection override

ConfigureRabbitMQ only rewrites AbpRabbitMqOptions when Override=true and the host string parses as a URI:

src/Cargonerds.HttpApi.Host/CargonerdsHttpApiHostModule.cs

private void ConfigureRabbitMQ(IConfiguration configuration)
{
    var rabbitMqOptions = configuration.GetValue<string>("RabbitMQ:Connections:Default:HostName");
    var isOverridden = configuration.GetValue<bool>("RabbitMQ:Connections:Default:Override");

    if (isOverridden && Uri.TryCreate(rabbitMqOptions, UriKind.RelativeOrAbsolute, out Uri? uri))
    {
        var factory = new ConnectionFactory { Uri = uri };
        Configure<AbpRabbitMqOptions>(options =>
        {
            options.Connections.Default.UserName = factory.UserName;
            options.Connections.Default.Password = factory.Password;
            options.Connections.Default.HostName = factory.HostName;
            options.Connections.Default.Port = factory.Port;
        });
    }
}

The override exists for .NET Aspire, which injects a full AMQP connection URI rather than a bare hostname:

src/Cargonerds.HttpApi.Host/appsettings.aspire.json

"RabbitMQ": {
  "Connections": {
    "Default": {
      "HostName": "{messaging}",
      "Override": true
    }
  }
}

At runtime Aspire replaces {messaging} with the AMQP URI of the RabbitMQ resource (amqp://user:pass@host:port). ConfigureRabbitMQ then uses RabbitMQ.Client.ConnectionFactory to split that URI into the individual AbpRabbitMqOptions fields (user/password/host/port).

Two distinct code paths

  • Plain run (appsettings.json, no Override): ABP uses the literal HostName (localhost). The URI-parsing branch is skipped.
  • Aspire run (appsettings.aspire.json, Override: true): the injected URI is parsed into AbpRabbitMqOptions.

The URI-parsing path is therefore Aspire-specific. If you point HostName at a non-Aspire broker, leave Override unset.

Aspire resource

The RabbitMQ broker itself is declared as an Aspire resource. See Aspire Integration for the full topology.

src/Cargonerds.AppHost/Program.cs

var rabbitmq = builder
    .AddRabbitMQ(CargonerdsConsts.Aspire.Service.Messaging.Name, password: unsecurePassword)
    .WithManagementPlugin()
    .WithExternalHttpEndpoints()
    .WithLifetime(ContainerLifetime.Persistent)
    .IfRunMode(c => c.WithContainerName("spark-rabbitmq"))
    .PublishAsContainer();

Projects then receive the connection via .WithReference(rabbitmq), which is what populates the {messaging} placeholder above. ABP's RabbitMQ integration broadly follows the pattern described in the ABP .NET Aspire integration docs.


Event types (ETOs) and handlers

Custom integration event: CommentExtraPropertiesUpdatedEto

The one application-defined event that flows over RabbitMQ in this solution links the Comments feature (Cargonerds host module, built on CmsKit) to the Pricing module.

The ETO is a plain class with no [EventName] attribute (ABP derives the event name from the type):

src/Cargonerds.Domain.Shared/Comments/CommentExtraPropertiesUpdatedEto.cs

public class CommentExtraPropertiesUpdatedEto
{
    public CommentDto Comment { get; set; }
    public Guid CommentId { get; set; }
    public string? ReferenceNumber { get; set; }
    public CommentStatus? Status { get; set; }
}

PublisherCommentPublicAppService (which overrides CmsKit's CommentPublicAppService via [Dependency(ReplaceServices = true)] + [ExposeServices(...)]) publishes it whenever a comment is created or updated with a reference number or status:

src/Cargonerds.Application/Services/CommentPublicAppService.cs

private async Task PublishEvent(Guid id, CommentDto dto, string? referenceNumber, CommentStatus? status)
{
    await DistributedEventBus.PublishAsync(
        new CommentExtraPropertiesUpdatedEto
        {
            CommentId = id,
            Comment = dto,
            ReferenceNumber = referenceNumber,
            Status = status,
        }
    );
}

ConsumerCommentStatusChangedEventHandler in the Pricing module recomputes a Quotation.Status from the aggregated comment statuses of a quotation request:

modules/pricing/src/Pricing.Domain/EventHandler/CommentStatusChangedEventHandler.cs

public class CommentStatusChangedEventHandler(
    ICommentRepository commentRepository,
    IHubRepository<Hub.Entities.Pricing.Quotation, Guid> quotationRepository
) : IDistributedEventHandler<CommentExtraPropertiesUpdatedEto>, ITransientDependency
{
    public async Task HandleEventAsync(CommentExtraPropertiesUpdatedEto eventData)
    {
        if (eventData.Comment.EntityType != PricingConsts.QuotationRequest)
            return;
        // ...recompute and persist Quotation.Status...
    }
}

This is the canonical cross-module decoupling pattern: the Cargonerds host module does not reference Pricing; it only publishes an event. See also Modules: Pricing.

Framework events: tenant lifecycle (multi-tenancy)

ABP's own multi-tenancy module publishes distributed events when tenants change. This solution consumes them to migrate/seed per-tenant databases:

src/Cargonerds.Domain/Data/CargonerdsTenantDatabaseMigrationHandler.cs

public class CargonerdsTenantDatabaseMigrationHandler
    : IDistributedEventHandler<TenantCreatedEto>,
        IDistributedEventHandler<TenantConnectionStringUpdatedEto>,
        IDistributedEventHandler<ApplyDatabaseMigrationsEto>,
        ITransientDependency
{
    public async Task HandleEventAsync(TenantCreatedEto eventData) { /* migrate + seed */ }
    public async Task HandleEventAsync(TenantConnectionStringUpdatedEto eventData) { /* ... */ }
    public async Task HandleEventAsync(ApplyDatabaseMigrationsEto eventData) { /* ... */ }
}

These ETOs (TenantCreatedEto, TenantConnectionStringUpdatedEto, ApplyDatabaseMigrationsEto) come from ABP (Volo.Abp.MultiTenancy / data namespaces), not from this codebase. With RabbitMQ enabled, a tenant created in one process can trigger migration in another. For background on multi-tenancy see the ABP multi-tenancy docs.

Local (in-process) event handlers

These use ILocalEventHandler<T> and never touch RabbitMQ. They react to ABP's automatically-published entity-change events.

Handler Listens to Action File
UserRegisteredEventHandler EntityCreatedEventData<IdentityUser> Adds each new identity user to the default organization unit. src/Cargonerds.Domain/EventHandler/UserRegisteredEventHandler.cs
ApiKeyLocalEventHandler EntityChangedEventData<ApiKey> Evicts the API-key cache entries (by prefix and id) when an API key changes. src/Cargonerds.Domain/ApiKeys/DefaultApiKeyStore.cs
public class UserRegisteredEventHandler(IOrganizationUnitRepository ouRepository, IdentityUserManager userManager)
    : ILocalEventHandler<EntityCreatedEventData<IdentityUser>>,
        ITransientDependency
{
    public async Task HandleEventAsync(EntityCreatedEventData<IdentityUser> eventData)
    {
        var defaultUnit = (await ouRepository.GetDbSetAsync())
            .FirstOrDefault(ou => ou.DisplayName == OrganizationUnitConstants.DefaultName);
        // ...add user to defaultUnit if it has no units yet...
    }
}

EntityCreatedEventData<T> / EntityChangedEventData<T> are raised by ABP's change tracking — see DDD: domain events / local event bus.

Summary of message contracts

Event / DTO Bus Published by Handled by
CommentExtraPropertiesUpdatedEto RabbitMQ (distributed) CommentPublicAppService (this app) CommentStatusChangedEventHandler (Pricing)
TenantCreatedEto, TenantConnectionStringUpdatedEto, ApplyDatabaseMigrationsEto RabbitMQ (distributed) ABP multi-tenancy (framework) CargonerdsTenantDatabaseMigrationHandler
EntityCreatedEventData<IdentityUser> Local (in-process) ABP change tracking UserRegisteredEventHandler
EntityChangedEventData<ApiKey> Local (in-process) ABP change tracking ApiKeyLocalEventHandler
ServiceBusAbpNotificationEto Azure Service Bus (not ABP) Spark cloud (external) ServiceBusNotificationConsumer

Azure Service Bus consumer (direct SDK, not ABP)

In the cloud, change notifications from the external Spark system arrive over Azure Service Bus, consumed by a plain BackgroundService that uses Azure.Messaging.ServiceBus directly. It is registered as a hosted service in HubApplicationModule (see Background Jobs & Workers).

modules/hub/src/Hub.Application/Services/Notifications/ServiceBusNotificationConsumer.cs

Key behaviours, all confirmed in source:

  • Fixed topic abp-notifications (the TopicName const).
  • Connection string hubServiceBus (ConnectionStringNames.HubServiceBus). If it is empty, the consumer logs a warning and returns without starting:
var connectionString = configuration.GetConnectionString(ConnectionStringNames.HubServiceBus);
if (string.IsNullOrWhiteSpace(connectionString))
{
    logger.LogWarning(
        "Service Bus connection string '{ConnectionStringName}' is not configured. "
            + "The Spark.Notifications consumer will not start.",
        ConnectionStringNames.HubServiceBus);
    return;
}
  • Subscription name is a random Guid.NewGuid().ToString("N"), cached in the Redis-backed IDistributedCache<string> under ServiceBusNotificationConsumer:SubscriptionName so a restart reuses the same subscription.
  • Requires an environment name from AZURE_ENVIRONMENT, ASPIRE_ENVIRONMENT, or SPARK_ENVIRONMENT; without one it also logs a warning and does not start.
  • Idempotently provisions the subscription and a single $Default SQL rule via ServiceBusAdministrationClient. The rule filters by CargonerdsCustomerId and a hard-coded set of entity/property changes, and the subscription AutoDeleteOnIdle is 7 days:
private static CreateRuleOptions BuildRuleOptions(Guid customerId) =>
    new(
        "$Default",
        new SqlRuleFilter(
            $@"
        CargonerdsCustomerId = '{customerId:D}'
        AND
        (
            (
                EntityTypeName in ('{nameof(Shipment)}', '{nameof(CustomsDeclaration)}')
                AND PropertyName IN (
                    '{nameof(Shipment.EstimatedPickup)}',
                    '{nameof(Shipment.EstimatedDeparture)}',
                    '{nameof(Shipment.EstimatedArrival)}',
                    '{nameof(Shipment.EstimatedDelivery)}'
                )
            )
            OR ( EntityTypeName = '{nameof(Document)}' AND PropertyName = '{nameof(Document.FileDate)}' )
            OR ( EntityTypeName = '{nameof(Event)}'    AND PropertyName = '{nameof(Event.ActualDate)}' )
        )"));
  • Processing: AutoCompleteMessages = false. Each message is deserialized into ServiceBusAbpNotificationEto, processed under a synthetic system principal (UserId = Guid.Empty, role Admin) via ICurrentPrincipalAccessor.Change(...) inside a BackgroundExecutionScope, then forwarded to NotificationSenderService.SendExternalNotificationAsync. On success it calls CompleteMessageAsync; on deserialization or processing failure it dead-letters the message.

The message contract:

modules/hub/src/Hub.Application.Contracts/Dtos/Notifications/ServiceBusAbpNotificationEto.cs

public class ServiceBusAbpNotificationEto
{
    public string PropertyName { get; set; } = string.Empty;
    public string EntityTypeName { get; set; } = string.Empty;
    public string ActionType { get; set; } = string.Empty;
    public object? OldValue { get; set; }
    public object? NewValue { get; set; }
    public Guid RelatedEntityId { get; set; }
    public Guid CargonerdsCustomerId { get; set; }
    public Guid ShipmentId { get; set; }
    public string ShipmentReferenceNumber { get; set; } = string.Empty;
}

Local vs cloud

Environment hubServiceBus Effect
Local (appsettings.spark.local.json) "" (empty) Consumer does not start — there is no Service Bus consumption locally.
Dev / Test / Prod (appsettings.spark.dev.json, .test.json, .prod.json) real Endpoint=sb://…servicebus.windows.net/… Consumer subscribes to the abp-notifications topic.

Secrets committed to the repository

The appsettings.spark.dev.json, appsettings.spark.test.json, and appsettings.spark.prod.json files contain live Service Bus connection strings with SharedAccessKey values checked into the repo. These keys are effectively public and should be rotated and moved to a secret store. (See also the broader configuration notes in Configuration reference.)


Configuration reference

Key / connection string File(s) Used by
RabbitMQ:Connections:Default:HostName appsettings.json (localhost), appsettings.aspire.json ({messaging}) ABP RabbitMQ event bus
RabbitMQ:Connections:Default:Override appsettings.aspire.json (true) ConfigureRabbitMQ URI parsing
RabbitMQ:EventBus:ClientName / ExchangeName appsettings.json (HttpApiHost / Cargonerds) ABP RabbitMQ event bus
ConnectionStrings:hubServiceBus appsettings.spark.*.json ServiceBusNotificationConsumer

Connection-string names are centralized:

src/Cargonerds.Domain.Shared/Configuration/ConnectionStringNames.cs

public static class ConnectionStringNames
{
    public const string HubServiceBus = "hubServiceBus";
    public const string HubDb = "Hub";
    public const string SparkDb = "Default";
    public const string BlobStorage = nameof(BlobStorage);
}

Gotchas

RabbitMQ override only fires under Aspire

ConfigureRabbitMQ rewrites AbpRabbitMqOptions only when RabbitMQ:Connections:Default:Override=true and the host string is a valid URI. With the plain appsettings.json (no Override), ABP uses the literal HostName. Setting HostName to a URI without Override will silently fail to parse — set both.

Service Bus consumer is not the ABP event bus and not RabbitMQ

Despite the abp-notifications topic name and the ServiceBusAbpNotificationEto type name, ServiceBusNotificationConsumer is a hand-rolled Azure.Messaging.ServiceBus BackgroundService. It silently no-ops (logging a warning) when hubServiceBus is empty (the local default) or when no environment name is resolvable — so locally there is no Service Bus consumption at all.

Service Bus subscription/rule are provisioned at runtime

The consumer creates and reconciles its subscription and a $Default SQL filter rule on startup. The rule references CargonerdsCustomerId and hard-coded entity/property names (Shipment.EstimatedPickup/Departure/Arrival/Delivery, Document.FileDate, Event.ActualDate, etc.). Renaming those entities/properties changes the rule the consumer (re)creates. Subscriptions AutoDeleteOnIdle after 7 days of inactivity.

ETOs use type-derived event names

CommentExtraPropertiesUpdatedEto has no [EventName] attribute, so the RabbitMQ routing/event name is derived from the type. Renaming or moving the class is therefore a wire-breaking change if publishers and consumers are deployed independently — keep producer and consumer in lockstep.

No transactional outbox/inbox configured

The solution does not configure ABP's distributed-event outbox/inbox (no AddDistributedEventOutbox / inbox registration was found). Publishing is direct: IDistributedEventBus.PublishAsync goes straight to RabbitMQ. If the broker is unavailable at publish time, the event is not persisted for later delivery, and there is no built-in dedup on the consumer side for the RabbitMQ path. (The "Inbox" UI under Hub.UI is the end-user notification inbox — unrelated to the ABP event inbox.)

Local handlers run only where the entity changes

UserRegisteredEventHandler and ApiKeyLocalEventHandler use the local bus, so they execute in the same process that performed the entity change. They do not propagate across services. If user/org or API-key logic ever needs to fan out to other processes, it would have to move to the distributed bus.


  • Background Jobs & Workers — Hangfire jobs, periodic workers, and where ServiceBusNotificationConsumer is registered as a hosted service.
  • Caching — Redis IDistributedCache (used for the Service Bus subscription-name cache) and the Hub EF second-level cache.
  • Aspire Integration — RabbitMQ and Redis resource wiring and connection-string injection.
  • ABP Patterns — module composition, [DependsOn], event bus, ITransientDependency.
  • Architecture Overview — host/module topology.
  • Modules: Pricing — the cross-module CommentExtraPropertiesUpdatedEto consumer and quotation status logic.
  • Configuration referenceRabbitMQ:*, ConnectionStrings (hubServiceBus, Hub, Default).