Skip to content

Commit

Permalink
Merge branch '3.2.x'
Browse files Browse the repository at this point in the history
# Conflicts:
#	gradle.properties
#	gradle/version.gradle
  • Loading branch information
li-xunhuan committed Jul 17, 2024
2 parents cc3879a + b27d2e9 commit ab95c48
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 11 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/publish-snapshot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ name: Publish Snapshot

on:
push:
branches: [ master ]
branches: [ master, 2.7.x ]
pull_request:
branches: [ master ]
branches: [ master, 2.7.x ]
jobs:
publish:
runs-on: ubuntu-latest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,12 @@ public List<TargetGroup> getList() {
List<ServiceInstance> instanceList = discoveryClient.getInstances(serviceId);
List<String> targets = new ArrayList<>();
for (ServiceInstance instance : instanceList) {
targets.add(String.format("%s:%d", instance.getHost(), instance.getPort()));
String instanceHost = instance.getHost();
int managementPort = Optional.ofNullable(instance.getMetadata())
.map(x -> x.get("management.port"))
.map(Integer::parseInt)
.orElseGet(instance::getPort);
targets.add(instanceHost + ':' + managementPort);
}
Map<String, String> labels = new HashMap<>(4);
// 1. 环境
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* prometheus http sd
Expand All @@ -49,9 +51,14 @@ public class ReactivePrometheusApi {
public Flux<TargetGroup> getList() {
return discoveryClient.getServices()
.flatMap(discoveryClient::getInstances)
.groupBy(ServiceInstance::getServiceId, instance ->
String.format("%s:%d", instance.getHost(), instance.getPort())
).flatMap(instanceGrouped -> {
.groupBy(ServiceInstance::getServiceId, instance -> {
String instanceHost = instance.getHost();
int managementPort = Optional.ofNullable(instance.getMetadata())
.map(x -> x.get("management.port"))
.map(Integer::parseInt)
.orElseGet(instance::getPort);
return instanceHost + ':' + managementPort;
}).flatMap(instanceGrouped -> {
Map<String, String> labels = new HashMap<>(4);
// 1. 环境
if (StringUtils.hasText(activeProfile)) {
Expand All @@ -60,7 +67,7 @@ public Flux<TargetGroup> getList() {
// 2. 服务名
String serviceId = instanceGrouped.key();
labels.put("__meta_prometheus_job", serviceId);
return instanceGrouped.collectList().map(targets -> new TargetGroup(targets, labels));
return instanceGrouped.collect(Collectors.toList()).map(targets -> new TargetGroup(targets, labels));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import net.dreamlu.mica.core.utils.JsonUtil;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.BitFieldSubCommands;
import org.springframework.data.redis.connection.RedisServerCommands;
import org.springframework.data.redis.core.*;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.lang.Nullable;
Expand Down Expand Up @@ -755,8 +756,8 @@ public Long getHCounter(String key, Object field) {
/**
* 获取记数器的值,用于初始化获取 hIncr、hIncrBy 的值
*
* @param key key
* @param loader 加载器
* @param key key
* @param loader 加载器
*/
@Nullable
public Long getHCounter(String key, Object field, LongSupplier loader) {
Expand Down Expand Up @@ -1572,6 +1573,16 @@ public Boolean setBit(String key, long offset, boolean value) {
return redisTemplate.execute((RedisCallback<Boolean>) redis -> redis.setBit(keySerialize(key), offset, value));
}

/**
* 获取 redis 时间
*
* @return redis 时间
*/
@Nullable
public Long getTime() {
return redisTemplate.execute((RedisCallback<Long>) RedisServerCommands::time);
}

/**
* redisKey 序列化
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Map;
import java.util.function.Predicate;

/**
* Redisson 监听器
Expand All @@ -43,6 +44,8 @@
*/
@Slf4j
public class RStreamListenerDetector implements BeanPostProcessor, InitializingBean {
// redis 重连等会触发异常,异常时不取消订阅
private static final Predicate<Throwable> CANCEL_SUBSCRIPTION_ON_ERROR = t -> false;
private final StreamMessageListenerContainer<String, MapRecord<String, String, byte[]>> streamMessageListenerContainer;
private final RedisTemplate<String, Object> redisTemplate;
private final String consumerGroup;
Expand Down Expand Up @@ -90,15 +93,26 @@ public Object postProcessAfterInitialization(Object bean, String beanName) throw
}

private void broadCast(StreamOffset<String> streamOffset, Object bean, Method method, boolean isReadRawBytes) {
streamMessageListenerContainer.receive(streamOffset, (message) -> {
StreamMessageListenerContainer.StreamReadRequest<String> streamReadRequest = StreamMessageListenerContainer.StreamReadRequest
.builder(streamOffset)
// 重连会触发异常
.cancelOnError(CANCEL_SUBSCRIPTION_ON_ERROR)
.build();
streamMessageListenerContainer.register(streamReadRequest, (message) -> {
// MapBackedRecord
invokeMethod(bean, method, message, isReadRawBytes);
});
}

private void cluster(Consumer consumer, StreamOffset<String> streamOffset, RStreamListener listener, Object bean, Method method) {
boolean autoAcknowledge = listener.autoAcknowledge();
StreamMessageListenerContainer.ConsumerStreamReadRequest<String> readRequest = StreamMessageListenerContainer.StreamReadRequest.builder(streamOffset).consumer(consumer).autoAcknowledge(autoAcknowledge).build();
StreamMessageListenerContainer.ConsumerStreamReadRequest<String> readRequest = StreamMessageListenerContainer.StreamReadRequest
.builder(streamOffset)
.consumer(consumer)
.autoAcknowledge(autoAcknowledge)
// 重连会触发异常
.cancelOnError(CANCEL_SUBSCRIPTION_ON_ERROR)
.build();
StreamOperations<String, Object, Object> opsForStream = redisTemplate.opsForStream();
streamMessageListenerContainer.register(readRequest, (message) -> {
// MapBackedRecord
Expand Down

0 comments on commit ab95c48

Please sign in to comment.