博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
vaniglia 源码学习(五)
阅读量:6816 次
发布时间:2019-06-26

本文共 4129 字,大约阅读时间需要 13 分钟。

vaniglia message quene特性主要实现一个消息队列,功能如下:

public class Main {    private static final Logger logger = Logger.getLogger(Main.class);    public static void main(String[] args) throws MessageQueueException {        MessageQueue queue = MessageQueue.getQueue("MyQueue", MessageStorageType.MemoryType,                MessageStorageParameters.createMemoryStorageParameters("MyStorage", 1000));        queue.start();        queue.subscribe(new MyQueueListener());        MyQueuePublisher myQueuePublisher = new MyQueuePublisher(queue);        Thread publishingThread = new Thread(myQueuePublisher);        publishingThread.setDaemon(true);        logger.info("Starting the publisher.");        publishingThread.start();        queue.stop();    }}

MessageQueue 通过getQueue产生一个消息队列,参数MessageStorageType设定消息的存储类型(内存、文件、数据库等),MessageStorageParameters存储存储类型的参数。

其中MyQueueListner为消息监听器(用户继承接口实现),有消息在队列中时触发处理。myQueuePublisher是消息写入线程,通过调用MessageQueue的push接口向队列中填入消息。主要的类图有两部分:

一部分是MessageStorageType:

 

二.消息队列的业务

在MesssageQueue中启动Timer线程,调用MessageQueueProcessingTask,如下:

 

public void start(long interval) throws MessageQueueException {        if (timerThread == null) {            logger.info("Starting queue "+name+" with processing interval = "+interval+"ms");            timerThread = new Timer(true);            processingTask = new MessageQueueProcessingTask(this);            timerThread.scheduleAtFixedRate(processingTask, interval, interval);        }        else {            throw new MessageQueueException("Queue "+name+" already running.");        }    }

 

 

 

比较值得学习的代码有:

public abstract class MessageStorageParameters {    public static class MemoryStorageParameters extends MessageStorageParameters {        private long size;        public MemoryStorageParameters(String name, long size) {            super(name);            this.size = size;        }        public long getSize() {            return size;        }    }    public static class FileSystemStorageParameters extends MessageStorageParameters {        private String basedir;        public FileSystemStorageParameters(String name) {            super(name);        }        public FileSystemStorageParameters(String name, String basedir) {            super(name);            this.basedir = basedir;        }        public String getBasedir() {            return basedir;        }    }    public static class JDBCStorageParameters extends MessageStorageParameters {        public JDBCStorageParameters(String name) {            super(name);        }    }    /**     * Creates a Memory Storage Parameters given its name and size limit.     *     * @param name the storage name     * @param size the storage size limit or -1 to avoid size limit checking.     *     * @return the newly created MemoryStorageParameters object     */    public static MessageStorageParameters createMemoryStorageParameters(String name, long size) {        return new MemoryStorageParameters(name, size);    }    /**     * Creates a FileSystem Storage Paramters given its name and basedir.     *     * @param name the storage name     * @param basedir the storage basedir     *     * @return the newly create FileSystemStorageParamters object     */    public static MessageStorageParameters createFileSystemStorageParameters(String name, String basedir) {        return new FileSystemStorageParameters(name, basedir);    }    /**     * Creates a FileSystem Storage Paramters given its name.     *     * @param name the storage name     *     * @return the newly create FileSystemStorageParamters object     */    public static MessageStorageParameters createFileSystemStorageParameters(String name) {        return new FileSystemStorageParameters(name);    }    public static MessageStorageParameters createJDBCStorageParameters(String name) {        return new JDBCStorageParameters(name);    }    protected String name;    protected MessageStorageParameters(String name) {        this.name = name;    }    public String getName() {        return name;    }    public void setName(String name) {        this.name = name;    }}

 

转载于:https://www.cnblogs.com/Fredric-2013/archive/2013/04/03/2997659.html

你可能感兴趣的文章
CISCO引擎RPR SSO
查看>>
LINUX APACHE 安装测试
查看>>
Java导致登录UCS Manager异常
查看>>
HTTP协议
查看>>
Win10怎么改Host文件?Win10编辑host文件方法(无视权限)
查看>>
sql convert and cast
查看>>
我的NodeJS一年之旅总结
查看>>
MyBatis-3.4.2-源码分析6:解析XML之objectWrapperFactoryElement & reflectorFactoryElement
查看>>
javascript与获取鼠标位置有关的属性
查看>>
Oracle database 11.2.0.3.0 升级至 11.2.0.3.14
查看>>
heartbeat理论介绍
查看>>
简单实现MVC模式
查看>>
什么版本的Maven与Java 6兼容?
查看>>
CCNA第3次课程
查看>>
Gson详解:Java对象与JSON相互转换的利器
查看>>
U-mail邮件系统又一getshell
查看>>
Spring Boot 入门
查看>>
路由交换调试(CCNA)零基础到专家 二
查看>>
我的友情链接
查看>>
.Net组件程序设计之序列化
查看>>