Friday, February 27, 2015

Service Discovery with Zookeeper



The design behind Zookeeper is intriguing. If you do not think so, ask yourself, if you were to design a library that does inter-process locking, what would you do? You probably want to expose features such as lock, unlock right? Not Zookeeper. Instead, it provides file-system like APIs that enables users to develop their own inter-process coordination.

This gives Zookeeper a lot of flexibility.
But using Zookeeper APIs is difficult (e.g. you have to manage connections, manage errors etc), so comes Curator which makes working with Zookeeper much easier. 

Zookeeper and Curator provide recipes for some common inter-process coordination tasks. 

I use Zookeeper in my container farm for service registry and service discovery:


Containers that run ElasticSearch will register themselves to Zookeeper. Containers that run my app also run Logstash, which will not start until it finds ElasticSearch service is registered. Logstash will analyze logs and send logs to ElasticSearch, and upon fatal errors, it will email the admin.  My apps will also register themselves with Zookeeper.

 

Install zookeeper

To try the following code, first download zookeeper from http://www.apache.org/dyn/closer.cgi/zookeeper/, unzip it. Copy data/zoo_sample.cfg to data/zoo.cfg, edit zoo.cfg, and change dataDir to a folder that exists. 

Start zookeeper server:
./zkServer.sh start

And now start zookeeper client, you can use this client to view registered services:
./zkCli.sh

 

Service registering recipe

The following is the service registering application:
public class ServiceRegister {

       @Parameter(names = { "-service" }, description = "service ids")
       private String serviceId;

       @Parameter(names = { "-h" }, description = "help")
       private boolean help;

       @Parameter(names = { "-zkServer" }, description = "the ip address of the zookeeper server")
       private String zkServer;

       @Parameter(names = { "-zkPort" }, description = "the port of the zookeeper server")
       private String zkPort;

       private static volatile boolean keepRunning = true;

       private ServiceDiscovery<Void> disoveryBuilder;

       public static void main(String[] args) throws Exception {
              final ServiceRegister register = new ServiceRegister();
              new JCommander(register, args);

              register.registerService();

              Runtime.getRuntime().addShutdownHook(new Thread() {
                      public void run() {
                             try {
                                    register.unregisterService();
                             } catch (Exception e) {
                                    e.printStackTrace();
                             }
                             keepRunning = false;

                      }
              });

              while (keepRunning) {
                      Thread.sleep(50000000);
              }
       }

       private void registerService() throws Exception {

              CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(zkServer + ":"+ zkPort, new RetryNTimes(5, 1000));
              curatorFramework.start();

              disoveryBuilder = ServiceDiscoveryBuilder.<Void> builder(Void.class)
                             .basePath("mycontainerfarm").client(curatorFramework).build();

              String[] ids = serviceId.split(",");
              for (String id : ids) {
                      if (Strings.isNullOrEmpty(id)) {
                             continue;
                      }

                      ServiceInstance<Void> serviceInstance = ServiceInstance.<Void> builder().name("app").id(id).build();

                      disoveryBuilder.registerService(serviceInstance);
              }

       }

       private void unregisterService() throws Exception {
              String[] ids = serviceId.split(",");
              for (String id : ids) {
                      if (Strings.isNullOrEmpty(id)) {
                             continue;
                      }

                      ServiceInstance<Void> serviceInstance = ServiceInstance.<Void> builder().name("app")      .id(id).build();

                      disoveryBuilder.unregisterService(serviceInstance);
              }
       }
}
Run this application with arguments:
ServiceRegister -zkServer 192.168.33.16 -zkPort 2181 -service myapp1

Use the zkCli, you will see myapp1 is registered at /mycontainerfarm/app/myapp1:

Unregistering is not necessary, because the node/mycontainerfarm/app/myapp1is an ephemeral one, when ServiceRegister exits, zookeeper server will automatically remove the node after a period of time.  

