500行代碼了解Mecached緩存客戶端驅動原理
原創不易,求分享、求一鍵三連
緩存一般是用來加速數據訪問的效率,在獲取數據耗時高的場景下使用緩存可以有效的提高數據獲取的效率。
比如,先從memcached中獲取數據,如果沒有則查詢mysql中的數據得到結果寫入到memcached中然後返回,下次請求就能夠從memcached中獲取數據直接返回。
在行業中使用比較多的緩存數據庫有Redis和Memcached。
今天用go實現Memcached的驅動程序,深入了解Memcached和咱們平時所寫的業務代碼如何進行數據交互的協議和原理。
什麼是memcached
Memcached是LiveJournal旗下Danga Interactive公司的Brad Fitzpatric為首開發的一款自由開源、高性能的key-value緩存數據庫軟件。
Mecached協議
Mecached服務和應用程序是不同機器不同的進程,雙方進行數據交互通訊涉及到tcp和通訊協議,在memcache中協議有兩種類型一種是文本行方式,另一種是非結構化數據方式。
我們挑選文本行協議來實現,大多數Mecached的客戶端也是採用文本行協議來開發的因為比較簡單,特定格式文本字符串來約定數據交互,如下以客戶端發送命令舉例:
<command name> <key> <flags> <exptime> <bytes>\r\n
<data block>\r\n
<command name>
是協議的命令,大致分為三類:
- 存儲命令:set、 add、replace、append、prepend、cas。
- 獲取命令:get、gets。
- 其他命令:version、stats
<key>
要求存儲數據的關鍵字;由於memached底層實現的限制,key的長度限制在250個字符內,並且key中不能包含控制字符或空格。
<flags>
是一個16位無符號整數。
<exptime>
是存儲超時時間。如果值為0表示該數據項永不超時;
過期時間實現限制,過期時間要麼是Unix時間(從1970-1-1開始計算的秒數),要麼是從當前時間開始計算的秒數。
該值不能超過30天,否則服務器將該參數當做真正的Unix時間而不是當前時間的一個偏移值。
<bytes>
是隨後數據的位元組數,不包括終結符\r\n。<bytes>
有可能是0,它後面將是一個空的數據塊。
<data block>
存儲數據流。
客戶端以字符串的方式向服務端發送文本行的內容服務器端會返回對應的執行結果數據,也有返回錯誤的情況,memcache也對錯誤的數據格式定義三種不同錯誤類型的三種格式讓錯誤的返回簡單:
- ERROR\r\n
說明客戶端發送了一個不存在命令
- CLIENT_ERROR\r\n
說明在輸入行中存在某種類型的客戶端錯誤,例如輸入的信息沒有遵循memcached的協議
- SERVER_ERROR\r\n
說明服務端存在某種類型的錯誤導致致命命令無法執行。
<error>
是具有可讀性的錯誤字符串。
當服務端錯誤發生後,將會導致服務器將不會再提供服務,服務器在發送該錯誤信息行後將關閉鏈接。只有在此場景下,服務器才會關閉與客戶端鏈接。
以下就具體羅列memcached常用包含客戶端發送和響應的命令格式列表:
需要注意的是command是區分大小寫的,客戶端使用tcp連接服務端發送客戶端文本行命令,發送成功後等待服務器返回數據,根據格式解析獲取需要的返回值這就是一個簡單的協議命令執行流程。
Golang實現客戶端驅動
有了對memache協議的了解現在來實現通訊就比較簡單,首先需要定義Client結構體,保存客戶端一些基本配置信息及鏈接信息:
type Client struct {
Timeout time.Duration
MaxIdleConns int
lock sync.Mutex
addr net.Addr
conns []*conn
}
- Timeout tcp鏈接讀寫超時
- conns 是memcache鏈接池存放的數組
- MaxIdleConns 是Idle鏈接的數量
- lock 是操作conns時加鎖
- addr則是鏈接的memcache的地址
memcached的單獨一個conn連接結構體定義
type conn struct {
nc net.Conn
rw *bufio.ReadWriter
addr net.Addr
c *Client
}
- nc 是建立好的tcp網絡鏈接
- rw 為了方便數據發送和讀取設置bufio的ReadWriter
- addr 存儲memcached地址
- c 存儲客戶端的引用
下面是看如何獲取鏈接和使用完之後如何將鏈接放回到鏈接池中
//獲取memcached的鏈接
func (c *Client) getFreeConn() (cn *conn, ok bool) {
c.lock.Lock()
defer c.lock.Unlock()
if c.conns == nil {
return nil, false
}
freelist := c.conns
if len(freelist) == 0 {
return nil, false
}
cn = freelist[len(freelist)-1]
c.conns = freelist[:len(freelist)-1]
return cn, true
}
//將使用完的鏈接放回到conns中
func (c *Client) putFreeConn(cn *conn) {
c.lock.Lock()
defer c.lock.Unlock()
if c.conns == nil {
c.conns = make([]*conn, 0)
}
freelist := c.conns
if len(freelist) >= c.maxIdleConns() {
cn.nc.Close()
return
}
c.conns = append(freelist, cn)
}
接下來以GET命令為例,來詳細看如何進行網絡傳輸和協議解析的實現
func (c *Client) Get(key string) (item *Item, err error) {
//check key len 驗證key是否長於250字符
if !legalKey(key) {
err = ErrMalformedKey
return
}
keys := []string{key}
cn, err := c.getConn() //獲取memcached鏈接
defer cn.condRelease(&err) // 方法執行完之後將鏈接release,返回到鏈接池中
if err != nil {
return
}
rw := cn.rw
//將gets 命令用文本行協議寫入到rw中
if _, err = fmt.Fprintf(rw, "gets %s\r\n", strings.Join(keys, " ")); err != nil {
return
}
if err = rw.Flush(); err != nil {
return
}
//獲取GET命令發送之後等待和獲取返回的響應數據
if err = parseGetResponse(rw.Reader, func(it *Item) { item = it }); err != nil {
return
}
if item == nil {
err = ErrCacheMiss
}
return
}
func parseGetResponse(r *bufio.Reader, cb func(*Item)) error {
for {
line, err := r.ReadSlice('\n')
if err != nil {
return err
}
if bytes.Equal(line, resultEnd) { //如果獲取是 END\r\n 則數據返回完,則返回
return nil
}
it := new(Item)
size, err := scanGetResponseLine(line, it)//先根據格式獲取第一行數據和<data> 部分的大小
if err != nil {
return err
}
//根據bytes獲取數據
it.Value = make([]byte, size+2)
_, err = io.ReadFull(r, it.Value)
if err != nil {
it.Value = nil
return err
}
if !bytes.HasSuffix(it.Value, crlf) {
it.Value = nil
return fmt.Errorf("memcache: corrupt get result read")
}
it.Value = it.Value[:size]
cb(it)
}
}
//根據返回數據格式獲取返回值設置到Item結構中。
func scanGetResponseLine(line []byte, it *Item) (size int, err error) {
// 返回的數據格式 VALUE <key> <falgs> <bytes> <casid>
pattern := "VALUE %s %d %d %d\r\n"
dest := []interface{}{&it.Key, &it.Flags, &size, &it.casid}
if bytes.Count(line, space) == 3 {
pattern = "VALUE %s %d %d\r\n"
dest = dest[:3]
}
n, err := fmt.Sscanf(string(line), pattern, dest...)
if err != nil || n != len(dest) {
return -1, fmt.Errorf("memcache: unexpected line in get response: %q", line)
}
return size, nil
}
//判斷key是否符合要求
func legalKey(key string) bool {
if len(key) > 250 {
return false
}
for i := 0; i < len(key); i++ {
if key[i] <= ' ' || key[i] == 0x7f {
return false
}
}
return true
}
其它命令不在詳細描述,完整代碼如下:


