yjs 自定义 同步
John Doe · yjs 自定义 同步
Yjs 自带的 y-websocket 不能自定义数据格式，因此这里简单实现了一下用 socket.io 传输 Yjs 的数据。
先看一下客户端 provider 的代码：
Yjs学习-CSDN博客
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
import { ObservableV2 } from "lib0/observable.js";
import * as encoding from 'lib0/encoding'
import * as decoding from 'lib0/decoding'
import * as Y from 'yjs';
import io, { Socket } from 'socket.io-client'
import * as syncProtocol from 'y-protocols/sync'

// Leading varuint tag written in front of every message payload.
// Only `messageSync` is produced and consumed by this provider.
export const messageSync = 0
export const messageQueryAwareness = 3
export const messageAwareness = 1
export const messageAuth = 2

/**
 * Minimal Yjs provider that ships document updates over socket.io.
 *
 * Local updates are wrapped as `[messageSync][sync update message]` and emitted
 * on the `document-update` event together with the room name. Incoming
 * `document-update` payloads are decoded and applied to the document with this
 * provider instance as the transaction origin.
 */
export class SocketIOProvide {
  private doc: Y.Doc
  room: string
  socket: Socket

  /** Listener registered on the document; kept so `destroy()` can remove it. */
  private readonly onDocUpdate: (update: Uint8Array, origin: unknown) => void
  /** Listener registered on the socket; kept so `destroy()` can remove it. */
  private readonly onRemoteUpdate: (update: ArrayBuffer | Uint8Array) => void

  /**
   * @param doc  The Yjs document to keep in sync.
   * @param room Room name sent along with every outgoing update.
   */
  constructor(doc: Y.Doc, room: string) {
    this.doc = doc;
    this.room = room;
    // Open the socket.io connection (websocket transport only).
    this.socket = io('ws://localhost:3001', {
      transports: ['websocket']
    })

    this.onDocUpdate = (update, origin) => {
      // Updates applied by this provider came from the server; sending them
      // back would echo every remote change to all peers a second time.
      if (origin === this) {
        return
      }
      const encoder = encoding.createEncoder()
      encoding.writeVarUint(encoder, messageSync)
      syncProtocol.writeUpdate(encoder, update)
      this.socket.emit('document-update', { room, update: encoding.toUint8Array(encoder) });
    }

    this.onRemoteUpdate = (update) => {
      try {
        const decoder = decoding.createDecoder(new Uint8Array(update))
        // Only sync messages are understood here; skip anything else instead
        // of feeding it to the sync decoder.
        if (decoding.readVarUint(decoder) !== messageSync) {
          return
        }
        // readSyncMessage needs an encoder for possible replies; replies are
        // not sent back by this provider.
        const replyEncoder = encoding.createEncoder()
        // Decode and apply to the document, tagging the transaction with this
        // provider so `onDocUpdate` can recognise and skip it.
        syncProtocol.readSyncMessage(decoder, replyEncoder, this.doc, this)
      } catch (err: unknown) {
        // A malformed payload must not take down the socket listener.
        console.error('document-update: failed to apply remote update', err)
      }
    }

    this.doc.on('update', this.onDocUpdate)
    this.socket.on('document-update', this.onRemoteUpdate)
  }

  /**
   * Detaches the listeners registered by the constructor and closes the
   * socket. The document itself is left untouched.
   */
  destroy(): void {
    this.doc.off('update', this.onDocUpdate)
    this.socket.off('document-update', this.onRemoteUpdate)
    this.socket.disconnect()
  }
}
|
服务端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
import {
  WebSocketGateway,
  SubscribeMessage,
  MessageBody,
  OnGatewayConnection,
  OnGatewayDisconnect,
  WebSocketServer,
  ConnectedSocket
} from '@nestjs/websockets';
import { Request } from 'express';
import { Server, Socket } from 'socket.io'
import * as Y from 'yjs';

/**
 * Socket.IO gateway on port 3001 that relays Yjs payloads between clients.
 *
 * The gateway does not inspect or persist the payloads: every handler
 * re-emits what it received to all connected clients. The room name is
 * received but not yet used for routing.
 */
// The Socket.IO option is `transports` (plural); the singular key is not a
// recognised option, so the websocket-only restriction never took effect.
@WebSocketGateway(3001, {
  cors: true,
  transports: ['websocket']
})
export class YjsGateway {
  @WebSocketServer()
  server: Server;

  // Per-room server-side documents; only referenced by the disabled code below.
  private documents: { [room: string]: Y.Doc } = {};

  /**
   * Relays a `sync-document` payload to every connected client.
   *
   * @param body.room  Room name supplied by the sender (currently unused).
   * @param body.state Payload forwarded unchanged.
   */
  @SubscribeMessage('sync-document')
  handleSyncDocument(@MessageBody() { room, state }: { room: string; state: Uint8Array }) {
    console.log('sync-document')
    // if (!this.documents[room]) {
    //   this.documents[room] = new Y.Doc();
    // }
    // Y.applyUpdate(this.documents[room], state);
    this.server.emit('sync-document', state);
  }

  /**
   * Relays a `document-update` payload to every connected client.
   *
   * @param body.room   Room name supplied by the sender (logged only).
   * @param body.update Payload forwarded unchanged.
   */
  @SubscribeMessage('document-update')
  handleDocumentUpdate(@MessageBody() { room, update }: { room: string; update: Uint8Array }) {
    console.log(Object.prototype.toString.call(update));
    console.log('document-update', room, update)
    // if (!this.documents[room]) {
    //   this.documents[room] = new Y.Doc();
    // }
    // Y.applyUpdate(this.documents[room], update);
    this.server.emit('document-update', update); // forward the raw payload as-is
  }

  /**
   * Handles a `join-room` request. Currently only logs the room name; the
   * join and initial-state logic below is disabled.
   *
   * @param room Room name supplied by the client.
   */
  @SubscribeMessage('join-room')
  handleJoinRoom(@MessageBody() room: string) {
    console.log("🚀 ~ YjsGateway ~ handleJoinRoom ~ room:", room);
    // this.server.socketsJoin(room);
    // if (this.documents[room]) {
    //   const state = Y.encodeStateAsUpdate(this.documents[room]);
    //   this.server.to(room).emit('sync-document', state);
    // }
  }
}
|