萬盛學電腦網

 萬盛學電腦網 >> 數據庫 >> mysql教程 >> zookeeper跨集群數據拷貝的例子

zookeeper跨集群數據拷貝的例子

ZooKeeper是一個分布式的,開放源碼的分布式應用程序協調服務,是Google的Chubby一個開源的實現,是Hadoop和Hbase的重要組件。它是一個為分布式應用提供一致性服務的軟件,提供的功能包括:配置維護、名字服務、分布式同步、組服務等。

有這麼一個場景,多套測試的zookeeper集群之間數據拷貝,按之前的理解,如果可以的話,直接拷貝zk的data文件可以解決問題。
但是最近碰到這麼一個事情,只是無意中刪除了一個集群某一個路徑下的數據,由於data數據並不可讀,不能有選擇的copy。所以,在這場景下,只能通過代碼的方式解決了。
 
我所用到的2個場景:
1. 誤刪除數據,從其它zookeeper集群拷貝
2. 搭建測試環境,直接從線上導入部分節點數據到本地
 
zookeeper管理node,基本和我們的文件系統一致,這個需求就變得非常簡單,可以直接轉換為遞歸一個路徑,然後在新的集群下創建。簡單的實現如下:
 
import com.metaboy.common.zk.dao.ZkDaoImpl;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.CuratorFrameworkFactory;
import com.netflix.curator.retry.RetryNTimes;
 
import java.util.List;
 
/**
 * @author yuxiong.wangy
 *         Date: 14-8-14
 *         Time: 下午7:33
 */
public class ZookeeperDemo {
    protected static CuratorFramework client_src;
    protected static CuratorFramework client_dst;
    protected static String  namespace_src;
    protected static String  namespace_dst;
    protected static String zkRoot_src = “/app”;
 
    public static void main(String[] args) throws InterruptedException {
        String zkConnectionStr_src = “*.*.*.*:2181,*.*.*.*:2181,*.*.*.*:2181”;
        client_src = getZKClient(zkConnectionStr_src,namespace_src,60000);
        String zkConnectionStr_dst = “*.*.*.*:2181,*.*.*.*:2181,*.*.*.*:2181”;
        client_dst = getZKClient(zkConnectionStr_dst,namespace_dst,60000);
 
        copyDataRecursion(zkRoot_src);
    }
 
    /*
     *       遞歸拷貝數據
     */
    public static void  copyDataRecursion(String parent){
        List<String> groups = ZkDaoImpl.getChildren(client_src, parent);
        if(groups.size() > 0){
            for(String group: groups){
                String path = parent+”/”+group;
 
                if(ZkDaoImpl.getData(client_src, path) != null){
                    ZkDaoImpl.createPersistentFile(client_dst, path, ZkDaoImpl.getData(client_src, path));
                    System.out.println(“[” + path + “]:” + ZkDaoImpl.getData(client_src,path));
                }else{
                    ZkDaoImpl.createPersistentFile(client_dst, path);
                    System.out.println(“[” + path + “]:”);
                }
 
                if(ZkDaoImpl.getChildren(client_src,path).size() > 0){
                    copyDataRecursion(path);
                }
            }
        }else {
            ZkDaoImpl.createPersistentFile(client_dst,parent,ZkDaoImpl.getData(client_src,parent));
        }
    }
 
    public static CuratorFramework getZKClient(String zkConnectionStr, String namespace, int sessionTimeout) throws InterruptedException {
        int connectTimeout = 60000;
        int retry = 3;
        int retryTimeout = 10000;
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString(zkConnectionStr)
                .retryPolicy(new RetryNTimes(retry, retryTimeout)).connectionTimeoutMs(connectTimeout)
                .sessionTimeoutMs(sessionTimeout).namespace(namespace).build();
        client.start();
        client.getZookeeperClient().blockUntilConnectedOrTimedOut();
        return client;
    }
}
 
ZkDaoImpl封裝一個對ZK的操作的集合,類似下面這種,將原生的方法包裝了下,使用上是差不多:
    /**
     * 獲取文件數據
     *
     * @param path
     *            文件路徑
     * @return 文件內容
     */
    public static String getData(CuratorFramework client, String path) {
        try {
            return new String(client.getData().forPath(path));
        } catch (Exception e) {
            throw new ZkException(ZkErrors.GET_DATA_EXCEPTION, “path:” + path, e);
        }
    }

copyright © 萬盛學電腦網 all rights reserved