Source: vertx/event_bus.js

/*
 * Copyright 2011-2012 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

if (typeof __vertxload === 'string') {
  throw "Use require() to load Vert.x API modules";
}

var helpers = require('vertx/helpers');
var Buffer  = require('vertx/buffer');

/**
 * While JSON is the preferred messaging on the event bus,
 * you can send any basic type as a message, for example,
 * <code>string</code>, <code>boolean</code>, etc. can all be passed
 * directly into {@linkcode module:vertx/event_bus.send}. You may also
 * send {@linkcode module:vertx/buffer~Buffer}s and plain old 
 * Javascript objects. Objects will be converted to <code>JSON</code>
 * before being placed on the bus.
 * @see module:vertx/event_bus
 * @typedef {string|boolean|number|{}|JSON|module:vertx/buffer~Buffer} Message
 */

/**
 * A <code>MessageHandler</code> is a {@linkcode Handler} that responds to
 * messages on the {@linkcode module:vertx/event_bus} module. <code>MessageHandler</code>s
 * are called with a {@linkcode Message} object as the parameter.
 *
 * @see module:vertx/event_bus.registerHandler
 * @typedef {function} MessageHandler
 * @param {Message} message The message
 */


/**
 * <p>Represents a distributed lightweight event bus which can encompass
 * multiple vert.x instances.  It is very useful for otherwise isolated vert.x
 * application instances to communicate with each other. Messages sent over the
 * event bus are JSON objects.</p>
 *
 * <p>The event bus implements a distributed publish / subscribe network.
 * Messages are sent to an address.  There can be multiple handlers registered
 * against that address.  Any handlers with a matching name will receive the
 * message irrespective of what vert.x application instance and what vert.x
 * instance they are located in.</p>
 *
 * <p>All messages sent over the bus are transient. On event of failure of all
 * or part of the event bus messages may be lost. Applications should be coded
 * to cope with lost messages, e.g. by resending them, and making application
 * services idempotent.</p>
 *
 * <p>The order of messages received by any specific handler from a specific
 * sender will match the order of messages sent from that sender.</p>
 *
 * <p>When sending a message, a reply handler can be provided. If so, it will
 * be called when the reply from the receiver has been received.</p>
 *
 * <p>This module can be used individually, or through the top-level
 * {@linkcode module:vertx|vertx} module.
 * 
 * @example <caption>Accessing the event bus</caption>
 *
 * var vertx = require('vertx');
 *
 * var eb1 = require('vertx/event_bus');
 * var eb2 = vertx.eventBus;
 *
 * eb1.registerHandler('some-address', function(message) {
 *   print("Got a message! " + message);
 * }
 * eb2.publish('some-address', 'Hello world');
 *
 * @exports vertx/event_bus
 */
var eventBus = {};

var handlerMap = {};

var jEventBus = __jvertx.eventBus();

/**
 * Register a handler which won't be propageted acress the cluster.
 *
 * @param {string} address the address to register for. Any messages sent to
 * that address will be received by the handler. A single handler can be
 * registered against many addresses.
 * @param {MessageHandler} handler The handler
 * @returns {module:vertx/event_bus} The event bus
 */
eventBus.registerLocalHandler = function(address, handler) {
  registerHandler(address, handler, true);
  return eventBus;
};

/**
 * Register a handler.
 *
 * @param {string} address the address to register for. Any messages sent to
 * that address will be received by the handler. A single handler can be
 * registered against many addresses.
 * @param {MessageHandler} handler The handler
 * @param {Handler} [registrationHandler] If supplied, this handler is called
 * when all nodes have registered this address
 *
 * @returns {module:vertx/event_bus} the event bus
 */
eventBus.registerHandler = function(address, handler, registrationHandler) {
  registerHandler(address, handler, false, registrationHandler);
  return eventBus;
};

/**
 * Unregisters a handler.
 *
 * @param {string} address The address the handler is registered to
 * @param {MessageHandler} handler The handler to unregister
 * @param {Handler} [registrationHandler] If supplied, this handler is called
 * when all nodes have unregistered this address
 * @returns {module:vertx/event_bus} the event bus
 */
eventBus.unregisterHandler = function(address, handler, registrationHandler) {
  checkHandlerParams(address, handler);
  var wrapped = handlerMap[handler];
  if (wrapped) {
    if (typeof registrationHandler == 'function') {
      jEventBus.unregisterHandler(address, wrapped, registrationHandler);
    } else {
      jEventBus.unregisterHandler(address, wrapped);
    }
    delete handlerMap[handler];
  }
  return eventBus;
};

/**
 * Sends a message on the event bus.
 *
 * @param {string} address The address to send the message to
 * @param {Message} message The message to send
 * @param {MessageHandler} [replyHandler] called when the message receives a reply
 * @returns {module:vertx/event_bus}
 */
eventBus.send = function(address, message, replyHandler) {
  sendOrPub(true, address, message, replyHandler);
  return eventBus;
};

/**
 * Sends a message on the event bus. If a reply is not received within
 * `timeout` milliseconds, the `replyHandler` will be called with an
 * error and empty message body, then discarded.
 *
 * @param {string} address The address to send the message to
 * @param {Message} message The message to send
 * @param {number} timeout The timeout period in milliseconds
 * @param {ResultHandler} resultHandler called when the message receives a reply, or when the timeout triggers
 * 
 */
eventBus.sendWithTimeout = function(address, message, timeout, replyHandler) {
  sendWithTimeout(address, message, replyHandler, timeout);
  return eventBus;
};

/**
 * Set the default reply time in milliseconds.
 * Unless changed, the default will be -1 which means messages sent on
 * the event bus will never timeout unless `sendWithTimeout()` is
 * used explicitly. Any other value will cause the event bus to
 * timeout reply hanlders in the given number of milliseconds.
 * @param {number} timeout the number of milliseconds to wait before timing out a handler
 * @returns {module:vertx/event_bus}
 */
eventBus.setDefaultReplyTimeout = function(timeout) {
  jEventBus.setDefaultReplyTimeout(timeout);
  return eventBus;
};

/**
 * Get the default reply timeout in milliseconds.
 * Unless changed, the default will be -1 which means messages sent on
 * the event bus will never timeout unless `sendWithTimeout()` is
 * used explicitly. Any other value will cause the event bus to
 * timeout reply hanlders in the given number of milliseconds.
 * @returns {number}
 */
eventBus.getDefaultReplyTimeout = function() {
  return jEventBus.getDefaultReplyTimeout();
};

/**
 * Publish a message on the event bus.
 * Message should be a JSON object It should have a property "address".
 *
 * @param {string} address The address to send the message to
 * @param {Message} message The message to send
 * @returns {module:vertx/event_bus}
 */
eventBus.publish = function(address, message) {
  sendOrPub(false, address, message);
  return eventBus;
};

function checkHandlerParams(address, handler) {
  if (!address) {
    throw "address must be specified";
  }
  if (!handler) {
    throw "handler must be specified";
  }
  if (typeof address != "string") {
    throw "address must be a string";
  }
  if (typeof handler != "function") {
    throw "handler must be a function";
  }
}

var jsonObjectClass = new org.vertx.java.core.json.JsonObject().getClass();
var jsonArrayClass  = new org.vertx.java.core.json.JsonArray().getClass();
var bufferClass     = new org.vertx.java.core.buffer.Buffer().getClass();

