1
1
import { capitalize } from 'lodash' ;
2
- import { map , Observable , defer , mergeMap } from 'rxjs' ;
2
+ import { map , Observable , takeWhile } from 'rxjs' ;
3
3
import { v4 as uuidv4 } from 'uuid' ;
4
4
5
5
import {
@@ -20,7 +20,7 @@ import { SearchStreamingState } from './dataquery.gen';
20
20
import { DEFAULT_SPSS , TempoDatasource } from './datasource' ;
21
21
import { formatTraceQLResponse } from './resultTransformer' ;
22
22
import { SearchMetrics , TempoJsonData , TempoQuery } from './types' ;
23
- export async function getLiveStreamKey ( ) : Promise < string > {
23
+ function getLiveStreamKey ( ) : string {
24
24
return uuidv4 ( ) ;
25
25
}
26
26
@@ -34,59 +34,66 @@ export function doTempoChannelStream(
34
34
35
35
let frames : DataFrame [ ] | undefined = undefined ;
36
36
let state : LoadingState = LoadingState . NotStarted ;
37
+ const requestTime = performance . now ( ) ;
37
38
38
- return defer ( ( ) => getLiveStreamKey ( ) ) . pipe (
39
- mergeMap ( ( key ) => {
40
- const requestTime = performance . now ( ) ;
41
- return getGrafanaLiveSrv ( )
42
- . getStream < MutableDataFrame > ( {
43
- scope : LiveChannelScope . DataSource ,
44
- namespace : ds . uid ,
45
- path : `search/${ key } ` ,
46
- data : {
47
- ...query ,
48
- SpansPerSpanSet : query . spss ?? DEFAULT_SPSS ,
49
- timeRange : {
50
- from : range . from . toISOString ( ) ,
51
- to : range . to . toISOString ( ) ,
52
- } ,
53
- } ,
54
- } )
55
- . pipe (
56
- map ( ( evt ) => {
57
- if ( 'message' in evt && evt ?. message ) {
58
- const currentTime = performance . now ( ) ;
59
- const elapsedTime = currentTime - requestTime ;
60
- // Schema should be [traces, metrics, state, error]
61
- const traces = evt . message . data . values [ 0 ] [ 0 ] ;
62
- const metrics = evt . message . data . values [ 1 ] [ 0 ] ;
63
- const frameState : SearchStreamingState = evt . message . data . values [ 2 ] [ 0 ] ;
64
- const error = evt . message . data . values [ 3 ] [ 0 ] ;
39
+ return getGrafanaLiveSrv ( )
40
+ . getStream < MutableDataFrame > ( {
41
+ scope : LiveChannelScope . DataSource ,
42
+ namespace : ds . uid ,
43
+ path : `search/${ getLiveStreamKey ( ) } ` ,
44
+ data : {
45
+ ...query ,
46
+ SpansPerSpanSet : query . spss ?? DEFAULT_SPSS ,
47
+ timeRange : {
48
+ from : range . from . toISOString ( ) ,
49
+ to : range . to . toISOString ( ) ,
50
+ } ,
51
+ } ,
52
+ } )
53
+ . pipe (
54
+ takeWhile ( ( evt ) => {
55
+ if ( 'message' in evt && evt ?. message ) {
56
+ const frameState : SearchStreamingState = evt . message . data . values [ 2 ] [ 0 ] ;
57
+ if ( frameState === SearchStreamingState . Done || frameState === SearchStreamingState . Error ) {
58
+ return false ;
59
+ }
60
+ }
61
+ return true ;
62
+ } , true )
63
+ )
64
+ . pipe (
65
+ map ( ( evt ) => {
66
+ if ( 'message' in evt && evt ?. message ) {
67
+ const currentTime = performance . now ( ) ;
68
+ const elapsedTime = currentTime - requestTime ;
69
+ // Schema should be [traces, metrics, state, error]
70
+ const traces = evt . message . data . values [ 0 ] [ 0 ] ;
71
+ const metrics = evt . message . data . values [ 1 ] [ 0 ] ;
72
+ const frameState : SearchStreamingState = evt . message . data . values [ 2 ] [ 0 ] ;
73
+ const error = evt . message . data . values [ 3 ] [ 0 ] ;
65
74
66
- switch ( frameState ) {
67
- case SearchStreamingState . Done :
68
- state = LoadingState . Done ;
69
- break ;
70
- case SearchStreamingState . Streaming :
71
- state = LoadingState . Streaming ;
72
- break ;
73
- case SearchStreamingState . Error :
74
- throw new Error ( error ) ;
75
- }
75
+ switch ( frameState ) {
76
+ case SearchStreamingState . Done :
77
+ state = LoadingState . Done ;
78
+ break ;
79
+ case SearchStreamingState . Streaming :
80
+ state = LoadingState . Streaming ;
81
+ break ;
82
+ case SearchStreamingState . Error :
83
+ throw new Error ( error ) ;
84
+ }
76
85
77
- frames = [
78
- metricsDataFrame ( metrics , frameState , elapsedTime ) ,
79
- ...formatTraceQLResponse ( traces , instanceSettings , query . tableType ) ,
80
- ] ;
81
- }
82
- return {
83
- data : frames || [ ] ,
84
- state,
85
- } ;
86
- } )
87
- ) ;
88
- } )
89
- ) ;
86
+ frames = [
87
+ metricsDataFrame ( metrics , frameState , elapsedTime ) ,
88
+ ...formatTraceQLResponse ( traces , instanceSettings , query . tableType ) ,
89
+ ] ;
90
+ }
91
+ return {
92
+ data : frames || [ ] ,
93
+ state,
94
+ } ;
95
+ } )
96
+ ) ;
90
97
}
91
98
92
99
function metricsDataFrame ( metrics : SearchMetrics , state : SearchStreamingState , elapsedTime : number ) {
0 commit comments