[go] 1

Viewer

  1. // Contains the implementation of a LSP client.
  2.  
  3. package lsp
  4.  
  5. import (
  6.         "container/list"
  7.         "encoding/json"
  8.         //"errors"
  9.         "fmt"
  10.         "github.com/cmu440/lspnet"
  11.         "sort" // ??
  12. )
  13.  
  14. type client struct {
  15.         // TODO: implement this!
  16.         udpConn     *lspnet.UDPConn
  17.         connID      int
  18.         seqNumCli   int
  19.         seqNumSer   int
  20.         msgList     *list.List
  21.         readPayload chan []byte
  22.         writeSig    chan []byte
  23.         readAckMsg  chan *Message
  24.         readDataMsg chan *Message
  25.         readCalled  chan bool
  26.  
  27.         // for sliding window
  28.         maxUnackedMessages int
  29.         windowSize  int
  30.         unackedMessageSeqList []int // ??????
  31.         writeQueue *list.List
  32.  
  33.         // for epoch event
  34.         epochLimit int
  35.         epochMillis int
  36.         maxBackOffInterval int
  37.  
  38.         tryWrite chan *Message // type?????
  39.         canWrite chan *Message
  40.         updateWrite chan int
  41. }
  42.  
  43. // NewClient creates, initiates, and returns a new client. This function
  44. // should return after a connection with the server has been established
  45. // (i.e., the client has received an Ack message from the server in response
  46. // to its connection request), and should return a non-nil error if a
  47. // connection could not be made (i.e., if after K epochs, the client still
  48. // hasn't received an Ack message from the server in response to its K
  49. // connection requests).
  50. //
  51. // initialSeqNum is an int representing the Initial Sequence Number (ISN) this
  52. // client must use. You may assume that sequence numbers do not wrap around.
  53. //
  54. // hostport is a colon-separated string identifying the server's host address
  55. // and port number (i.e., "localhost:9999").
  56. func NewClient(hostport string, initialSeqNum int, params *Params) (Client, error) {
  57.         saddr, err := lspnet.ResolveUDPAddr("udp", hostport)
  58.         if err != nil {
  59.                 fmt.Println(err.Error())
  60.                 return nil, err
  61.         }
  62.  
  63.         udpConn, err := lspnet.DialUDP("udp", nil, saddr)
  64.         if err != nil {
  65.                 fmt.Println(err.Error())
  66.                 return nil, err
  67.         }
  68.  
  69.         msgToSer := NewConnect(initialSeqNum)
  70.         msgJson, err := json.Marshal(msgToSer)
  71.  
  72.         if err != nil {
  73.                 fmt.Println(err.Error())
  74.                 return nil, err
  75.         }
  76.  
  77.         _, err = udpConn.Write(msgJson)
  78.         if err != nil {
  79.                 fmt.Println(err.Error())
  80.                 return nil, err
  81.         }
  82.  
  83.         buf := make([]byte, 2000)
  84.  
  85.         var msgFromSer Message
  86.  
  87.         for {
  88.                 n, err := udpConn.Read(buf)
  89.                 if err != nil {
  90.                         fmt.Println(err.Error())
  91.                         return nil, err
  92.                 }
  93.                 err = json.Unmarshal(buf[:n], &msgFromSer)
  94.                 if err != nil {
  95.                         fmt.Println(err.Error())
  96.                         return nil, err
  97.                 }
  98.  
  99.                 if (msgFromSer.Type == MsgAck) && (msgFromSer.SeqNum == initialSeqNum) {
  100.                         cli := client{
  101.                                 udpConn:     udpConn,
  102.                                 connID:      msgFromSer.ConnID,
  103.                                 seqNumCli:   initialSeqNum,
  104.                                 seqNumSer:   msgFromSer.SeqNum + 1,
  105.                                 msgList:     list.New(),
  106.                                 readPayload: make(chan []byte, 1),
  107.                                 writeSig:    make(chan []byte),
  108.                                 readAckMsg:  make(chan *Message),
  109.                                 readDataMsg: make(chan *Message),
  110.                                 readCalled:  make(chan bool),
  111.                                 maxUnackedMessages: params.MaxUnackedMessages,
  112.                                 windowSize: params.WindowSize,
  113.                                 epochLimit: params.EpochLimit,
  114.                                 epochMillis: params.EpochMillis,
  115.                                 maxBackOffInterval: params.MaxBackOffInterval,
  116.                                 tryWrite:    make(chan *Message),
  117.                                 canWrite:    make(chan *Message),
  118.                                 updateWrite: make(chan int),
  119.                                 unackedMessageSeqList: []int{},
  120.                                 writeQueue:  list.New(),
  121.                         }
  122.  
  123.                         go MainRoutine(&cli)
  124.                         go ReadRoutine(&cli)
  125.                         return &cli, nil
  126.                 }
  127.         }
  128.         return nil, err
  129. }
  130.  
  131. func (*client) ConnID() int {
  132.         return c.connID
  133. }
  134.  
  135. func (*client) Read() ([]byte, error) {
  136.        
  137.         c.readCalled <- true
  138.         payload := <-c.readPayload
  139.         return payload, nil
  140. }
  141.  
  142. func (*client) Write(payload []byte) error {
  143.         c.seqNumCli++
  144.         size := len(payload)
  145.         checksum := CalculateChecksum(c.connID, c.seqNumCli, size, payload)
  146.         msgToSer := NewData(c.connID, c.seqNumCli, size, payload, checksum)
  147.  
  148.         // send message to main_routine to check whether it can write
  149.         fmt.Println("test 1")
  150.         c.tryWrite <- msgToSer
  151.         fmt.Println("test 2")
  152.         writeMsg := <- c.canWrite  // is the blocking reasonable???
  153.         fmt.Println("test 3")
  154.  
  155.         msgJson, err := json.Marshal(writeMsg)
  156.         if err != nil {
  157.                 fmt.Println(err.Error())
  158.                 return err
  159.         }
  160.  
  161.         _, err = c.udpConn.Write(msgJson)
  162.         if err != nil {
  163.                 fmt.Println(err.Error())
  164.                 return err
  165.         }
  166.  
  167.         // send mesage to main_routine to update unacked list
  168.         c.updateWrite <- c.seqNumCli
  169.         fmt.Println("test 4")
  170.  
  171.         return nil
  172. }
  173.  
  174. func (*client) Close() error {
  175.         c.udpConn.Close()
  176.         return nil
  177. }
  178.  
  179. func MainRoutine(*client) {
  180.  
  181.         readCalledSig := false
  182.         writeCalledSig := false
  183.         var curMsg *Message
  184.  
  185.         for {
  186.                 if readCalledSig == true {
  187.                         head := c.msgList.Front()
  188.                         if head != nil && head.Value.(*Message).SeqNum == c.seqNumSer {
  189.                                 c.readPayload <- head.Value.(*Message).Payload
  190.                                 c.msgList.Remove(head)
  191.                                 c.seqNumSer++
  192.                                 readCalledSig = false
  193.                         }
  194.                 }
  195.  
  196.                 if writeCalledSig == true {
  197.                         if len(c.unackedMessageSeqList) > 0{
  198.                                 if (len(c.unackedMessageSeqList) < c.maxUnackedMessages) &&  (curMsg.SeqNum < (c.unackedMessageSeqList[0] + c.windowSize -1)) {
  199.                                         c.canWrite <- curMsg
  200.                                         writeCalledSig = false
  201.                                 }
  202.                         } else { // means we can directly write
  203.                                 c.canWrite <- curMsg
  204.                                 writeCalledSig = false
  205.                         }
  206.                 }
  207.  
  208.  
  209.                 select {
  210.                 case msg := <-c.readDataMsg:
  211.                         InsertMsgToList(c, msg)
  212.                         // PrintList(c.msgList)
  213.                         WriteAck(c, msg)
  214.  
  215.                 case ackMsg := <-c.readAckMsg:
  216.                         // update unackedMessageSeqList
  217.                         if ackMsg.Type == MsgAck{
  218.                                 RemoveFromSeqList(c, ackMsg.SeqNum)
  219.                         }
  220.  
  221.                         if ackMsg.Type == MsgCAck {
  222.                                 if len(c.unackedMessageSeqList) > 0{
  223.                                         firstSeqInList := c.unackedMessageSeqList[0]
  224.  
  225.                                         for i := firstSeqInList; i <= ackMsg.SeqNum; i++ {
  226.                                                 RemoveFromSeqList(c, i)
  227.                                         }
  228.                                 }
  229.                                
  230.                         }
  231.  
  232.                 case <-c.readCalled:
  233.                         readCalledSig = true
  234.  
  235.  
  236.                 case temp := <- c.tryWrite:
  237.                         writeCalledSig = true
  238.                         curMsg = temp
  239.  
  240.  
  241.                 case num := <- c.updateWrite:
  242.                         InsertToSeqList(c, num)
  243.  
  244.                 }
  245.                
  246.         }
  247. }
  248.  
  249. func ReadRoutine(*client) {
  250.         buf := make([]byte, 2000)
  251.  
  252.         for {
  253.                 var msgFromSer Message
  254.  
  255.                 n, err := c.udpConn.Read(buf)
  256.                 if err != nil {
  257.                         fmt.Println(err.Error())
  258.                         // return err ?
  259.                 }
  260.                 err = json.Unmarshal(buf[:n], &msgFromSer)
  261.                 if err != nil {
  262.                         fmt.Println(err.Error())
  263.                         // return err?
  264.                 }
  265.  
  266.                 if msgFromSer.Type == MsgData {
  267.                         c.readDataMsg <- &msgFromSer
  268.                 } else { // MsgAck or MsgCAck
  269.                         c.readAckMsg <- &msgFromSer
  270.                 }
  271.  
  272.         }
  273. }
  274.  
  275.  
  276. func PrintList(*list.List) {
  277.         i := 0
  278.         fmt.Println("Client LIST:  START")
  279.         for e := l.Front(); e != nil; e = e.Next() {
  280.                 fmt.Printf("Client:  %d: %s\n", i, e.Value.(*Message))
  281.                 i++
  282.         }
  283.         fmt.Println("Client LIST:  END")
  284. }
  285.  
  286. func InsertMsgToList(*client, msg *Message) {
  287.         e := c.msgList.Back()
  288.         for ; e != nil; e = e.Prev() {
  289.                 if e.Value.(*Message).SeqNum < msg.SeqNum {
  290.                         c.msgList.InsertAfter(msg, e)
  291.                         return
  292.                 }
  293.         }
  294.         c.msgList.PushFront(msg)
  295. }
  296.  
  297. func WriteAck(*client, msg *Message) error {
  298.  
  299.         msgToSer := NewAck(msg.ConnID, msg.SeqNum)
  300.         msgJson, err := json.Marshal(msgToSer)
  301.  
  302.         if err != nil {
  303.                 fmt.Println(err.Error())
  304.                 return err
  305.         }
  306.  
  307.         // fmt.Printf("Client:  write ack msg: %s\n", msgToSer)
  308.         _, err = c.udpConn.Write(msgJson)
  309.         if err != nil {
  310.                 fmt.Println(err.Error())
  311.                 return err
  312.         }
  313.         return nil
  314. }
  315.  
  316. func RemoveFromSeqList(*client, index int){
  317.  
  318.         // remove from the slice
  319.         for i, v := range c.unackedMessageSeqList {
  320.                 if v == index{
  321.                         del_index := i
  322.                         c.unackedMessageSeqList[del_index] = c.unackedMessageSeqList[len(c.unackedMessageSeqList)-1]
  323.                         c.unackedMessageSeqList = c.unackedMessageSeqList[:len(c.unackedMessageSeqList)-1]
  324.  
  325.                         break
  326.                 }
  327.         } 
  328.  
  329.         // sort
  330.         sort.Ints(c.unackedMessageSeqList)
  331. }
  332.  
  333. func InsertToSeqList(*client, num int){
  334.         c.unackedMessageSeqList = append(c.unackedMessageSeqList, num)
  335.         sort.Ints(c.unackedMessageSeqList) // small to large?
  336. }
  337.  
  338.  
  339.  
  340.  

Editor

You can edit this paste and save as new:


File Description
  • 1
  • Paste Code
  • 28 Sep-2022
  • 7.66 Kb
You can Share it: