Empowering Event-Driven Architecture using AWS EventBridge

Context

Data has become the centre of our lives. We consume many sources of data and handle notifications (i.e. events) from different devices in order to live and work such as requesting Uber or receiving an SMS to pick up your partner.  

The same is becoming true of many systems. Event-driven architectures are becoming more and more popular as it delivers many benefits such as systems decoupling, reducing processing time and speeding up reaction time just to name a few. As an example, consider a trading halt event, which is when an exchange stops trading on specific stock for a certain period of time due to news announcements or the price getting lower than the reference threshold.

The technologies to deliver event-driven systems have evolved over time to deliver better systems integration/decoupling and developer experience. Without a doubt, AWS EventBridge is the new cool kid in town. Today, we are going to explore EventBridge and shed some light on when to and not to use it.

EventBridge

AWS has multiple services to empower event-driven systems. For instance, in addition to EventBridge, there are AWS SNS (Simple Notification Service) and AWS SQS (Simple Queue Service). The question is what are the differences and when to use EventBridge over SQS or SNS?

Similarities with SNS and SQS

  • SNS, SQS and EventBridge are services to design asynchronous communication between systems.
  • SNS and SQS and EventBridge are all serverless services. meaning there are no servers (we can see) to provision and pricing is based on actual usage (e.g. a number of events).

EventBridge main features

  • Native capabilities to integrate with more services including more AWS services & third party systems. There are over 100 built-in events sources and targets (source).
  • Public schema registry, the ability to navigate the event schema without the need to trigger an event.
  • Event filtering via rules.
  • Ability to schedule events, i.e. define cron jobs to publish events.

Use Case

To put the discussion in context, I've created a use case scenario we will use throughout the rest of the post.

User story

As a busy developer and not having enough time to check the internet, I want to be notified of the Australian Google Search trends so that I'm aware of what's happening.

