500行代碼了解Mecached緩存客戶端驅動原理

原創不易,求分享、求一鍵三連

緩存一般是用來加速數據訪問的效率,在獲取數據耗時高的場景下使用緩存可以有效的提高數據獲取的效率。

比如,先從memcached中獲取數據,如果沒有則查詢mysql中的數據得到結果寫入到memcached中然後返回,下次請求就能夠從memcached中獲取數據直接返回。

在行業中使用比較多的緩存數據庫有Redis和Memcached。

今天用go實現Memcached的驅動程序,深入了解Memcached和咱們平時所寫的業務代碼如何進行數據交互的協議和原理。

什麼是memcached

Memcached是LiveJournal旗下Danga Interactive公司的Brad Fitzpatric為首開發的一款自由開源、高性能的key-value緩存數據庫軟件。

Mecached協議

Mecached服務和應用程序是不同機器不同的進程,雙方進行數據交互通訊涉及到tcp和通訊協議,在memcache中協議有兩種類型一種是文本行方式,另一種是非結構化數據方式。

我們挑選文本行協議來實現,大多數Mecached的客戶端也是採用文本行協議來開發的因為比較簡單,特定格式文本字符串來約定數據交互,如下以客戶端發送命令舉例:

<command name> <key> <flags> <exptime> <bytes>\r\n
<data block>\r\n

<command name> 是協議的命令,大致分為三類:

  1. 存儲命令:set、 add、replace、append、prepend、cas。
  2. 獲取命令:get、gets。
  3. 其他命令:version、stats

<key>

要求存儲數據的關鍵字;由於memached底層實現的限制,key的長度限制在250個字符內,並且key中不能包含控制字符或空格。

<flags>

是一個16位無符號整數。

<exptime>

是存儲超時時間。如果值為0表示該數據項永不超時;

過期時間實現限制,過期時間要麼是Unix時間(從1970-1-1開始計算的秒數),要麼是從當前時間開始計算的秒數。

該值不能超過30天,否則服務器將該參數當做真正的Unix時間而不是當前時間的一個偏移值。

<bytes>

是隨後數據的位元組數,不包括終結符\r\n。<bytes>有可能是0,它後面將是一個空的數據塊。

<data block>

存儲數據流。

客戶端以字符串的方式向服務端發送文本行的內容服務器端會返回對應的執行結果數據,也有返回錯誤的情況,memcache也對錯誤的數據格式定義三種不同錯誤類型的三種格式讓錯誤的返回簡單:

  1. ERROR\r\n

說明客戶端發送了一個不存在命令

  1. CLIENT_ERROR\r\n

說明在輸入行中存在某種類型的客戶端錯誤,例如輸入的信息沒有遵循memcached的協議

  1. SERVER_ERROR\r\n

說明服務端存在某種類型的錯誤導致致命命令無法執行。

<error>是具有可讀性的錯誤字符串。

當服務端錯誤發生後,將會導致服務器將不會再提供服務,服務器在發送該錯誤信息行後將關閉鏈接。只有在此場景下,服務器才會關閉與客戶端鏈接。

以下就具體羅列memcached常用包含客戶端發送和響應的命令格式列表:

需要注意的是command是區分大小寫的,客戶端使用tcp連接服務端發送客戶端文本行命令,發送成功後等待服務器返回數據,根據格式解析獲取需要的返回值這就是一個簡單的協議命令執行流程。

Golang實現客戶端驅動

有了對memache協議的了解現在來實現通訊就比較簡單,首先需要定義Client結構體,保存客戶端一些基本配置信息及鏈接信息:

type Client struct {
   Timeout      time.Duration
   MaxIdleConns int
   lock         sync.Mutex
   addr         net.Addr
   conns        []*conn
}  
  • Timeout tcp鏈接讀寫超時
  • conns 是memcache鏈接池存放的數組
  • MaxIdleConns 是Idle鏈接的數量
  • lock 是操作conns時加鎖
  • addr則是鏈接的memcache的地址

memcached的單獨一個conn連接結構體定義

