Nell'ultimo articolo del tutorial in materia di Design Pattern abbiamo parlato di Singleton, il pattern della categoria dei creazionali. Oggi riprendiamo la rubrica approfondendo i Design Pattern Publish-Subscribe, dove:

  • i Publisher rappresentano entità che creano/pubblicano un messaggio su un argomento;
  • i Subscriber rappresentano entità che si iscrivono ai messaggi su un argomento.

In un modello Publish-Subscribe basato su argomenti, i Publisher taggano ogni messaggio con l'argomento invece di fare riferimento a specifici Subscriber. Il sistema di messaggistica invia quindi il messaggio a tutti i Subscriber che hanno chiesto di ricevere messaggi su quell'argomento. I Publisher si occupano solamente della creazione del messaggio originale e possono lasciare il compito di servire i Subscriber all'infrastruttura di messaggistica (qui che entra in gioco il modello).

ARCHITETTURA DEL MODELLO PUBLISH-SUBSCRIBE

 

Il modello Publish-Subscriber presenta 3 componenti fondamentali: PublisherSubscriber PubSub Service. In particolare, attraverso quest'ultimo il Publisher è in grado di inviare messaggi all'insaputa dei Subscriber.

I Subscriber riceveranno solo i messaggi per i quali sono registrati con PubSubService. Ad esempio:

supponiamo di avere tre argomenti diversi (tipi di messaggio) - A, B, C -, ma solo l'argomento A interessa il subscriber 1. Gli argomenti B e C sono di interesse per il Subscriber 2, mentre il Subscriber 3 desidera ricevere messaggi per gli argomenti A e C. Quindi il Subscriber 1 riceverà una notifica per l'argomento A, il Subscriber 2 riceverà una notifica per gli argomenti B e C e il Subscriber 3 riceverà una notifica per gli argomenti A e C dal servizio PubSub.

Quindi:

  • i Publisher contrassegnano ogni messaggio con un argomento e lo inviano a PubSubService che funge da intermediario tra publisher e destinatari;
  • il Subscriber si registra con PubSubService (intermediario) e dice di essere interessato ai messaggi relativi a un particolare argomento.

 

Il modello Publisher-Subscriber è un'architettura altamente accoppiata in cui i Publisher non sanno chi sono i Subscriber e i Subscriber non sanno chi sono i Publisher dell'argomento.

Sebbene il pattern Publisher-Subscriber sia molto simile al pattern Observer, ci sono 2 differenze principali:

  1. nel pattern Observer, gli Observer sono consapevoli degli Observable. Nel modello Publisher-Subscriber Publisher e Subscriber non hanno bisogno di conoscersi;
  2. il pattern Observer è per lo più implementato in modo sincrono, ovvero l’observable chiama il metodo appropriato di tutti i suoi Observer quando si verifica un evento. Il modello Publisher-Subscriber è implementato principalmente in modo asincrono (usando la coda dei messaggi).

UN ESEMPIO PRATICO

L’interfaccia Publisher definisce il metodo astratto publish() con cui invia i messaggi al servizio PubSub.
 

package pubsub.publisher;

import pubsub.Message;

import pubsub.service.PubSubService;

 

public interface Publisher {   

            //Publishes new message to PubSubService

            void publish(Message message, PubSubService pubSubService);

}

 

La classe PublisherImpl implementa l’interfaccia Publisher e implementa il metodo publish, che invia il messaggio al servizio PubSubService.

package pubsub.publisher;

import pubsub.Message;

import pubsub.service.PubSubService;

 

public class PublisherImpl implements Publisher {

            //Publishes new message to PubSubService

            public void publish(Message message, PubSubService pubSubService) {              

                        pubSubService.addMessageToQueue(message);

            }

}

 

Subscriber è una classe astratta che contiene:

  • addSubscriber() – Aggiunge/Registra i subscriber per un argomento attraverso il servizio PubSub.
  • unSubscribe() – Rimuove il subscriber da un argomento attraverso il servizio PubSub.
  • List<Message> subscriberMessages – Una lista di messaggi che memorizza i messaggi ricevuti dal Subscriber.
  • getMessagesForSubscriberOfTopic() – Un metodo che richiede i messaggi per un subscriber di un argomento.

 

