Messaging & Distributed Events
This page describes how the Cargonerds solution (ABP 10.x / .NET 10) handles asynchronous, message-based communication:
- ABP's distributed event bus over RabbitMQ for cross-module integration events and ABP's own multi-tenant lifecycle events.
- ABP's local (in-process) event bus for entity-change reactions inside a single process.
- A direct Azure Service Bus consumer (raw Azure.Messaging.ServiceBus, not the ABP event bus) that ingests change notifications from the external "Spark" cloud system.
Scope
This page is about messaging. Background processing (Hangfire jobs and timer-based workers) is closely related but documented separately — see Background Jobs & Workers. Caching (Redis / EF second-level) is on Caching.
Concept
ABP ships two event buses:
| Bus | Interface(s) | Transport | Scope |
|---|---|---|---|
| Local event bus | ILocalEventBus, ILocalEventHandler<T> | in-process (method calls) | one process; lost on restart |
| Distributed event bus | IDistributedEventBus, IDistributedEventHandler<T> | pluggable provider | across processes / services |
By default the distributed bus uses an in-memory provider that simply forwards to the local bus. Installing the Volo.Abp.EventBus.RabbitMQ integration replaces that provider so published events are serialized and routed through a RabbitMQ exchange, and handlers in any connected process receive them. See the ABP docs for the distributed event bus and the local event bus.
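To make the "in-process (method calls)" row concrete, here is a minimal sketch of what a local bus boils down to. It is a hypothetical stand-in written against the base class library only, not ABP's implementation: InProcessBusSketch, Subscribe, and PublishAsync are illustrative names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for a local event bus; NOT ABP's implementation.
public sealed class InProcessBusSketch
{
    // Handlers are kept in memory, keyed by event type.
    private readonly Dictionary<Type, List<Func<object, Task>>> _handlers = new();

    public void Subscribe<TEvent>(Func<TEvent, Task> handler) where TEvent : class
    {
        if (!_handlers.TryGetValue(typeof(TEvent), out var list))
        {
            list = new List<Func<object, Task>>();
            _handlers[typeof(TEvent)] = list;
        }
        list.Add(e => handler((TEvent)e));
    }

    public async Task PublishAsync<TEvent>(TEvent eventData) where TEvent : class
    {
        if (!_handlers.TryGetValue(typeof(TEvent), out var list))
        {
            return; // nobody subscribed: the event is simply dropped
        }
        foreach (var handler in list)
        {
            // A direct method call in the same process: nothing is serialized
            // and nothing survives a restart.
            await handler(eventData);
        }
    }
}
```

A distributed provider replaces the in-memory dictionary with a broker, so the event has to be serialized and can reach handlers in other processes.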
In this codebase:
- The API host wires the RabbitMQ provider, so IDistributedEventBus is RabbitMQ-backed app-wide.
- Messages are plain C# classes, by ABP convention ETOs ("Event Transfer Objects"). They carry only data, no behaviour.
- Handlers implement IDistributedEventHandler<TEto> (or ILocalEventHandler<T>) and are auto-registered via ABP's dependency injection when they also implement ITransientDependency.
flowchart LR
subgraph host["API Host process (CargonerdsHttpApiHostModule)"]
pub["CommentPublicAppService<br/>IDistributedEventBus.PublishAsync"]
localpub["EF Core change tracking<br/>(EntityCreated / EntityChanged)"]
localh1["UserRegisteredEventHandler<br/>ILocalEventHandler"]
localh2["ApiKeyLocalEventHandler<br/>ILocalEventHandler"]
dh1["CommentStatusChangedEventHandler<br/>(Pricing) IDistributedEventHandler"]
tenant["CargonerdsTenantDatabaseMigrationHandler<br/>IDistributedEventHandler"]
end
pub -->|"CommentExtraPropertiesUpdatedEto"| rmq[("RabbitMQ exchange<br/>'Cargonerds'")]
rmq -->|"CommentExtraPropertiesUpdatedEto"| dh1
rmq -.->|"TenantCreatedEto / ApplyDatabaseMigrationsEto<br/>(ABP framework ETOs)"| tenant
localpub -->|in-process| localh1
localpub -->|in-process| localh2
sparkcloud[["Spark cloud<br/>(external system)"]] -->|"abp-notifications topic"| sb[["Azure Service Bus"]]
sb -->|"ServiceBusAbpNotificationEto"| consumer["ServiceBusNotificationConsumer<br/>(raw BackgroundService)"]
consumer --> notif["NotificationSenderService"]
Three different message channels — don't conflate them
- RabbitMQ (ABP IDistributedEventBus): internal app integration events.
- Local bus (ILocalEventBus): in-process only, never touches RabbitMQ.
- Azure Service Bus (ServiceBusNotificationConsumer): a hand-rolled consumer using the Azure SDK directly. Despite the topic being named abp-notifications and the DTO ServiceBusAbpNotificationEto, it is not ABP's event bus and does not use RabbitMQ.
RabbitMQ distributed event bus
Where it is wired
The RabbitMQ provider is enabled by depending on AbpEventBusRabbitMqModule in the API host module:
src/Cargonerds.HttpApi.Host/CargonerdsHttpApiHostModule.cs
[DependsOn(
// ...
typeof(AbpEventBusRabbitMqModule),
// ...
)]
public class CargonerdsHttpApiHostModule : AbpModule
{
public override void ConfigureServices(ServiceConfigurationContext context)
{
var configuration = context.Services.GetConfiguration();
// ...
ConfigureRabbitMQ(configuration);
// ...
}
}
This single [DependsOn] is what makes every IDistributedEventBus in the host RabbitMQ-backed.
Configuration keys
ABP's RabbitMQ module reads the standard RabbitMQ configuration section by convention. The base config:
src/Cargonerds.HttpApi.Host/appsettings.json
"RabbitMQ": {
"Connections": {
"Default": {
"HostName": "localhost"
}
},
"EventBus": {
"ClientName": "HttpApiHost",
"ExchangeName": "Cargonerds"
}
}
| Key | Meaning |
|---|---|
| RabbitMQ:Connections:Default:HostName | RabbitMQ host (or, when overridden, a full URI; see below). |
| RabbitMQ:Connections:Default:Override | Custom flag (not standard ABP) that switches on URI parsing. Absent ⇒ literal host. |
| RabbitMQ:EventBus:ClientName | Per-application client/queue identity. Here HttpApiHost. |
| RabbitMQ:EventBus:ExchangeName | The RabbitMQ exchange events are published to / consumed from. Here Cargonerds. |
The EventBus section (ClientName / ExchangeName) is defined only in the base appsettings.json, so the exchange name Cargonerds and client name HttpApiHost are the same in every environment.
The conditional connection override
ConfigureRabbitMQ only rewrites AbpRabbitMqOptions when Override=true and the host string parses as a URI:
src/Cargonerds.HttpApi.Host/CargonerdsHttpApiHostModule.cs
private void ConfigureRabbitMQ(IConfiguration configuration)
{
var rabbitMqOptions = configuration.GetValue<string>("RabbitMQ:Connections:Default:HostName");
var isOverridden = configuration.GetValue<bool>("RabbitMQ:Connections:Default:Override");
if (isOverridden && Uri.TryCreate(rabbitMqOptions, UriKind.RelativeOrAbsolute, out Uri? uri))
{
var factory = new ConnectionFactory { Uri = uri };
Configure<AbpRabbitMqOptions>(options =>
{
options.Connections.Default.UserName = factory.UserName;
options.Connections.Default.Password = factory.Password;
options.Connections.Default.HostName = factory.HostName;
options.Connections.Default.Port = factory.Port;
});
}
}
The override exists for .NET Aspire, which injects a full AMQP connection URI rather than a bare hostname:
src/Cargonerds.HttpApi.Host/appsettings.aspire.json
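The file's contents are not reproduced here. Going only by the keys and values listed in the configuration reference on this page (HostName set to {messaging}, Override set to true), the relevant fragment would look roughly like this. Treat the exact shape as an assumption, not as a copy of the file:

```json
"RabbitMQ": {
  "Connections": {
    "Default": {
      "HostName": "{messaging}",
      "Override": true
    }
  }
}
```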
At runtime Aspire replaces {messaging} with the AMQP URI of the RabbitMQ resource (amqp://user:pass@host:port). ConfigureRabbitMQ then uses RabbitMQ.Client.ConnectionFactory to split that URI into the individual AbpRabbitMqOptions fields (user/password/host/port).
Two distinct code paths
- Plain run (appsettings.json, no Override): ABP uses the literal HostName (localhost). The URI-parsing branch is skipped.
- Aspire run (appsettings.aspire.json, Override: true): the injected URI is parsed into AbpRabbitMqOptions.
The URI-parsing path is therefore Aspire-specific. If you point HostName at a non-Aspire broker, leave Override unset.
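The split that the Aspire path performs can be sketched without the RabbitMQ client package. The example below uses System.Uri as a stand-in for RabbitMQ.Client.ConnectionFactory, so it only approximates what the factory does. AmqpUriSketch and Split are hypothetical names, and the sample URI is made up.

```csharp
using System;

// Hypothetical helper; a System.Uri-based approximation of the URI split,
// not the RabbitMQ.Client implementation.
public static class AmqpUriSketch
{
    public static (string User, string Password, string Host, int Port) Split(string value)
    {
        // UriKind.Absolute rejects a bare host name such as "localhost".
        if (!Uri.TryCreate(value, UriKind.Absolute, out var uri))
        {
            throw new FormatException($"Not an absolute URI: {value}");
        }

        // UserInfo has the form "user:password"; split on the first colon only.
        var userInfo = uri.UserInfo.Split(':', 2);
        var user = Uri.UnescapeDataString(userInfo[0]);
        var password = userInfo.Length > 1 ? Uri.UnescapeDataString(userInfo[1]) : string.Empty;

        // System.Uri reports -1 when the URI carries no port; 5672 is the standard AMQP port.
        var port = uri.Port < 0 ? 5672 : uri.Port;

        return (user, password, uri.Host, port);
    }
}
```

For example, Split("amqp://guest:secret@broker:5673") yields user guest, password secret, host broker, and port 5673. The sketch deliberately uses UriKind.Absolute: with UriKind.RelativeOrAbsolute, as in ConfigureRabbitMQ, a bare host name such as localhost also parses successfully (as a relative URI), so the Override flag is what actually keeps the plain run out of this branch.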
Aspire resource
The RabbitMQ broker itself is declared as an Aspire resource. See Aspire Integration for the full topology.
src/Cargonerds.AppHost/Program.cs
var rabbitmq = builder
.AddRabbitMQ(CargonerdsConsts.Aspire.Service.Messaging.Name, password: unsecurePassword)
.WithManagementPlugin()
.WithExternalHttpEndpoints()
.WithLifetime(ContainerLifetime.Persistent)
.IfRunMode(c => c.WithContainerName("spark-rabbitmq"))
.PublishAsContainer();
Projects then receive the connection via .WithReference(rabbitmq), which is what populates the {messaging} placeholder above. ABP's RabbitMQ integration broadly follows the pattern described in the ABP .NET Aspire integration docs.
Event types (ETOs) and handlers
Custom integration event: CommentExtraPropertiesUpdatedEto
The one application-defined event that flows over RabbitMQ in this solution links the Comments feature (Cargonerds host module, built on CmsKit) to the Pricing module.
The ETO is a plain class with no [EventName] attribute (ABP derives the event name from the type):
src/Cargonerds.Domain.Shared/Comments/CommentExtraPropertiesUpdatedEto.cs
public class CommentExtraPropertiesUpdatedEto
{
public CommentDto Comment { get; set; }
public Guid CommentId { get; set; }
public string? ReferenceNumber { get; set; }
public CommentStatus? Status { get; set; }
}
Publisher — CommentPublicAppService (which overrides CmsKit's CommentPublicAppService via [Dependency(ReplaceServices = true)] + [ExposeServices(...)]) publishes it whenever a comment is created or updated with a reference number or status:
src/Cargonerds.Application/Services/CommentPublicAppService.cs
private async Task PublishEvent(Guid id, CommentDto dto, string? referenceNumber, CommentStatus? status)
{
await DistributedEventBus.PublishAsync(
new CommentExtraPropertiesUpdatedEto
{
CommentId = id,
Comment = dto,
ReferenceNumber = referenceNumber,
Status = status,
}
);
}
Consumer — CommentStatusChangedEventHandler in the Pricing module recomputes a Quotation.Status from the aggregated comment statuses of a quotation request:
modules/pricing/src/Pricing.Domain/EventHandler/CommentStatusChangedEventHandler.cs
public class CommentStatusChangedEventHandler(
ICommentRepository commentRepository,
IHubRepository<Hub.Entities.Pricing.Quotation, Guid> quotationRepository
) : IDistributedEventHandler<CommentExtraPropertiesUpdatedEto>, ITransientDependency
{
public async Task HandleEventAsync(CommentExtraPropertiesUpdatedEto eventData)
{
if (eventData.Comment.EntityType != PricingConsts.QuotationRequest)
return;
// ...recompute and persist Quotation.Status...
}
}
This is the canonical cross-module decoupling pattern: the Cargonerds host module does not reference Pricing; it only publishes an event. See also Modules: Pricing.
Framework events: tenant lifecycle (multi-tenancy)
ABP's own multi-tenancy module publishes distributed events when tenants change. This solution consumes them to migrate/seed per-tenant databases:
src/Cargonerds.Domain/Data/CargonerdsTenantDatabaseMigrationHandler.cs
public class CargonerdsTenantDatabaseMigrationHandler
: IDistributedEventHandler<TenantCreatedEto>,
IDistributedEventHandler<TenantConnectionStringUpdatedEto>,
IDistributedEventHandler<ApplyDatabaseMigrationsEto>,
ITransientDependency
{
public async Task HandleEventAsync(TenantCreatedEto eventData) { /* migrate + seed */ }
public async Task HandleEventAsync(TenantConnectionStringUpdatedEto eventData) { /* ... */ }
public async Task HandleEventAsync(ApplyDatabaseMigrationsEto eventData) { /* ... */ }
}
These ETOs (TenantCreatedEto, TenantConnectionStringUpdatedEto, ApplyDatabaseMigrationsEto) come from ABP (Volo.Abp.MultiTenancy / data namespaces), not from this codebase. With RabbitMQ enabled, a tenant created in one process can trigger migration in another. For background on multi-tenancy see the ABP multi-tenancy docs.
Local (in-process) event handlers
These use ILocalEventHandler<T> and never touch RabbitMQ. They react to ABP's automatically-published entity-change events.
| Handler | Listens to | Action | File |
|---|---|---|---|
| UserRegisteredEventHandler | EntityCreatedEventData<IdentityUser> | Adds each new identity user to the default organization unit. | src/Cargonerds.Domain/EventHandler/UserRegisteredEventHandler.cs |
| ApiKeyLocalEventHandler | EntityChangedEventData<ApiKey> | Evicts the API-key cache entries (by prefix and id) when an API key changes. | src/Cargonerds.Domain/ApiKeys/DefaultApiKeyStore.cs |
public class UserRegisteredEventHandler(IOrganizationUnitRepository ouRepository, IdentityUserManager userManager)
: ILocalEventHandler<EntityCreatedEventData<IdentityUser>>,
ITransientDependency
{
public async Task HandleEventAsync(EntityCreatedEventData<IdentityUser> eventData)
{
var defaultUnit = (await ouRepository.GetDbSetAsync())
.FirstOrDefault(ou => ou.DisplayName == OrganizationUnitConstants.DefaultName);
// ...add user to defaultUnit if it has no units yet...
}
}
EntityCreatedEventData<T> / EntityChangedEventData<T> are raised by ABP's change tracking — see DDD: domain events / local event bus.
Summary of message contracts
| Event / DTO | Bus | Published by | Handled by |
|---|---|---|---|
| CommentExtraPropertiesUpdatedEto | RabbitMQ (distributed) | CommentPublicAppService (this app) | CommentStatusChangedEventHandler (Pricing) |
| TenantCreatedEto, TenantConnectionStringUpdatedEto, ApplyDatabaseMigrationsEto | RabbitMQ (distributed) | ABP multi-tenancy (framework) | CargonerdsTenantDatabaseMigrationHandler |
| EntityCreatedEventData<IdentityUser> | Local (in-process) | ABP change tracking | UserRegisteredEventHandler |
| EntityChangedEventData<ApiKey> | Local (in-process) | ABP change tracking | ApiKeyLocalEventHandler |
| ServiceBusAbpNotificationEto | Azure Service Bus (not ABP) | Spark cloud (external) | ServiceBusNotificationConsumer |
Azure Service Bus consumer (direct SDK, not ABP)
In the cloud, change notifications from the external Spark system arrive over Azure Service Bus, consumed by a plain BackgroundService that uses Azure.Messaging.ServiceBus directly. It is registered as a hosted service in HubApplicationModule (see Background Jobs & Workers).
modules/hub/src/Hub.Application/Services/Notifications/ServiceBusNotificationConsumer.cs
Key behaviours, all confirmed in source:
- Fixed topic abp-notifications (the TopicName const).
- Connection string hubServiceBus (ConnectionStringNames.HubServiceBus). If it is empty, the consumer logs a warning and returns without starting:
var connectionString = configuration.GetConnectionString(ConnectionStringNames.HubServiceBus);
if (string.IsNullOrWhiteSpace(connectionString))
{
logger.LogWarning(
"Service Bus connection string '{ConnectionStringName}' is not configured. "
+ "The Spark.Notifications consumer will not start.",
ConnectionStringNames.HubServiceBus);
return;
}
- Subscription name is a random Guid.NewGuid().ToString("N"), cached in the Redis-backed IDistributedCache<string> under ServiceBusNotificationConsumer:SubscriptionName so a restart reuses the same subscription.
- Requires an environment name from AZURE_ENVIRONMENT, ASPIRE_ENVIRONMENT, or SPARK_ENVIRONMENT; without one it also logs a warning and does not start.
- Idempotently provisions the subscription and a single $Default SQL rule via ServiceBusAdministrationClient. The rule filters by CargonerdsCustomerId and a hard-coded set of entity/property changes, and the subscription AutoDeleteOnIdle is 7 days:
private static CreateRuleOptions BuildRuleOptions(Guid customerId) =>
new(
"$Default",
new SqlRuleFilter(
$@"
CargonerdsCustomerId = '{customerId:D}'
AND
(
(
EntityTypeName in ('{nameof(Shipment)}', '{nameof(CustomsDeclaration)}')
AND PropertyName IN (
'{nameof(Shipment.EstimatedPickup)}',
'{nameof(Shipment.EstimatedDeparture)}',
'{nameof(Shipment.EstimatedArrival)}',
'{nameof(Shipment.EstimatedDelivery)}'
)
)
OR ( EntityTypeName = '{nameof(Document)}' AND PropertyName = '{nameof(Document.FileDate)}' )
OR ( EntityTypeName = '{nameof(Event)}' AND PropertyName = '{nameof(Event.ActualDate)}' )
)"));
- Processing: AutoCompleteMessages = false. Each message is deserialized into ServiceBusAbpNotificationEto, processed under a synthetic system principal (UserId = Guid.Empty, role Admin) via ICurrentPrincipalAccessor.Change(...) inside a BackgroundExecutionScope, then forwarded to NotificationSenderService.SendExternalNotificationAsync. On success it calls CompleteMessageAsync; on deserialization or processing failure it dead-letters the message.
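The subscription-name behaviour in the list above is a get-or-create pattern. The sketch below illustrates it with a ConcurrentDictionary standing in for the Redis-backed IDistributedCache<string>, so unlike the real cache it does not survive a process restart. SubscriptionNameSketch and GetOrCreate are hypothetical names; only the cache key and the Guid format come from this page.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical illustration of the get-or-create pattern; the real consumer
// uses a Redis-backed distributed cache, not an in-memory dictionary.
public static class SubscriptionNameSketch
{
    private const string CacheKey = "ServiceBusNotificationConsumer:SubscriptionName";

    // In-memory stand-in for the distributed cache.
    private static readonly ConcurrentDictionary<string, string> Cache = new();

    public static string GetOrCreate() =>
        // The factory runs only when the key is missing, so later calls
        // return the name generated the first time.
        Cache.GetOrAdd(CacheKey, _ => Guid.NewGuid().ToString("N"));
}
```

The "N" format produces 32 hexadecimal digits without hyphens, which keeps the generated name short and free of separator characters.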
The message contract:
modules/hub/src/Hub.Application.Contracts/Dtos/Notifications/ServiceBusAbpNotificationEto.cs
public class ServiceBusAbpNotificationEto
{
public string PropertyName { get; set; } = string.Empty;
public string EntityTypeName { get; set; } = string.Empty;
public string ActionType { get; set; } = string.Empty;
public object? OldValue { get; set; }
public object? NewValue { get; set; }
public Guid RelatedEntityId { get; set; }
public Guid CargonerdsCustomerId { get; set; }
public Guid ShipmentId { get; set; }
public string ShipmentReferenceNumber { get; set; } = string.Empty;
}
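One practical consequence of the object-typed OldValue / NewValue properties is easiest to see in code. The sketch below assumes the payload is JSON and is read with System.Text.Json; this page does not say which serializer the consumer actually uses, and NotificationJsonSketch, Parse, and the sample payload are hypothetical. The ETO class is repeated so the example stands alone.

```csharp
#nullable enable
using System;
using System.Text.Json;

// Copy of the contract shown above, repeated so the sketch is self-contained.
public class ServiceBusAbpNotificationEto
{
    public string PropertyName { get; set; } = string.Empty;
    public string EntityTypeName { get; set; } = string.Empty;
    public string ActionType { get; set; } = string.Empty;
    public object? OldValue { get; set; }
    public object? NewValue { get; set; }
    public Guid RelatedEntityId { get; set; }
    public Guid CargonerdsCustomerId { get; set; }
    public Guid ShipmentId { get; set; }
    public string ShipmentReferenceNumber { get; set; } = string.Empty;
}

// Hypothetical helper; assumes a JSON body and System.Text.Json.
public static class NotificationJsonSketch
{
    private static readonly JsonSerializerOptions Options = new()
    {
        PropertyNameCaseInsensitive = true, // accept camelCase or PascalCase keys
    };

    public static ServiceBusAbpNotificationEto? Parse(string json) =>
        JsonSerializer.Deserialize<ServiceBusAbpNotificationEto>(json, Options);
}
```

With System.Text.Json, a property declared as object is populated with a JsonElement rather than a string or DateTime, so downstream code has to inspect ValueKind (or call GetString(), GetDateTime(), and so on) before using OldValue and NewValue.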
Local vs cloud
| Environment | hubServiceBus | Effect |
|---|---|---|
| Local (appsettings.spark.local.json) | "" (empty) | Consumer does not start; there is no Service Bus consumption locally. |
| Dev / Test / Prod (appsettings.spark.dev.json, .test.json, .prod.json) | real Endpoint=sb://…servicebus.windows.net/… | Consumer subscribes to the abp-notifications topic. |
Secrets committed to the repository
The appsettings.spark.dev.json, appsettings.spark.test.json, and appsettings.spark.prod.json files contain live Service Bus connection strings with SharedAccessKey values checked into the repo. These keys are effectively public and should be rotated and moved to a secret store. (See also the broader configuration notes in Configuration reference.)
Configuration reference
| Key / connection string | File(s) | Used by |
|---|---|---|
| RabbitMQ:Connections:Default:HostName | appsettings.json (localhost), appsettings.aspire.json ({messaging}) | ABP RabbitMQ event bus |
| RabbitMQ:Connections:Default:Override | appsettings.aspire.json (true) | ConfigureRabbitMQ URI parsing |
| RabbitMQ:EventBus:ClientName / ExchangeName | appsettings.json (HttpApiHost / Cargonerds) | ABP RabbitMQ event bus |
| ConnectionStrings:hubServiceBus | appsettings.spark.*.json | ServiceBusNotificationConsumer |
Connection-string names are centralized:
src/Cargonerds.Domain.Shared/Configuration/ConnectionStringNames.cs
public static class ConnectionStringNames
{
public const string HubServiceBus = "hubServiceBus";
public const string HubDb = "Hub";
public const string SparkDb = "Default";
public const string BlobStorage = nameof(BlobStorage);
}
Gotchas
RabbitMQ override only fires under Aspire
ConfigureRabbitMQ rewrites AbpRabbitMqOptions only when RabbitMQ:Connections:Default:Override=true and the host string parses as a URI. With the plain appsettings.json (no Override), ABP uses the literal HostName. If HostName is set to a URI without Override, the parsing branch is skipped and ABP treats the whole URI string as a host name, so the connection fails. Set both.
Service Bus consumer is not the ABP event bus and not RabbitMQ
Despite the abp-notifications topic name and the ServiceBusAbpNotificationEto type name, ServiceBusNotificationConsumer is a hand-rolled Azure.Messaging.ServiceBus BackgroundService. It does not start, logging only a warning, when hubServiceBus is empty (the local default) or when no environment name is resolvable, so locally there is no Service Bus consumption at all.
Service Bus subscription/rule are provisioned at runtime
The consumer creates and reconciles its subscription and a $Default SQL filter rule on startup. The rule references CargonerdsCustomerId and hard-coded entity/property names (Shipment.EstimatedPickup/Departure/Arrival/Delivery, Document.FileDate, Event.ActualDate, etc.). Renaming those entities/properties changes the rule the consumer (re)creates. Subscriptions are deleted automatically (AutoDeleteOnIdle) after 7 days of inactivity.
ETOs use type-derived event names
CommentExtraPropertiesUpdatedEto has no [EventName] attribute, so the RabbitMQ routing/event name is derived from the type. Renaming or moving the class is therefore a wire-breaking change if publishers and consumers are deployed independently — keep producer and consumer in lockstep.
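A small sketch of why a rename or move matters, assuming ABP falls back to the type's full name when no [EventName] attribute is present. The two namespaces are hypothetical (the real namespace of the ETO is not shown on this page), and EventNameSketch.DerivedName is an illustrative helper, not an ABP API.

```csharp
using System;

namespace Sketch.Before
{
    // Hypothetical original location of the ETO.
    public class CommentExtraPropertiesUpdatedEto { }
}

namespace Sketch.After
{
    // The same class after a hypothetical namespace move.
    public class CommentExtraPropertiesUpdatedEto { }
}

public static class EventNameSketch
{
    // Illustrative helper: the name a type-derived convention would produce.
    public static string DerivedName(Type eventType) =>
        eventType.FullName ?? eventType.Name;
}
```

Here DerivedName returns Sketch.Before.CommentExtraPropertiesUpdatedEto for the first class and Sketch.After.CommentExtraPropertiesUpdatedEto for the second, so a publisher built from one and a consumer built from the other would no longer agree on the event name. Pinning the name with ABP's [EventName("...")] attribute on the ETO is the usual way to decouple the wire name from the class location.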
No transactional outbox/inbox configured
The solution does not configure ABP's distributed-event outbox/inbox (no AddDistributedEventOutbox / inbox registration was found). Publishing is direct: IDistributedEventBus.PublishAsync goes straight to RabbitMQ. If the broker is unavailable at publish time, the event is not persisted for later delivery, and there is no built-in dedup on the consumer side for the RabbitMQ path. (The "Inbox" UI under Hub.UI is the end-user notification inbox — unrelated to the ABP event inbox.)
Local handlers run only where the entity changes
UserRegisteredEventHandler and ApiKeyLocalEventHandler use the local bus, so they execute in the same process that performed the entity change. They do not propagate across services. If user/org or API-key logic ever needs to fan out to other processes, it would have to move to the distributed bus.
Related pages
- Background Jobs & Workers: Hangfire jobs, periodic workers, and where ServiceBusNotificationConsumer is registered as a hosted service.
- Caching: Redis IDistributedCache (used for the Service Bus subscription-name cache) and the Hub EF second-level cache.
- Aspire Integration: RabbitMQ and Redis resource wiring and connection-string injection.
- ABP Patterns: module composition, [DependsOn], event bus, ITransientDependency.
- Architecture Overview: host/module topology.
- Modules: Pricing: the cross-module CommentExtraPropertiesUpdatedEto consumer and quotation status logic.
- Configuration reference: RabbitMQ:*, ConnectionStrings (hubServiceBus, Hub, Default).