Kafka
This example demonstrates defining a custom Lambda Trigger that subscribes to a Kafka topic.
Initial Provisioning
graph LR; cli --> control_plane; console --> control_plane; control_plane <-- tunnel --> agent; subgraph user_edge[User Edge] cli[CLI]; console[Console]; end subgraph cloud[Cloud <small>api.on-prem.net</small>] control_plane[Control Plane]; end subgraph device_edge[Device Edge] agent[Agent]; end
Subsequent Autonomous Edge Operation
graph TB; agent --> trigger[Lambda Trigger]; trigger --> lambda1; trigger --> lambda2; trigger --> lambda3; kafka -- subscribe --> trigger; subgraph device_edge[Device Edge] agent; kafka[(Kafka)] end subgraph agent[Agent] trigger; lambda1[Lambda 1]; lambda2[Lambda 2]; lambda3[Lambda 3]; end
Define the lambda trigger
$ onprem generate xid
cj7ei4jerad89eavqu70
# kafka_trigger.yaml
id: cj7ei4jerad89eavqu70
kind: LambdaTriggerType
name: kafka_trigger
description: >
Trigger lambdas driven by a Kafka subscription.
runsAtControlPlane: false
runsAtDevices: true
scriptContentType: text/x-lua
script: >
local kafka = require('kafka')
local settings = {
['bootstrap.servers'] = 'c001-b6-n3:9092,c001-b6-n4:9092,c001-b6-n5:9092',
['auto.offset.reset'] = 'latest',
['group.id'] = 'onprem.lambda-trigger.kafka_trigger',
}
local consumer = kafka.consumer(settings)
local M = {}
function M.init(context)
consumer:subscribe('topic1', 'topic2', 'topic3')
context['consumer'] = consumer
end
function M.run(context)
local consumer = context.consumer
while true do
local message = consumer:poll(1000)
if message then
coroutine.yield(message)
end
end
end
return M
Upload it to the control plane
$ onprem apply kafka_trigger.yaml
It will now show up in the cloud console.
And it will also now show up as one of the trigger choices when editing a Lambda.
When a subscription yields a new message, it will trigger associated Lambda with an event containing the following fields:
timestamp
(number)topic
(string)partition
(number)offset
(number)key
(string)payload
(string)