1 package memcache 2 3 import ( 4 "bufio" 5 "bytes" 6 "errors" 7 "fmt" 8 "io" 9 "net" 10 "strconv" 11 "strings" 12 "sync" 13 "time" 14 ) 15 16 //memcached -m 1024 -u root -l 127.0.0.1 -p 12001 -c 55535 17 //# memcached -d -m 10 -u root -l 127.0.0.1 -p 12001 -c 256 -P /tmp/memcached.pid 18 //-d選項是啟動一個守護進程 19 //-m是分配給Memcache使用的內存數量,單位是MB,我這裡是10MB 20 //-u是運行Memcache的用戶 21 //-l是監聽的服務器IP地址,如果有多個地址的話,我這裡指定了服務器的IP地址127.0.0.1 22 //-p是設置 Memcache監聽的端口,我這裡設置了12001,最好是1024以上的端口 23 //-c選項是最大運行的並發連接數,默認是1024,我這裡設置了 256,按照你服務器的負載量來設定 24 //-P是設置保存Memcache的pid文件,我這裡是保存在 /tmp/memcached.pid 25 //停止進程:# kill `cat /tmp/memcached.pid` 26 27 const ( 28 DefaultTimeout = 100 * time.Millisecond 29 DefaultMaxIdleConns = 40 30 ) 31 32 var ( 33 resultClientErrorPrefix = []byte("CLIENT_ERROR") 34 resultErrPrefix = []byte("ERROR") 35 resultServerErrPrefix = []byte("SERVER_ERROR") 36 37 crlf = []byte("\r\n") 38 space = []byte(" ") 39 resultOK = []byte("OK\r\n") 40 resultStored = []byte("STORED\r\n") 41 resultNotStored = []byte("NOT_STORED\r\n") 42 resultExists = []byte("EXISTS\r\n") 43 resultNotFound = []byte("NOT_FOUND\r\n") 44 resultDeleted = []byte("DELETED\r\n") 45 resultEnd = []byte("END\r\n") 46 resultTouched = []byte("TOUCHED\r\n") 47 versionPrefix = []byte("VERSION") 48 49 ErrMalformedKey = errors.New("malformed: key is too long or contains invalid characters") 50 ErrCacheMiss = errors.New("memcache: cache miss") 51 ErrCASConflict = errors.New("memcache: compare-and-swap conflict") 52 ErrNotStored = errors.New("memcache: item not stored") 53 ) 54 55 type Client struct { 56 Timeout time.Duration 57 MaxIdleConns int 58 lock sync.Mutex 59 addr net.Addr 60 conns []*conn 61 } 62 63 func NewClient(timeout time.Duration, maxIdleConns int, addr net.Addr) *Client { 64 return &Client{ 65 Timeout: timeout, 66 MaxIdleConns: maxIdleConns, 67 lock: sync.Mutex{}, 68 addr: addr, 69 conns: nil, 70 } 71 } 72 73 type Item struct { 74 // Key is the Item's key (250 bytes maximum). 75 Key string 76 // Value is the Item's value. 77 Value []byte 78 // Flags are server-opaque flags whose semantics are entirely 79 // up to the app. 80 Flags uint32 81 // Expiration is the cache expiration time, in seconds: either a relative 82 // time from now (up to 1 month), or an absolute Unix epoch time. 83 // Zero means the Item has no expiration time. 84 Expiration int32 85 // Compare and swap ID. 86 casid uint64 87 } 88 89 func (c *Client) maxIdleConns() int { 90 if c.MaxIdleConns > 0 { 91 return c.MaxIdleConns 92 } 93 return DefaultMaxIdleConns 94 } 95 96 func (c *Client) netTimeout() time.Duration { 97 if c.Timeout != 0 { 98 return c.Timeout 99 } 100 return DefaultTimeout 101 } 102 103 type conn struct { 104 nc net.Conn 105 rw *bufio.ReadWriter 106 addr net.Addr 107 c *Client 108 } 109 110 // 設置超時時間 111 func (cn *conn) extendDeadline() { 112 cn.nc.SetDeadline(time.Now().Add(cn.c.netTimeout())) 113 } 114 115 // Release 如果是正常的err 則放回到conns中,如果不是這直接close掉conn 116 func (cn *conn) condRelease(err *error) { 117 if *err == nil || resumableError(*err) { 118 cn.release() 119 } else { 120 fmt.Println("xxx", fmt.Sprintf("%s", (*err).Error())) 121 cn.nc.Close() 122 } 123 } 124 125 // release returns this connection back to the client's free pool 126 func (cn *conn) release() { 127 cn.c.putFreeConn(cn) 128 } 129 130 func (c *Client) putFreeConn(cn *conn) { 131 c.lock.Lock() 132 defer c.lock.Unlock() 133 if c.conns == nil { 134 c.conns = make([]*conn, 0) 135 } 136 freelist := c.conns 137 if len(freelist) >= c.maxIdleConns() { 138 cn.nc.Close() 139 return 140 } 141 c.conns = append(freelist, cn) 142 } 143 144 func (c *Client) getFreeConn() (cn *conn, ok bool) { 145 c.lock.Lock() 146 defer c.lock.Unlock() 147 if c.conns == nil { 148 return nil, false 149 } 150 freelist := c.conns 151 if len(freelist) == 0 { 152 return nil, false 153 } 154 cn = freelist[len(freelist)-1] 155 c.conns = freelist[:len(freelist)-1] 156 return cn, true 157 } 158 159 type ConnectTimeoutError struct { 160 Addr net.Addr 161 } 162 163 func (cte *ConnectTimeoutError) Error() string { 164 return "memcache: connect timeout to " + cte.Addr.String() 165 } 166 167 //獲取memcached連接 168 func (c *Client) getConn() (*conn, error) { 169 cn, ok := c.getFreeConn() 170 if ok { 171 cn.extendDeadline() 172 return cn, nil 173 } 174 nc, err := c.dial(c.addr) 175 if err != nil { 176 return nil, err 177 } 178 cn = &conn{ 179 nc: nc, 180 addr: c.addr, 181 rw: bufio.NewReadWriter(bufio.NewReader(nc), bufio.NewWriter(nc)), 182 c: c, 183 } 184 cn.extendDeadline() 185 return cn, nil 186 } 187 188 func (c *Client) dial(addr net.Addr) (net.Conn, error) { 189 nc, err := net.DialTimeout(addr.Network(), addr.String(), c.netTimeout()) 190 if err == nil { 191 return nc, nil 192 } 193 if ne, ok := err.(net.Error); ok && ne.Timeout() { 194 return nil, &ConnectTimeoutError{Addr: addr} 195 } 196 return nil, err 197 } 198 199 func (c *Client) Get(key string) (item *Item, err error) { 200 //check key len 201 if !legalKey(key) { 202 err = ErrMalformedKey 203 return 204 } 205 keys := []string{key} 206 cn, err := c.getConn() 207 defer cn.condRelease(&err) 208 if err != nil { 209 return 210 } 211 rw := cn.rw 212 if _, err = fmt.Fprintf(rw, "gets %s\r\n", strings.Join(keys, " ")); err != nil { 213 return 214 } 215 if err = rw.Flush(); err != nil { 216 return 217 } 218 if err = parseGetResponse(rw.Reader, func(it *Item) { item = it }); err != nil { 219 return 220 } 221 if item == nil { 222 err = ErrCacheMiss 223 } 224 return 225 } 226 227 func (c *Client) GetMulti(keys []string) (map[string]*Item, error) { 228 var lk sync.Mutex 229 m := make(map[string]*Item) 230 addItemToMap := func(it *Item) { 231 lk.Lock() 232 defer lk.Unlock() 233 m[it.Key] = it 234 } 235 for _, key := range keys { 236 if !legalKey(key) { 237 return nil, ErrMalformedKey 238 } 239 } 240 cn, err := c.getConn() 241 defer cn.condRelease(&err) 242 if err != nil { 243 return nil, err 244 } 245 if _, err = fmt.Fprintf(cn.rw, "gets %s\r\n", strings.Join(keys, " ")); err != nil { 246 return nil, err 247 } 248 if err = cn.rw.Flush(); err != nil { 249 return nil, err 250 } 251 if err = parseGetResponse(cn.rw.Reader, addItemToMap); err != nil { 252 return nil, err 253 } 254 return m, err 255 256 } 257 258 func (c *Client) Touch(key string, seconds int32) (err error) { 259 260 cn, err := c.getConn() 261 if err != nil { 262 return 263 } 264 defer cn.condRelease(&err) 265 266 if _, err = fmt.Fprintf(cn.rw, "touch %s %d\r\n", key, seconds); err != nil { 267 return 268 } 269 if err = cn.rw.Flush(); err != nil { 270 return 271 } 272 line, err := cn.rw.ReadSlice('\n') 273 if err != nil { 274 return 275 } 276 switch { 277 case bytes.Equal(line, resultTouched): 278 break 279 case bytes.Equal(line, resultNotFound): 280 return ErrCacheMiss 281 default: 282 return fmt.Errorf("memcache: unexpected response line from touch: %q", string(line)) 283 } 284 return nil 285 } 286 287 func (c *Client) Add(item *Item) error { 288 return c.onItem(item, func(client *Client, rw *bufio.ReadWriter, item *Item) error { 289 return client.populateOne(rw, "add", item) 290 }) 291 } 292 293 func (c *Client) Set(item *Item) error { 294 return c.onItem(item, func(client *Client, rw *bufio.ReadWriter, item *Item) error { 295 return client.populateOne(rw, "set", item) 296 }) 297 } 298 299 func (c *Client) CompareAndSwap(item *Item) error { 300 return c.onItem(item, func(client *Client, rw *bufio.ReadWriter, item *Item) error { 301 return client.populateOne(rw, "cas", item) 302 }) 303 } 304 305 func (c *Client) Replace(item *Item) error { 306 return c.onItem(item, func(client *Client, rw *bufio.ReadWriter, item *Item) error { 307 return client.populateOne(rw, "replace", item) 308 }) 309 } 310 311 func (c *Client) Delete(key string) error { 312 if !legalKey(key) { 313 return ErrMalformedKey 314 } 315 cn, err := c.getConn() 316 if err != nil { 317 return err 318 } 319 defer cn.condRelease(&err) 320 return writeExpectf(cn.rw, resultDeleted, "delete %s\r\n", key) 321 } 322 323 func (c *Client) FlushAll() error { 324 cn, err := c.getConn() 325 if err != nil { 326 return err 327 } 328 defer cn.condRelease(&err) 329 return writeExpectf(cn.rw, resultDeleted, "flush_all\r\n") 330 } 331 332 func (c *Client) Version() error { 333 cn, err := c.getConn() 334 defer cn.condRelease(&err) 335 if err != nil { 336 return err 337 } 338 return func(rw *bufio.ReadWriter) error { 339 if _, e := fmt.Fprintf(rw, "version\r\n"); e != nil { 340 return err 341 } 342 if e := rw.Flush(); e != nil { 343 return e 344 } 345 line, e := rw.ReadSlice('\n') 346 if e != nil { 347 return e 348 } 349 switch { 350 case bytes.HasPrefix(line, versionPrefix): 351 break 352 default: 353 return fmt.Errorf("memcache: unexpected response line from ping: %q", string(line)) 354 } 355 return nil 356 }(cn.rw) 357 358 } 359 360 func (c *Client) Increment(key string, delta uint64) (newValue uint64, err error) { 361 return c.incrDecr("incr", key, delta) 362 363 } 364 365 func (c *Client) Decrement(key string, delta uint64) (newValue uint64, err error) { 366 return c.incrDecr("decr", key, delta) 367 } 368 369 func (c *Client) onItem(item *Item, fn func(*Client, *bufio.ReadWriter, *Item) error) error { 370 cn, err := c.getConn() 371 defer cn.condRelease(&err) 372 if err != nil { 373 return err 374 } 375 if err = fn(c, cn.rw, item); err != nil { 376 return err 377 } 378 return nil 379 } 380 381 func parseGetResponse(r *bufio.Reader, cb func(*Item)) error { 382 for { 383 line, err := r.ReadSlice('\n') 384 if err != nil { 385 return err 386 } 387 if bytes.Equal(line, resultEnd) { 388 return nil 389 } 390 it := new(Item) 391 size, err := scanGetResponseLine(line, it) 392 if err != nil { 393 return err 394 } 395 it.Value = make([]byte, size+2) 396 _, err = io.ReadFull(r, it.Value) 397 if err != nil { 398 it.Value = nil 399 return err 400 } 401 if !bytes.HasSuffix(it.Value, crlf) { 402 it.Value = nil 403 return fmt.Errorf("memcache: corrupt get result read") 404 } 405 it.Value = it.Value[:size] 406 cb(it) 407 } 408 } 409 410 func scanGetResponseLine(line []byte, it *Item) (size int, err error) { 411 pattern := "VALUE %s %d %d %d\r\n" 412 dest := []interface{}{&it.Key, &it.Flags, &size, &it.casid} 413 if bytes.Count(line, space) == 3 { 414 pattern = "VALUE %s %d %d\r\n" 415 dest = dest[:3] 416 } 417 n, err := fmt.Sscanf(string(line), pattern, dest...) 418 if err != nil || n != len(dest) { 419 return -1, fmt.Errorf("memcache: unexpected line in get response: %q", line) 420 } 421 return size, nil 422 } 423 424 func legalKey(key string) bool { 425 if len(key) > 250 { 426 return false 427 } 428 for i := 0; i < len(key); i++ { 429 if key[i] <= ' ' || key[i] == 0x7f { 430 return false 431 } 432 } 433 return true 434 } 435 436 func resumableError(err error) bool { 437 switch err { 438 case ErrCacheMiss, ErrCASConflict, ErrNotStored, ErrMalformedKey: 439 return true 440 } 441 return false 442 } 443 444 func (c *Client) populateOne(rw *bufio.ReadWriter, verb string, item *Item) error { 445 if !legalKey(item.Key) { 446 return ErrMalformedKey 447 } 448 var err error 449 if verb == "cas" { 450 _, err = fmt.Fprintf(rw, "%s %s %d %d %d %d\r\n", 451 verb, item.Key, item.Flags, item.Expiration, len(item.Value), item.casid) 452 } else { 453 _, err = fmt.Fprintf(rw, "%s %s %d %d %d\r\n", 454 verb, item.Key, item.Flags, item.Expiration, len(item.Value)) 455 } 456 if err != nil { 457 return err 458 } 459 if _, err = rw.Write(item.Value); err != nil { 460 return err 461 } 462 if _, err = rw.Write(crlf); err != nil { 463 return err 464 } 465 if err = rw.Flush(); err != nil { 466 return err 467 } 468 line, err := rw.ReadSlice('\n') 469 if err != nil { 470 return err 471 } 472 switch { 473 case bytes.Equal(line, resultStored): 474 return nil 475 case bytes.Equal(line, resultNotStored): 476 return ErrNotStored 477 case bytes.Equal(line, resultExists): 478 return ErrCASConflict 479 case bytes.Equal(line, resultNotFound): 480 return ErrCacheMiss 481 } 482 return fmt.Errorf("memcache: unexpected response line from %q: %q", verb, string(line)) 483 } 484 485 func writeExpectf(rw *bufio.ReadWriter, expect []byte, format string, args ...interface{}) error { 486 line, err := writeReadLine(rw, format, args...) 487 if err != nil { 488 return err 489 } 490 switch { 491 case bytes.Equal(line, resultOK): 492 return nil 493 case bytes.Equal(line, expect): 494 return nil 495 case bytes.Equal(line, resultNotStored): 496 return ErrNotStored 497 case bytes.Equal(line, resultExists): 498 return ErrCASConflict 499 case bytes.Equal(line, resultNotFound): 500 return ErrCacheMiss 501 } 502 return fmt.Errorf("memcache: unexpected response line: %q", string(line)) 503 } 504 505 func writeReadLine(rw *bufio.ReadWriter, format string, args ...interface{}) ([]byte, error) { 506 _, err := fmt.Fprintf(rw, format, args...) 507 if err != nil { 508 return nil, err 509 } 510 if e := rw.Flush(); e != nil { 511 return nil, e 512 } 513 line, err := rw.ReadSlice('\n') 514 return line, err 515 } 516 517 func (c *Client) incrDecr(verb, key string, delta uint64) (uint64, error) { 518 var val uint64 519 cn, err := c.getConn() 520 defer cn.condRelease(&err) 521 if err != nil { 522 return 0, err 523 } 524 func(rw *bufio.ReadWriter) error { 525 line, e := writeReadLine(rw, "%s %s %d\r\n", verb, key, delta) 526 if e != nil { 527 return e 528 } 529 switch { 530 case bytes.Equal(line, resultNotFound): 531 return ErrCacheMiss 532 case bytes.HasPrefix(line, resultClientErrorPrefix): 533 errMsg := line[len(resultClientErrorPrefix) : len(line)-2] 534 return errors.New("memcache: client error: " + string(errMsg)) 535 } 536 val, e = strconv.ParseUint(string(line[:len(line)-2]), 10, 64) 537 if e != nil { 538 return e 539 } 540 return nil 541 }(cn.rw) 542 return val, err 543 }
View Code
測試代碼,啟動一個http服務器先設置到memcached中,通過/hello 接口從memcached中獲取對應的值
package main
import (
"fmt"
"memcache_go/memcache"
"net"
"net/http"
"time"
)
var client *memcache.Client
func IndexHandler(w http.ResponseWriter, r *http.Request) {
ret, err := client.Get("tcp_key")
if err != nil {
fmt.Fprintln(w, "err ")
}
str := ""
if ret != nil {
str = string(ret.Value)
fmt.Fprintln(w, "hello world", str)
} else {
fmt.Fprintln(w, "nil")
}
}
func touchHandler(w http.ResponseWriter, r *http.Request) {
err := client.Touch("tcp_key", 1000)
if err != nil {
fmt.Fprintln(w, "err ")
return
}
fmt.Fprintln(w, "succ")
}
func main() {
addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:12001")
if err != nil {
fmt.Println(fmt.Sprintf("get err %v", err))
return
}
client = memcache.NewClient(30*time.Second, 30, addr)
err = client.Add(&memcache.Item{
Key: "tcp_key",
Value: []byte(fmt.Sprintf("tcp_key_value_%d", time.Now().UnixNano())),
Flags: 0,
Expiration: 0,
})
if err != nil {
fmt.Println("執行失敗")
//return
}
http.HandleFunc("/get", IndexHandler)
http.HandleFunc("/touch", touchHandler)
http.ListenAndServe("127.0.0.1:8000", nil)
//fmt.Println("memcache_test...")
for {
time.Sleep(1000 * 30)
}
}
啟動memcached服務進行測試
memcached -m 1024 -u root -l 127.0.0.1 -p 12001 -c 55535
//-d選項是啟動一個守護進程
//-m是分配給Memcache使用的內存數量,單位是MB,我這裡是10MB
//-u是運行Memcache的用戶
//-l是監聽的服務器IP地址,如果有多個地址的話,我這裡指定了服務器的IP地址127.0.0.1
//-p是設置 Memcache監聽的端口,我這裡設置了12001,最好是1024以上的端口
//-c選項是最大運行的並發連接數,默認是1024,我這裡設置了 256,按照你服務器的負載量來設定
//-P是設置保存Memcache的pid文件,我這裡是保存在 /tmp/memcached.pid
//停止進程:# kill `cat /tmp/memcached.pid`
到這裡我們大概用了幾百行代碼實現了一個簡單的memcached鏈接驅動的子集,對應用程序和memcached如何通訊有了大致了解。
好了,今天的分享就到這,希望對各位有用。 「原創不易,多多分享」
想要更多交流可以加群:
也可以加入知識星球免費提問: