Jan Amoyo

on software development and possibly other things

Upserting Items into DynamoDB

When updating documents, MongoDB has a useful feature that inserts a new document when no document matches the query criteria. This feature is called an upsert. Sadly, as of this writing, DynamoDB does not offer this feature out of the box.
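
For comparison, below is a minimal sketch of a MongoDB upsert using the Node.js driver (the collection name and fields are illustrative):
db.collection('users').update(
  { _id: 'jan' },             // query criteria
  { $set: { active: true } }, // update to apply
  { upsert: true }            // insert a new document if nothing matches
);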

Thankfully, there's a way to achieve this. The idea is to do it in three steps: (1) get the previous copy of the item; (2) if a previous copy exists, update it; (3) if it does not exist, insert the item, ensuring that concurrent requests do not overwrite each other. Here's a snippet written for Node.js:
function upsert(tableName, partitionKey, sortKey, data) {
  // ...

  // 1. Get the original item
  return _get(partitionKey, sortKey).then(function (original) {
    if (Object.keys(original).length > 0) {
      // 2. Update if item already exists
      return _update(data, original);
    } else {
      // 3. Otherwise, put the item
      return _put(data).catch(function (err) {
        if (err.code === 'ConditionalCheckFailedException') {
          // 3a. Only one of the concurrent puts will succeed;
          // the rest should retry recursively
          return upsert(tableName, partitionKey, sortKey, data);
        } else {
          throw err;
        }
      });
    }
  });
}
The last part is where it gets tricky. Below is the complete code that illustrates how it is done:
function upsert(tableName, partitionKey, sortKey, data) {

  function _get(partitionKey, sortKey) {
    var params = {
      TableName: tableName,
      Key: {
        partitionKey: partitionKey,
        sortKey:      sortKey
      }
    };

    return docClient.get(params).promise()
      .then(function (result) { return result.Item || {}; });
  }

  function _update(data, original) {
    var updateExpression = dynamodbUpdateExpression.getUpdateExpression({ data: original.data }, { data: data });
    var params = Object.assign({
      TableName: tableName,
      Key: {
        partitionKey: partitionKey,
        sortKey:      sortKey
      },
      ReturnValues: 'ALL_NEW',
      ConditionExpression: 'attribute_exists(partitionKey) AND attribute_exists(sortKey)'
    }, updateExpression);

    if (params.UpdateExpression === '') {
      return Promise.resolve();
    }

    return docClient.update(params).promise()
      .then(function (result) { return result.Attributes.data; });
  }

  function _put(data) {
    var params = {
      TableName: tableName,
      Item: {
        partitionKey: partitionKey,
        sortKey:      sortKey,
        data:         data
      },
      ConditionExpression: 'attribute_not_exists(partitionKey) AND attribute_not_exists(sortKey)'
    };

    return docClient.put(params).promise();
  }

  // 1. Get the original item
  return _get(partitionKey, sortKey).then(function (original) {
    if (Object.keys(original).length > 0) {
      // 2. Update if item already exists
      return _update(data, original);
    } else {
      // 3. Otherwise, put the item
      return _put(data).catch(function (err) {
        if (err.code === 'ConditionalCheckFailedException') {
          // 3a. Only one of the concurrent puts will succeed;
          // the rest should retry recursively
          return upsert(tableName, partitionKey, sortKey, data);
        } else {
          throw err;
        }
      });
    }
  });
}
The trick is to declare a Condition Expression in the put step (attribute_not_exists(partitionKey) AND attribute_not_exists(sortKey)) to ensure that an item only gets inserted if a previous copy does not exist. This guarantees that when handling concurrent put requests, only the first request succeeds and the others fail with a ConditionalCheckFailedException error. We then check for this error type to determine whether a failed request should be retried; on retry, the item now exists, so the request takes the update path.

The above code uses dynamodb-update-expression to generate the DynamoDB Update Expressions used by _update.
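
Given an original and a modified object, the library produces the parameters needed by the update call. Below is a rough, illustrative sketch (the exact expression names and values are assumptions):
var dynamodbUpdateExpression = require('dynamodb-update-expression');

var original = { data: { name: 'Jan', role: 'dev' } };
var modified = { data: { name: 'Jan', role: 'editor' } };

var expression = dynamodbUpdateExpression.getUpdateExpression(original, modified);
// expression is roughly of the form:
// {
//   UpdateExpression: 'SET #data.#role = :dataRole',
//   ExpressionAttributeNames: { '#data': 'data', '#role': 'role' },
//   ExpressionAttributeValues: { ':dataRole': 'editor' }
// }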

Using LocalStorage to Publish Messages Across Browser Windows

Below is a simple JavaScript utility for publishing messages across browser windows of the same domain. This implementation uses the browser's localStorage and the storage event to simulate the behavior of an inter-window topic.
(function (global, window) {
  function Publisher() {
    var PUBLISH_PREFIX = 'publish_';
    this.publish = function (topic, message) {
      message.source    = window.name;
      message.timestamp = Date.now();
      window.localStorage.setItem(PUBLISH_PREFIX + topic,
        JSON.stringify(message));
    };
    this.subscribe = function (topic, callback, alias) {
      var subscriber = function (event) {
        if (event.key === PUBLISH_PREFIX + topic
            && event.newValue !== null) {
          callback(JSON.parse(event.newValue));
        }
      };
      window.addEventListener('storage', subscriber);
      return function () {
        window.removeEventListener('storage', subscriber);
      };
    };
  }
  global.jramoyo = { Publisher: new Publisher() };
})(this, window);
The publish function adds a source and a timestamp property to the message to ensure uniqueness, then converts the message to JSON and saves it to localStorage. On the receiving side, the subscriber uses event.key to filter which messages should be processed by the callback, converts the JSON value back into a message object, and passes it as an argument to the callback. Finally, subscribe returns a function that, when called, removes the subscriber from the topic.

Below is sample code from a publishing window:
jramoyo.Publisher.publish('greeting_topic', {
    name: 'Kyle Katarn'
});
And here is sample code from a subscribing window:
jramoyo.Publisher.subscribe('greeting_topic',
    function (message) {
        alert('Greetings, ' + message.name);
    });

This works because every time an item is stored in the localStorage, all browser windows sharing the same localStorage will receive a storage event detailing what has changed (except for the window that wrote to the localStorage).
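
For example, any other window can observe these notifications directly with a plain storage listener (a minimal sketch):
// fires in every other same-origin window whenever localStorage is written to
window.addEventListener('storage', function (event) {
  console.log(event.key, event.oldValue, event.newValue, event.url);
});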

However, the problem with the above implementation is that it does not scale as the number of subscribers increases. Every time a storage event fires, the JavaScript engine has to iterate through every registered listener, regardless of whether that listener is interested in the event.

To address this problem, we can use a map to index the callbacks against the event.key. Below is the updated version of the above code:
(function (global, window) {
  function Publisher() {
    var PUBLISH_PREFIX = 'publish_';
    var listeners = {};
    window.addEventListener('storage',
      function storageListener(event) {
        var array = listeners[event.key];
        if (array && array.length > 0) {
          var message = JSON.parse(event.newValue);
          array.forEach(function (listener) {
            listener(message);
          });
        }
      }, false);
    this.publish = function (topic, message) {
      message.source    = window.name;
      message.timestamp = Date.now();
      window.localStorage.setItem(PUBLISH_PREFIX + topic,
        JSON.stringify(message));
    }; 
    this.subscribe = function (topic, callback, alias) {
      var key = PUBLISH_PREFIX + topic,
        array = listeners[key];
      if (!array) {
        array = []; listeners[key] = array;
      }
      array.push(callback);
      return function () {
        array.splice(array.indexOf(callback), 1);
      };
    };
  }
  global.jramoyo = { Publisher: new Publisher() };
})(this, window);
A single storage event listener is now registered up front; it uses the listeners map to look up the callbacks registered under event.key. The subscribe function simply saves the callback into the listeners map under the derived key, and the returned unsubscribe function removes it from that map.

Increasing ngRepeat Limit on Scroll

The example below shows how to increase the limitTo filter of ngRepeat every time the div's scrollbar reaches the bottom.

This technique improves performance by only rendering the ngRepeat instances that are visible in the view.

First, we create a directive that calls a function whenever the div scrollbar reaches the bottom:
module.exports = function (_module) {
  'use strict';
  _module.directive('bufferedScroll', function ($parse) {
    return function ($scope, element, attrs) {
      var handler = $parse(attrs.bufferedScroll);
      element.scroll(function (evt) {
        var scrollTop    = element[0].scrollTop,
            scrollHeight = element[0].scrollHeight,
            offsetHeight = element[0].offsetHeight;
        if (scrollTop === (scrollHeight - offsetHeight)) {
          $scope.$apply(function () {
            handler($scope);
          });
        }
      });
    };
  });
};
The directive compiles the expression passed via the buffered-scroll attribute and listens for scroll events on the element (element.scroll requires jQuery). When the scrollbar reaches the bottom (scrollTop equals scrollHeight minus offsetHeight), the compiled expression is applied to the scope.

Then, we apply the directive to our view:
<div buffered-scroll="increaseLimit();" ng-init="limit=15;">
  <table>
    <tr ng-repeat="item in items | limitTo:limit">
      ...
    </tr>
  </table>
</div>
The outer div assigns a function expression to the directive and initializes the limit variable, while the ngRepeat inside it is declared with a limitTo filter.

Finally, we create the scope function that increases the limit variable while its value is still less than the number of items iterated by ngRepeat:
$scope.increaseLimit = function () {
  if ($scope.limit < $scope.items.length) {
    $scope.limit += 15;
  }
};

A working example is available at Plunker.

The same technique can be used to implement "infinite scroll" by calling a function that appends data from the server instead of increasing the limitTo filter.
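
For example (a hypothetical sketch: $http, the /api/items endpoint, and the response shape are all assumptions), the directive could call a function that appends the next page of results:
$scope.loadMore = function () {
  // fetch the next page and append it to the list bound to ngRepeat
  $http.get('/api/items', {
    params: { offset: $scope.items.length, limit: 15 }
  }).then(function (response) {
    $scope.items = $scope.items.concat(response.data);
  });
};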

Changing Tab Focus Behavior Using Angular

The example below uses 2 Angular directives to change the focus behavior when pressing the tab key.

The first directive is used to assign a name to a 'focusable' element.
module.exports = function (_module) {
  _module.directive('focusName', function () {
    return {
      restrict: 'A',
      link: function ($scope, element, attributes) {
        $scope.focusRegistry = $scope.focusRegistry || {};
        $scope.focusRegistry[attributes.focusName] = element[0];
      }
    };
  });
};
The link function lazily creates a scope object called focusRegistry, which stores all 'focusable' elements within the scope, and then registers the element using the value of the focus-name attribute as the key.

The second directive is used to declare which element will be focused when the tab key is pressed. It is also responsible for handling the events triggered by pressing the tab key.
module.exports = function (_module) {
  _module.directive('nextFocus', function () {
    return {
      restrict: 'A',
      link: function ($scope, element, attributes) {
        element.bind('keydown keypress', function (event) {
          if (event.which === 9) { // Tab
            var focusElement = $scope.focusRegistry[attributes.nextFocus];
            if (focusElement) {
              if (!focusElement.hidden && !focusElement.disabled) {
                focusElement.focus();
                event.preventDefault();
                return;
              }
            }

            console.log('Unable to focus on target: ' + attributes.nextFocus);
          }
        });
      }
    };
  });
};
The keydown/keypress handler captures the tab key (key code 9) and retrieves the target element from the registry using the value of the next-focus attribute.

If the target element exists and is neither hidden nor disabled, the focus moves to it and the default tab behavior is suppressed.

If the focus cannot be moved to the specified element, the default behavior of tab will take effect.

Below is an example of how to use both directives:
<input name="1" focus-name="1" next-focus="2" value="next focus goes to 2" />
<input name="2" focus-name="2" next-focus="3" value="next focus goes to 3" />
<input name="3" focus-name="3" next-focus="1" value="next focus goes back to 1" />
<input name="4" value="will not be focused" />
A working example is available at Plunker.

It is important to note that this approach will only work on elements within the same scope.

Testing Angular Directives with Templates on Karma and Browserify

Directives are the cornerstone of every Angular application. And templates help keep their behavior separate from the presentation.

Karma works well with Angular and is an essential tool for running tests against a number of supported browsers.

Lastly, Browserify helps preserve sanity while maintaining JavaScript modules (similar to Node.js, i.e., the CommonJS spec).

Sadly, integrating all four concepts is not a straightforward process. Below is a rough guide on how to achieve this.

1. Install karma-browserifast and karma-ng-html2js-preprocessor
$ npm install karma-browserifast --save-dev
$ npm install karma-ng-html2js-preprocessor --save-dev
The package karma-browserifast enables Browserify support in Karma, while karma-ng-html2js-preprocessor converts HTML templates to JavaScript and loads them as an Angular module.

2. Configure Karma
module.exports = function (config) {
  config.set({
    files: [
      'node_modules/angular/angular.js',
      'src/**/*.html'
    ],

    browserify: {
      files: [
        'test/unit/**/*.js',
      ],
      debug: true
    },

    preprocessors: {
      '/**/*.browserify': ['browserify'],
      'src/**/*.html':    ['ng-html2js'],
    },

    ngHtml2JsPreprocessor: {
      moduleName: 'karma.templates'
    },

    frameworks: ['jasmine', 'browserify'],
    browsers:   ['Chrome'],
    reporters:  ['spec'],
    logLevel:   'info',
    autoWatch:  true,
    colors:     true,
  });
};
The files array loads Angular and the HTML templates into Karma. Because the templates will be pre-processed by karma-ng-html2js-preprocessor, they will eventually be loaded as JavaScript files. Note that even though Angular will be included as part of the Browserify bundle, it is important to load it explicitly; otherwise, the templates cannot be made available to Angular.

The browserify.files entry tells karma-browserifast to include all unit tests in a Browserify bundle. This makes it possible for unit tests to load JavaScript modules using "require".

The moduleName option tells karma-ng-html2js-preprocessor to expose the JavaScript templates as an Angular module named "karma.templates". This module is later used in unit tests to allow testing of directives that use templates.

3. Write the unit tests
require('../src/app');
require('angular-mocks/angular-mocks');
describe('myDirective', function () {
  var scope, element;

  beforeEach(function () {
      angular.mock.module('karma.templates');
      angular.mock.module('myModule');
      angular.mock.inject(function ($rootScope, $compile) {
          scope   = $rootScope.$new();
          element = $compile('<div my-directive></div>')(scope);

          scope.$digest();
      });
  });

  it('does something', function () {
    expect(1).toBe(1);
  });
});
The first two "require" calls demonstrate the ability of a Karma-based unit test to load modules via "require".

The angular.mock.module('karma.templates') call loads the generated JavaScript templates as an Angular module; without this, the directive will not compile because its template cannot be fetched.
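
With everything in place, the tests can be run from the command line (assuming the configuration above is saved as karma.conf.js):
$ karma start karma.conf.js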

Examples of Streams and Lambdas in Java 8

Below is code showcasing Streams and Lambdas in Java 8 (written as a JUnit test to allow execution of individual methods).
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.BlockJUnit4ClassRunner;

import com.google.common.collect.Lists;

@RunWith(BlockJUnit4ClassRunner.class)
public class Java8Showcase {

  @FunctionalInterface
  public static interface Concat {
    String concat(String str1, String str2);
  }

  @Test
  public void functional_interfaces() {
    // Traditional way of declaring an
    // interface implementation
    @SuppressWarnings("unused")
    Concat _concat = new Concat() {
      @Override
      public String concat(String str1, String str2) {
        return str1.concat(str2);
      }
    };

    Concat concat = (str1, str2) -> str1.concat(str2);
    System.out.println(concat.concat("hello", " world"));
  }

  @Test
  public void function_reference() {
    List<Integer> nums = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    // nums.stream().reduce((a, b) -> Math.max(a, b))
    Integer max = nums.stream().reduce(Math::max).get();

    System.out.println(max);
  }

  @Test
  public void streams_filter() {
    List<Integer> nums = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    List<Integer> evens = nums.stream().filter((n) -> n % 2 == 0)
      .collect(Collectors.toList());

    System.out.println(evens);
  }

  @Test
  public void streams_map() {
    List<Integer> nums = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    List<Integer> x2 = nums.stream().map((n) -> n * 2)
      .collect(Collectors.toList());

    System.out.println(x2);
  }

  @Test
  public void streams_reduce() {
    List<Integer> nums = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    Integer sum = nums.stream().reduce((a, b) -> a + b).get();

    System.out.println(sum);
  }

  @Test
  public void streams_fluent() {
    List<Integer> nums = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    Integer sum = nums.stream()
        .filter((n) -> n % 2 == 0) // filter the even numbers
        .map((n) -> n * 2)         // multiply each number by 2
        .reduce((a, b) -> a + b)   // add each number
        .get();

    System.out.println(sum);
  }

  @Test
  public void streams_parallel() {
    List<Integer> nums = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    Integer sum = nums.parallelStream().reduce((a, b) -> a + b).get();

    System.out.println(sum);
  }

  @Test
  public void streams_forEach() {
    List<Integer> nums = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    nums.stream().forEach(System.out::println);
  }

  @Test
  public void streams_collectors() {
    List<Integer> nums = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    Double ave = nums.stream().collect(Collectors.averagingInt((i) -> i));

    System.out.println(ave);
  }

  @Test
  public void streams_collectors_grouping1() {
    List<Integer> nums = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    Map<Boolean, List<Integer>> oddEven = nums.stream()
      .collect(Collectors.groupingBy((i) -> i % 2 == 0));

    System.out.println(oddEven);
  }

  @Test
  public void streams_collectors_grouping2() {
    List<Integer> nums = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    Map<Boolean, Integer> oddEvenSum = nums.stream()
      .collect(Collectors.groupingBy((i) -> i % 2 == 0, Collectors.summingInt((Integer i) -> i)));

    System.out.println(oddEvenSum);
  }
}

MongoDB Transaction Across Multiple Documents using Async and Mongoose

Unlike traditional relational databases, MongoDB does not support transactions across multiple documents. So if you have multiple documents and want to perform a "save all or nothing" operation, you have to simulate the transaction from within your application.

Implementing such a transaction in Node.js can be tricky because IO operations are generally asynchronous. This can lead to deeply nested callbacks, as demonstrated by the code below:
var MyModel = require('mongoose').model('MyModel');

var docs = [];
MyModel.create({ field: 'value1' }, function (err, doc) {
  if (err) { console.log(err); }
  else {
    docs.push(doc);
    MyModel.create({ field: 'value2' }, function (err, doc) {
      if (err) { rollback(docs); }
      else {
        docs.push(doc);
        MyModel.create({ field: 'value3' }, function (err, doc) {
          if (err) { rollback(docs); }
          else {
            console.log('Done.');
          }
        });
      }
    });
  }
});
Despite the documents not depending on one another, each insert must be performed in series, because we have no other way of knowing when all of the documents have finished saving. This approach is problematic because it leads to deeper nesting as the number of documents increases. Also, in the above example, the callback functions modify the docs variable; this is a side effect and breaks functional principles. With the help of Async, both issues can be addressed.

The parallel() function of Async allows you to run multiple functions in parallel. Each function signals completion by invoking a callback, to which it passes either the result of the operation or an error. Once all functions have completed, an optional final callback is invoked to handle the results or errors.

The above code can be improved by implementing the document insertions as functions passed to parallel(). If an insert succeeds, the inserted document is passed to the callback; otherwise, the error is passed (these are later used when a rollback is required). Once all the parallel functions have completed, the final callback can perform a rollback if any of the inserts failed.

Below is the improved version using Async:
var async    = require('async'),
    mongoose = require('mongoose');

var MyModel = mongoose.model('MyModel');

async.parallel([
    function (callback) {
      MyModel.create({ field: 'value1' }, callback);
    },
    function (callback) {
      MyModel.create({ field: 'value2' }, callback);
    },
    function (callback) {
      MyModel.create({ field: 'value3' }, callback);
    }
  ],
  function (err, results) {
    if (err) {
      async.each(results, rollback, function () {
        console.log('Rollback done.');
      });
    } else {
      console.log('Done.');
    }
  });

function rollback (doc, callback) {
  if (!doc) { callback(); }
  else {
    MyModel.findByIdAndRemove(doc._id, function (err, doc) {
      console.log('Rolled-back document: ' + doc);
      callback();
    });
  }
}
Each function passed to parallel() inserts a new document into MongoDB, then passes either the saved document or an error to its callback.

The final callback checks whether any of the parallel functions returned an error and, if so, calls rollback() on each document collected by parallel().

The rollback() function deletes each inserted document using its ID.

Edit: As pointed out in the comments, the rollback itself can fail. In such scenarios, it is best to retry the rollback until it succeeds; therefore, the rollback logic must be an idempotent operation.
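
Below is a minimal sketch of such a retried rollback using async.retry (the retry count and interval are assumptions):
function rollback (doc, callback) {
  if (!doc) { return callback(); }
  // retry up to 5 times, 1 second apart; the delete is idempotent because
  // removing an already-removed _id is a no-op
  async.retry({ times: 5, interval: 1000 }, function (done) {
    MyModel.findByIdAndRemove(doc._id, done);
  }, callback);
}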