Event bus and service registry for inter-application communication in Java.
- Message Bus: Topic-based publish/subscribe messaging
- Service Registry: Type-safe service discovery and lookup
- Asynchronous Delivery: Non-blocking message dispatch
- Thread-Safe: Concurrent operations with CopyOnWriteArrayList
- Flexible Messages: Support for headers, custom payloads, and metadata
- Multiple Subscribers: Broadcast messages to all interested parties
- Exception Isolation: Handler exceptions don't affect other subscribers
<dependency>
<groupId>org.flossware</groupId>
<artifactId>eventbus-java</artifactId>
<version>1.0</version>
</dependency>implementation 'org.flossware:eventbus-java:1.0'import org.flossware.eventbus-java.*;
import org.flossware.eventbus-java.api.*;
// Create message bus
InMemoryMessageBus messageBus = new InMemoryMessageBus();
// Subscribe to topic
Subscription subscription = messageBus.subscribe("events", message -> {
System.out.println("Received: " + new String(message.getPayload()));
});
// Publish message
Message msg = Message.builder()
.topic("events")
.sourceApplicationId("my-app")
.payload("Hello, World!".getBytes())
.header("priority", "high")
.build();
messageBus.publish("events", msg);
// Unsubscribe when done
subscription.cancel();
messageBus.shutdown();import org.flossware.eventbus-java.*;
import org.flossware.eventbus-java.api.*;
// Create registry
ServiceRegistryImpl registry = new ServiceRegistryImpl();
// Define service interface
public interface PaymentService {
void processPayment(double amount);
}
// Register implementation
PaymentService impl = new PaymentServiceImpl();
registry.registerService(PaymentService.class, impl);
// Lookup service
Optional<PaymentService> service = registry.getService(PaymentService.class);
service.ifPresent(s -> s.processPayment(100.0));
// Get all implementations
List<PaymentService> all = registry.getAllServices(PaymentService.class);
// Unregister when done
registry.unregisterService(PaymentService.class, impl);InMemoryMessageBus bus = new InMemoryMessageBus();
// Simple message
Message msg = Message.builder()
.topic("notifications")
.sourceApplicationId("notifier")
.payload("New notification".getBytes())
.build();
bus.publish("notifications", msg);
// Message with headers
Message msgWithHeaders = Message.builder()
.topic("events")
.sourceApplicationId("app-1")
.payload(jsonData.getBytes())
.header("content-type", "application/json")
.header("version", "1.0")
.header("priority", 5)
.build();
bus.publish("events", msgWithHeaders);// Simple subscription
Subscription sub = bus.subscribe("notifications", message -> {
String text = new String(message.getPayload());
System.out.println("Notification: " + text);
});
// Access message metadata
bus.subscribe("events", message -> {
String source = message.getSourceApplicationId();
Map<String, Object> headers = message.getHeaders();
long timestamp = message.getTimestamp();
System.out.printf("Message from %s at %d%n", source, timestamp);
});
// Unsubscribe
sub.cancel();// Multiple subscribers to same topic
bus.subscribe("broadcast", msg -> handleInService1(msg));
bus.subscribe("broadcast", msg -> handleInService2(msg));
bus.subscribe("broadcast", msg -> handleInService3(msg));
// All subscribers receive the message
bus.publish("broadcast", message);Message msg = Message.builder()
.id("custom-id") // Optional: auto-generated if not set
.topic("my-topic")
.sourceApplicationId("app-1")
.payload(data)
.header("key1", "value1")
.header("key2", 42)
.timestamp(System.currentTimeMillis()) // Optional: auto-generated
.build();ServiceRegistryImpl registry = new ServiceRegistryImpl();
// Register single implementation
UserService userService = new UserServiceImpl();
registry.registerService(UserService.class, userService);
// Register multiple implementations
EmailNotifier emailNotifier = new EmailNotifier();
SmsNotifier smsNotifier = new SmsNotifier();
registry.registerService(Notifier.class, emailNotifier);
registry.registerService(Notifier.class, smsNotifier);// Get first registered service
Optional<UserService> userService = registry.getService(UserService.class);
userService.ifPresent(service -> {
User user = service.findById(123);
});
// Get all implementations
List<Notifier> notifiers = registry.getAllServices(Notifier.class);
for (Notifier notifier : notifiers) {
notifier.send("Hello!");
}
// Check if service exists
if (registry.getService(PaymentService.class).isPresent()) {
// Service available
}// Compile-time type safety
registry.registerService(UserService.class, new UserServiceImpl());
// This won't compile - type mismatch
// registry.registerService(UserService.class, new PaymentServiceImpl());
// Runtime validation
try {
registry.registerService(String.class, "not an interface");
} catch (IllegalArgumentException e) {
// serviceInterface must be an interface
}public class EventBusApplication {
private final InMemoryMessageBus bus = new InMemoryMessageBus();
public void start() {
// Register event handlers
bus.subscribe("user.created", this::handleUserCreated);
bus.subscribe("user.updated", this::handleUserUpdated);
bus.subscribe("user.deleted", this::handleUserDeleted);
}
private void handleUserCreated(Message msg) {
// Send welcome email
// Update analytics
// Log audit trail
}
public void createUser(User user) {
// Save user to database
saveUser(user);
// Publish event
Message event = Message.builder()
.topic("user.created")
.sourceApplicationId("user-service")
.payload(toJson(user).getBytes())
.build();
bus.publish("user.created", event);
}
}public class ServiceLocator {
private static final ServiceRegistryImpl registry = new ServiceRegistryImpl();
public static <T> void register(Class<T> serviceInterface, T implementation) {
registry.registerService(serviceInterface, implementation);
}
public static <T> T getService(Class<T> serviceInterface) {
return registry.getService(serviceInterface)
.orElseThrow(() -> new ServiceNotFoundException(serviceInterface));
}
public static <T> List<T> getAll(Class<T> serviceInterface) {
return registry.getAllServices(serviceInterface);
}
}public interface Plugin {
String getName();
void initialize();
void shutdown();
}
// Register plugins
ServiceRegistryImpl registry = new ServiceRegistryImpl();
registry.registerService(Plugin.class, new DatabasePlugin());
registry.registerService(Plugin.class, new CachePlugin());
registry.registerService(Plugin.class, new MetricsPlugin());
// Initialize all plugins
List<Plugin> plugins = registry.getAllServices(Plugin.class);
for (Plugin plugin : plugins) {
System.out.println("Initializing: " + plugin.getName());
plugin.initialize();
}- Decoupling: Publishers and subscribers don't know about each other
- Asynchronous: Message delivery doesn't block publishers
- Thread-Safe: Safe for concurrent access from multiple threads
- Exception Isolation: Subscriber exceptions don't affect other subscribers
- Type Safety: Service registry provides compile-time type checking
- Simple API: Easy to understand and use
- Microservices Communication: Event-driven communication between services
- Plugin Systems: Dynamic service discovery and registration
- Event Sourcing: Publish domain events for audit/replay
- Decoupled Components: Reduce coupling between application modules
- Notification Systems: Broadcast notifications to multiple handlers
All classes are thread-safe and can be used from multiple threads concurrently:
InMemoryMessageBus: UsesCopyOnWriteArrayListfor subscriber listsServiceRegistryImpl: UsesConcurrentHashMapandCopyOnWriteArrayListMessage: Immutable with defensive copying
- Java 21 or higher
- SLF4J API (for logging)
MIT License - see LICENSE file for details.
Contributions are welcome! Please see CONTRIBUTING.md for guidelines.
See CHANGELOG.md for version history.