import { Injectable } from "@angular/core"; import { Subject, Observable, from } from "rxjs"; import * as Atmosphere from "atmosphere.js"; @Injectable({ providedIn: "root", }) export class WebsocketMainService { constructor() {} private lockReconnect = false; //避免ws重复连接 ws; //定义websocket private url = ""; //ws连接地址 private urlParams; //ws连接传参 private isHandler = false; //是否手动 private subject; // 连接websocket connectWs(url, data): Observable { this.url = url; this.urlParams = data; this.subject = new Subject(); let request: any = { url, contentType: "application/json", shared: true, // 标签共享 trackMessageLength: true, //校验数据完整性 transport: "websocket", }; request.onOpen = (response) => { console.log("ws连接成功" + new Date().toLocaleString()); this.heartCheck.reset().start(this); this.ws.push(JSON.stringify(data)); }; request.onMessage = (response) => { this.heartCheck.reset().start(this); let message = response.responseBody; try { // alert(message); console.log("收到消息" + message); if (message !== "pong") { this.subject.next(JSON.parse(message)); } } catch (e) { console.log("This doesn't look like a valid JSON: ", message); return; } }; request.onClose = (response) => { console.log("ws连接关闭" + new Date().toLocaleString(),this.isHandler); this.heartCheck.reset(); if (!this.isHandler) { this.reconnect(this.url, this.urlParams); } }; request.onError = (response) => { console.log("ws连接错误",this.isHandler); this.heartCheck.reset(); if (!this.isHandler) { this.reconnect(this.url, this.urlParams); } }; this.ws = Atmosphere.subscribe(request); return this.subject.asObservable(); } //断线重连 private reconnect(url, data) { if (this.lockReconnect) return; this.lockReconnect = true; setTimeout(() => { //没连接上会一直重连,设置延迟避免请求过多 this.connectWs(url, data); this.lockReconnect = false; }, 15000); } private heartCheck = { timeout: 60000, //1分钟发一次心跳 timeoutObj: null, serverTimeoutObj: null, reset: function () { clearTimeout(this.timeoutObj); clearTimeout(this.serverTimeoutObj); return this; }, start: function (_this) { this.timeoutObj = setTimeout(() => { //这里发送一个心跳,后端收到后,返回一个心跳消息, //onmessage拿到返回的心跳就说明连接正常 _this.ws.push("ping"); this.serverTimeoutObj = setTimeout(() => { //如果超过一定时间还没重置,说明后端主动断开了 _this.closeWs(); //如果onclose会执行reconnect,我们执行ws.close()就行了.如果直接执行reconnect 会触发onclose导致重连两次 }, this.timeout); }, this.timeout); }, }; // 断开websocket closeWs(flag: boolean = false) { this.isHandler = flag; this.ws.close(); } }