14/01/2022
Francesco Gnarra
I Design Pattern: parliamo di Publish-Subscribe
Nell'ultimo articolo del tutorial in materia di Design Pattern abbiamo parlato di Singleton, il pattern della categoria dei creazionali. Oggi riprendiamo la rubrica approfondendo i Design Pattern Publish-Subscribe, dove:
- i Publisher rappresentano entità che creano/pubblicano un messaggio su un argomento;
- i Subscriber rappresentano entità che si iscrivono ai messaggi su un argomento.
In un modello Publish-Subscribe basato su argomenti, i Publisher taggano ogni messaggio con l'argomento invece di fare riferimento a specifici Subscriber. Il sistema di messaggistica invia quindi il messaggio a tutti i Subscriber che hanno chiesto di ricevere messaggi su quell'argomento. I Publisher si occupano solamente della creazione del messaggio originale e possono lasciare il compito di servire i Subscriber all'infrastruttura di messaggistica (qui che entra in gioco il modello).
ARCHITETTURA DEL MODELLO PUBLISH-SUBSCRIBE
Il modello Publish-Subscriber presenta 3 componenti fondamentali: Publisher, Subscriber e PubSub Service. In particolare, attraverso quest'ultimo il Publisher è in grado di inviare messaggi all'insaputa dei Subscriber.
I Subscriber riceveranno solo i messaggi per i quali sono registrati con PubSubService. Ad esempio:
supponiamo di avere tre argomenti diversi (tipi di messaggio) - A, B, C -, ma solo l'argomento A interessa il subscriber 1. Gli argomenti B e C sono di interesse per il Subscriber 2, mentre il Subscriber 3 desidera ricevere messaggi per gli argomenti A e C. Quindi il Subscriber 1 riceverà una notifica per l'argomento A, il Subscriber 2 riceverà una notifica per gli argomenti B e C e il Subscriber 3 riceverà una notifica per gli argomenti A e C dal servizio PubSub.
Quindi:
- i Publisher contrassegnano ogni messaggio con un argomento e lo inviano a PubSubService che funge da intermediario tra publisher e destinatari;
- il Subscriber si registra con PubSubService (intermediario) e dice di essere interessato ai messaggi relativi a un particolare argomento.
Il modello Publisher-Subscriber è un'architettura altamente accoppiata in cui i Publisher non sanno chi sono i Subscriber e i Subscriber non sanno chi sono i Publisher dell'argomento.
Sebbene il pattern Publisher-Subscriber sia molto simile al pattern Observer, ci sono 2 differenze principali:
- nel pattern Observer, gli Observer sono consapevoli degli Observable. Nel modello Publisher-Subscriber Publisher e Subscriber non hanno bisogno di conoscersi;
- il pattern Observer è per lo più implementato in modo sincrono, ovvero l’observable chiama il metodo appropriato di tutti i suoi Observer quando si verifica un evento. Il modello Publisher-Subscriber è implementato principalmente in modo asincrono (usando la coda dei messaggi).
UN ESEMPIO PRATICO
L’interfaccia Publisher definisce il metodo astratto publish() con cui invia i messaggi al servizio PubSub.
package pubsub.publisher;
import pubsub.Message;
import pubsub.service.PubSubService;
public interface Publisher {
//Publishes new message to PubSubService
void publish(Message message, PubSubService pubSubService);
}
La classe PublisherImpl implementa l’interfaccia Publisher e implementa il metodo publish, che invia il messaggio al servizio PubSubService.
package pubsub.publisher;
import pubsub.Message;
import pubsub.service.PubSubService;
public class PublisherImpl implements Publisher {
//Publishes new message to PubSubService
public void publish(Message message, PubSubService pubSubService) {
pubSubService.addMessageToQueue(message);
}
}
Subscriber è una classe astratta che contiene:
- addSubscriber() – Aggiunge/Registra i subscriber per un argomento attraverso il servizio PubSub.
- unSubscribe() – Rimuove il subscriber da un argomento attraverso il servizio PubSub.
- List<Message> subscriberMessages – Una lista di messaggi che memorizza i messaggi ricevuti dal Subscriber.
- getMessagesForSubscriberOfTopic() – Un metodo che richiede i messaggi per un subscriber di un argomento.
Quindi:
package pubsub.subscriber;
import java.util.ArrayList;
import java.util.List;
import pubsub.Message;
import pubsub.service.PubSubService;
public abstract class Subscriber {
//store all messages received by the subscriber
private List<Message> subscriberMessages = new ArrayList<Message>();
public List<Message> getSubscriberMessages() {
return subscriberMessages;
}
public void setSubscriberMessages(List<Message> subscriberMessages) {
this.subscriberMessages = subscriberMessages;
}
//Add subscriber with PubSubService for a topic
public abstract void addSubscriber(String topic, PubSubService pubSubService);
//Unsubscribe subscriber with PubSubService for a topic
public abstract void unSubscribe(String topic, PubSubService pubSubService);
//Request specifically for messages related to topic from PubSubService
public abstract void getMessagesForSubscriberOfTopic(String topic, PubSubService pubSubService);
//Print all messages received by the subscriber
public void printMessages(){
for(Message message : subscriberMessages){
System.out.println("Message Topic -> "+ message.getTopic() + " : " + message.getPayload());
}
}
}
La classe SubscriberImpl estende la classe Subscriber e implementa i suoi metodi astratti.
package pubsub.subscriber;
import pubsub.service.PubSubService;
public class SubscriberImpl extends Subscriber{
//Add subscriber with PubSubService for a topic
public void addSubscriber(String topic, PubSubService pubSubService){
pubSubService.addSubscriber(topic, this);
}
//Unsubscribe subscriber with PubSubService for a topic
public void unSubscribe(String topic, PubSubService pubSubService){
pubSubService.removeSubscriber(topic, this);
}
//Request specifically for messages related to topic from PubSubService
public void getMessagesForSubscriberOfTopic(String topic, PubSubService pubSubService) {
pubSubService.getMessagesForSubscriberOfTopic(topic, this);
}
}
La classe Message è una semplice classe POJO per rappresentare i messaggi. Ha un attributo topic e un attributo per il contenuto del messaggio (payload).
package pubsub;
public class Message {
private String topic;
private String payload;
public Message(){}
public Message(String topic, String payload) {
this.topic = topic;
this.payload = payload;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getPayload() {
return payload;
}
public void setPayload(String payload) {
this.payload = payload;
}
}
PubSubService è la classe principale e contiene:
- Map<String, Set<Subscriber>> subscribersTopicMap – Memorizza i subscriber interessati ad un argomento.
- Queue<Message> messageQueue – Una coda che memorizza i messaggi pubblicati dai publisher.
- AddMessageToQueue() – Aggiunge un messaggio pubblicato da un publisher alla coda dei messaggi.
- AddSubscriber () – Aggiunge un subscriber per un argomento.
- removeSubscriber() – Rimuove un subscriber per un argomento.
- broadcast() – Effettua il Broadcast dei nuovi messaggi aggiunti in coda a tutti i subscriber dell’argomento. La messagesQueue diventerà vuota dopo che sarà completato il broadcasting.
- getMessagesForSubscriberOfTopic() – Invia i messaggi rispetto ad uno specifico argomento a tutti i suoi subscriber.
package pubsub.service;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import pubsub.Message;
import pubsub.subscriber.Subscriber;
public class PubSubService {
//Keeps set of subscriber topic wise, using set to prevent duplicates
Map<String, Set<Subscriber>> subscribersTopicMap = new HashMap<String, Set<Subscriber>>();
//Holds messages published by publishers
Queue<Message> messagesQueue = new LinkedList<Message>();
//Adds message sent by publisher to queue
public void addMessageToQueue(Message message){
messagesQueue.add(message);
}
//Add a new Subscriber for a topic
public void addSubscriber(String topic, Subscriber subscriber){
if(subscribersTopicMap.containsKey(topic)){
Set<Subscriber> subscribers = subscribersTopicMap.get(topic);
subscribers.add(subscriber);
subscribersTopicMap.put(topic, subscribers);
}else{
Set<Subscriber> subscribers = new HashSet<Subscriber>();
subscribers.add(subscriber);
subscribersTopicMap.put(topic, subscribers);
}
}
//Remove an existing subscriber for a topic
public void removeSubscriber(String topic, Subscriber subscriber){
if(subscribersTopicMap.containsKey(topic)){
Set<Subscriber> subscribers = subscribersTopicMap.get(topic);
subscribers.remove(subscriber);
subscribersTopicMap.put(topic, subscribers);
}
}
//Broadcast new messages added in queue to All subscribers of the topic. messagesQueue will be empty after broadcasting
public void broadcast(){
if(messagesQueue.isEmpty()){
System.out.println("No messages from publishers to display");
}else{
while(!messagesQueue.isEmpty()){
Message message = messagesQueue.remove();
String topic = message.getTopic();
Set<Subscriber> subscribersOfTopic = subscribersTopicMap.get(topic);
for(Subscriber subscriber : subscribersOfTopic){
//add broadcasted message to subscribers message queue
List<Message> subscriberMessages = subscriber.getSubscriberMessages();
subscriberMessages.add(message);
subscriber.setSubscriberMessages(subscriberMessages);
}
}
}
}
//Sends messages about a topic for subscriber at any point
public void getMessagesForSubscriberOfTopic(String topic, Subscriber subscriber) {
if(messagesQueue.isEmpty()){
System.out.println("No messages from publishers to display");
}else{
while(!messagesQueue.isEmpty()){
Message message = messagesQueue.remove();
if(message.getTopic().equalsIgnoreCase(topic)){
Set<Subscriber> subscribersOfTopic = subscribersTopicMap.get(topic);
for(Subscriber _subscriber : subscribersOfTopic){
if(_subscriber.equals(subscriber)){
//add broadcasted message to subscriber message queue
List<Message> subscriberMessages = subscriber.getSubscriberMessages();
subscriberMessages.add(message);
subscriber.setSubscriberMessages(subscriberMessages);
}
}
}
}
}
}
}
DriverClass è la classe main per eseguire e testare il pattern Publisher-Subscriber. Serve solo a scopi di demo poiché nella realtà avremo Publisher e Subscriber che faranno il loro “lavoro” attraverso chiamate API, ecc.
package pubsub;
import pubsub.publisher.Publisher;
import pubsub.publisher.PublisherImpl;
import pubsub.service.PubSubService;
import pubsub.subscriber.Subscriber;
import pubsub.subscriber.SubscriberImpl;
public class DriverClass {
public static void main(String[] args) {
//Instantiate publishers, subscribers and PubSubService
Publisher javaPublisher = new PublisherImpl();
Publisher pythonPublisher = new PublisherImpl();
Subscriber javaSubscriber = new SubscriberImpl();
Subscriber allLanguagesSubscriber = new SubscriberImpl();
Subscriber pythonSubscriber = new SubscriberImpl();
PubSubService pubSubService = new PubSubService();
//Declare Messages and Publish Messages to PubSubService
Message javaMsg1 = new Message("Java", "Core Java Concepts");
Message javaMsg2 = new Message("Java", "Spring MVC : Dependency Injection and AOP");
Message javaMsg3 = new Message("Java", "JPA & Hibernate");
javaPublisher.publish(javaMsg1, pubSubService);
javaPublisher.publish(javaMsg2, pubSubService);
javaPublisher.publish(javaMsg3, pubSubService);
Message pythonMsg1 = new Message("Python", "Easy and Powerful programming language");
Message pythonMsg2 = new Message("Python", "Advanced Python message");
pythonPublisher.publish(pythonMsg1, pubSubService);
pythonPublisher.publish(pythonMsg2, pubSubService);
//Declare Subscribers
javaSubscriber.addSubscriber("Java",pubSubService); //Java subscriber only subscribes to Java topics
pythonSubscriber.addSubscriber("Python",pubSubService); //Python subscriber only subscribes to Python topics
allLanguagesSubscriber.addSubscriber("Java", pubSubService); //all subscriber, subscribes to both Java and Python
allLanguagesSubscriber.addSubscriber("Python", pubSubService);
//Trying unSubscribing a subscriber
//pythonSubscriber.unSubscribe("Python", pubSubService);
//Broadcast message to all subscribers. After broadcast, messageQueue will be empty in PubSubService
pubSubService.broadcast();
//Print messages of each subscriber to see which messages they got
System.out.println("Messages of Java Subscriber are: ");
javaSubscriber.printMessages();
System.out.println("\nMessages of Python Subscriber are: ");
pythonSubscriber.printMessages();
System.out.println("\nMessages of All Languages Subscriber are: ");
allLanguagesSubscriber.printMessages();
//After broadcast the messagesQueue will be empty, so publishing new messages to server
System.out.println("\nPublishing 2 more Java Messages...");
Message javaMsg4 = new Message("Java", "JSP and Servlets");
Message javaMsg5 = new Message("Java", "Struts framework");
javaPublisher.publish(javaMsg4, pubSubService);
javaPublisher.publish(javaMsg5, pubSubService);
javaSubscriber.getMessagesForSubscriberOfTopic("Java", pubSubService);
System.out.println("\nMessages of Java Subscriber now are: ");
javaSubscriber.printMessages();
}
}
Di seguito l’output:
Messages of Java Subscriber are:
Message Topic -> Java : Core Java Concepts
Message Topic -> Java : Spring MVC : Dependency Injection and AOP
Message Topic -> Java : JPA & Hibernate
Messages of Python Subscriber are:
Message Topic -> Python : Easy and Powerful programming language
Message Topic -> Python : Advanced Python message
Messages of All Languages Subscriber are:
Message Topic -> Java : Core Java Concepts
Message Topic -> Java : Spring MVC : Dependency Injection and AOP
Message Topic -> Java : JPA & Hibernate
Message Topic -> Python : Easy and Powerful programming language
Message Topic -> Python : Advanced Python message
Publishing 2 more Java Messages...
Messages of Java Subscriber now are:
Message Topic -> Java : Core Java Concepts
Message Topic -> Java : Spring MVC : Dependency Injection and AOP
Message Topic -> Java : JPA & Hibernate
Message Topic -> Java : JSP and Servlets
Message Topic -> Java : Struts framework