有這麼一個場景,多套測試的zookeeper集群之間數據拷貝,按之前的理解,如果可以的話,直接拷貝zk的data文件可以解決問題。
但是最近碰到這麼一個事情,只是無意中刪除了一個集群某一個路徑下的數據,由於data數據並不可讀,不能有選擇的copy。所以,在這場景下,只能通過代碼的方式解決了。
我所用到的2個場景:
1. 誤刪除數據,從其它zookeeper集群拷貝
2. 搭建測試環境,直接從線上導入部分節點數據到本地
zookeeper管理node,基本和我們的文件系統一致,這個需求就變得非常簡單,可以直接轉換為遞歸一個路徑,然後在新的集群下創建。簡單的實現如下:
import com.metaboy.common.zk.dao.ZkDaoImpl;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.CuratorFrameworkFactory;
import com.netflix.curator.retry.RetryNTimes;
import java.util.List;
/**
* @author yuxiong.wangy
* Date: 14-8-14
* Time: 下午7:33
*/
public class ZookeeperDemo {
protected static CuratorFramework client_src;
protected static CuratorFramework client_dst;
protected static String namespace_src;
protected static String namespace_dst;
protected static String zkRoot_src = “/app”;
public static void main(String[] args) throws InterruptedException {
String zkConnectionStr_src = “*.*.*.*:2181,*.*.*.*:2181,*.*.*.*:2181”;
client_src = getZKClient(zkConnectionStr_src,namespace_src,60000);
String zkConnectionStr_dst = “*.*.*.*:2181,*.*.*.*:2181,*.*.*.*:2181”;
client_dst = getZKClient(zkConnectionStr_dst,namespace_dst,60000);
copyDataRecursion(zkRoot_src);
}
/*
* 遞歸拷貝數據
*/
public static void copyDataRecursion(String parent){
List<String> groups = ZkDaoImpl.getChildren(client_src, parent);
if(groups.size() > 0){
for(String group: groups){
String path = parent+”/”+group;
if(ZkDaoImpl.getData(client_src, path) != null){
ZkDaoImpl.createPersistentFile(client_dst, path, ZkDaoImpl.getData(client_src, path));
System.out.println(“[” + path + “]:” + ZkDaoImpl.getData(client_src,path));
}else{
ZkDaoImpl.createPersistentFile(client_dst, path);
System.out.println(“[” + path + “]:”);
}
if(ZkDaoImpl.getChildren(client_src,path).size() > 0){
copyDataRecursion(path);
}
}
}else {
ZkDaoImpl.createPersistentFile(client_dst,parent,ZkDaoImpl.getData(client_src,parent));
}
}
public static CuratorFramework getZKClient(String zkConnectionStr, String namespace, int sessionTimeout) throws InterruptedException {
int connectTimeout = 60000;
int retry = 3;
int retryTimeout = 10000;
CuratorFramework client = CuratorFrameworkFactory.builder().connectString(zkConnectionStr)
.retryPolicy(new RetryNTimes(retry, retryTimeout)).connectionTimeoutMs(connectTimeout)
.sessionTimeoutMs(sessionTimeout).namespace(namespace).build();
client.start();
client.getZookeeperClient().blockUntilConnectedOrTimedOut();
return client;
}
}
ZkDaoImpl封裝一個對ZK的操作的集合,類似下面這種,將原生的方法包裝了下,使用上是差不多:
/**
* 獲取文件數據
*
* @param path
* 文件路徑
* @return 文件內容
*/
public static String getData(CuratorFramework client, String path) {
try {
return new String(client.getData().forPath(path));
} catch (Exception e) {
throw new ZkException(ZkErrors.GET_DATA_EXCEPTION, “path:” + path, e);
}
}