Kafka on Nomad

Log streaming with Kafka is well documented, with many published examples. Recently Foghorn was asked by a leading global gaming company to design and implement a Kafka system that runs in containers on the HashiCorp Nomad orchestration system. They had the following requirements:

  • All Kafka components deployed and run within containers via Nomad
  • All communication between Kafka components secured via mTLS, with short (2 week) TTLs on the associated certificates

Challenges

Although there are several Kafka extensions (operators) available for the Kubernetes orchestrator, Nomad doesn’t have operators per se, or extensions for Kafka, so we needed to create our own extensions to manage Kafka within the Nomad ecosystem. These extensions would have to handle the following:

  1. Zookeeper node discovery
  2. Kafka broker discovery (of Zookeeper nodes)
  3. SSL certificate and key generation, and regeneration across TTL boundaries
  4. Ongoing re-discovery (Zookeeper, Kafka) as nodes are restarted (rescheduling of Nomad allocations)

Discovery 

Nomad provides a robust service discovery layer via Consul, which we could leverage to let Zookeeper nodes discover each other at startup and form a quorum. Similarly, we could use Consul to provide brokers with the list of Zookeeper nodes, and Kafka clients with the list of broker nodes. The devil was in the details, which we will dive into shortly.

SSL Certificate/Key generation

Here again the Nomad ecosystem comes to the rescue. Through its template integration with Vault, Nomad can issue SSL certificates and keys and, most critically, automatically regenerate them as they approach the end of their TTL (time to live).

Ongoing re-discovery

Containers running in Nomad, or in any scheduler for that matter, must be able to handle asynchronous restarts. For Kafka this means the system needs to be resilient to, and recover from, the restart of nodes in any layer. Kafka does not have a built-in mechanism for re-discovery. This is generally not a problem for brokers or clients: on restart they simply reconnect to a single Zookeeper or broker node, respectively, from their configured lists, which can be static DNS values. However, this won’t work for Zookeeper nodes, as each node needs to be able to connect to all of the other nodes in order to form a quorum. Let’s take a look at how we solved this.
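As a rough illustration of why peer discovery matters, the sketch below computes the majority an ensemble needs and how many nodes it can lose. The function names are hypothetical and are not part of the job described in this post; the arithmetic is simply the standard majority rule.

```shell
#!/usr/bin/env bash
# Hypothetical helpers, for illustration only.

# Smallest number of servers that forms a majority of an ensemble of size $1.
quorum_size() {
    echo $(( $1 / 2 + 1 ))
}

# Number of servers that can be down while a majority is still reachable.
tolerated_failures() {
    echo $(( $1 - ($1 / 2 + 1) ))
}

quorum_size 3         # prints 2
tolerated_failures 3  # prints 1
```

With the 3 allocations used in this post, a single restarted node can rejoin only if it can find the other two, which is why the peer list has to be rebuilt on every start.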

Zookeeper Configuration

With a strategy for the extensions needed to get Zookeeper running in the Nomad ecosystem, let’s take a look at the specifics of each.

Zookeeper needs several configuration files set up before it starts:

  1. zoo.cfg.dynamic 
  2. keystore.jks and truststore.jks 
  3. myid
  4. zoo.cfg
  5. jvm_flags.sh 

To create these configuration files we use a combination of Nomad templating functionality and a bash entrypoint script, entrypoint.sh. Let’s start with how we handled creating zoo.cfg.dynamic, which defines all the Zookeeper nodes and the ports that they listen on.

Zookeeper Dynamic Configuration (zoo.cfg.dynamic)

The Nomad and Consul ecosystem supports dynamic configuration of services via Consul Template. Fortunately, as of version 3.5.0 Zookeeper supports dynamic reconfiguration. This allowed us to define the Zookeeper ensemble membership in a file that can be managed by Consul Template. As an example, for a 3 node Zookeeper quorum the configuration file will look like this:

zoo.cfg.dynamic:

server.1 = <node1 address1>:<node1 port1>:<node1 port2>;<node1 client port>
server.2 = <node2 address1>:<node2 port1>:<node2 port2>;<node2 client port>
server.3 = <node3 address1>:<node3 port1>:<node3 port2>;<node3 client port>

Consul Template lets us determine each node’s IP address, peer ports, and client port dynamically via the following template, which populates the values when run against Consul.

zoo.cfg.dynamic.ctpl:

{{ range $_, $instance := service (printf "%s|passing" (env "ZK_CLIENT_SVC_NAME")) -}}
   {{ range $_, $alloc_index_tag := $instance.Tags }}
      {{ if $alloc_index_tag | regexMatch "alloc_index=([0-9]+)" -}}
         {{ range $_, $peer1_port_tag := $instance.Tags }}
             {{ if $peer1_port_tag | regexMatch "peer1_port=([0-9]+)" -}}
                {{ range $_, $peer2_port_tag := $instance.Tags }}
                    {{ if $peer2_port_tag | regexMatch "peer2_port=([0-9]+)" -}}
                       server.{{ $alloc_index_tag | replaceAll "alloc_index=" "" | parseInt | add 1 }}={{ $instance.Address }}:{{ $peer1_port_tag | replaceAll "peer1_port=" "" }}:{{ $peer2_port_tag | replaceAll "peer2_port=" "" }};{{ $instance.Port }}
                    {{ end }}
                {{ end }}
             {{ end }}
         {{ end }}
      {{ end }}
   {{ end }}
 {{ end }}

The first line loops over all passing Consul services whose name matches the ZK_CLIENT_SVC_NAME environment variable. It’s helpful to examine the Consul UI to understand what will be returned. Figure 1 shows the service endpoints registered in Consul by the 3 Zookeeper nodes, along with the Address of each, which corresponds to the IP address:client port. The first line in the template iterates across these 3 services.

Additionally, each Zookeeper service registers the following metadata tags:

  • peer1_port
  • peer2_port
  • alloc_index

Figures 2-4 show the values for these 3 tags across each of the registered Zookeeper service endpoints.

Line 2 in the zoo.cfg.dynamic.ctpl template iterates through all tags of the service currently being iterated on. Consul Template syntax is limited, so lines 3-8 are messy but functional code that pulls the alloc index and the two peer port values out of the tags, which is necessary to pair the correct peer ports with the specific Zookeeper service and its underlying IP address and client port.
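To make the nested loops easier to follow, here is a minimal shell sketch of what the template does for a single service instance: pick the three values out of the tag list and assemble one server line. The function name render_server_line and the sample address are hypothetical; only the tag names and the output format come from the template above.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of the job): builds one zoo.cfg.dynamic line
# from the values Consul returns for a single service instance.
# Usage: render_server_line <address> <client_port> <tag>...
render_server_line() {
    local address client_port alloc_index peer1_port peer2_port tag value
    address="$1"
    client_port="$2"
    shift 2
    alloc_index="" peer1_port="" peer2_port=""
    for tag in "$@"; do
        case "$tag" in
            alloc_index=*) alloc_index="${tag#alloc_index=}" ;;
            peer1_port=*)  peer1_port="${tag#peer1_port=}" ;;
            peer2_port=*)  peer2_port="${tag#peer2_port=}" ;;
        esac
    done
    # Emit nothing unless all three tags are present and numeric.
    for value in "$alloc_index" "$peer1_port" "$peer2_port"; do
        case "$value" in
            ''|*[!0-9]*) return 1 ;;
        esac
    done
    # Server ids are 1-based, Nomad allocation indexes are 0-based.
    echo "server.$((alloc_index + 1))=${address}:${peer1_port}:${peer2_port};${client_port}"
}

# Sample values only; the address is made up.
render_server_line 10.0.0.5 22649 "peer1_port=21728" "peer2_port=29886" "alloc_index=2"
```

The sample call prints server.3=10.0.0.5:21728:29886;22649. Tags that are not one of the three known prefixes are ignored, which mirrors how the template skips tags that fail its regexMatch tests.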

Execution of consul template against the template file is done within the entrypoint.sh script we mentioned earlier.

consul-template -once -template /consul-templates/zoo.cfg.dynamic.ctpl:/conf/zoo.cfg.dynamic

The final result of the consul template run produces a valid zookeeper dynamic configuration file. 

zoo.cfg.dynamic:

server.3=xx.xxx.x.xx:21728:29886;22649
server.1=xx.xxx.x.xxx:21182:25449;27447
server.2=xx.xxx.x.xxx:22652:23078;31492
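Because the file is rendered once at startup, it can be useful to confirm that it lists the whole ensemble before Zookeeper is launched. The following is a minimal sketch of such a check, not something taken from the original entrypoint script; the function names are hypothetical and the pattern assumes the line format shown above.

```shell
#!/usr/bin/env bash
# Hypothetical sanity checks for a rendered zoo.cfg.dynamic file.

# Print the number of well-formed "server.N=addr:port:port;port" lines in file $1.
count_server_lines() {
    grep -Ec '^[[:space:]]*server\.[0-9]+=[^:;]+:[0-9]+:[0-9]+;[0-9]+[[:space:]]*$' "$1" || true
}

# Succeed only if file $1 lists at least $2 servers.
has_full_ensemble() {
    [ "$(count_server_lines "$1")" -ge "$2" ]
}
```

An entrypoint could call has_full_ensemble /conf/zoo.cfg.dynamic 3 and re-run consul-template when it fails, rather than relying only on a fixed sleep.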

Figure 1: Zookeeper service endpoints registered in Consul

Figures 2-4: Tag values for each registered Zookeeper service endpoint

Consul Service Registration 

Of course, for this to work each Zookeeper service must be registered in Consul, with all of the required tag values, before the actual Zookeeper process launches. Here again we rely heavily on the Nomad ecosystem. Nomad job templates allow us to define the tags, and Nomad registers them in Consul before starting the underlying container, in our case the Zookeeper service.

Lines 94-102 of the Zookeeper Nomad job template (the first service block in the listing below) define the kafka-zk-XXX1-telemetry-client service, which the Consul template above relies on to create the zoo.cfg.dynamic file. Lines 97-102 define the tags, which use two special Nomad runtime environment variables: NOMAD_HOST_PORT_<label> and NOMAD_ALLOC_INDEX. Notice that, unlike the secure client service, we do not define a service check for “kafka-zk-XXX1-telemetry-client”. This is important because it lets Consul register the service as passing immediately, without anything listening on the defined ports. This placeholder service allows the Consul template to return valid values even before any Zookeeper nodes are up and running behind the ports. For broker connections we will use the “kafka-zk-XXX1-telemetry-secure-client” service, which has a service check defined so that its status reflects whether the node is actually reachable.

mTLS Connections (keystore.jks and truststore.jks)

To secure the connections between Kafka brokers and Zookeeper nodes we need to create a truststore and a keystore and reference them in the Zookeeper configuration. To create these Java keystores we use Vault’s PKI Secrets Engine to first issue the needed certificates and key via Nomad templates. See lines 248-274 of zookeeper.nomad, where we generate the CA chain “path/to/root-int-ca.pem” (intermediate and root certificates) and the node certificate and key bundle “path/to/ssl/node.pem”. We set the change_mode of these two templates to “restart”, which ensures that before the TTL on these certificates is reached Nomad restarts the allocation, forcing new certificates and keys to be generated.
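When reasoning about rotation it helps to have the TTL in seconds, for example to compare against a certificate's remaining lifetime. The helper below is a hypothetical sketch that converts simple duration strings such as the cert_ttl value in the job's meta block ("168h" in the listing); it handles only a single h, m or s suffix and is not a full duration parser.

```shell
#!/usr/bin/env bash
# Hypothetical helper: convert a simple duration such as "168h", "30m" or "45s"
# into seconds. A bare number is treated as seconds. Anything else is rejected.
ttl_to_seconds() {
    local ttl value unit
    ttl="$1"
    value="${ttl%[smh]}"      # strip one trailing unit letter, if present
    unit="${ttl#"$value"}"    # whatever was stripped ("" for a bare number)
    case "$value" in
        ''|*[!0-9]*) return 1 ;;
    esac
    case "$unit" in
        h) echo $(( value * 3600 )) ;;
        m) echo $(( value * 60 )) ;;
        *) echo "$value" ;;
    esac
}

ttl_to_seconds 168h   # prints 604800
```

One possible use, assuming openssl is available in the image, is passing the result to openssl x509 -checkend to test whether the issued certificate expires within a given window.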

Now that we have the certificate and key we can generate the truststore and keystore from them. This is accomplished within the entrypoint script for the docker container, lines 184-209. Note this requires the openssl libraries which are not part of the default zookeeper docker images, so a custom docker image is required. 

Zookeeper myID

The zookeeper myid file is generated via nomad template, utilizing the Nomad environment variable NOMAD_ALLOC_INDEX which will ensure a unique id for each zookeeper allocation (node). See lines 146-153 of the nomad job template, zookeeper.nomad.
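Since both myid and the server.N entries are derived from the allocation index plus one, a node's own id should always appear in the dynamic configuration once its placeholder service is registered. The sketch below shows one way to verify that; the function name is hypothetical and the check is not part of the original entrypoint script.

```shell
#!/usr/bin/env bash
# Hypothetical check: succeed only if the id stored in myid file $1
# has a matching "server.<id>=" entry in dynamic config file $2.
myid_is_listed() {
    local myid
    myid="$(tr -d '[:space:]' < "$1")"
    case "$myid" in
        ''|*[!0-9]*) return 1 ;;
    esac
    grep -Eq "^[[:space:]]*server\.${myid}=" "$2"
}
```

For example, myid_is_listed /data/myid /conf/zoo.cfg.dynamic would fail if the Consul template ran before this allocation's client service was registered.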

Zoo.cfg

The main zoo.cfg is generated via nomad template, see lines 156-186. The keystore and truststore passwords are pulled from Vault. 

Jvm_Flags.sh

The jvm_flags.sh file is a convenience file that lets us define dynamic Java flag values; see lines 187-195.

Nomad job template

The full Nomad job template is shown below. Once deployed, it results in 3 Zookeeper allocations starting up and forming a quorum. We won’t get into the details of deploying Nomad jobs from templates here, but we used Terraform.

In Part 2 of Foghorn’s Kafka on Nomad 3 part blog series we will examine in detail how we configured Kafka broker Nomad allocations to be able to connect to our zookeeper Nomad allocations.

job "kafka-zk-XXX1-telemetry" {

  type = "service"

  group "kafka-zk-XXX1" {

    count = 3

    meta {
      cert_ttl            = "168h"
      cluster_dc          = "XXX1"
      mtls_path           = "/path/to/kafka/mtls"
      int_ca_path         = "/path/to/intca/ca"
      root_ca_path        = "/path/to/rootca/ca"
    }

    # Run tasks in serial or parallel (1 for serial)
    update {
      max_parallel = 1
      min_healthy_time = "1m"
    }
    
    restart {
      attempts = 3
      interval = "10m"
      delay    = "30s"
      mode     = "fail"
    }

    migrate {
      max_parallel     = 1
      health_check     = "checks"
      min_healthy_time = "10s"
      healthy_deadline = "5m"
    }

    reschedule {
      delay          = "30s"
      delay_function = "constant"
      unlimited      = true
    }

    ephemeral_disk {
      migrate = false
      size    = "500"
      sticky  = false
    }

    task "kafka-zk-XXX1" {
      driver = "docker"
      config {
        image = "kafka-zookeeper-3.5.5"
        entrypoint = ["/conf/entrypoint.sh"]
        command = "zkServer.sh"
        args = ["start-foreground"]
        labels {
            group = "zk-docker"
        }
        network_mode = "host"
        port_map {
            client = 2181
            peer1 = 2888
            peer2 = 3888
            jmx = 9999
        }
        volumes = [
          "local/conf:/conf",
          "local/data:/data",
          "local/logs:/logs"
        ]
      }

      env {
        ZOO_CONF_DIR="/conf"
        ZOO_DATA_DIR="/data"
        ZOO_LOG4J_PROP="INFO,CONSOLE"
        ZK_WAIT_FOR_CONSUL_SVC="30"
        ZK_CLIENT_SVC_NAME="kafka-zk-XXX1-telemetry-client"
        ZK_PEER1_SVC_NAME="kafka-zk-XXX1-telemetry-peer1"
        ZK_PEER2_SVC_NAME="kafka-zk-XXX1-telemetry-peer2"
      }

      kill_timeout = "15s"

      resources {
        cpu = 1000
        memory = 1024
        network {
          mbits = 100
          port "client" {}
          port "secure_client" {
            static = 2281
          }
          port "peer1" {}
          port "peer2" {}
          port "jmx" {}
          port "jolokia" {}
        }
      }
      service {
        port = "client"
        name = "kafka-zk-XXX1-telemetry-client"
        tags = [
          "kafka-zk-XXX1-telemetry-client",
          "peer1_port=$${NOMAD_HOST_PORT_peer1}",
          "peer2_port=$${NOMAD_HOST_PORT_peer2}",
          "alloc_index=$${NOMAD_ALLOC_INDEX}"
        ]
      }
      service {
        port = "secure_client"
        name = "kafka-zk-XXX1-telemetry-secure-client"
        tags = [
          "kafka-zk-XXX1-telemetry-secure-client"
        ]
        check {
          name     = "secure-client-check"
          port     = "secure_client"
          type     = "tcp"
          interval = "30s"
          timeout  = "2s"
          initial_status = "passing"
        }
      }
      service {
        port = "peer1"
        name = "kafka-zk-XXX1-telemetry-peer1"
        tags = [
          "kafka-zk-XXX1-telemetry-peer1"
        ]
      }
      service {
        port = "peer2"
        name = "kafka-zk-XXX1-telemetry-peer2"
        tags = [
          "kafka-zk-XXX1-telemetry-peer2"
        ]
      }

      vault {
         policies = ["allow_vault"]
         change_mode = "noop"
      }

      # Consul template used to create the zoo.cfg.dynamic file within the entrypoint script.
      template {
        destination = "local/conf/zoo.cfg.dynamic.ctpl"
        change_mode = "noop"
        data = <<EOF
{{ range $_, $instance := service (printf "%s|passing" (env "ZK_CLIENT_SVC_NAME")) -}}
{{ range $_, $alloc_index_tag := $instance.Tags }}{{ if $alloc_index_tag | regexMatch "alloc_index=([0-9]+)" -}}
{{ range $_, $peer1_port_tag := $instance.Tags }}{{ if $peer1_port_tag | regexMatch "peer1_port=([0-9]+)" -}}
{{ range $_, $peer2_port_tag := $instance.Tags }}{{ if $peer2_port_tag | regexMatch "peer2_port=([0-9]+)" -}}
server.{{ $alloc_index_tag | replaceAll "alloc_index=" "" | parseInt | add 1 }}={{ $instance.Address }}:{{ $peer1_port_tag | replaceAll "peer1_port=" "" }}:{{ $peer2_port_tag | replaceAll "peer2_port=" "" }};{{ $instance.Port }}
{{ end }}{{ end }}{{ end }}{{ end }}{{ end }}{{ end }}{{ end }}
EOF
      }
      # Generate a myid file, which is copied to /data/myid by the entrypoint script.
      template {
        destination = "local/conf/myid"
        change_mode = "noop"
        data = <<EOF
{{ env "NOMAD_ALLOC_INDEX" | parseInt | add 1 }}
EOF
      }
      # as zookeeper dynamically updates zoo.cfg we template to zoo.cfg.tmpl and in the docker-entrypoint.sh of the image copy to zoo.cfg.
      # this prevents the allocation from throwing an error when zookeeper updates zoo.cfg
      template {
        destination = "local/conf/zoo.cfg.tmpl"
        change_mode = "noop"
        data = <<EOF
{{ $mtls_path := env "NOMAD_META_mtls_path" -}}
admin.enableServer=false
tickTime=2000
initLimit=5
syncLimit=2
standaloneEnabled=false
reconfigEnabled=true
skipACL=yes
4lw.commands.whitelist=*
secureClientPort={{ env "NOMAD_PORT_secure_client" }}
serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
sslQuorum=true
ssl.quorum.hostnameVerification=false
ssl.quorum.keyStore.location=/conf/ssl/keystore.jks
ssl.quorum.keyStore.password={{ with secret (printf "%s" $mtls_path) }}{{ .Data.keystore_password }}{{ end }}
ssl.quorum.trustStore.location=/conf/ssl/truststore.jks
ssl.quorum.trustStore.password={{ with secret (printf "%s" $mtls_path) }}{{ .Data.truststore_password }}{{ end }}
authProvider.1=org.apache.zookeeper.server.auth.X509AuthenticationProvider
ssl.hostnameVerification=false
ssl.keyStore.location=/conf/ssl/keystore.jks
ssl.keyStore.password={{ with secret (printf "%s" $mtls_path) }}{{ .Data.keystore_password }}{{ end }}
ssl.trustStore.location=/conf/ssl/truststore.jks
ssl.trustStore.password={{ with secret (printf "%s" $mtls_path) }}{{ .Data.truststore_password }}{{ end }}
dataDir=/data
dynamicConfigFile=/conf/zoo.cfg.dynamic
EOF
      }
      template {
        destination = "local/conf/jvm_flags.sh"
        change_mode = "noop"
        data = <<EOF
#!/usr/bin/env bash
export SERVER_JVMFLAGS="-Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory -Dcom.sun.management.jmxremote.host={{ env "NOMAD_IP_jmx" }} -javaagent:/apache-zookeeper/lib/jolokia-jvm-agent.jar=port={{ env "NOMAD_PORT_jolokia" }},host={{ env "NOMAD_IP_jolokia" }}"
export JMXPORT="{{ env "NOMAD_PORT_jmx" }}"
EOF
      }
      template {
        destination = "local/conf/entrypoint.sh"
        change_mode = "noop"
        data = <<EOF
#!/usr/bin/env bash
set -e

# sleep to allow nomad services to be registered in consul and for zookeeper-watcher to run after service changes
if [[ -z "${ZK_WAIT_FOR_CONSUL_SVC}" ]]; then
    sleep 30 # reasonable default
else
    sleep $ZK_WAIT_FOR_CONSUL_SVC
fi

# if zoo.cfg.tmpl exists copy to zoo.cfg
if [[ -f "$ZOO_CONF_DIR/zoo.cfg.tmpl" ]]; then
    cp $ZOO_CONF_DIR/zoo.cfg.tmpl $ZOO_CONF_DIR/zoo.cfg
fi

# create the zookeeper dynamic cfg from consul template
if [[ -z "${CONSUL_HTTP_ADDR}" ]]; then
    consul-template -once -template "/consul-templates/zoo.cfg.dynamic.ctpl:$ZOO_CONF_DIR/zoo.cfg.dynamic"
else
    consul-template -once -consul-addr="${CONSUL_HTTP_ADDR}" -template "/consul-templates/zoo.cfg.dynamic.ctpl:$ZOO_CONF_DIR/zoo.cfg.dynamic"
fi

# create truststore and keystore from pem files if they exist
if [[ -f "$ZOO_CONF_DIR/ssl/root-int-ca.pem" && -f "$ZOO_CONF_DIR/ssl/node.pem" ]]; then

    if [[ -f "$ZOO_CONF_DIR/ssl/truststore.jks" ]]; then
        rm "$ZOO_CONF_DIR/ssl/truststore.jks"
    fi
    if [[ -f "$ZOO_CONF_DIR/ssl/keystore.jks" ]]; then
        rm "$ZOO_CONF_DIR/ssl/keystore.jks"
    fi

    # create truststore jks
    echo "create truststore.jks"

    # pull truststore password from zoo.cfg
    truststore_password="$(grep ssl.trustStore.password= "$ZOO_CONF_DIR/zoo.cfg" | grep -Eo '[^=]+$')"
    keytool -import -alias root-int-ca -trustcacerts -file "$ZOO_CONF_DIR/ssl/root-int-ca.pem" -noprompt \
      -keystore "$ZOO_CONF_DIR/ssl/truststore.jks" -storepass "$truststore_password"

    # create keystore jks
    echo "create keystore.jks"

    # pull keystore password from zoo.cfg
    keystore_password="$(grep ssl.keyStore.password= "$ZOO_CONF_DIR/zoo.cfg" | grep -Eo '[^=]+$')"
    openssl pkcs12 -export -in "$ZOO_CONF_DIR/ssl/node.pem" -out "$ZOO_CONF_DIR/ssl/node.p12" -passout "pass:$keystore_password"
    keytool -importkeystore -srckeystore "$ZOO_CONF_DIR/ssl/node.p12" -srcstoretype PKCS12 \
      -destkeystore "$ZOO_CONF_DIR/ssl/keystore.jks" -srcstorepass "$keystore_password" -deststorepass "$keystore_password"
fi

# myid is generated by Nomad job (myid = allocation index + 1)
cp $ZOO_CONF_DIR/myid $ZOO_DATA_DIR/myid

# source in SERVER_JVMFLAGS and CLIENT_JVMFLAGS
. $ZOO_CONF_DIR/jvm_flags.sh

# Allow the container to be started with `--user`
if [[ "$1" = 'zkServer.sh' && "$(id -u)" = '0' ]]; then
    chown -R zookeeper "$ZOO_DATA_DIR" "$ZOO_DATA_LOG_DIR" "$ZOO_LOG_DIR"
    echo "gosu zookeeper $@"
    exec gosu zookeeper "$@"
else
    exec "$@"
fi
EOF
      }
      template {
        destination = "path/to/root-int-ca.pem"
        change_mode = "restart"
        data = <<EOH
{{ $root_ca_path := env "NOMAD_META_root_ca_path" -}}
{{ $int_ca_path := env "NOMAD_META_int_ca_path" -}}
{{ with secret (printf "%s" $int_ca_path) }}
{{ .Data.certificate -}}
{{ end -}}
{{ with secret (printf "%s" $root_ca_path) }}
{{ .Data.certificate -}}
{{ end }}
EOH
      }
      template {
        destination = "path/to/ssl/node.pem"
        change_mode = "restart"
        data = <<EOH
{{ $ip_address := env "NOMAD_IP_client" -}}
{{ $vault_cert_path := env "NOMAD_META_vault_cert_path" -}}
{{ $cluster_dc := env "NOMAD_META_cluster_dc" -}}
{{ $cert_ttl := env "NOMAD_META_cert_ttl" -}}
{{ with secret (printf "%s" $vault_cert_path) (printf "common_name=zk-%s.service.%s.consul" $cluster_dc $cluster_dc) (printf "alt_names=zk-%s.service.%s.consul" $cluster_dc $cluster_dc) (printf "ip_sans=%s" $ip_address) (printf "ttl=%s" $cert_ttl) (printf "format=pem_bundle") }}
{{ .Data.certificate -}}
{{ end }}
EOH
      }
    }
  }
}
