Skip to content

Commit

Permalink
fix server name
Browse files Browse the repository at this point in the history
  • Loading branch information
thinkgos committed Nov 3, 2019
1 parent bd64e9a commit 63a8cfa
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 61 deletions.
16 changes: 8 additions & 8 deletions cs104/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,14 @@ func (sf *Server) ListenAndServer(addr string) {
sf.wg.Add(1)
go func() {
sess := &SrvSession{
Config: sf.conf,
params: sf.params,
handler: sf.handler,
conn: conn,
in: make(chan []byte, 1024),
out: make(chan []byte, 1024),
rawRcv: make(chan []byte, 1024),
rawSend: make(chan []byte, 1024), // may not block!
Config: sf.conf,
params: sf.params,
handler: sf.handler,
conn: conn,
rcvASDU: make(chan []byte, 1024),
sendASDU: make(chan []byte, 1024),
rcvRaw: make(chan []byte, 1024),
sendRaw: make(chan []byte, 1024), // may not block!

Clog: sf.Clog,
}
Expand Down
98 changes: 49 additions & 49 deletions cs104/server_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ type SrvSession struct {
conn net.Conn
handler ServerHandlerInterface

in chan []byte // for received asdu
out chan []byte // for send asdu
rawRcv chan []byte // for recvLoop raw cs104 frame
rawSend chan []byte // for sendLoop raw cs104 frame
rcvASDU chan []byte // for received asdu
sendASDU chan []byte // for send asdu
rcvRaw chan []byte // for recvLoop raw cs104 frame
sendRaw chan []byte // for sendLoop raw cs104 frame

// see subclass 5.1 — Protection against loss and duplication of messages
seqNoOut uint16 // sequence number of next outbound I-frame
ackNoOut uint16 // outbound sequence number yet to be confirmed
seqNoIn uint16 // sequence number of next inbound I-frame
ackNoIn uint16 // inbound sequence number yet to be confirmed
seqNoSend uint16 // sequence number of next outbound I-frame
ackNoSend uint16 // outbound sequence number yet to be confirmed
seqNoRcv uint16 // sequence number of next inbound I-frame
ackNoRcv uint16 // inbound sequence number yet to be confirmed
// maps sendTime I-frames to their respective sequence number
pending []seqPending
//seqManage
Expand All @@ -50,7 +50,7 @@ type SrvSession struct {
ctx context.Context
}

// RecvLoop feeds t.rawRcv.
// RecvLoop feeds t.rcvRaw.
func (sf *SrvSession) recvLoop() {
sf.Debug("recvLoop started!")
defer func() {
Expand Down Expand Up @@ -105,7 +105,7 @@ func (sf *SrvSession) recvLoop() {
if rdCnt == length {
apdu := rawData[:length]
sf.Debug("RX Raw[% x]", apdu)
sf.rawRcv <- apdu
sf.rcvRaw <- apdu
}
}
}
Expand All @@ -125,19 +125,19 @@ func (sf *SrvSession) sendLoop() {
select {
case <-sf.ctx.Done():
return
case apdu := <-sf.rawSend:
case apdu := <-sf.sendRaw:
sf.Debug("TX Raw[% x]", apdu)
for wrCnt := 0; len(apdu) > wrCnt; {
byteCount, err := sf.conn.Write(apdu[wrCnt:])
if err != nil {
// See: https://github.com/golang/go/issues/4373
if err != io.EOF && err != io.ErrClosedPipe ||
strings.Contains(err.Error(), "use of closed network connection") {
sf.Error("rawSend failed, %v", err)
sf.Error("sendRaw failed, %v", err)
return
}
if e, ok := err.(net.Error); !ok || !e.Temporary() {
sf.Error("rawSend failed, %v", err)
sf.Error("sendRaw failed, %v", err)
return
}
// temporary error may be recoverable
Expand Down Expand Up @@ -184,9 +184,9 @@ func (sf *SrvSession) run(ctx context.Context) {
}()

for {
if isActive && seqNoCount(sf.ackNoOut, sf.seqNoOut) <= sf.SendUnAckLimitK {
if isActive && seqNoCount(sf.ackNoSend, sf.seqNoSend) <= sf.SendUnAckLimitK {
select {
case o := <-sf.out:
case o := <-sf.sendASDU:
sf.sendIFrame(o)
idleTimeout3Sine = time.Now()
continue
Expand All @@ -207,20 +207,20 @@ func (sf *SrvSession) run(ctx context.Context) {
return
}
// check oldest unacknowledged outbound
if sf.ackNoOut != sf.seqNoOut &&
if sf.ackNoSend != sf.seqNoSend &&
//now.Sub(sf.peek()) >= sf.SendUnAckTimeout1 {
now.Sub(sf.pending[0].sendTime) >= sf.SendUnAckTimeout1 {
sf.ackNoOut++
sf.ackNoSend++
sf.Error("fatal transmission timeout t₁")
return
}

// 确定最早发送的i-Frame是否超时,超时则回复sFrame
if sf.ackNoIn != sf.seqNoIn &&
if sf.ackNoRcv != sf.seqNoRcv &&
(now.Sub(unAckRcvSince) >= sf.RecvUnAckTimeout2 ||
now.Sub(idleTimeout3Sine) >= timeoutResolution) {
sf.sendSFrame(sf.seqNoIn)
sf.ackNoIn = sf.seqNoIn
sf.sendSFrame(sf.seqNoRcv)
sf.ackNoRcv = sf.seqNoRcv
}

// 空闲时间到,发送TestFrActive帧,保活
Expand All @@ -230,7 +230,7 @@ func (sf *SrvSession) run(ctx context.Context) {
idleTimeout3Sine = testFrAliveSendSince
}

case apdu := <-sf.rawRcv:
case apdu := <-sf.rcvRaw:
idleTimeout3Sine = time.Now() // 每收到一个i帧,S帧,U帧, 重置空闲定时器, t3
apci, asduVal := parse(apdu)
switch head := apci.(type) {
Expand All @@ -247,20 +247,20 @@ func (sf *SrvSession) run(ctx context.Context) {
sf.Warn("station not active")
break // not active, discard apdu
}
if !sf.updateAckNoOut(head.rcvSN) || head.sendSN != sf.seqNoIn {
if !sf.updateAckNoOut(head.rcvSN) || head.sendSN != sf.seqNoRcv {
sf.Error("fatal incoming acknowledge either earlier than previous or later than sendTime")
return
}

sf.in <- asduVal
if sf.ackNoIn == sf.seqNoIn { // first unacked
sf.rcvASDU <- asduVal
if sf.ackNoRcv == sf.seqNoRcv { // first unacked
unAckRcvSince = time.Now()
}

sf.seqNoIn = (sf.seqNoIn + 1) & 32767
if seqNoCount(sf.ackNoIn, sf.seqNoIn) >= sf.RecvUnAckLimitW {
sf.sendSFrame(sf.seqNoIn)
sf.ackNoIn = sf.seqNoIn
sf.seqNoRcv = (sf.seqNoRcv + 1) & 32767
if seqNoCount(sf.ackNoRcv, sf.seqNoRcv) >= sf.RecvUnAckLimitW {
sf.sendSFrame(sf.seqNoRcv)
sf.ackNoRcv = sf.seqNoRcv
}

case uAPCI:
Expand Down Expand Up @@ -302,7 +302,7 @@ func (sf *SrvSession) handlerLoop() {
select {
case <-sf.ctx.Done():
return
case rawAsdu := <-sf.in:
case rawAsdu := <-sf.rcvASDU:
asduPack := asdu.NewEmptyASDU(sf.params)
if err := asduPack.UnmarshalBinary(rawAsdu); err != nil {
sf.Error("asdu UnmarshalBinary failed,%+v", err)
Expand All @@ -329,19 +329,19 @@ func (sf *SrvSession) connectStatus() uint32 {
}

func (sf *SrvSession) cleanUp() {
sf.ackNoIn = 0
sf.ackNoOut = 0
sf.seqNoIn = 0
sf.seqNoOut = 0
sf.ackNoRcv = 0
sf.ackNoSend = 0
sf.seqNoRcv = 0
sf.seqNoSend = 0
sf.pending = nil
// clear sending chan buffer
loop:
for {
select {
case <-sf.rawSend:
case <-sf.rawRcv:
case <-sf.in:
case <-sf.out:
case <-sf.sendRaw:
case <-sf.rcvRaw:
case <-sf.rcvASDU:
case <-sf.sendASDU:
default:
break loop
}
Expand All @@ -358,37 +358,37 @@ func seqNoCount(nextAckNo, nextSeqNo uint16) uint16 {

func (sf *SrvSession) sendSFrame(rcvSN uint16) {
sf.Debug("TX sFrame %v", sAPCI{rcvSN})
sf.rawSend <- newSFrame(rcvSN)
sf.sendRaw <- newSFrame(rcvSN)
}

func (sf *SrvSession) sendUFrame(which byte) {
sf.Debug("TX uFrame %v", uAPCI{which})
sf.rawSend <- newUFrame(which)
sf.sendRaw <- newUFrame(which)
}

func (sf *SrvSession) sendIFrame(asdu1 []byte) {
seqNo := sf.seqNoOut
seqNo := sf.seqNoSend

iframe, err := newIFrame(seqNo, sf.seqNoIn, asdu1)
iframe, err := newIFrame(seqNo, sf.seqNoRcv, asdu1)
if err != nil {
return
}
sf.ackNoIn = sf.seqNoIn
sf.seqNoOut = (seqNo + 1) & 32767
sf.ackNoRcv = sf.seqNoRcv
sf.seqNoSend = (seqNo + 1) & 32767

//sf.push(seqPending{seqNo & 32767, time.Now()})
sf.pending = append(sf.pending, seqPending{seqNo & 32767, time.Now()})

sf.Debug("TX iFrame %v", iAPCI{seqNo, sf.seqNoIn})
sf.rawSend <- iframe
sf.Debug("TX iFrame %v", iAPCI{seqNo, sf.seqNoRcv})
sf.sendRaw <- iframe
}

func (sf *SrvSession) updateAckNoOut(ackNo uint16) (ok bool) {
if ackNo == sf.ackNoOut {
if ackNo == sf.ackNoSend {
return true
}
// new acks validate, ack 不能在 req seq 前面,出错
if seqNoCount(sf.ackNoOut, sf.seqNoOut) < seqNoCount(ackNo, sf.seqNoOut) {
if seqNoCount(sf.ackNoSend, sf.seqNoSend) < seqNoCount(ackNo, sf.seqNoSend) {
return false
}

Expand All @@ -400,7 +400,7 @@ func (sf *SrvSession) updateAckNoOut(ackNo uint16) (ok bool) {
}
}
//sf.confirmReception(ackNo)
sf.ackNoOut = ackNo
sf.ackNoSend = ackNo
return true
}

Expand Down Expand Up @@ -530,7 +530,7 @@ func (sf *SrvSession) Send(u *asdu.ASDU) error {
return err
}
select {
case sf.out <- data:
case sf.sendASDU <- data:
default:
return ErrBufferFulled
}
Expand Down
8 changes: 4 additions & 4 deletions cs104/server_special.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ func NewServerSpecial(conf *Config, params *asdu.Params, handler ServerHandlerIn
params: params,
handler: handler,

in: make(chan []byte, 1024),
out: make(chan []byte, 1024),
rawRcv: make(chan []byte, 1024),
rawSend: make(chan []byte, 1024), // may not block!
rcvASDU: make(chan []byte, 1024),
sendASDU: make(chan []byte, 1024),
rcvRaw: make(chan []byte, 1024),
sendRaw: make(chan []byte, 1024), // may not block!

Clog: clog.NewWithPrefix("cs104 serverSpec => "),
},
Expand Down

0 comments on commit 63a8cfa

Please sign in to comment.