What is SP?
The Atlas Stream Processor is a kind of aggregation pipeline that runs on dedicated infrastructure, separate from your Atlas cluster. It can read events from Kafka topics or MongoDB change streams, process the data, and then either store the results in a MongoDB database or emit another event to Kafka. It's important to note that the Stream Processor (SP) is not an event store; it functions solely as an event processor.
What are the main components of SP?
Connection Registry
The Connection Registry acts as a key store for the Stream Processor (SP): it stores all of the connections used within the SP instance. This is where you configure connections to your data sources and sinks, such as Kafka topics, MongoDB change streams, or Atlas clusters.
Stream Processors
The Stream Processor is where we create and configure our processing pipeline. A processor consists of a series of specialized aggregation stages that define the flow of events from the source to the destination. In this pipeline, we can add validations, filters, windowing (to process events within a specific time frame), and more.
List of available stages
| Aggregation Pipeline Stage | Description |
| --- | --- |
| $source | Specifies a streaming data source to consume messages from. |
| $validate | Validates the documents of a stream against a user-defined schema. |
| $lookup | Performs a left outer join to a specified collection to filter in documents from the "joined" collection for processing. This version of the existing $lookup stage requires that you specify an Atlas collection in the Connection Registry as the value for the from field. |
| $hoppingWindow | Assigns documents from a stream to windows with user-defined durations and intervals between start times. |
| $tumblingWindow | Assigns documents from a stream to non-overlapping, continuous windows with user-defined durations. |
| $emit | Specifies a stream or time series collection in the connection registry to emit messages to. |
| $merge | A version of the existing $merge stage where the connectionName specified in the into field must be an Atlas cluster connection from the Connection Registry. |
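To make these stages concrete, below is a minimal sketch of a pipeline that reads from a Kafka topic, filters the events, aggregates them in one-minute tumbling windows, and merges the results into an Atlas collection. The connection names match those created later in this article; the topic, field names, database, and collection are illustrative placeholders.

```javascript
// Sketch of a stream processing pipeline. "myKafka" and "myAtlasCluster"
// must exist in the Connection Registry; the topic, fields, db, and coll
// are placeholders for illustration.
const pipeline = [
  // Consume events from a Kafka topic.
  { $source: { connectionName: "myKafka", topic: "orders" } },

  // Keep only the events we care about.
  { $match: { status: "completed" } },

  // Aggregate in non-overlapping one-minute windows.
  {
    $tumblingWindow: {
      interval: { size: 1, unit: "minute" },
      pipeline: [
        { $group: { _id: "$customerId", totalOrders: { $sum: 1 } } }
      ]
    }
  },

  // Write the windowed results to an Atlas collection.
  { $merge: { into: { connectionName: "myAtlasCluster", db: "sales", coll: "orders_per_minute" } } }
];
```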
The processor runs in a worker, which provides the RAM and CPUs necessary to run your stream processors.
When you define a stream processor, it becomes available only for the stream processing instance (SPI) in which you define it. Each worker can host up to four running stream processors.
Atlas Stream Processing automatically scales your stream processing instance up as you start stream processors by provisioning workers as needed. You can deprovision a worker by stopping all stream processors on it. Atlas Stream Processing always prefers to assign a stream processor to an existing worker over provisioning new workers.
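For example, once a stream processor has been defined on an instance (shown later in this article), you start and stop it from mongosh; stopping every processor on a worker is what allows that worker to be deprovisioned. The processor name below is a placeholder.

```javascript
// Start a previously defined stream processor ("myProcessor" is a placeholder name).
sp.myProcessor.start();

// Stop it again; once all processors on a worker are stopped,
// Atlas Stream Processing can deprovision that worker.
sp.myProcessor.stop();
```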
Flow Control
Atlas Stream Processing uses checkpoint documents to keep track of how far a processor has progressed through the data it is handling. Think of these checkpoints like bookmarks in a book: they help the system remember where it left off.
If the system gets interrupted, it goes back to the last bookmark (checkpoint) and picks up right where it left off, using the information in these records to continue smoothly.
Atlas Stream Processing can also use a special holding area for errors called a dead letter queue (DLQ). Imagine the DLQ as a safety net for documents that the system has trouble processing.
If something goes wrong and the system can't handle an event properly, it saves that document in the DLQ along with information about what went wrong. You can set up a specific place (a collection) in your database to store these problem documents when you define your stream processor, as sketched below.
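Concretely, when you create a processor from mongosh, you can pass an optional settings document whose dlq field points to a connection, database, and collection from the Connection Registry. The sketch below assumes the connections defined later in this article and uses placeholder topic, database, and collection names.

```javascript
// Sketch: attach a dead letter queue (DLQ) when defining a processor.
// "myKafka" and "myAtlasCluster" must exist in the Connection Registry;
// the topic, db, and coll names are placeholders.
sp.createStreamProcessor(
  "ordersProcessor",
  [
    { $source: { connectionName: "myKafka", topic: "orders" } },
    { $merge: { into: { connectionName: "myAtlasCluster", db: "sales", coll: "orders" } } }
  ],
  {
    // Failed documents land here together with details about the error.
    dlq: { connectionName: "myAtlasCluster", db: "streaming_errors", coll: "dlq" }
  }
);
```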
How to create an SP instance?
First, we need to create a new SP instance in our Atlas project. This process is straightforward: simply navigate to the Stream Processing section in your project and create a new instance, or alternatively, you can use the Atlas CLI or Atlas Administration API to do this.
| Method | Details |
| --- | --- |
| Atlas UI | In your project, open the Stream Processing section and create a new instance. |
| Atlas CLI | atlas streams instances create <sp-name> --provider "AWS" --region "VIRGINIA_USA" --tier "SP30" --projectId <project-id> |
| Atlas Administration API | POST https://cloud.mongodb.com/api/atlas/v2/groups/{project-id}/streams with body: { "dataProcessRegion": { "cloudProvider": "AWS", "region": "VIRGINIA_USA" }, "name": "<sp-name>", "streamConfig": { "tier": "SP30" } } |
Once the SP instance is created, you can click on “Connect” in the Atlas web UI to retrieve the connection string. You can then use this connection string with mongosh to connect to your instance. The user must have the “Project Stream Processing Owner” or “Project Owner” role to be authorized.
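Once connected, the sp helper is available in the shell. As a quick sanity check, you can list the stream processors defined on the instance (the list will be empty on a brand-new instance).

```javascript
// After connecting with mongosh, the "sp" object exposes the stream processing API.
// List the stream processors defined on this instance.
sp.listStreamProcessors();
```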
Use Cases
CDC for Data Analysis
In this scenario, we will use the Stream Processor (SP) to listen to events from a Kafka topic and store the results in a MongoDB database.
1. Setup connections
First, we need to establish connections to both Kafka and MongoDB in our Stream Processor instance. In this example, we will use the Atlas CLI to create these connections.
| Kafka Connection | Atlas Cluster Connection |
| --- | --- |
| atlas streams connections create myKafka --instance test --file kafkaConfig.json --projectId <project-id> | atlas streams connections create myAtlasCluster --instance test --file clusterConfig.json --projectId <project-id> |
| // kafkaConfig.json { "name": "myKafka", "type": "Kafka", "authentication": { "mechanism": "PLAIN", "password": "", "username": "" }, "bootstrapServers": "", "security": { "protocol": "SSL" }, "networking": { "access": { "type": "PUBLIC" } } } | // clusterConfig.json { "name": "myAtlasCluster", "type": "Cluster", "clusterName": "Cluster1", "dbRoleToExecute": { "role": "Atlas admin", "type": "BUILT_IN" } } |
To check all available connections, we can run the "atlas streams connections list" command in the Atlas CLI.
2. Create the processor
There are two ways to create a processor:
- Standard: In this approach, you create the processor, which can then be started, stopped, or dropped as needed. A standard processor keeps running in the background until it is explicitly stopped and does not require an active mongosh connection.
- Interactive: This method is typically used while developing a processor. Interactive mode allows real-time inspection and has the following characteristics (see the sketch after this list):
- It writes output and dead letter queue documents to the shell.
- It begins running immediately upon creation.
- It runs for either 10 minutes or until you stop it.
- It does not persist after stopping.
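Here is a minimal sketch of the interactive flavor, assuming the myKafka connection created in step 1 and a placeholder topic name. sp.process() begins running immediately and prints its output (and any DLQ documents) to the shell until you stop it.

```javascript
// Interactive mode: run a pipeline immediately and stream its output to the shell.
// The topic name is a placeholder.
sp.process([
  { $source: { connectionName: "myKafka", topic: "orders" } },
  { $match: { status: "completed" } }
]);
```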
In this step, we will create a standard (non-interactive) processor and define all stages of the pipeline, from the source to the sink, in mongosh.
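A minimal sketch of such a processor, assuming a placeholder topic (orders), database (sales), and collection (kafka_events): it consumes events from the myKafka connection and merges each document into the Atlas cluster registered as myAtlasCluster.

```javascript
// Sketch: define a named (non-interactive) processor that moves events from a
// Kafka topic into an Atlas collection. Topic, db, and coll are placeholders;
// the connection names come from step 1.
sp.createStreamProcessor("kafkaToAtlas", [
  { $source: { connectionName: "myKafka", topic: "orders" } },
  { $merge: { into: { connectionName: "myAtlasCluster", db: "sales", coll: "kafka_events" } } }
]);

// Start the processor; it keeps running in the background until stopped or dropped.
sp.kafkaToAtlas.start();
```

Once started, you can inspect the processor with sp.kafkaToAtlas.stats(), stop it with sp.kafkaToAtlas.stop(), or remove it with sp.kafkaToAtlas.drop().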


