From c0c9bc640216a68f97de6dae09decdfcd0742b26 Mon Sep 17 00:00:00 2001 From: ZGGSONG Date: Sun, 10 Apr 2022 16:31:51 +0800 Subject: [PATCH] =?UTF-8?q?=E7=94=A8websocket=E9=80=9A=E7=9F=A5PC=E6=89=8B?= =?UTF-8?q?=E6=9C=BA=E4=B8=8A=E4=BC=A0=E6=96=87=E4=BB=B6=E4=BA=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/config.go | 5 ++ go.mod | 1 + go.sum | 2 + main.go | 3 +- server/server.go | 22 ++++-- server/ws/client.go | 131 +++++++++++++++++++++++++++++++++++ server/ws/http_controller.go | 35 ++++++++++ server/ws/hub.go | 49 +++++++++++++ 8 files changed, 240 insertions(+), 8 deletions(-) create mode 100644 config/config.go create mode 100644 server/ws/client.go create mode 100644 server/ws/http_controller.go create mode 100644 server/ws/hub.go diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..2689e8d --- /dev/null +++ b/config/config.go @@ -0,0 +1,5 @@ +package config + +func GetPort() string { + return "27149" +} diff --git a/go.mod b/go.mod index 332a599..da3e46c 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.17 require ( github.com/gin-gonic/gin v1.7.7 github.com/google/uuid v1.3.0 + github.com/gorilla/websocket v1.5.0 github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e ) diff --git a/go.sum b/go.sum index b91cec3..1b4f41f 100644 --- a/go.sum +++ b/go.sum @@ -18,6 +18,8 @@ github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaW github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/json-iterator/go v1.1.9 h1:9yzud/Ht36ygwatGx56VwCZtlI/2AD15T1X2sjSuGns= github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y= diff --git a/main.go b/main.go index b5830e1..78e4171 100644 --- a/main.go +++ b/main.go @@ -5,6 +5,7 @@ import ( "os/exec" "os/signal" + "github.com/zggsong/FileSync/config" "github.com/zggsong/FileSync/server" ) @@ -21,7 +22,7 @@ func main() { func startBrowser() *exec.Cmd { chromePath := "C:\\Program Files\\Google\\Chrome\\Application\\chrome.exe" - cmd := exec.Command(chromePath, "--app=http://127.0.0.1:27149/static/index.html") + cmd := exec.Command(chromePath, "--app=http://127.0.0.1:"+config.GetPort()+"/static/index.html") cmd.Start() return cmd } diff --git a/server/server.go b/server/server.go index 4de6e3c..3f85efb 100644 --- a/server/server.go +++ b/server/server.go @@ -8,20 +8,28 @@ import ( "strings" "github.com/gin-gonic/gin" - controller "github.com/zggsong/FileSync/server/controller" + "github.com/zggsong/FileSync/config" + c "github.com/zggsong/FileSync/server/controller" + "github.com/zggsong/FileSync/server/ws" ) //go:embed frontend/dist/* var FS embed.FS func Run() { + hub := ws.NewHub() + go hub.Run() + gin.SetMode(gin.ReleaseMode) router := gin.Default() - router.POST("/api/v1/texts", controller.TextsController) - router.GET("/api/v1/addresses", controller.AddressesController) - router.GET("/api/v1/qrcodes", controller.QrcodesController) - router.GET("/uploads/:path", controller.UploadsController) - router.POST("/api/v1/files", controller.FilesController) + router.POST("/api/v1/files", c.FilesController) + router.POST("/api/v1/texts", c.TextsController) + router.GET("/api/v1/qrcodes", c.QrcodesController) + router.GET("/uploads/:path", c.UploadsController) + router.GET("/api/v1/addresses", c.AddressesController) + router.GET("/ws", func(c *gin.Context) { + ws.HttpController(c, hub) + }) staticFiles, _ := fs.Sub(FS, "frontend/dist") router.StaticFS("/static", http.FS(staticFiles)) router.NoRoute(func(c *gin.Context) { @@ -41,5 +49,5 @@ func Run() { c.Status(http.StatusNotFound) } }) - router.Run(":27149") + router.Run(":" + config.GetPort()) } diff --git a/server/ws/client.go b/server/ws/client.go new file mode 100644 index 0000000..e81ac87 --- /dev/null +++ b/server/ws/client.go @@ -0,0 +1,131 @@ +package ws + +import ( + "bytes" + "log" + "time" + + "github.com/gorilla/websocket" +) + +const ( + // Time allowed to write a message to the peer. + writeWait = 10 * time.Second + + // Time allowed to read the next pong message from the peer. + pongWait = 60 * time.Second + + // Send pings to peer with this period. Must be less than pongWait. + pingPeriod = (pongWait * 9) / 10 + + // Maximum message size allowed from peer. + maxMessageSize = 512 +) + +var ( + newline = []byte{'\n'} + space = []byte{' '} +) + +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, +} + +// Client is a middleman between the websocket connection and the hub. +type Client struct { + hub *Hub + + // The websocket connection. + conn *websocket.Conn + + // Buffered channel of outbound messages. + send chan []byte +} + +// readPump pumps messages from the websocket connection to the hub. +// +// The application runs readPump in a per-connection goroutine. The application +// ensures that there is at most one reader on a connection by executing all +// reads from this goroutine. +func (c *Client) readPump() { + defer func() { + c.hub.unregister <- c + c.conn.Close() + }() + c.conn.SetReadLimit(maxMessageSize) + c.conn.SetReadDeadline(time.Now().Add(pongWait)) + c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil }) + for { + _, message, err := c.conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + log.Printf("error: %v", err) + } + break + } + message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1)) + c.hub.broadcast <- message + } +} + +// writePump pumps messages from the hub to the websocket connection. +// +// A goroutine running writePump is started for each connection. The +// application ensures that there is at most one writer to a connection by +// executing all writes from this goroutine. +func (c *Client) writePump() { + ticker := time.NewTicker(pingPeriod) + defer func() { + ticker.Stop() + c.conn.Close() + }() + for { + select { + case message, ok := <-c.send: + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if !ok { + c.conn.WriteMessage(websocket.CloseMessage, []byte{}) + return + } + + w, err := c.conn.NextWriter(websocket.TextMessage) + if err != nil { + return + } + w.Write(message) + + // Add queued chat messages to the current websocket message. + n := len(c.send) + for i := 0; i < n; i++ { + w.Write(newline) + w.Write(<-c.send) + } + + if err := w.Close(); err != nil { + return + } + case <-ticker.C: + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil { + return + } + } + } +} + +// 在http_controller中调用 +// serveWs handles websocket requests from the peer. +// func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) { +// conn, err := upgrader.Upgrade(w, r, nil) +// if err != nil { +// log.Println(err) +// return +// } +// client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)} +// client.hub.register <- client +// // Allow collection of memory referenced by the caller by doing all work in +// // new goroutines. +// go client.writePump() +// go client.readPump() +// } diff --git a/server/ws/http_controller.go b/server/ws/http_controller.go new file mode 100644 index 0000000..2597848 --- /dev/null +++ b/server/ws/http_controller.go @@ -0,0 +1,35 @@ +package ws + +import ( + "log" + "net/http" + + "github.com/gin-gonic/gin" + "github.com/gorilla/websocket" +) + +var wsupgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { + return true + }, +} + +func wshandler(hub *Hub, w http.ResponseWriter, r *http.Request) { + conn, err := wsupgrader.Upgrade(w, r, nil) + if err != nil { + log.Println(err) + return + } + client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)} + client.hub.register <- client + + // Allow collection of memory referenced by the caller by doing all work in + // new goroutines. + go client.writePump() + go client.readPump() +} +func HttpController(c *gin.Context, hub *Hub) { + wshandler(hub, c.Writer, c.Request) +} diff --git a/server/ws/hub.go b/server/ws/hub.go new file mode 100644 index 0000000..4dc454a --- /dev/null +++ b/server/ws/hub.go @@ -0,0 +1,49 @@ +package ws + +import ( + "sync" +) + +// Hub maintains the set of active clients and broadcasts messages to the +// clients. +type Hub struct { + clients map[*Client]bool + broadcast chan []byte + register chan *Client + unregister chan *Client +} + +func NewHub() *Hub { + return &Hub{ + broadcast: make(chan []byte), + register: make(chan *Client), + unregister: make(chan *Client), + clients: make(map[*Client]bool), + } +} + +var once sync.Once +var singleton *Hub + +func (h *Hub) Run() { + for { + select { + case client := <-h.register: + h.clients[client] = true + case client := <-h.unregister: + if _, ok := h.clients[client]; ok { + delete(h.clients, client) + close(client.send) + } + case message := <-h.broadcast: + for client := range h.clients { + select { + case client.send <- message: + default: + close(client.send) + delete(h.clients, client) + } + } + } + } +}