Vert.x: how to handle retry with the EventBus?

Vert.x is a polyglot library for building reactive applications. Among its features, Vert.x comes with a built-in EventBus, which is a key piece of an event-driven (micro-services) architecture.

What you will read about in this post:

  • how to handle a retry on a request-response of the Vert.x EventBus with a classical Handler
  • … with an RxJava flavor
  • … with a circuit-breaker
  • if we can play ping-pong with Vert.x… forever…

The Vert.x EventBus lets you send messages to an address (which is basically a String). Handlers that have been registered at this address then process each message as it is received. The Vert.x EventBus supports different messaging patterns:

  • point-to-point: the message is sent to a single consumer that listens to a given address
  • request-response: the message is sent to a single consumer that listens to a given address and the consumer can send back a reply to the initial sender
  • publish-subscribe: the message is sent to all the consumers that listen to a given address

The delivery of the message is best-effort as stated in the Vert.x documentation:

Vert.x does its best to deliver messages and won’t consciously throw them away. This is called best-effort delivery.
However, in case of failure of all or parts of the event bus, there is a possibility messages will be lost.
If your application cares about lost messages, you should code your handlers to be idempotent, and your senders to retry after recovery.

And while reading this part of the doc again, something that was on my mind for a long time came back: the retry capability is a nice thing, how do we do that?

When thinking about this issue, the first solution that came to my mind was an RxJava one, because this reactive library has nice operators (as we will see below). But let’s first explore the solution with the “traditional” Vert.x handlers.

So, what do we do? We will implement a request-response example with potential failures and a retry operation. Let’s say our sender sends a “Hello” message to a given address, and a message consumer listens to the same address and is supposed to reply “World!!!” (Damn! That looks like a “Hello World” example!).

But for some reason, the consumer fails twice when receiving the message (that’s the drama of our story…), making the sender retry again and again until the consumer replies successfully (the third time. That’s the happy ending and yes, I’ve spoiled it, sorry…).

So, where to begin? Let’s implement our consumer. We wrap it within a Vert.x verticle:

import io.vertx.core.AbstractVerticle;

public class ReplierWithFailureVerticle extends AbstractVerticle {
  private int counter = 0;

  @Override
  public void start() throws Exception {
     vertx.eventBus() // (1)
          .consumer("hello.handler.failure.retry", m -> { // (2) (3) (4)
              counter = counter + 1;
              if (counter < 3) {
                 m.fail(500, "failed to reply, sorry... ¯\\_(⊙︿⊙)_/¯"); // (5)

              } else {
                 counter = 0;
                 m.reply("World!!!"); // (6)
              }
          });
    }
}

(1) we get the Vert.x EventBus and declare a consumer to a given address (2). We register a Handler (3) to manage the received message (4).

To notify the sender there was a problem, we use the message.fail(.) method (5). To send back a “normal” response, we use the message.reply(.) method (6).

Retry with a Handler

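import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.eventbus.Message;

public class RetryWithHandlerVerticle extends AbstractVerticle {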

  @Override
  public void start() throws Exception {
     vertx.eventBus() // (1)
          .send("hello.handler.failure.retry", // (2) 
                "Hello", // (3) 
                 new Handler<AsyncResult<Message<String>>>() { // (4)
                    private int count = 1;

                    @Override
                    public void handle(final AsyncResult<Message<String>> aResult) {
                       if (aResult.succeeded()) { // (5)
                          System.out.printf("received: \"%s\"\n", aResult.result().body()); 

                       } else if (count < 3) { // (6)
                          System.out.printf("retry count %d, received error \"%s\"\n", 
                                            count, aResult.cause().getMessage());
                          vertx.eventBus().send("hello.handler.failure.retry", "Hello", this); // (7)
                          count = count + 1;

                       } else {
                          aResult.cause().printStackTrace(); // (8)

                       }
                    }
          });
    }
}

(1) we get the Vert.x EventBus and send our “Hello” message (3) to a given address (2) and register a Handler to deal with the response of the consumer (4). When working with Java 8, you usually use lambdas but as we need to count the number of failures, we declare our handler with an anonymous declaration.

If the handler receives a successful reply (5), we print the response. If the handler receives a failure, we check that we have retried fewer than 2 times so far (6) (we are patient but not too much…), send the same message again (7) and increment our counter. Note that we pass this as the handler: that is the whole trick!

If we have already retried twice and still receive a failure, we give up (8).
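
If you would rather stick to lambdas, one possible alternative is to carry the attempt number as a method parameter and let the method call itself on failure. Here is a minimal plain-Java sketch of that idea. It does not use Vert.x at all: the names RecursiveRetrySketch, Request, sendWithRetry and failingTwice are made up for the illustration, and a null reply stands for a failure.

```java
import java.util.function.Consumer;

// Plain-Java stand-in for the idea; no Vert.x types are used here.
class RecursiveRetrySketch {

    /** Hypothetical asynchronous request: calls back with a reply, or null on failure. */
    interface Request {
        void send(Consumer<String> replyHandler);
    }

    /** Sends the request and retries on failure until maxAttempts attempts have been made. */
    static void sendWithRetry(Request request, int attempt, int maxAttempts, Consumer<String> onDone) {
        request.send(reply -> {
            if (reply != null) {
                onDone.accept(reply);                                      // success
            } else if (attempt < maxAttempts) {
                sendWithRetry(request, attempt + 1, maxAttempts, onDone);  // retry
            } else {
                onDone.accept(null);                                       // give up
            }
        });
    }

    /** Simulates the consumer of this article: fails twice, then replies. */
    static Request failingTwice() {
        int[] counter = {0};
        return handler -> handler.accept(++counter[0] < 3 ? null : "World!!!");
    }

    public static void main(String[] args) {
        // The third attempt succeeds, so this prints: received: World!!!
        sendWithRetry(failingTwice(), 1, 3, reply -> System.out.println("received: " + reply));
    }
}
```

The counter no longer lives in a field of the handler: each lambda simply captures the attempt number it was created with.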

Retry with RxJava

Vert.x provides a nice extension for RxJava. This extension exposes almost the same Vert.x APIs, except that the Rx-ified methods don’t take a handler as last parameter to deal with the asynchronous result. They return an Rx Single or Observable instead.

And among all the nice operators offered by RxJava, guess what, there is a retry(.) operator! Actually, you have a little more than that: retry(.), retryWhen(.) and, for RxJava 2 (which will soon be supported by Vert.x and may already be by the time you read this article), retryUntil(.). Each of these operators comes in different variants. For instance, retry(.) can take no argument, a long (the number of times you wish to retry) or a two-argument predicate (a Func2 in RxJava 1) that lets you handle trickier cases. For our sample, we use the simpler one. We want to retry twice.

import io.vertx.rxjava.core.AbstractVerticle;
import io.vertx.rxjava.core.eventbus.Message;

public class RetryWithRxJavaVerticle extends AbstractVerticle {

  @Override
  public void start() throws Exception {
     vertx.eventBus() // (1)
          .<String>rxSend("hello.rx.failure.retry", "Hello") // (2) (3)
          .retry(2) // (4)
          .map(Message::body) // (5)
          .subscribe(m -> System.out.printf("received: \"%s\"\n", m), // (6)
                     Throwable::printStackTrace); // (7)
  }
}

As in the previous code, we get the Vert.x EventBus (1) (but the Rx-ified one) to send the “Hello” message (3) to a given address (2). This operation returns a Single<Message<String>>. A Single<> is an RxJava type that emits a single item, while an Observable<> emits a stream of items.

Since we get an RxJava Single, we can apply the RxJava operator retry(). And as we are not patient, we retry twice (4). We then extract the body of the message (5) and subscribe to the Rx chain: if we receive the message, we print it (6). If there is still a failure after the two retries, we print the stack trace (7) (right, that’s not the nicest error handling you can find, but that’s a demo… after all).

Nice, isn’t it?
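
A detail worth keeping in mind: retrying twice means up to three attempts in total (the initial one plus two retries), which is exactly what our consumer needs. The plain-Java sketch below mimics this counting with a synchronous loop. It is not how RxJava implements retry, and the names RetryCountSketch, callWithRetries and failingTwice are hypothetical.

```java
import java.util.function.Supplier;

// Plain-Java illustration of "retry N times"; this is not RxJava code.
class RetryCountSketch {

    /** Calls the operation once, then retries up to `retries` more times on failure. */
    static <T> T callWithRetries(Supplier<T> operation, int retries) {
        if (retries < 0) {
            throw new IllegalArgumentException("retries must be >= 0");
        }
        RuntimeException lastError = null;
        for (int attempt = 0; attempt <= retries; attempt++) {
            try {
                return operation.get();
            } catch (RuntimeException e) {
                lastError = e; // remember the failure and try again
            }
        }
        throw lastError; // every attempt failed
    }

    /** Simulates the consumer of this article: fails twice, then replies. */
    static Supplier<String> failingTwice() {
        int[] counter = {0};
        return () -> {
            if (++counter[0] < 3) {
                throw new IllegalStateException("failed to reply, sorry...");
            }
            return "World!!!";
        };
    }

    public static void main(String[] args) {
        // 1 initial attempt + 2 retries = 3 attempts: the third one succeeds.
        System.out.println(callWithRetries(failingTwice(), 2)); // prints World!!!
    }
}
```

With a single retry, the same operation would still fail, since the consumer only replies on the third attempt.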

Would you like to do some more complex stuff? You can use the retry operator that takes a two-argument predicate (a Func2 in RxJava 1) as argument:

public class RetryWithRxJavaVerticle extends AbstractVerticle {

  @Override
  public void start() throws Exception {
     vertx.eventBus()
          .<String>rxSend("hello.rx.failure.retry", "Hello")
          .retry((count, error) -> { // here we take the trouble to write a predicate, just to print the count
             System.out.printf("retry count %d, received error \"%s\"\n",
                                count, error.getMessage());
             return count < 3;
          })
          .map(Message::body)
          .subscribe(m -> System.out.printf("received: \"%s\"\n", m),
                     Throwable::printStackTrace);
  }
}

Want to do trickier stuff (exponential back-off, etc.)? Have a look at the retryWhen(.) operator (and, if you read this article from this autumn on, the retryUntil(.) operator) and this excellent article from Dan Lew! There is lots of stuff you can do with one or a combination of RxJava operators! (We love RxJava, if you haven’t noticed yet :)).
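
To give an idea of what exponential back-off means in practice, here is a small plain-Java sketch that only computes the waiting time before each retry: the delay doubles at every attempt until it hits a cap. It is independent of RxJava and Vert.x. BackoffSketch, delayMillis and the 100 ms / 1 s values are assumptions chosen for the example.

```java
// Plain-Java illustration; it only computes delays and does not schedule anything.
class BackoffSketch {

    /**
     * Delay to wait before retry number `attempt` (1-based):
     * baseDelayMs for the first retry, doubled for each further retry, capped at maxDelayMs.
     */
    static long delayMillis(int attempt, long baseDelayMs, long maxDelayMs) {
        long delay = baseDelayMs;
        // Stop doubling as soon as the cap is reached, which also avoids overflow.
        for (int i = 1; i < attempt && delay < maxDelayMs; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxDelayMs);
    }

    public static void main(String[] args) {
        // Waits: 100, 200, 400, 800, then 1000 (capped).
        for (int attempt = 1; attempt <= 5; attempt++) {
            System.out.printf("retry %d -> wait %d ms%n", attempt, delayMillis(attempt, 100, 1000));
        }
    }
}
```

In a real retry chain, a value like this would typically be the time to wait before resubscribing after the n-th error.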

Is that all? Well, no. Guess what? There is a third way to do it!

Retry with the circuit-breaker

The circuit-breaker is a very popular pattern in the micro-services world: it helps to handle reliability and failure. By default, the circuit-breaker is in the “closed” state: the operation it monitors is performed. Whenever a failure occurs, the circuit-breaker increments its failure counter. If a threshold is reached, the circuit-breaker state changes to “open”. The original operation is not called anymore, but you can supply a fallback instead. After a while, the circuit-breaker switches to the “half-open” state: the circuit-breaker calls the original operation once and the fallback for the other calls. If the call to the original operation is successful, the circuit-breaker goes back to the “closed” state; otherwise it returns to the “open” state.
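
The state transitions described above can be sketched as a tiny state machine. This is a simplified plain-Java illustration of the pattern, not the Vert.x implementation: the class CircuitBreakerSketch and its methods are hypothetical, and the reset timeout is triggered by hand instead of by a timer.

```java
// Simplified illustration of the circuit-breaker pattern; not the Vert.x circuit-breaker.
class CircuitBreakerSketch {

    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int maxFailures;
    private State state = State.CLOSED;
    private int failures = 0;
    private boolean trialInProgress = false;

    CircuitBreakerSketch(int maxFailures) {
        this.maxFailures = maxFailures;
    }

    State state() {
        return state;
    }

    /** Returns true if the original operation may be called, false if the fallback should be used. */
    boolean tryAcquire() {
        if (state == State.CLOSED) {
            return true;
        }
        if (state == State.HALF_OPEN && !trialInProgress) {
            trialInProgress = true; // only one trial call is let through
            return true;
        }
        return false;
    }

    void recordSuccess() {
        failures = 0;
        trialInProgress = false;
        state = State.CLOSED;
    }

    void recordFailure() {
        failures++;
        trialInProgress = false;
        if (state == State.HALF_OPEN || failures >= maxFailures) {
            state = State.OPEN;
        }
    }

    /** To be called when the reset timeout elapses (a real breaker would use a timer). */
    void resetTimeoutElapsed() {
        if (state == State.OPEN) {
            state = State.HALF_OPEN;
        }
    }

    public static void main(String[] args) {
        CircuitBreakerSketch breaker = new CircuitBreakerSketch(2);
        breaker.recordFailure();
        breaker.recordFailure();
        System.out.println(breaker.state()); // prints OPEN
        breaker.resetTimeoutElapsed();
        System.out.println(breaker.state()); // prints HALF_OPEN
        breaker.tryAcquire();
        breaker.recordSuccess();
        System.out.println(breaker.state()); // prints CLOSED
    }
}
```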

And guess what? Vert.x has a circuit-breaker extension that lets you set up a circuit-breaker. There are lots of options to configure your circuit-breaker. We will focus only on the retry feature. So, let’s go!

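import io.vertx.circuitbreaker.CircuitBreaker;
import io.vertx.circuitbreaker.CircuitBreakerOptions;
import io.vertx.core.AbstractVerticle;

public class RetryWithCircuitBreakerVerticle extends AbstractVerticle {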
  @Override
  public void start() throws Exception {
     CircuitBreaker breaker 
            = CircuitBreaker.create("my-circuit-breaker", vertx, // (1)
                new CircuitBreakerOptions()
                  .setMaxRetries(3)
                  .setMaxFailures(3) // number of failure before opening the circuit
//                .setTimeout(8500) // consider a failure if the operation does not succeed in time
//                .setFallbackOnFailure(false) // do we call the fallback on failure
//                .setResetTimeout(10000) // time spent in open state before attempting to re-try
              );

     breaker.execute(fut -> // (2) (3)
         vertx.eventBus()
              .<String>send("hello.handler.failure.circuit-breaker.retry", "Hello", 
                            ar -> {
                              if (ar.succeeded()) {
                                 fut.complete(ar.result().body()); // (4)

                              } else {
                                 System.out.printf("received \"%s\"\n", 
                                                   ar.cause().getMessage());
                                 fut.fail(ar.cause().getMessage()); // (5)
                              }
                }))
                .setHandler(ar -> {
                   if (ar.succeeded()) { // (6)
                     System.out.printf("received \"%s\"\n", ar.result());

                   } else {
                     ar.cause().printStackTrace();
                   }
                });
    }
}

First, we declare and configure our circuit-breaker (1). Then, we wrap our call with the circuit-breaker (2). The execute(.) method of the circuit-breaker takes a handler that receives a Future (3). The wrapped operation will then complete (4) or fail (5) the given future. A handler will deal with the result of the execution of the circuit-breaker (6).

That’s it! The circuit-breaker offers nice options: fallback, timeout, etc. Depending on your business logic, it may be worthwhile having a look at it.

Conclusion

With the Vert.x EventBus, you can implement nice request-response messaging patterns with, as the icing on the cake, the ability to deal with retry attempts. Depending on your business logic, you can opt for a simple retry with a Handler or with an Rx-ified version and the retry(.) operator.

RxJava offers nice operators that give you powerful options to implement more complex retry logic.

Finally, the circuit-breaker is also a nice contender to implement complex retry logic and deal with failure.

The full code can be found on Github.

So, you have the choice of the weapons 😉

Takeaway n°1

The Vert.x EventBus has a timeout when sending a message with a reply handler: if no reply has been received when the timeout is reached, the EventBus passes a failure to the handler that deals with the reply. By default, the timeout is 30s. You can configure the timeout by supplying a configured DeliveryOptions to the send method as the 3rd argument:

public class RetryWithHandlerVerticle extends AbstractVerticle {

  @Override
  public void start() throws Exception {
    DeliveryOptions deliveryOptions = new DeliveryOptions().setSendTimeout(1500);

    vertx.eventBus()
         .send("hello.handler.timeout.retry", 
               "Hello", 
               deliveryOptions, 
               new Handler<AsyncResult<Message<String>>>() {
                  private int count = 1;

                  @Override
                  public void handle(final AsyncResult<Message<String>> aResult) {
                    if (aResult.succeeded()) {
                      System.out.printf("received \"%s\"\n", aResult.result().body());

                    } else if (count < 3) {
                      System.out.printf("retry count %d, received error \"%s\"\n",
                                               count, aResult.cause().getMessage());
                      vertx.eventBus().send("hello.handler.timeout.retry", 
                                            "Hello", 
                                            deliveryOptions, 
                                            this);
                      count = count + 1;

                    } else {
                      aResult.cause().printStackTrace();

                    }
                 }
         });
  }
}

Dealing with a timeout and retrying to send the message is a nice use-case for what we have seen above.

Takeaway n°2

Once we get the response of the consumer, can we send back a reply?

Hmm, the answer is… Yes! One could think we just have to take the response message and call reply(.) on it. That is indeed the case. If you expect a response in return, you can supply a Handler to deal with it.

Here is the code for a sender that sends “ping”, a consumer that replies “pong”, to which the sender replies “pang”, etc.

public class PingVerticle extends AbstractVerticle {
  @Override
  public void start() throws Exception {
    vertx.eventBus()
         .<String>send("ping.pong.topic", "ping", ar -> {
             if (ar.succeeded()) {
               Message<String> message = ar.result();

               System.out.printf("[%s] received message: %s\n", PingVerticle.class.getName(), 
                                 message.body());
               message.reply("pang", ar2 -> {
                 if (ar2.succeeded()) {
                    System.out.printf("[%s] received message: %s\n", PingVerticle.class.getName(), 
                                      ar2.result().body());
                     ar2.result().reply("peng", ar3 -> {
                     if (ar3.succeeded()) {
                        System.out.printf("[%s] received message: %s\n", 
                                          PingVerticle.class.getName(), 
                                          ar3.result().body());

                         } else {
                             ar3.cause().printStackTrace();
                         }
                      });
                  } else {
                      ar2.cause().printStackTrace();
                 }
               });
             } else {
                 ar.cause().printStackTrace();
             }
         });
  }
}

You have the beginning of a callback hell chain and you can fix it by reading this article or having a look at the RxJava version: here.

sender - consumer
 ping  -  pong
 pang  -  pung
 peng  -  pyng

And then, the message exchange stops.

But… Can we play Ping-Pong with Vert.x? … forever?

Sometimes stupid questions come to your mind (maybe more often than you wish…). Well, at least this question came to mine. And I can confidently answer: yes, you can!

For a solution with a Handler, simply declare a Handler and pass its reference to the reply(.) method:

message.reply("new message", this);

For the RxJava solution, do almost the same with a Subscriber that subscribes itself to the reply.

Note the code is similar for the one that sends the first message and for the one that replies to it.
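
To make the trick more tangible without pulling in Vert.x, here is a plain-Java sketch of two players that each pass themselves as the reply handler. Msg, Player and play are hypothetical stand-ins (they are not Vert.x types), calls are synchronous, and the number of replies is bounded so that the demo terminates instead of playing forever.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java stand-in for the idea; no Vert.x types are involved.
class PingPongSketch {

    /** Hypothetical message: a body plus the handler that receives replies to it. */
    static final class Msg {
        final String body;
        private final Consumer<Msg> replyTo;

        Msg(String body, Consumer<Msg> replyTo) {
            this.body = body;
            this.replyTo = replyTo;
        }

        /** Sends a reply; replyHandler will in turn receive the answer to that reply. */
        void reply(String replyBody, Consumer<Msg> replyHandler) {
            replyTo.accept(new Msg(replyBody, replyHandler));
        }
    }

    /** A player that always answers with the same word and passes itself as reply handler. */
    static final class Player implements Consumer<Msg> {
        private final String word;
        private final List<String> log;
        private int repliesLeft; // bounded so that the demo terminates

        Player(String word, int repliesLeft, List<String> log) {
            this.word = word;
            this.repliesLeft = repliesLeft;
            this.log = log;
        }

        @Override
        public void accept(Msg message) {
            log.add(message.body);
            if (repliesLeft > 0) {
                repliesLeft--;
                message.reply(word, this); // the trick: "this" handles the next reply
            }
        }
    }

    /** Plays a bounded game and returns the bodies in the order they were received. */
    static List<String> play(int repliesPerPlayer) {
        List<String> log = new ArrayList<>();
        Player pinger = new Player("ping", repliesPerPlayer, log);
        Player ponger = new Player("pong", repliesPerPlayer, log);
        ponger.accept(new Msg("ping", pinger)); // first message; replies go to pinger
        return log;
    }

    public static void main(String[] args) {
        System.out.println(play(2)); // prints [ping, pong, ping, pong, ping]
    }
}
```

Remove the bound on repliesLeft and, on a real asynchronous event bus, the two players would keep exchanging messages indefinitely.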

The full code can be found here.

Of course, that’s a stupid use-case except if you really wish to implement a Ping-Pong example… And I’ve just found out the origin of this subliminal idea resides in the Vert.x documentation (thanks Vert.x team! ;)):

When a message is received by a recipient, and has been handled, the recipient can optionally decide to reply to the message. If they do so the reply handler will be called.

When the reply is received back at the sender, it can be replied to as well. This can be repeated ad-infinitum, and allows a dialog to be set-up between two different verticles.

Takeaway n°3

If you want to dive into Vert.x, the free e-book Building Reactive Microservices in Java by Clément Escoffier is a good starting point to get an insight into the super-powers of this library.

The Gentle guide to asynchronous programming with Vert.x from the Vert.x team is also a tremendous resource that is well worth reading.

If you like to code to grasp the concepts, you can dive into the workshop From zero to (micro-)hero, which gives a nice overview of the main Vert.x capabilities.

If you’re more of a video fan, you can have a look at these talks: here and here.

And last but not least, the Vert.x documentation is also awesome. So, feel free to read it!
