1 year ago

#382724

test-img

sabbysabs

Testcontainers, Localstack SES and Publishing to SNS

I'm using Spring Boot 2.6.3, with testcontainers version 1.16.3 and aws-java-sdk version 1.12.178. I'm trying to create an Integration test using testcontainers and the testcontainer localstack module. The test is to send an email to SES and have SES publish the message to an SNS topic, where an SQS queue is subscribed to the SNS topic.

I've successfully built a test where I publish a message directly to SNS and read the message from the SQS queue, however going through SES the message never makes it to SQS or at the very least is not getting published.

I have to think that I'm not setting the ACL correctly or that maybe SES via Localstack does not allow for publishing to other AWS services.

Where am I going wrong?

build.gradle

plugins {
    id 'java'
    id 'jacoco'
    id 'idea'
    id 'maven-publish'
    id 'signing'
    id 'com.palantir.docker' version '0.32.0'
    id 'net.researchgate.release' version '2.8.1'
    id 'io.spring.dependency-management' version '1.0.11.RELEASE'
    id 'org.springframework.boot' version '2.6.3'
    id 'com.diffplug.spotless' version '6.2.0'
}

dependencyManagement {
    imports {
        mavenBom 'io.awspring.cloud:spring-cloud-aws-dependencies:2.3.3'
        mavenBom 'com.amazonaws:aws-java-sdk-bom:1.12.178'
    }
}

dependencies {
    // implementation
    implementation 'org.springframework.boot:spring-boot-starter-log4j2'
    implementation 'org.springframework.boot:spring-boot-starter-actuator'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.boot:spring-boot-starter-validation'
    implementation 'org.projectlombok:lombok'
    implementation 'net.devh:grpc-server-spring-boot-starter:2.13.1.RELEASE'
    implementation 'org.springframework.data:spring-data-envers'

    

    // These dependencies (spring-boot-starter-data-jpa, liquibase-core, postgresql) are needed to
    // have liquibase startup and execute the changelog if you don't have jpa nothing gets picked
    // up, if you don't have postgresql we bomb out stating that it can't find a driver
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    implementation 'org.liquibase:liquibase-core'
    implementation 'org.postgresql:postgresql'

    implementation 'org.springframework:spring-jms'
    implementation 'com.amazonaws:amazon-sqs-java-messaging-lib:1.0.8'
    implementation 'com.amazonaws:aws-java-sdk-sns'
    implementation 'com.amazonaws:aws-java-sdk-sts'
    implementation 'com.amazonaws:aws-java-sdk-core'
    implementation 'com.amazonaws:aws-java-sdk-ses'
    implementation 'io.awspring.cloud:spring-cloud-starter-aws-secrets-manager-config'

    annotationProcessor 'org.projectlombok:lombok'

    // runtime
    runtimeOnly 'org.apache.logging.log4j:log4j-layout-template-json'

    // testing implementations
    testImplementation 'org.mockito:mockito-inline'
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
    testImplementation 'org.junit.jupiter:junit-jupiter-api'
    testImplementation 'org.junit.jupiter:junit-jupiter-params'
    testImplementation 'org.junit.jupiter:junit-jupiter-engine'
    testImplementation 'org.testcontainers:testcontainers:1.16.3'
    testImplementation 'org.testcontainers:junit-jupiter:1.16.3'
    testImplementation 'org.testcontainers:postgresql:1.16.3'
    testImplementation "org.testcontainers:localstack:1.16.3"
    integrationTestImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
}

AbstractIT.java

@ActiveProfiles("it")
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@AutoConfigureTestDatabase(replace = AutoConfigureTestDatabase.Replace.NONE)
@ContextConfiguration(initializers = AbstractIT.DockerPostgreDataSourceInitializer.class)
@Transactional
@Testcontainers
public class AbstractIT {

  public static final PostgreSQLContainer<?> postgresDBContainer =
      new PostgreSQLContainer<>("postgres:12.1");

  static {
    postgresDBContainer.start();
  }

  public static class DockerPostgreDataSourceInitializer
      implements ApplicationContextInitializer<ConfigurableApplicationContext> {

    @Override
    @DynamicPropertySource
    public void initialize(ConfigurableApplicationContext applicationContext) {
      TestPropertySourceUtils.addInlinedPropertiesToEnvironment(
          applicationContext,
          "spring.datasource.web.hikari.jdbc-url=" + postgresDBContainer.getJdbcUrl(),
          "spring.datasource.web.hikari.username=" + postgresDBContainer.getUsername(),
          "spring.datasource.password=" + postgresDBContainer.getPassword());
    }
  }
}

LocalStackAbstractIT.java

@Testcontainers
public class LocalStackAbstractIT extends AbstractIT {

  @ClassRule
  public static final LocalStackContainer localStackContainer =
      new LocalStackContainer(DockerImageName.parse("localstack/localstack").withTag("0.14.2"))
          .withServices(Service.S3, Service.SES, Service.SNS, Service.SQS)
          .withEnv("AWS_ACCESS_KEY_ID", "accesskey")
          .withEnv("AWS_SECRET_ACCESS_KEY", "secretkey");

  static {
    localStackContainer.start();
    try {
      verifyEmail();
    } catch (IOException | InterruptedException e) {
      e.printStackTrace();
    }
  }

  private static void verifyEmail() throws IOException, InterruptedException {
    String command =
        "aws ses verify-email-identity --email-address no-reply@sabbysabs.com --region us-east-1 --endpoint-url http://localhost:4566";
    ExecResult result = localStackContainer.execInContainer(command.split(" "));
    if (result.getExitCode() != 0) {
      throw new RuntimeException(result.getStderr());
    }
  }

  @DynamicPropertySource
  static void overrideConfiguration(DynamicPropertyRegistry registry) {
    registry.add(
        "cloud.aws.sqs.endpoint", () -> localStackContainer.getEndpointOverride(Service.SQS));
    registry.add(
        "cloud.aws.s3.endpoint", () -> localStackContainer.getEndpointOverride(Service.S3));
    registry.add(
        "cloud.aws.sns.endpoint", () -> localStackContainer.getEndpointOverride(Service.SNS));
    registry.add(
        "cloud.aws.ses.endpoint", () -> localStackContainer.getEndpointOverride(Service.SES));
    registry.add("cloud.aws.credentials.access-key", localStackContainer::getAccessKey);
    registry.add("cloud.aws.credentials.secret-key", localStackContainer::getSecretKey);
  }

}

EmailNotificationRequestHandlerITTest.java

@AutoConfigureTestEntityManager
public class EmailNotificationRequestHandlerITTest extends LocalStackAbstractIT {
  @Autowired private ApplicationConfig config;
  @Autowired private NotificationRepository notificationRepository;
  @Autowired private TemplateRepository templateRepository;
  @Autowired private EmailNotificationRepository emailNotificationRepository;
  @Autowired private AmazonSNS snsClient;
  @Autowired private AmazonSQS sqsClient;
  @Autowired private AmazonSimpleEmailService sesClient;
  @Autowired private EmailNotificationRequestHandler handler;
  @Autowired private ObjectMapper objectMapper;

  @Test
  public void testHandleEmailNotificationCreate() throws JsonProcessingException {
    ConfigurationSet configurationSet = new ConfigurationSet();
    configurationSet.withName("click-send-local");

    CreateConfigurationSetRequest createConfigurationSetRequest =
            new CreateConfigurationSetRequest();
    createConfigurationSetRequest.withConfigurationSet(configurationSet);
    sesClient.createConfigurationSet(createConfigurationSetRequest);

    CreateTopicResult emailRequestActivityTopic =
        snsClient.createTopic("EmailRequestActivityTopicITTest");
    Condition accountCondition = new StringCondition(StringComparisonType.StringEqualsIgnoreCase, "AWS:SourceAccount", "000000000000");
    Condition sourceArnCondition = new StringCondition(StringComparisonType.StringEqualsIgnoreCase, "AWS:SourceArn", "arn:aws:ses:us-east-1:000000000000:configuration-set/click-send-local");
    Policy policy =
        new Policy()
            .withStatements(
                new Statement(Effect.Allow)
                    .withPrincipals(Principal.All)
                    .withActions(SNSActions.Publish)
                    .withResources(new Resource(emailRequestActivityTopic.getTopicArn()))
                    .withConditions(accountCondition, sourceArnCondition));
    SetTopicAttributesRequest setTopicAttributesRequest = new SetTopicAttributesRequest();
    setTopicAttributesRequest
        .withTopicArn(emailRequestActivityTopic.getTopicArn())
        .withAttributeName("Policy")
        .withAttributeValue(policy.toJson());
    snsClient.setTopicAttributes(setTopicAttributesRequest);

    CreateQueueResult emailRequestActivityQueueResult =
        sqsClient.createQueue(config.notificationEmailRequestActivityCreateQueue);
    SubscribeRequest subscribeRequest = new SubscribeRequest();
    subscribeRequest
        .withTopicArn(emailRequestActivityTopic.getTopicArn())
        .withEndpoint(emailRequestActivityQueueResult.getQueueUrl())
        .withProtocol("sqs");
    snsClient.subscribe(subscribeRequest);

    SNSDestination snsDestination = new SNSDestination();
    snsDestination.withTopicARN(emailRequestActivityTopic.getTopicArn());
    EventDestination eventDestination = new EventDestination();
    eventDestination
        .withSNSDestination(snsDestination)
        .withName("EmailRequestActivityTopicITTest")
        .withEnabled(true)
        .withMatchingEventTypes(
            "Send",
            "Reject",
            "Bounce",
            "Complaint",
            "Delivery",
            "Open",
            "Click",
            "RenderingFailure");
    CreateConfigurationSetEventDestinationRequest configurationSetEventDestinationRequest =
        new CreateConfigurationSetEventDestinationRequest();
    configurationSetEventDestinationRequest
        .withConfigurationSetName(configurationSet.getName())
        .withEventDestination(eventDestination);
    sesClient.createConfigurationSetEventDestination(configurationSetEventDestinationRequest);

    NotificationRequest notificationRequest = createNotificationRequest();
    EmailNotification emailNotification =
        EmailNotification.builder()
            .notificationRequest(notificationRequest)
            .toAddress("emailExample@domain.com")
            .title("title")
            .message("message")
            .actionText("actionText")
            .actionUrl("actionUrl")
            .status(EmailNotificationStatus.PENDING)
            .id(UUID.randomUUID())
            .build();
    EmailNotification savedEmailNotification = emailNotificationRepository.save(emailNotification);
    EmailNotificationMessage emailNotificationMessage = new EmailNotificationMessage();
    emailNotificationMessage.setId(savedEmailNotification.getId().toString());
    handler.handleEmailNotificationCreate(emailNotificationMessage);

    EmailNotification finalEmailNotification =
        emailNotificationRepository.getById(savedEmailNotification.getId());
    assertNotNull(finalEmailNotification.getMessageId());

    ReceiveMessageResult receiveMessageResult =
        sqsClient.receiveMessage(emailRequestActivityQueueResult.getQueueUrl());
    assertNotNull(receiveMessageResult);
    List<Message> messageList = receiveMessageResult.getMessages();
    assertNotNull(messageList);
    assertFalse(messageList.isEmpty());
    assertEquals(1, messageList.size());
    Message message = messageList.get(0);
    Map<String, String> payload = objectMapper.readValue(message.getBody(), Map.class);
    String messageStr = payload.get("Message");
    System.out.println("MS STR: " + messageStr);
  }

  private NotificationRequest createNotificationRequest() {
    return createNotificationRequest(UUID.randomUUID());
  }

