[应用]轻探Web Flux

楼主: kentyeh (kent)   2022-04-21 22:46:53
网页版在
https://kentyeh.blogspot.com/2022/04/webflux.html
===================================
本来标题是想写轻探春日流转的,想想还是算了。
程式码 https://github.com/kentyeh/FluxWeb,您可以先git clone下来备用。
Spring 5 后来始导入non-blocking IO、reactive backpressure的Web开发方式;仅管
Spring官方称WebFlux不会比Servlet快到哪去,但实际面临到需要I/O的情况下,理论上
总是会快一点,像用reactor netty抓网页的方式,我感觉就是比Apache HttpClient来的
快些。
转到WebFlux首先要面临的就是Servlet不再,没了JSP,也没了JSTL,一开始真的很难习
惯,忽然发现一堆Listener没得用,也没办法 Wrap Servlet Request,但为了或许能快
那么一点点,总是得付出些代价。
在学习的过程式,觉得困难点大概有三,分别是Web转换、Secuity应用与WebSocket管控
,我想就这几点来说明如果克服(至于如何写Reactive,不想在这里多说,
http://projectreactor.io 可以了解一下,网络也有一堆教学文件)。
首先要面临的是Web撰写方式的转换:
Spring boot 提供了一堆 spring-boot-starter-xxxx,可以很方便的开始一个专案,优
点是快速,缺点是引用了一堆可能用不到的Libraries,我并不打算以此为进入点。
WebFlux在少了Container的情况下,注定以应用程式的方式存在,而应用程式的方式就是
采用ApplicationContext去加载一些程式设定
package wf;
public class Main {
public static void main(String[] args) throws IOException {
try (AbstractApplicationContext context = new
AnnotationConfigApplicationContext(wf.config.AppConfig.class)) {
context.registerShutdownHook();
context.getBean(DisposableServer.class).onDispose().block();
}
....
所以AppConfig.java就是设定的进入点,上述程式加载设定后,随即就是启动HttpServer

package wf.config;
@Configuration
@ImportResource("classpath:applicationContext.xml")
@Import({WebConfig.class, PostgresR2dbConfig.class, H2R2dbConfig.class,
SecConfig.class, WsConfig.class})
public class AppConfig {
...
@Configuration不用多说,写过Spring程式的人都应该知道 。
至于@ImportResource,嗯!我是念旧的人,习惯把设定放在XML内(从Ver 3开始养成的)
,applicationContext.xml包含了Component Scan 与 thymeleaf(取代JSP)的一些设定。
AppConfig.java依序加载了Web设定、数据库设定、安全性设定与WebSocket设定。
WebConfig.java包含了WebFlux运作的基础设定:
前面说了,没了Servlet,WebFlux就必须找一些替代品,首先面临的就是Session的问题
,Spring Session提供了多种选择,我想为了效能,您应该不会选用jDBC的选项,以前我
用过Hazelcast,好处是去中心化(不用多备一台主机,直接把函式库绑入程式内),只要
还有一台Web存活(指的是Cluster架构),资料就不会丢失,但缺点也是去中心化,想要操
纵资料,除了自已写管理程式加入其中,不然就得花钱钱找官方,所以这次采用了大多数
人会用的Redis,好处是有Cli界面可用,缺点是要多备一台机器,一旦机器挂点,程式就
全挂了。
package wf.config;
@Configuration
@Import(RedisCacheConfig.class)
@EnableWebFlux
@EnableRedisWebSession(maxInactiveIntervalInSeconds = 30 * 60)
public class WebConfig implements WebFluxConfigurer, DisposableBean,
ApplicationListener<ContextClosedEvent> {
...
设定档必须继承WebFluxConfigurer并标注@EnableWebFlux是基本要件,
@EnableRedisWebSession则是说明以Redis做为Session资料戴体,理所当然,Redis可以
储存Session资料,当然也可做为Cache所用,所以在此我
Import(RedisCacheConfig.class),并将连线Redis的程式放在RedisCacheConfig内。
WebConfig.java的重要责任就是建构httpServer()(也是Main.java程式启动时主要加载的
标的),为了要在程式结束时优雅的结束httpServer,所以WebConfig也实做了
DisposableBean与ApplicationListener<ContextClosedEvent>,为了就在是程式终止时
顺便关闭httpServer;
另外addResourceHandlers(...)是为了加载静态目录,当url指到了static时,首先从
Webjars先找(像我引用了JQuery与purecss的jar),找不到再找classpath里的static目录

@Override
public void addResourceHandlers(ResourceHandlerRegistry registry) {
registry.addResourceHandler("/static/**")
.addResourceLocations("classpath:/META-INF/resources/webjars/")
.addResourceLocations("classpath:/static/")
.resourceChain(false);
}
而localeContextResolver()则是指定使用url参数locale来变更语系(没有多国语系,就
不用建构LocaleContextResolver)。
@Bean
public LocaleContextResolver localeContextResolver() {
return new LocaleContextResolver() {
...
至此再加写一支带有@Controller标记的程式(如wf.spring.RootController),就可以执
行Main.java让WebFlux跑起来了。
在进入到下个主题之前,我必须提及spring.profiles.active这个系统属性,Spring用这
个属性来控制Profile,所以我决定当这个系统属性为dev时,表示整个系统属于开发模式
,否则就是正式环境模式,所以您可能注意到AppConfig.java同时加载了
PostgresR2dbConfig.class(正式环境)与H2R2dbConfig.class(开发环境)。
package wf.config;
@Configuration
@Profile("dev")
public class H2R2dbConfig extends R2dbConfig implements
ApplicationListener<ContextClosedEvent> {
...
为此我在POM.xml设定的对应的两个Profile,以便在开发模式下,可以引用不同的函式库
并执行一些初始作业(如建构数据库与启动一个Redis Mock Server)。
<profiles>
<profile>
<id>dev</id>
<properties>
<spring.profiles.active>dev</spring.profiles.active>
<http.port>8080</http.port>
<spring.freemarker.checkTemplateLocation>
false
</spring.freemarker.checkTemplateLocation>
</properties>
<dependencies>
...
</dependencies>
</profile>
<profile>
<id>prod</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<spring.profiles.active>prod</spring.profiles.active>
<http.port>80</http.port>
</properties>
在开发时期,我只要执行
mvn -Pdev compile exec:java
就可以以测试环境的方式来执行程式。
当然,除了@Profile外,还有其它选择,在RedisCacheConfig.java里面有通往Redis
Server的连线设定
package wf.config;
Configuration
@EnableCaching(mode = AdviceMode.PROXY)
public class RedisCacheConfig extends CachingConfigurerSupport {
@Bean("prod")
@Conditional(LettuceConnFactoryCondition.class)
public LettuceConnectionFactory
redisProdConnectionFactory(GenericObjectPoolConfig gopc) {
logger.info("采用正式环境:连到正式Redis主机");
RedisSocketConfiguration config = new
RedisSocketConfiguration("/tmp/redis.sock");
...
LettucePoolingClientConfiguration poolConfig =
LettucePoolingClientConfiguration.builder()
.poolConfig(gopc).build();
return new LettuceConnectionFactory(config, poolConfig);
}
@Bean("dev")
@Conditional(LettuceConnFactoryCondition.class)
public LettuceConnectionFactory
redisDevConnectionFactory(GenericObjectPoolConfig gopc) {
logger.info("采用测试环境:连到测试Redis Mock");
RedisStandaloneConfiguration config = new
RedisStandaloneConfiguration("localhost",
context.getBean(RedisServer.class).getBindPort());
LettucePoolingClientConfiguration poolConfig =
LettucePoolingClientConfiguration.builder()
.poolConfig(gopc).build();
return new LettuceConnectionFactory(config, poolConfig);
}
然后透过LettuceConnFactoryCondition.java来决定哪个Bean应该被建立
package wf.util;
public class LettuceConnFactoryCondition implements Condition {
@Override
public boolean matches(ConditionContext context,
AnnotatedTypeMetadata metadata) {
String profile =
context.getEnvironment()
.getProperty("spring.profiles.active", "prod");
Map<String, Object> attributes =
metadata.getAnnotationAttributes(
Bean.class.getName());
String beanName = attributes == null ? ""
: ((String[]) attributes.get("value"))[0];
return beanName.startsWith(profile);
}
}
只要BeanName与开头与系统属性spring.profiles.active(默认为"prod")一致时则建立,
所以得以依这个属性来决定要如何连结到Redis Server。
另外我想要提一下的是Ractive JDBC,目前有R2DBC(https://r2dbc.io)可用,当然
Spring也有对应的专案(https://spring.io/projects/spring-data-r2dbc),现下只支援
几个主流的数据库,而且大都不是官方开发的,最大的困扰还是在于没有JNDI可用,让我
没法利用像Atomikos这种工具来作Two Phase Commit,失去了跨数据库的机会,当然也可
换种想法,要快就不要跨数据库Commit。
为了介绍后续的功能,我必须先说明一下我数据库的Schema:只有包含两个Table,一个
是成员,另一个是成员的角色。
CREATE TABLE IF NOT EXISTS member(
account varchar(10) primary key,
username varchar(16) not null,
passwd varchar(20) not null,
enabled varchar(1) default 'Y' check(enabled='Y' or enabled='N'),
birthday date default CURRENT_DATE
);
CREATE TABLE IF NOT EXISTS authorities(
aid SERIAL primary key,
account varchar(10) references member(account) on update cascade on delete
cascade,
authority varchar(50) not null
);
create unique index IF NOT EXISTS authorities_idx on
authorities(account,authority);
主要对应的类别是wf.model.Member,特别需要关注的是isNew()这个Method,Spring
Data主要透过这个方法来决定资料是要Insert还是Update;其它相关的物件有
wf.data.MemberDao(负责数据库对应的查询或更新)与wf.data.MemberManager(负责交易
的控制)。
说到这,我不得不提Spring的DataBinder,
@ControllerAdvice
public class ControlBinder {
/*private MemberManager manager;
@Autowired
public void setManager(MemberManager manager) {
this.manager = manager;
}*/
@InitBinder
public void initBinder(WebDataBinder binder) {
binder.registerCustomEditor(Date.class, new DatePropertyEditor());
binder.registerCustomEditor(LocalDate.class,
new LocalDatePropertyEditor());
binder.registerCustomEditor(Boolean.class,
new BooleanPropertyEditor());
//binder.registerCustomEditor(Member.Mono.class, manager);
}
}
上面注册了一些物件,来做为资料在String与Object之间的转换,可以看到这些类别都实
做了java.beans.PropertyEditor,(MemberManager也不例外,没有注册的原因是因为我
实做并在WebConfig.java注册了另一个物件MemberFormatter),这种转换有什么用呢?且
看下面例子:
package wf.spring;
@Controller
public class RootController {
@GetMapping("/hello/{member}")
public String hello(@PathVariable("member") Member.Mono member,
Model model) {
model.addAttribute("user", member.get().switchIfEmpty(Mono.empty()));
return "hello";
}
}
只要在网址列打上Member的帐号,在叫用Method前,会先将PathVariable透过转换器转换
成对应的物件。
另一个常用的功能则是用@ModelAttribute来蒐集前端输入
https://i.imgur.com/7eFAzHr.png
@Controller
public class RootController {
@PreAuthorize("hasRole('ADMIN')")
@PostMapping("/modifyMember/{member}")
public Mono<String> modifyMember(
@PathVariable("member") Member.Mono oriMember,
@ModelAttribute Member member, Model model) {
//避免后面有值但第一个没勾,导致null字串
Iterables.removeIf(member.getRoles(), Predicates.isNull());
model.addAttribute("member", memberManager
.saveMember(member.setNew(false)));
return Mono.just("member");
}
}
前端输入的资料,毌论是Master主体资料与Detail角色资料一并被蒐集并转成member物件
(生日也因为注册过LocalDatePropertyEditor也同样被转成LocalDate)。
在进入Security之前要提一下projectreactor.io的reactor.util.Logger,常常在Mono或
Flux加入log()方法来记录除错过程,其实它是叫用log(Logger),可惜这个Logger,其底
层是采用SLF4J所实作的异步Log,但您可以注意到,我采用的是Apache Log4j2,虽然
Log4j2,有asyncLogger,但若Mono或Flux没有对应的Logger可用,有点遗憾,所以我实
做了一个Loggers4j2,可以替代原本的Logger来对Mono或Flux除错。Log4j2的
asyncLogger本质上是不希望记录Log发生在何处,因为找出记录发生在何处会使得效能大
大降低,所以禀持相同理念,您应该在Log时,让讯息本身彰显足以判断出处,当然在开
发模式下,我还是会找出讯息记录的发生处。
Spring Security的设定如下,可惜没有办法用XML进行设定
package wf.config;
@EnableWebFluxSecurity
@EnableReactiveMethodSecurity
public class SecConfig {
...
@EnableWebFluxSecurity是说明采用Spring Security,@EnableReactiveMethodSecurity
则说明会采用Method级别的安全设定,
public class SecConfig {
@Bean
public SecurityWebFilterChain securitygWebFilterChain(
ServerHttpSecurity http) {
SecurityWebFilterChain build = http
.authorizeExchange()
.pathMatchers("/", "/index", "/hello/**", "/static/**",
"/login", "/logout").permitAll()
...
.formLogin((ServerHttpSecurity.FormLoginSpec flt) -> {
flt.authenticationManager(new
CaptchaUserDetailsReactiveAuthenticationManager(
userDetailsService()));
flt.loginPage("/login");
flt.authenticationFailureHandler(new
RedirectFluxWebAuthenticationFailureHandler(
"/login?error"));
})
...
.and().build();
build.getWebFilters().subscribe(filter -> {
if (filter instanceof AuthenticationWebFilter) {
AuthenticationWebFilter awf =
(AuthenticationWebFilter) filter;
awf.setServerAuthenticationConverter(new
CustomServerFormLoginAuthenticationConverter());
}
});
return build;
}
SecurityWebFilterChain(http)是主要的设定主体,可以看出我想自订登录画面(主要是
加入Captcha),因为Spring Security使用UsernamePasswordAuthenticationToken来存放
用户的帐号/密码,所以以wf.security.UsernamePasswordCaptchaAuthenticationToken
来对应存放资讯,也因为多了Captcha,所以必须自行进行授权检查,所以在formLogin里
指定了自订的wf.model.CaptchaUserDetailsReactiveAuthenticationManager,但问题来
了,Webflux把蒐集token的过程隐藏起来以致于没办法让包含Captcha的token被转送给
AuthenticationManager来处理,所以我也只能用过滤Filters的方式,把
wf.security.CustomServerFormLoginAuthenticationConverter替换给
AuthenticationWebFilter。
https://i.imgur.com/ASFzbtc.png
这里要备注一点小提醒:Capatch验证一旦取用,必须立即清除,否则机器人只要取用一
次,就可以无限次try帐/密就失去了Captch的意义了。
理论上加进了Security,那么我们就能在Request Method里面加上
@AuthenticationPrincipal来取的User Principal
public class RootController {
@GetMapping("/whoami")
public String whomai(@AuthenticationPrincipal
Mono<UserDetails> principal, Model model) {
model.addAttribute("user", principal);
return "index";
}
所以写下了测试程式:
package wf.spring;
@WebFluxTest(controllers = RootController.class,
excludeAutoConfiguration = {ReactiveSecurityAutoConfiguration.class})
@TestExecutionListeners({ReactorContextTestExecutionListener.class
,WithSecurityContextTestExecutionListener.class})
@ContextConfiguration(classes = {TestContext.class})
public class TestRootController extends AbstractTestNGSpringContextTests {
@Test
@WithMockUser(username = "nobody", password = "nobody",
authorities = "ROLE_USER")
void testWhoAmi() {
Member member = new Member("nobody", "呒人识君");
member.setPasswd("nobody");
member.addRole("ROLE_USER");
webClient
.mutateWith(mockUser(new MemberDetails(member)))
//.mutateWith(mockUser("呒人识君").roles("USER"))
.get().uri("/whoami").header(HttpHeaders.CONTENT_TYPE,
MediaType.TEXT_HTML_VALUE)
.exchange().expectBody().consumeWith(response
-> Assertions.assertThat(new
String(response.getResponseBody()
, StandardCharsets.UTF_8)).contains("呒人识君"));
}
发现@WithMockUser完全没用,我猜是不是WithMockUserSecurityContextFactory里的
createEmptyContext()的关系,所以不得不改用上述程式码里的
mutateWith(mockUser(…))方式来mock User。
这里第一只测试程式TestRootController必须先说明一下:
没错,TestCase是继承AbstractTestNGSpringContextTests,为什么是TestNG?那是因为
我第一次写单元测试,JUnit没有多执行绪测试,所以只能改用TestNG,另外的一个原因
则是TestNG产生的报表比较美观。也不知道是不是因为TestNG的关系,导致一些行为不如
我的预期。
Spring的测试一般都会排除Security设定,让测试的行为尽量单纯,所以上述测试,
WebFluxTest首先要排除Reactive Security的自动设定,而@TestExecutionListeners用
来处理事前准备作业(其中的WithSecurityContextTestExecutionListener可以从所有测
试程式中移除,虽然我照着官方说明(https://bit.ly/3xIRhK5)来作,但完全看不出有
什么用)。
然后实测结果,Principal还无无法传播到 whoami(),所以我猜应该是某某不知名的原因
,导致测试环境没有建立HandlerMethodArgumentResolver,所以我从boot抄来
wf.util.ConditionalOnMissingBean与wf.util.MissingBeanCondition,并在
package wf.config;
public class TestContext implements WebTestClientConfigurer {
@Bean("testAuthenticationPrincipalResolver")
@ConditionalOnMissingBean(AuthenticationPrincipalArgumentResolver.class)
public HandlerMethodArgumentResolver
authenticationPrincipalArgumentResolver(BeanFactory beanFactory) {
return new TestAuthenticationPrincipalResolver(beanFactory);
}
当环境缺少AuthenticationPrincipalArgumentResolver时,自动建立一个
wf.config.TestAuthenticationPrincipalResolver(也是抄来的),自此测试才算圆满成
功。
这里也要特别提醒,每一支AbstractTestNGSpringContextTests都运行在一个独立的
Context中(多只Tests运行时,会看到Spring Boot的LOGO跑出来多次)。
相信很多人对WebSocket的第一印象就是那个著名的Chat聊天程式,Client端发起一个
WebSocket连线到Server,Server则记着所有连线,只要接收到Client End传来的讯息,
立即把该讯息逐一传送给其它所有连线。
其实细究Client到Server建立连线有一个过程,一开始Client是透过URL连线到Server,
完成HandShake后才建立一个双向连线(双方都可发讯息给另一方),直到有一方中断连线

所以Securiy的应用,第一步就是开始的那个URL连线,SpringSecurity管控URL是天经地
义;当WebSocket连线建立后,即使用户登出,只要双方没有一方切断连线,其实这个
WebSocket并不会受到影响,毕竟两者处在不同世界,所以Server必须记着这个
WebSocket 连线,当用户登出后,立即由Server端切断WebSeocket连线。
记得前面说过,Spring Session可以用来建造Web Cluster吗?因为Session是存在独立的
Redis Server,所以Client端连线进来,并不在意Cookie会被送到丛集中的哪一台。她们
是等价的;但是WebSocket连线是一个持续的连线,一旦建立,Client便会和最后
HandShake的这台WebServer建立一条持久稳固的连线。也就是说:同一浏览器可能开启多
个视窗连线到N台WebServer以建立WebSocket。
假设一用户(Nobody)用两个装置的浏览器,先后登录到WebServer并各自开启两个页面(假
设是聊天室,可能连到不同的两台WebServer)并建立WebSocket。所以我们先确立几件事
※四个WebSocket连线共登录了两次,所以有两个不同的Session Id
※任何给Nobody的讯息,都应该送达这4个页面
※其中一个浏览器进行登出,只会影响同浏览器的页面,另一个装置的两个
WebSocket连线仍然持续运作
首先设定以/ws做为进入点,这个进入点在之前的安全设定必须为登录过用户使用,URL会
取得静态网页chat.html,同时这也是WebSocket HandShake的point.
public class RootController {
@GetMapping("/ws")
public String chat() {
return "chat";
}
然后在WsConfig.java指定WebSocket HandShake所在
package wf.config;
@Configuration
@Import(RedisCacheConfig.class)
public class WsConfig {
@Bean
public HandlerMapping handlerMapping(WebSocketHandler webSocketHandler) {
String path = "/chat";
Map<String, WebSocketHandler> map = new HashMap<>();
map.put(path, webSocketHandler);
return new SimpleUrlHandlerMapping(map, -1);
}
@Bean
public HandlerAdapter wsHandlerAdapter() {
return new WebSocketHandlerAdapter(webSocketService());
}
@Bean
public WebSocketService webSocketService() {
return new wf.spring.HandshakeFluxWebSocketService();
}
介入HandShake过程(靠自订HandshakeFluxWebSocketService)
WebSocket只能在HandShake时取得Session相关资讯,这也是为什么需要介入HandShake
Service的原因,我们在HandShake的同时,将额外的资料设定给WebSocket
package wf.spring;
public class HandshakeFluxWebSocketService extends HandshakeWebSocketService
implements InitializingBean {
@Override
public void afterPropertiesSet() throws Exception {
setSessionAttributePredicate(s -> {
logger.info("转递 ({}) 给 WebSoketHandler", s);
return true;
});
}
@Override
public Mono<Void> handleRequest(ServerWebExchange exchange,
WebSocketHandler handler) {
exchange.getSession().subscribe(ws -> {
SecurityContext sc = ws.getAttribute(
HttpSessionSecurityContextRepository.SPRING_SECURITY_CONTEXT_KEY);
if (sc != null) {
logger.info("HandshakeFluxWebSocketService-principal is [{}]{}"
, sc.getAuthentication().getPrincipal().getClass().getName()
, sc.getAuthentication().getPrincipal());}
ws.getAttributes().put("JSESSIONID", ws.getId());
});
return super.handleRequest(exchange, handler);
}
基本上,HandleShake会想要把Session里面所有的东西设定给WebSocket,但会先问一下
,基本上我是一律放行,所以setSessionAttributePredicate()都是回传true;
之前也说过,浏览器登出时,要把相关的WebSocket全数关闭,所以我需要知道交互的对
象的Session ID,基本上Spring Session,所以在handleRequest(), 我取出后,直接放
到WebSocket的Attributes(Key值是JSESSIONID),您可以从执行的Log中看出端倪。
每个WebSocket连线后,SocketHandler都要建立一个WebSocketRedisListener物件(本身
会记住是属于哪个JSESSIONID),物件的责任内容如下(this的部份不是很正确,勿怪):
https://i.imgur.com/uyF09YL.png
wf.spring.FluxWebSocketHandler里面有一个全域物件存放所有的
WebSocketRedisListener,
public static final ListMultimap<String,websocketredislistener>
userListenser = MultimapBuilder...
WebSocketHandler一开始就是要建立一个WebSocketRedisListener,即使本身收到
Client End传来的讯息,也要交由这个WebSocketRedisListener去广播给所有WebSocket
连线。
@Component("serverLogoutSuccessHandler")
public class FluxWebSocketHandler implements WebSocketHandler,
ServerLogoutSuccessHandler {
public Mono<Void> handle(WebSocketSession session) {
Object jsessionId = session.getAttributes().get("JSESSIONID");
Object sectximp = session.getAttributes().get(
HttpSessionSecurityContextRepository.SPRING_SECURITY_CONTEXT_KEY);
if (sectximp != null && jsessionId != null &&
!jsessionId.toString().trim().isEmpty()) {
SecurityContext sectx = (SecurityContextImpl) sectximp;
Authentication auth = sectx.getAuthentication();
User user = auth == null ? null : ((User) auth.getPrincipal());
if (user != null) {
ReactiveRedisMessageListenerContainer container =
context.getBean(
ReactiveRedisMessageListenerContainer.class);
ReactiveRedisTemplate<String, JsonNode> redisTemplate =
new ReactiveRedisTemplate<>(connectionFactory,
serializationContext);
UnaryOperator<JsonNode> processor = notify -> notify;
WebSocketRedisListener<JsonNode> wsrl =
context.getBean(WebSocketRedisListener.class
,session, container, user.getUsername(), jsessionId
,serializationContext, redisTemplate, processor);
logger.info("put listener[{}] {}", jsessionId,
userListenser.put(jsessionId.toString().trim(), wsrl));
return session.receive().flatMap(webSocketMessage -> {
String payload = webSocketMessage.getPayloadAsText();
logger.debug("收到:{}", payload);
//convert payload to JsonNode
JsonNode notify = serializationContext
.getValueSerializationPair()
.read(ByteBuffer.wrap(payload.getBytes(
StandardCharsets.UTF_8)));
wsrl.getReactiveRedisTemplate().convertAndSend(
user.getUsername(), notify).subscribe();
return Mono.empty();
}).doOnTerminate(() -> {
if (userListenser.get(
jsessionId.toString()).remove(wsrl)) {
wsrl.destroy();
logger.info("移除监听器");
} else {
logger.error("移除监听器失败");
}
}).doFinally(signal -> {
...
这个Handler也同时实做了ServerLogoutSuccessHandler,为的就是在用户登出时,从
userListener清除同JSESSIONID的WebSocketRedisListener。
@Override
public Mono<Void> onLogoutSuccess(WebFilterExchange exchange,
Authentication authentication) {
String username = authentication.getPrincipal() == null
? exchange.getExchange().getPrincipal()
.map(p -> p.getName()).block()
: User.class.isAssignableFrom(
authentication.getPrincipal().getClass())
? ((User) authentication.getPrincipal()).getUsername()
: Principal.class.isAssignableFrom(
authentication.getPrincipal().getClass())
? ((Principal) authentication.getPrincipal()).getName()
: null;
logger.info(":{}登出", username);
exchange.getExchange().getSession().subscribe(ws -> {
logger.info("JSESSIONID:{}", ws.getId());
userListenser.removeAll(ws.getId()).forEach(wrl -> wrl.destroy());
}, t -> logger.error("登出排除WS时错误:" + t.getMessage(), t)
);
ServerHttpResponse response = exchange.getExchange().getResponse();
response.setStatusCode(HttpStatus.FOUND);
response.getCookies().remove("JSESSIONID");
response.getHeaders().setLocation(logoutSuccessUrl);
return exchange.getExchange().getSession()
.flatMap(WebSession::invalidate);
}
至此,完成我对WebSocket的期待,我心目中的购物车,就是当商品被放入购物车的时后
,资料打包丢给JMS去逐一处理(我用这种方式应付过9.8K-Google Analytics显示人潮同
时开抢,可惜那时还不会应用WebSocket),后端处理完成后再透过WebSocket把购物车的
变动通知前端。
chat.html里面有个放入购物车的按纽,就是透过Ajax通知后端放入购物车,然后把讯息
Publish给Redis,Listener收到后再透过WebSocket通知商品已放入购物车。
https://i.imgur.com/B1h1XFW.png
当您同时用不同装置开启虾皮时,只要一个装置放入商品,其它装置的购物车也会同步更
新购物车就是我想要的效果。
再试试直接从Redis命令列直接发布讯息
https://i.imgur.com/ewxrg7H.png
最后要考虑的是如何测试?前述的加入购物车,是个多步骤的过程,首先,需要有Redis的
环境,然后登录,建立WebSocket连线,呼叫放入购物车,检查回传的讯息。近乎真实世
界的测试,此时已不能视为"单元"测试,而是应该视为"整合测试"。因为近乎真实,所以
会有CSRF、会有Captcha,为了测试的缘故,必须将CSRF与Catpch固定下来,所以Captcha
在有系统属性"captcha"时,就会以此值做为默认值。也因为为了固定CSRF的值,所以测
试不会引入原本的wf.config.SecConfig,而是引入改写过的TestSecConfig.java。
@EnableWebFluxSecurity
@EnableReactiveMethodSecurity
public class TestSecConfig extends SecConfig {
SecurityWebFilterChain build = http
.authorizeExchange()
.pathMatchers("/", "/index", "/hello/**", "/static/**",
"/login", "/logout").permitAll()
.pathMatchers("/admin/**").hasAuthority("ROLE_ADMIN")
.anyExchange().authenticated()
.and().csrf(c -> c.csrfTokenRepository(
new FixedCsrfTokenRepository(csrf)))
...
在测试WebSocket,我用了两种方式,分别是透过Reactor Netty的方式与HtmlUnit的方式
,ReactorNetty的方式比较低阶(此时我比较想用这个,毕竟是在写Reactive程式),必须
自行取Captcha图档,记住前次Cookies,发送FormData,HtmlUnit则比较高阶,整个就是
个Headless Browser,除了看不到画面,与一般的浏览器并无不同,但两者都有同样的问
题困扰我:就是无法真正掌握WebSocket建立连接的时刻(以前端的角度来看,就是无法掌
握WebSocket.onOpen的时机),逼得我没办法,只能以Thread.sleep(3000)应对,也许有
大神能够开示一下。
在我学习WebFlux的过程,总感觉官方文件不是写得很详细,所以才想要记录一下历程,
也希望对后来者有点帮助。

Links booklink

Contact Us: admin [ a t ] ucptt.com