Skip to content

Commit

Permalink
Start dual write to notification and notification_settings (#6683)
Browse files Browse the repository at this point in the history
Co-authored-by: josephkmh <[email protected]>
  • Loading branch information
xiaohansong and josephkmh committed Jun 7, 2023
1 parent d315b4b commit 949ceb9
Show file tree
Hide file tree
Showing 9 changed files with 265 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.server.converters;

import io.airbyte.commons.enums.Enums;
import java.util.stream.Collectors;

/**
* Convert between API and internal versions of notification models.
*/
@SuppressWarnings({"MissingJavadocMethod", "LineLength"})
public class NotificationSettingsConverter {

public static io.airbyte.config.NotificationSettings toConfig(final io.airbyte.api.model.generated.NotificationSettings notification) {
io.airbyte.config.NotificationSettings configNotificationSettings = new io.airbyte.config.NotificationSettings();
if (notification == null) {
return configNotificationSettings;
}

if (notification.getSendOnSuccess() != null) {
configNotificationSettings.setSendOnSuccess(toConfig(notification.getSendOnSuccess()));
}
if (notification.getSendOnFailure() != null) {
configNotificationSettings.setSendOnFailure(toConfig(notification.getSendOnFailure()));
}
if (notification.getSendOnConnectionUpdate() != null) {
configNotificationSettings.setSendOnConnectionUpdate(toConfig(notification.getSendOnConnectionUpdate()));
}
if (notification.getSendOnSyncDisabled() != null) {
configNotificationSettings.setSendOnSyncDisabled(toConfig(notification.getSendOnSyncDisabled()));
}
if (notification.getSendOnSyncDisabledWarning() != null) {
configNotificationSettings.setSendOnSyncDisabledWarning(toConfig(notification.getSendOnSyncDisabledWarning()));
}
if (notification.getSendOnConnectionUpdateActionRequired() != null) {
configNotificationSettings.setSendOnConnectionUpdateActionRequired(toConfig(notification.getSendOnConnectionUpdateActionRequired()));
}

return configNotificationSettings;
}

// Currently customerIoConfiguration is an empty object, so we tend to keep it as null.
private static io.airbyte.config.NotificationItem toConfig(final io.airbyte.api.model.generated.NotificationItem notificationItem) {
return new io.airbyte.config.NotificationItem()
.withNotificationType(notificationItem.getNotificationType().stream()
.map(notificationType -> Enums.convertTo(notificationType, io.airbyte.config.Notification.NotificationType.class)).collect(
Collectors.toList()))
.withSlackConfiguration(toConfig(notificationItem.getSlackConfiguration()));
}

private static io.airbyte.config.SlackNotificationConfiguration toConfig(final io.airbyte.api.model.generated.SlackNotificationConfiguration notification) {
if (notification == null) {
return new io.airbyte.config.SlackNotificationConfiguration();
}
return new io.airbyte.config.SlackNotificationConfiguration()
.withWebhook(notification.getWebhook());
}

public static io.airbyte.api.model.generated.NotificationSettings toApi(final io.airbyte.config.NotificationSettings notificationSettings) {
io.airbyte.api.model.generated.NotificationSettings apiNotificationSetings = new io.airbyte.api.model.generated.NotificationSettings();
if (notificationSettings == null) {
return apiNotificationSetings;
}

if (notificationSettings.getSendOnSuccess() != null) {
apiNotificationSetings.setSendOnSuccess(toApi(notificationSettings.getSendOnSuccess()));
}
if (notificationSettings.getSendOnFailure() != null) {
apiNotificationSetings.setSendOnFailure(toApi(notificationSettings.getSendOnFailure()));
}
if (notificationSettings.getSendOnConnectionUpdate() != null) {
apiNotificationSetings.setSendOnConnectionUpdate(toApi(notificationSettings.getSendOnConnectionUpdate()));
}
if (notificationSettings.getSendOnSyncDisabled() != null) {
apiNotificationSetings.setSendOnSyncDisabled(toApi(notificationSettings.getSendOnSyncDisabled()));
}
if (notificationSettings.getSendOnSyncDisabledWarning() != null) {
apiNotificationSetings.setSendOnSyncDisabledWarning(toApi(notificationSettings.getSendOnSyncDisabledWarning()));
}
if (notificationSettings.getSendOnConnectionUpdateActionRequired() != null) {
apiNotificationSetings.setSendOnConnectionUpdateActionRequired(toApi(notificationSettings.getSendOnConnectionUpdateActionRequired()));
}
return apiNotificationSetings;
}

private static io.airbyte.api.model.generated.NotificationItem toApi(final io.airbyte.config.NotificationItem notificationItem) {
return new io.airbyte.api.model.generated.NotificationItem()
.notificationType(notificationItem.getNotificationType().stream()
.map(notificationType -> Enums.convertTo(notificationType, io.airbyte.api.model.generated.NotificationType.class)).collect(
Collectors.toList()))
.slackConfiguration(toApi(notificationItem.getSlackConfiguration()));
}

private static io.airbyte.api.model.generated.SlackNotificationConfiguration toApi(final io.airbyte.config.SlackNotificationConfiguration notification) {
if (notification == null) {
return new io.airbyte.api.model.generated.SlackNotificationConfiguration();
}
return new io.airbyte.api.model.generated.SlackNotificationConfiguration()
.webhook(notification.getWebhook());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.server.converters.ApiPojoConverters;
import io.airbyte.commons.server.converters.NotificationConverter;
import io.airbyte.commons.server.converters.NotificationSettingsConverter;
import io.airbyte.commons.server.converters.WorkspaceWebhookConfigsConverter;
import io.airbyte.commons.server.errors.InternalServerKnownException;
import io.airbyte.commons.server.errors.ValueConflictKnownException;
Expand Down Expand Up @@ -114,6 +115,7 @@ public WorkspaceRead createWorkspace(final WorkspaceCreate workspaceCreate)
.withDisplaySetupWizard(displaySetupWizard != null ? displaySetupWizard : false)
.withTombstone(false)
.withNotifications(NotificationConverter.toConfigList(workspaceCreate.getNotifications()))
.withNotificationSettings(NotificationSettingsConverter.toConfig(workspaceCreate.getNotificationSettings()))
.withDefaultGeography(defaultGeography)
.withWebhookOperationConfigs(WorkspaceWebhookConfigsConverter.toPersistenceWrite(workspaceCreate.getWebhookConfigs(), uuidSupplier));

Expand Down Expand Up @@ -284,6 +286,7 @@ private static WorkspaceRead buildWorkspaceRead(final StandardWorkspace workspac
.news(workspace.getNews())
.securityUpdates(workspace.getSecurityUpdates())
.notifications(NotificationConverter.toApiList(workspace.getNotifications()))
.notificationSettings(NotificationSettingsConverter.toApi(workspace.getNotificationSettings()))
.defaultGeography(Enums.convertTo(workspace.getDefaultGeography(), Geography.class));
// Add read-only webhook configs.
if (workspace.getWebhookOperationConfigs() != null) {
Expand Down Expand Up @@ -318,6 +321,9 @@ private void applyPatchToStandardWorkspace(final StandardWorkspace workspace, fi
if (workspacePatch.getNotifications() != null) {
workspace.setNotifications(NotificationConverter.toConfigList(workspacePatch.getNotifications()));
}
if (workspacePatch.getNotificationSettings() != null) {
workspace.setNotificationSettings(NotificationSettingsConverter.toConfig(workspacePatch.getNotificationSettings()));
}
if (workspacePatch.getDefaultGeography() != null) {
workspace.setDefaultGeography(ApiPojoConverters.toPersistenceGeography(workspacePatch.getDefaultGeography()));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.server.converters;

import static org.junit.jupiter.api.Assertions.assertEquals;

import io.airbyte.api.model.generated.NotificationItem;
import io.airbyte.api.model.generated.NotificationType;
import io.airbyte.api.model.generated.SlackNotificationConfiguration;
import java.util.List;
import org.junit.jupiter.api.Test;

class NotificationSettingsConverterTest {

private static final io.airbyte.api.model.generated.NotificationSettings API_NOTIFICATION_SETTINGS =
new io.airbyte.api.model.generated.NotificationSettings().sendOnFailure(
new NotificationItem()
.addNotificationTypeItem(NotificationType.CUSTOMERIO)
.addNotificationTypeItem(NotificationType.SLACK)
.slackConfiguration(new SlackNotificationConfiguration().webhook("webhook")))
.sendOnConnectionUpdate(new NotificationItem()
.addNotificationTypeItem(NotificationType.SLACK)
.slackConfiguration(new SlackNotificationConfiguration().webhook("webhook2")));

private static final io.airbyte.config.NotificationSettings PROTOCOL_NOTIFICATION_SETTINGS =
new io.airbyte.config.NotificationSettings().withSendOnFailure(
new io.airbyte.config.NotificationItem()
.withNotificationType(
List.of(io.airbyte.config.Notification.NotificationType.CUSTOMERIO, io.airbyte.config.Notification.NotificationType.SLACK))
.withSlackConfiguration(new io.airbyte.config.SlackNotificationConfiguration().withWebhook("webhook")))
.withSendOnConnectionUpdate(
new io.airbyte.config.NotificationItem().withNotificationType(List.of(io.airbyte.config.Notification.NotificationType.SLACK))
.withSlackConfiguration(new io.airbyte.config.SlackNotificationConfiguration().withWebhook("webhook2")));

private static final io.airbyte.api.model.generated.NotificationSettings EMPTY_API_NOTIFICATION_SETTINGS =
new io.airbyte.api.model.generated.NotificationSettings()
.sendOnSuccess(new NotificationItem().notificationType(List.of()).slackConfiguration(new SlackNotificationConfiguration()))
.sendOnFailure(new NotificationItem().notificationType(List.of()).slackConfiguration(new SlackNotificationConfiguration()))
.sendOnConnectionUpdate(new NotificationItem().notificationType(List.of()).slackConfiguration(new SlackNotificationConfiguration()))
.sendOnSyncDisabled(new NotificationItem().notificationType(List.of()).slackConfiguration(new SlackNotificationConfiguration()))
.sendOnSyncDisabledWarning(new NotificationItem().notificationType(List.of()).slackConfiguration(new SlackNotificationConfiguration()))
.sendOnConnectionUpdateActionRequired(
new NotificationItem().notificationType(List.of()).slackConfiguration(new SlackNotificationConfiguration()));

private static final io.airbyte.config.NotificationSettings EMPTY_CONFIG_NOTIFICATION_SETTINGS =
new io.airbyte.config.NotificationSettings()
.withSendOnSuccess(new io.airbyte.config.NotificationItem().withNotificationType(List.of())
.withSlackConfiguration(new io.airbyte.config.SlackNotificationConfiguration()))
.withSendOnFailure(new io.airbyte.config.NotificationItem().withNotificationType(List.of())
.withSlackConfiguration(new io.airbyte.config.SlackNotificationConfiguration()))
.withSendOnConnectionUpdate(new io.airbyte.config.NotificationItem().withNotificationType(List.of())
.withSlackConfiguration(new io.airbyte.config.SlackNotificationConfiguration()))
.withSendOnSyncDisabled(new io.airbyte.config.NotificationItem().withNotificationType(List.of())
.withSlackConfiguration(new io.airbyte.config.SlackNotificationConfiguration()))
.withSendOnSyncDisabledWarning(new io.airbyte.config.NotificationItem().withNotificationType(List.of())
.withSlackConfiguration(new io.airbyte.config.SlackNotificationConfiguration()))
.withSendOnConnectionUpdateActionRequired(new io.airbyte.config.NotificationItem().withNotificationType(List.of())
.withSlackConfiguration(new io.airbyte.config.SlackNotificationConfiguration()));

@Test
void testConvertToPrototype() {
assertEquals(NotificationSettingsConverter.toConfig(API_NOTIFICATION_SETTINGS), PROTOCOL_NOTIFICATION_SETTINGS);
assertEquals(NotificationSettingsConverter.toConfig(EMPTY_API_NOTIFICATION_SETTINGS), EMPTY_CONFIG_NOTIFICATION_SETTINGS);
}

@Test
void testConvertToApi() {
assertEquals(NotificationSettingsConverter.toApi(PROTOCOL_NOTIFICATION_SETTINGS), API_NOTIFICATION_SETTINGS);
assertEquals(NotificationSettingsConverter.toApi(EMPTY_CONFIG_NOTIFICATION_SETTINGS), EMPTY_API_NOTIFICATION_SETTINGS);
}

}
Loading

0 comments on commit 949ceb9

Please sign in to comment.