on software development and possibly other things

Upserting Items into DynamoDB

No comments
When updating documents, MongoDB has a useful feature to insert a new document when no document matches the query criteria. This feature is called an upsert. Sadly, as of this writing, DynamoDB misses on this feature out of the box.

Thankfully, there's a way to achieve this. The idea is to do it in 3 steps: (1) Get the previous copy of the item. (2) If a previous copy exists, update it. (3) If it does not exist, insert the item ensuring that concurrent requests do not overwrite each other. Here's a snippet written for Node.js:
function upsert(tableName, partitionKey, sortKey, data) {
  // ...

  // 1. Get the original item
  return _get(partitionKey, sortKey).the(function (original) {
    if (Object.keys(original).length > 0) {
      // 2. Update if item already exists
      return _update(data, original);
    } else {
      // 3. Otherwise, put the item
      return _put(data).catch(function (err) {
        if (err.code === 'ConditionalCheckFailedException') {
          // 3a. Only 1 of the concurrent puts will succeed,
          // the rest should retry recursively
          return this.upsert(tableName, partitionKey, sortKey, data);
        } else {
          throw err;
        }
      });
    }
  });
}
The last part is where it gets tricky -- below is the complete code that illustrates how it is done:
function upsert(tableName, partitionKey, sortKey, data) {

  function _get(partitionKey, sortKey) {
    var params = {
      TableName: tableName,
      Key: {
        partitionKey: partitionKey,
        sortKey:      sortKey
      }
    };

    return docClient.get(params).promise();
  }

  function _update(data, original) {
    var updateExpression = dynamodbUpdateExpression.getUpdateExpression({ data: original }, { data: data });
    var params = Object.assign({
      TableName: tableName,
      Key: {
        partitionKey: partitionKey,
        sortKey:      sortKey
      },
      ReturnValues: 'ALL_NEW',
      ConditionExpression: 'attribute_exists(partitionKey) AND attribute_exists(sortKey)'
    }, updateExpression);

    if (params.UpdateExpression === '') {
      return Promise.resolve();
    }

    return new Promise(function (resolve, reject) {
      return docClient.update(params).promise()
        .then(function (result) { resolve(result.Attributes.data); })
        .catch(reject);
    });
  }

  function _put(data) {
    var params = {
      TableName: tableName,
      Item: {
        partitionKey: partitionKey,
        sortKey:      sortKey,
        data:         data
      },
      ConditionExpression: 'attribute_not_exists(partitionKey) AND attribute_not_exists(sortKey)'
    };

    return docClient.put(params).promise();
  }

  // 1. Get the original item
  return _get(partitionKey, sortKey).the(function (original) {
    if (Object.keys(original).length > 0) {
      // 2. Update if item already exists
      return _update(data, original);
    } else {
      // 3. Otherwise, put the item
      return _put(data).catch(function (err) {
        if (err.code === 'ConditionalCheckFailedException') {
          // 3a. Only 1 of the concurrent puts will succeed,
          // the rest should retry recursively
          return this.upsert(tableName, partitionKey, sortKey, data);
        } else {
          throw err;
        }
      });
    }
  });
}
The trick is to declare a Condition Expression in the put step to ensure that an item only gets inserted if a previous copy does not exist (Line 46). This ensures that when handling concurrent put requests, only the 1st request succeeds and the others fail with a ConditionalCheckFailedException error. We then check for this error type to determine if any of the failed requests should be retried as update requests.

The above code uses dynamodb-update-expression (Line 16) to generate DynamoDB Update Expressions.

No comments :

Post a Comment