Demystifying AWS MSK Connect IAM permissions
MSK Connect requires a set of well-defined IAM (Identity and Access Management) permissions to work correctly. I'm here to help you navigate the permissions required to use AWS MSK Connect!
Preamble
We have a few assumptions going into this blog post:
- You have a functioning AWS account that uses a region that supports AWS MSK
- You understand what AWS MSK is and how AWS MSK Connect interacts with it
- You have a base understanding of AWS IAM and how service principals, grants, and similar constructs function
- You're ready to learn and ask questions, because it took me a few tries to get this right (which is why I'm sharing it with the community!)
Where to start?
- Create an Amazon S3 bucket to support the AWS MSK Connect resources (see below for options; a minimal boto3 sketch also follows this list)
- Deploy an AWS MSK cluster, using one of the following options:
  - AWS Developer Guide
  - HashiCorp Terraform Resource
  - HashiCorp Terraform Module (provided by cloudposse)
  - AWS CDK Terraform Construct
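If you'd rather script the bucket creation, here is a minimal boto3 sketch. The example-s3-bucket name and us-east-1 region are placeholders I've assumed (the same bucket name is reused throughout this post).

import boto3

# Placeholders -- replace with your own bucket name and region.
BUCKET_NAME = "example-s3-bucket"
REGION = "us-east-1"

s3 = boto3.client("s3", region_name=REGION)

# Create the bucket that will hold the custom plugin archive and the sink output.
# Buckets in us-east-1 must omit CreateBucketConfiguration.
if REGION == "us-east-1":
    s3.create_bucket(Bucket=BUCKET_NAME)
else:
    s3.create_bucket(
        Bucket=BUCKET_NAME,
        CreateBucketConfiguration={"LocationConstraint": REGION},
    )

# Keep the bucket private; MSK Connect reads the plugin and writes sink data via IAM.
s3.put_public_access_block(
    Bucket=BUCKET_NAME,
    PublicAccessBlockConfiguration={
        "BlockPublicAcls": True,
        "IgnorePublicAcls": True,
        "BlockPublicPolicy": True,
        "RestrictPublicBuckets": True,
    },
)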
Topic Creation and MSK Connect Custom Plugin
- Create two Kafka topics in the MSK cluster: source_topic and target_topic (see the first sketch after this list for a scripted option)
- Download and create the MSK Connect Custom Plugin (Lenses AWS S3 Connector) accordingly (the second sketch after this list shows a scripted alternative):
  - Download it (link above)
  - Upload it to a private Amazon S3 bucket of your choosing that you have s3:GetObject access to (likely created previously in this guide)
  - Browse to: Amazon MSK > Custom plugins > Create custom plugin
  - Specify the S3 URI - Custom plugin object
  - Define a name for the plugin, such as kafka-connect-s3
  - Add an optional description
  - Select Create custom plugin
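If you prefer to script these two steps, here are a couple of minimal sketches rather than definitive implementations. The first creates the topics over IAM authentication and assumes the kafka-python and aws-msk-iam-sasl-signer-python packages, plus placeholder broker and region values; the second registers the uploaded connector archive as a custom plugin via boto3, assuming a placeholder bucket ARN and object key.

from kafka.admin import KafkaAdminClient, NewTopic
from kafka.oauth.abstract import AbstractTokenProvider
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

REGION = "us-east-1"  # placeholder -- use your cluster's region
BOOTSTRAP = "b-1.example-cluster.gw00wb.c7.kafka.us-east-1.amazonaws.com:9098"  # placeholder IAM (9098) endpoint


class IamTokenProvider(AbstractTokenProvider):
    # Generates a short-lived IAM auth token for the SASL/OAUTHBEARER handshake.
    def token(self):
        token, _expiry_ms = MSKAuthTokenProvider.generate_auth_token(REGION)
        return token


admin = KafkaAdminClient(
    bootstrap_servers=BOOTSTRAP,
    security_protocol="SASL_SSL",
    sasl_mechanism="OAUTHBEARER",
    sasl_oauth_token_provider=IamTokenProvider(),
)

# Partition and replication counts are arbitrary example values.
admin.create_topics([
    NewTopic(name="source_topic", num_partitions=3, replication_factor=2),
    NewTopic(name="target_topic", num_partitions=3, replication_factor=2),
])

import boto3

# Placeholders -- adjust to your bucket and the key you uploaded the archive to.
BUCKET_ARN = "arn:aws:s3:::example-s3-bucket"
PLUGIN_KEY = "plugins/kafka-connect-aws-s3-assembly.zip"

kafkaconnect = boto3.client("kafkaconnect", region_name="us-east-1")

# Register the uploaded Lenses S3 connector archive as an MSK Connect custom plugin.
response = kafkaconnect.create_custom_plugin(
    name="kafka-connect-s3",
    description="Lenses AWS S3 sink connector",
    contentType="ZIP",  # use "JAR" if you uploaded a single jar instead
    location={
        "s3Location": {
            "bucketArn": BUCKET_ARN,
            "fileKey": PLUGIN_KEY,
        }
    },
)
print(response["customPluginArn"], response["customPluginState"])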
IAM
- Create an IAM role and policies with the following:
  - A trust relationship containing:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "kafkaconnect.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

  - An IAM policy with the following:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kafkaconnect:*",
        "ec2:CreateNetworkInterface",
        "ec2:DescribeSubnets",
        "ec2:DescribeVpcs",
        "ec2:DescribeSecurityGroups",
        "logs:CreateLogDelivery",
        "logs:GetLogDelivery",
        "logs:DeleteLogDelivery",
        "logs:ListLogDeliveries",
        "logs:PutResourcePolicy",
        "logs:DescribeResourcePolicies",
        "logs:DescribeLogGroups"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": "iam:CreateServiceLinkedRole",
      "Resource": "arn:aws:iam::*:role/aws-service-role/kafkaconnect.amazonaws.com/AWSServiceRoleForKafkaConnect*",
      "Condition": {
        "StringLike": {
          "iam:AWSServiceName": "kafkaconnect.amazonaws.com"
        }
      }
    },
    {
      "Effect": "Allow",
      "Action": [
        "iam:AttachRolePolicy",
        "iam:PutRolePolicy"
      ],
      "Resource": "arn:aws:iam::*:role/aws-service-role/kafkaconnect.amazonaws.com/AWSServiceRoleForKafkaConnect*"
    },
    {
      "Effect": "Allow",
      "Action": "iam:CreateServiceLinkedRole",
      "Resource": "arn:aws:iam::*:role/aws-service-role/delivery.logs.amazonaws.com/AWSServiceRoleForLogDelivery*",
      "Condition": {
        "StringLike": {
          "iam:AWSServiceName": "delivery.logs.amazonaws.com"
        }
      }
    },
    {
      "Effect": "Allow",
      "Action": "s3:*",
      "Resource": [
        "arn:aws:s3:::example-s3-bucket/*",
        "arn:aws:s3:::example-s3-bucket"
      ]
    },
    {
      "Effect": "Allow",
      "Action": "iam:PassRole",
      "Resource": "arn:aws:iam::XXXXXXXXXXX:role/aws-service-role/kafkaconnect.amazonaws.com/AWSServiceRoleForKafkaConnect"
    },
    {
      "Effect": "Allow",
      "Action": "s3:ListAllMyBuckets",
      "Resource": [
        "arn:aws:s3:::example-s3-bucket/*",
        "arn:aws:s3:::example-s3-bucket"
      ]
    },
    {
      "Effect": "Allow",
      "Action": "firehose:TagDeliveryStream",
      "Resource": "arn:aws:s3:::example-s3-bucket/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:Connect",
        "kafka-cluster:AlterCluster",
        "kafka-cluster:DescribeCluster"
      ],
      "Resource": [
        "arn:aws:kafka:YOURREGION:XXXXXXXXX:cluster/msk-cluster-name/UUID"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:*Topic*",
        "kafka-cluster:WriteData",
        "kafka-cluster:ReadData"
      ],
      "Resource": [
        "arn:aws:kafka:YOURREGION:XXXXXXXXX:topic/msk-cluster-name/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:AlterGroup",
        "kafka-cluster:DescribeGroup"
      ],
      "Resource": [
        "arn:aws:kafka:YOURREGION:XXXXXXXXX:group/msk-cluster-name/*"
      ]
    }
  ]
}
Ensure you replace the MSK cluster ARNs (region, account ID, cluster name, and UUID) and the Amazon S3 bucket names accordingly.
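If you prefer to create the role programmatically rather than through the console, here is a minimal boto3 sketch. The role name msk-example-role01 matches the placeholder used in the CLI command later in this post; the policy name and the msk-connect-policy.json file (the IAM policy above saved to disk) are assumptions of mine.

import json
import boto3

iam = boto3.client("iam")

ROLE_NAME = "msk-example-role01"          # matches the placeholder role used later in this post
POLICY_NAME = "msk-connect-permissions"   # hypothetical inline policy name

trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "kafkaconnect.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}

# Create the service execution role that MSK Connect assumes.
role = iam.create_role(
    RoleName=ROLE_NAME,
    AssumeRolePolicyDocument=json.dumps(trust_policy),
    Description="Allows MSK Connect to access AWS resources on your behalf.",
)

# Attach the permissions policy shown above, saved locally as msk-connect-policy.json.
with open("msk-connect-policy.json") as f:
    permissions_policy = json.load(f)

iam.put_role_policy(
    RoleName=ROLE_NAME,
    PolicyName=POLICY_NAME,
    PolicyDocument=json.dumps(permissions_policy),
)

print(role["Role"]["Arn"])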
Creation of MSK Connect
- Execute the creation of the MSK Connect connector:

aws kafkaconnect create-connector \
  --capacity "autoScaling={maxWorkerCount=2,mcuCount=1,minWorkerCount=1,scaleInPolicy={cpuUtilizationPercentage=10},scaleOutPolicy={cpuUtilizationPercentage=80}}" \
  --connector-configuration \
    "connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector, \
    key.converter.schemas.enable=false, \
    connect.s3.kcql=INSERT INTO example-s3-bucket:msk_cluster_dump SELECT * FROM EXAMPLE_MSK_TOPIC STOREAS \`JSON\` WITHPARTITIONER=KeysAndValues WITH_FLUSH_COUNT = 5, \
    aws.region=YOURREGION, \
    tasks.max=2, \
    topics=EXAMPLE_MSK_TOPIC, \
    schema.enable=false, \
    errors.log.enable=true, \
    value.converter=org.apache.kafka.connect.storage.StringConverter, \
    key.converter=org.apache.kafka.connect.storage.StringConverter" \
  --connector-name "backup-msk-to-s3-v1" \
  --kafka-cluster '{"apacheKafkaCluster": {"bootstrapServers": "b-1.example-cluster.gw00wb.c7.kafka.YOURREGION.amazonaws.com:9098,b-2.example-cluster.gw00wb.c7.kafka.YOURREGION.amazonaws.com:9098,b-3.example-cluster.gw00wb.c7.kafka.YOURREGION.amazonaws.com:9098","vpc": {"securityGroups": [ "sg-XXX" ],"subnets": [ "subnet-XXX","subnet-XXX","subnet-XXX" ]}}}' \
  --kafka-cluster-client-authentication "authenticationType=IAM" \
  --kafka-cluster-encryption-in-transit "encryptionType=TLS" \
  --kafka-connect-version "2.7.1" \
  --plugins "customPlugin={customPluginArn=arn:aws:kafkaconnect:YOURREGION:XXXXXXXXXXX:custom-plugin/msk-connect/REPLACEME,revision=1}" \
  --service-execution-role-arn "arn:aws:iam::XXXXXXXXXXX:role/msk-example-role01" \
  --log-delivery "workerLogDelivery={cloudWatchLogs={enabled=true,logGroup=example-msk-loggroup}}"
Notable points above to replace:
- Replace example-s3-bucket with the Amazon S3 bucket you created to dump topic messages to/from
- Replace YOURREGION with your region, such as us-east-1
- Replace EXAMPLE_MSK_TOPIC with your MSK topic name
- Define your connector name
- Define your MSK cluster's brokers; these can be found on the MSK cluster overview page ("View client information") or retrieved with aws kafka get-bootstrap-brokers --cluster-arn CLUSTER_ARN
- Replace the security groups (sg-XXX) with ones that can communicate with the MSK cluster
- Replace the subnet-XXX values with the subnets where the MSK cluster resides
- Define the service-execution-role-arn to be the AWS IAM role you defined earlier
- Set the --log-delivery configuration accordingly (replace example-msk-loggroup with your preferred CloudWatch log group to deliver worker logs to)
- You should see the AWS MSK Connect connector creating (use the AWS MSK Connect CLI, or the boto3 sketch below, to confirm this)
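For a scripted check, here is a minimal boto3 sketch; the region, cluster ARN, and connector name prefix are placeholders. It also shows how to pull the IAM bootstrap broker string used for the --kafka-cluster argument above.

import boto3

REGION = "us-east-1"  # placeholder
CLUSTER_ARN = "arn:aws:kafka:us-east-1:XXXXXXXXXXX:cluster/msk-cluster-name/UUID"  # placeholder

# Fetch the IAM (port 9098) bootstrap broker string used in the --kafka-cluster argument.
kafka = boto3.client("kafka", region_name=REGION)
brokers = kafka.get_bootstrap_brokers(ClusterArn=CLUSTER_ARN)
print(brokers["BootstrapBrokerStringSaslIam"])

# Report connector state; it should move from CREATING to RUNNING once workers are provisioned.
kafkaconnect = boto3.client("kafkaconnect", region_name=REGION)
for connector in kafkaconnect.list_connectors(connectorNamePrefix="backup-msk-to-s3")["connectors"]:
    print(connector["connectorName"], connector["connectorState"])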
Deployment via AWS CDK v2.x
This is an experimental deployment, rendered via former2 and provided for additional context. Learn more here!
import aws_cdk as cdk
from aws_cdk import (
    aws_iam as iam,
    aws_kafkaconnect as kafkaconnect,
    aws_s3 as s3,
    aws_logs as logs,
)
from constructs import Construct

class MyStack(cdk.Stack):
    def __init__(self, scope: Construct, id: str, **kwargs) -> None:
        super().__init__(scope, id, **kwargs)
iammanagedpolicy = iam.CfnManagedPolicy(
self,
"IAMManagedPolicy",
managed_policy_name="msk-test-connect",
path="/",
policy_document='''
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kafkaconnect:*",
"ec2:CreateNetworkInterface",
"ec2:DescribeSubnets",
"ec2:DescribeVpcs",
"ec2:DescribeSecurityGroups",
"logs:CreateLogDelivery",
"logs:GetLogDelivery",
"logs:DeleteLogDelivery",
"logs:ListLogDeliveries",
"logs:PutResourcePolicy",
"logs:DescribeResourcePolicies",
"logs:DescribeLogGroups"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": "iam:CreateServiceLinkedRole",
"Resource": "arn:aws:iam::*:role/aws-service-role/kafkaconnect.amazonaws.com/AWSServiceRoleForKafkaConnect*",
"Condition": {
"StringLike": {
"iam:AWSServiceName": "kafkaconnect.amazonaws.com"
}
}
},
{
"Effect": "Allow",
"Action": [
"iam:AttachRolePolicy",
"iam:PutRolePolicy"
],
"Resource": "arn:aws:iam::*:role/aws-service-role/kafkaconnect.amazonaws.com/AWSServiceRoleForKafkaConnect*"
},
{
"Effect": "Allow",
"Action": "iam:CreateServiceLinkedRole",
"Resource": "arn:aws:iam::*:role/aws-service-role/delivery.logs.amazonaws.com/AWSServiceRoleForLogDelivery*",
"Condition": {
"StringLike": {
"iam:AWSServiceName": "delivery.logs.amazonaws.com"
}
}
},
{
"Effect": "Allow",
"Action": "s3:*",
"Resource": [
"arn:aws:s3:::example-s3-bucket/*",
"arn:aws:s3:::example-s3-bucket"
]
},
{
"Effect": "Allow",
"Action": "iam:PassRole",
"Resource": "arn:aws:iam::XXXXXXXXXXX:role/aws-service-role/kafkaconnect.amazonaws.com/AWSServiceRoleForKafkaConnect"
},
{
"Effect": "Allow",
"Action": "s3:ListAllMyBuckets",
"Resource": [
"arn:aws:s3:::example-s3-bucket/*",
"arn:aws:s3:::example-s3-bucket"
]
},
{
"Effect": "Allow",
"Action": "firehose:TagDeliveryStream",
"Resource": "arn:aws:s3:::example-s3-bucket/*"
}
]
}
'''
)
kafkaconnectconnector = kafkaconnect.CfnConnector(
self,
"KafkaConnectConnector",
capacity={
"auto_scaling": {
"max_worker_count": 2,
"mcu_count": 1,
"min_worker_count": 1,
"scale_in_policy": {
"cpu_utilization_percentage": 10
},
"scale_out_policy": {
"cpu_utilization_percentage": 80
}
}
},
connector_configuration={
"connector.class": "io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector",
"key.converter.schemas.enable": "false",
"connect.s3.kcql": "INSERT INTO example-s3-bucket:msk_cluster_dump SELECT * FROM SCHEDULED_APPOINTMENT_NOTIFICATION STOREAS `JSON` WITHPARTITIONER=KeysAndValues WITH_FLUSH_COUNT = 5",
"aws.region": "YOURREGION",
"tasks.max": "2",
"topics": "SCHEDULED_APPOINTMENT_NOTIFICATION",
"schema.enable": "false",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"errors.log.enable": "true",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
},
connector_name="backup-msk-to-s3-v1",
kafka_cluster={
"apache_kafka_cluster": {
"bootstrap_servers": "b-2.YOURCLUSTERNAME.XXXX.REPLACE.kafka.YOURREGION.amazonaws.com:9098,b-3.YOURCLUSTERNAME.XXXX.REPLACE.kafka.YOURREGION.amazonaws.com:9098,b-1.YOURCLUSTERNAME.XXXX.REPLACE.kafka.YOURREGION.amazonaws.com:9098",
"vpc": {
"security_groups": [
"sg-0f2f80cd8cf9bee0b"
],
"subnets": [
"subnet-071bd02cc078c08f1",
"subnet-02c50c2901e0fbde9",
"subnet-06e592c3ad621a6cb"
]
}
}
},
kafka_cluster_client_authentication={
"authentication_type": "IAM"
},
kafka_cluster_encryption_in_transit={
"encryption_type": "TLS"
},
kafka_connect_version="2.7.1",
plugins=[
{
"custom_plugin": {
"custom_plugin_arn": "arn:aws:kafkaconnect:YOURREGION:XXXXXXXXXXX:custom-plugin/msk-connect/CLUSTERUUIDREPLACEME",
"revision": 1
}
}
],
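# NOTE: `iamrole` is defined further down in this former2-generated listing; in a
# real stack, declare the IAM role before this connector (or pass the role ARN string).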
service_execution_role_arn=iamrole.attr_arn
)
s3bucket = s3.CfnBucket(
self,
"S3Bucket",
bucket_name="example-s3-bucket",
bucket_encryption={
"server_side_encryption_configuration": [
{
"server_side_encryption_by_default": {
"s_s_e_algorithm": "AES256"
},
"bucket_key_enabled": False
}
]
},
logging_configuration={
"destination_bucket_name": "XXXXXXXXXXX-s3-accesslogs",
"log_file_prefix": "example-s3-bucket-access-logs"
},
versioning_configuration={
"status": "Enabled"
}
)
s3bucketpolicy = s3.CfnBucketPolicy(
self,
"S3BucketPolicy",
bucket="XXXXXXXXXXX-s3-accesslogs",
policy_document={
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Deny",
            "Principal": "*",
            "Action": "*",
            "Resource": "arn:aws:s3:::XXXXXXXXXXX-s3-accesslogs/*",
            "Condition": {
                "Bool": {
                    "aws:SecureTransport": "false"
                }
            }
        }
    ]
}
)
s3bucketpolicy2 = s3.CfnBucketPolicy(
self,
"S3BucketPolicy2",
bucket=s3bucket.ref,
policy_document={
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowSSLRequestsOnly_qlgo6m",
            "Effect": "Deny",
            "Principal": "*",
            "Action": "s3:*",
            "Resource": [
                "arn:aws:s3:::example-s3-bucket",
                "arn:aws:s3:::example-s3-bucket/*"
            ],
            "Condition": {
                "Bool": {
                    "aws:SecureTransport": "false"
                }
            }
        },
        {
            "Sid": "AWSLogDeliveryWrite",
            "Effect": "Allow",
            "Principal": {
                "Service": "delivery.logs.amazonaws.com"
            },
            "Action": "s3:PutObject",
            "Resource": "arn:aws:s3:::example-s3-bucket/logs/AWSLogs/XXXXXXXXXXX/*",
            "Condition": {
                "StringEquals": {
                    "aws:SourceAccount": "XXXXXXXXXXX",
                    "s3:x-amz-acl": "bucket-owner-full-control"
                },
                "ArnLike": {
                    "aws:SourceArn": "arn:aws:logs:YOURREGION:XXXXXXXXXXX:*"
                }
            }
        },
        {
            "Sid": "AWSLogDeliveryAclCheck",
            "Effect": "Allow",
            "Principal": {
                "Service": "delivery.logs.amazonaws.com"
            },
            "Action": "s3:GetBucketAcl",
            "Resource": "arn:aws:s3:::example-s3-bucket",
            "Condition": {
                "StringEquals": {
                    "aws:SourceAccount": "XXXXXXXXXXX"
                },
                "ArnLike": {
                    "aws:SourceArn": "arn:aws:logs:YOURREGION:XXXXXXXXXXX:*"
                }
            }
        }
    ]
}
)
iamrole = iam.CfnRole(
self,
"IAMRole",
path="/",
role_name="msk-connect-role01",
assume_role_policy_document="{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Service\":\"kafkaconnect.amazonaws.com\"},\"Action\":\"sts:AssumeRole\"}]}",
max_session_duration=3600,
managed_policy_arns=[
iammanagedpolicy.ref
],
description="Allows MSK Connect to access AWS resources on your behalf."
)
iampolicy = iam.CfnPolicy(
self,
"IAMPolicy",
policy_document='''
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kafka-cluster:Connect",
"kafka-cluster:AlterCluster",
"kafka-cluster:DescribeCluster"
],
"Resource": [
"arn:aws:kafka:YOURREGION:XXXXXXXXXXX:cluster/YOURCLUSTERNAME/743f4f31-55a2-41c7-961f-ef5eed02b978-7"
]
},
{
"Effect": "Allow",
"Action": [
"kafka-cluster:*Topic*",
"kafka-cluster:WriteData",
"kafka-cluster:ReadData"
],
"Resource": [
"arn:aws:kafka:YOURREGION:XXXXXXXXXXX:topic/YOURCLUSTERNAME/*"
]
},
{
"Effect": "Allow",
"Action": [
"kafka-cluster:AlterGroup",
"kafka-cluster:DescribeGroup"
],
"Resource": [
"arn:aws:kafka:YOURREGION:XXXXXXXXXXX:group/YOURCLUSTERNAME/*"
]
}
]
}
''',
roles=[
iamrole.ref
],
policy_name="msk-inline-policy"
)
logsloggroup = logs.CfnLogGroup(
self,
"LogsLogGroup",
log_group_name="msk-test-connect",
retention_in_days=90
)
app = cdk.App()
MyStack(app, "my-stack-name", env={'region': 'YOURREGION'})
app.synth()
Recap
We deployed an MSK Connect custom plugin, along with an MSK Connect connector that streams data from a topic in an AWS MSK cluster to an Amazon S3 bucket.
Retrospective
When I created this initially, it was difficult to pin down the correct AWS IAM permissions. Hopefully this helps those who are struggling with them, as it provides a second perspective on what works!