Atlas Streaming Processor

What is SP?

The Atlas Streaming Processor is an aggregation pipeline that runs on dedicated infrastructure, separate from your Atlas cluster. It can read events from Kafka topics or MongoDB Change Streams, process the data, and then either store the results in a MongoDB database or emit another event to Kafka. It’s important to note that the Streaming Processor (SP) is not an event store; it functions solely as an event processor.

What are the main components of SP?

Connection Registry

The Connection Registry (Key Store) is the component of the Streaming Processor (SP) responsible for storing all the connections used within the SP instance. This is where you configure connections to all data sources, such as Kafka topics or MongoDB Change Streams.

Stream Processors

The Stream Processor is where we create and configure our processing pipeline. A processor consists of a series of specialized aggregation stages that define the flow of events from the source to the destination. In this pipeline, we can add validations, filters, windowing (to process events within a specific time frame), and more.

List of available stages

• $source: Specifies a streaming data source to consume messages from.
• $validate: Validates the documents of a stream against a user-defined schema.
• $lookup: Performs a left outer join to a specified collection to filter in documents from the “joined” collection for processing. This version of the existing $lookup stage requires that you specify an Atlas collection in the Connection Registry as the value for the from field.
• $hoppingWindow: Assigns documents from a stream to windows with user-defined durations and intervals between start times.
• $tumblingWindow: Assigns documents from a stream to non-overlapping, continuous windows with user-defined durations.
• $emit: Specifies a stream or time series collection in the Connection Registry to emit messages to.
• $merge: A version of the existing $merge stage where the value of the connectionName field must always be the name of a connection in the Connection Registry.
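To make the flow concrete, here is a minimal sketch (in mongosh) of how a few of these stages compose into a pipeline. The connection names and the topic are placeholders that would have to exist in your Connection Registry; a complete, working example follows later in this article.

var pipeline = [
    // Read every message published to a Kafka topic registered as "myKafka"
    { $source: { connectionName: "myKafka", topic: "orders" } },
    // Standard aggregation stages such as $match can be mixed in to filter or reshape events
    { $match: { status: "completed" } },
    // Write the surviving events to a collection on an Atlas cluster connection
    { $merge: { into: { connectionName: "myAtlasCluster", db: "sales", coll: "completed_orders" } } }
]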

The processor runs in a worker, which provides the RAM and CPUs necessary to run your stream processors.

When you define a stream processor, it becomes available only for the stream processing instance (SPI) in which you define it. Each worker can host up to four running stream processors.

Atlas Stream Processing automatically scales your stream processing instance up as you start stream processors by provisioning workers as needed. You can deprovision a worker by stopping all stream processors on it. Atlas Stream Processing always prefers to assign a stream processor to an existing worker over provisioning new workers.

Flow Control

Atlas Stream Processing uses something called “checkpoint documents” to keep track of where it is in the process of handling data. Think of these checkpoints like bookmarks in a book—they help the system remember where it left off.

If the system gets interrupted, it can go back to the last bookmark (checkpoint) and pick up right where it left off, using the information in these records to continue smoothly.

Atlas Stream Processing can use a special holding area for errors called a “dead letter queue” (DLQ). Imagine the DLQ as a safety net for documents that the system has trouble processing. 

If something goes wrong and the system can’t handle the event properly, it saves that document in the DLQ along with information about what went wrong. You can set up a specific place (a collection) in your database to store these problem documents when you define your stream processor.
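For example, the DLQ target is just an options document passed in when the processor is defined; a minimal sketch (the connection, database, and collection names are placeholders):

var deadLetter = {
    dlq: {
        connectionName: "myAtlasCluster",   // a cluster connection from the Connection Registry
        db: "errors",
        coll: "processor_dlq"
    }
}
// The DLQ is attached as the options argument when the processor is created
sp.createStreamProcessor("myProcessor", pipeline, deadLetter)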

How to create an SP instance?

First, we need to create a new SP instance in our Atlas project. This process is straightforward: simply navigate to the Stream Processing section in your project and create a new instance, or alternatively, you can use the Atlas CLI or Atlas Administration API to do this.

Atlas CLI

atlas streams instances create <sp-name> \
    --provider "AWS" \
    --region "VIRGINIA_USA" \
    --tier "SP30" \
    --projectId <project-id>

Atlas Administration API

POST https://cloud.mongodb.com/api/atlas/v2/groups/{project-id}/streams
{
  "dataProcessRegion": {
    "cloudProvider": "AWS",
    "region": "VIRGINIA_USA"
  },
  "name": "<sp-name>",
  "streamConfig": {
    "tier": "SP30"
  }
}

Once the SP instance is created, you can click on “Connect” in the Atlas web UI to retrieve the connection string. You can then use this connection string with mongosh to connect to your instance. The user must have the “Project Stream Processing Owner” or “Project Owner” role to be authorized.
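The Connect dialog generates the exact command for you; it looks roughly like the sketch below, where the host name and user are placeholders taken from your own instance:

mongosh "mongodb://<your-spi-hostname>/" \
    --tls \
    --authenticationDatabase admin \
    --username <atlas-db-user>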

Use Cases

CDC for Data Analysis

In this scenario, we will use the Stream Processor (SP) to listen to events from a Kafka topic and store the results in a MongoDB database.

1. Setup connections

First, we need to establish connections to both Kafka and MongoDB in our Stream Processor instance. In this example, we will use the Atlas CLI to create these connections.

Kafka Connection

atlas streams connection create myKafka \
    --instance test \
    --file kafkaConfig.json \
    --projectId <project-id>

// kafkaConfig.json
{
    "name": "myKafka",
    "type": "Kafka",
    "authentication": {
        "mechanism": "PLAIN",
        "password": "",
        "username": ""
    },
    "bootstrapServers": "",
    "security": { "protocol": "SSL" },
    "networking": {
        "access": { "type": "PUBLIC" }
    }
}

Atlas Cluster Connection

atlas streams connection create myAtlasCluster \
    --instance test \
    --file clusterConfig.json \
    --projectId <project-id>

// clusterConfig.json
{
    "name": "myAtlasCluster",
    "type": "Cluster",
    "clusterName": "Cluster1",
    "dbRoleToExecute": {
        "role": "Atlas admin",
        "type": "BUILT_IN"
    }
}

To check all available connections, we can run the connection list command in the Atlas CLI.
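Alternatively, once connected to the instance with mongosh, the Connection Registry can be inspected directly; a quick sketch, assuming the sp helper methods available in Atlas Stream Processing:

// Returns a document for every connection registered in this instance
sp.listConnections()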

2. Create the processor

There are two ways to create a processor:

  • Standard: In this approach, you create the processor, which can then be run, stopped, or dropped as needed. It continues running in the background until it is manually stopped, without requiring an active mongosh connection.
  • Interactive: This method is typically used during the development of a processor; it allows for real-time interaction (see the short sketch after this list) and has the following characteristics:
    • Write output and dead letter queue documents to the shell
    • Begin running immediately upon creation
    • Run for either 10 minutes or until the user stops them
    • Don’t persist after stopping
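For reference, an interactive run takes the same pipeline array but hands it to sp.process() instead of sp.createStreamProcessor(); a minimal sketch, using the pipeline we build in the next step:

// Starts immediately, streams output (and DLQ documents) to the shell,
// and leaves nothing behind once stopped
sp.process(pipeline)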

In this step, we will create a non-interactive processor and define all stages of the pipeline, from the source to the sink. Run the commands below in mongosh:

var source = { 
    $source: { 
        connectionName: 'myKafka', 
        topic: 'customer_purchases'
    } 
}

var validate = { 
    $validate: { 
        validator: {
            $and: [
                {
                    $expr: {
                        $ne: ["$customer_id", 0]
                    }
                },
                {
                    $jsonSchema: {
                        required: ['items', 'customer_id', 'purchase_time']
                    }
                }
            ]
        },
        validationAction: 'dlq'
    } 
}

var sink = { 
    $merge: { 
        into: {
            connectionName: "myAtlasCluster",
            db: "asp",
            coll: "customer_purchases"
        },
        whenMatched: "replace",
        whenNotMatched: "insert"
    } 
}

var deadLetter = {
    dlq: {
        connectionName: "myAtlasCluster",
        db: "asp",
        coll: "customer_purchases_dlq"
    }
}

var pipeline = [source, validate, sink]
sp.createStreamProcessor('customer_purchases_log', pipeline, deadLetter)
sp.customer_purchases_log.start()

The processor above is designed to handle events from a Kafka topic and process them according to specific validation rules before storing them in a MongoDB collection.

Source:

a. The processor listens to the customer_purchases topic on a Kafka connection named myKafka.

Validation:

a. The processor validates each event using two criteria:

• The customer_id field must not be equal to 0.

• The event must contain the required fields: items, customer_id, and purchase_time.

b. If an event fails validation, it is sent to a Dead Letter Queue (DLQ).

Sink (an SPI can only use clusters in the same project as its sources or sinks):

a. Validated events are stored in the customer_purchases collection within the asp database on a MongoDB cluster connection named myAtlasCluster.

b. If an event with a matching _id already exists, it will be replaced. If no match is found, a new document will be inserted.

Dead Letter Queue (DLQ):

a. Events that fail validation are stored in a separate collection named customer_purchases_dlq within the same asp database on the myAtlasCluster connection.
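To inspect what ended up in the DLQ, connect to the Atlas cluster itself (not the SPI) with mongosh and query the collection; a minimal sketch:

// Run against the Atlas cluster that hosts the DLQ collection
use asp
// Each DLQ document carries the rejected event along with details about why it failed
db.customer_purchases_dlq.find().limit(5)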

To test our application, let’s start by creating a fake data producer in NodeJS. Next, we’ll write the code to connect to Kafka and send messages. Finally, we’ll implement a script to periodically send these messages to Kafka.

package.json

"dependencies": {
    "@faker-js/faker": "^8.4.1",
    "dotenv": "^16.4.5",
    "kafkajs": "^2.2.4"
  }

myFaker.js

import { faker } from '@faker-js/faker';

const generateItem = () => {
    return {
        item_id: faker.string.alphanumeric(5).toUpperCase(),
        quantity: faker.number.int({ min: 1, max: 5 }),
        price: parseFloat(faker.commerce.price({ min: 10, max: 100 })),
    }
};

export const generateTransaction = () => {

    const numberItems = Math.floor(Math.random() * 5) + 1;
    const items = Array.from({ length: numberItems }, generateItem);
    const total_amount = items.reduce((total, item) => total + item.quantity * item.price, 0);

    return {
        transaction_id: faker.string.numeric(5),
        customer_id: faker.string.numeric(5),
        store_id: faker.string.numeric(3),
        purchase_time: faker.date.recent().toISOString(),
        items: items,
        total_amount: parseFloat(total_amount.toFixed(2)),
        payment_method: faker.helpers.arrayElement(['credit_card', 'debit_card', 'cash', 'paypal']),
    };
}

myKafka.js

import { Kafka } from 'kafkajs';
import 'dotenv/config'

const kafka = new Kafka({
  clientId: 'customer-producer',
  brokers: [process.env.KAFKA_BROKER],
  ssl: true, 
  sasl: {
    mechanism: 'plain',
    username: process.env.KAFKA_API_KEY,
    password: process.env.KAFKA_API_SECRET,
  },
});

const producer = kafka.producer();
let isConnected = false;

const connectProducer = async () => {
  if (!isConnected) {
    await producer.connect();
    isConnected = true;
  }
};

const disconnectProducer = async () => {
  if (isConnected) {
    await producer.disconnect();
    isConnected = false;
  }
};

export const send = async (message) => {
  try {
    await connectProducer();

    await producer.send({
      topic: process.env.KAFKA_TOPIC,
      messages: [
        { value: message },
      ],
    });

    console.log('Message sent successfully!');
  } catch (error) {
    console.error('Error:', error);
  }
};

process.on('exit', disconnectProducer);
process.on('SIGINT', disconnectProducer);
process.on('SIGTERM', disconnectProducer);

				
			

main.js

import { send } from './myKafka.js';
import { generateTransaction } from './myFaker.js';

let isRunning = true;

const sendTransaction = async () => {
    if (!isRunning) return;
    const transaction = generateTransaction();
    console.log(transaction);
    await send(JSON.stringify(transaction));
    console.log('Transaction sent: ', transaction.transaction_id);

    const randomInterval = Math.floor(Math.random() * 1700) + 300;
    setTimeout(sendTransaction, randomInterval);
};

const run = async () => {
    sendTransaction().catch(console.error);
};

process.on('SIGINT', () => {
    console.log('\nGracefully shutting down from SIGINT (Ctrl+C)');
    isRunning = false;
    process.exit(0);
});

run().catch(console.error);
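
Before running the producer, two small pieces of glue are assumed: the package.json above also needs "type": "module", because these files use ES module imports, and a .env file next to the scripts must supply the values read by myKafka.js (the values below are placeholders):

# .env -- placeholders, use your own broker, credentials, and topic
KAFKA_BROKER=<broker-host>:<port>
KAFKA_API_KEY=<api-key>
KAFKA_API_SECRET=<api-secret>
KAFKA_TOPIC=customer_purchases

The producer can then be started with node main.js.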


Results

Faker producer output 

Documents stored on Atlas Cluster


3. Useful commands

3.1. View Statistics of a Stream Processor

To retrieve a summary document of the current status of an existing stream processor using mongosh, you can use the sp.<streamprocessor>.stats() method. The syntax is as follows:

				
// command
sp.<streamprocessor>.stats({options: {verbose: false, scale: 1024}})

// output example
{
  ok: 1,
  ns: '<instance-name>',
  stats: {
    name: '<streamprocessor>',
    status: 'running',
    scaleFactor: Long("1"),
    inputMessageCount: Long("706028"),
    inputMessageSize: 958685236,
    outputMessageCount: Long("46322"),
    outputMessageSize: 85666332,
    dlqMessageCount: Long("0"),
    dlqMessageSize: Long("0"),
    stateSize: Long("2747968"),
    watermark: ISODate("2023-12-14T14:35:32.417Z"),
    ok: 1
  },
}

The combined stats.stateSize of all stream processors on a worker can’t exceed 80% of the RAM available for a worker in the same SPI tier. For example, the maximum size of a stream processor in the SP30 tier, which has 8 GB of RAM per worker, is 6.4 GB. If the stats.stateSize of any of your stream processors is approaching 80% of the RAM available for a worker in the same SPI tier, move up to the next SPI tier.

When the 80% RAM threshold has been crossed, all stream processors fail with a stream processing instance out of memory error.
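A quick way to keep an eye on this from mongosh is to read the value straight out of the stats() output shown above; a small sketch using the example processor from this article:

// Compare the reported state size against 80% of the worker RAM for your SPI tier
// (for SP30, 80% of 8 GB is 6.4 GB)
var s = sp.customer_purchases_log.stats()
s.stats.stateSize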

3.2. Sample results

The sp.<streamprocessor>.sample() command outputs arrays of sampled results from the specified, currently running stream processor to STDOUT. The command continues to run until you either cancel it using CTRL-C or until the cumulative size of the returned samples reaches 40 MB.

// command
sp.<streamprocessor>.sample()