Skip to content

Commit

Permalink
[INLONG-6192][Manager] Clean and reuse code for StreamSink (apache#6193)
Browse files Browse the repository at this point in the history
  • Loading branch information
healchow authored Oct 17, 2022
1 parent 2144496 commit c9c120f
Show file tree
Hide file tree
Showing 9 changed files with 148 additions and 179 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public StreamSinkClient(ClientConfiguration configuration) {
}

public Integer createSink(SinkRequest sinkRequest) {
Response<Integer> response = ClientUtils.executeHttpCall(streamSinkApi.createSink(sinkRequest));
Response<Integer> response = ClientUtils.executeHttpCall(streamSinkApi.save(sinkRequest));
ClientUtils.assertRespSuccess(response);
return response.getData();
}
Expand All @@ -51,7 +51,7 @@ public Integer createSink(SinkRequest sinkRequest) {
*/
public boolean deleteSink(int id) {
Preconditions.checkTrue(id > 0, "sinkId is illegal");
Response<Boolean> response = ClientUtils.executeHttpCall(streamSinkApi.deleteSink(id));
Response<Boolean> response = ClientUtils.executeHttpCall(streamSinkApi.deleteById(id));
ClientUtils.assertRespSuccess(response);
return response.getData();
}
Expand All @@ -68,7 +68,7 @@ public List<StreamSink> listSinks(String groupId, String streamId) {
*/
public List<StreamSink> listSinks(String groupId, String streamId, String sinkType) {
Response<PageResult<StreamSink>> response = ClientUtils.executeHttpCall(
streamSinkApi.listSinks(groupId, streamId, sinkType));
streamSinkApi.list(groupId, streamId, sinkType));
ClientUtils.assertRespSuccess(response);
return response.getData().getList();
}
Expand All @@ -77,7 +77,7 @@ public List<StreamSink> listSinks(String groupId, String streamId, String sinkTy
* Update the stream sink info.
*/
public Pair<Boolean, String> updateSink(SinkRequest sinkRequest) {
Response<Boolean> responseBody = ClientUtils.executeHttpCall(streamSinkApi.updateSink(sinkRequest));
Response<Boolean> responseBody = ClientUtils.executeHttpCall(streamSinkApi.updateById(sinkRequest));
ClientUtils.assertRespSuccess(responseBody);

if (responseBody.getData() != null) {
Expand All @@ -91,7 +91,7 @@ public Pair<Boolean, String> updateSink(SinkRequest sinkRequest) {
* Get detail information of data sink.
*/
public StreamSink getSinkInfo(Integer sinkId) {
Response<StreamSink> response = ClientUtils.executeHttpCall(streamSinkApi.getSinkInfo(sinkId));
Response<StreamSink> response = ClientUtils.executeHttpCall(streamSinkApi.get(sinkId));
ClientUtils.assertRespSuccess(response);
return response.getData();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,28 +33,26 @@
public interface StreamSinkApi {

@POST("sink/save")
Call<Response<Integer>> createSink(@Body SinkRequest request);
Call<Response<Integer>> save(@Body SinkRequest request);

@POST("sink/update")
Call<Response<Boolean>> updateSink(@Body SinkRequest request);
Call<Response<Boolean>> updateById(@Body SinkRequest request);

@POST("sink/updateByKey")
Call<Response<UpdateResult>> updateSinkByKey(@Body SinkRequest request);
Call<Response<UpdateResult>> updateByKey(@Body SinkRequest request);

@DELETE("sink/delete/{id}")
Call<Response<Boolean>> deleteSink(@Path("id") Integer id);
Call<Response<Boolean>> deleteById(@Path("id") Integer id);

@DELETE("sink/deleteByKey")
Call<Response<Boolean>> deleteSink(
@Query("groupId") String groupId,
@Query("streamId") String streamId,
Call<Response<Boolean>> deleteByKey(@Query("groupId") String groupId, @Query("streamId") String streamId,
@Query("name") String name);

@GET("sink/get/{id}")
Call<Response<StreamSink>> get(@Path("id") Integer sinkId);

@GET("sink/list")
Call<Response<PageResult<StreamSink>>> listSinks(@Query("inlongGroupId") String groupId,
Call<Response<PageResult<StreamSink>>> list(@Query("inlongGroupId") String groupId,
@Query("inlongStreamId") String streamId, @Query("sinkType") String sinkType);

@GET("sink/get/{id}")
Call<Response<StreamSink>> getSinkInfo(@Path("id") Integer sinkId);

}
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,21 @@ List<SinkBriefInfo> selectSummary(@Param("groupId") String groupId,
/**
* Query valid sink list by the given group id and stream id.
*
* @param groupId Inlong group id.
* @param streamId Inlong stream id.
* @param sinkName Stream sink name.
* @return Sink entity list.
* @param groupId inlong group id
* @param streamId inlong stream id
* @return stream sink entity list
*/
List<StreamSinkEntity> selectByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId);

/**
* Query stream sink by the unique key.
*
* @param groupId inlong group id
* @param streamId inlong stream id
* @param sinkName stream sink name
* @return stream sink entity
*/
List<StreamSinkEntity> selectByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId,
StreamSinkEntity selectByUniqueKey(@Param("groupId") String groupId, @Param("streamId") String streamId,
@Param("sinkName") String sinkName);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,11 +267,17 @@
<if test="streamId != null and streamId != ''">
and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
</if>
<if test="sinkName != null and sinkName != ''">
and sink_name = #{sinkName, jdbcType=VARCHAR}
</if>
</where>
</select>
<select id="selectByUniqueKey" resultType="org.apache.inlong.manager.dao.entity.StreamSinkEntity">
select
<include refid="Base_Column_List"/>
from stream_sink
where is_deleted = 0
and inlong_group_id = #{groupId, jdbcType=VARCHAR}
and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
and sink_name = #{sinkName, jdbcType=VARCHAR}
</select>
<select id="selectByIdAndType" resultType="org.apache.inlong.manager.dao.entity.StreamSinkEntity">
select
<include refid="Base_Column_List"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public List<AuditVO> listByCondition(AuditRequest request) throws Exception {

// for now, we use the first sink type only.
// this is temporary behavior before multiple sinks in one stream is fully supported.
List<StreamSinkEntity> sinkEntityList = sinkEntityMapper.selectByRelatedId(groupId, streamId, null);
List<StreamSinkEntity> sinkEntityList = sinkEntityMapper.selectByRelatedId(groupId, streamId);
String sinkNodeType = null;
if (CollectionUtils.isNotEmpty(sinkEntityList)) {
sinkNodeType = sinkEntityList.get(0).getSinkType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;

import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;

Expand All @@ -36,37 +37,39 @@
public interface StreamSinkService {

/**
* Save the sink information.
* Save the sink info.
*
* @param request Sink request.
* @param operator Operator's name.
* @return Sink id after saving.
* @param request sink request need to save
* @param operator name of operator
* @return sink id after saving
*/
Integer save(SinkRequest request, String operator);

/**
* Query sink information based on id.
* Get stream sink info based on id.
*
* @param id Sink id.
* @return Sink info.
* @param id sink id
* @return detail of stream sink info
*/
StreamSink get(Integer id);

/**
* Query sink information based on inlong group id and inlong stream id.
* List the stream sinks based on inlong group id and inlong stream id.
*
* @param groupId Inlong group id.
* @param streamId Inlong stream id, can be null.
* @return Sink info list.
* @param groupId inlong group id
* @param streamId inlong stream id, can be null
* @return sink info list
*/
List<StreamSink> listSink(String groupId, String streamId);
List<StreamSink> listSink(String groupId, @Nullable String streamId);

/**
* Query sink summary based on inlong group id and inlong stream id, including sink cluster.
* Query sink brief info based on inlong group id and inlong stream id.
* <p/>
* The result will include sink cluster info.
*
* @param groupId Inlong group id.
* @param streamId Inlong stream id.
* @return Sink info list.
* @param groupId inlong group id
* @param streamId inlong stream id
* @return stream sink brief info list
*/
List<SinkBriefInfo> listBrief(String groupId, String streamId);

Expand All @@ -80,112 +83,113 @@ public interface StreamSinkService {
Map<String, List<StreamSink>> getSinksMap(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos);

/**
* Query the number of undeleted sink info based on inlong group and inlong stream id
* Query the number of undeleted sink info based on inlong group and inlong stream id.
*
* @param groupId Inlong group id.
* @param streamId Inlong stream id.
* @return Number of sink info.
* @param groupId inlong group id
* @param streamId inlong stream id
* @return count of sink info
*/
Integer getCount(String groupId, String streamId);

/**
* Paging query sink information based on conditions.
* Paging query stream sink info based on conditions.
*
* @param request paging request.
* @return sink list
* @param request paging request
* @return sink page list
*/
PageResult<? extends StreamSink> listByCondition(SinkPageRequest request);

/**
* Modify data sink information.
* Modify stream sink info by id.
*
* @param sinkRequest Information that needs to be modified.
* @param operator Operator's name.
* @return Whether succeed.
* @param sinkRequest stream sink request that needs to be modified
* @param operator name of operator
* @return whether succeed
*/
Boolean update(SinkRequest sinkRequest, String operator);

/**
* Modify data sink information by key.
* Modify stream sink info by key.
*
* @param sinkRequest Information that needs to be modified.
* @param operator Operator's name.
* @return Update result.
* @param sinkRequest stream sink request that needs to be modified
* @param operator name of operator
* @return update result
*/
UpdateResult updateByKey(SinkRequest sinkRequest, String operator);

/**
* Modify sink data status.
* Modify stream sink status.
*
* @param id Sink id.
* @param status Target status.
* @param log Modify the log.
* @param id stream sink id
* @param status target status
* @param log log info of this modification
*/
void updateStatus(int id, int status, String log);
void updateStatus(Integer id, int status, String log);

/**
* Delete the stream sink by the given id and sink type.
*
* @param id The primary key of the sink.
* @param operator Operator's name.
* @return Whether succeed
* @param id stream sink id
* @param operator name of operator
* @return whether succeed
*/
Boolean delete(Integer id, String operator);

/**
* Delete the stream sink by given group id, stream id, and sink name.
* @param groupId The group id of sink
* @param streamId The stream id of sink
* @param name The name of sink
* @return Whether succeed
*
* @param groupId inlong group id
* @param streamId inlong stream id
* @param name stream sink name
* @return whether succeed
*/
Boolean deleteByKey(String groupId, String streamId, String name, String operator);

/**
* Logically delete stream sink with the given conditions.
*
* @param groupId InLong group id to which the data source belongs.
* @param streamId InLong stream id to which the data source belongs.
* @param operator Operator's name.
* @return Whether succeed.
* @param groupId inlong group id
* @param streamId inlong stream id
* @param operator name of operator
* @return whether succeed
*/
Boolean logicDeleteAll(String groupId, String streamId, String operator);

/**
* Physically delete stream sink with the given conditions.
*
* @param groupId InLong group id.
* @param streamId InLong stream id.
* @param operator Operator's name.
* @return Whether succeed.
* @param groupId inlong group id
* @param streamId inlong stream id
* @param operator name of operator
* @return whether succeed
*/
Boolean deleteAll(String groupId, String streamId, String operator);

/**
* According to the existing inlong stream ID list, filter out the inlong stream id list
* containing the specified sink type.
*
* @param groupId Inlong group id.
* @param sinkType Sink type.
* @param streamIdList Inlong stream id list.
* @return List of filtered inlong stream ids.
* @param groupId inlong group id
* @param sinkType stream sink type
* @param streamIdList inlong stream id list
* @return list of filtered inlong stream ids
*/
List<String> getExistsStreamIdList(String groupId, String sinkType, List<String> streamIdList);

/**
* According to the inlong stream id, query the list of sink types owned by it
*
* @param groupId Inlong group id
* @param streamId Inlong stream id
* @return List of sink types
* @param groupId inlong group id
* @param streamId inlong stream id
* @return list of sink types
*/
List<String> getSinkTypeList(String groupId, String streamId);

/**
* Save the information modified when the approval is passed
*
* @param sinkApproveList Stream sink approval information
* @param operator Operator's name
* @param sinkApproveList stream sink approval information
* @param operator name of operator
* @return whether succeed
*/
Boolean updateAfterApprove(List<SinkApproveDTO> sinkApproveList, String operator);
Expand Down
Loading

0 comments on commit c9c120f

Please sign in to comment.