// websocket-main.service.ts
  1. import { Injectable } from "@angular/core";
  2. import { Subject, Observable, from } from "rxjs";
  3. import * as Atmosphere from "atmosphere.js";
  /**
   * Application-wide service that owns one Atmosphere WebSocket subscription.
   *
   * Responsibilities visible in this class:
   *  - open the connection and push the caller's parameters once it is open;
   *  - forward every JSON message (everything except the literal "pong"
   *    heartbeat reply) to the observable returned by `connectWs`;
   *  - send a "ping" heartbeat after a period of silence and close the
   *    connection when nothing comes back;
   *  - re-open the connection after a delay when it closes or errors, unless
   *    the caller closed it on purpose via `closeWs(true)`.
   */
  @Injectable({
    providedIn: "root",
  })
  export class WebsocketMainService {
    /** Delay before a dropped connection is re-opened, in milliseconds. */
    private static readonly RECONNECT_DELAY_MS = 15000;

    constructor() {}

    /** True while a reconnect is already scheduled; prevents duplicate reconnects. */
    private lockReconnect = false;

    /** Handle returned by `Atmosphere.subscribe`; untyped because the library is imported untyped. */
    ws: any;

    /** URL of the current connection. */
    private url = "";

    /** Payload pushed to the server right after the connection opens. */
    private urlParams: any;

    /** True when the caller closed the connection deliberately (suppresses auto-reconnect). */
    private isHandler = false;

    /** Stream that incoming messages are published on. */
    private subject: Subject<any> = new Subject<any>();

    /** Handle of the pending reconnect timer, or null when none is scheduled. */
    private reconnectTimer: any = null;

    /**
     * Opens the WebSocket connection.
     *
     * @param url  Address to connect to.
     * @param data Parameters sent to the server (JSON-encoded) as soon as the
     *             connection opens.
     * @returns An observable emitting every parsed JSON message. The same
     *          observable keeps emitting after an automatic reconnect.
     */
    connectWs(url: string, data: any): Observable<any> {
      this.url = url;
      this.urlParams = data;
      // A previous closeWs(true) must not disable auto-reconnect for this new connection.
      this.isHandler = false;
      // An explicit connect supersedes a reconnect that is still waiting to fire.
      this.cancelPendingReconnect();
      this.subject = new Subject<any>();
      this.openConnection();
      return this.subject.asObservable();
    }

    /**
     * Subscribes through Atmosphere using the stored url/params and publishes
     * incoming messages on the current subject. Used both for the initial
     * connection and for reconnects, so a reconnect never replaces the subject
     * that callers are already subscribed to.
     */
    private openConnection(): void {
      // Captured so the open handler sends the params this connection was opened with.
      const params = this.urlParams;
      const request: any = {
        url: this.url,
        contentType: "application/json",
        shared: true, // share the connection across tabs
        trackMessageLength: true, // verify message completeness
        transport: "websocket",
      };

      request.onOpen = () => {
        console.log("ws连接成功" + new Date().toLocaleString());
        this.heartCheck.reset().start(this);
        this.ws.push(JSON.stringify(params));
      };

      request.onMessage = (response: any) => {
        // Any traffic proves the link is alive, so restart the heartbeat countdown.
        this.heartCheck.reset().start(this);
        const message = response.responseBody;
        try {
          console.log("收到消息" + message);
          // "pong" is the heartbeat reply and is not forwarded to subscribers.
          if (message !== "pong") {
            this.subject.next(JSON.parse(message));
          }
        } catch (e) {
          console.log("This doesn't look like a valid JSON: ", message);
        }
      };

      request.onClose = () => {
        console.log("ws连接关闭" + new Date().toLocaleString(), this.isHandler);
        this.handleDisconnect();
      };

      request.onError = () => {
        console.log("ws连接错误", this.isHandler);
        this.handleDisconnect();
      };

      this.ws = Atmosphere.subscribe(request);
    }

    /** Stops the heartbeat and schedules a reconnect unless the close was deliberate. */
    private handleDisconnect(): void {
      this.heartCheck.reset();
      if (!this.isHandler) {
        this.reconnect(this.url, this.urlParams);
      }
    }

    /**
     * Schedules a single delayed reconnect. Calls made while one is already
     * pending are ignored; the delay avoids flooding the server with attempts
     * while it stays unreachable.
     */
    private reconnect(url: string, data: any): void {
      if (this.lockReconnect) return;
      this.lockReconnect = true;
      this.reconnectTimer = setTimeout(() => {
        this.reconnectTimer = null;
        // Release the lock before connecting so a failing attempt cannot
        // leave it set and block every later reconnect.
        this.lockReconnect = false;
        this.url = url;
        this.urlParams = data;
        this.openConnection();
      }, WebsocketMainService.RECONNECT_DELAY_MS);
    }

    /** Cancels a scheduled reconnect, if any, and releases the reconnect lock. */
    private cancelPendingReconnect(): void {
      if (this.reconnectTimer !== null) {
        clearTimeout(this.reconnectTimer);
        this.reconnectTimer = null;
      }
      this.lockReconnect = false;
    }

    /**
     * Heartbeat state machine. After `timeout` ms without traffic it sends
     * "ping"; if the countdown is not reset within another `timeout` ms it
     * closes the connection via `closeWs()`.
     */
    private heartCheck = {
      timeout: 60000, // send a heartbeat after one minute of silence
      // Timer handles are kept as `any` because their type differs between browser and Node typings.
      timeoutObj: null as any,
      serverTimeoutObj: null as any,

      /** Clears both timers; returns the object so calls can be chained. */
      reset() {
        clearTimeout(this.timeoutObj);
        clearTimeout(this.serverTimeoutObj);
        return this;
      },

      /** Starts the countdown for the given service instance. */
      start(service: WebsocketMainService) {
        this.timeoutObj = setTimeout(() => {
          // Send a heartbeat; any reply reaching onMessage resets these timers.
          service.ws.push("ping");
          this.serverTimeoutObj = setTimeout(() => {
            // No reply in time: close rather than reconnect directly, leaving
            // the reconnect to the close callback so it is not triggered twice.
            service.closeWs();
          }, this.timeout);
        }, this.timeout);
      },
    };

    /**
     * Closes the WebSocket connection.
     *
     * @param flag Pass `true` for a deliberate close: auto-reconnect is
     *             suppressed and any reconnect already scheduled is cancelled.
     *             With the default `false`, the close callback may reconnect.
     */
    closeWs(flag: boolean = false): void {
      this.isHandler = flag;
      if (flag) {
        this.cancelPendingReconnect();
      }
      this.heartCheck.reset();
      // Nothing to close if connectWs was never called.
      if (this.ws) {
        this.ws.close();
      }
    }
  }