type conn struct {
   nc   net.Conn
   rw   *bufio.ReadWriter
   addr net.Addr
   c    *Client
}  
  • nc 是建立好的tcp網絡鏈接
  • rw 為了方便數據發送和讀取設置bufio的ReadWriter
  • addr 存儲memcached地址
  • c 存儲客戶端的引用

下面是看如何獲取鏈接和使用完之後如何將鏈接放回到鏈接池中

//獲取memcached的鏈接
func (c *Client) getFreeConn() (cn *conn, ok bool) {
   c.lock.Lock()
   defer c.lock.Unlock()
   if c.conns == nil {
      return nil, false
   }
   freelist := c.conns
   if len(freelist) == 0 {
      return nil, false
   }
   cn = freelist[len(freelist)-1]
   c.conns = freelist[:len(freelist)-1]
   return cn, true
}
//將使用完的鏈接放回到conns中
func (c *Client) putFreeConn(cn *conn) {
   c.lock.Lock()
   defer c.lock.Unlock()
   if c.conns == nil {
      c.conns = make([]*conn, 0)
   }
   freelist := c.conns
   if len(freelist) >= c.maxIdleConns() {
      cn.nc.Close()
      return
   }
   c.conns = append(freelist, cn)
}

接下來以GET命令為例,來詳細看如何進行網絡傳輸和協議解析的實現

func (c *Client) Get(key string) (item *Item, err error) {
   //check key len 驗證key是否長於250字符
   if !legalKey(key) {
      err = ErrMalformedKey
      return
   }
   keys := []string{key}
   cn, err := c.getConn() //獲取memcached鏈接
   defer cn.condRelease(&err) // 方法執行完之後將鏈接release,返回到鏈接池中
   if err != nil {
      return
   }
   rw := cn.rw
   //將gets 命令用文本行協議寫入到rw中
   if _, err = fmt.Fprintf(rw, "gets %s\r\n", strings.Join(keys, " ")); err != nil {
      return
   }
   if err = rw.Flush(); err != nil {
      return
   }
   //獲取GET命令發送之後等待和獲取返回的響應數據
   if err = parseGetResponse(rw.Reader, func(it *Item) { item = it }); err != nil {
      return
   }
   if item == nil {
      err = ErrCacheMiss
   }
   return
}

func parseGetResponse(r *bufio.Reader, cb func(*Item)) error {
 for {
  line, err := r.ReadSlice('\n')
  if err != nil {
   return err
  }
  if bytes.Equal(line, resultEnd) { //如果獲取是 END\r\n 則數據返回完,則返回
   return nil
  }
  it := new(Item)
  size, err := scanGetResponseLine(line, it)//先根據格式獲取第一行數據和<data> 部分的大小
  if err != nil {
   return err
  }
    //根據bytes獲取數據
  it.Value = make([]byte, size+2)
  _, err = io.ReadFull(r, it.Value)
  if err != nil {
   it.Value = nil
   return err
  }
  if !bytes.HasSuffix(it.Value, crlf) {
   it.Value = nil
   return fmt.Errorf("memcache: corrupt get result read")
  }
  it.Value = it.Value[:size]
  cb(it)
 }
}

//根據返回數據格式獲取返回值設置到Item結構中。
func scanGetResponseLine(line []byte, it *Item) (size int, err error) {
 // 返回的數據格式 VALUE <key> <falgs> <bytes> <casid>
  pattern := "VALUE %s %d %d %d\r\n"
 dest := []interface{}{&it.Key, &it.Flags, &size, &it.casid}
 if bytes.Count(line, space) == 3 {
  pattern = "VALUE %s %d %d\r\n"
  dest = dest[:3]
 }
 n, err := fmt.Sscanf(string(line), pattern, dest...)
 if err != nil || n != len(dest) {
  return -1, fmt.Errorf("memcache: unexpected line in get response: %q", line)
 }
 return size, nil
}

//判斷key是否符合要求
func legalKey(key string) bool {
 if len(key) > 250 {
  return false
 }
 for i := 0; i < len(key); i++ {
  if key[i] <= ' ' || key[i] == 0x7f {
   return false
  }
 }
 return true
}  

