MongoDB Atlas v3.30.0 published on Friday, Mar 21, 2025 by Pulumi
# Data Source: mongodbatlas.StreamProcessor
mongodbatlas.StreamProcessor describes a stream processor.
## Example Usage
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.mongodbatlas.StreamInstance;
import com.pulumi.mongodbatlas.StreamInstanceArgs;
import com.pulumi.mongodbatlas.inputs.StreamInstanceDataProcessRegionArgs;
import com.pulumi.mongodbatlas.StreamConnection;
import com.pulumi.mongodbatlas.StreamConnectionArgs;
import com.pulumi.mongodbatlas.inputs.StreamConnectionDbRoleToExecuteArgs;
import com.pulumi.mongodbatlas.inputs.StreamConnectionAuthenticationArgs;
import com.pulumi.mongodbatlas.inputs.StreamConnectionSecurityArgs;
import com.pulumi.mongodbatlas.StreamProcessor;
import com.pulumi.mongodbatlas.StreamProcessorArgs;
import com.pulumi.mongodbatlas.inputs.StreamProcessorOptionsArgs;
import com.pulumi.mongodbatlas.inputs.StreamProcessorOptionsDlqArgs;
import com.pulumi.mongodbatlas.MongodbatlasFunctions;
import com.pulumi.mongodbatlas.inputs.GetStreamProcessorsArgs;
import com.pulumi.mongodbatlas.inputs.GetStreamProcessorArgs;
import static com.pulumi.codegen.internal.Serialization.*;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }
    public static void stack(Context ctx) {
        // These values are assumed to come from stack configuration.
        final var config = ctx.config();
        final var projectId = config.require("projectId");
        final var clusterName = config.require("clusterName");
        final var kafkaUsername = config.require("kafkaUsername");
        final var kafkaPassword = config.require("kafkaPassword");
        var example = new StreamInstance("example", StreamInstanceArgs.builder()
            .projectId(projectId)
            .instanceName("InstanceName")
            .dataProcessRegion(StreamInstanceDataProcessRegionArgs.builder()
                .region("VIRGINIA_USA")
                .cloud_provider("AWS")
                .build())
            .build());
        var example_sample = new StreamConnection("example-sample", StreamConnectionArgs.builder()
            .projectId(projectId)
            .instanceName(example.instanceName())
            .connectionName("sample_stream_solar")
            .type("Sample")
            .build());
        var example_cluster = new StreamConnection("example-cluster", StreamConnectionArgs.builder()
            .projectId(projectId)
            .instanceName(example.instanceName())
            .connectionName("ClusterConnection")
            .type("Cluster")
            .clusterName(clusterName)
            .dbRoleToExecute(StreamConnectionDbRoleToExecuteArgs.builder()
                .role("atlasAdmin")
                .type("BUILT_IN")
                .build())
            .build());
        var example_kafka = new StreamConnection("example-kafka", StreamConnectionArgs.builder()
            .projectId(projectId)
            .instanceName(example.instanceName())
            .connectionName("KafkaPlaintextConnection")
            .type("Kafka")
            .authentication(StreamConnectionAuthenticationArgs.builder()
                .mechanism("PLAIN")
                .username(kafkaUsername)
                .password(kafkaPassword)
                .build())
            .bootstrapServers("localhost:9092,localhost:9092")
            .config(Map.of("auto.offset.reset", "earliest"))
            .security(StreamConnectionSecurityArgs.builder()
                .protocol("PLAINTEXT")
                .build())
            .build());
        var stream_processor_sample_example = new StreamProcessor("stream-processor-sample-example", StreamProcessorArgs.builder()
            .projectId(projectId)
            .instanceName(example.instanceName())
            .processorName("sampleProcessorName")
            .pipeline(serializeJson(
                jsonArray(
                    jsonObject(
                        jsonProperty("$source", jsonObject(
                            jsonProperty("connectionName", mongodbatlasStreamConnection.example-sample().connectionName())
                        ))
                    ), 
                    jsonObject(
                        jsonProperty("$emit", jsonObject(
                            jsonProperty("connectionName", mongodbatlasStreamConnection.example-cluster().connectionName()),
                            jsonProperty("db", "sample"),
                            jsonProperty("coll", "solar"),
                            jsonProperty("timeseries", jsonObject(
                                jsonProperty("timeField", "_ts")
                            ))
                        ))
                    )
                )))
            .state("STARTED")
            .build());
        var stream_processor_cluster_to_kafka_example = new StreamProcessor("stream-processor-cluster-to-kafka-example", StreamProcessorArgs.builder()
            .projectId(projectId)
            .instanceName(example.instanceName())
            .processorName("clusterProcessorName")
            .pipeline(serializeJson(
                jsonArray(
                    jsonObject(
                        jsonProperty("$source", jsonObject(
                            jsonProperty("connectionName", mongodbatlasStreamConnection.example-cluster().connectionName())
                        ))
                    ), 
                    jsonObject(
                        jsonProperty("$emit", jsonObject(
                            jsonProperty("connectionName", mongodbatlasStreamConnection.example-kafka().connectionName()),
                            jsonProperty("topic", "topic_from_cluster")
                        ))
                    )
                )))
            .state("CREATED")
            .build());
        var stream_processor_kafka_to_cluster_example = new StreamProcessor("stream-processor-kafka-to-cluster-example", StreamProcessorArgs.builder()
            .projectId(projectId)
            .instanceName(example.instanceName())
            .processorName("kafkaProcessorName")
            .pipeline(serializeJson(
                jsonArray(
                    jsonObject(
                        jsonProperty("$source", jsonObject(
                            jsonProperty("connectionName", mongodbatlasStreamConnection.example-kafka().connectionName()),
                            jsonProperty("topic", "topic_source")
                        ))
                    ), 
                    jsonObject(
                        jsonProperty("$emit", jsonObject(
                            jsonProperty("connectionName", mongodbatlasStreamConnection.example-cluster().connectionName()),
                            jsonProperty("db", "kafka"),
                            jsonProperty("coll", "topic_source"),
                            jsonProperty("timeseries", jsonObject(
                                jsonProperty("timeField", "ts")
                            ))
                        ))
                    )
                )))
            .state("CREATED")
            .options(StreamProcessorOptionsArgs.builder()
                .dlq(StreamProcessorOptionsDlqArgs.builder()
                    .coll("exampleColumn")
                    .connectionName(example_cluster.connectionName())
                    .db("exampleDb")
                    .build())
                .build())
            .build());
        final var exampleStreamProcessors = MongodbatlasFunctions.getStreamProcessors(GetStreamProcessorsArgs.builder()
            .projectId(projectId)
            .instanceName(example.instanceName())
            .build());
        final var exampleStreamProcessor = MongodbatlasFunctions.getStreamProcessor(GetStreamProcessorArgs.builder()
            .projectId(projectId)
            .instanceName(example.instanceName())
            .processorName(stream_processor_sample_example.processorName())
            .build());
        ctx.export("streamProcessorsState", example_stream_processor.applyValue(example_stream_processor -> example_stream_processor.state()));
        ctx.export("streamProcessorsResults", example_stream_processors.applyValue(example_stream_processors -> example_stream_processors.results()));
    }
}
The same program expressed in Pulumi YAML:

resources:
  example:
    type: mongodbatlas:StreamInstance
    properties:
      projectId: ${projectId}
      instanceName: InstanceName
      dataProcessRegion:
        region: VIRGINIA_USA
        cloudProvider: AWS
  example-sample:
    type: mongodbatlas:StreamConnection
    properties:
      projectId: ${projectId}
      instanceName: ${example.instanceName}
      connectionName: sample_stream_solar
      type: Sample
  example-cluster:
    type: mongodbatlas:StreamConnection
    properties:
      projectId: ${projectId}
      instanceName: ${example.instanceName}
      connectionName: ClusterConnection
      type: Cluster
      clusterName: ${clusterName}
      dbRoleToExecute:
        role: atlasAdmin
        type: BUILT_IN
  example-kafka:
    type: mongodbatlas:StreamConnection
    properties:
      projectId: ${projectId}
      instanceName: ${example.instanceName}
      connectionName: KafkaPlaintextConnection
      type: Kafka
      authentication:
        mechanism: PLAIN
        username: ${kafkaUsername}
        password: ${kafkaPassword}
      bootstrapServers: localhost:9092,localhost:9092
      config:
        auto.offset.reset: earliest
      security:
        protocol: PLAINTEXT
  stream-processor-sample-example:
    type: mongodbatlas:StreamProcessor
    properties:
      projectId: ${projectId}
      instanceName: ${example.instanceName}
      processorName: sampleProcessorName
      pipeline:
        fn::toJSON:
          - $source:
              connectionName: ${["example-sample"].connectionName}
          - $emit:
              connectionName: ${["example-cluster"].connectionName}
              db: sample
              coll: solar
              timeseries:
                timeField: _ts
      state: STARTED
  stream-processor-cluster-to-kafka-example:
    type: mongodbatlas:StreamProcessor
    properties:
      projectId: ${projectId}
      instanceName: ${example.instanceName}
      processorName: clusterProcessorName
      pipeline:
        fn::toJSON:
          - $source:
              connectionName: ${["example-cluster"].connectionName}
          - $emit:
              connectionName: ${["example-kafka"].connectionName}
              topic: topic_from_cluster
      state: CREATED
  stream-processor-kafka-to-cluster-example:
    type: mongodbatlas:StreamProcessor
    properties:
      projectId: ${projectId}
      instanceName: ${example.instanceName}
      processorName: kafkaProcessorName
      pipeline:
        fn::toJSON:
          - $source:
              connectionName: ${["example-kafka"].connectionName}
              topic: topic_source
          - $emit:
              connectionName: ${["example-cluster"].connectionName}
              db: kafka
              coll: topic_source
              timeseries:
                timeField: ts
      state: CREATED
      options:
        dlq:
          coll: exampleColumn
          connectionName: ${["example-cluster"].connectionName}
          db: exampleDb
variables:
  example-stream-processors:
    fn::invoke:
      function: mongodbatlas:getStreamProcessors
      arguments:
        projectId: ${projectId}
        instanceName: ${example.instanceName}
  example-stream-processor:
    fn::invoke:
      function: mongodbatlas:getStreamProcessor
      arguments:
        projectId: ${projectId}
        instanceName: ${example.instanceName}
        processorName: ${["stream-processor-sample-example"].processorName}
outputs:
  # example making use of data sources
  streamProcessorsState: ${["example-stream-processor"].state}
  streamProcessorsResults: ${["example-stream-processors"].results}
## Using getStreamProcessor
Two invocation forms are available. The direct form accepts plain arguments and either blocks until the result value is available, or returns a Promise-wrapped result. The output form accepts Input-wrapped arguments and returns an Output-wrapped result.
TypeScript:

function getStreamProcessor(args: GetStreamProcessorArgs, opts?: InvokeOptions): Promise<GetStreamProcessorResult>
function getStreamProcessorOutput(args: GetStreamProcessorOutputArgs, opts?: InvokeOptions): Output<GetStreamProcessorResult>

Python:

def get_stream_processor(instance_name: Optional[str] = None,
                         processor_name: Optional[str] = None,
                         project_id: Optional[str] = None,
                         opts: Optional[InvokeOptions] = None) -> GetStreamProcessorResult
def get_stream_processor_output(instance_name: Optional[pulumi.Input[str]] = None,
                                processor_name: Optional[pulumi.Input[str]] = None,
                                project_id: Optional[pulumi.Input[str]] = None,
                                opts: Optional[InvokeOptions] = None) -> Output[GetStreamProcessorResult]

Go:

func LookupStreamProcessor(ctx *Context, args *LookupStreamProcessorArgs, opts ...InvokeOption) (*LookupStreamProcessorResult, error)
func LookupStreamProcessorOutput(ctx *Context, args *LookupStreamProcessorOutputArgs, opts ...InvokeOption) LookupStreamProcessorResultOutput

> Note: This function is named LookupStreamProcessor in the Go SDK.
C#:

public static class GetStreamProcessor
{
    public static Task<GetStreamProcessorResult> InvokeAsync(GetStreamProcessorArgs args, InvokeOptions? opts = null)
    public static Output<GetStreamProcessorResult> Invoke(GetStreamProcessorInvokeArgs args, InvokeOptions? opts = null)
}

Java:

public static Output<GetStreamProcessorResult> getStreamProcessor(GetStreamProcessorArgs args, InvokeOptions options)
public static CompletableFuture<GetStreamProcessorResult> getStreamProcessorPlain(GetStreamProcessorPlainArgs args, InvokeOptions options)
YAML:

fn::invoke:
  function: mongodbatlas:index/getStreamProcessor:getStreamProcessor
  arguments:
    # arguments dictionary
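For example, a minimal self-contained sketch of the output form in Java. The project ID, instance name, and processor name below are illustrative placeholders, and the outputs import path follows the usual generated-SDK layout rather than being confirmed by this page:

package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.mongodbatlas.MongodbatlasFunctions;
import com.pulumi.mongodbatlas.inputs.GetStreamProcessorArgs;
import com.pulumi.mongodbatlas.outputs.GetStreamProcessorResult;

public class LookupExample {
    public static void main(String[] args) {
        Pulumi.run(ctx -> {
            // Output form: accepts Input-wrapped arguments and returns an
            // Output-wrapped result that resolves during deployment.
            final var processor = MongodbatlasFunctions.getStreamProcessor(GetStreamProcessorArgs.builder()
                .projectId("64a1b2c3d4e5f6a7b8c9d0e1") // assumed placeholder project ID
                .instanceName("InstanceName")
                .processorName("sampleProcessorName")
                .build());
            ctx.export("processorState", processor.applyValue(GetStreamProcessorResult::state));
        });
    }
}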
The following arguments are supported:
- instanceName (String) - Human-readable label that identifies the stream instance.
- processorName (String) - Human-readable label that identifies the stream processor.
- projectId (String) - Unique 24-hexadecimal digit string that identifies your project. Use the /groups endpoint to retrieve all projects to which the authenticated user has access.

In Python these arguments are named instance_name, processor_name, and project_id.
## getStreamProcessor Result
The following output properties are available:
- id (String)
- instanceName (String) - Human-readable label that identifies the stream instance.
- options (GetStreamProcessorOptions)
- pipeline (String)
- processorName (String) - Human-readable label that identifies the stream processor.
- projectId (String) - Unique 24-hexadecimal digit string that identifies your project. Use the /groups endpoint to retrieve all projects to which the authenticated user has access.
- state (String)
- stats (String)

In Python these properties use snake_case names (for example, instance_name); in YAML, options is a property map.
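A hedged sketch of consuming these properties, reusing the processor lookup from the sketch above. Treating pipeline and stats as JSON-encoded strings is an assumption based on the Atlas API, not something this page states:

// state is a plain string (e.g. "STARTED"); pipeline and stats are
// string-encoded documents that can be parsed with any JSON library.
ctx.export("processorPipeline", processor.applyValue(GetStreamProcessorResult::pipeline));
ctx.export("processorStats", processor.applyValue(GetStreamProcessorResult::stats));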
## Supporting Types

### GetStreamProcessorOptions
- dlq (GetStreamProcessorOptionsDlq) - Dead letter queue for the stream processor. Refer to the MongoDB Atlas Docs for more information.
### GetStreamProcessorOptionsDlq
- coll (String) - Name of the collection to use for the DLQ.
- connectionName (String) - Name of the connection to write DLQ messages to. Must be an Atlas connection.
- db (String) - Name of the database to use for the DLQ.
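A short sketch of reading these DLQ settings back from a lookup result, again reusing the processor lookup from the earlier sketch. The options().dlq() accessor chain follows the generated getter convention but is an assumption, and may need null handling when no DLQ is configured:

// Assumption: options() and dlq() are plain getters on the result types;
// guard these if the processor was created without DLQ options.
ctx.export("dlqDatabase", processor.applyValue(r -> r.options().dlq().db()));
ctx.export("dlqCollection", processor.applyValue(r -> r.options().dlq().coll()));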
## Package Details
- Repository: MongoDB Atlas pulumi/pulumi-mongodbatlas
- License: Apache-2.0
- Notes: This Pulumi package is based on the mongodbatlas Terraform Provider.