Friday, February 27, 2015

Service Discovery with Zookeeper



The design behind Zookeeper is intriguing. If you don't think so, ask yourself: if you were to design a library for inter-process locking, what would you do? You would probably expose operations such as lock and unlock, right? Not Zookeeper. Instead, it provides a file-system-like API that lets users build their own inter-process coordination.

This gives Zookeeper a lot of flexibility.
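To see how a lock can be built from file-system-like primitives, here is a small in-memory model of the lock recipe described in the Zookeeper documentation. Each contender creates a sequential ephemeral child under a lock node. The contender with the lowest sequence number owns the lock, and every other contender watches only the node just before its own. Curator's InterProcessMutex, used later in this post, is built on the same idea. This is only a plain-Java simulation: the names (LockRecipeModel, join, leave) are hypothetical, it is not Zookeeper's or Curator's code, and there are no real sessions or watches.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// In-memory model of the Zookeeper lock recipe (hypothetical names, no real Zookeeper involved).
final class LockRecipeModel {
    // Children of the lock node, keyed by sequence number so they stay sorted.
    private final TreeMap<Long, String> children = new TreeMap<>();
    private long nextSequence = 0;

    // Models creating an EPHEMERAL_SEQUENTIAL child: returns the sequence number assigned.
    long join(String clientId) {
        long seq = nextSequence++;
        children.put(seq, clientId);
        return seq;
    }

    // Models the child being deleted (unlock, or the owner's session ending).
    void leave(long seq) {
        children.remove(seq);
    }

    // The client whose child has the lowest sequence number holds the lock.
    Optional<String> holder() {
        Map.Entry<Long, String> first = children.firstEntry();
        return first == null ? Optional.empty() : Optional.of(first.getValue());
    }

    // A waiting client watches only its immediate predecessor, so releasing the lock
    // wakes up one waiter instead of all of them (avoiding the "herd effect").
    Optional<Long> predecessorOf(long seq) {
        return Optional.ofNullable(children.lowerKey(seq));
    }

    public static void main(String[] args) {
        LockRecipeModel lock = new LockRecipeModel();
        long a = lock.join("A");
        long b = lock.join("B");
        long c = lock.join("C");
        System.out.println(lock.holder().get());                  // A
        System.out.println(lock.predecessorOf(c).get() == b);     // true: C watches B
        lock.leave(a);
        System.out.println(lock.holder().get());                  // B
    }
}
```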

But the raw Zookeeper API is hard to use: you have to manage connections, handle errors, and so on. That is where Curator comes in. It makes working with Zookeeper much easier.
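Much of that connection handling comes down to retrying failed operations. The code later in this post passes new RetryNTimes(5, 1000) to Curator. As I understand it, this means Curator retries a failed operation up to 5 times and sleeps 1000 ms between attempts. The sketch below only illustrates that idea in plain Java. retryNTimes is a hypothetical helper, not Curator's implementation, which also has to deal with connection loss and session expiry.

```java
import java.util.function.Supplier;

// Plain-Java illustration of a "retry N times with a fixed sleep" policy (hypothetical helper).
final class RetrySketch {
    // Calls op once, then retries up to maxRetries more times, sleeping sleepMs between attempts.
    static <T> T retryNTimes(Supplier<T> op, int maxRetries, long sleepMs) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        RuntimeException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return op.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt < maxRetries) {
                    sleepQuietly(sleepMs);
                }
            }
        }
        throw last; // every attempt failed: surface the last error
    }

    private static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Stand-in operation that fails twice (e.g. server not up yet), then succeeds.
        String result = retryNTimes(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new IllegalStateException("connection refused");
            }
            return "connected";
        }, 5, 100);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```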

Zookeeper and Curator provide recipes for some common inter-process coordination tasks. 

I use Zookeeper in my container farm for service registry and service discovery:


Containers that run ElasticSearch register themselves with Zookeeper. Containers that run my app also run Logstash, which does not start until it finds that the ElasticSearch service is registered. Logstash analyzes the logs, sends them to ElasticSearch, and emails the admin on fatal errors. My apps also register themselves with Zookeeper.
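Logstash waiting for ElasticSearch is essentially a polling loop over the registry. The sketch below shows one way to write such a loop in plain Java. It assumes a lookup function that returns the currently registered instances. With Curator, that lookup could wrap something like serviceDiscovery.queryForInstances("elasticsearch"). The service name is hypothetical, and queryForInstances throws checked exceptions that you would need to wrap. awaitService and its parameters are illustrative and are not part of any library.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.function.Supplier;

// Polls a registry lookup until at least one instance shows up or the timeout expires.
final class ServiceWaiter {
    static boolean awaitService(Supplier<? extends Collection<?>> lookup, long timeoutMs, long pollMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            if (!lookup.get().isEmpty()) {
                return true; // the service is registered: safe to start the dependent process
            }
            if (System.currentTimeMillis() >= deadline) {
                return false; // give up and let the caller decide what to do
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Stand-in lookup: pretend ElasticSearch registers itself after about 300 ms.
        Supplier<Collection<String>> lookup = () -> System.currentTimeMillis() - start > 300
                ? Collections.singletonList("es-1")
                : Collections.emptyList();
        boolean up = awaitService(lookup, 5_000, 100);
        System.out.println(up ? "ElasticSearch registered, starting Logstash" : "timed out");
    }
}
```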

 

Install Zookeeper

To try the following code, first download Zookeeper from http://www.apache.org/dyn/closer.cgi/zookeeper/ and unzip it. Copy conf/zoo_sample.cfg to conf/zoo.cfg, then edit zoo.cfg and change dataDir to a folder that exists.
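zoo.cfg is a simple key=value file, and a dataDir that doesn't exist is a common reason the server fails to start. As a small illustration, the sketch below loads such a file with java.util.Properties, then checks that dataDir is an existing directory and that clientPort is a number. ZooCfgCheck and the checks it performs are my own assumptions, not something Zookeeper ships. The examples below connect on port 2181, which is Zookeeper's default client port.

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical sanity check for a zoo.cfg file (not part of Zookeeper).
final class ZooCfgCheck {
    static List<String> check(Properties cfg) {
        List<String> problems = new ArrayList<>();
        String dataDir = cfg.getProperty("dataDir");
        if (dataDir == null || !Files.isDirectory(Paths.get(dataDir.trim()))) {
            problems.add("dataDir is missing or not an existing directory: " + dataDir);
        }
        String port = cfg.getProperty("clientPort", "").trim();
        if (!port.matches("\\d+")) {
            problems.add("clientPort is not a number: " + port);
        }
        return problems;
    }

    static Properties parse(Reader reader) throws IOException {
        Properties cfg = new Properties();
        cfg.load(reader); // zoo.cfg uses the same key=value syntax as a .properties file
        return cfg;
    }

    public static void main(String[] args) throws IOException {
        // Usage: java ZooCfgCheck conf/zoo.cfg
        try (Reader r = Files.newBufferedReader(Paths.get(args[0]))) {
            List<String> problems = check(parse(r));
            System.out.println(problems.isEmpty() ? "zoo.cfg looks fine" : String.join("\n", problems));
        }
    }
}
```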

Start the Zookeeper server (both scripts are in the bin directory):
./zkServer.sh start

Then start the Zookeeper client, which you can use to view the registered services (pass -server host:port if the server is not running on localhost):
./zkCli.sh

 

Service registering recipe

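Here is the service registration application. It uses JCommander for the command-line options and Curator's service discovery extension for the registration: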
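import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.google.common.base.Strings;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;

public class ServiceRegister {

       @Parameter(names = { "-service" }, description = "comma-separated service ids")
       private String serviceId;

       @Parameter(names = { "-h" }, description = "help")
       private boolean help;

       @Parameter(names = { "-zkServer" }, description = "the ip address of the zookeeper server")
       private String zkServer;

       @Parameter(names = { "-zkPort" }, description = "the port of the zookeeper server")
       private String zkPort;

       private static volatile boolean keepRunning = true;

       private ServiceDiscovery<Void> disoveryBuilder;

       public static void main(String[] args) throws Exception {
              final ServiceRegister register = new ServiceRegister();
              JCommander commander = new JCommander(register, args);
              if (register.help) {
                     commander.usage(); // print the options above and exit
                     return;
              }

              register.registerService();

              // Unregister on exit (e.g. Ctrl+C or SIGTERM) so the nodes disappear immediately.
              Runtime.getRuntime().addShutdownHook(new Thread() {
                      public void run() {
                             try {
                                    register.unregisterService();
                             } catch (Exception e) {
                                    e.printStackTrace();
                             }
                             keepRunning = false;
                      }
              });

              // Keep the process, and therefore its Zookeeper session, alive.
              while (keepRunning) {
                      Thread.sleep(50000000);
              }
       }

       private void registerService() throws Exception {
              // Connect to Zookeeper, retrying up to 5 times with 1 second between attempts.
              CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(zkServer + ":" + zkPort, new RetryNTimes(5, 1000));
              curatorFramework.start();

              // Instances are stored under /mycontainerfarm/<service name>/<instance id>.
              disoveryBuilder = ServiceDiscoveryBuilder.<Void> builder(Void.class)
                             .basePath("mycontainerfarm").client(curatorFramework).build();
              // start() hooks the discovery instance to connection-state changes,
              // so registrations can be restored after a reconnect.
              disoveryBuilder.start();

              String[] ids = serviceId.split(",");
              for (String id : ids) {
                      if (Strings.isNullOrEmpty(id)) {
                             continue;
                      }

                      // Each id becomes an ephemeral node /mycontainerfarm/app/<id>.
                      ServiceInstance<Void> serviceInstance = ServiceInstance.<Void> builder().name("app").id(id).build();

                      disoveryBuilder.registerService(serviceInstance);
              }
       }

       private void unregisterService() throws Exception {
              String[] ids = serviceId.split(",");
              for (String id : ids) {
                      if (Strings.isNullOrEmpty(id)) {
                             continue;
                      }

                      // Only the name and id are needed to locate the node to delete.
                      ServiceInstance<Void> serviceInstance = ServiceInstance.<Void> builder().name("app").id(id).build();

                      disoveryBuilder.unregisterService(serviceInstance);
              }
       }
}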
Run this application with arguments:
ServiceRegister -zkServer 192.168.33.16 -zkPort 2181 -service myapp1
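The -service option accepts several ids separated by commas (for example -service myapp1,myapp2), and registerService() skips empty entries. For stricter parsing, you could use a small helper like the hypothetical parseServiceIds below. It also trims whitespace and drops duplicates while keeping the original order. This is a variation on the loop above, not what the code above does.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: turns "myapp1, myapp2,,myapp1" into [myapp1, myapp2].
final class ServiceIds {
    static List<String> parseServiceIds(String raw) {
        Set<String> ids = new LinkedHashSet<>(); // keeps first-seen order, ignores duplicates
        if (raw == null) {
            return new ArrayList<>(ids); // -service not given: nothing to register
        }
        for (String part : raw.split(",")) {
            String id = part.trim();
            if (!id.isEmpty()) {
                ids.add(id);
            }
        }
        return new ArrayList<>(ids);
    }

    public static void main(String[] args) {
        System.out.println(parseServiceIds("myapp1, myapp2,,myapp1")); // [myapp1, myapp2]
    }
}
```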

In zkCli (for example with ls /mycontainerfarm/app), you will see that myapp1 is registered at /mycontainerfarm/app/myapp1.
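The node path is just the base path, the service name and the instance id joined together. The hypothetical instancePath helper below reproduces that layout in plain Java, which can be handy when checking nodes in zkCli or writing tests. It also adds the leading slash that the basePath("mycontainerfarm") argument leaves out. Curator computes the real path internally, so treat this as an illustration of the observed layout, not Curator's code.

```java
// Hypothetical helper mirroring the /basePath/serviceName/instanceId layout seen in zkCli.
final class DiscoveryPaths {
    static String instancePath(String basePath, String serviceName, String instanceId) {
        StringBuilder path = new StringBuilder();
        for (String segment : new String[] {basePath, serviceName, instanceId}) {
            String trimmed = segment.replaceAll("^/+|/+$", ""); // strip leading/trailing slashes
            if (!trimmed.isEmpty()) {
                path.append('/').append(trimmed);
            }
        }
        return path.length() == 0 ? "/" : path.toString();
    }

    public static void main(String[] args) {
        System.out.println(instancePath("mycontainerfarm", "app", "myapp1")); // /mycontainerfarm/app/myapp1
    }
}
```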

Unregistering is not strictly necessary, because /mycontainerfarm/app/myapp1 is an ephemeral node. When ServiceRegister exits, the Zookeeper server automatically removes the node once the client's session expires.

In my case, I chose to have ServiceRegister unregister explicitly in its shutdown hook. That way, the node /mycontainerfarm/app/myapp1 disappears as soon as ServiceRegister exits.
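The two ways a registration can disappear can be modeled in a few lines. The sketch below is a simplified in-memory model with hypothetical names. An ephemeral node lives as long as its owner's session. An explicit delete removes it at once. A process that dies without closing its session only loses its nodes after the session timeout has passed. Real Zookeeper tracks sessions through heartbeats on the server, and the timestamps here just stand in for that.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of ephemeral-node lifetime (hypothetical; no real Zookeeper involved).
final class EphemeralModel {
    private final long sessionTimeoutMs;
    private final Map<String, Long> ownerOf = new HashMap<>();   // node path -> session id
    private final Map<Long, Long> silentSince = new HashMap<>(); // session id -> time heartbeats stopped

    EphemeralModel(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    void create(String path, long session) {
        ownerOf.put(path, session);
    }

    // Explicit unregister, e.g. from a shutdown hook: the node is gone immediately.
    void delete(String path) {
        ownerOf.remove(path);
    }

    // The process died without closing its session; the server only notices via the timeout.
    void processDied(long session, long nowMs) {
        silentSince.put(session, nowMs);
    }

    boolean exists(String path, long nowMs) {
        Long session = ownerOf.get(path);
        if (session == null) {
            return false;
        }
        Long since = silentSince.get(session);
        if (since != null && nowMs - since >= sessionTimeoutMs) {
            ownerOf.values().removeIf(session::equals); // session expired: drop all its ephemeral nodes
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        EphemeralModel zk = new EphemeralModel(30_000); // hypothetical 30 s session timeout
        zk.create("/mycontainerfarm/app/myapp1", 1L);
        zk.processDied(1L, 0);
        System.out.println(zk.exists("/mycontainerfarm/app/myapp1", 10_000)); // true: not expired yet
        System.out.println(zk.exists("/mycontainerfarm/app/myapp1", 30_000)); // false: session expired
    }
}
```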

Inter-process locking recipe

There is one problem with this recipe. The -service argument is the service id, which is supposed to be unique. However, if Curator finds that a node with the same id already exists, it deletes the node and recreates it.

You can verify this by running get /mycontainerfarm/app/myapp1 in zkCli and checking ctime and mtime. This doesn't fit my requirement: I only want to register a service id if it isn't registered already.
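The two behaviours can be compared with a tiny in-memory registry. registerReplacing mimics the Curator behaviour described above: it deletes the existing node and recreates it, which is why ctime changes. registerIfAbsent is what I actually want. The first registration wins and later ones are refused, the same way a plain Zookeeper create fails with NodeExistsException when the node already exists. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory comparison of two registration policies (hypothetical names, no Zookeeper involved).
final class RegistryPolicies {
    // Node path -> "ctime" stand-in: a counter that increases every time a node is created.
    private final Map<String, Long> ctimeByPath = new HashMap<>();
    private final Map<String, String> ownerByPath = new HashMap<>();
    private long clock = 0;

    // Delete-and-recreate: the newcomer always wins and the node gets a fresh ctime.
    void registerReplacing(String path, String owner) {
        ctimeByPath.remove(path);
        ownerByPath.remove(path);
        create(path, owner);
    }

    // Create-if-absent: an existing registration is left untouched.
    boolean registerIfAbsent(String path, String owner) {
        if (ownerByPath.containsKey(path)) {
            return false;
        }
        create(path, owner);
        return true;
    }

    String owner(String path) { return ownerByPath.get(path); }

    Long ctime(String path) { return ctimeByPath.get(path); }

    private void create(String path, String owner) {
        ctimeByPath.put(path, ++clock);
        ownerByPath.put(path, owner);
    }

    public static void main(String[] args) {
        RegistryPolicies registry = new RegistryPolicies();
        String path = "/mycontainerfarm/app/myapp1";
        System.out.println(registry.registerIfAbsent(path, "process-1")); // true
        System.out.println(registry.registerIfAbsent(path, "process-2")); // false
        System.out.println(registry.owner(path));                         // process-1
    }
}
```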

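We can use another recipe for this: inter-process locking. Each service id gets its own lock. The process that acquires the lock registers the service and keeps holding the lock while it runs. A second process started with the same id cannot acquire the lock within the timeout, so it skips the registration.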

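// Replaces the registration loop in registerService(); additionally needs
// import java.util.concurrent.TimeUnit; and
// import org.apache.curator.framework.recipes.locks.InterProcessMutex;
for (String id : ids) {
       if (Strings.isNullOrEmpty(id)) {
              continue;
       }

       // One lock per service id. It is intentionally never released here: it is held
       // for the lifetime of this process and freed when its Zookeeper session ends.
       InterProcessMutex lock = new InterProcessMutex(curatorFramework,
                     "/mycontainerfarm/myapp/" + id);
       boolean acquired = lock.acquire(5, TimeUnit.SECONDS);
       if (acquired) {
              ServiceInstance<Void> serviceInstance = ServiceInstance.<Void> builder()
                            .name("app").id(id).build();

              disoveryBuilder.registerService(serviceInstance);
       } else {
              // Another process owns this id. It was not registered here, so
              // unregisterService() should skip it as well.
              System.out.println("fail to register " + id);
       }
}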
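There is one detail to watch with this approach. The shutdown hook in ServiceRegister unregisters every id passed on the command line, including ids whose lock this process never got. Depending on the Curator version, unregisterService() may delete the node even when another process created it. It is therefore safer to unregister only the ids this process actually registered. The sketch below shows that bookkeeping. It uses a java.util.concurrent.Semaphore per id as a single-JVM stand-in for InterProcessMutex. This is a plain substitution for illustration and does not coordinate across processes. GuardedRegistrar and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: register an id only if its lock is acquired, and remember what was registered.
final class GuardedRegistrar {
    // Shared lock table: one permit per service id (stands in for one InterProcessMutex per id).
    private final Map<String, Semaphore> locks;
    private final List<String> registered = new ArrayList<>();

    GuardedRegistrar(Map<String, Semaphore> sharedLocks) {
        this.locks = sharedLocks;
    }

    boolean register(String id, long timeoutMs) {
        Semaphore lock = locks.computeIfAbsent(id, k -> new Semaphore(1));
        try {
            if (!lock.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS)) {
                return false; // someone else holds this id
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        registered.add(id); // the real code would call registerService(...) here
        return true;        // the permit stays taken for the lifetime of this registrar
    }

    // Shutdown: unregister and release only the ids this registrar owns.
    List<String> unregisterAll() {
        List<String> released = new ArrayList<>(registered);
        for (String id : registered) {
            locks.get(id).release(); // the real code would call unregisterService(...) first
        }
        registered.clear();
        return released;
    }

    List<String> registeredIds() {
        return new ArrayList<>(registered);
    }

    public static void main(String[] args) {
        Map<String, Semaphore> shared = new ConcurrentHashMap<>();
        GuardedRegistrar a = new GuardedRegistrar(shared);
        GuardedRegistrar b = new GuardedRegistrar(shared);
        System.out.println(a.register("myapp1", 100)); // true
        System.out.println(b.register("myapp1", 100)); // false, after waiting 100 ms
        System.out.println(b.unregisterAll());         // []: b leaves myapp1 alone
    }
}
```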

 

Service discovery recipe

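import java.util.List;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.google.common.collect.Lists;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.ServiceProvider;

public class ServiceDiscover {
       @Parameter(names = { "-zkServer" }, description = "the ip address of the zookeeper server")
       private String zkServer;

       @Parameter(names = { "-zkPort" }, description = "the port of the zookeeper server")
       private String zkPort;

       public static void main(String[] args) throws Exception {

              final ServiceDiscover discover = new ServiceDiscover();
              new JCommander(discover, args);
              discover.discover();

       }

       private void discover() throws Exception {
              CuratorFramework client = CuratorFrameworkFactory.newClient(zkServer + ":" + zkPort, new RetryNTimes(5, 1000));
              client.start();

              // Must use the same base path as ServiceRegister.
              final ServiceDiscovery<Void> serviceDiscovery = ServiceDiscoveryBuilder
                  .<Void> builder(Void.class).basePath("mycontainerfarm").client(client).build();
              serviceDiscovery.start();

              // A provider for the "app" service, i.e. the children of /mycontainerfarm/app.
              ServiceProvider<Void> serviceProvider = serviceDiscovery.serviceProviderBuilder()
                      .serviceName("app").build();
              serviceProvider.start();

              try {
                      List<ServiceInstance<Void>> allInstances = Lists.newArrayList(serviceProvider
                                     .getAllInstances());

                      for (ServiceInstance<Void> instance : allInstances) {
                             System.out.println(instance.getId());
                      }
              } finally {
                      // Close in reverse order of creation to release watches and the session.
                      serviceProvider.close();
                      serviceDiscovery.close();
                      client.close();
              }

       }

}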
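getAllInstances() lists every registered instance. When a client needs just one instance to talk to, Curator's ServiceProvider also offers getInstance(), which picks one according to a provider strategy. As far as I know, the default strategy is round robin unless you configure another one. The plain-Java sketch below shows what round-robin selection over the discovered instances amounts to. RoundRobinPicker is a hypothetical name, and this is not Curator's implementation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin picker over a list of discovered instance ids.
final class RoundRobinPicker {
    private final AtomicInteger counter = new AtomicInteger();

    // Returns null when no instance is registered, so callers must handle that case.
    String pick(List<String> instances) {
        if (instances.isEmpty()) {
            return null;
        }
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(counter.getAndIncrement(), instances.size());
        return instances.get(index);
    }

    public static void main(String[] args) {
        RoundRobinPicker picker = new RoundRobinPicker();
        List<String> ids = Arrays.asList("myapp1", "myapp2", "myapp3");
        for (int i = 0; i < 4; i++) {
            System.out.println(picker.pick(ids)); // myapp1, myapp2, myapp3, myapp1
        }
    }
}
```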