其它命令不在詳細描述,完整代碼如下:

  1 package memcache
  2 
  3 import (
  4     "bufio"
  5     "bytes"
  6     "errors"
  7     "fmt"
  8     "io"
  9     "net"
 10     "strconv"
 11     "strings"
 12     "sync"
 13     "time"
 14 )
 15 
 16 //memcached -m 1024  -u root -l 127.0.0.1 -p 12001 -c 55535
 17 //# memcached -d -m 10  -u root -l 127.0.0.1 -p 12001 -c 256 -P /tmp/memcached.pid
 18 //-d選項是啟動一個守護進程
 19 //-m是分配給Memcache使用的內存數量,單位是MB,我這裡是10MB
 20 //-u是運行Memcache的用戶
 21 //-l是監聽的服務器IP地址,如果有多個地址的話,我這裡指定了服務器的IP地址127.0.0.1
 22 //-p是設置 Memcache監聽的端口,我這裡設置了12001,最好是1024以上的端口
 23 //-c選項是最大運行的並發連接數,默認是1024,我這裡設置了 256,按照你服務器的負載量來設定
 24 //-P是設置保存Memcache的pid文件,我這裡是保存在 /tmp/memcached.pid
 25 //停止進程:# kill `cat /tmp/memcached.pid`
 26 
 27 const (
 28     DefaultTimeout      = 100 * time.Millisecond
 29     DefaultMaxIdleConns = 40
 30 )
 31 
 32 var (
 33     resultClientErrorPrefix = []byte("CLIENT_ERROR")
 34     resultErrPrefix         = []byte("ERROR")
 35     resultServerErrPrefix   = []byte("SERVER_ERROR")
 36 
 37     crlf            = []byte("\r\n")
 38     space           = []byte(" ")
 39     resultOK        = []byte("OK\r\n")
 40     resultStored    = []byte("STORED\r\n")
 41     resultNotStored = []byte("NOT_STORED\r\n")
 42     resultExists    = []byte("EXISTS\r\n")
 43     resultNotFound  = []byte("NOT_FOUND\r\n")
 44     resultDeleted   = []byte("DELETED\r\n")
 45     resultEnd       = []byte("END\r\n")
 46     resultTouched   = []byte("TOUCHED\r\n")
 47     versionPrefix   = []byte("VERSION")
 48 
 49     ErrMalformedKey = errors.New("malformed: key is too long or contains invalid characters")
 50     ErrCacheMiss    = errors.New("memcache: cache miss")
 51     ErrCASConflict  = errors.New("memcache: compare-and-swap conflict")
 52     ErrNotStored    = errors.New("memcache: item not stored")
 53 )
 54 
 55 type Client struct {
 56     Timeout      time.Duration
 57     MaxIdleConns int
 58     lock         sync.Mutex
 59     addr         net.Addr
 60     conns        []*conn
 61 }
 62 
 63 func NewClient(timeout time.Duration, maxIdleConns int, addr net.Addr) *Client {
 64     return &Client{
 65         Timeout:      timeout,
 66         MaxIdleConns: maxIdleConns,
 67         lock:         sync.Mutex{},
 68         addr:         addr,
 69         conns:        nil,
 70     }
 71 }
 72 
 73 type Item struct {
 74     // Key is the Item's key (250 bytes maximum).
 75     Key string
 76     // Value is the Item's value.
 77     Value []byte
 78     // Flags are server-opaque flags whose semantics are entirely
 79     // up to the app.
 80     Flags uint32
 81     // Expiration is the cache expiration time, in seconds: either a relative
 82     // time from now (up to 1 month), or an absolute Unix epoch time.
 83     // Zero means the Item has no expiration time.
 84     Expiration int32
 85     // Compare and swap ID.
 86     casid uint64
 87 }
 88 
 89 func (c *Client) maxIdleConns() int {
 90     if c.MaxIdleConns > 0 {
 91         return c.MaxIdleConns
 92     }
 93     return DefaultMaxIdleConns
 94 }
 95 
 96 func (c *Client) netTimeout() time.Duration {
 97     if c.Timeout != 0 {
 98         return c.Timeout
 99     }
100     return DefaultTimeout
101 }
102 
103 type conn struct {
104     nc   net.Conn
105     rw   *bufio.ReadWriter
106     addr net.Addr
107     c    *Client
108 }
109 
110 // 設置超時時間
111 func (cn *conn) extendDeadline() {
112     cn.nc.SetDeadline(time.Now().Add(cn.c.netTimeout()))
113 }
114 
115 // Release 如果是正常的err 則放回到conns中,如果不是這直接close掉conn
116 func (cn *conn) condRelease(err *error) {
117     if *err == nil || resumableError(*err) {
118         cn.release()
119     } else {
120         fmt.Println("xxx", fmt.Sprintf("%s", (*err).Error()))
121         cn.nc.Close()
122     }
123 }
124 
125 // release returns this connection back to the client's free pool
126 func (cn *conn) release() {
127     cn.c.putFreeConn(cn)
128 }
129 
130 func (c *Client) putFreeConn(cn *conn) {
131     c.lock.Lock()
132     defer c.lock.Unlock()
133     if c.conns == nil {
134         c.conns = make([]*conn, 0)
135     }
136     freelist := c.conns
137     if len(freelist) >= c.maxIdleConns() {
138         cn.nc.Close()
139         return
140     }
141     c.conns = append(freelist, cn)
142 }
143 
144 func (c *Client) getFreeConn() (cn *conn, ok bool) {
145     c.lock.Lock()
146     defer c.lock.Unlock()
147     if c.conns == nil {
148         return nil, false
149     }
150     freelist := c.conns
151     if len(freelist) == 0 {
152         return nil, false
153     }
154     cn = freelist[len(freelist)-1]
155     c.conns = freelist[:len(freelist)-1]
156     return cn, true
157 }
158 
159 type ConnectTimeoutError struct {
160     Addr net.Addr
161 }
162 
163 func (cte *ConnectTimeoutError) Error() string {
164     return "memcache: connect timeout to " + cte.Addr.String()
165 }
166 
167 //獲取memcached連接
168 func (c *Client) getConn() (*conn, error) {
169     cn, ok := c.getFreeConn()
170     if ok {
171         cn.extendDeadline()
172         return cn, nil
173     }
174     nc, err := c.dial(c.addr)
175     if err != nil {
176         return nil, err
177     }
178     cn = &conn{
179         nc:   nc,
180         addr: c.addr,
181         rw:   bufio.NewReadWriter(bufio.NewReader(nc), bufio.NewWriter(nc)),
182         c:    c,
183     }
184     cn.extendDeadline()
185     return cn, nil
186 }
187 
188 func (c *Client) dial(addr net.Addr) (net.Conn, error) {
189     nc, err := net.DialTimeout(addr.Network(), addr.String(), c.netTimeout())
190     if err == nil {
191         return nc, nil
192     }
193     if ne, ok := err.(net.Error); ok && ne.Timeout() {
194         return nil, &ConnectTimeoutError{Addr: addr}
195     }
196     return nil, err
197 }
198 
199 func (c *Client) Get(key string) (item *Item, err error) {
200     //check key len
201     if !legalKey(key) {
202         err = ErrMalformedKey
203         return
204     }
205     keys := []string{key}
206     cn, err := c.getConn()
207     defer cn.condRelease(&err)
208     if err != nil {
209         return
210     }
211     rw := cn.rw
212     if _, err = fmt.Fprintf(rw, "gets %s\r\n", strings.Join(keys, " ")); err != nil {
213         return
214     }
215     if err = rw.Flush(); err != nil {
216         return
217     }
218     if err = parseGetResponse(rw.Reader, func(it *Item) { item = it }); err != nil {
219         return
220     }
221     if item == nil {
222         err = ErrCacheMiss
223     }
224     return
225 }
226 
227 func (c *Client) GetMulti(keys []string) (map[string]*Item, error) {
228     var lk sync.Mutex
229     m := make(map[string]*Item)
230     addItemToMap := func(it *Item) {
231         lk.Lock()
232         defer lk.Unlock()
233         m[it.Key] = it
234     }
235     for _, key := range keys {
236         if !legalKey(key) {
237             return nil, ErrMalformedKey
238         }
239     }
240     cn, err := c.getConn()
241     defer cn.condRelease(&err)
242     if err != nil {
243         return nil, err
244     }
245     if _, err = fmt.Fprintf(cn.rw, "gets %s\r\n", strings.Join(keys, " ")); err != nil {
246         return nil, err
247     }
248     if err = cn.rw.Flush(); err != nil {
249         return nil, err
250     }
251     if err = parseGetResponse(cn.rw.Reader, addItemToMap); err != nil {
252         return nil, err
253     }
254     return m, err
255 
256 }
257 
258 func (c *Client) Touch(key string, seconds int32) (err error) {
259 
260     cn, err := c.getConn()
261     if err != nil {
262         return
263     }
264     defer cn.condRelease(&err)
265 
266     if _, err = fmt.Fprintf(cn.rw, "touch %s %d\r\n", key, seconds); err != nil {
267         return
268     }
269     if err = cn.rw.Flush(); err != nil {
270         return
271     }
272     line, err := cn.rw.ReadSlice('\n')
273     if err != nil {
274         return
275     }
276     switch {
277     case bytes.Equal(line, resultTouched):
278         break
279     case bytes.Equal(line, resultNotFound):
280         return ErrCacheMiss
281     default:
282         return fmt.Errorf("memcache: unexpected response line from touch: %q", string(line))
283     }
284     return nil
285 }
286 
287 func (c *Client) Add(item *Item) error {
288     return c.onItem(item, func(client *Client, rw *bufio.ReadWriter, item *Item) error {
289         return client.populateOne(rw, "add", item)
290     })
291 }
292 
293 func (c *Client) Set(item *Item) error {
294     return c.onItem(item, func(client *Client, rw *bufio.ReadWriter, item *Item) error {
295         return client.populateOne(rw, "set", item)
296     })
297 }
298 
299 func (c *Client) CompareAndSwap(item *Item) error {
300     return c.onItem(item, func(client *Client, rw *bufio.ReadWriter, item *Item) error {
301         return client.populateOne(rw, "cas", item)
302     })
303 }
304 
305 func (c *Client) Replace(item *Item) error {
306     return c.onItem(item, func(client *Client, rw *bufio.ReadWriter, item *Item) error {
307         return client.populateOne(rw, "replace", item)
308     })
309 }
310 
311 func (c *Client) Delete(key string) error {
312     if !legalKey(key) {
313         return ErrMalformedKey
314     }
315     cn, err := c.getConn()
316     if err != nil {
317         return err
318     }
319     defer cn.condRelease(&err)
320     return writeExpectf(cn.rw, resultDeleted, "delete %s\r\n", key)
321 }
322 
323 func (c *Client) FlushAll() error {
324     cn, err := c.getConn()
325     if err != nil {
326         return err
327     }
328     defer cn.condRelease(&err)
329     return writeExpectf(cn.rw, resultDeleted, "flush_all\r\n")
330 }
331 
332 func (c *Client) Version() error {
333     cn, err := c.getConn()
334     defer cn.condRelease(&err)
335     if err != nil {
336         return err
337     }
338     return func(rw *bufio.ReadWriter) error {
339         if _, e := fmt.Fprintf(rw, "version\r\n"); e != nil {
340             return err
341         }
342         if e := rw.Flush(); e != nil {
343             return e
344         }
345         line, e := rw.ReadSlice('\n')
346         if e != nil {
347             return e
348         }
349         switch {
350         case bytes.HasPrefix(line, versionPrefix):
351             break
352         default:
353             return fmt.Errorf("memcache: unexpected response line from ping: %q", string(line))
354         }
355         return nil
356     }(cn.rw)
357 
358 }
359 
360 func (c *Client) Increment(key string, delta uint64) (newValue uint64, err error) {
361     return c.incrDecr("incr", key, delta)
362 
363 }
364 
365 func (c *Client) Decrement(key string, delta uint64) (newValue uint64, err error) {
366     return c.incrDecr("decr", key, delta)
367 }
368 
369 func (c *Client) onItem(item *Item, fn func(*Client, *bufio.ReadWriter, *Item) error) error {
370     cn, err := c.getConn()
371     defer cn.condRelease(&err)
372     if err != nil {
373         return err
374     }
375     if err = fn(c, cn.rw, item); err != nil {
376         return err
377     }
378     return nil
379 }
380 
381 func parseGetResponse(r *bufio.Reader, cb func(*Item)) error {
382     for {
383         line, err := r.ReadSlice('\n')
384         if err != nil {
385             return err
386         }
387         if bytes.Equal(line, resultEnd) {
388             return nil
389         }
390         it := new(Item)
391         size, err := scanGetResponseLine(line, it)
392         if err != nil {
393             return err
394         }
395         it.Value = make([]byte, size+2)
396         _, err = io.ReadFull(r, it.Value)
397         if err != nil {
398             it.Value = nil
399             return err
400         }
401         if !bytes.HasSuffix(it.Value, crlf) {
402             it.Value = nil
403             return fmt.Errorf("memcache: corrupt get result read")
404         }
405         it.Value = it.Value[:size]
406         cb(it)
407     }
408 }
409 
410 func scanGetResponseLine(line []byte, it *Item) (size int, err error) {
411     pattern := "VALUE %s %d %d %d\r\n"
412     dest := []interface{}{&it.Key, &it.Flags, &size, &it.casid}
413     if bytes.Count(line, space) == 3 {
414         pattern = "VALUE %s %d %d\r\n"
415         dest = dest[:3]
416     }
417     n, err := fmt.Sscanf(string(line), pattern, dest...)
418     if err != nil || n != len(dest) {
419         return -1, fmt.Errorf("memcache: unexpected line in get response: %q", line)
420     }
421     return size, nil
422 }
423 
424 func legalKey(key string) bool {
425     if len(key) > 250 {
426         return false
427     }
428     for i := 0; i < len(key); i++ {
429         if key[i] <= ' ' || key[i] == 0x7f {
430             return false
431         }
432     }
433     return true
434 }
435 
436 func resumableError(err error) bool {
437     switch err {
438     case ErrCacheMiss, ErrCASConflict, ErrNotStored, ErrMalformedKey:
439         return true
440     }
441     return false
442 }
443 
444 func (c *Client) populateOne(rw *bufio.ReadWriter, verb string, item *Item) error {
445     if !legalKey(item.Key) {
446         return ErrMalformedKey
447     }
448     var err error
449     if verb == "cas" {
450         _, err = fmt.Fprintf(rw, "%s %s %d %d %d %d\r\n",
451             verb, item.Key, item.Flags, item.Expiration, len(item.Value), item.casid)
452     } else {
453         _, err = fmt.Fprintf(rw, "%s %s %d %d %d\r\n",
454             verb, item.Key, item.Flags, item.Expiration, len(item.Value))
455     }
456     if err != nil {
457         return err
458     }
459     if _, err = rw.Write(item.Value); err != nil {
460         return err
461     }
462     if _, err = rw.Write(crlf); err != nil {
463         return err
464     }
465     if err = rw.Flush(); err != nil {
466         return err
467     }
468     line, err := rw.ReadSlice('\n')
469     if err != nil {
470         return err
471     }
472     switch {
473     case bytes.Equal(line, resultStored):
474         return nil
475     case bytes.Equal(line, resultNotStored):
476         return ErrNotStored
477     case bytes.Equal(line, resultExists):
478         return ErrCASConflict
479     case bytes.Equal(line, resultNotFound):
480         return ErrCacheMiss
481     }
482     return fmt.Errorf("memcache: unexpected response line from %q: %q", verb, string(line))
483 }
484 
485 func writeExpectf(rw *bufio.ReadWriter, expect []byte, format string, args ...interface{}) error {
486     line, err := writeReadLine(rw, format, args...)
487     if err != nil {
488         return err
489     }
490     switch {
491     case bytes.Equal(line, resultOK):
492         return nil
493     case bytes.Equal(line, expect):
494         return nil
495     case bytes.Equal(line, resultNotStored):
496         return ErrNotStored
497     case bytes.Equal(line, resultExists):
498         return ErrCASConflict
499     case bytes.Equal(line, resultNotFound):
500         return ErrCacheMiss
501     }
502     return fmt.Errorf("memcache: unexpected response line: %q", string(line))
503 }
504 
505 func writeReadLine(rw *bufio.ReadWriter, format string, args ...interface{}) ([]byte, error) {
506     _, err := fmt.Fprintf(rw, format, args...)
507     if err != nil {
508         return nil, err
509     }
510     if e := rw.Flush(); e != nil {
511         return nil, e
512     }
513     line, err := rw.ReadSlice('\n')
514     return line, err
515 }
516 
517 func (c *Client) incrDecr(verb, key string, delta uint64) (uint64, error) {
518     var val uint64
519     cn, err := c.getConn()
520     defer cn.condRelease(&err)
521     if err != nil {
522         return 0, err
523     }
524     func(rw *bufio.ReadWriter) error {
525         line, e := writeReadLine(rw, "%s %s %d\r\n", verb, key, delta)
526         if e != nil {
527             return e
528         }
529         switch {
530         case bytes.Equal(line, resultNotFound):
531             return ErrCacheMiss
532         case bytes.HasPrefix(line, resultClientErrorPrefix):
533             errMsg := line[len(resultClientErrorPrefix) : len(line)-2]
534             return errors.New("memcache: client error: " + string(errMsg))
535         }
536         val, e = strconv.ParseUint(string(line[:len(line)-2]), 10, 64)
537         if e != nil {
538             return e
539         }
540         return nil
541     }(cn.rw)
542     return val, err
543 }