Quindi:

 

package pubsub.subscriber;

 

import java.util.ArrayList;

import java.util.List;

import pubsub.Message;

import pubsub.service.PubSubService;

 

public abstract class Subscriber {     

            //store all messages received by the subscriber

            private List<Message> subscriberMessages = new ArrayList<Message>();

           

            public List<Message> getSubscriberMessages() {

                        return subscriberMessages;

            }

            public void setSubscriberMessages(List<Message> subscriberMessages) {

                        this.subscriberMessages = subscriberMessages;

            }

           

            //Add subscriber with PubSubService for a topic

            public abstract void addSubscriber(String topic, PubSubService pubSubService);

           

            //Unsubscribe subscriber with PubSubService for a topic

            public abstract void unSubscribe(String topic, PubSubService pubSubService);

           

            //Request specifically for messages related to topic from PubSubService

            public abstract void getMessagesForSubscriberOfTopic(String topic, PubSubService pubSubService);

           

            //Print all messages received by the subscriber

            public void printMessages(){

                        for(Message message : subscriberMessages){

                                   System.out.println("Message Topic -> "+ message.getTopic() + " : " + message.getPayload());

                        }

            }

}

 

La classe SubscriberImpl estende la classe Subscriber e implementa i suoi metodi astratti.

package pubsub.subscriber;

import pubsub.service.PubSubService;

 

public class SubscriberImpl extends Subscriber{

            //Add subscriber with PubSubService for a topic

            public void addSubscriber(String topic, PubSubService pubSubService){

                        pubSubService.addSubscriber(topic, this);

            }

           

            //Unsubscribe subscriber with PubSubService for a topic

            public void unSubscribe(String topic, PubSubService pubSubService){

                        pubSubService.removeSubscriber(topic, this);

            }

 

            //Request specifically for messages related to topic from PubSubService

            public void getMessagesForSubscriberOfTopic(String topic, PubSubService pubSubService) {

                        pubSubService.getMessagesForSubscriberOfTopic(topic, this);

                       

            }          

}

 

La classe Message è una semplice classe POJO per rappresentare i messaggi. Ha un attributo topic e un attributo per il contenuto del messaggio (payload).

package pubsub;

 

public class Message {          

            private String topic;

            private String payload;

           

            public Message(){}    

            public Message(String topic, String payload) {

                        this.topic = topic;

                        this.payload = payload;

            }

            public String getTopic() {

                        return topic;

            }

            public void setTopic(String topic) {

                        this.topic = topic;

            }

            public String getPayload() {

                        return payload;

            }

            public void setPayload(String payload) {

                        this.payload = payload;

            }

}

 

PubSubService è la classe principale e contiene:

  • Map<String, Set<Subscriber>> subscribersTopicMap – Memorizza i subscriber interessati ad un argomento.
  • Queue<Message> messageQueue – Una coda che memorizza i messaggi pubblicati dai publisher.
  • AddMessageToQueue() – Aggiunge un messaggio pubblicato da un publisher alla coda dei messaggi.
  • AddSubscriber () – Aggiunge un subscriber per un argomento.
  • removeSubscriber() – Rimuove un subscriber per un argomento.
  • broadcast() – Effettua il Broadcast dei nuovi messaggi aggiunti in coda a tutti i subscriber dell’argomento. La messagesQueue diventerà vuota dopo che sarà completato il broadcasting.
  • getMessagesForSubscriberOfTopic() – Invia i messaggi rispetto ad uno specifico argomento a tutti i suoi subscriber.

 

package pubsub.service;

import java.util.HashMap;

import java.util.HashSet;

import java.util.LinkedList;

import java.util.List;

import java.util.Map;

import java.util.Queue;

import java.util.Set;

import pubsub.Message;

import pubsub.subscriber.Subscriber;

 