ServiceRegister unregisters explicitly in its shutdown hook, so that when it exits, the node /mycontainerfarm/app/myapp1 immediately disappears.

 

Inter-process locking recipe

There is one problem with this application, the argument -service is the service id, which is unique.  Curator’s logic is that if it finds the same node already exists, it will delete it, and recreate it. 

You can verify this by checking the ctime and mtime of get /mycontainerfarm/app/myapp1. This doesn’t fit with my requirement: I only want to register the same service if it doesn’t exist. 

We can use another recipe: inter-process locking.

for (String id : ids) {
              if (Strings.isNullOrEmpty(id)) {
                      continue;
              }

              InterProcessMutex lock = new InterProcessMutex(curatorFramework,
                             "/mycontainerfarm/myapp" + serviceId);
              boolean acquired = lock.acquire(5, TimeUnit.SECONDS);
              if (acquired) {
                      ServiceInstance<Void> serviceInstance = ServiceInstance.<Void> builder()
                                    .name("app").id(id).build();

                      disoveryBuilder.registerService(serviceInstance);
              } else {
                      System.out.println("fail to register " + serviceId);
       }
}

 

Service discovery recipe

public class ServiceDiscover {
       @Parameter(names = { "-zkServer" }, description = "the ip address of the zookeeper server")
       private String zkServer;

       @Parameter(names = { "-zkPort" }, description = "the port of the zookeeper server")
       private String zkPort;

       public static void main(String[] args) throws Exception {

              final ServiceDiscover discover = new ServiceDiscover();
              new JCommander(discover, args);
              discover.discover();

       }

       private void discover() throws Exception {
              CuratorFramework client = CuratorFrameworkFactory.newClient(zkServer + ":" + zkPort,       new RetryNTimes(5, 1000));
              client.start();

              final ServiceDiscovery<Void> serviceDiscovery = ServiceDiscoveryBuilder
                  .<Void> builder(Void.class).basePath("mycontainerfarm").client(client).build();
              serviceDiscovery.start();

              ServiceProvider<Void> serviceProvider = serviceDiscovery.serviceProviderBuilder()
                      .serviceName("app").build();
              serviceProvider.start();

              List<ServiceInstance<Void>> allInstances = Lists.newArrayList(serviceProvider
                             .getAllInstances());

              for (ServiceInstance<Void> instance : allInstances) {
                      System.out.println(instance.getId());
              }

       }

}

Tuesday, February 17, 2015

Trap killing signals inside Docker



When you stop a Docker container with:

sudo docker stop containername

You are sending SIGTERM signal to Docker, or more precisely the PID 1 process running inside the container. PID 1 process is the one that you ask the container to run when it starts up, it can be the ENTRYPOINT/CMD command defined in the Docker file, or the process passed into docker run

If PID 1 process spawns other processes, in order to shut down those processes gracefully, the PID 1 process has to trap killing signals and pass them on to other processes.

Writing software is a very good way to train my brain, it has taught me how to do system thinking, how to go slow to go fast. I originally started to tackle this problem using Docker, after a few hours of futile trying, I gave it up. The next day, I designed small programs, and slowly, but steadily I solved the problem.

First I wrote a small HelloWorld java application:

public class HelloWorld {
       private static volatile boolean keepRunning = true;

       @Parameter(names = { "-msg" })
       private static String msg = "world";

       public static void main(String[] args) throws Exception {

             final HelloWorld hello = new HelloWorld();
             new JCommander(hello, args);

             System.out.println("hello " + msg);

             Runtime.getRuntime().addShutdownHook(new Thread() {
                    public void run() {
             //wait a while so you can more clearly how things happen in timeline
                          try {
                                 Thread.sleep(50000);
                          } catch (InterruptedException e) {
                                 e.printStackTrace();
                          }

                          try {
                                 System.out.println("goodbye " + msg);
                          } catch (Exception e) {

                          }
                          keepRunning = false;

                    }
             });

             while (keepRunning) {
                    Thread.sleep(50000000);
             }

       }
}