View Code

測試代碼,啟動一個http服務器先設置到memcached中,通過/hello 接口從memcached中獲取對應的值

package main

import (
 "fmt"
 "memcache_go/memcache"
 "net"
 "net/http"
 "time"
)

var client *memcache.Client

func IndexHandler(w http.ResponseWriter, r *http.Request) {
 ret, err := client.Get("tcp_key")
 if err != nil {
  fmt.Fprintln(w, "err ")
 }
 str := ""
 if ret != nil {
  str = string(ret.Value)
  fmt.Fprintln(w, "hello world", str)
 } else {
  fmt.Fprintln(w, "nil")
 }
}

func touchHandler(w http.ResponseWriter, r *http.Request) {
 err := client.Touch("tcp_key", 1000)
 if err != nil {
  fmt.Fprintln(w, "err ")
  return
 }
 fmt.Fprintln(w, "succ")
}

func main() {
 addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:12001")
 if err != nil {
  fmt.Println(fmt.Sprintf("get err %v", err))
  return
 }
 client = memcache.NewClient(30*time.Second, 30, addr)

 err = client.Add(&memcache.Item{
  Key:        "tcp_key",
  Value:      []byte(fmt.Sprintf("tcp_key_value_%d", time.Now().UnixNano())),
  Flags:      0,
  Expiration: 0,
 })
 if err != nil {
  fmt.Println("執行失敗")
  //return
 }
 http.HandleFunc("/get", IndexHandler)
 http.HandleFunc("/touch", touchHandler)
 http.ListenAndServe("127.0.0.1:8000", nil)
 //fmt.Println("memcache_test...")
 for {
  time.Sleep(1000 * 30)
 }
}  

啟動memcached服務進行測試

memcached -m 1024  -u root -l 127.0.0.1 -p 12001 -c 55535
//-d選項是啟動一個守護進程
//-m是分配給Memcache使用的內存數量,單位是MB,我這裡是10MB
//-u是運行Memcache的用戶
//-l是監聽的服務器IP地址,如果有多個地址的話,我這裡指定了服務器的IP地址127.0.0.1
//-p是設置 Memcache監聽的端口,我這裡設置了12001,最好是1024以上的端口
//-c選項是最大運行的並發連接數,默認是1024,我這裡設置了 256,按照你服務器的負載量來設定
//-P是設置保存Memcache的pid文件,我這裡是保存在 /tmp/memcached.pid
//停止進程:# kill `cat /tmp/memcached.pid`  

到這裡我們大概用了幾百行代碼實現了一個簡單的memcached鏈接驅動的子集,對應用程序和memcached如何通訊有了大致了解。

好了,今天的分享就到這,希望對各位有用。 「原創不易,多多分享」

想要更多交流可以加群:

也可以加入知識星球免費提問: