Kafka

This example demonstrates defining a custom Lambda Trigger that subscribes to a Kafka topic.

Initial Provisioning

graph LR;

cli --> control_plane;
console --> control_plane;
control_plane <-- tunnel --> agent;

subgraph user_edge[User Edge]
cli[CLI];
console[Console];
end

subgraph cloud[Cloud <small>api.on-prem.net</small>]
control_plane[Control Plane];
end

subgraph device_edge[Device Edge]
agent[Agent];
end

Subsequent Autonomous Edge Operation

graph TB;

agent --> trigger[Lambda Trigger];
trigger --> lambda1;
trigger --> lambda2;
trigger --> lambda3;
kafka -- subscribe --> trigger;

subgraph device_edge[Device Edge]
agent;
kafka[(Kafka)]
end

subgraph agent[Agent]
trigger;
lambda1[Lambda 1];
lambda2[Lambda 2];
lambda3[Lambda 3];
end

Define the lambda trigger

$ onprem generate xid
cj7ei4jerad89eavqu70
# kafka_trigger.yaml
id: cj7ei4jerad89eavqu70
kind: LambdaTriggerType
name: kafka_trigger
description: >
  Trigger lambdas driven by a Kafka subscription.
runsAtControlPlane: false
runsAtDevices: true
scriptContentType: text/x-lua
script: >
  local kafka = require('kafka')

  local settings = {
    ['bootstrap.servers'] = 'c001-b6-n3:9092,c001-b6-n4:9092,c001-b6-n5:9092',
    ['auto.offset.reset'] = 'latest',
    ['group.id']          = 'onprem.lambda-trigger.kafka_trigger',
  }
  local consumer = kafka.consumer(settings)
  
  local M = {}

  function M.init(context)
    consumer:subscribe('topic1', 'topic2', 'topic3')
    context['consumer'] = consumer
  end

  function M.run(context)
    local consumer = context.consumer
    while true do
      local message = consumer:poll(1000)
      if message then
        coroutine.yield(message)
      end
    end
  end

  return M

Upload it to the control plane

$ onprem apply kafka_trigger.yaml

It will now show up in the cloud console.

Cloud Console

And it will also now show up as one of the trigger choices when editing a Lambda.

Editing a Lambda

When a subscription yields a new message, it will trigger associated Lambda with an event containing the following fields:

  • timestamp (number)
  • topic (string)
  • partition (number)
  • offset (number)
  • key (string)
  • payload (string)