The message queue feature of vaniglia implements a message queue. Its functionality is illustrated by the following example:
    public class Main {

        private static final Logger logger = Logger.getLogger(Main.class);

        public static void main(String[] args) throws MessageQueueException {
            // Obtain a queue named "MyQueue" backed by in-memory storage
            // ("MyStorage", size limit 1000).
            MessageQueue queue = MessageQueue.getQueue(
                    "MyQueue",
                    MessageStorageType.MemoryType,
                    MessageStorageParameters.createMemoryStorageParameters("MyStorage", 1000));

            // Start the queue and register the listener that handles queued messages.
            queue.start();
            queue.subscribe(new MyQueueListener());

            // Run the publisher on a daemon thread.
            MyQueuePublisher myQueuePublisher = new MyQueuePublisher(queue);
            Thread publishingThread = new Thread(myQueuePublisher);
            publishingThread.setDaemon(true);

            logger.info("Starting the publisher.");
            publishingThread.start();

            queue.stop();
        }
    }
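MessageQueue creates a message queue through getQueue. The MessageStorageType parameter sets how messages are stored (memory, file system, database, and so on), and MessageStorageParameters holds the parameters for the chosen storage type.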
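Here MyQueueListener is the message listener, which the user writes by implementing the listener interface; it is triggered whenever there are messages in the queue. myQueuePublisher is the message-writing thread, which fills the queue by calling the push interface of MessageQueue. The main class diagram has two parts.
One part is MessageStorageType.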
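To make the listener and publisher roles described above more concrete, here is a minimal, self-contained sketch. It does not use vaniglia: `ListenerSketch`, `Listener`, `SimpleQueue`, `Publisher` and `process()` are hypothetical stand-ins invented for illustration, and the real listener interface and push signature may look different. The sketch only shows the general flow: a publisher thread pushes messages, and a processing step hands each pending message to the subscribed listeners.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-ins for illustration; none of these are vaniglia classes.
class ListenerSketch {

    /** Hypothetical callback interface, playing the role of the message listener. */
    interface Listener {
        void onMessage(String message);
    }

    /** Hypothetical in-memory queue with push, subscribe and a manual processing step. */
    static class SimpleQueue {
        private final ConcurrentLinkedQueue<String> messages = new ConcurrentLinkedQueue<>();
        private final List<Listener> listeners = new CopyOnWriteArrayList<>();

        void subscribe(Listener listener) {
            listeners.add(listener);
        }

        void push(String message) {
            messages.add(message);
        }

        /** Delivers every pending message to every listener and returns how many were delivered. */
        int process() {
            int delivered = 0;
            String message;
            while ((message = messages.poll()) != null) {
                for (Listener listener : listeners) {
                    listener.onMessage(message);
                }
                delivered++;
            }
            return delivered;
        }
    }

    /** Publisher in the style of MyQueuePublisher: a Runnable that pushes messages. */
    static class Publisher implements Runnable {
        private final SimpleQueue queue;
        private final int count;

        Publisher(SimpleQueue queue, int count) {
            this.queue = queue;
            this.count = count;
        }

        @Override
        public void run() {
            for (int i = 0; i < count; i++) {
                queue.push("message-" + i);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleQueue queue = new SimpleQueue();
        List<String> received = new ArrayList<>();
        queue.subscribe(received::add);

        Thread publishingThread = new Thread(new Publisher(queue, 3));
        publishingThread.start();
        publishingThread.join(); // wait until all messages have been pushed

        queue.process();
        System.out.println(received); // prints [message-0, message-1, message-2]
    }
}
```

In this sketch the processing step is called by hand. In the library it is driven by a timer, which is the subject of the next section.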
2. Message queue processing logic
MessageQueue starts a Timer thread that periodically runs a MessageQueueProcessingTask, as follows:
    public void start(long interval) throws MessageQueueException {
        if (timerThread == null) {
            logger.info("Starting queue " + name + " with processing interval = " + interval + "ms");
            timerThread = new Timer(true);
            processingTask = new MessageQueueProcessingTask(this);
            timerThread.scheduleAtFixedRate(processingTask, interval, interval);
        } else {
            throw new MessageQueueException("Queue " + name + " already running.");
        }
    }
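The start method above relies on two standard java.util.Timer features: `new Timer(true)` creates a daemon timer thread, and `scheduleAtFixedRate(task, delay, period)` runs a TimerTask repeatedly. The following self-contained sketch reproduces that pattern outside the library. `TimerSketch`, its counter and latch, and the `stop()` method are assumptions added for illustration (the original stop implementation is not shown here), and IllegalStateException stands in for MessageQueueException.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a timer-driven queue; not a vaniglia class.
class TimerSketch {
    private final String name;
    private Timer timerThread;

    // Bookkeeping used only by this sketch to observe the task running.
    final AtomicInteger runs = new AtomicInteger();
    final CountDownLatch firstThreeRuns = new CountDownLatch(3);

    TimerSketch(String name) {
        this.name = name;
    }

    /** Starts periodic processing; fails if the timer is already running. */
    synchronized void start(long interval) {
        if (timerThread != null) {
            throw new IllegalStateException("Queue " + name + " already running.");
        }
        timerThread = new Timer(true); // true = daemon thread, does not keep the JVM alive
        TimerTask processingTask = new TimerTask() {
            @Override
            public void run() {
                // A real task would drain the queue and notify listeners here.
                runs.incrementAndGet();
                firstThreeRuns.countDown();
            }
        };
        // First run after `interval` ms, then again every `interval` ms.
        timerThread.scheduleAtFixedRate(processingTask, interval, interval);
    }

    /** Assumed counterpart of start: cancels the timer so start can be called again. */
    synchronized void stop() {
        if (timerThread != null) {
            timerThread.cancel();
            timerThread = null;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TimerSketch queue = new TimerSketch("MyQueue");
        queue.start(50);
        boolean ranThreeTimes = queue.firstThreeRuns.await(2, TimeUnit.SECONDS);
        queue.stop();
        System.out.println("ran at least three times: " + ranThreeTimes);
    }
}
```

Because the timer thread is a daemon, a program whose main thread returns immediately after start would exit before any processing happens, which is why the sketch waits on a latch before stopping.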
Code that is particularly worth studying:
    public abstract class MessageStorageParameters {

        public static class MemoryStorageParameters extends MessageStorageParameters {
            private long size;

            public MemoryStorageParameters(String name, long size) {
                super(name);
                this.size = size;
            }

            public long getSize() {
                return size;
            }
        }

        public static class FileSystemStorageParameters extends MessageStorageParameters {
            private String basedir;

            public FileSystemStorageParameters(String name) {
                super(name);
            }

            public FileSystemStorageParameters(String name, String basedir) {
                super(name);
                this.basedir = basedir;
            }

            public String getBasedir() {
                return basedir;
            }
        }

        public static class JDBCStorageParameters extends MessageStorageParameters {
            public JDBCStorageParameters(String name) {
                super(name);
            }
        }

        /**
         * Creates a Memory Storage Parameters given its name and size limit.
         *
         * @param name the storage name
         * @param size the storage size limit or -1 to avoid size limit checking.
         *
         * @return the newly created MemoryStorageParameters object
         */
        public static MessageStorageParameters createMemoryStorageParameters(String name, long size) {
            return new MemoryStorageParameters(name, size);
        }

        /**
         * Creates a FileSystem Storage Parameters given its name and basedir.
         *
         * @param name the storage name
         * @param basedir the storage basedir
         *
         * @return the newly created FileSystemStorageParameters object
         */
        public static MessageStorageParameters createFileSystemStorageParameters(String name, String basedir) {
            return new FileSystemStorageParameters(name, basedir);
        }

        /**
         * Creates a FileSystem Storage Parameters given its name.
         *
         * @param name the storage name
         *
         * @return the newly created FileSystemStorageParameters object
         */
        public static MessageStorageParameters createFileSystemStorageParameters(String name) {
            return new FileSystemStorageParameters(name);
        }

        public static MessageStorageParameters createJDBCStorageParameters(String name) {
            return new JDBCStorageParameters(name);
        }

        protected String name;

        protected MessageStorageParameters(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }
    }