[go] 1
Viewer
*** This page was generated with the meta tag "noindex, nofollow". This happened because you selected this option before saving or the system detected it as spam. This means that this page will never get into the search engines and the search bot will not crawl it. There is nothing to worry about, you can still share it with anyone.
- // Contains the implementation of a LSP client.
- package lsp
- import (
- "container/list"
- "encoding/json"
- //"errors"
- "fmt"
- "github.com/cmu440/lspnet"
- "sort" // ??
- )
- type client struct {
- // TODO: implement this!
- udpConn *lspnet.UDPConn
- connID int
- seqNumCli int
- seqNumSer int
- msgList *list.List
- readPayload chan []byte
- writeSig chan []byte
- readAckMsg chan *Message
- readDataMsg chan *Message
- readCalled chan bool
- // for sliding window
- maxUnackedMessages int
- windowSize int
- unackedMessageSeqList []int // ??????
- writeQueue *list.List
- // for epoch event
- epochLimit int
- epochMillis int
- maxBackOffInterval int
- tryWrite chan *Message // type?????
- canWrite chan *Message
- updateWrite chan int
- }
- // NewClient creates, initiates, and returns a new client. This function
- // should return after a connection with the server has been established
- // (i.e., the client has received an Ack message from the server in response
- // to its connection request), and should return a non-nil error if a
- // connection could not be made (i.e., if after K epochs, the client still
- // hasn't received an Ack message from the server in response to its K
- // connection requests).
- //
- // initialSeqNum is an int representing the Initial Sequence Number (ISN) this
- // client must use. You may assume that sequence numbers do not wrap around.
- //
- // hostport is a colon-separated string identifying the server's host address
- // and port number (i.e., "localhost:9999").
- func NewClient(hostport string, initialSeqNum int, params *Params) (Client, error) {
- saddr, err := lspnet.ResolveUDPAddr("udp", hostport)
- if err != nil {
- fmt.Println(err.Error())
- return nil, err
- }
- udpConn, err := lspnet.DialUDP("udp", nil, saddr)
- if err != nil {
- fmt.Println(err.Error())
- return nil, err
- }
- msgToSer := NewConnect(initialSeqNum)
- msgJson, err := json.Marshal(msgToSer)
- if err != nil {
- fmt.Println(err.Error())
- return nil, err
- }
- _, err = udpConn.Write(msgJson)
- if err != nil {
- fmt.Println(err.Error())
- return nil, err
- }
- buf := make([]byte, 2000)
- var msgFromSer Message
- for {
- n, err := udpConn.Read(buf)
- if err != nil {
- fmt.Println(err.Error())
- return nil, err
- }
- err = json.Unmarshal(buf[:n], &msgFromSer)
- if err != nil {
- fmt.Println(err.Error())
- return nil, err
- }
- if (msgFromSer.Type == MsgAck) && (msgFromSer.SeqNum == initialSeqNum) {
- cli := client{
- udpConn: udpConn,
- connID: msgFromSer.ConnID,
- seqNumCli: initialSeqNum,
- seqNumSer: msgFromSer.SeqNum + 1,
- msgList: list.New(),
- readPayload: make(chan []byte, 1),
- writeSig: make(chan []byte),
- readAckMsg: make(chan *Message),
- readDataMsg: make(chan *Message),
- readCalled: make(chan bool),
- maxUnackedMessages: params.MaxUnackedMessages,
- windowSize: params.WindowSize,
- epochLimit: params.EpochLimit,
- epochMillis: params.EpochMillis,
- maxBackOffInterval: params.MaxBackOffInterval,
- tryWrite: make(chan *Message),
- canWrite: make(chan *Message),
- updateWrite: make(chan int),
- unackedMessageSeqList: []int{},
- writeQueue: list.New(),
- }
- go MainRoutine(&cli)
- go ReadRoutine(&cli)
- return &cli, nil
- }
- }
- return nil, err
- }
- func (c *client) ConnID() int {
- return c.connID
- }
- func (c *client) Read() ([]byte, error) {
- c.readCalled <- true
- payload := <-c.readPayload
- return payload, nil
- }
- func (c *client) Write(payload []byte) error {
- c.seqNumCli++
- size := len(payload)
- checksum := CalculateChecksum(c.connID, c.seqNumCli, size, payload)
- msgToSer := NewData(c.connID, c.seqNumCli, size, payload, checksum)
- // send message to main_routine to check whether it can write
- fmt.Println("test 1")
- c.tryWrite <- msgToSer
- fmt.Println("test 2")
- writeMsg := <- c.canWrite // is the blocking reasonable???
- fmt.Println("test 3")
- msgJson, err := json.Marshal(writeMsg)
- if err != nil {
- fmt.Println(err.Error())
- return err
- }
- _, err = c.udpConn.Write(msgJson)
- if err != nil {
- fmt.Println(err.Error())
- return err
- }
- // send mesage to main_routine to update unacked list
- c.updateWrite <- c.seqNumCli
- fmt.Println("test 4")
- return nil
- }
- func (c *client) Close() error {
- c.udpConn.Close()
- return nil
- }
- func MainRoutine(c *client) {
- readCalledSig := false
- writeCalledSig := false
- var curMsg *Message
- for {
- if readCalledSig == true {
- head := c.msgList.Front()
- if head != nil && head.Value.(*Message).SeqNum == c.seqNumSer {
- c.readPayload <- head.Value.(*Message).Payload
- c.msgList.Remove(head)
- c.seqNumSer++
- readCalledSig = false
- }
- }
- if writeCalledSig == true {
- if len(c.unackedMessageSeqList) > 0{
- if (len(c.unackedMessageSeqList) < c.maxUnackedMessages) && (curMsg.SeqNum < (c.unackedMessageSeqList[0] + c.windowSize -1)) {
- c.canWrite <- curMsg
- writeCalledSig = false
- }
- } else { // means we can directly write
- c.canWrite <- curMsg
- writeCalledSig = false
- }
- }
- select {
- case msg := <-c.readDataMsg:
- InsertMsgToList(c, msg)
- // PrintList(c.msgList)
- WriteAck(c, msg)
- case ackMsg := <-c.readAckMsg:
- // update unackedMessageSeqList
- if ackMsg.Type == MsgAck{
- RemoveFromSeqList(c, ackMsg.SeqNum)
- }
- if ackMsg.Type == MsgCAck {
- if len(c.unackedMessageSeqList) > 0{
- firstSeqInList := c.unackedMessageSeqList[0]
- for i := firstSeqInList; i <= ackMsg.SeqNum; i++ {
- RemoveFromSeqList(c, i)
- }
- }
- }
- case <-c.readCalled:
- readCalledSig = true
- case temp := <- c.tryWrite:
- writeCalledSig = true
- curMsg = temp
- case num := <- c.updateWrite:
- InsertToSeqList(c, num)
- }
- }
- }
- func ReadRoutine(c *client) {
- buf := make([]byte, 2000)
- for {
- var msgFromSer Message
- n, err := c.udpConn.Read(buf)
- if err != nil {
- fmt.Println(err.Error())
- // return err ?
- }
- err = json.Unmarshal(buf[:n], &msgFromSer)
- if err != nil {
- fmt.Println(err.Error())
- // return err?
- }
- if msgFromSer.Type == MsgData {
- c.readDataMsg <- &msgFromSer
- } else { // MsgAck or MsgCAck
- c.readAckMsg <- &msgFromSer
- }
- }
- }
- func PrintList(l *list.List) {
- i := 0
- fmt.Println("Client LIST: START")
- for e := l.Front(); e != nil; e = e.Next() {
- fmt.Printf("Client: %d: %s\n", i, e.Value.(*Message))
- i++
- }
- fmt.Println("Client LIST: END")
- }
- func InsertMsgToList(c *client, msg *Message) {
- e := c.msgList.Back()
- for ; e != nil; e = e.Prev() {
- if e.Value.(*Message).SeqNum < msg.SeqNum {
- c.msgList.InsertAfter(msg, e)
- return
- }
- }
- c.msgList.PushFront(msg)
- }
- func WriteAck(c *client, msg *Message) error {
- msgToSer := NewAck(msg.ConnID, msg.SeqNum)
- msgJson, err := json.Marshal(msgToSer)
- if err != nil {
- fmt.Println(err.Error())
- return err
- }
- // fmt.Printf("Client: write ack msg: %s\n", msgToSer)
- _, err = c.udpConn.Write(msgJson)
- if err != nil {
- fmt.Println(err.Error())
- return err
- }
- return nil
- }
- func RemoveFromSeqList(c *client, index int){
- // remove from the slice
- for i, v := range c.unackedMessageSeqList {
- if v == index{
- del_index := i
- c.unackedMessageSeqList[del_index] = c.unackedMessageSeqList[len(c.unackedMessageSeqList)-1]
- c.unackedMessageSeqList = c.unackedMessageSeqList[:len(c.unackedMessageSeqList)-1]
- break
- }
- }
- // sort
- sort.Ints(c.unackedMessageSeqList)
- }
- func InsertToSeqList(c *client, num int){
- c.unackedMessageSeqList = append(c.unackedMessageSeqList, num)
- sort.Ints(c.unackedMessageSeqList) // small to large?
- }
Editor
You can edit this paste and save as new: