设为首页 加入收藏

TOP

基于 ZooKeeper 的分布式锁和队列(七)
2017-10-22 06:06:45 】 浏览:777
Tags:基于 ZooKeeper 分布式 队列
}

这个类是本客户端获取到lock和释放lock的时候触发操作的接口:

public interface LockListener {
    /**
     * call back called when the lock 
     * is acquired
     */
    public void lockAcquired();
    
    /**
     * call back called when the lock is 
     * released.
     */
    public void lockReleased();
}

队列(Queue)

分布式队列是通用的数据结构,为了在 Zookeeper 中实现分布式队列,首先需要指定一个 Znode 节点作为队列节点(queue node), 各个分布式客户端通过调用 create() 函数向队列中放入数据,调用create()时节点路径名带”qn-”结尾,并设置顺序(sequence)节点标志。 由于设置了节点的顺序标志,新的路径名具有以下字符串模式:”_path-to-queue-node_/qn-X”,X 是唯一自增号。需要从队列中获取数据/移除数据的客户端首先调用 getChildren() 函数,有数据则获取(获取数据后可以删除也可以不删),没有则在队列节点(queue node)上将 watch 设置为 true,等待触发并处理最小序号的节点(即从序号最小的节点中取数据)。

实现步骤基本如下:

前提:需要一个队列root节点dir

入队:使用create()创建节点,将共享数据data放在该节点上,节点类型为PERSISTENT_SEQUENTIAL,永久顺序性的(也可以设置为临时的,看需求)。

出队:因为队列可能为空,2种方式处理:一种如果为空则wait等待,一种返回异常。

等待方式:这里使用了CountDownLatch的等待和Watcher的通知机制,使用了TreeMap的排序获取节点顺序最小的数据(FIFO)。

抛出异常:getChildren()获取队列数据时,如果size==0则抛出异常。

一个分布式Queue的实现,详细代码:

package org.apache.zookeeper.recipes.queue;

import java.util.List;
import java.util.NoSuchElementException;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;

/**
 * 
 * A <a href="package.html">protocol to implement a distributed queue</a>.
 * 
 */
public class DistributedQueue {
    private static final Logger LOG = LoggerFactory.getLogger(DistributedQueue.class);

    private final String dir;

    private ZooKeeper zookeeper;
    private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;

    private final String prefix = "qn-";

    public DistributedQueue(ZooKeeper zookeeper, String dir, List<ACL> acl){
        this.dir = dir;

        if(acl != null){
            this.acl = acl;
        }
        this.zookeeper = zookeeper;
        
        //Add root dir first if not exists
        if (zookeeper != null) {
            try {
                Stat s = zookeeper.exists(dir, false);
                if (s == null) {
                    zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);
                }
            } catch (KeeperException e) {
                LOG.error(e.toString());
            } catch (InterruptedException e) {
                LOG.error(e.toString());
            }
        }
    }

    /**
     * Returns a Map of the children, ordered by id.
     * @param watcher optional watcher on getChildren() operation.
     * @return map from id to child name for all children
     */
    private TreeMap<Long,String> orderedChildren(Watcher watcher) throws KeeperException, InterruptedException {
        TreeMap<Long,String> orderedChildren = new TreeMap<Long,String>();

        List<String> childNames = null;
        try{
            childNames = zookeeper.getChildren(dir, watcher);
        }catch (KeeperException.NoNodeException e){
            throw e;
        }

        for(String childName : childNames){
            try{
                //Check format
                if(!childName.regionMatches(0, prefix, 0, prefix.length())){
                    LOG.warn("Found child node with improper name: " + childName);
                    continue;
                }
                String suffix = childName.substring(prefix.length());
                Long childId = new Long(suffix);
                order
首页 上一页 4 5 6 7 8 9 下一页 尾页 7/9/9
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇就是要你懂 Java 中 volatile 关.. 下一篇基于 Spring Boot 的天气预报服务

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目