  private NotificationRequest createNotificationRequest(UUID id) {
    UUID templateId = createTemplate();
    NotificationRequest request =
        NotificationRequest.builder()
            .id(id)
            .templateId(templateId)
            .languageTag("language-tag")
            .relatedUserId("recipientId")
            .createdAt(null)
            .build();

    return notificationRepository.saveAndFlush(request);
  }

  private UUID createTemplate() {
    return createTemplate(UUID.randomUUID());
  }

  private UUID createTemplate(UUID id) {
    Template template =
        Template.builder()
            .id(id)
            .translationGroupId("translationGroupId-NotificationRepository")
            .titleKey("title-NotificationRepository")
            .messageKey("message-NotificationRepository")
            .application("application-NotificationRepository")
            .owner("ownerIntegrationTest-NotificationRepository")
            .recipient(TemplateRecipient.SABBY_USER)
            .status(TemplateStatus.DRAFT)
            .urgent(false)
            .library(false)
            .build();
    templateRepository.saveAndFlush(template);
    return id;
  }
}

EmailNotificationRequestHandler.java

@Component
@RequiredArgsConstructor
@Slf4j
@Validated
public class EmailNotificationRequestHandler {
  private final EmailNotificationRepository emailNotificationRepository;
  private final TemplateRepository templateRepository;
  private final AmazonSimpleEmailService sesClient;
  private final ApplicationConfig applicationConfig;

  @Transactional
  public void handleEmailNotificationCreate(@Valid @NotNull EmailNotificationMessage message) {
    String id = message.getId();
    log.debug("sending email notification for {}", id);
    Optional<EmailNotification> optionalEmailNotification =
        emailNotificationRepository.findById(UUID.fromString(id));
    if (optionalEmailNotification.isEmpty()) {
      String errorMsg = String.format("Email Notification %s does not exist, unable to send.", id);
      log.warn(errorMsg);
      throw new IllegalArgumentException(errorMsg);
    }
    EmailNotification emailNotification = optionalEmailNotification.get();
    Optional<Template> optionalTemplate =
        templateRepository.findById(emailNotification.getNotificationRequest().getTemplateId());
    if (optionalTemplate.isEmpty()) {
      String errorMsg =
          String.format(
              "Template with id %s, does not exist for Email Notification %s",
              emailNotification.getNotificationRequest().getTemplateId(), id);
      log.warn(errorMsg);
      throw new IllegalArgumentException(errorMsg);
    }
    Template template = optionalTemplate.get();
    log.debug("Found Template: {} for EmailNotification: {}", template, emailNotification);
    SendEmailRequest emailRequest = buildSendEmailRequest(emailNotification, template);
    log.debug("Built Send Email Request: {}", emailRequest);
    SendEmailResult result = sesClient.sendEmail(emailRequest);
    log.debug("Sent Email Result: {}", result.toString());
    EmailNotification updateEmailNotification =
        emailNotification.toBuilder().messageId(result.getMessageId()).build();
    EmailNotification savedUpdateEmailNotification =
        emailNotificationRepository.save(updateEmailNotification);
    log.debug("Saved Updated Email Notification: {}", savedUpdateEmailNotification);
  }

  @VisibleForTesting
  SendEmailRequest buildSendEmailRequest(
      @NonNull EmailNotification emailNotification, @NonNull Template template) {
    return new SendEmailRequest()
        .withDestination(new Destination().withToAddresses(emailNotification.getToAddress()))
        .withMessage(
            new Message(
                new Content(emailNotification.getTitle()),
                new Body(new Content(emailNotification.getMessage()))))
        .withSource(applicationConfig.emailSourceAddress)
        .withConfigurationSetName(applicationConfig.sesConfigSet)
        .withTags(getMessageTags(template.getApplication() + "-" + template.getTitleKey()));
  }

  @VisibleForTesting
  MessageTag getMessageTags(@NonNull String name) {
    MessageTag tags = new MessageTag().withName(applicationConfig.cloudWatchMetric).withValue(name);
    return tags;
  }
}

java

spring-boot

amazon-ses

testcontainers

localstack

0 Answers

Your Answer

Accepted video resources