// Converts a received message from the java event bus into something
// a little more palatable for javascript
var resultConverter = function(jMsg) {
  var body = jMsg.body();

  // Strings, booleans, numbers, and undefined all
  // get passed across the bus without conversion
  switch(typeof body) {
    case 'string':
    case 'boolean':
    case 'number':
    case 'undefined':
      return body;
    case 'object':
      if (body === null) return undefined;
      break;
  }
    
  // must be some other kind of object - deal with it
  var clazz = body.getClass();
  if (clazz === jsonObjectClass || clazz === jsonArrayClass) {
    // Convert to JS JSON
    body = JSON.parse(body.encode());
  } else if (clazz === bufferClass) {
    // Convert to JS Buffer
    body = new Buffer(body);
  }
  return body;
};

function wrappedHandler(handler) {
  return new org.vertx.java.core.Handler({
    handle: function(jMsg) {
      var body = resultConverter(jMsg);
      var meta = {
        address: jMsg.address(),
        fail: function(code, msg) {
          jMsg.fail(code, msg);
        }
      };
      handler(body, function(reply, replyHandler, timeout) {
        if (typeof reply === 'undefined') {
          throw "Reply message must be specified";
        }
        reply = convertMessage(reply);
        if (replyHandler) {
          if (timeout) {
            jMsg.replyWithTimeout(reply, timeout, helpers.adaptAsyncResultHandler(replyHandler, resultConverter));
          } else {
            jMsg.reply(reply, wrappedHandler(replyHandler));
          }
        } else {
          jMsg.reply(reply);
        }
      }, meta);
    }
  });
}

function registerHandler(address, handler, localOnly, registrationHandler) {
  checkHandlerParams(address, handler);

  var wrapped = wrappedHandler(handler);
  if (typeof localOnly == 'function') {
    registrationHandler = localOnly;
    localOnly = false;
  }

  // This is a bit more complex than it should be because we have to wrap the
  // handler - therefore we have to keep track of it :(
  handlerMap[handler] = wrapped;

  if (localOnly) {
    jEventBus.registerLocalHandler(address, wrapped);
  } else {
    if (typeof registrationHandler == 'undefined') {
      jEventBus.registerHandler(address, wrapped);
    } else {
      jEventBus.registerHandler(address, wrapped, registrationHandler);
    }
  }
  return eventBus;
}

function convertMessage(message) {
  if (message === null || message === undefined) return '';
  var msgType = typeof message;
  switch (msgType) {
    case 'string':
    case 'boolean':
    case 'org.vertx.java.core.json.JsonArray':
    case 'org.vertx.java.core.buffer.Buffer':
      break;
    case 'number':
      message = new java.lang.Double(message);
      break;
    case 'object':
      if (message instanceof Array) {
        message = new org.vertx.java.core.json.JsonArray(message);
      } else if (message instanceof Buffer) {
        message = message._to_java_buffer();
      } else if (typeof message.getClass === "undefined") {
        message = new org.vertx.java.core.json.JsonObject(JSON.stringify(message));
      }
      break;
    default:
      throw 'Invalid type for message: ' + msgType;
  }
  return message;
}

function sendOrPub(send, address, message, replyHandler) {
  if (!address) {
    throw "address must be specified";
  }
  if (typeof address !== "string") {
    throw "address must be a string";
  }
  if (replyHandler && typeof replyHandler !== "function") {
    throw "replyHandler must be a function";
  }
  message = convertMessage(message);
  if (send) {
    if (replyHandler) {
      var wrapped = wrappedHandler(replyHandler);
      jEventBus.send(address, message, wrapped);
    } else {
      jEventBus.send(address, message);
    }
  } else {
    jEventBus.publish(address, message);
  }
  return eventBus;
}

function sendWithTimeout(address, message, replyHandler, timeout) {
  if (!address) {
    throw "address must be specified";
  }
  if (typeof address !== "string") {
    throw "address must be a string";
  }
  if (replyHandler && typeof replyHandler !== "function") {
    throw "replyHandler must be a function";
  }
  jEventBus.sendWithTimeout(address, convertMessage(message), timeout, helpers.adaptAsyncResultHandler(replyHandler, resultConverter));
  return eventBus;
}

module.exports = eventBus;