Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
'use strict';

const AWS = require('aws-sdk');

// Service clients, each pinned to an explicit API version.
const SQS = new AWS.SQS({
  apiVersion: '2012-11-05',
});
const Lambda = new AWS.Lambda({
  apiVersion: '2015-03-31',
});

// TODO add your queue URL here
const QUEUE_URL = '<QUEUE_URL>';

// Value of the `operation` field that marks an event as a request to
// process a single message (as opposed to a poll of the queue).
const PROCESS_MESSAGE = 'process-message';
/**
 * Invokes the Lambda function `functionName` with a payload that asks it to
 * process one message.
 *
 * The payload is `{ operation: PROCESS_MESSAGE, message }`, serialized as JSON.
 * The invocation uses InvocationType 'Event'.
 *
 * @param {string} functionName - Name of the Lambda function to invoke.
 * @param {*} message - Message to embed in the payload; must be JSON-serializable.
 * @returns {Promise<void>} Resolves with no value when the invoke callback
 *   reports no error; rejects with the error it reports otherwise.
 */
function invokePoller(functionName, message) {
  const payload = {
    operation: PROCESS_MESSAGE,
    message,
  };
  const params = {
    FunctionName: functionName,
    InvocationType: 'Event',
    // Buffer.from replaces the deprecated `new Buffer(string)` constructor
    // (Node.js DEP0005), which emits a runtime deprecation warning.
    Payload: Buffer.from(JSON.stringify(payload)),
  };
  return new Promise((resolve, reject) => {
    Lambda.invoke(params, (err) => (err ? reject(err) : resolve()));
  });
}
/**
 * Handles a single queue message: logs it, then deletes it from the queue
 * at QUEUE_URL using the message's receipt handle.
 *
 * NOTE(review): this module-level name shadows Node's global `process`
 * object everywhere in this file — keep that in mind before using
 * `process.env` and friends here.
 *
 * @param {Object} message - Message to handle; its `ReceiptHandle` is read.
 * @param {Function} callback - Called as `callback(err, message)` once the
 *   delete request completes (`err` is whatever the delete reported).
 */
function process(message, callback) {
  console.log(message);
  // TODO process message

  // Remove the message from the queue.
  const deleteRequest = {
    QueueUrl: QUEUE_URL,
    ReceiptHandle: message.ReceiptHandle,
  };
  const onDeleted = (deleteErr) => callback(deleteErr, message);
  SQS.deleteMessage(deleteRequest, onDeleted);
}
/**
 * Receives one batch of messages (at most 10) from the queue at QUEUE_URL and
 * calls invokePoller(functionName, message) once per received message.
 *
 * @param {string} functionName - Lambda function name passed to invokePoller.
 * @param {Function} callback - Called exactly once per poll:
 *   - `callback(err)` if receiving the batch fails or any invocation rejects;
 *   - `callback(null, 'Messages received: N')` once all invocations resolve.
 */
function poll(functionName, callback) {
  const params = {
    QueueUrl: QUEUE_URL,
    MaxNumberOfMessages: 10,
    VisibilityTimeout: 10,
  };
  // batch request messages
  SQS.receiveMessage(params, (err, data) => {
    if (err) {
      callback(err);
      return;
    }
    // The response has no Messages array when nothing was received; treat
    // that as an empty batch instead of throwing inside this async callback.
    const messages = (data && data.Messages) || [];
    // for each message, reinvoke the function
    const promises = messages.map((message) => invokePoller(functionName, message));
    // complete when all invocations have been made
    Promise.all(promises).then(
      () => {
        const result = `Messages received: ${messages.length}`;
        console.log(result);
        callback(null, result);
      },
      // Report a failed invocation instead of leaving the rejection unhandled
      // and the callback uncalled. Passed as the second `then` argument so an
      // exception thrown by `callback` itself cannot trigger a second call.
      (invokeErr) => callback(invokeErr)
    );
  });
}
- exports.handler = (event, context, callback) => {
- try {
- if (event.operation === PROCESS_MESSAGE) {
- // invoked by poller
- process(event.message, callback);
- } else {
- // invoked by schedule
- poll(context.functionName, callback);
- }
- } catch (err) {
- callback(err);
- }
- };
Add Comment
Please, Sign In to add comment