public class PubSubService {

            //Keeps set of subscriber topic wise, using set to prevent duplicates

            Map<String, Set<Subscriber>> subscribersTopicMap = new HashMap<String, Set<Subscriber>>();

 

            //Holds messages published by publishers

            Queue<Message> messagesQueue = new LinkedList<Message>();

 

            //Adds message sent by publisher to queue

            public void addMessageToQueue(Message message){

                        messagesQueue.add(message);

            }

 

            //Add a new Subscriber for a topic

            public void addSubscriber(String topic, Subscriber subscriber){

 

                        if(subscribersTopicMap.containsKey(topic)){

                                   Set<Subscriber> subscribers = subscribersTopicMap.get(topic);

                                   subscribers.add(subscriber);

                                   subscribersTopicMap.put(topic, subscribers);

                        }else{

                                   Set<Subscriber> subscribers = new HashSet<Subscriber>();

                                   subscribers.add(subscriber);

                                   subscribersTopicMap.put(topic, subscribers);

                        }                     

            }

 

            //Remove an existing subscriber for a topic

            public void removeSubscriber(String topic, Subscriber subscriber){

 

                        if(subscribersTopicMap.containsKey(topic)){

                                   Set<Subscriber> subscribers = subscribersTopicMap.get(topic);

                                   subscribers.remove(subscriber);

                                   subscribersTopicMap.put(topic, subscribers);

                        }

            }

 

            //Broadcast new messages added in queue to All subscribers of the topic. messagesQueue will be empty after broadcasting

            public void broadcast(){

                        if(messagesQueue.isEmpty()){

                                   System.out.println("No messages from publishers to display");

                        }else{

                                   while(!messagesQueue.isEmpty()){

                                               Message message = messagesQueue.remove();

                                               String topic = message.getTopic();

 

                                               Set<Subscriber> subscribersOfTopic = subscribersTopicMap.get(topic);

 

                                               for(Subscriber subscriber : subscribersOfTopic){

                                                           //add broadcasted message to subscribers message queue

                                                           List<Message> subscriberMessages = subscriber.getSubscriberMessages();

                                                           subscriberMessages.add(message);

                                                           subscriber.setSubscriberMessages(subscriberMessages);

                                               }                                 

                                   }

                        }

            }

 

            //Sends messages about a topic for subscriber at any point

            public void getMessagesForSubscriberOfTopic(String topic, Subscriber subscriber) {

                        if(messagesQueue.isEmpty()){

                                   System.out.println("No messages from publishers to display");

                        }else{

                                   while(!messagesQueue.isEmpty()){

                                               Message message = messagesQueue.remove();

 

                                               if(message.getTopic().equalsIgnoreCase(topic)){

 

                                                           Set<Subscriber> subscribersOfTopic = subscribersTopicMap.get(topic);

 

                                                           for(Subscriber _subscriber : subscribersOfTopic){

                                                                       if(_subscriber.equals(subscriber)){

                                                                                   //add broadcasted message to subscriber message queue

                                                                                   List<Message> subscriberMessages = subscriber.getSubscriberMessages();

                                                                                   subscriberMessages.add(message);

                                                                                   subscriber.setSubscriberMessages(subscriberMessages);

                                                                       }

                                                           }

                                               }

                                    }

                        }

            }

 

}

DriverClass è la classe main per eseguire e testare il pattern Publisher-Subscriber. Serve solo a scopi di demo poiché nella realtà avremo Publisher e Subscriber che faranno il loro “lavoro” attraverso chiamate API, ecc.

package pubsub;

 

import pubsub.publisher.Publisher;

import pubsub.publisher.PublisherImpl;

import pubsub.service.PubSubService;

import pubsub.subscriber.Subscriber;

import pubsub.subscriber.SubscriberImpl;

 

public class DriverClass {

