JSR 356, Java API for WebSocket(本篇文章记录注解实现websocket)
websocket支持两种不同的编程方式:
1.基于注解驱动,使用带注解的POJO,可以与WebSocket生命周期事件进行交互
2.基于接口驱动,可以实现Endpoint接口,并重写其中与生命周期事件交互的方法
官网参考资料:
ORACLE JAVA:https://www.oracle.com/technetwork/articles/java/jsr356-1937161.html
Java EE 7:https://docs.oracle.com/javaee/7/tutorial/websocket.htm
1.maven依赖
<!-- Spring Boot starter for WebSocket support. -->
<!-- NOTE(review): no <version> is given here; presumably it is managed by the Spring Boot parent/BOM - confirm. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2.ServerEndpointExporter配置
/**
 * Spring configuration that contributes a {@link ServerEndpointExporter} bean.
 *
 * <p>Original author's note: with an external Tomcat the container provides this itself.
 *
 * @author Administrator
 */
@Configuration
public class ServerEndpointExporterConfigure {

    /**
     * Creates the exporter bean.
     *
     * @return a freshly constructed {@link ServerEndpointExporter}
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        final ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
3.创建WebSocket请求的端点
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.http.HttpSession;
import javax.websocket.CloseReason;
import javax.websocket.EndpointConfig;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Component
@ServerEndpoint(value="/websocket/{sid}",configurator=HttpSessionConfigurator.class)
public class ServerEndpointConfigure {
//设定原子整型,用来记录当前在线连接数
private AtomicInteger onlineCount = new AtomicInteger(0);
//concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
// private static CopyOnWriteArraySet<ServerEndpointConfigure> webSocketSet = new CopyOnWriteArraySet<ServerEndpointConfigure>();
//若要实现服务端与指定客户端通信的话,可以使用Map来存放,其中Key可以为用户标识
// public static ConcurrentHashMap<Session,Object> webSocketMap = new ConcurrentHashMap<Session,Object>();
//绑定HttpSession与session
public static ConcurrentHashMap<String,Session> bizSession = new ConcurrentHashMap<String,Session>();
//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
/**
* <p>连接建立成功调用的方法</p>
* @throws IOException
**/
@OnOpen
public void onOpen(Session session,EndpointConfig config,@PathParam("sid") String sid) throws IOException {
this.session = session;
HttpSession httpSession = (HttpSession) config.getUserProperties().get(HttpSession.class.getName());
log.info("httpSessionId:{},websocketSessionId:{},sid:{}----",httpSession.getId(),session.getId(),sid);
bizSession.put(httpSession.getId(), session);//建立关联
addOnlineCount();//在线数加1
log.info("有新连接加入!当前在线人数为" + getOnlineCount());
try {
sendMessage("["+httpSession.getId()+"]连接成功",session);
//设定模拟线程
new Thread(new Heartbeat(session)).start();
} catch (IOException e) {
log.error("websocket IO异常");
}
}
/**
* <p>连接关闭调用的方法</p>
*/
@OnClose
public void onClose(Session closeSession,CloseReason reason) {
log.info(reason.toString());
for(String key:bizSession.keySet()){
Session session = bizSession.get(key);
if(session.equals(closeSession)){
bizSession.remove(key);
}
}
subOnlineCount(); //在线数减1
log.info("有一连接关闭!当前在线人数为" + getOnlineCount());
}
/**
* <p>收到客户端消息后调用的方法</p>
*
* @param message 客户端发送过来的消息*/
@OnMessage
public void onMessage(String message, Session mySession) {
log.info("来自客户端的消息:" + message);
try {
sendMessage(message, mySession);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
*
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("发生错误");
for(String key:bizSession.keySet()){
Session sessionc = bizSession.get(key);
if(session.equals(sessionc)){
bizSession.remove(key);
}
}
error.printStackTrace();
}
public void sendMessage(String message,Session session) throws IOException {
try {
if(session.isOpen()){//该session如果已被删除,则不执行发送请求,防止报错
session.getBasicRemote().sendText(message);
}
} catch (IOException e) {
e.printStackTrace();
}
}
public int getOnlineCount() {//获取当前值
return onlineCount.get();
}
public int addOnlineCount() {//加1
return onlineCount.getAndIncrement();
}
public int subOnlineCount() {//减1
return onlineCount.getAndDecrement();
}
}
这里configurator指定了端点配置器类,端点类可以通过它使用握手请求对象来访问初始HTTP请求的详细信息,从而获取HttpSession对象;不需要的话可以去掉
HttpSessionConfigurator 类
/**
 * Endpoint configurator that exposes the handshake's {@link HttpSession} to the endpoint
 * through the config's user properties, keyed by {@code HttpSession.class.getName()}.
 */
public class HttpSessionConfigurator extends Configurator {

