This is a fork of the official Apache Flink Kinesis Connector.
The fork backports the following features to older versions of Flink:
- Enhanced Fanout (EFO) support for Flink 1.8/1.11. For the original contributions see:
- Support for KDS data sources and sinks in Table API and SQL for Flink 1.11. For the original contributions see:
Both features are already available in the official Apache Flink connector for Flink 1.12.
You no longer need to build the Kinesis Connector from source. Add the following dependency to your project to start using the connector.
For Flink 1.8:
<dependency>
    <groupId>software.amazon.kinesis</groupId>
    <artifactId>amazon-kinesis-connector-flink</artifactId>
    <version>1.6.1</version>
</dependency>
For Flink 1.11:
<dependency>
    <groupId>software.amazon.kinesis</groupId>
    <artifactId>amazon-kinesis-connector-flink</artifactId>
    <version>2.4.1</version>
</dependency>
Refer to the official Apache Flink documentation for more information on configuring the connector:
Note that Flink 1.11 does not support the "Available Metadata" functionality from upstream Table/SQL connector documentation. If you want to expose KDS metadata fields in your table definitions, consider upgrading to Flink 1.12 or higher and using the KDS connector from the upstream repository.
The amazon-kinesis-sql-connector-flink module builds a fully shaded connector for use with the Flink SQL client.
If you are migrating from the Apache Flink Kinesis Connector, you should perform the following steps:
- Replace the dependency in your application pom.xml
- Migrate the prefix of packages for referenced classes
  - From: org.apache.flink.streaming.connectors.kinesis
  - To: software.amazon.kinesis.connectors.flink
For example, org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer becomes software.amazon.kinesis.connectors.flink.FlinkKinesisConsumer.
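The package rename can be sketched as a plain text substitution over source lines. The helper below is a hypothetical illustration and not part of the connector; the class and method names are made up for this example. In practice an IDE refactoring or a search-and-replace over your imports achieves the same result.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper (not part of the connector): rewrites the package prefix in source lines. */
final class PackagePrefixMigration {
    // Prefixes taken from the migration steps above.
    static final String OLD_PREFIX = "org.apache.flink.streaming.connectors.kinesis";
    static final String NEW_PREFIX = "software.amazon.kinesis.connectors.flink";

    /** Replaces every occurrence of the old package prefix in a single line. */
    static String migrateLine(String line) {
        return line.replace(OLD_PREFIX, NEW_PREFIX);
    }

    /** Applies the prefix replacement to each line, leaving unrelated lines unchanged. */
    static List<String> migrate(List<String> lines) {
        return lines.stream()
                .map(PackagePrefixMigration::migrateLine)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> source = Arrays.asList(
                "import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;",
                "import java.util.Properties;");
        // Prints the migrated import followed by the untouched java.util import.
        migrate(source).forEach(System.out::println);
    }
}
```

This is a purely textual replacement, so review the result before committing, particularly if the old prefix appears in comments or strings that should stay as they are.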
This connector is compatible with Flink 1.11.
For a version of this connector that is compatible with Flink 1.8, use the 1.x release line.
Other versions of Flink may work, but are not officially supported.
We will support this connector until Kinesis Data Analytics (KDA) adds support for Apache Flink 1.12. After that, we will no longer provide patches or security fixes for this repository. Use the Apache Flink Kinesis connector instead of this library where possible.
Two additional properties are required to enable EFO on your FlinkKinesisConsumer:
- RECORD_PUBLISHER_TYPE: Set this parameter to EFO for your application to use an EFO consumer to access the Kinesis Data Stream data.
- EFO_CONSUMER_NAME: Set this parameter to a string value that is unique among the consumers of this stream. Re-using a consumer name in the same Kinesis Data Stream will cause the previous consumer using that name to be terminated.
To configure a FlinkKinesisConsumer to use EFO, add the following parameters to the consumer:
consumerConfig.putIfAbsent(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, "EFO");
consumerConfig.putIfAbsent(ConsumerConfigConstants.EFO_CONSUMER_NAME, "efo-consumer");
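A minimal, dependency-free sketch of the same configuration step follows, with a check that both required properties are present. The class and method names are hypothetical. The two key strings are assumed to match the values behind ConsumerConfigConstants.RECORD_PUBLISHER_TYPE and ConsumerConfigConstants.EFO_CONSUMER_NAME; in application code, reference the constants themselves rather than these literals.

```java
import java.util.Properties;

/** Hypothetical helper (not part of the connector): applies and checks the EFO properties. */
final class EfoConsumerConfig {
    // ASSUMPTION: literal keys mirroring the ConsumerConfigConstants fields.
    static final String RECORD_PUBLISHER_TYPE = "flink.stream.recordpublisher";
    static final String EFO_CONSUMER_NAME = "flink.stream.efo.consumername";

    /** Returns a copy of base with the EFO properties added where they are not already set. */
    static Properties applyEfoDefaults(Properties base, String consumerName) {
        if (consumerName == null || consumerName.trim().isEmpty()) {
            throw new IllegalArgumentException("EFO consumer name must not be empty");
        }
        Properties config = new Properties();
        config.putAll(base);
        // putIfAbsent keeps any value the caller configured explicitly.
        config.putIfAbsent(RECORD_PUBLISHER_TYPE, "EFO");
        config.putIfAbsent(EFO_CONSUMER_NAME, consumerName);
        return config;
    }

    /** True when the publisher type is EFO and a consumer name is present. */
    static boolean isEfoConfigured(Properties config) {
        return "EFO".equals(config.getProperty(RECORD_PUBLISHER_TYPE))
                && config.getProperty(EFO_CONSUMER_NAME) != null;
    }

    public static void main(String[] args) {
        Properties consumerConfig = applyEfoDefaults(new Properties(), "efo-consumer");
        System.out.println(isEfoConfigured(consumerConfig));
    }
}
```

Because putIfAbsent does not overwrite existing entries, a configuration that already sets a different record publisher type keeps it; the check then reports that EFO is not configured.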
Note the additional IAM permissions required to use EFO:
{
    "Sid": "AllStreams",
    "Effect": "Allow",
    "Action": [
        "kinesis:ListShards",
        "kinesis:ListStreamConsumers"
    ],
    "Resource": "arn:aws:kinesis:<region>:<account>:stream/*"
},
{
    "Sid": "Stream",
    "Effect": "Allow",
    "Action": [
        "kinesis:DescribeStreamSummary",
        "kinesis:RegisterStreamConsumer"
    ],
    "Resource": "arn:aws:kinesis:<region>:<account>:stream/<stream-name>"
},
{
    "Sid": "Consumer",
    "Effect": "Allow",
    "Action": [
        "kinesis:DescribeStreamConsumer",
        "kinesis:SubscribeToShard",
        "kinesis:DeregisterStreamConsumer"
    ],
    "Resource": [
        "arn:aws:kinesis:<region>:<account>:stream/<stream-name>/consumer/<consumer-name>",
        "arn:aws:kinesis:<region>:<account>:stream/<stream-name>/consumer/<consumer-name>:*"
    ]
}
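To show how the Resource placeholders in the statements above expand, here is a small sketch that fills in the stream and consumer ARN patterns. The class, the method names, and the sample values in main are hypothetical; only the ARN layout is taken from the policy above.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not part of the connector): expands the ARN patterns from the policy. */
final class EfoPolicyResources {
    /** Builds the stream ARN used by the "Stream" statement. */
    static String streamArn(String region, String account, String streamName) {
        return String.format("arn:aws:kinesis:%s:%s:stream/%s", region, account, streamName);
    }

    /** Builds the two consumer ARNs used by the "Consumer" statement. */
    static List<String> consumerArns(
            String region, String account, String streamName, String consumerName) {
        String base = streamArn(region, account, streamName) + "/consumer/" + consumerName;
        // The second entry carries the ":*" suffix exactly as in the policy.
        return Arrays.asList(base, base + ":*");
    }

    public static void main(String[] args) {
        // Sample values for illustration only.
        consumerArns("us-east-1", "123456789012", "my-stream", "efo-consumer")
                .forEach(System.out::println);
    }
}
```

The consumer name passed here should be the same value configured for EFO_CONSUMER_NAME, otherwise the policy will not match the registered consumer.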
For more information refer to the official EFO documentation for the KDS connector.
See CONTRIBUTING for more information.
This project is licensed under the Apache-2.0 License.