            public static void main(String[] args) {

                       

                        //Instantiate publishers, subscribers and PubSubService

                        Publisher javaPublisher = new PublisherImpl();

                        Publisher pythonPublisher = new PublisherImpl();

                       

                        Subscriber javaSubscriber = new SubscriberImpl();

                        Subscriber allLanguagesSubscriber = new SubscriberImpl();

                        Subscriber pythonSubscriber = new SubscriberImpl();

                       

                        PubSubService pubSubService = new PubSubService();

                       

                        //Declare Messages and Publish Messages to PubSubService

                        Message javaMsg1 = new Message("Java", "Core Java Concepts");

                        Message javaMsg2 = new Message("Java", "Spring MVC : Dependency Injection and AOP");

                        Message javaMsg3 = new Message("Java", "JPA & Hibernate");

                       

                        javaPublisher.publish(javaMsg1, pubSubService);

                        javaPublisher.publish(javaMsg2, pubSubService);

                        javaPublisher.publish(javaMsg3, pubSubService);

                       

                        Message pythonMsg1 = new Message("Python", "Easy and Powerful programming language");

                        Message pythonMsg2 = new Message("Python", "Advanced Python message");

                       

                        pythonPublisher.publish(pythonMsg1, pubSubService);

                        pythonPublisher.publish(pythonMsg2, pubSubService);

                       

                        //Declare Subscribers

                        javaSubscriber.addSubscriber("Java",pubSubService);                   //Java subscriber only subscribes to Java topics

                        pythonSubscriber.addSubscriber("Python",pubSubService);   //Python subscriber only subscribes to Python topics

                        allLanguagesSubscriber.addSubscriber("Java", pubSubService);    //all subscriber, subscribes to both Java and Python

                        allLanguagesSubscriber.addSubscriber("Python", pubSubService);

                       

                        //Trying unSubscribing a subscriber

                        //pythonSubscriber.unSubscribe("Python", pubSubService);

                       

                        //Broadcast message to all subscribers. After broadcast, messageQueue will be empty in PubSubService

                        pubSubService.broadcast();

                       

                        //Print messages of each subscriber to see which messages they got

                        System.out.println("Messages of Java Subscriber are: ");

                        javaSubscriber.printMessages();

                       

                        System.out.println("\nMessages of Python Subscriber are: ");

                        pythonSubscriber.printMessages();

                       

                        System.out.println("\nMessages of All Languages Subscriber are: ");

                        allLanguagesSubscriber.printMessages();

                       

                        //After broadcast the messagesQueue will be empty, so publishing new messages to server

                        System.out.println("\nPublishing 2 more Java Messages...");

                        Message javaMsg4 = new Message("Java", "JSP and Servlets");

                        Message javaMsg5 = new Message("Java", "Struts framework");

                       

                        javaPublisher.publish(javaMsg4, pubSubService);

                        javaPublisher.publish(javaMsg5, pubSubService);

                       

                        javaSubscriber.getMessagesForSubscriberOfTopic("Java", pubSubService);

                        System.out.println("\nMessages of Java Subscriber now are: ");

                        javaSubscriber.printMessages();                 

            }

}

 

Di seguito l’output:

Messages of Java Subscriber are:

Message Topic -> Java : Core Java Concepts

Message Topic -> Java : Spring MVC : Dependency Injection and AOP

Message Topic -> Java : JPA & Hibernate

 

Messages of Python Subscriber are:

Message Topic -> Python : Easy and Powerful programming language

Message Topic -> Python : Advanced Python message

 

Messages of All Languages Subscriber are:

Message Topic -> Java : Core Java Concepts

Message Topic -> Java : Spring MVC : Dependency Injection and AOP

Message Topic -> Java : JPA & Hibernate

Message Topic -> Python : Easy and Powerful programming language

Message Topic -> Python : Advanced Python message

 

Publishing 2 more Java Messages...

 

Messages of Java Subscriber now are:

Message Topic -> Java : Core Java Concepts

Message Topic -> Java : Spring MVC : Dependency Injection and AOP

Message Topic -> Java : JPA & Hibernate

Message Topic -> Java : JSP and Servlets

Message Topic -> Java : Struts framework

 

Articoli del tutorial