    /**
     * Copies the HttpSession of the opening handshake into the user properties.
     *
     * @param sec      endpoint configuration whose user properties are updated
     * @param request  opening handshake request; its HttpSession may be absent
     * @param response opening handshake response (not used)
     */
    @Override
    public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
        String key = HttpSession.class.getName();
        Object httpSession = request.getHttpSession();
        if (httpSession instanceof HttpSession) {
            sec.getUserProperties().put(key, httpSession);
        } else {
            // No session on this handshake. The user-properties map may reject null values,
            // so nothing is stored, and any stale entry is removed so the endpoint cannot
            // pick up a session from an earlier handshake.
            sec.getUserProperties().remove(key);
        }
    }
}
以下监听器会让每个请求事先创建并携带HttpSession,避免握手时获取到的HttpSession为null而出现空指针的情况
/**
 * Request listener that calls {@code getSession()} on every incoming request, so that
 * each request carries an HttpSession.
 */
@Component
public class RequestListener implements ServletRequestListener {

    /**
     * Obtains the HttpSession of the request that is being initialised.
     *
     * @param sre event describing the request
     */
    @Override
    public void requestInitialized(ServletRequestEvent sre) {
        ((HttpServletRequest) sre.getServletRequest()).getSession();
    }

    /**
     * Delegates to the interface's default implementation.
     *
     * @param sre event describing the request
     */
    @Override
    public void requestDestroyed(ServletRequestEvent sre) {
        ServletRequestListener.super.requestDestroyed(sre);
    }
}
放入监听
/**
 * Application entry point; also registers {@link RequestListener} as a servlet listener.
 */
@SpringBootApplication
public class SpringbootWebsocketApplication {

    /** Listener instance injected by Spring and wrapped by the registration bean below. */
    @Autowired
    private RequestListener requestListener;

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments passed through to Spring
     */
    public static void main(String[] args) {
        SpringApplication.run(SpringbootWebsocketApplication.class, args);
    }

    /**
     * Wraps the injected listener in a registration bean.
     *
     * @return registration bean carrying {@link #requestListener}
     */
    @Bean
    public ServletListenerRegistrationBean<RequestListener> servletListenerRegistrationBean() {
        return new ServletListenerRegistrationBean<>(requestListener);
    }
}
Heartbeat类
/**
 * Runnable that pushes a numbered heartbeat text message to one WebSocket session every
 * three seconds for as long as the session reports itself open.
 */
public class Heartbeat implements Runnable {

    private static final Logger log = LoggerFactory.getLogger(Heartbeat.class);

    /** Pause between two heartbeat messages, in milliseconds. */
    private static final long INTERVAL_MILLIS = 1000L * 3;

    /** Session the heartbeat is sent to; {@code null} when built with the no-arg constructor. */
    private final Session session;

    /**
     * @param session session to send heartbeats to
     */
    public Heartbeat(Session session) {
        super();
        this.session = session;
    }

    /** Creates a heartbeat without a session; {@link #run()} then returns immediately. */
    public Heartbeat() {
        this(null);
    }

