Demystify AWS IAM permissions required for AWS MSK Connect
Demystifying AWS MSK Connect IAM permissions
MSK Connect requires a set of well-defined IAM (Identity and Access Management) permissions to work successfully. I’m here to help you navigate the permissions required to use AWS MSK Connect!
Preamble
We have a few assumptions going into this blog post:
- You have a functioning AWS account that uses a region that supports AWS MSK
- You understand what AWS MSK is and how AWS MSK Connect interacts with it
- You have a base understanding of AWS IAM and how service principals, grants, and similar constructs function
- You’re ready to learn and ask questions, because it took me a few tries to get this right (which is why I’m sharing it with the community!)
Where to start?
- Create an Amazon S3 bucket to support the AWS MSK Connect resources (see below for options; a CLI sketch follows this list)
- Deploy an AWS MSK cluster, using one of the following options:
  - AWS Developer Guide
  - HashiCorp Terraform Resource
  - HashiCorp Terraform Module (provided by cloudposse)
  - AWS CDK
  - Terraform Construct
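If you prefer the CLI for the bucket-creation step, a minimal sketch looks like the following (the bucket name example-s3-bucket is a placeholder; pick something globally unique):

# us-east-1 does not take a location constraint
aws s3api create-bucket \
  --bucket example-s3-bucket \
  --region us-east-1

# any other region needs one, e.g. eu-west-1
aws s3api create-bucket \
  --bucket example-s3-bucket \
  --region eu-west-1 \
  --create-bucket-configuration LocationConstraint=eu-west-1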
Topic Creation and MSK Connect Custom Plugin
- Create two Kafka topics in the MSK cluster: source_topic and target_topic
- Download and create the MSK Connect Custom Plugin (Lenses AWS S3 Connector) accordingly (a CLI sketch of the upload and plugin registration follows this list):
  - Download it (link above)
  - Upload it to a private Amazon S3 bucket of your choosing that you have s3:GetObject access to (likely created previously in this guide)
  - Browse to: Amazon MSK > Custom plugins > Create custom plugin
  - Specify the S3 URI - Custom plugin object
  - Define a name for the plugin, such as kafka-connect-s3
  - Add an optional description
  - Select Create custom plugin
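The console steps above can also be scripted. A rough sketch with the AWS CLI, assuming the downloaded connector archive is named kafka-connect-aws-s3.zip and the bucket is example-s3-bucket (both placeholders):

# Upload the connector archive to your private bucket
aws s3 cp kafka-connect-aws-s3.zip s3://example-s3-bucket/msk-connect/kafka-connect-aws-s3.zip

# Register it as an MSK Connect custom plugin
aws kafkaconnect create-custom-plugin \
  --name kafka-connect-s3 \
  --content-type ZIP \
  --location "s3Location={bucketArn=arn:aws:s3:::example-s3-bucket,fileKey=msk-connect/kafka-connect-aws-s3.zip}"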
IAM
- Create an IAM role and policies with the following:
  - A trust relationship containing:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "kafkaconnect.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
  - An IAM policy with the following:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kafkaconnect:*",
        "ec2:CreateNetworkInterface",
        "ec2:DescribeSubnets",
        "ec2:DescribeVpcs",
        "ec2:DescribeSecurityGroups",
        "logs:CreateLogDelivery",
        "logs:GetLogDelivery",
        "logs:DeleteLogDelivery",
        "logs:ListLogDeliveries",
        "logs:PutResourcePolicy",
        "logs:DescribeResourcePolicies",
        "logs:DescribeLogGroups"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": "iam:CreateServiceLinkedRole",
      "Resource": "arn:aws:iam::*:role/aws-service-role/kafkaconnect.amazonaws.com/AWSServiceRoleForKafkaConnect*",
      "Condition": {
        "StringLike": {
          "iam:AWSServiceName": "kafkaconnect.amazonaws.com"
        }
      }
    },
    {
      "Effect": "Allow",
      "Action": [
        "iam:AttachRolePolicy",
        "iam:PutRolePolicy"
      ],
      "Resource": "arn:aws:iam::*:role/aws-service-role/kafkaconnect.amazonaws.com/AWSServiceRoleForKafkaConnect*"
    },
    {
      "Effect": "Allow",
      "Action": "iam:CreateServiceLinkedRole",
      "Resource": "arn:aws:iam::*:role/aws-service-role/delivery.logs.amazonaws.com/AWSServiceRoleForLogDelivery*",
      "Condition": {
        "StringLike": {
          "iam:AWSServiceName": "delivery.logs.amazonaws.com"
        }
      }
    },
    {
      "Effect": "Allow",
      "Action": "s3:*",
      "Resource": [
        "arn:aws:s3:::example-s3-bucket/*",
        "arn:aws:s3:::example-s3-bucket"
      ]
    },
    {
      "Effect": "Allow",
      "Action": "iam:PassRole",
      "Resource": "arn:aws:iam::XXXXXXXXXXX:role/aws-service-role/kafkaconnect.amazonaws.com/AWSServiceRoleForKafkaConnect"
    },
    {
      "Effect": "Allow",
      "Action": "s3:ListAllMyBuckets",
      "Resource": [
        "arn:aws:s3:::example-s3-bucket/*",
        "arn:aws:s3:::example-s3-bucket"
      ]
    },
    {
      "Effect": "Allow",
      "Action": "firehose:TagDeliveryStream",
      "Resource": "arn:aws:s3:::example-s3-bucket/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:Connect",
        "kafka-cluster:AlterCluster",
        "kafka-cluster:DescribeCluster"
      ],
      "Resource": [
        "arn:aws:kafka:YOURREGION:XXXXXXXXX:cluster/msk-cluster-name/UUID"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:*Topic*",
        "kafka-cluster:WriteData",
        "kafka-cluster:ReadData"
      ],
      "Resource": [
        "arn:aws:kafka:YOURREGION:XXXXXXXXX:topic/msk-cluster-name/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:AlterGroup",
        "kafka-cluster:DescribeGroup"
      ],
      "Resource": [
        "arn:aws:kafka:YOURREGION:XXXXXXXXX:group/msk-cluster-name/*"
      ]
    }
  ]
}
Ensure you’re replacing any MSK cluster ARNs and Amazon S3 bucket names accordingly. If you prefer the CLI over the console, the sketch below shows one way to create the role and attach the policy.
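A minimal sketch, assuming the trust relationship is saved as trust-policy.json and the policy document as msk-connect-policy.json (the role and policy names here are just examples):

# Create the service execution role with the trust relationship above
aws iam create-role \
  --role-name msk-example-role01 \
  --assume-role-policy-document file://trust-policy.json

# Attach the permissions above as an inline policy
aws iam put-role-policy \
  --role-name msk-example-role01 \
  --policy-name msk-connect-permissions \
  --policy-document file://msk-connect-policy.json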
Creation of MSK Connect
- Execute the creation of the MSK Connect connector:
aws kafkaconnect create-connector \
  --capacity "autoScaling={maxWorkerCount=2,mcuCount=1,minWorkerCount=1,scaleInPolicy={cpuUtilizationPercentage=10},scaleOutPolicy={cpuUtilizationPercentage=80}}" \
  --connector-configuration \
    "connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector, \
    key.converter.schemas.enable=false, \
    connect.s3.kcql=INSERT INTO example-s3-bucket:msk_cluster_dump SELECT * FROM EXAMPLE_MSK_TOPIC STOREAS \`JSON\` WITHPARTITIONER=KeysAndValues WITH_FLUSH_COUNT = 5, \
    aws.region=YOURREGION, \
    tasks.max=2, \
    topics=EXAMPLE_MSK_TOPIC, \
    schema.enable=false, \
    errors.log.enable=true, \
    value.converter=org.apache.kafka.connect.storage.StringConverter, \
    key.converter=org.apache.kafka.connect.storage.StringConverter" \
  --connector-name "backup-msk-to-s3-v1" \
  --kafka-cluster '{"apacheKafkaCluster": {"bootstrapServers": "b-1.example-cluster.gw00wb.c7.kafka.YOURREGION.amazonaws.com:9098,b-2.example-cluster.gw00wb.c7.kafka.YOURREGION.amazonaws.com:9098,b-3.example-cluster.gw00wb.c7.kafka.YOURREGION.amazonaws.com:9098","vpc": {"securityGroups": [ "sg-XXX" ],"subnets": [ "subnet-XXX","subnet-XXX","subnet-XXX" ]}}}' \
  --kafka-cluster-client-authentication "authenticationType=IAM" \
  --kafka-cluster-encryption-in-transit "encryptionType=TLS" \
  --kafka-connect-version "2.7.1" \
  --plugins "customPlugin={customPluginArn=arn:aws:kafkaconnect:YOURREGION:XXXXXXXXXXX:custom-plugin/msk-connect/REPLACEME,revision=1}" \
  --service-execution-role-arn "arn:aws:iam::XXXXXXXXXXX:role/msk-example-role01" \
  --log-delivery "workerLogDelivery={cloudWatchLogs={enabled=true,logGroup=example-msk-loggroup}}"
Notable points above to replace:
- Replace example-s3-bucket with the Amazon S3 bucket you created to dump topic messages to/from
- Replace YOURREGION with your region, such as us-east-1
- Replace EXAMPLE_MSK_TOPIC with your MSK topic name
- Define your connector name
- Define your MSK cluster brokers; these can be found on the MSK cluster overview page or retrieved with:
  aws kafka get-bootstrap-brokers --cluster-arn CLUSTER_ARN
- Replace the security groups (sg-XXX) with ones that can communicate with the MSK cluster
- Replace the subnet-XXX values with the subnets where the MSK cluster resides
- Define the service-execution-role-arn to be the AWS IAM role you defined earlier
- Set your workerLogDelivery log group accordingly (replace example-msk-loggroup with your preferred CloudWatch log group to dump logs to)
- You should see the AWS MSK Connect connector creating (use the AWS MSK Connect AWS CLI to confirm this, as shown below)
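For example, you can check the connector state from the CLI like this (the connector ARN is a placeholder for the one returned by create-connector):

# List all connectors and their current state
aws kafkaconnect list-connectors \
  --query "connectors[].{name:connectorName,state:connectorState}"

# Or describe a single connector by ARN
aws kafkaconnect describe-connector \
  --connector-arn "arn:aws:kafkaconnect:YOURREGION:XXXXXXXXXXX:connector/backup-msk-to-s3-v1/REPLACEME"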
Deployment via AWS-CDK v2.x
This is an experimental deployment, rendered via former2 and provided for additional context (a note on synthesizing and deploying it follows the code). Learn more here!
import aws_cdk as cdk
from aws_cdk import (
    aws_iam as iam,
    aws_kafkaconnect as kafkaconnect,
    aws_s3 as s3,
    aws_logs as logs,
)
from constructs import Construct

class MyStack(cdk.Stack):
    def __init__(self, scope: Construct, id: str, **kwargs) -> None:
        super().__init__(scope, id, **kwargs)
iammanagedpolicy = iam.CfnManagedPolicy(
self,
"IAMManagedPolicy",
managed_policy_name="msk-test-connect",
path="/",
policy_document='''
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kafkaconnect:*",
"ec2:CreateNetworkInterface",
"ec2:DescribeSubnets",
"ec2:DescribeVpcs",
"ec2:DescribeSecurityGroups",
"logs:CreateLogDelivery",
"logs:GetLogDelivery",
"logs:DeleteLogDelivery",
"logs:ListLogDeliveries",
"logs:PutResourcePolicy",
"logs:DescribeResourcePolicies",
"logs:DescribeLogGroups"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": "iam:CreateServiceLinkedRole",
"Resource": "arn:aws:iam::*:role/aws-service-role/kafkaconnect.amazonaws.com/AWSServiceRoleForKafkaConnect*",
"Condition": {
"StringLike": {
"iam:AWSServiceName": "kafkaconnect.amazonaws.com"
}
}
},
{
"Effect": "Allow",
"Action": [
"iam:AttachRolePolicy",
"iam:PutRolePolicy"
],
"Resource": "arn:aws:iam::*:role/aws-service-role/kafkaconnect.amazonaws.com/AWSServiceRoleForKafkaConnect*"
},
{
"Effect": "Allow",
"Action": "iam:CreateServiceLinkedRole",
"Resource": "arn:aws:iam::*:role/aws-service-role/delivery.logs.amazonaws.com/AWSServiceRoleForLogDelivery*",
"Condition": {
"StringLike": {
"iam:AWSServiceName": "delivery.logs.amazonaws.com"
}
}
},
{
"Effect": "Allow",
"Action": "s3:*",
"Resource": [
"arn:aws:s3:::example-s3-bucket/*",
"arn:aws:s3:::example-s3-bucket"
]
},
{
"Effect": "Allow",
"Action": "iam:PassRole",
"Resource": "arn:aws:iam::XXXXXXXXXXX:role/aws-service-role/kafkaconnect.amazonaws.com/AWSServiceRoleForKafkaConnect"
},
{
"Effect": "Allow",
"Action": "s3:ListAllMyBuckets",
"Resource": [
"arn:aws:s3:::example-s3-bucket/*",
"arn:aws:s3:::example-s3-bucket"
]
},
{
"Effect": "Allow",
"Action": "firehose:TagDeliveryStream",
"Resource": "arn:aws:s3:::example-s3-bucket/*"
}
]
}
'''
)
kafkaconnectconnector = kafkaconnect.CfnConnector(
self,
"KafkaConnectConnector",
capacity={
"auto_scaling": {
"max_worker_count": 2,
"mcu_count": 1,
"min_worker_count": 1,
"scale_in_policy": {
"cpu_utilization_percentage": 10
},
"scale_out_policy": {
"cpu_utilization_percentage": 80
}
}
},
connector_configuration={
"connector.class": "io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector",
"key.converter.schemas.enable": "false",
"connect.s3.kcql": "INSERT INTO example-s3-bucket:msk_cluster_dump SELECT * FROM SCHEDULED_APPOINTMENT_NOTIFICATION STOREAS `JSON` WITHPARTITIONER=KeysAndValues WITH_FLUSH_COUNT = 5",
"aws.region": "YOURREGION",
"tasks.max": "2",
"topics": "SCHEDULED_APPOINTMENT_NOTIFICATION",
"schema.enable": "false",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"errors.log.enable": "true",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
},
connector_name="backup-msk-to-s3-v1",
kafka_cluster={
"apache_kafka_cluster": {
"bootstrap_servers": "b-2.YOURCLUSTERNAME.XXXX.REPLACE.kafka.YOURREGION.amazonaws.com:9098,b-3.YOURCLUSTERNAME.XXXX.REPLACE.kafka.YOURREGION.amazonaws.com:9098,b-1.YOURCLUSTERNAME.XXXX.REPLACE.kafka.YOURREGION.amazonaws.com:9098",
"vpc": {
"security_groups": [
"sg-0f2f80cd8cf9bee0b"
],
"subnets": [
"subnet-071bd02cc078c08f1",
"subnet-02c50c2901e0fbde9",
"subnet-06e592c3ad621a6cb"
]
}
}
},
kafka_cluster_client_authentication={
"authentication_type": "IAM"
},
kafka_cluster_encryption_in_transit={
"encryption_type": "TLS"
},
kafka_connect_version="2.7.1",
plugins=[
{
"custom_plugin": {
"custom_plugin_arn": "arn:aws:kafkaconnect:YOURREGION:XXXXXXXXXXX:custom-plugin/msk-connect/CLUSTERUUIDREPLACEME",
"revision": 1
}
}
],
            # NOTE: iamrole is defined further down in this former2-rendered output;
            # move its definition above this connector (or pass the role ARN as a
            # string) so the reference resolves when the script runs top to bottom.
            service_execution_role_arn=iamrole.attr_arn
)
s3bucket = s3.CfnBucket(
self,
"S3Bucket",
bucket_name="example-s3-bucket",
bucket_encryption={
"server_side_encryption_configuration": [
{
"server_side_encryption_by_default": {
"s_s_e_algorithm": "AES256"
},
"bucket_key_enabled": False
}
]
},
logging_configuration={
"destination_bucket_name": "XXXXXXXXXXX-s3-accesslogs",
"log_file_prefix": "example-s3-bucket-access-logs"
},
versioning_configuration={
"status": "Enabled"
}
)
s3bucketpolicy = s3.CfnBucketPolicy(
self,
"S3BucketPolicy",
bucket="XXXXXXXXXXX-s3-accesslogs",
            policy_document={
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Effect": "Deny",
                        "Principal": "*",
                        "Action": "*",
                        "Resource": "arn:aws:s3:::XXXXXXXXXXX-s3-accesslogs/*",
                        "Condition": {
                            "Bool": {
                                "aws:SecureTransport": "false"
                            }
                        }
                    }
                ]
            }
)
s3bucketpolicy2 = s3.CfnBucketPolicy(
self,
"S3BucketPolicy2",
bucket=s3bucket.ref,
            policy_document={
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Sid": "AllowSSLRequestsOnly_qlgo6m",
                        "Effect": "Deny",
                        "Principal": "*",
                        "Action": "s3:*",
                        "Resource": [
                            "arn:aws:s3:::example-s3-bucket",
                            "arn:aws:s3:::example-s3-bucket/*"
                        ],
                        "Condition": {
                            "Bool": {
                                "aws:SecureTransport": "false"
                            }
                        }
                    },
                    {
                        "Sid": "AWSLogDeliveryWrite",
                        "Effect": "Allow",
                        "Principal": {
                            "Service": "delivery.logs.amazonaws.com"
                        },
                        "Action": "s3:PutObject",
                        "Resource": "arn:aws:s3:::example-s3-bucket/logs/AWSLogs/XXXXXXXXXXX/*",
                        "Condition": {
                            "StringEquals": {
                                "aws:SourceAccount": "XXXXXXXXXXX",
                                "s3:x-amz-acl": "bucket-owner-full-control"
                            },
                            "ArnLike": {
                                "aws:SourceArn": "arn:aws:logs:YOURREGION:XXXXXXXXXXX:*"
                            }
                        }
                    },
                    {
                        "Sid": "AWSLogDeliveryAclCheck",
                        "Effect": "Allow",
                        "Principal": {
                            "Service": "delivery.logs.amazonaws.com"
                        },
                        "Action": "s3:GetBucketAcl",
                        "Resource": "arn:aws:s3:::example-s3-bucket",
                        "Condition": {
                            "StringEquals": {
                                "aws:SourceAccount": "XXXXXXXXXXX"
                            },
                            "ArnLike": {
                                "aws:SourceArn": "arn:aws:logs:YOURREGION:XXXXXXXXXXX:*"
                            }
                        }
                    }
                ]
            }
)
iamrole = iam.CfnRole(
self,
"IAMRole",
path="/",
role_name="msk-connect-role01",
assume_role_policy_document="{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Service\":\"kafkaconnect.amazonaws.com\"},\"Action\":\"sts:AssumeRole\"}]}",
max_session_duration=3600,
managed_policy_arns=[
iammanagedpolicy.ref
],
description="Allows MSK Connect to access AWS resources on your behalf."
)
iampolicy = iam.CfnPolicy(
self,
"IAMPolicy",
policy_document='''
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kafka-cluster:Connect",
"kafka-cluster:AlterCluster",
"kafka-cluster:DescribeCluster"
],
"Resource": [
"arn:aws:kafka:YOURREGION:XXXXXXXXXXX:cluster/YOURCLUSTERNAME/743f4f31-55a2-41c7-961f-ef5eed02b978-7"
]
},
{
"Effect": "Allow",
"Action": [
"kafka-cluster:*Topic*",
"kafka-cluster:WriteData",
"kafka-cluster:ReadData"
],
"Resource": [
"arn:aws:kafka:YOURREGION:XXXXXXXXXXX:topic/YOURCLUSTERNAME/*"
]
},
{
"Effect": "Allow",
"Action": [
"kafka-cluster:AlterGroup",
"kafka-cluster:DescribeGroup"
],
"Resource": [
"arn:aws:kafka:YOURREGION:XXXXXXXXXXX:group/YOURCLUSTERNAME/*"
]
}
]
}
''',
roles=[
iamrole.ref
],
policy_name="msk-inline-policy"
)
logsloggroup = logs.CfnLogGroup(
self,
"LogsLogGroup",
log_group_name="msk-test-connect",
retention_in_days=90
)
app = cdk.App()
MyStack(app, "my-stack-name", env={'region': 'YOURREGION'})
app.synth()
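Assuming the stack above is saved as app.py with a cdk.json pointing at it (for example "app": "python3 app.py"), deployment follows the usual CDK workflow:

pip install aws-cdk-lib constructs
cdk synth     # render the CloudFormation template locally for review
cdk deploy    # deploy the stack to the configured account/region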
Recap
We deployed an MSK Connect custom plugin, along with an MSK Connect connector that streams data from a topic in an AWS MSK cluster to an Amazon S3 bucket.
Retrospective
When creating this initially, it was difficult to pin down the correct AWS IAM permissions. Hopefully this helps those who are struggling with the permissions, as it provides a second perspective on what works!