Demystify AWS IAM permissions required for AWS MSK Connect

Demystifying AWS MSK Connect IAM permissions

MSK Connect requires a well-defined set of IAM (Identity and Access Management) permissions before a connector will deploy and run successfully. I’m here to help you navigate the permissions required to use AWS MSK Connect!

Preamble

We have a few assumptions going into this blog post:

  1. You have a functioning AWS account in a region that supports AWS MSK
  2. You understand what AWS MSK is and how AWS MSK Connect interacts with it
  3. You have a basic understanding of AWS IAM and how service principals, grants, and similar concepts function
  4. You’re ready to learn and ask questions, because it took me a few tries to get this right (which is why I’m sharing it with the community!)

Where to start?

Topic Creation and MSK Connect Custom Plugin

  1. Create two Kafka topics in the MSK cluster: source_topic and target_topic
  2. Download and create the MSK Connect Custom Plugin (Lenses AWS S3 Connector) accordingly:
    • Download it (link above)
    • Upload it to a private Amazon S3 bucket of your choosing that you have s3:GetObject access to (likely created previously in this guide)
    • Browse to: Amazon MSK > Custom plugins > Create custom plugin
    • Specify the S3 URI - Custom plugin object
    • Define a name for the plugin, such as kafka-connect-s3
    • Add an optional description
    • Select Create custom plugin
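
The console steps above can also be scripted. The following is a minimal sketch, assuming boto3 is installed and AWS credentials are configured. The helper names (build_custom_plugin_request, create_custom_plugin) and the object key are hypothetical placeholders, not values from this guide. The request is built separately from the API call so the payload can be inspected before anything is sent.

```python
def build_custom_plugin_request(bucket, key, name="kafka-connect-s3", description=""):
    """Build keyword arguments for the kafkaconnect create_custom_plugin call."""
    # MSK Connect accepts plugins packaged as a ZIP or a JAR; infer from the key.
    content_type = "JAR" if key.lower().endswith(".jar") else "ZIP"
    request = {
        "name": name,
        "contentType": content_type,
        "location": {
            "s3Location": {
                "bucketArn": f"arn:aws:s3:::{bucket}",
                "fileKey": key,
            }
        },
    }
    # The description is optional, so only send it when one was provided.
    if description:
        request["description"] = description
    return request


def create_custom_plugin(request):
    """Send the request to MSK Connect (needs boto3 and valid credentials)."""
    import boto3  # imported lazily so the builder above works without boto3

    client = boto3.client("kafkaconnect")
    return client.create_custom_plugin(**request)


if __name__ == "__main__":
    # "plugins/kafka-connect-aws-s3.zip" is a made-up key; use your own upload path.
    print(build_custom_plugin_request("example-s3-bucket", "plugins/kafka-connect-aws-s3.zip"))
```

Calling create_custom_plugin(request) performs the upload registration; the caller needs s3:GetObject on the plugin object, as noted in the steps above.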

IAM

  1. Create an IAM role and policies with the following:
    • A trust relationship containing:

       {
           "Version": "2012-10-17",
           "Statement": [
               {
                   "Effect": "Allow",
                   "Principal": {
                       "Service": "kafkaconnect.amazonaws.com"
                   },
                   "Action": "sts:AssumeRole"
               }
           ]
       }
      
    • An IAM policy with the following:

       {
           "Version": "2012-10-17",
           "Statement": [
               {
                   "Effect": "Allow",
                   "Action": [
                       "kafkaconnect:*",
                       "ec2:CreateNetworkInterface",
                       "ec2:DescribeSubnets",
                       "ec2:DescribeVpcs",
                       "ec2:DescribeSecurityGroups",
                       "logs:CreateLogDelivery",
                       "logs:GetLogDelivery",
                       "logs:DeleteLogDelivery",
                       "logs:ListLogDeliveries",
                       "logs:PutResourcePolicy",
                       "logs:DescribeResourcePolicies",
                       "logs:DescribeLogGroups"
                   ],
                   "Resource": "*"
               },
               {
                   "Effect": "Allow",
                   "Action": "iam:CreateServiceLinkedRole",
                   "Resource": "arn:aws:iam::*:role/aws-service-role/kafkaconnect.amazonaws.com/AWSServiceRoleForKafkaConnect*",
                   "Condition": {
                       "StringLike": {
                           "iam:AWSServiceName": "kafkaconnect.amazonaws.com"
                       }
                   }
               },
               {
                   "Effect": "Allow",
                   "Action": [
                       "iam:AttachRolePolicy",
                       "iam:PutRolePolicy"
                   ],
                   "Resource": "arn:aws:iam::*:role/aws-service-role/kafkaconnect.amazonaws.com/AWSServiceRoleForKafkaConnect*"
               },
               {
                   "Effect": "Allow",
                   "Action": "iam:CreateServiceLinkedRole",
                   "Resource": "arn:aws:iam::*:role/aws-service-role/delivery.logs.amazonaws.com/AWSServiceRoleForLogDelivery*",
                   "Condition": {
                       "StringLike": {
                           "iam:AWSServiceName": "delivery.logs.amazonaws.com"
                       }
                   }
               },
               {
                   "Effect": "Allow",
                   "Action": "s3:*",
                   "Resource": [
                       "arn:aws:s3:::example-s3-bucket/*",
                       "arn:aws:s3:::example-s3-bucket"
                   ]
               },
               {
                   "Effect": "Allow",
                   "Action": "iam:PassRole",
                   "Resource": "arn:aws:iam::XXXXXXXXXXX:role/aws-service-role/kafkaconnect.amazonaws.com/AWSServiceRoleForKafkaConnect"
               },
               {
                   "Effect": "Allow",
                   "Action": "s3:ListAllMyBuckets",
                   "Resource": [
                       "arn:aws:s3:::example-s3-bucket/*",
                       "arn:aws:s3:::example-s3-bucket"
                   ]
               },
               {
                   "Effect": "Allow",
                   "Action": "firehose:TagDeliveryStream",
                   "Resource": "arn:aws:s3:::example-s3-bucket/*"
               },
               {
                   "Effect": "Allow",
                   "Action": [
                       "kafka-cluster:Connect",
                       "kafka-cluster:AlterCluster",
                       "kafka-cluster:DescribeCluster"
                   ],
                   "Resource": [
                       "arn:aws:kafka:YOURREGION:XXXXXXXXX:cluster/msk-cluster-name/UUID"
                   ]
               },
               {
                   "Effect": "Allow",
                   "Action": [
                       "kafka-cluster:*Topic*",
                       "kafka-cluster:WriteData",
                       "kafka-cluster:ReadData"
                   ],
                   "Resource": [
                       "arn:aws:kafka:YOURREGION:XXXXXXXXX:topic/msk-cluster-name/*"
                   ]
               },
               {
                   "Effect": "Allow",
                   "Action": [
                       "kafka-cluster:AlterGroup",
                       "kafka-cluster:DescribeGroup"
                   ],
                   "Resource": [
                       "arn:aws:kafka:YOURREGION:XXXXXXXXX:group/msk-cluster-name/*"
                   ]
               }
           ]
       }
    

Be sure to replace the region, account ID, MSK cluster ARNs, and Amazon S3 bucket names with your own values.
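
Forgetting one placeholder is an easy mistake in a policy this long. As a rough sketch (the function names and the placeholder list are my own assumptions, not part of any AWS tooling), the kafka-cluster statements can be generated from your real values, and the finished policy scanned for leftover placeholder strings before it is attached to the role:

```python
import json

# Placeholder strings used in this post's example policy.
PLACEHOLDERS = ("XXXXXXXXX", "YOURREGION", "example-s3-bucket", "msk-cluster-name")


def kafka_cluster_statements(region, account_id, cluster_name, cluster_uuid):
    """Build the three kafka-cluster statements with concrete ARNs."""
    base = f"arn:aws:kafka:{region}:{account_id}"
    return [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:AlterCluster",
                "kafka-cluster:DescribeCluster",
            ],
            "Resource": [f"{base}:cluster/{cluster_name}/{cluster_uuid}"],
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*Topic*",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData",
            ],
            "Resource": [f"{base}:topic/{cluster_name}/*"],
        },
        {
            "Effect": "Allow",
            "Action": ["kafka-cluster:AlterGroup", "kafka-cluster:DescribeGroup"],
            "Resource": [f"{base}:group/{cluster_name}/*"],
        },
    ]


def find_placeholders(policy):
    """Return the placeholder strings that still appear anywhere in the policy."""
    text = json.dumps(policy)
    return [p for p in PLACEHOLDERS if p in text]


if __name__ == "__main__":
    # The account ID, cluster name, and UUID below are made-up sample values.
    statements = kafka_cluster_statements("us-east-1", "123456789012", "demo-cluster", "abcd-1234")
    policy = {"Version": "2012-10-17", "Statement": statements}
    print(json.dumps(policy, indent=4))
    print("Leftover placeholders:", find_placeholders(policy))
```

An empty list from find_placeholders means none of the example values survived into the policy you are about to deploy.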

Creation of MSK Connect

  1. Create the MSK Connect connector:

       aws kafkaconnect create-connector \
       --capacity "autoScaling={maxWorkerCount=2,mcuCount=1,minWorkerCount=1,scaleInPolicy={cpuUtilizationPercentage=10},scaleOutPolicy={cpuUtilizationPercentage=80}}" \
       --connector-configuration \
       "connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector, \
       key.converter.schemas.enable=false, \
       connect.s3.kcql=INSERT INTO example-s3-bucket:msk_cluster_dump SELECT * FROM EXAMPLE_MSK_TOPIC STOREAS \`JSON\` WITHPARTITIONER=KeysAndValues WITH_FLUSH_COUNT = 5, \
       aws.region=YOURREGION, \
       tasks.max=2, \
       topics=EXAMPLE_MSK_TOPIC, \
       schema.enable=false, \
       errors.log.enable=true, \
       value.converter=org.apache.kafka.connect.storage.StringConverter, \
       key.converter=org.apache.kafka.connect.storage.StringConverter " \
       --connector-name "backup-msk-to-s3-v1" \
       --kafka-cluster '{"apacheKafkaCluster": {"bootstrapServers": "b-1.example-cluster.gw00wb.c7.kafka.YOURREGION.amazonaws.com:9098,b-3.example-cluster.gw00wb.c7.kafka.YOURREGION.amazonaws.com:9098,b-2.example-cluster.gw00wb.c7.kafka.YOURREGION.amazonaws.com:9098","vpc": {"securityGroups": [ "sg-XXX" ],"subnets": [ "subnet-XXX","subnet-XXX","subnet-XXX" ]}}}' \
       --kafka-cluster-client-authentication "authenticationType=IAM" \
       --kafka-cluster-encryption-in-transit "encryptionType=TLS" \
       --kafka-connect-version "2.7.1" \
       --plugins "customPlugin={customPluginArn="arn:aws:kafkaconnect:YOURREGION:XXXXXXXXXXX:custom-plugin/msk-connect/REPLACEME",revision=1}" \
       --service-execution-role-arn "arn:aws:iam::XXXXXXXXXXX:role/msk-example-role01" \
       --log-delivery "workerLogDelivery={cloudWatchLogs={enabled=true,logGroup=arn:aws:logs:YOURREGION:XXXXXXXXXXX:log-group:example-msk-loggroup:*}}"
    

Notable points above to replace:

  • Replace example-s3-bucket with the Amazon S3 bucket you created for dumping topic messages to/from
  • Replace YOURREGION with your region, such as us-east-1
  • Replace EXAMPLE_MSK_TOPIC with your MSK topic name
  • Define your connector name
  • Define your MSK cluster brokers; these can be found on the MSK cluster overview page or retrieved with aws kafka get-bootstrap-brokers --cluster-arn CLUSTER_ARN
  • Replace the security groups (sg-XXX) with one that can communicate with the MSK cluster
  • Replace the subnet-XXX values with the subnets where the MSK cluster resides
  • Set service-execution-role-arn to the AWS IAM role you defined earlier
  • Set your workerLogDelivery ARN accordingly (replace example-msk-loggroup with your preferred CloudWatch log group to dump logs to)
  2. You should see the AWS MSK Connect connector being created (use the AWS MSK Connect AWS CLI to confirm this)
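
To confirm that the connector finishes creating, its state can be polled until it settles. This is a small sketch, assuming boto3's kafkaconnect client, whose describe_connector response includes a connectorState field; the function name, delay, and attempt count are arbitrary choices of mine. The describe callable is passed in so the loop can be exercised without touching AWS.

```python
import time

# States after which there is no point in polling further.
TERMINAL_STATES = {"RUNNING", "FAILED"}


def wait_for_connector(describe, connector_arn, delay_seconds=30, max_attempts=40,
                       sleep=time.sleep):
    """Poll describe(connectorArn=...) until RUNNING or FAILED; return the last state."""
    state = None
    for attempt in range(max_attempts):
        state = describe(connectorArn=connector_arn)["connectorState"]
        if state in TERMINAL_STATES:
            return state
        # Only wait if another attempt will follow.
        if attempt < max_attempts - 1:
            sleep(delay_seconds)
    return state


def wait_with_boto3(connector_arn):
    """Convenience wrapper around the real API (needs boto3 and credentials)."""
    import boto3  # imported lazily so the polling loop has no hard dependency

    client = boto3.client("kafkaconnect")
    return wait_for_connector(client.describe_connector, connector_arn)
```

The connector ARN is returned by the create-connector call. A result of FAILED usually sends you back to the IAM policy or the CloudWatch log group for details.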

Deployment via AWS-CDK v2.x

This is an experimental deployment that was rendered via former2 and is provided for additional context:

Learn more, here!

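# AWS CDK v2 imports: the core module is now the top-level aws_cdk package,
# and Construct lives in the separate constructs package.
import aws_cdk as cdk
from aws_cdk import (
    aws_iam as iam,
    aws_kafkaconnect as kafkaconnect,
    aws_s3 as s3,
    aws_logs as logs,
)
from constructs import Construct

class MyStack(cdk.Stack):
    def __init__(self, scope: Construct, id: str, **kwargs) -> None:
        super().__init__(scope, id, **kwargs)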

        iammanagedpolicy = iam.CfnManagedPolicy(
            self,
            "IAMManagedPolicy",
            managed_policy_name="msk-test-connect",
            path="/",
            policy_document='''
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafkaconnect:*",
                "ec2:CreateNetworkInterface",
                "ec2:DescribeSubnets",
                "ec2:DescribeVpcs",
                "ec2:DescribeSecurityGroups",
                "logs:CreateLogDelivery",
                "logs:GetLogDelivery",
                "logs:DeleteLogDelivery",
                "logs:ListLogDeliveries",
                "logs:PutResourcePolicy",
                "logs:DescribeResourcePolicies",
                "logs:DescribeLogGroups"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": "iam:CreateServiceLinkedRole",
            "Resource": "arn:aws:iam::*:role/aws-service-role/kafkaconnect.amazonaws.com/AWSServiceRoleForKafkaConnect*",
            "Condition": {
                "StringLike": {
                    "iam:AWSServiceName": "kafkaconnect.amazonaws.com"
                }
            }
        },
        {
            "Effect": "Allow",
            "Action": [
                "iam:AttachRolePolicy",
                "iam:PutRolePolicy"
            ],
            "Resource": "arn:aws:iam::*:role/aws-service-role/kafkaconnect.amazonaws.com/AWSServiceRoleForKafkaConnect*"
        },
        {
            "Effect": "Allow",
            "Action": "iam:CreateServiceLinkedRole",
            "Resource": "arn:aws:iam::*:role/aws-service-role/delivery.logs.amazonaws.com/AWSServiceRoleForLogDelivery*",
            "Condition": {
                "StringLike": {
                    "iam:AWSServiceName": "delivery.logs.amazonaws.com"
                }
            }
        },
        {
            "Effect": "Allow",
            "Action": "s3:*",
            "Resource": [
                "arn:aws:s3:::example-s3-bucket/*",
                "arn:aws:s3:::example-s3-bucket"
            ]
        },
        {
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": "arn:aws:iam::XXXXXXXXXXX:role/aws-service-role/kafkaconnect.amazonaws.com/AWSServiceRoleForKafkaConnect"
        },
        {
            "Effect": "Allow",
            "Action": "s3:ListAllMyBuckets",
            "Resource": [
                "arn:aws:s3:::example-s3-bucket/*",
                "arn:aws:s3:::example-s3-bucket"
            ]
        },
        {
            "Effect": "Allow",
            "Action": "firehose:TagDeliveryStream",
            "Resource": "arn:aws:s3:::example-s3-bucket/*"
        }
    ]
}
'''
        )

        # The execution role is defined before the connector so that the
        # connector can reference iamrole.attr_arn below.
        iamrole = iam.CfnRole(
            self,
            "IAMRole",
            path="/",
            role_name="msk-connect-role01",
            assume_role_policy_document={
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Effect": "Allow",
                        "Principal": {
                            "Service": "kafkaconnect.amazonaws.com"
                        },
                        "Action": "sts:AssumeRole"
                    }
                ]
            },
            max_session_duration=3600,
            managed_policy_arns=[
                iammanagedpolicy.ref
            ],
            description="Allows MSK Connect to access AWS resources on your behalf."
        )

        kafkaconnectconnector = kafkaconnect.CfnConnector(
            self,
            "KafkaConnectConnector",
            capacity={
                "auto_scaling": {
                    "max_worker_count": 2,
                    "mcu_count": 1,
                    "min_worker_count": 1,
                    "scale_in_policy": {
                        "cpu_utilization_percentage": 10
                    },
                    "scale_out_policy": {
                        "cpu_utilization_percentage": 80
                    }
                }
            },
            connector_configuration={
                "connector.class": "io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector",
                "key.converter.schemas.enable": "false",
                "connect.s3.kcql": "INSERT INTO example-s3-bucket:msk_cluster_dump SELECT * FROM EXAMPLE_MSK_TOPIC STOREAS `JSON` WITHPARTITIONER=KeysAndValues WITH_FLUSH_COUNT = 5",
                "aws.region": "YOURREGION",
                "tasks.max": "2",
                "topics": "EXAMPLE_MSK_TOPIC",
                "schema.enable": "false",
                "value.converter": "org.apache.kafka.connect.storage.StringConverter",
                "errors.log.enable": "true",
                "key.converter": "org.apache.kafka.connect.storage.StringConverter"
            },
            connector_name="backup-msk-to-s3-v1",
            kafka_cluster={
                "apache_kafka_cluster": {
                    "bootstrap_servers": "b-2.YOURCLUSTERNAME.XXXX.REPLACE.kafka.YOURREGION.amazonaws.com:9098,b-3.YOURCLUSTERNAME.XXXX.REPLACE.kafka.YOURREGION.amazonaws.com:9098,b-1.YOURCLUSTERNAME.XXXX.REPLACE.kafka.YOURREGION.amazonaws.com:9098",
                    "vpc": {
                        "security_groups": [
                            "sg-0f2f80cd8cf9bee0b"
                        ],
                        "subnets": [
                            "subnet-071bd02cc078c08f1",
                            "subnet-02c50c2901e0fbde9",
                            "subnet-06e592c3ad621a6cb"
                        ]
                    }
                }
            },
            kafka_cluster_client_authentication={
                "authentication_type": "IAM"
            },
            kafka_cluster_encryption_in_transit={
                "encryption_type": "TLS"
            },
            kafka_connect_version="2.7.1",
            plugins=[
                {
                    "custom_plugin": {
                        "custom_plugin_arn": "arn:aws:kafkaconnect:YOURREGION:XXXXXXXXXXX:custom-plugin/msk-connect/CLUSTERUUIDREPLACEME",
                        "revision": 1
                    }
                }
            ],
            service_execution_role_arn=iamrole.attr_arn
        )

        s3bucket = s3.CfnBucket(
            self,
            "S3Bucket",
            bucket_name="example-s3-bucket",
            bucket_encryption={
                "server_side_encryption_configuration": [
                    {
                        "server_side_encryption_by_default": {
                            "s_s_e_algorithm": "AES256"
                        },
                        "bucket_key_enabled": False
                    }
                ]
            },
            logging_configuration={
                "destination_bucket_name": "XXXXXXXXXXX-s3-accesslogs",
                "log_file_prefix": "example-s3-bucket-access-logs"
            },
            versioning_configuration={
                "status": "Enabled"
            }
        )

        # Policy documents are passed to IAM verbatim, so their keys must use
        # the IAM policy grammar (Version, Statement, Effect, ...).
        s3bucketpolicy = s3.CfnBucketPolicy(
            self,
            "S3BucketPolicy",
            bucket="XXXXXXXXXXX-s3-accesslogs",
            policy_document={
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Effect": "Deny",
                        "Principal": "*",
                        "Action": "*",
                        "Resource": "arn:aws:s3:::XXXXXXXXXXX-s3-accesslogs/*",
                        "Condition": {
                            "Bool": {
                                "aws:SecureTransport": "false"
                            }
                        }
                    }
                ]
            }
        )

        s3bucketpolicy2 = s3.CfnBucketPolicy(
            self,
            "S3BucketPolicy2",
            bucket=s3bucket.ref,
            policy_document={
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Sid": "AllowSSLRequestsOnly_qlgo6m",
                        "Effect": "Deny",
                        "Principal": "*",
                        "Action": "s3:*",
                        "Resource": [
                            "arn:aws:s3:::example-s3-bucket",
                            "arn:aws:s3:::example-s3-bucket/*"
                        ],
                        "Condition": {
                            "Bool": {
                                "aws:SecureTransport": "false"
                            }
                        }
                    },
                    {
                        "Sid": "AWSLogDeliveryWrite",
                        "Effect": "Allow",
                        "Principal": {
                            "Service": "delivery.logs.amazonaws.com"
                        },
                        "Action": "s3:PutObject",
                        "Resource": "arn:aws:s3:::example-s3-bucket/logs/AWSLogs/XXXXXXXXXXX/*",
                        "Condition": {
                            "StringEquals": {
                                "aws:SourceAccount": "XXXXXXXXXXX",
                                "s3:x-amz-acl": "bucket-owner-full-control"
                            },
                            "ArnLike": {
                                "aws:SourceArn": "arn:aws:logs:YOURREGION:XXXXXXXXXXX:*"
                            }
                        }
                    },
                    {
                        "Sid": "AWSLogDeliveryAclCheck",
                        "Effect": "Allow",
                        "Principal": {
                            "Service": "delivery.logs.amazonaws.com"
                        },
                        "Action": "s3:GetBucketAcl",
                        "Resource": "arn:aws:s3:::example-s3-bucket",
                        "Condition": {
                            "StringEquals": {
                                "aws:SourceAccount": "XXXXXXXXXXX"
                            },
                            "ArnLike": {
                                "aws:SourceArn": "arn:aws:logs:YOURREGION:XXXXXXXXXXX:*"
                            }
                        }
                    }
                ]
            }
        )

        iampolicy = iam.CfnPolicy(
            self,
            "IAMPolicy",
            policy_document='''
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:AlterCluster",
                "kafka-cluster:DescribeCluster"
            ],
            "Resource": [
                "arn:aws:kafka:YOURREGION:XXXXXXXXXXX:cluster/YOURCLUSTERNAME/743f4f31-55a2-41c7-961f-ef5eed02b978-7"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*Topic*",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData"
            ],
            "Resource": [
                "arn:aws:kafka:YOURREGION:XXXXXXXXXXX:topic/YOURCLUSTERNAME/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:YOURREGION:XXXXXXXXXXX:group/YOURCLUSTERNAME/*"
            ]
        }
    ]
}
''',
            roles=[
                iamrole.ref
            ],
            policy_name="msk-inline-policy"
        )

        logsloggroup = logs.CfnLogGroup(
            self,
            "LogsLogGroup",
            log_group_name="msk-test-connect",
            retention_in_days=90
        )


app = cdk.App()
MyStack(app, "my-stack-name", env={'region': 'YOURREGION'})
app.synth()

Recap

We deployed an MSK Connect custom plugin, along with an MSK Connect connector that streams data from a topic in an AWS MSK cluster to an Amazon S3 bucket.

Retrospective

When I first created this, it was difficult to pin down the correct AWS IAM permissions. Hopefully this helps those who are struggling with the permissions, as it should provide a second perspective on what works!