    /**
     * Sends heartbeats until the session is closed or the thread is interrupted.
     *
     * <p>A failed send is logged and retried after the normal interval, so a failing
     * session cannot turn this loop into a busy spin.
     */
    @Override
    public void run() {
        if (session == null) {
            log.warn("Heartbeat started without a session; nothing to do");
            return;
        }
        log.info("服务端开启发送心跳模式");
        int i = 0;
        while (session.isOpen()) {
            String uuid = String.format("%04d", i++) + ":the websoket heart is exist 3s";
            log.info(uuid);
            try {
                // Hold the session monitor while writing so that any other writer locking
                // on the same session cannot interleave with this frame.
                synchronized (session) {
                    session.getBasicRemote().sendText(uuid);
                }
            } catch (Exception e) {
                log.error("Failed to send heartbeat on websocket session " + session.getId(), e);
            }
            try {
                Thread.sleep(INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Restore the flag and stop: interruption is a request to terminate.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
3.Controller控制层
/**
 * MVC controller with a plain-text demo endpoint and the entry page of the WebSocket demo.
 */
@Controller
@Slf4j
public class DemoController {

    /**
     * Simple test endpoint.
     *
     * @return a fixed text written directly to the response body
     */
    @GetMapping("demo")
    @ResponseBody
    public String demo() {
        final String body = "this is a demo~";
        return body;
    }

    /**
     * Renders the {@code index} view and exposes the HTTP session id as model attribute
     * {@code sid}.
     *
     * @param model   view model that receives the {@code sid} attribute
     * @param request current request, used to obtain the session
     * @return the view name {@code index}
     */
    @RequestMapping("/")
    public String index(Model model,HttpServletRequest request) {
        final String sessionId = request.getSession().getId();
        model.addAttribute("sid", sessionId);
        log.info("初始化websocket页面~");
        return "index";
    }
}
4.html(实际为JSP页面)
<%@ page language="java" contentType="text/html; charset=utf-8" pageEncoding="utf-8"%>
<%
// Context path of this web application, taken from the current request.
String path = request.getContextPath();
// Absolute base URL of the current request: scheme://server:port + context path + "/".
// NOTE(review): basepath is not referenced in the visible part of this page - confirm whether it is still needed.
String basepath = request.getScheme()+"://"+request.getServerName()+":"+request.getServerPort()+path+"/";
%>
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<title>websocket页面</title>
</head>
<script type="text/javascript" src="https://cdn.bootcss.com/jquery/3.3.1/jquery.min.js"></script>
<script type="text/javascript">
// The single WebSocket connection of this page; stays null when the browser lacks WebSocket support.
var websocket = null;
socket(); // open the connection as soon as the script is evaluated

/**
 * Opens the WebSocket connection and installs its lifecycle callbacks.
 */
function socket() {
    // Stop here when WebSocket is unavailable; the handler assignments below would
    // otherwise dereference null.
    if (!('WebSocket' in window)) {
        alert('Not support websocket');
        return;
    }

    // Build the endpoint URL from the page's own location and the application's context
    // path, so the page also works when it is not served from localhost:8080, and use
    // wss when the page itself was loaded over https.
    var scheme = window.location.protocol === 'https:' ? 'wss://' : 'ws://';
    var endpoint = scheme + window.location.host + '${pageContext.request.contextPath}/websocket/${sid}';
    websocket = new WebSocket(endpoint);

    // Connection error
    websocket.onerror = function () {
        setMessage("连接发生错误");
    };

    // Connection established
    websocket.onopen = function (event) {
        console.log("连接成功建立");
    };

    // Message received from the server
    websocket.onmessage = function (event) {
        setMessage(event.data);
    };

    // Connection closed; the handler receives a single CloseEvent
    websocket.onclose = function (event) {
        setMessage("连接关闭");
    };

    // Close the socket when the window is closed, so the server does not see the
    // connection drop without a close frame.
    window.onbeforeunload = function () {
        websocket.close();
    };
}

/**
 * Appends one line of text to the read-only output area.
 * The text is written through val() so that server-supplied data is never parsed as HTML.
 */
function setMessage(message) {
    var output = $('#message');
    output.val(output.val() + message + "\n");
}

/**
 * Sends the content of the input area to the server, if the connection is open.
 */
function sendMessage() {
    if (!websocket || websocket.readyState !== WebSocket.OPEN) {
        setMessage("连接尚未建立,无法发送消息");
        return;
    }
    var msg = $('#content').val();
    websocket.send(msg);
}
</script>
<body>
<div>
<!-- Two-column layout: outgoing message on the left, server output on the right -->
<table>
<tr>
<!-- Clicking the button calls sendMessage() defined in the page script -->
<td><input type="button" value="发送消息" onclick="sendMessage()"/></td>
<td>websocket服务端返回信息:</td>
</tr>
<tr>
<td>
<!-- Input area; the page script reads its value via #content -->
<textarea rows="50" cols="100" id="content"></textarea>
</td>
<td>
<!-- Read-only output area; the page script writes received messages to #message -->
<textarea rows="50" cols="100" readonly="readonly" id="message"></textarea>
</td>
</tr>
</table>
</div>
</body>
</html>