Kinesisストリームからのイベントを処理するLambdaをローカルで実行する

Icon

70_10

はじめに

Kinesis ストリームからイベントを処理する Lambda 関数をローカルで実行する環境を構築します。

実装する

ローカルで実行するファンクションを用意する

handler.js
module.exports.putRecord = (event, context, callback) => {
  const { Records } = event;

  const records = Records.map(r => ({
    data: Buffer.from(r.kinesis.data, "base64").toString("ascii")
  }));

  callback(null, records);
};

Kinesis ストリームから取得したレコードを返すだけのものです。

ローカル環境に Kinesis を立てる

kinesaliteというモジュールを使って、ローカル環境に Kinesis を立てます。
kinesalite 実行後にストリームの作成も行います。

run-local.js
const kinesalite = require("kinesalite");
const AWS = require("aws-sdk");
const { putRecord } = require("./handler");

const Kinesis = new AWS.Kinesis({
  endpoint: "http://localhost:4567/",
  region: "ap-northeast-1",
  sslEnabled: false
});

function startKinesalite(port) {
  return new Promise((resolve, reject) => {
    const kinesisServer = kinesalite();
    kinesisServer.listen(port, err => (err ? reject(err) : resolve()));
  });
}

async function main() {
  await startKinesalite(4567);

  console.log(`Kinesalite start http://localhost:4567/`);

  await Kinesis.createStream({
    ShardCount: 1,
    StreamName: "sample-stream"
  }).promise();
}

main().catch(console.error);

ローカル Kinesis にレコードが PUT されたときに handler.js の putRecord を実行する

local-kinesis-lambda-runner について

@rabblerouser/local-kinesis-lambda-runnerを使います。
このモジュールに、実行する Lambda ファンクションを引数に渡して実行するだけでローカル実行ができます。

const run = require("@rabblerouser/local-kinesis-lambda-runner");
const lambda = require("./index").handler;

run(lambda);

実行時、環境変数にKINESIS_ENDPOINTSTREAM_NAMEを指定する必要があります。

run-local.js に local-kinesis-lambda-runner を組み込む

run-local.js
const PORT = 4567;
process.env.KINESIS_ENDPOINT = `http://localhost:${PORT}/`;
process.env.STREAM_NAME = "sample-stream";

const run = require("@rabblerouser/local-kinesis-lambda-runner");
const kinesalite = require("kinesalite");
const AWS = require("aws-sdk");
const { putRecord } = require("./handler");

const Kinesis = new AWS.Kinesis({
  endpoint: process.env.KINESIS_ENDPOINT,
  region: "ap-northeast-1",
  sslEnabled: false
});

function startKinesalite(port) {
  return new Promise((resolve, reject) => {
    const kinesisServer = kinesalite();
    kinesisServer.listen(port, err => (err ? reject(err) : resolve()));
  });
}

async function main() {
  await startKinesalite(PORT);

  console.log(`Kinesalite start ${process.env.KINESIS_ENDPOINT}`);

  await Kinesis.createStream({
    ShardCount: 1,
    StreamName: process.env.STREAM_NAME
  }).promise();

  run(putRecord);
}

main().catch(console.error);

実行する

run-local.jsを実行すると、以下のように kinesalite と Kinesis のイベントポーリングが起動します。
あとは、実際に Kinesis に対してレコードを PUT すれば OK です。

$ node run-local.js
Kinesalite start http://localhost:4567/
Found sample-stream!
Polling kinesis for events...
const AWS = require("aws-sdk");
const uuid = require("uuid");

const Kinesis = new AWS.Kinesis({
  endpoint: "http://localhost:4567/",
  region: "ap-northeast-1",
  sslEnabled: false,
});

async function main() {
  const res = await Kinesis.putRecord({
    Data: JSON.stringify({ hello: "world" }),
    PartitionKey: uuid.v1(),
    StreamName: "sample-stream",
  }).promise();
  console.log(res);
}

main().catch(console.error);

おわりに

kinesalitelocal-kinesis-lambda-runner を利用することで簡単にローカルで実行できます。
ソースはGitHub にも上げています。