123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 |
- import { Injectable } from "@angular/core";
- import { Subject, Observable, from } from "rxjs";
- import * as Atmosphere from "atmosphere.js";
/**
 * Application-wide WebSocket client built on atmosphere.js.
 *
 * Responsibilities visible in this class:
 *  - open a connection and push the initial payload once it is open;
 *  - forward every JSON message (except the literal "pong" heartbeat reply)
 *    to the Observable returned by `connectWs`;
 *  - send a "ping" after 60s of silence and close the socket if another 60s
 *    pass without any message;
 *  - reconnect 15s after a close/error unless the close was requested with
 *    `closeWs(true)`.
 *
 * The Observable returned by `connectWs` stays valid across automatic
 * reconnects: the same Subject is reused for every reconnect attempt.
 */
@Injectable({
  providedIn: "root",
})
export class WebsocketMainService {
  constructor() {}

  /** Prevents more than one reconnect from being scheduled at a time. */
  private lockReconnect = false;
  /** Pending reconnect timer, kept so that it can be cancelled. */
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
  /** Incremented for every opened connection; lets handlers detect that they belong to a superseded one. */
  private connectionSeq = 0;
  /** Socket handle returned by `Atmosphere.subscribe` (undefined until the first connect). */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- atmosphere.js typings are not visible from this file
  ws: any;
  /** Address of the current connection. */
  private url = "";
  /** Payload pushed to the server right after the connection opens. */
  private urlParams: unknown;
  /** True when the last close was requested manually via `closeWs(true)`; suppresses auto-reconnect. */
  private isHandler = false;
  /** Stream that carries parsed server messages to the caller of `connectWs`. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- message schema is defined by the server
  private subject: Subject<any> = new Subject<any>();

  /**
   * Opens a WebSocket connection.
   *
   * @param url  Address to connect to.
   * @param data Payload that is JSON-serialised and pushed as soon as the socket opens
   *             (and again after every automatic reconnect).
   * @returns Observable emitting every parsed JSON message received from the server.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- message schema is defined by the server
  connectWs(url: string, data: unknown): Observable<any> {
    // An explicit connect supersedes any reconnect that is still waiting to fire.
    this.cancelPendingReconnect();
    // A previous closeWs(true) must not disable auto-reconnect for this new connection.
    this.isHandler = false;
    this.subject = new Subject<any>();
    this.openConnection(url, data);
    return this.subject.asObservable();
  }

  /**
   * Creates the Atmosphere request and subscribes to it. Shared by the public
   * connect and by the automatic reconnect, so that a reconnect keeps feeding
   * the Subject the caller is already subscribed to.
   */
  private openConnection(url: string, data: unknown): void {
    this.url = url;
    this.urlParams = data;
    const connectionId = ++this.connectionSeq;

    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- atmosphere.js request object, typings not visible here
    const request: any = {
      url,
      contentType: "application/json",
      shared: true, // share the connection between browser tabs
      trackMessageLength: true, // verify message completeness
      transport: "websocket",
    };

    request.onOpen = () => {
      console.log("ws连接成功" + new Date().toLocaleString());
      this.heartCheck.reset().start(this);
      this.ws.push(JSON.stringify(data));
    };

    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- atmosphere.js response object
    request.onMessage = (response: any) => {
      // Any incoming message proves the link is alive, so restart the heartbeat countdown.
      this.heartCheck.reset().start(this);
      const message = response.responseBody;
      try {
        console.log("收到消息" + message);
        // "pong" is the heartbeat reply and is not forwarded to subscribers.
        if (message !== "pong") {
          this.subject.next(JSON.parse(message));
        }
      } catch (e) {
        console.log("This doesn't look like a valid JSON: ", message);
        return;
      }
    };

    request.onClose = () => {
      console.log("ws连接关闭" + new Date().toLocaleString(), this.isHandler);
      this.handleDisconnect(connectionId);
    };

    request.onError = () => {
      console.log("ws连接错误", this.isHandler);
      this.handleDisconnect(connectionId);
    };

    this.ws = Atmosphere.subscribe(request);
  }

  /**
   * Common reaction to a close or error event: stop the heartbeat and, unless
   * the close was manual, schedule a reconnect. Events coming from a connection
   * that has already been replaced are ignored so they cannot stop the live
   * heartbeat or schedule a duplicate reconnect.
   */
  private handleDisconnect(connectionId: number): void {
    if (connectionId !== this.connectionSeq) {
      return;
    }
    this.heartCheck.reset();
    if (!this.isHandler) {
      this.reconnect(this.url, this.urlParams);
    }
  }

  /**
   * Schedules a single reconnect attempt 15s from now. While an attempt is
   * pending further calls are ignored; the delay keeps a server that is down
   * from being flooded with connection attempts.
   */
  private reconnect(url: string, data: unknown): void {
    if (this.lockReconnect) return;
    this.lockReconnect = true;
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      // Release the lock before connecting so that a failure of this attempt
      // (thrown or reported through onError) can schedule the next one.
      this.lockReconnect = false;
      this.openConnection(url, data);
    }, 15000);
  }

  /** Cancels a scheduled reconnect, if there is one, and releases the reconnect lock. */
  private cancelPendingReconnect(): void {
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    this.lockReconnect = false;
  }

  /**
   * Heartbeat helper. After `timeout` ms without traffic a "ping" is pushed;
   * if the countdown is not reset within another `timeout` ms the socket is closed.
   */
  private heartCheck = {
    timeout: 60000, // send one heartbeat per minute of silence
    timeoutObj: null as ReturnType<typeof setTimeout> | null,
    serverTimeoutObj: null as ReturnType<typeof setTimeout> | null,
    /** Clears both timers; returns the helper so `.start()` can be chained. */
    reset() {
      if (this.timeoutObj !== null) {
        clearTimeout(this.timeoutObj);
        this.timeoutObj = null;
      }
      if (this.serverTimeoutObj !== null) {
        clearTimeout(this.serverTimeoutObj);
        this.serverTimeoutObj = null;
      }
      return this;
    },
    /** Arms the ping timer for the given service instance. */
    start(_this: WebsocketMainService) {
      this.timeoutObj = setTimeout(() => {
        // Send a heartbeat; the backend is expected to answer, and onMessage
        // resets these timers when any message arrives.
        _this.ws.push("ping");
        this.serverTimeoutObj = setTimeout(() => {
          // No message arrived in time: treat the connection as dead.
          // Only close here; onClose performs the reconnect. Calling reconnect
          // directly would reconnect twice once onClose fires as well.
          _this.closeWs();
        }, this.timeout);
      }, this.timeout);
    },
  };

  /**
   * Closes the WebSocket.
   *
   * @param flag `true` for a manual close: no automatic reconnect follows and a
   *             reconnect that is already scheduled is cancelled. `false`
   *             (default) leaves auto-reconnect enabled.
   */
  closeWs(flag: boolean = false): void {
    this.isHandler = flag;
    if (flag) {
      this.cancelPendingReconnect();
    }
    this.heartCheck.reset();
    // Nothing to close when connectWs has never been called.
    if (!this.ws) {
      return;
    }
    this.ws.close();
  }
}
|