Spring provides ApplicationEventPublisher for publishing events. It is useful in scenarios like this one: when I create an item, another service listens for the creation and sets up inventory for it. It is very practical in Spring MVC, and I often use it for decoupling. After switching to WebFlux, though, it becomes awkward, because ApplicationEventPublisher is a synchronous operation and does not support reactive (that is, streaming) operations.

So I implemented a similar publish-subscribe pattern for WebFlux to replace ApplicationEventPublisher.

Implement the Event service

Our event service has three parts:

  • Encapsulate the published object
  • Design a common listener interface
  • Provide an object like ApplicationEventPublisher to publish events

ObjectEventNotifier

First, wrap the published object. This step is unavoidable even with ApplicationEventPublisher. Otherwise a listener receives an object without knowing whether it was created or deleted.

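import org.springframework.core.ResolvableType;
import org.springframework.core.ResolvableTypeProvider;

public class ObjectEventNotifier<T> implements ResolvableTypeProvider {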

    private final T object;
    private final Type type;

    private ObjectEventNotifier(T object, Type type) {
        this.object = object;
        this.type = type;
    }

    public static <T> ObjectEventNotifier<T> from(T object, Type type) {
        return new ObjectEventNotifier<>(object, type);
    }

    @Override
    public ResolvableType getResolvableType() {
        return ResolvableType.forClassWithGenerics(getClass(), ResolvableType.forInstance(object));
    }

    public T getObject() {
        return this.object;
    }

    public Type getType() {
        return this.type;
    }

    public enum Type {
        Create, Update, Delete
    }
}

The class above tags each object with a Type enum value, so that listeners can handle creation, update, and deletion differently.
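
As a rough illustration of why the type tag matters, here is a minimal, Spring-free sketch. NotifierSketch, Notifier, and describe are hypothetical names used only for this example; Notifier is a simplified stand-in for ObjectEventNotifier without the ResolvableTypeProvider part.

```java
// Simplified, Spring-free stand-in for ObjectEventNotifier (hypothetical names).
final class NotifierSketch {

    enum Type { Create, Update, Delete }

    // Carries the published object together with the kind of change.
    static final class Notifier<T> {
        final T object;
        final Type type;

        Notifier(T object, Type type) {
            this.object = object;
            this.type = type;
        }
    }

    // A listener-style handler that branches on the event type.
    static String describe(Notifier<String> notifier) {
        switch (notifier.type) {
            case Create:
                return "init stock for " + notifier.object;
            case Update:
                return "refresh stock for " + notifier.object;
            default:
                return "remove stock for " + notifier.object;
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(new Notifier<>("apple", Type.Create))); // init stock for apple
        System.out.println(describe(new Notifier<>("apple", Type.Delete))); // remove stock for apple
    }
}
```

Without the type, both calls would look identical to the handler.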

EventListener

Next, design the listener interface. In general, we pick consumers by the class of the published object, so a simple listener interface looks like this:

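import org.reactivestreams.Publisher;

public interface EventListener<T> {

    // The class of objects this listener is interested in
    Class<T> target();

    // Handles one event; the returned Publisher completes when handling is done
    Publisher<Void> consume(ObjectEventNotifier<T> notifier);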

}

This has a problem, though. Creating a listener looks like this:

@Bean
public EventListener<Goods> listener() {
    return new EventListener<>() {
        @Override
        public Class<Goods> target() {
            return Goods.class;
        }
        @Override
        public Publisher<Void> consume(ObjectEventNotifier<Goods> consumer) {
            // ...
        }
    };
}

That is a lot of boilerplate, so the listener interface should offer a simpler way to create listeners:

public interface EventListener<T> {
    //...
    static <T> EventListener<T> register(Class<T> target, Function<ObjectEventNotifier<T>, Publisher<Void>> fn) {
        return new EventListener<>() {
            @Override
            public Class<T> target() {
                return target;
            }
            @Override
            public Publisher<Void> consume(ObjectEventNotifier<T> consumer) {
                return fn.apply(consumer);
            }
        };
    }
}

@Bean
public EventListener<Goods> listener() {
    return EventListener.register(Goods.class, notifier -> ...);
}

Much better.

EventService

Finally, implement the most important piece, the event service. It has two parts:

  • Receive the listeners and group them by class type
  • Accept a published object, wrap it, and pass it to the listeners registered for its type

For the first part, since we use Spring Boot, we can let Spring inject every listener we need. We only have to store each EventListener in a Map keyed by its target type.

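import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;

import org.reactivestreams.Publisher;
import org.springframework.stereotype.Service;

@Service
public class EventService {

    // Consumers grouped by the fully qualified class name of their target type
    private final Map<String, List<Object>> store = new ConcurrentHashMap<>();

    @SuppressWarnings({"unchecked", "rawtypes"})
    public EventService(List<EventListener> listeners) {
        listeners.forEach(listener -> {
            this.register(listener.target(), listener::consume);
        });
    }

    public <T> void register(Class<T> target, Function<ObjectEventNotifier<T>, Publisher<Void>> consumer) {
        // computeIfAbsent creates the list atomically; CopyOnWriteArrayList is safe to read while it is modified
        store.computeIfAbsent(target.getName(), key -> new CopyOnWriteArrayList<>()).add(consumer);
    }
}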
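
To make the grouping step concrete, here is a small JDK-only sketch of the same bookkeeping. ListenerRegistrySketch and listenersFor are hypothetical names, and plain strings stand in for the real consumer functions. It also shows one consequence of keying the map by class name: a lookup only finds listeners registered for the exact runtime class of the event, not for its supertypes.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical, simplified registry: strings stand in for consumer functions.
final class ListenerRegistrySketch {

    private final Map<String, List<String>> store = new ConcurrentHashMap<>();

    // Groups listeners by the fully qualified name of their target class.
    void register(Class<?> target, String listenerName) {
        store.computeIfAbsent(target.getName(), key -> new CopyOnWriteArrayList<>())
             .add(listenerName);
    }

    // Looks up listeners by the exact runtime class of the event object.
    List<String> listenersFor(Object event) {
        return store.getOrDefault(event.getClass().getName(), List.of());
    }

    public static void main(String[] args) {
        ListenerRegistrySketch registry = new ListenerRegistrySketch();
        registry.register(String.class, "first");
        registry.register(String.class, "second");
        registry.register(CharSequence.class, "general");

        System.out.println(registry.listenersFor("apple"));                // [first, second]
        System.out.println(registry.listenersFor(new StringBuilder("x"))); // []
    }
}
```

The "general" listener registered for CharSequence is never returned for a String or a StringBuilder, so under this design listeners should be registered for the concrete class that will be published.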

The second part is where reactive programming makes things harder. Reactive code is non-blocking, so each consumer must also return a non-blocking result. That is why the listener interface above returns Publisher<Void>: it represents a result that has not been processed yet.

@Service
public class EventService {

    private final Map<String, List<Object>> store = new ConcurrentHashMap<>();

    @SuppressWarnings({"unchecked"})
    public <T> Flux<Void> publish(ObjectEventNotifier.Type type, T target) {
        // Listeners are looked up by the exact runtime class of the published object
        List<Object> consumers = store.get(target.getClass().getName());
        if (consumers == null) {
            return Flux.empty();
        }
        return Flux
            .fromIterable(consumers)
            .flatMap(obj -> {
                Function<ObjectEventNotifier<T>, Publisher<Void>> consumer =
                    (Function<ObjectEventNotifier<T>, Publisher<Void>>) obj;
                Publisher<Void> result = consumer.apply(ObjectEventNotifier.from(target, type));
                // Treat a null return value as "nothing to wait for"
                return result == null ? Mono.empty() : result;
            });
    }

    public <T> Mono<T> publishCreate(T target) {
        return this.publish(ObjectEventNotifier.Type.Create, target)
            .then(Mono.just(target));
    }

    public <T> Mono<T> publishUpdate(T target) {
        return this.publish(ObjectEventNotifier.Type.Update, target)
            .then(Mono.just(target));
    }

    public <T> Mono<T> publishDelete(T target) {
        return this.publish(ObjectEventNotifier.Type.Delete, target)
            .then(Mono.just(target));
    }

}

The publish method is the core. publishCreate and the other helpers below it are shortcuts.
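
The shape of publishCreate (call every listener, wait until all of them finish, then hand back the original object) can be sketched with the JDK alone. This is only an analogue under stated assumptions: PublishSketch is a hypothetical class, and CompletableFuture<Void> stands in for Publisher<Void>. One difference matters: a CompletableFuture starts its work eagerly, while the Flux returned by publish does nothing until something subscribes to it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical JDK-only analogue: CompletableFuture<Void> stands in for Publisher<Void>.
final class PublishSketch {

    private final List<Function<String, CompletableFuture<Void>>> listeners = new ArrayList<>();

    void register(Function<String, CompletableFuture<Void>> listener) {
        listeners.add(listener);
    }

    // Invokes every listener, waits for all of them, then yields the original target.
    CompletableFuture<String> publishCreate(String target) {
        CompletableFuture<?>[] pending = listeners.stream()
            .map(listener -> listener.apply(target))
            .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(pending).thenApply(ignored -> target);
    }

    public static void main(String[] args) {
        PublishSketch service = new PublishSketch();
        List<String> calls = Collections.synchronizedList(new ArrayList<>());
        service.register(t -> CompletableFuture.runAsync(() -> calls.add("stock:" + t)));
        service.register(t -> CompletableFuture.runAsync(() -> calls.add("audit:" + t)));

        String result = service.publishCreate("apple").join();
        System.out.println(result);       // apple
        System.out.println(calls.size()); // 2
    }
}
```

Returning the target after all listeners complete is what lets the caller keep chaining, as publishCreate does with then(Mono.just(target)).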

Try out the event service

Create a goods service

public class GoodsService {

    @Resource
    private EventService eventService;

    public Mono<Goods> createGoods() {
        Goods apple = Goods.of(1, "apple");
        return Mono.just(apple)
            .flatMap(eventService::publishCreate);
    }

}

Create listeners for Goods (two of them, since this kind of listening is usually one-to-many)

@Configuration
public class ServerConfiguration {

    @Bean
    public GoodsService goodsService() {
        return new GoodsService();
    }

    @Bean
    public StockService stockService() {
        return new StockService();
    }

    @Bean
    public EventListener<Goods> listener1(StockService stockService) {
        return EventListener.register(Goods.class, stockService::initStockWithGoods1);
    }

    @Bean
    public EventListener<Goods> listener2(StockService stockService) {
        return EventListener.register(Goods.class, stockService::initStockWithGoods2);
    }

}

@Slf4j
public class StockService {

    public Publisher<Void> initStockWithGoods1(ObjectEventNotifier<Goods> notifier) {
        ObjectEventNotifier.Type type = notifier.getType();
        if (type == ObjectEventNotifier.Type.Create) {
            log.info("Create stock for: {}", notifier.getObject().getName());
            return Mono.just(Stock.of(1, 0)).then();
        }
        return Mono.empty();
    }

    public Publisher<Void> initStockWithGoods2(ObjectEventNotifier<Goods> notifier) {
        log.info("Another listener for Goods: {}", notifier.getObject().getName());
        return Mono.empty();
    }

}

Create a test case that creates an apple

@SpringBootTest
class GoodsServiceTest {

    @Resource
    GoodsService goodsService;

    @Test
    void createGoods() {
        goodsService.createGoods()
            .as(StepVerifier::create)
            .expectNextCount(1)
            .verifyComplete();
    }
}

Running the test produces log output from both listeners, which shows that creating the apple called two methods in the stock service.

Source code: https://github.com/jiangtj-lab/ex-flux-event

Progress on reactive support for ApplicationEventPublisher in Spring can be followed in this issue: https://github.com/spring-projects/spring-framework/issues/21025