How to achieve this? setting up a small service to pull Google search trends i.e. "The Puller". It will pull the trends, do some transformation and push the events to the EventBridge service. The EventBridge service has rules to filter based on category  (based on some keywords to differentiate topics I'm interested in). The EventBridge service will post some events to a general channel and message others as a direct message to myself on Slack. The events will be delivered to Cloudwatch for tracing and logging with different logs groups.

The Big Picture

The 2 minutes architecture diagram will be like this:

The devil i.e details

I will use AWS SDK to create the stack. Please follow me while I will walk through the implementation details. You will find a sample of the codebase in this Github repo: https://github.com/MakerXStudio/EventBridgeDemo

The Puller i.e. the lambda function

The puller has two purposes. The first is to grab the Google trends and the second part is to putEvents in EventBridge.

On the first one to pull Google Trends. It uses the public Google API https://trends.google.com/trends/api/dailytrends?geo=AU. According to Google docs, there are many parameters that you can but the geo i.e. graphic location is the only mandatory one. The code snippet shows how to call it

const https = require("https");

const geo_australia = "AU";
const googleTrendsUrl = "https://trends.google.com";
const googleTrendsApiUrl = `${googleTrendsUrl}/trends/api/dailytrends?geo=${geo_australia}`;

let dataString = "";

const response = await new Promise((resolve, reject) => {
    // Call Google Trends API
    const req = https.get(googleTrendsApiUrl, function (res) {
      res.on("data", (chunk) => {
        dataString += chunk;
      });
      res.on("end", async () => {
          // continue the logic
      });
    });

    req.on("error", (e) => {
      reject({
        statusCode: 500,
        body: "Something went wrong!",
      });
    });
    ```

Then grabbing the array of trends of today using:

const trendsArray =
  JSON.parse(dataString).default.trendingSearchesDays[0]
    .trendingSearches;

The second part of the lambda function is to post the events to AWS EventBridge. We will start by preparing the message. According to AWS documentation the payload request is as follow:

{
   "Entries": [ 
      { 
         "DetailType": "string",
         "EventBusName": "string",
         "Resources": [ "string" ],
         "Source": "string",
         "Time": number,
         "TraceHeader": "string",
         "Detail": "string",
      }
   ]
}

I started constructing the EventBridge events. And then I constructed the Slack messages based on https://api.slack.com/messaging/composing/layouts using these small helper functions I created

 // The trends event list with only 10 trends, eventbridge limit
const eventBridgeTrendsItems = trendsArray.slice(0, 10).map((trend) => {
  return constructEvent(trend)
});

// Construct the EventBridge payload
const eventBridgeTrendsArray = {
  Entries: eventBridgeTrendsItems,
};

And the helpers functions as:

function constructEvent(trend) {
  return {
    // Event fields
    Source: "makerx.googleTrends",
    EventBusName: "google-trends",
    DetailType: getTopicFromTitle(trend),
    Time: new Date(),

    // Event details (slack message)
    Detail: constructSlackMessage(trend)
  };
}

// construct the Slack Message based on
// https://api.slack.com/messaging/composing/layouts
function constructSlackMessage(trend) {
  return JSON.stringify({
    text: `Hey, "${trend.title.query}" topic is trending in AU`,
    attachments: [
      {
        title: `${trend.articles[0].title}`,
        title_link: `${trend.articles[0].url}`,
        image_url: trend.articles[0].image
          ? `${trend.articles[0].image.imageUrl}`
          : "",
      },
    ],
  });
}

// get the interesting topics based on keywords covid for now
function getTopicFromTitle(trend) {
  let topic = "general";
  if (
    trend.title.query.toLowerCase().includes("covid") ||
    trend.articles[0].title.toLowerCase().includes("covid")
  ) {
    topic = "interesting";
  }
  return topic;
}

And after the hard work above, comes the easy part to put the events into EventBridge:

const AWS = require("aws-sdk");
AWS.config.region = "ap-southeast-2";
const eventbridge = new AWS.EventBridge();

...

// push to EventBridge
const eventbridgeResponse = await eventbridge
  .putEvents(eventBridgeTrendsArray)
  .promise();

resolve({
  // return 500 if there is a failure to push events
  statusCode: JSON.stringify(eventbridgeResponse).includes("ErrorCode")
    ? 500
    : 200,
  body: JSON.stringify(eventBridgeTrendsArray, null, 4),
});

You will note I sliced the array to 10 items due to the hard limit in putEvents function. Definitely, you can post more than 10 events to the EventBridge using more than one request but for the sake of the demo, I didn't bother to check how many trends I should send and truncated the array to 10 items only.

Note: Google Trends just works, it seems that it is not in active development as there is a bug that causes the response to come back starting with a weird ")]}',", you will find the code in the Lambda function to remove it.

// Oppps Google, there is a bug in Google API: sometime the response start with ")]}',"
if (dataString.startsWith(googleBugText)) {
  dataString = dataString.replace(googleBugText, "");
}

In the CDK, Setting up the Lambda and the API gateway (to test the stack is as follows):

const thePullerFn = new lambda.Function(this, "the-puller-lambda-fn", {
  runtime: lambda.Runtime.NODEJS_12_X,
  handler: "index.handler",
  code: lambda.Code.fromAsset("the-puller-lambda-fn"),
});

new apigw.LambdaRestApi(this, "the-puller-api-gateway", {
  handler: thePullerFn,
});

The EventBridge Setup

Quick into of different sections of EventBridge:

  1. Eventbus is the part that receives all events from different sources and runs rules on events
  2. Destinations consists of three major types: AWS Services, Partner Services or API destinations. The first two types are self-explanatory. The third type is "API destination", which describe external services targets that can be invoked using HTTP methods. You will need it if you are considering consuming the events from external services i.e non AWS services or AWS Partners services.
  3. Rules is where you define logic to deliver events to different consumers based on criteria. It connects the Event Bus with the destinations.

You have the option to define a new Event Bus or use the default one. I chose to create a new one using CDK. You need to set up the permissions i.e. policy for what service and who can push events to the newly-created event bus.

import * as events from "aws-cdk-lib/aws-events";
import * as iam from "aws-cdk-lib/aws-iam";

const bus = new events.EventBus(this, "google-trends-bus", {
  eventBusName: process.env.EVENTBUS_NAME,
});

new iam.PolicyStatement({
  resources: [
    `arn:aws:events:${process.env.CDK_DEFAULT_REGION}:${process.env.CDK_DEFAULT_ACCOUNT}:*/*`,
  ],
  sid: "allow_account_to_put_events",
  principals: [new iam.AnyPrincipal()],
});

new events.CfnEventBusPolicy(this, "MyCfnEventBusPolicy", {
  statementId: "allow_account_to_put_events",
  eventBusName: bus.eventBusName,
  action: "events:PutEvents",
  principal: "*",
});

Then in order to create the destinations (In our case, API destinations), there are a couple of things we need to prepare. First setting up a Connection which is required to set up the EventBridge authorisation to invoke ApiDestination.

const cfnConnection = new events.CfnConnection(
  this,
  "apiDestinationConnection",
  {
    authorizationType: "API_KEY",
    authParameters: {
      ApiKeyAuthParameters: {
        ApiKeyName: process.env.EVENTBRIDGE_API_DESTINATION_KEY,
        ApiKeyValue: process.env.EVENTBRIDGE_API_DESTINATION_SECRET,
      },
    },
    description: "EventBridge Api destination connection",
    name: "apiDestinationConnection",
  }
);

And then we created a role to grant eventBridge to invoke API Destination

 const invokeApiDestinationPolicy = new iam.PolicyDocument({
      statements: [
        new iam.PolicyStatement({
          resources: [
            `arn:aws:events:${process.env.CDK_DEFAULT_REGION}:${process.env.CDK_DEFAULT_ACCOUNT}:api-destination/*/*`,
          ],
          actions: ["events:InvokeApiDestination"],
        }),
      ],
    });

    const invokeApiDestinationRole = new iam.Role(
      this,
      "eventBridgeTargetRole",
      {
        assumedBy: new iam.ServicePrincipal("events.amazonaws.com"),
        description: "eventBridgeTargetRole",
        inlinePolicies: {
          invokeApiDestinationPolicy,
        },
      }
    );

Now, We can set up the API destinations to call Slack Webhooks. You will notice that we used the previously resource cfnConnection.  

const generalApiDestination = new events.CfnApiDestination(
  this,
  "GeneralTrendsSlack",
  {
    connectionArn: cfnConnection.attrArn,
    httpMethod: "POST",
    invocationEndpoint: process.env.SLACK_SEND_MESSAGE_TO_CHANNEL_URI || "",
    description: "API Destination to post general trends to slack channel",
    invocationRateLimitPerSecond: 5,
    name: "GeneralTrendsSlack",
  }
);

const interestingApiDestination = new events.CfnApiDestination(
  this,
  "interestingTrendsSlack",
  {
    connectionArn: cfnConnection.attrArn,
    httpMethod: "POST",
    invocationEndpoint: process.env.SLACK_SEND_DM_TO_MO_URI || "",
    description: "API Destination to post messages to Mo",
    invocationRateLimitPerSecond: 5,
    name: "InterestingTrendsSlack",
  }
);

Just before creating the rules, we need to create the two CloudWatch log groups to log the messages for each rule separately.

import * as logs from "aws-cdk-lib/aws-logs";

const generalTrendsLogGroup = new logs.LogGroup(
  this,
  "/aws/events/generalLogGroup",
  {
    logGroupName: "/aws/events/generalLogGroup",
    removalPolicy: RemovalPolicy.DESTROY
  }
);
    
const interestingTrendsLogGroup = new logs.LogGroup(
  this,
  "/aws/events/interestingLogGroup",
  {
    logGroupName: "/aws/events/interestingLogGroup",
    removalPolicy: RemovalPolicy.DESTROY
  }
);

And the last step is to create the EventBridge Rules. Setting the Rules is the most important where you glue everything together. In my demo, I created two rules one to post to the Slack channel google-trends and the other one to send a direct message. It is based on the event pattern. I decided to differentiate between the two messages on the detail-type  field in the event. and note where I use InputPath to send only the $.detail section of the event to Slack. Review the Lambda code above where the Slack payload was constructed.    

// Create the Rule using escape hatch
new cdk.CfnResource(this, "general-trends-rule", {
  type: "AWS::Events::Rule",
  properties: {
    Description: "EventRule",
    State: "ENABLED",
    EventBusName: bus.eventBusName,
    EventPattern: { "detail-type": ["general"] },
    Targets: [
      {
        Arn: generalApiDestination.attrArn,
        RoleArn: invokeApiDestinationRole.roleArn,
        Id: "postToSlackChannel",
        InputPath: "$.detail",
      },
      {
        Arn: generalTrendsLogGroup.logGroupArn,
        Id: "generalCloudwatch",
      },
    ],
  },
});

new cdk.CfnResource(this, "interesting-trends-rule", {
      type: "AWS::Events::Rule",
      properties: {
        Description: "EventRule",
        State: "ENABLED",
        EventBusName: bus.eventBusName,
        EventPattern: { "detail-type": [`interesting`] },
        Targets: [
          {
            Arn: interestingApiDestination.attrArn,
            RoleArn: invokeApiDestinationRole.roleArn,
            Id: "postToSlackChannel",
            InputPath: "$.detail",
          },
          {
            Arn: interestingTrendsLogGroup.logGroupArn,
            Id: "interestingCloudwatch",
          },
        ],
      },
    });


After all, we need to deploy the application using cdk deploy and wait till we see the green results with the API gateway URL to test it. The successful results should be something like the screenshot below.  

You need to copy or click the Url from EventBridgeDemoStack. thepullerapigatewayEndpointxyz and in a browser or HTTP tool and what you should see is something similar to the picture below (which is the Lambda response payload sent to the EventBridge for processing).    

And before an eye blink, you will find the Google Search Trends in your Slack:  

A screenshot from Slack 'google-trends' channel

And the interesting trends posted as direct messages by the Google Trends Slack App to me

More technical details

  • Push Start:  the solution can be triggered based on the user preference maybe every day at 7 am or every couple of hours using EventBridge scheduled jobs. I decided to create an API Gateway to trigger it when required.
  • Setting up Slack Integration: I constructed some Slack webhook to receive my messages. Previously, I created a Slack Channel "google-trends" and two API webhooks one to post directly to the google-trends channel and another one to post interesting/special trends to me directly only. The setup is easy according to https://slack.com/intl/en-au/help/articles/115005265063-Incoming-webhooks-for-Slack#set-up-incoming-webhooks
  • Enriching the Slack messages, I used https://api.slack.com/messaging/composing/layouts#attachments
  • In the event.putEvents request model, DetailType is required however, it is optional according to the specs (source)
  • Adding API Destinations as targets to Rules is not supported in CDK (yet) at the time of writing this blog post [Jan 2022]. If you are interested when it will be available, please keep an eye on the open PR here: https://github.com/aws/aws-cdk/pull/13729
  • I used the CDK escape hatch to work around the unsupported features in the CDK, read more about it here

EventBridge Findings

  • While SNS and SQS have a free tier with internal and external services, EventBridge free tier is only for events published by AWS services.
  • EventBridge can pass a trace header for an event only if the event came from a PutEvents request that passed the trace context. X-Ray doesn't trace events that originate from third-party partners, scheduled events, or AWS services, and these event sources don't appear on your X-Ray service map. (source)
  • AWS EventBridge event has its own schema, check the event schema here
  • AWS SDK PutEvents max is 10 events per request