yjs 自定义 同步

yjs 自定义 同步

yjs 自带的 y-websocket 不能自定义数据格式。 简单的实现了一下用 socket.io 传输 yjs 的数据

先看一下客户端provider的代码

Yjs学习-CSDN博客

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import { ObservableV2 } from "lib0/observable.js";
import * as encoding from 'lib0/encoding'
import * as decoding from 'lib0/decoding'
import * as Y from 'yjs';
import io, { Socket } from 'socket.io-client'
import * as syncProtocol from 'y-protocols/sync'


export const messageSync = 0
export const messageQueryAwareness = 3
export const messageAwareness = 1
export const messageAuth = 2

export class SocketIOProvide {
   private doc: Y.Doc
   room: string
   socket: Socket
   constructor(doc: Y.Doc, room: string) {
       this.doc = doc;
       this.room = room;
       this.socket = io('ws://localhost:3001', {
           transports: ['websocket']
      }) // 创建数据库连接
       const instance = this
       this.doc.on('update', (update) => { // 文档更新
           const encoder = encoding.createEncoder()
           encoding.writeVarUint(encoder, messageSync)
           syncProtocol.writeUpdate(encoder, update)

           this.socket.emit('document-update', {
               room,
               update: encoding.toUint8Array(encoder)
          });
      })

       this.socket.on('document-update', (update) => { // 接收更新的数据
           const data = new Uint8Array(update);
           const decoder = decoding.createDecoder(data)
           const encoder = encoding.createEncoder()

           const messageType = decoding.readVarUint(decoder)
           const syncMessageType = syncProtocol.readSyncMessage( // 解码后同步到文档上面
               decoder,
               encoder,
               instance.doc,
               instance
            )
      })
  }

}

服务端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import { WebSocketGateway, SubscribeMessage, MessageBody, OnGatewayConnection, OnGatewayDisconnect, WebSocketServer, ConnectedSocket } from '@nestjs/websockets';
import { Request } from 'express';
import { Server, Socket } from 'socket.io'
import * as Y from 'yjs';



@WebSocketGateway(3001, {
   cors: true,
   transport: ['websocket']
})
export class YjsGateway {

   @WebSocketServer()
   server: Server;

   private documents: { [room: string]: Y.Doc } = {};


   @SubscribeMessage('sync-document')
   handleSyncDocument(@MessageBody() { room, state }: { room: string; state: Uint8Array }) {
       console.log('sync-document')
       // if (!this.documents[room]) {
       //   this.documents[room] = new Y.Doc();
       // }
       // Y.applyUpdate(this.documents[room], state);
       this.server.emit('sync-document', state);
  }

   @SubscribeMessage('document-update')
   handleDocumentUpdate(@MessageBody() { room, update }: { room: string; update: Uint8Array }) {
       console.log(Object.prototype.toString.call(update));
       console.log('document-update', room, update)
       // if (!this.documents[room]) {
       //   this.documents[room] = new Y.Doc();
       // }
       // Y.applyUpdate(this.documents[room], update);
       this.server.emit('document-update', update); // 直接传原始数据过去
  }

   @SubscribeMessage('join-room')
   handleJoinRoom(@MessageBody() room: string) {
       console.log("🚀 ~ YjsGateway ~ handleJoinRoom ~ room:", room);
       // this.server.socketsJoin(room);
       // if (this.documents[room]) {
       //   const state = Y.encodeStateAsUpdate(this.documents[room]);
       //   this.server.to(room).emit('sync-document', state);
       // }
  }
}