Building Real-Time Messaging Systems
Redis’s Pub/Sub system can construct real-time messaging systems, such as many developers use Pub/Sub to build real-time chat systems.
import redis.clients.jedis.*;
import java.util.Date;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.commons.lang3.RandomStringUtils;
class PrintListener extends JedisPubSub{
@Override
public void onMessage(String channel, String message) {
String time = DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss");
System.out.println("message receive:" + message + ",channel:" + channel +
"..." + time);
//Here we can unsubscribe
if(message.equalsIgnoreCase("quit")){
this.unsubscribe(channel);
}
}
}
class PubClient {
private Jedis jedis;
public PubClient(String host,int port){
jedis = new Jedis(host,port);
}
public void pub(String channel,String message){
jedis.publish(channel, message);
}
public void close(String channel){
jedis.publish(channel, "quit");
jedis.del(channel);//Real-time messaging system
}
}
class SubClient {
private Jedis jedis;//
public SubClient(String host,int port){
jedis = new Jedis(host,port);
}
public void sub(JedisPubSub listener,String channel){
jedis.subscribe(listener, channel);
//This place will be blocked, at the client code level for JedisPubSub in handling messages, will "monoport" link
//and the way of while loop is taken, listening to the subscribed messages
}
}
public class PubSubTest {
/**
* @param args
*/
static String host = "127.0.0.1";
static int port = 10011;
public static void main(String[] args) throws Exception{
PubClient pubClient = new PubClient(host,port);
final String channel = "pubsub-channel";
pubClient.pub(channel, "before1");
pubClient.pub(channel, "before2");
Thread.sleep(2000);
//The message subscriber is very special and needs to monopolize the connection, so we need to create a new connection for it;
//in addition, the implementation of the client also ensures the "link monoport" feature, the sub method will keep blocking,
//until the listener.unsubscribe method is called
Thread subThread = new Thread(new Runnable() {
@Override
public void run() {
try{
SubClient subClient = new SubClient(host,port);
System.out.println("----------subscribe operation begin-------");
JedisPubSub listener = new PrintListener();
//At the API level, this is a polling operation, and it will return only when unsubscribe is called
subClient.sub(listener, channel);
System.out.println("----------subscribe operation end-------")
;
}catch(Exception e){
e.printStackTrace();
}
}
});
subThread.start();
int i=0;
while(i < 10){
String message = RandomStringUtils.random(6, true, true);//apache-commons
pubClient.pub(channel, message);
i++;
Thread.sleep(1000);
}
//Passive closing instruction, if the publisher in the channel confirms that the channel needs to be closed, then send a "quit"
//then when the listener.onMessage() receives a "quit", other subscription clients will execute the "unsubscribe" operation.
pubClient.close(channel);
//Additionally, you can unsubscribe like this
//listener.unsubscribe(channel);
}
}
Output:
----------subscribe operation begin-------
message receive:erRIEe,channel:pubsub-channel...2016-03-15 15:53:52
message receive:Ovcwiw,channel:pubsub-channel...2016-03-15 15:53:53
message receive:STPWfV,channel:pubsub-channel...2016-03-15 15:53:54
message receive:SR4iIk,channel:pubsub-channel...2016-03-15 15:53:55
message receive:GI3Ege,channel:pubsub-channel...2016-03-15 15:53:56
message receive:0V1JUt,channel:pubsub-channel...2016-03-15 15:53:57
message receive:3iU8BV,channel:pubsub-channel...2016-03-15 15:53:58
message receive:BqeI2x,channel:pubsub-channel...2016-03-15 15:53:59
message receive:D53cHF,channel:pubsub-channel...2016-03-15 15:54:00
message receive:quit,channel:pubsub-channel...2016-03-15 15:54:01
----------subscribe operation end-------