Stream
Resource Overview
A Stream represents an Amazon Kinesis Stream. A Kinesis Stream can be used to collect, process, and analyze large amounts of video and data streams in real time. The stream consists of shards, which hold data records and organize data to be consumed.
Key Features of Amazon Kinesis Streams include:
- Scalable, fault-tolerant consumption of continuous data
- Quickly aggregate data into a data warehouse or map-reduce cluster
- Automatically encrypts sensitive data as it enters a stream
On one end of a Stream you have a producer service submitting data and on the other end you have a consuming service ingesting that data. A web service submitting log data is an example of a producing resource. A data warehouse set up to perform analysis on this large amount of analytics is an example of a consuming resource.
Event Subscription
Event subscription wires (solid line) visualize and configure event subscription integrations between two resources.
The following resources can be subscribed to a Streams:
- Function
Service Discovery
Service discovery wires (dashed line) provide compute resources (Function, Edge Function, Docker Task) with the permissions and environment variables required to perform actions using cloud resources within the stack. This resource is on the receiving end of a service discovery wire originating from compute resources.
The following compute resources can use a service discovery wire to access a Stream resource:
- Function
Configurable Properties
Logical ID
The unique identifier used to reference this resource in the stack template. Defining a custom Logical ID is recommended, as it allows you to quickly identify a resource and any associated sub-resources when working with your stack in AWS, or anywhere outside of the Stackery Dashboard. As a project grows, it becomes useful in quickly spotting this resource in template.yaml
or while viewing a stack in Template View mode.
The Logical ID of all sub-resources associated with this Stream will be prefixed with this value.
The identifier you provide must only contain alphanumeric characters (A-Za-z0-9) and be unique within the stack.
Default Logical ID Example: Stream2
IMPORTANT : AWS uses the Logical ID of each resource to coordinate and apply updates to the stack when deployed. On any update of a resource's logical ID (or any modification that results in one), CloudFormation will delete the currently deployed resource and create a new one in its place when the updated stack is deployed.
Number of Shards
The number of shards in the stream.
Messages are inserted into a single random shard. Each shard can send one copy of a message at a time to each resource connected to the output port. Each message is output by each shard in the order it was received.
Use Existing Kinesis Stream
When enabled, this feature provides you with a field to specify the Amazon Resource Name (ARN) of an existing Kinesis Stream to reference in your application.
You may reference an environment parameter in order to conditionally reference existing infrastructure based on environment.
IAM Permissions
When connected by a service discovery wire (dashed wire), a Function or Docker Task will add the following IAM policy to its role and gain permission to access this resource.
KinesisCrudPolicy
Grants a Function or Docker Task permission to create, read, update, and delete video and data shards from your Stream in the stack, or an existing Kinesis Stream outside of the current stack.
KinesisStreamReadPolicy
Grants a Function or Docker Task permission to read from video and data shards from your Stream in the stack, or an existing Kinesis Stream outside of the current stack.
Environment Variables
When connected by a service discovery wire (dashed wire), a Function or Docker Task will automatically populate and reference the following environment variables in order to interact with this resource.
STREAM_NAME
The Logical ID of the Stream resource.
Example: Stream2
STREAM_ARN
The Amazon Resource Name of the Kinesis Data Stream.
Example: arn:aws:kinesis:us-west-2:111222333:stream/Stream2
AWS SDK Code Example
Language-specific examples of AWS SDK calls using the environment variables discussed above.
Add a record to a Stream
// Load AWS SDK and create a new Stream object
const AWS = require("aws-sdk");
const kinesis = new AWS.Kinesis();
const streamName = process.env.STREAM_NAME // supplied by Function service-discovery wire
exports.handler = async message => {
const testRecord = "Sample Record";
// Construct parameters for the putRecord call
const params = {
Data: testRecord,
PartitionKey: '123',
StreamName: streamName
};
await kinesis.putRecord(params).promise();
console.log('Record added to ' + streamName);
}
import boto3
import os
# Create an Kinesis client
kinesis = boto3.client('kinesis')
stream_name = os.environ['STREAM_NAME'] # Supplied by Function service-discovery wire
def handler(message, context):
# Publish a sample record to the specified Kinesis stream
response = kinesis.put_record(
StreamName=stream_name,
Data='Sample Record',
PartitionKey='123'
)
return response
Related AWS Documentation
AWS Documentation: AWS::Kinesis::Stream