Getting started

Getting Watermill up and running.

What is Watermill?

Watermill is a Golang library for working efficiently with message streams. It is intended for building event-driven applications, enabling event sourcing, RPC over messages, sagas, and basically whatever else comes to mind. You can use conventional pub/sub implementations like Kafka or RabbitMQ, but also HTTP or MySQL binlog if that fits your use case.

It comes with a set of Pub/Sub implementations, which can easily be replaced by your own.

Watermill also ships with a set of standard tools (middlewares), like instrumentation, poison queue, throttling and correlation, used by every message-driven application.

Install

go get -u github.com/ThreeDotsLabs/watermill

Subscribing for messages

One of the most important types in Watermill is Message. It is as important to Watermill as http.Request is to the http package. Almost every part of Watermill uses this type.

When building a reactive/event-driven application/[insert your favorite buzzword here], we always want to listen to incoming messages and react to them. Watermill supports multiple publisher and subscriber implementations, with a compatible interface and an abstraction that provides similar behaviour.

Let’s start by subscribing to messages.

Full source: content/docs/getting-started/go-channel/main.go

// ...
import (
    "log"
    "time"

    "github.com/satori/go.uuid"

    "github.com/ThreeDotsLabs/watermill/message"

    "github.com/ThreeDotsLabs/watermill"
    "github.com/ThreeDotsLabs/watermill/message/infrastructure/gochannel"
)

func main() {
    pubSub := gochannel.NewGoChannel(
        0, // buffer (channel) size
        watermill.NewStdLogger(false, false),
        time.Second, // send timeout
    )

    messages, err := pubSub.Subscribe("example.topic")
    if err != nil {
        panic(err)
    }

    go process(messages)
// ...

Full source: content/docs/getting-started/go-channel/main.go

// ...
func process(messages chan *message.Message) {
    for msg := range messages {
        log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))

        // we need to Acknowledge that we received and processed the message,
        // otherwise we will not receive the next message
        msg.Ack()
    }
}
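The Ack contract above can be illustrated with a toy, standard-library-only sketch (not Watermill's actual implementation; the toyMessage and subscribe names are made up for illustration): delivery of the next message is blocked until the previous one is acknowledged.

```go
package main

import "fmt"

// toyMessage mimics the Ack contract: delivery of the next message
// waits until the previous one is acknowledged.
type toyMessage struct {
	Payload string
	acked   chan struct{}
}

func (m *toyMessage) Ack() { close(m.acked) }

// subscribe sends payloads one at a time, waiting for each Ack.
func subscribe(payloads []string) <-chan *toyMessage {
	out := make(chan *toyMessage)
	go func() {
		defer close(out)
		for _, p := range payloads {
			msg := &toyMessage{Payload: p, acked: make(chan struct{})}
			out <- msg
			<-msg.acked // block until the consumer calls Ack
		}
	}()
	return out
}

func main() {
	for msg := range subscribe([]string{"first", "second"}) {
		fmt.Println("received:", msg.Payload)
		msg.Ack() // without this, the loop would block forever after "first"
	}
}
```

Removing the msg.Ack() call makes the program deadlock after the first message, which is exactly why the real handler must acknowledge every message it processes.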

An installed librdkafka is required to run the Kafka subscriber.

Running in Docker

The easiest way to run Watermill with Kafka locally is to use Docker.

Full source: content/docs/getting-started/kafka/docker-compose.yml

version: '3'
services:
  server:
    image: threedotslabs/golang-librdkafka:1.11.2-stretch
    restart: on-failure
    depends_on:
      - kafka
    volumes:
      - .:/app
    working_dir: /app
    command: go run main.go

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    restart: on-failure
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:latest
    restart: on-failure
    depends_on:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

The example source code should go to main.go.

To run it, execute docker-compose up.

A more detailed explanation of how this setup works, and how to add live code reloading, can be found in our […] article.

Installing librdkafka on Ubuntu

The newest version of librdkafka for Ubuntu distributions can be found in Confluent’s repository.

# install `software-properties-common`, `wget` and `gnupg` if not installed yet
sudo apt-get install -y software-properties-common wget gnupg

# add a new repository
wget -qO - https://packages.confluent.io/deb/4.1/archive.key | sudo apt-key add -
sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/4.1 stable main"

# and then you can install newest version of `librdkafka`
sudo apt-get update && sudo apt-get -y install librdkafka1 librdkafka-dev

Installing librdkafka on CentOS/RedHat/Fedora

We will use Confluent’s repository to download the newest version of librdkafka.

# install `curl` and `which` if not already installed
sudo yum -y install curl which

# install Confluent public key
sudo rpm --import https://packages.confluent.io/rpm/4.1/archive.key

# add repository to /etc/yum.repos.d/confluent.repo
sudo tee /etc/yum.repos.d/confluent.repo > /dev/null << EOF
[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/4.1/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/4.1/archive.key
enabled=1

[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/4.1
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/4.1/archive.key
enabled=1
EOF

# clean YUM cache
sudo yum clean all

# install librdkafka
sudo yum -y install librdkafka1 librdkafka-devel

Installing librdkafka on macOS

On macOS, you can install librdkafka via Homebrew:

brew install librdkafka

Building from sources (for other distros)

Manually compiling from sources:

wget -O "librdkafka.tar.gz" "https://github.com/edenhill/librdkafka/archive/v0.11.6.tar.gz"

mkdir -p librdkafka
tar --extract --file "librdkafka.tar.gz" --directory "librdkafka" --strip-components 1
cd "librdkafka"

./configure --prefix=/usr && make -j "$(getconf _NPROCESSORS_ONLN)" && make install

Full source: content/docs/getting-started/kafka/main.go

// ...
import (
    "log"

    "github.com/ThreeDotsLabs/watermill"

    "github.com/satori/go.uuid"

    "github.com/ThreeDotsLabs/watermill/message"
    "github.com/ThreeDotsLabs/watermill/message/infrastructure/kafka"
)

func main() {
    subscriber, err := kafka.NewConfluentSubscriber(
        kafka.SubscriberConfig{
            Brokers:       []string{"kafka:9092"},
            ConsumerGroup: "test_consumer_group",
        },
        kafka.DefaultMarshaler{},
        watermill.NewStdLogger(false, false),
    )
    if err != nil {
        panic(err)
    }

    messages, err := subscriber.Subscribe("example.topic")
    if err != nil {
        panic(err)
    }

    go process(messages)
// ...

Full source: content/docs/getting-started/kafka/main.go

// ...
func process(messages chan *message.Message) {
    for msg := range messages {
        log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))

        // we need to Acknowledge that we received and processed the message,
        // otherwise we will not receive the next message
        msg.Ack()
    }
}

Publishing messages

Full source: content/docs/getting-started/go-channel/main.go

// ...
    go process(messages)

    publishMessages(pubSub)
}

func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(uuid.NewV4().String(), []byte("Hello, world!"))

        if err := publisher.Publish("example.topic", msg); err != nil {
            panic(err)
        }
    }
}
// ...

Full source: content/docs/getting-started/kafka/main.go

// ...
    go process(messages)

    publisher, err := kafka.NewPublisher([]string{"kafka:9092"}, kafka.DefaultMarshaler{}, nil)
    if err != nil {
        panic(err)
    }

    publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(uuid.NewV4().String(), []byte("Hello, world!"))

        if err := publisher.Publish("example.topic", msg); err != nil {
            panic(err)
        }
    }
}
// ...

Message format

We don’t enforce any message format. You can use strings, JSON, protobuf, Avro, gob or anything else that serializes to []byte.

Using Messages Router

Publishers and subscribers are rather low-level parts of Watermill. In production use, we usually want something higher-level that provides features like correlation, metrics, poison queue, retrying, throttling, etc.

We also don’t want to send an Ack manually after every successful processing. Sometimes, we also want to send a message after processing another one.

To handle these requirements, we created a component named Router.

The flow of our application looks like this:

  1. We produce a message to the topic example.topic_1 every second.
  2. The struct_handler handler listens on example.topic_1. When a message is received, its UUID is printed and a new message is produced on example.topic_2.
  3. The print_events_topic_1 handler listens on example.topic_1 and prints the message’s UUID, payload and metadata. The correlation ID should be the same as in the message on example.topic_1.
  4. The print_events_topic_2 handler listens on example.topic_2 and prints the message’s UUID, payload and metadata. The correlation ID should be the same as in the message on example.topic_1.

Router configuration

To begin with, we should configure the router. We will configure which plugins and middlewares we want to use.

We will also set up the handlers that this router will support. Every handler independently handles its messages.

Full source: content/docs/getting-started/router/main.go

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/satori/go.uuid"

    "github.com/ThreeDotsLabs/watermill"
    "github.com/ThreeDotsLabs/watermill/message"
    "github.com/ThreeDotsLabs/watermill/message/infrastructure/gochannel"
    "github.com/ThreeDotsLabs/watermill/message/router/middleware"
    "github.com/ThreeDotsLabs/watermill/message/router/plugin"
)

var (
    // just the simplest implementation;
    // you will probably want to ship your own implementation of `watermill.LoggerAdapter`
    logger = watermill.NewStdLogger(false, false)
)

func main() {
    router, err := message.NewRouter(message.RouterConfig{}, logger)
    if err != nil {
        panic(err)
    }

    // this plugin will gracefully shut down the router when SIGTERM is sent;
    // you can also close the router by calling `r.Close()`
    router.AddPlugin(plugin.SignalsHandler)

    router.AddMiddleware(
        // CorrelationID copies the correlation ID from the consumed message's metadata to produced messages
        middleware.CorrelationID,

        // when an error occurs, the handler function will be retried;
        // after max retries (or if no Retry middleware is added) a Nack is sent and the message will be redelivered
        middleware.Retry{
            MaxRetries: 3,
            WaitTime:   time.Millisecond * 100,
            Backoff:    3,
            Logger:     logger,
        }.Middleware,

        // this middleware will handle panics from handlers
        // and, in this case, pass them as errors to the Retry middleware
        middleware.Recoverer,
    )

    // for simplicity, we are using the gochannel Pub/Sub here;
    // you can replace it with any Pub/Sub implementation and it will work the same
    pubSub := gochannel.NewGoChannel(0, logger, time.Second)

    // producing some messages in the background
    go publishMessages(pubSub)

    if err := router.AddHandler(
        "struct_handler",  // handler name, must be unique
        "example.topic_1", // topic from which we will read events
        "example.topic_2", // topic to which we will publish events
        pubSub,
        structHandler{}.Handler,
    ); err != nil {
        panic(err)
    }

    // just for debug, we are printing all events sent to `example.topic_1`
    if err := router.AddNoPublisherHandler(
        "print_events_topic_1",
        "example.topic_1",
        pubSub,
        printMessages,
    ); err != nil {
        panic(err)
    }

    // just for debug, we are printing all events sent to `example.topic_2`
    if err := router.AddNoPublisherHandler(
        "print_events_topic_2",
        "example.topic_2",
        pubSub,
        printMessages,
    ); err != nil {
        panic(err)
    }

    // when everything is ready, let's run the router;
    // this call blocks while the router is running
    if err := router.Run(); err != nil {
        panic(err)
    }
}

// ...

Producing messages

Producing messages works just like before. We have only added middleware.SetCorrelationID to set the correlation ID. The correlation ID will be copied to all messages produced by the router (by the middleware.CorrelationID middleware).

Full source: content/docs/getting-started/router/main.go

// ...
func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(uuid.NewV4().String(), []byte("Hello, world!"))
        middleware.SetCorrelationID(uuid.NewV4().String(), msg)

        log.Printf("sending message %s, correlation id: %s\n", msg.UUID, middleware.MessageCorrelationID(msg))

        if err := publisher.Publish("example.topic_1", msg); err != nil {
            panic(err)
        }

        time.Sleep(time.Second)
    }
}
// ...

Handlers

You may notice that we have two types of handler functions:

  1. function func(msg *message.Message) ([]*message.Message, error)
  2. method func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error)

The second option is useful when our handler requires some dependencies, like a database handle or a logger. When the handler has no dependencies, it’s fine to use just a function.

Full source: content/docs/getting-started/router/main.go

// ...
func printMessages(msg *message.Message) ([]*message.Message, error) {
    fmt.Printf(
        "\n> Received message: %s\n> %s\n> metadata: %v\n\n",
        msg.UUID, string(msg.Payload), msg.Metadata,
    )
    return nil, nil
}

type structHandler struct {
    // we can add some dependencies here
}

func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
    log.Println("structHandler received message", msg.UUID)

    msg = message.NewMessage(uuid.NewV4().String(), []byte("message produced by structHandler"))
    return message.Messages{msg}, nil
}

Done!

We can run this example with go run main.go.

We just created our first application with Watermill. You can find the full source in /docs/getting-started/router/main.go.

Deployment

Watermill is not a framework. We don’t enforce any type of deployment and it’s totally up to you.

What’s next?

For more detailed documentation, check the documentation topics list.

Examples

We have also created some examples that show how to start using Watermill. The recommended entry point is Your first Watermill application. It contains the entire environment in docker-compose.yml, including Golang and Kafka, which you can run with one command.

After that, you could check the Simple app example. It uses more middlewares and contains two handlers. There is also a separate application for publishing messages.

The third example showcases the use of a different Subscriber implementation, namely HTTP. It is a very simple application, which can save GitLab webhooks to Kafka.

You may also find some useful information in our README.

Support

If anything is not clear, feel free to use any of our support channels; we’ll be glad to help.