diff --git a/x-pack/solutions/observability/packages/kbn-streams-schema/src/helpers/index.ts b/x-pack/solutions/observability/packages/kbn-streams-schema/src/helpers/index.ts
index d801ecb4f95a5..f8afc7a941b9f 100644
--- a/x-pack/solutions/observability/packages/kbn-streams-schema/src/helpers/index.ts
+++ b/x-pack/solutions/observability/packages/kbn-streams-schema/src/helpers/index.ts
@@ -7,5 +7,6 @@
export * from './type_guards';
export * from './hierarchy';
+export * from './lifecycle';
export * from './condition_fields';
export * from './condition_to_query_dsl';
diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/lifecycle.test.ts b/x-pack/solutions/observability/packages/kbn-streams-schema/src/helpers/lifecycle.test.ts
similarity index 98%
rename from x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/lifecycle.test.ts
rename to x-pack/solutions/observability/packages/kbn-streams-schema/src/helpers/lifecycle.test.ts
index 1d787c11ca710..d27aa17a2f9ab 100644
--- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/lifecycle.test.ts
+++ b/x-pack/solutions/observability/packages/kbn-streams-schema/src/helpers/lifecycle.test.ts
@@ -5,7 +5,7 @@
* 2.0.
*/
-import { WiredStreamDefinition } from '@kbn/streams-schema';
+import { WiredStreamDefinition } from '../models/ingest/base';
import { findInheritedLifecycle, findInheritingStreams } from './lifecycle';
describe('Lifecycle helpers', () => {
diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/lifecycle.ts b/x-pack/solutions/observability/packages/kbn-streams-schema/src/helpers/lifecycle.ts
similarity index 77%
rename from x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/lifecycle.ts
rename to x-pack/solutions/observability/packages/kbn-streams-schema/src/helpers/lifecycle.ts
index c28fa9175cf8d..d3047201204a0 100644
--- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/lifecycle.ts
+++ b/x-pack/solutions/observability/packages/kbn-streams-schema/src/helpers/lifecycle.ts
@@ -5,25 +5,20 @@
* 2.0.
*/
+import { WiredStreamDefinition } from '../models/ingest/base';
import {
- WiredIngestStreamEffectiveLifecycle,
- WiredStreamDefinition,
- getSegments,
- isChildOf,
- isDescendantOf,
isInheritLifecycle,
-} from '@kbn/streams-schema';
-import { orderBy } from 'lodash';
+ WiredIngestStreamEffectiveLifecycle,
+} from '../models/ingest/lifecycle';
+import { isDescendantOf, isChildOf, getSegments } from './hierarchy';
export function findInheritedLifecycle(
definition: WiredStreamDefinition,
ancestors: WiredStreamDefinition[]
): WiredIngestStreamEffectiveLifecycle {
- const originDefinition = orderBy(
- [...ancestors, definition],
- (parent) => getSegments(parent.name).length,
- 'asc'
- ).findLast(({ ingest }) => !isInheritLifecycle(ingest.lifecycle));
+ const originDefinition = [...ancestors, definition]
+ .sort((a, b) => getSegments(a.name).length - getSegments(b.name).length)
+ .findLast(({ ingest }) => !isInheritLifecycle(ingest.lifecycle));
if (!originDefinition) {
throw new Error('Unable to find inherited lifecycle');
diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/client.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/client.ts
index fb8e644d4a8f9..12cf56ca60e1a 100644
--- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/client.ts
+++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/client.ts
@@ -35,6 +35,8 @@ import {
isUnwiredStreamDefinition,
isWiredStreamDefinition,
streamDefinitionSchema,
+ findInheritedLifecycle,
+ findInheritingStreams,
} from '@kbn/streams-schema';
import { cloneDeep, keyBy, omit, orderBy } from 'lodash';
import { AssetClient } from './assets/asset_client';
@@ -63,7 +65,6 @@ import { updateDataStreamsLifecycle } from './data_streams/manage_data_streams';
import { DefinitionNotFoundError } from './errors/definition_not_found_error';
import { MalformedStreamIdError } from './errors/malformed_stream_id_error';
import { SecurityError } from './errors/security_error';
-import { findInheritedLifecycle, findInheritingStreams } from './helpers/lifecycle';
import { NameTakenError } from './errors/name_taken_error';
import { MalformedStreamError } from './errors/malformed_stream_error';
diff --git a/x-pack/solutions/observability/plugins/streams/server/routes/streams/crud/read_stream.ts b/x-pack/solutions/observability/plugins/streams/server/routes/streams/crud/read_stream.ts
index b750252b49a44..493d17386c660 100644
--- a/x-pack/solutions/observability/plugins/streams/server/routes/streams/crud/read_stream.ts
+++ b/x-pack/solutions/observability/plugins/streams/server/routes/streams/crud/read_stream.ts
@@ -9,6 +9,7 @@ import {
InheritedFieldDefinition,
StreamGetResponse,
WiredStreamGetResponse,
+ findInheritedLifecycle,
isGroupStreamDefinition,
isUnwiredStreamDefinition,
} from '@kbn/streams-schema';
@@ -19,7 +20,6 @@ import {
getDataStreamLifecycle,
getUnmanagedElasticsearchAssets,
} from '../../../lib/streams/stream_crud';
-import { findInheritedLifecycle } from '../../../lib/streams/helpers/lifecycle';
export async function readStream({
name,
diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_lifecycle/modal.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_lifecycle/modal.tsx
index d273b3067901c..f8866f75d31a2 100644
--- a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_lifecycle/modal.tsx
+++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_lifecycle/modal.tsx
@@ -5,7 +5,7 @@
* 2.0.
*/
-import React, { useEffect, useState } from 'react';
+import React, { useEffect, useMemo, useState } from 'react';
import {
IlmLocatorParams,
Phases,
@@ -16,8 +16,16 @@ import {
IngestStreamGetResponse,
IngestStreamLifecycle,
StreamGetResponse,
+ UnwiredStreamGetResponse,
+ WiredStreamGetResponse,
+ getAncestors,
isIlmLifecycle,
+ isUnwiredStreamGetResponse,
+ isWiredStreamDefinition,
isWiredStreamGetResponse,
+ findInheritedLifecycle,
+ findInheritingStreams,
+ isDslLifecycle,
} from '@kbn/streams-schema';
import {
EuiButton,
@@ -25,11 +33,12 @@ import {
EuiCallOut,
EuiContextMenuItem,
EuiContextMenuPanel,
- EuiFieldNumber,
+ EuiFieldText,
EuiFlexGroup,
EuiFlexItem,
EuiHighlight,
EuiLink,
+ EuiLoadingSpinner,
EuiModal,
EuiModalBody,
EuiModalFooter,
@@ -46,6 +55,8 @@ import {
import { i18n } from '@kbn/i18n';
import { useBoolean } from '@kbn/react-hooks';
import useToggle from 'react-use/lib/useToggle';
+import { useStreamsAppRouter } from '../../hooks/use_streams_app_router';
+import { useWiredStreams } from '../../hooks/use_wired_streams';
export type LifecycleEditAction = 'none' | 'dsl' | 'ilm' | 'inherit';
@@ -77,6 +88,16 @@ export function EditLifecycleModal({
return ;
}
+const isInvalidRetention = (value: string) => {
+ const num = Number(value);
+ return isNaN(num) || num < 1 || num % 1 > 0;
+};
+
+const parseRetentionDuration = (value: string = '') => {
+ const result = /(\d+)([d|m|s|h])/.exec(value);
+ return { value: result?.[1], unit: result?.[2] };
+};
+
function DslModal({ closeModal, definition, updateInProgress, updateLifecycle }: ModalOptions) {
const timeUnits = [
{ name: 'Days', value: 'd' },
@@ -85,10 +106,24 @@ function DslModal({ closeModal, definition, updateInProgress, updateLifecycle }:
{ name: 'Seconds', value: 's' },
];
- const [selectedUnit, setSelectedUnit] = useState(timeUnits[0]);
- const [retentionValue, setRetentionValue] = useState(1);
- const [noRetention, toggleNoRetention] = useToggle(false);
+ const existingRetention = isDslLifecycle(definition.stream.ingest.lifecycle)
+ ? parseRetentionDuration(definition.stream.ingest.lifecycle.dsl.data_retention)
+ : undefined;
+ const [selectedUnit, setSelectedUnit] = useState(
+ (existingRetention && timeUnits.find((unit) => unit.value === existingRetention.unit)) ||
+ timeUnits[0]
+ );
+ const [retentionValue, setRetentionValue] = useState(
+ (existingRetention && existingRetention.value) || '1'
+ );
+ const [noRetention, toggleNoRetention] = useToggle(
+ Boolean(existingRetention && !existingRetention.value)
+ );
const [showUnitMenu, { on: openUnitMenu, off: closeUnitMenu }] = useBoolean(false);
+ const invalidRetention = useMemo(
+ () => isInvalidRetention(retentionValue) && !noRetention,
+ [retentionValue, noRetention]
+ );
return (
@@ -105,20 +140,13 @@ function DslModal({ closeModal, definition, updateInProgress, updateLifecycle }:
defaultMessage: 'Specify a custom data retention period for this stream.',
})}
- {
- const valueAsNumber = e.target.valueAsNumber;
- if (isNaN(valueAsNumber) || valueAsNumber < 1) {
- setRetentionValue(1);
- } else {
- setRetentionValue(valueAsNumber);
- }
- }}
- min={1}
+ onChange={(e) => setRetentionValue(e.target.value)}
disabled={noRetention}
fullWidth
+ isInvalid={invalidRetention}
append={
}
/>
+ {invalidRetention ? (
+ <>
+
+
+ {i18n.translate('xpack.streams.streamDetailLifecycle.invalidRetentionValue', {
+ defaultMessage: 'A positive integer is required',
+ })}
+
+ >
+ ) : null}
{
updateLifecycle({
dsl: {
- data_retention: noRetention ? undefined : `${retentionValue}${selectedUnit.value}`,
+ data_retention: noRetention
+ ? undefined
+ : `${Number(retentionValue)}${selectedUnit.value}`,
},
});
}}
@@ -341,7 +382,31 @@ function IlmModal({
);
}
-function InheritModal({ definition, closeModal, updateInProgress, updateLifecycle }: ModalOptions) {
+function InheritModal({ definition, ...options }: ModalOptions) {
+ if (isWiredStreamGetResponse(definition)) {
+ return ;
+ } else if (isUnwiredStreamGetResponse(definition)) {
+ return ;
+ }
+}
+
+function InheritModalWired({
+ definition,
+ closeModal,
+ updateInProgress,
+ updateLifecycle,
+}: ModalOptions & { definition: WiredStreamGetResponse }) {
+ const { wiredStreams, isLoading: wiredStreamsLoading } = useWiredStreams();
+
+ const parents = useMemo(() => {
+ if (wiredStreamsLoading || !wiredStreams) {
+ return undefined;
+ }
+
+ const ancestors = getAncestors(definition.stream.name);
+ return wiredStreams.filter((stream) => ancestors.includes(stream.name));
+ }, [definition, wiredStreams, wiredStreamsLoading]);
+
return (
@@ -353,16 +418,67 @@ function InheritModal({ definition, closeModal, updateInProgress, updateLifecycl
- {isWiredStreamGetResponse(definition)
- ? i18n.translate('xpack.streams.streamDetailLifecycle.defaultLifecycleWiredDesc', {
- defaultMessage:
- 'All custom retention settings for this stream will be removed, resetting it to inherit data retention from its nearest parent.',
- })
- : i18n.translate('xpack.streams.streamDetailLifecycle.defaultLifecycleUnwiredDesc', {
- defaultMessage:
- 'All custom retention settings for this stream will be removed, resetting it to use the configuration of the data stream.',
- })}
-
+ {i18n.translate('xpack.streams.streamDetailLifecycle.defaultLifecycleWiredDesc', {
+ defaultMessage:
+ 'All custom retention settings for this stream will be removed, resetting it to inherit data retention from',
+ })}{' '}
+ {wiredStreamsLoading || !parents ? (
+
+ ) : (
+ <>
+
+ .
+ >
+ )}
+
+
+ updateLifecycle({ inherit: {} })}
+ updateInProgress={updateInProgress}
+ />
+
+ );
+}
+
+function InheritModalUnwired({
+ definition,
+ closeModal,
+ updateInProgress,
+ updateLifecycle,
+}: ModalOptions & { definition: UnwiredStreamGetResponse }) {
+ return (
+
+
+
+ {i18n.translate('xpack.streams.streamDetailLifecycle.defaultLifecycleTitle', {
+ defaultMessage: 'Set data retention to default',
+ })}
+
+
+
+
+ {i18n.translate('xpack.streams.streamDetailLifecycle.defaultLifecycleUnwiredDesc', {
+ defaultMessage:
+ 'All custom retention settings for this stream will be removed, resetting it to use the configuration of the data stream.',
+ })}
void;
closeModal: () => void;
}) {
+ const { wiredStreams, isLoading: wiredStreamsLoading } = useWiredStreams();
+ const inheritingStreams = useMemo(() => {
+ if (!isWiredStreamGetResponse(definition) || wiredStreamsLoading || !wiredStreams) {
+ return [];
+ }
+ return findInheritingStreams(
+ definition.stream,
+ wiredStreams.filter(isWiredStreamDefinition)
+ ).filter((name) => name !== definition.stream.name);
+ }, [definition, wiredStreams, wiredStreamsLoading]);
+
return (
@@ -413,6 +540,27 @@ function ModalFooter({
'Data retention changes will apply to dependant streams unless they already have custom retention settings in place.',
}
)}
+
+
+
+ {wiredStreamsLoading ? (
+
+ ) : inheritingStreams.length > 0 ? (
+ <>
+ {i18n.translate('xpack.streams.streamDetailLifecycle.inheritingChildStreams', {
+ defaultMessage: 'The following child streams will be updated:',
+ })}{' '}
+ {inheritingStreams.map((name) => (
+ <>
+ {' '}
+ {' '}
+ >
+ ))}
+ .
+ >
+ ) : (
+ 'No child streams will be updated.'
+ )}
@@ -450,3 +598,22 @@ function ModalFooter({
);
}
+
+function LinkToStream({ name }: { name: string }) {
+ const router = useStreamsAppRouter();
+
+ return (
+
+ [{name}]
+
+ );
+}
diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_overview/index.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_overview/index.tsx
index 91feba9f04e11..eb19d094d86ec 100644
--- a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_overview/index.tsx
+++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_overview/index.tsx
@@ -21,6 +21,7 @@ import React, { useMemo } from 'react';
import { css } from '@emotion/css';
import {
IngestStreamGetResponse,
+ isDescendantOf,
isUnwiredStreamGetResponse,
isWiredStreamDefinition,
} from '@kbn/streams-schema';
@@ -36,6 +37,7 @@ import { useStreamsAppRouter } from '../../hooks/use_streams_app_router';
import { useDashboardsFetch } from '../../hooks/use_dashboards_fetch';
import { DashboardsTable } from '../stream_detail_dashboards_view/dashboard_table';
import { AssetImage } from '../asset_image';
+import { useWiredStreams } from '../../hooks/use_wired_streams';
const formatNumber = (val: number) => {
return Number(val).toLocaleString('en', {
@@ -288,34 +290,18 @@ function QuickLinks({ definition }: { definition?: IngestStreamGetResponse }) {
}
function ChildStreamList({ definition }: { definition?: IngestStreamGetResponse }) {
- const {
- dependencies: {
- start: {
- streams: { streamsRepositoryClient },
- },
- },
- } = useKibana();
const router = useStreamsAppRouter();
- const streamsListFetch = useStreamsAppFetch(
- ({ signal }) => {
- return streamsRepositoryClient.fetch('GET /api/streams', {
- signal,
- });
- },
- [streamsRepositoryClient]
- );
+ const { wiredStreams } = useWiredStreams();
const childrenStreams = useMemo(() => {
if (!definition) {
return [];
}
- return streamsListFetch.value?.streams.filter(
- (d) => isWiredStreamDefinition(d) && d.name.startsWith(definition.stream.name)
- );
- }, [definition, streamsListFetch.value?.streams]);
+ return wiredStreams?.filter((d) => isDescendantOf(definition.stream.name, d.name));
+ }, [definition, wiredStreams]);
- if (definition && childrenStreams?.length === 1) {
+ if (definition && childrenStreams?.length === 0) {
return (
diff --git a/x-pack/solutions/observability/plugins/streams_app/public/hooks/use_wired_streams.ts b/x-pack/solutions/observability/plugins/streams_app/public/hooks/use_wired_streams.ts
new file mode 100644
index 0000000000000..4ed4e02d86321
--- /dev/null
+++ b/x-pack/solutions/observability/plugins/streams_app/public/hooks/use_wired_streams.ts
@@ -0,0 +1,30 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+import { isWiredStreamDefinition } from '@kbn/streams-schema';
+import { useKibana } from './use_kibana';
+import { useStreamsAppFetch } from './use_streams_app_fetch';
+
+export const useWiredStreams = () => {
+ const {
+ dependencies: {
+ start: {
+ streams: { streamsRepositoryClient },
+ },
+ },
+ } = useKibana();
+
+ const result = useStreamsAppFetch(
+ async ({ signal }) => streamsRepositoryClient.fetch('GET /api/streams', { signal }),
+ [streamsRepositoryClient]
+ );
+
+ return {
+ wiredStreams: result.value?.streams.filter(isWiredStreamDefinition),
+ isLoading: result.loading,
+ };
+};