Connecting to Confluent Cloud using kafka-go

March 12, 2020    kafka go

I’m in love with Confluent Cloud’s Kafka offering, it’s serverless and decently fast for most cases, the only issue is that the instructions from its dashboard to connect from Go programs are based on confluentinc/confluent-kafka-go which under the hood uses libkafka via CGO.

I prefer to avoid CGO whenever possible which is why I always use segmentio/kafka-go, so I’m going to show you how to use it as a client to connect to a Confluent Cloud cluster.

First, you need to create an API Key to access your cluster, you can find in Cluster settings -> API access by clicking ”+ Add key”.

Before clicking “Continue” you copy both the Key* and the Secret which we’ll be using from kafka-go as username and password respectively.

Next, we need the host and port of the broker, we find that in CLI & client configuration -> Go:

We copy the value from bootstrap.servers from the config snippet and use it as follows:

import (
	"context"
	"crypto/tls"
	"crypto/x509"

	"github.com/segmentio/kafka-go/sasl/plain"

	kafka "github.com/segmentio/kafka-go"
)

...
broker := "pkc-ep9mm.us-east-2.aws.confluent.cloud:9092" // bootstrap.servers
rootCAs, _ := x509.SystemCertPool()
if rootCAs == nil {
  rootCAs = x509.NewCertPool()
}
dialer := &kafka.Dialer{
  DualStack: true,
  SASLMechanism: plain.Mechanism{
    Username: "G2C2IDOBMB5ENA3F", // access key
    Password: "l55+WCBHcft7YEeXKCQQsnf3g+9YO6a7inYsSa/9T70HZAv7u/tQxxDei0TNKIm", // secret
  },
  TLS: &tls.Config{
    InsecureSkipVerify: true,
    RootCAs:            rootCAs,
  },
}

That’s it, we can now create regular writes and readers with dialer and brokers:

// example writer
w := kafka.NewWriter(kafka.WriterConfig{
  Dialer:    dialer,
  Brokers:   brokers,
  Topic:     "messages",
  Balancer:  &kafka.Hash{},
  BatchSize: 1,
})

w.WriteMessages(context.Background(),
	kafka.Message{
		Key:   []byte("Key-A"),
		Value: []byte("Hello World!"),
	},
	kafka.Message{
		Key:   []byte("Key-B"),
		Value: []byte("One!"),
	},
	kafka.Message{
		Key:   []byte("Key-C"),
		Value: []byte("Two!"),
	},
)

w.Close()

Discovering the Controller

The broker url given by Confluent Cloud is not necessarily the controller of the kafka cluster, so performing administrative such as topic creation and configuration may fail, the best way to ensure it always works is by discovering the controller first:

// connect to bootstrap server to discover the controller
brokerConn, err := dialer.DialContext(ctx, "tcp", brokers[0])
if err != nil {
  return err
}
defer brokerConn.Close()
controller, err := brokerConn.Controller()
if err != nil {
  return err
}

// connect to the controller
controllerHost := fmt.Sprintf("%s:%d", controller.Host, controller.Port)
controllerConn, err = dialer.DialContext(ctx, "tcp", controllerHost)
if err != nil {
  return err
}
defer controllerConn.Close()

// you can use controllerConn for administrative tasks
if err := controllerConn.CreateTopics(kafka.TopicConfig{
  NumPartitions:     1,
  ReplicationFactor: 3,
  Topic:             "messages",
}); err != nil {
  return err
}

Happy Kafka hacking!