Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 39 additions & 7 deletions znet/connection.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package znet

import (
"bufio"
"context"
"encoding/hex"
"errors"
Expand All @@ -26,6 +27,9 @@ type Connection struct {
// // The socket TCP socket of the current connection(当前连接的socket TCP套接字)
conn net.Conn

// The buffer writer of the current connection(当前连接的写缓冲)
bufWriter *bufio.Writer

// The ID of the current connection, also known as SessionID, globally unique, used by server Connection
// uint64 range: 0~18,446,744,073,709,551,615
// This is the maximum number of connID theoretically supported by the process
Expand Down Expand Up @@ -121,6 +125,7 @@ func newServerConn(server ziface.IServer, conn net.Conn, connID uint64) ziface.I
// Initialize Conn properties
c := &Connection{
conn: conn,
bufWriter: bufio.NewWriterSize(conn, 16*1024),
connID: connID,
connIdStr: strconv.FormatUint(connID, 10),
startWriterFlag: 0,
Expand Down Expand Up @@ -158,6 +163,7 @@ func newServerConn(server ziface.IServer, conn net.Conn, connID uint64) ziface.I
func newClientConn(client ziface.IClient, conn net.Conn) ziface.IConnection {
c := &Connection{
conn: conn,
bufWriter: bufio.NewWriterSize(conn, 16*1024),
connID: 0, // client ignore
connIdStr: "", // client ignore
startWriterFlag: 0,
Expand Down Expand Up @@ -186,20 +192,29 @@ func newClientConn(client ziface.IClient, conn net.Conn) ziface.IConnection {
// (写消息Goroutine, 用户将数据发送给客户端)
func (c *Connection) StartWriter() {
zlog.Ins().InfoF("Writer Goroutine is running")
defer zlog.Ins().InfoF("%s [conn Writer exit!]", c.RemoteAddr().String())

ticker := time.NewTicker(10 * time.Millisecond)
defer func() {
zlog.Ins().InfoF("%s [conn Writer exit!]", c.RemoteAddr().String())
ticker.Stop()
c.Flush()
}()
for {
select {
case <-ticker.C:
err := c.Flush()
if err != nil {
zlog.Ins().ErrorF("Flush Buff Data error: %v Conn Writer exit", err)
return
}
case data, ok := <-c.msgBuffChan:
if ok {
if err := c.Send(data); err != nil {
if err := c.SendBuf(data); err != nil {
zlog.Ins().ErrorF("Send Buff Data error:, %s Conn Writer exit", err)
break
return
}

} else {
zlog.Ins().ErrorF("msgBuffChan is Closed")
break
return
}
case <-c.ctx.Done():
return
Expand Down Expand Up @@ -348,17 +363,34 @@ func (c *Connection) LocalAddr() net.Addr {
return c.conn.LocalAddr()
}

func (c *Connection) Flush() error {
if c.isClosed() == true {
return errors.New("connection closed when flush data")
}
return c.bufWriter.Flush()
}

func (c *Connection) Send(data []byte) error {
if c.isClosed() == true {
return errors.New("connection closed when send msg")
}

_, err := c.conn.Write(data)
if err != nil {
zlog.Ins().ErrorF("SendMsg err data = %+v, err = %+v", data, err)
return err
}
return nil
}

func (c *Connection) SendBuf(data []byte) error {
if c.isClosed() == true {
return errors.New("connection closed when send msg")
}
_, err := c.bufWriter.Write(data)
if err != nil {
zlog.Ins().ErrorF("SendMsg err data = %+v, err = %+v", data, err)
return err
}
return nil
}

Expand Down
Loading