The java application prints “hello world” when it starts, and hang until it receives a shutdown signal.  When it receives the signal, it prints “goodbye world” and exit. 

hello0.sh starts the java application:

#!/bin/bash
CLASSPATH="HelloWorld.jar"
java -cp "$CLASSPATH" HelloWorld ${@} &


Hello shell spawns the java application process. Killing the shell process doesn’t kill the java process.

If we just want to kill the java process when the shell is killed, one simple solution is to use exec to invoke the java program, here comes hello1.sh:

#!/bin/bash
CLASSPATH="HelloWorld.jar"
exec java -cp "$CLASSPATH" HelloWorld ${@}
exec java -cp "$CLASSPATH" HelloWorld ${@}


When using exec, the Java process takes over the life of the shell process, meaning, the java process replaces the shell process, no new PID is created.


The same effect can also be achieved by running the java application in the background:

java -cp "$CLASSPATH" HelloWorld ${@} &

But there is an important difference, with exec, the java process replaces the shell process, no new PID is created; when running the java application in the back ground, the shell process starts the java application, continues to run, and exit, so the shell process disappears, and the java application runs in the background with its own PID. 

This has an implication for Docker: if run processes in the background, when the PID 1 exits, the Docker container dies too. To keep PID 1 process running, a trick I often use is to tail –f some file, e.g.
ENTRYPOINT (java -cp HelloWorld.jar HelloWorld &) && touch tmp.txt && tail -f tmp.txt

So could “exec” be the answer? No, things are not always that simple. What if I want to run other applications after the HelloWorld application?  If I run two HelloWorld applications (hello2.sh):

#!/bin/bash
CLASSPATH="HelloWorld.jar"
exec java -cp "$CLASSPATH" HelloWorld -msg apple
exec java -cp "$CLASSPATH" HelloWorld -msg strawberry
Only the first application will get to run, and because it takes over the life of the shell process, the second one will never get to run:

The solution is:
  •  Keep the shell process running, so it can trap the killing signals
  •  Start application processes in the background, and get their PIDs
  •  When the shell process receives killing signals, kill application processes using their PIDs
Here is hello3.sh:
#!/bin/bash
trap 'shut_down' TERM INT

shut_down(){  
       echo "try killing $PID1"
    kill -TERM $PID1

    echo "try killing $PID2"
    kill -TERM $PID2     
      
       wait $PID1
       wait $PID2
      
       echo "kill accomplished"   
}

CLASSPATH="HelloWorld.jar"

java -cp "$CLASSPATH" HelloWorld -msg apple &
PID1=$!
echo "previous process id is $PID1"

java -cp "$CLASSPATH" HelloWorld -msg strawberry &
PID2=$!
echo "previous process id is $PID2"

wait $PID1
wait $PID2

echo "termination complete"
And how it runs:


Let us get a little bit crazier, and write hello4.sh to call hello3.sh:
#!/bin/bash
trap 'shut_down;wait $PID' TERM INT

shut_down(){  
       echo "try killing $PID"
    kill -TERM $PID   
       wait $PID
       echo "kill hello3.sh accomplished"
}

./hello3.sh &

PID=$!
echo "hello3.sh id is $PID"
wait $PID

echo "terminating hello3 complete"
 And how it runs:


Things in real life are even crazier, in my case, the PID 1 process in the container is a python application, which in turns invokes a shell, which in turns invokes two other shells. So the game of trapping and killing has to be passed from one to another. The basic principles stay the same.

To keep the python application running and passing killing signals:
pid= subprocess.Popen(['./hello4.sh], shell=True)

def signal_handler(signal, frame):
   
print('Exiting...')
    os.system(
"kill -TERM "+pid)
signal.signal(signal.SIGTERM, signal_handler)

while 1:
    time.sleep(
30*24*60*60)