Skip to content

Commit 185df6e

Browse files
🎨 添加redis微信消息重复检查相关类
1 parent a19112b commit 185df6e

File tree

3 files changed

+109
-0
lines changed

3 files changed

+109
-0
lines changed
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package me.chanjar.weixin.common.api;
2+
3+
import lombok.RequiredArgsConstructor;
4+
import org.redisson.api.RBucket;
5+
import org.redisson.api.RedissonClient;
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
9+
import java.util.concurrent.TimeUnit;
10+
11+
/**
12+
* 利用redis检查消息是否重复
13+
*
14+
*/
15+
@RequiredArgsConstructor
16+
public class WxMessageInRedisDuplicateChecker implements WxMessageDuplicateChecker {
17+
18+
/**
19+
* 过期时间
20+
*/
21+
private int expire = 10;
22+
23+
private final Logger log = LoggerFactory.getLogger(getClass());
24+
25+
private final RedissonClient redissonClient;
26+
27+
/**
28+
* messageId是否重复
29+
*
30+
* @param messageId messageId
31+
* @return 是否
32+
*/
33+
@Override
34+
public boolean isDuplicate(String messageId) {
35+
RBucket<String> r = redissonClient.getBucket("wx:message:duplicate:check:" + messageId);
36+
boolean setSuccess = r.trySet("1", expire, TimeUnit.SECONDS);
37+
return !setSuccess;
38+
}
39+
40+
public int getExpire() {
41+
return expire;
42+
}
43+
44+
public void setExpire(int expire) {
45+
this.expire = expire;
46+
}
47+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package me.chanjar.weixin.common.api;
2+
3+
import org.redisson.Redisson;
4+
import org.redisson.api.RedissonClient;
5+
import org.redisson.config.Config;
6+
import org.redisson.config.TransportMode;
7+
import org.testng.annotations.BeforeTest;
8+
import org.testng.annotations.Test;
9+
10+
import java.util.concurrent.TimeUnit;
11+
12+
import static org.testng.Assert.assertFalse;
13+
import static org.testng.Assert.assertTrue;
14+
15+
@Test
16+
public class WxMessageInRedisDuplicateCheckerTest {
17+
18+
private RedissonClient redissonClient;
19+
20+
@BeforeTest
21+
public void init() {
22+
Config config = new Config();
23+
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
24+
config.setTransportMode(TransportMode.NIO);
25+
this.redissonClient = Redisson.create(config);
26+
checker = new WxMessageInRedisDuplicateChecker(redissonClient);
27+
checker.setExpire(2);
28+
}
29+
30+
private WxMessageInRedisDuplicateChecker checker;
31+
32+
public void test() throws InterruptedException {
33+
Long[] msgIds = new Long[]{1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L};
34+
35+
// 第一次检查
36+
for (Long msgId : msgIds) {
37+
boolean result = checker.isDuplicate(String.valueOf(msgId));
38+
assertFalse(result);
39+
}
40+
41+
// 过1秒再检查
42+
TimeUnit.SECONDS.sleep(1);
43+
for (Long msgId : msgIds) {
44+
boolean result = checker.isDuplicate(String.valueOf(msgId));
45+
assertTrue(result);
46+
}
47+
48+
// 过1.5秒再检查
49+
TimeUnit.MILLISECONDS.sleep(1500L);
50+
for (Long msgId : msgIds) {
51+
boolean result = checker.isDuplicate(String.valueOf(msgId));
52+
assertFalse(result);
53+
}
54+
55+
}
56+
57+
}

weixin-java-mp/src/main/java/me/chanjar/weixin/mp/api/WxMpMessageRouter.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,12 @@
1414
import me.chanjar.weixin.common.util.LogExceptionHandler;
1515
import me.chanjar.weixin.mp.bean.message.WxMpXmlMessage;
1616
import me.chanjar.weixin.mp.bean.message.WxMpXmlOutMessage;
17+
import me.chanjar.weixin.mp.util.WxMpConfigStorageHolder;
1718
import org.apache.commons.lang3.StringUtils;
1819
import org.slf4j.Logger;
1920
import org.slf4j.LoggerFactory;
2021

22+
import javax.xml.ws.Holder;
2123
import java.util.ArrayList;
2224
import java.util.HashMap;
2325
import java.util.List;
@@ -208,11 +210,14 @@ public WxMpXmlOutMessage route(final WxMpXmlMessage wxMessage, final Map<String,
208210

209211
WxMpXmlOutMessage res = null;
210212
final List<Future<?>> futures = new ArrayList<>();
213+
String appId = WxMpConfigStorageHolder.get();
211214
for (final WxMpMessageRouterRule rule : matchRules) {
212215
// 返回最后一个非异步的rule的执行结果
213216
if (rule.isAsync()) {
214217
futures.add(
215218
this.executorService.submit(() -> {
219+
//传入父线程的appId
220+
this.wxMpService.switchoverTo(appId);
216221
rule.service(wxMessage, context, mpService, WxMpMessageRouter.this.sessionManager, WxMpMessageRouter.this.exceptionHandler);
217222
})
218223
);

0 commit comments

Comments
 (0)