The design behind Zookeeper is intriguing. If you do not
think so, ask yourself, if you were to design a library that does inter-process
locking, what would you do? You probably want to expose features such as lock,
unlock right? Not Zookeeper. Instead, it provides file-system like APIs that enables
users to develop their own inter-process coordination.
This gives Zookeeper a lot of flexibility.
But using Zookeeper APIs is difficult (e.g. you have to manage connections, manage errors etc), so comes Curator which makes working with Zookeeper much easier.
This gives Zookeeper a lot of flexibility.
But using Zookeeper APIs is difficult (e.g. you have to manage connections, manage errors etc), so comes Curator which makes working with Zookeeper much easier.
Zookeeper and Curator provide recipes for some common
inter-process coordination tasks.
I use Zookeeper in my container farm for service registry
and service discovery:
Containers that run ElasticSearch will register themselves
to Zookeeper. Containers that run my app also run Logstash, which will not
start until it finds ElasticSearch service is registered. Logstash will analyze logs and send logs to
ElasticSearch, and upon fatal errors, it will email the admin. My apps will also register themselves with
Zookeeper.
Install zookeeper
To try the following code, first download zookeeper from http://www.apache.org/dyn/closer.cgi/zookeeper/,
unzip it. Copy data/zoo_sample.cfg
to data/zoo.cfg, edit zoo.cfg, and change dataDir to a folder that exists.
Start zookeeper server:
./zkServer.sh start
And now start zookeeper client, you can use this client to
view registered services:
./zkCli.sh
Service registering recipe
The following is the service registering application:
public class ServiceRegister {
@Parameter(names = { "-service" }, description = "service
ids")
private String serviceId;
@Parameter(names = { "-h" }, description = "help")
private boolean help;
@Parameter(names = { "-zkServer" }, description = "the ip address
of the zookeeper server")
private String zkServer;
@Parameter(names = { "-zkPort" }, description = "the port of
the zookeeper server")
private String zkPort;
private static volatile boolean keepRunning = true;
private
ServiceDiscovery<Void> disoveryBuilder;
public static void main(String[] args) throws Exception {
final ServiceRegister
register = new ServiceRegister();
new JCommander(register,
args);
register.registerService();
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
try {
register.unregisterService();
}
catch (Exception e) {
e.printStackTrace();
}
keepRunning = false;
}
});
while (keepRunning) {
Thread.sleep(50000000);
}
}
private void registerService() throws Exception {
CuratorFramework
curatorFramework = CuratorFrameworkFactory.newClient(zkServer + ":"+ zkPort, new RetryNTimes(5,
1000));
curatorFramework.start();
disoveryBuilder = ServiceDiscoveryBuilder.<Void> builder(Void.class)
.basePath("mycontainerfarm").client(curatorFramework).build();
String[]
ids = serviceId.split(",");
for (String id : ids) {
if (Strings.isNullOrEmpty(id))
{
continue;
}
ServiceInstance<Void>
serviceInstance = ServiceInstance.<Void> builder().name("app").id(id).build();
disoveryBuilder.registerService(serviceInstance);
}
}
private void unregisterService() throws Exception {
String[]
ids = serviceId.split(",");
for (String id : ids) {
if (Strings.isNullOrEmpty(id))
{
continue;
}
ServiceInstance<Void>
serviceInstance = ServiceInstance.<Void> builder().name("app") .id(id).build();
disoveryBuilder.unregisterService(serviceInstance);
}
}
}
Run this application with arguments:
ServiceRegister
-zkServer 192.168.33.16 -zkPort 2181 -service myapp1
Use the zkCli,
you will see myapp1 is
registered at /mycontainerfarm/app/myapp1:
Unregistering is not necessary, because the node/mycontainerfarm/app/myapp1is an
ephemeral one, when ServiceRegister exits,
zookeeper server will automatically remove the node after a period of time.
In my case, I chose for ServiceRegister to unregister explicitly
in its shutdown hook, so that when it exits, the node /mycontainerfarm/app/myapp1 immediately disappears.
Inter-process locking recipe
There is one problem with this recipe, the argument -service is the service id, which is
unique. Curator’s logic is that if it
finds the same node already exists, it will delete it, and recreate it.
You can
verify this by checking the ctime
and mtime of get /mycontainerfarm/app/myapp1.
This doesn’t fit with my requirement: I only want to register the same service
if it doesn’t exist.
We can use another recipe: inter-process locking.
for (String id : ids) {
if (Strings.isNullOrEmpty(id))
{
continue;
}
InterProcessMutex lock = new
InterProcessMutex(curatorFramework,
"/mycontainerfarm/myapp" + serviceId);
boolean acquired = lock.acquire(5, TimeUnit.SECONDS);
if (acquired) {
ServiceInstance<Void>
serviceInstance = ServiceInstance.<Void> builder()
.name("app").id(id).build();
disoveryBuilder.registerService(serviceInstance);
}
else {
System.out.println("fail to
register " + serviceId);
}
}
Service discovery recipe
public class ServiceDiscover {
@Parameter(names = { "-zkServer" }, description = "the ip address
of the zookeeper server")
private String zkServer;
@Parameter(names = { "-zkPort" }, description = "the port of
the zookeeper server")
private String zkPort;
public static void main(String[] args) throws Exception {
final ServiceDiscover
discover = new ServiceDiscover();
new JCommander(discover,
args);
discover.discover();
}
private void discover() throws Exception {
CuratorFramework client =
CuratorFrameworkFactory.newClient(zkServer + ":" + zkPort, new RetryNTimes(5,
1000));
client.start();
final ServiceDiscovery<Void> serviceDiscovery =
ServiceDiscoveryBuilder
.<Void> builder(Void.class).basePath("mycontainerfarm").client(client).build();
serviceDiscovery.start();
ServiceProvider<Void>
serviceProvider = serviceDiscovery.serviceProviderBuilder()
.serviceName("app").build();
serviceProvider.start();
List<ServiceInstance<Void>>
allInstances = Lists.newArrayList(serviceProvider
.getAllInstances());
for
(ServiceInstance<Void> instance : allInstances) {
System.out.println(instance.getId());
}
}
}