用C、python手寫redis客戶端,兼容redis集群 (-MOVED和-ASK),快速搭建redis集群
想沒想過,自己寫一個redis客戶端,是不是很難呢?
其實,並不是特別難。
首先,要知道redis服務端用的通訊協議,建議直接去官網看,部落格啥的其實也是從官網摘抄的,或者從其他部落格抄的(忽略)。
協議說明中文官網地址: //www.redis.cn/topics/protocol.html
redis通訊協議
列出主要的點,便於對於下面程式的理解。
Redis在TCP埠6379(默認埠,在配置可以修改)上監聽到來的連接,在客戶端與伺服器端之間傳輸的每個Redis命令或者數據都以\r\n結尾。
回復(服務端可客戶端恢復的協議)
Redis用不同的回復類型回復命令。它可能從伺服器發送的第一個位元組開始校驗回復類型:
* 用單行回復(狀態回復),回復的第一個位元組將是「+」
* 錯誤消息,回復的第一個位元組將是「-」
* 整型數字,回復的第一個位元組將是「:」
* 批量回復,回復的第一個位元組將是「$」
* 多個批量回復,回復的第一個位元組將是「*」
Bulk Strings(批量回復)
批量回復被伺服器用於返回一個單二進位安全字元串。
C: GET mykey
S: $6\r\nfoobar\r\n
伺服器發送第一行回復,該行以「$」開始後面跟隨實際要發送的位元組數,隨後是CRLF,然後發送實際數據,隨後是2個位元組的額外數據用於最後的CRLF。伺服器發送的準確序列如下:
”$6\r\nfoobar\r\n”
如果請求的值不存在,批量回復將使用特殊的值-1來作為數據長度,例如:
C: GET nonexistingkey
S: $-1
當請求的對象不存在時,客戶端庫API不會返回空字元串,而會返回空對象。例如:Ruby庫返回『nil』,而C庫返回NULL(或者在回復的對象里設置指定的標誌)等等。
二進位
簡單說下二進位,就是會包含\0,所以C語言在處理的時候,就不能用str函數,像strlen、strcpy等,因為它們都是以\0來判斷字元串結尾的。
redis集群
寫redis客戶端,要考慮到單機和集群,單機知道上面的協議就可以寫了,集群還要學習一下。
超簡單搭建redis集群
官網也介紹了怎麼搭建redis集群,試過比較麻煩,因為用的centos6.5,如果用較新的centos,可能會好點。
redis集群超簡單搭建方法://blog.csdn.net/cjfeii/article/details/47320351
Redis 集群的數據分片
Redis 集群沒有使用一致性hash, 而是引入了 哈希槽的概念.
Redis 集群有16384個哈希槽,每個key通過CRC16校驗後對16384取模來決定放置哪個槽.集群的每個節點負責一部分hash槽,舉個例子,比如當前集群有3個節點,那麼:
* 節點 A 包含 0 到 5500號哈希槽.
* 節點 B 包含5501 到 11000 號哈希槽.
* 節點 C 包含11001 到 16384號哈希槽.
這種結構很容易添加或者刪除節點. 比如如果我想新添加個節點D, 我需要從節點 A, B, C中得部分槽到D上. 如果我想移除節點A,需要將A中的槽移到B和C節點上,然後將沒有任何槽的A節點從集群中移除即可. 由於從一個節點將哈希槽移動到另一個節點並不會停止服務,所以無論添加刪除或者改變某個節點的哈希槽的數量都不會造成集群不可用的狀態.
Redis 集群協議中的客戶端和伺服器端
在 Redis 集群中,節點負責存儲數據、記錄集群的狀態(包括鍵值到正確節點的映射)。集群節點同樣能自動發現其他節點,檢測出沒正常工作的節點, 並且在需要的時候在從節點中推選出主節點。
為了執行這些任務,所有的集群節點都通過TCP連接(TCP bus?)和一個二進位協議(集群連接,cluster bus)建立通訊。 每一個節點都通過集群連接(cluster bus)與集群上的其餘每個節點連接起來。 節點們使用一個 gossip 協議來傳播集群的資訊,這樣可以:發現新的節點、 發送ping包(用來確保所有節點都在正常工作中)、在特定情況發生時發送集群消息。集群連接也用於在集群中發布或訂閱消息。
由於集群節點不能代理(proxy)請求,所以客戶端在接收到重定向錯誤(redirections errors) -MOVED 和 -ASK 的時候, 將命令重定向到其他節點。理論上來說,客戶端是可以自由地向集群中的所有節點發送請求,在需要的時候把請求重定向到其他節點,所以客戶端是不需要保存集群狀態。 不過客戶端可以快取鍵值和節點之間的映射關係,這樣能明顯提高命令執行的效率。
-MOVED
簡單說下返回-MOVED的情況,就是客戶端連節點A請求處理key,但其實key其實在節點B,就返回-MOVED,協議如下:-MOVED 3999 127.0.0.1:6381
不用考慮-ASK的情況。
C語言實現redis客戶端
程式碼如下:


#include <string.h> #include <sys/socket.h> #include <arpa/inet.h> #include <errno.h> #include <fcntl.h> #include <netdb.h> #include <sys/poll.h> #include <unistd.h> #include <sys/types.h> #include <stdlib.h> #include <stdio.h> ssize_t sock_write_loop( int fd, const void *vptr, size_t n ) { size_t nleft = 0; ssize_t nwritten = 0; const char *ptr; ptr = (char *) vptr; nleft = n; while( nleft > 0 ) { if( (nwritten = write(fd, ptr, nleft) ) <= 0 ) { if( errno == EINTR ) { nwritten = 0; //再次調用write } else { return -5; } } nleft = nleft - nwritten; ptr = ptr + nwritten; } return(n); } int sock_read_wait( int fd, int timeout ) { struct pollfd pfd; pfd.fd = fd; pfd.events = POLLIN; pfd.revents = 0; timeout *= 1000; for (;;) { switch( poll(&pfd, 1, timeout) ) { case -1: if( errno != EINTR ) { return (-2); } continue; case 0: errno = ETIMEDOUT; return (-1); default: if( pfd.revents & POLLIN ) return (0); else return (-3); } } } ssize_t sock_read_tmo( int fd, void *vptr, size_t len, int timeout ) { if( timeout > 0 && sock_read_wait(fd, timeout) < 0 ) return (-1); else return (read(fd, vptr, len)); } int sock_connect_nore(const char *IPaddr , int port , int timeout) { // char temp[4096]; int sock_fd = 0, n = 0, errcode = 0; struct sockaddr_in servaddr; if( IPaddr == NULL ) { return -1; } if( (sock_fd = socket(AF_INET, SOCK_STREAM, 0) ) < 0 ) { return -1; } memset(&servaddr, 0, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_port = htons(port); //changed by navy 2003.3.3 for support domain addr //if( (servaddr.sin_addr.s_addr = inet_addr(IPaddr) ) == -1 ) if( (errcode = inet_pton(AF_INET, IPaddr, &servaddr.sin_addr) ) <= 0 ) { //added by navy 2003.3.31 for support domain addr struct hostent* pHost = NULL, host; char sBuf[2048], sHostIp[17]; int h_errnop = 0; memset(&host, 0, sizeof(host)); memset(sBuf, 0, sizeof(sBuf)); memset(sHostIp, 0 , sizeof(sHostIp)); pHost = &host; #ifdef _SOLARIS_PLAT //solaris if( (gethostbyname_r(IPaddr, pHost, sBuf, sizeof(sBuf), &h_errnop) == NULL) || #else //linux if( (gethostbyname_r(IPaddr, pHost, sBuf, sizeof(sBuf), &pHost, &h_errnop) != 0) || #endif (pHost == NULL) ) { close(sock_fd); return -1; } if( pHost->h_addrtype != AF_INET && pHost->h_addrtype != AF_INET6 ) { close(sock_fd); return -1; } //目前僅取第一個IP地址 if( (inet_ntop(pHost->h_addrtype, *(pHost->h_addr_list), sHostIp, sizeof(sHostIp)) ) == NULL ) { close(sock_fd); return -1; } if( (errcode = inet_pton(AF_INET, sHostIp, &servaddr.sin_addr) ) <= 0 ) { close(sock_fd); return -1; } //end added by navy 2003.3.31 } if( (errcode = sock_timed_connect(sock_fd, (struct sockaddr *)&servaddr, sizeof(servaddr), timeout) ) < 0 ) { close(sock_fd); return -1; } return sock_fd; } int sock_connect(const char *IPaddr , int port , int timeout) { char temp[4096]; int sock_fd = 0, n = 0, errcode = 0; struct sockaddr_in servaddr; if( IPaddr == NULL ) { return -1; } if( (sock_fd = socket(AF_INET, SOCK_STREAM, 0) ) < 0 ) { return -1; } memset(&servaddr, 0, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_port = htons(port); //changed by navy 2003.3.3 for support domain addr //if( (servaddr.sin_addr.s_addr = inet_addr(IPaddr) ) == -1 ) if( (errcode = inet_pton(AF_INET, IPaddr, &servaddr.sin_addr) ) <= 0 ) { //added by navy 2003.3.31 for support domain addr struct hostent* pHost = NULL, host; char sBuf[2048], sHostIp[17]; int h_errnop = 0; memset(&host, 0, sizeof(host)); memset(sBuf, 0, sizeof(sBuf)); memset(sHostIp, 0 , sizeof(sHostIp)); pHost = &host; #ifdef _SOLARIS_PLAT //solaris if( (gethostbyname_r(IPaddr, pHost, sBuf, sizeof(sBuf), &h_errnop) == NULL) || #else //linux if( (gethostbyname_r(IPaddr, pHost, sBuf, sizeof(sBuf), &pHost, &h_errnop) != 0) || #endif (pHost == NULL) ) { close(sock_fd); return -1; } if( pHost->h_addrtype != AF_INET && pHost->h_addrtype != AF_INET6 ) { close(sock_fd); return -1; } //目前僅取第一個IP地址 if( (inet_ntop(pHost->h_addrtype, *(pHost->h_addr_list), sHostIp, sizeof(sHostIp)) ) == NULL ) { close(sock_fd); return -1; } if( (errcode = inet_pton(AF_INET, sHostIp, &servaddr.sin_addr) ) <= 0 ) { close(sock_fd); return -1; } //end added by navy 2003.3.31 } if( (errcode = sock_timed_connect(sock_fd, (struct sockaddr *)&servaddr, sizeof(servaddr), timeout) ) < 0 ) { close(sock_fd); return -1; } n = sock_read_tmo(sock_fd, temp, 4096, timeout); //一般錯誤 if( n <= 0 ) { close(sock_fd); sock_fd = -1; } return sock_fd; } int sock_non_blocking(int fd, int on) { int flags; if ((flags = fcntl(fd, F_GETFL, 0)) < 0){ return -10; } if (fcntl(fd, F_SETFL, on ? flags | O_NONBLOCK : flags & ~O_NONBLOCK) < 0){ return -10; } return 0; } int sock_write_wait(int fd, int timeout) { struct pollfd pfd; pfd.fd = fd; pfd.events = POLLOUT; pfd.revents = 0; timeout *= 1000; for (;;) { switch( poll(&pfd, 1, timeout) ) { case -1: if( errno != EINTR ) { return (-2); } continue; case 0: errno = ETIMEDOUT; return (-1); default: if( pfd.revents & POLLOUT ) return (0); else return (-3); } } } int sock_timed_connect(int sock, struct sockaddr * sa, int len, int timeout) { int error = 0; socklen_t error_len; sock_non_blocking(sock, 1); if( connect(sock, sa, len) == 0 ) { sock_non_blocking(sock, 0); return (0); } if( errno != EINPROGRESS ) { sock_non_blocking(sock, 0); return (-1); } /* * A connection is in progress. Wait for a limited amount of time for * something to happen. If nothing happens, report an error. */ if( sock_write_wait(sock, timeout) != 0) { sock_non_blocking(sock, 0); return (-2); } /* * Something happened. Some Solaris 2 versions have getsockopt() itself * return the error, instead of returning it via the parameter list. */ error = 0; error_len = sizeof(error); if( getsockopt(sock, SOL_SOCKET, SO_ERROR, (char *) &error, &error_len) != 0 ) { sock_non_blocking(sock, 0); return (-3); } if( error ) { errno = error; sock_non_blocking(sock, 0); return (-4); } sock_non_blocking(sock, 0); /* * No problems. */ return (0); } static int check_ip_in_list(const char *ip, char *iplist) { char *token = NULL; char *saveptr = NULL; token = strtok_r(iplist, ",", &saveptr); while(token != NULL) { char *ptmp = NULL; char *ip_mask = strtok_r(token, "/", &ptmp); if(!ip_mask) return -1; char *ip_bit = strtok_r(NULL, "/", &ptmp); if(ip_bit) { int mask_bit = atoi(ip_bit); if(mask_bit < 0 || mask_bit >32) continue; unsigned long addr[4] = { 0 }; sscanf( ip_mask, "%lu.%lu.%lu.%lu", addr, addr + 1, addr + 2, addr + 3 ); unsigned long vl1 = addr[0] << 24 | addr[1] << 16 | addr[2] << 8 | addr[3]; sscanf( ip, "%lu.%lu.%lu.%lu", addr, addr + 1, addr + 2, addr + 3 ); unsigned long vl2 = addr[0] << 24 | addr[1] << 16 | addr[2] << 8 | addr[3]; vl1 = ( vl1 >> ( 32 - mask_bit ) ); vl2 = ( vl2 >> ( 32 - mask_bit ) ); if( vl1 == vl2 ) return 1; } else { if(strcmp(ip,ip_mask) == 0) return 1; } token = strtok_r(NULL, ",", &saveptr); } return 0; } static int check_ip_in_redis(const char *redis_host, const char *ip,const char *rq_pro) { char buf[128]; int loops = 0; strcpy(buf, redis_host); do { loops ++; char *ptmp = NULL; char *host = strtok_r(buf, ":", &ptmp); if(!host) return -1; char *s_port = strtok_r(NULL, ":", &ptmp); if(!s_port) return -1; int port = atoi(s_port); char respone[40] = {0}; int sock_fd = -1; if((sock_fd = sock_connect_nore(host, port, 5))<0) return -1; if(sock_write_loop(sock_fd, rq_pro, strlen(rq_pro)) != strlen(rq_pro)) { close(sock_fd); return -1; } if(sock_read_tmo(sock_fd, respone, sizeof(respone)-1, 5)<=0) { close(sock_fd); return -1; } if(strncmp(":0", respone, 2) == 0) { close(sock_fd); return 0; } else if(strncmp(":1", respone, 2) == 0) { close(sock_fd); return 1; } else if(strncmp("$", respone, 1) == 0) { int data_size = 0; int ret = 0; char *data_line = strstr(respone,"\r\n"); if(!data_line) { close(sock_fd); return -1; } data_line = data_line+2; data_size = atoi(respone+1); if(data_size == -1) { close(sock_fd); return 0; } if(strlen(data_line) == data_size+2) { printf("line = %d, data_line = %s\n",__LINE__,data_line); ret=check_ip_in_list(ip, data_line); close(sock_fd); return ret; } char *data = calloc(data_size+3,1); if(!data) { close(sock_fd); return -1; } strcpy(data,data_line); int read_size = strlen(data); int left_size = data_size + 2 - read_size; while(left_size > 0) { int nread = sock_read_tmo(sock_fd, data+read_size, left_size, 5); if(nread<=0) { free(data); close(sock_fd); return -1; } read_size += nread; left_size -= nread; } close(sock_fd); printf("line = %d, data = %s\n",__LINE__,data); ret=check_ip_in_list(ip, data); free(data); return ret; } else if(strncmp("-MOVED", respone, 6) == 0) { close(sock_fd); char *p = strchr(respone, ' '); if(p == NULL) return -1; p = strchr(p+1, ' '); if(p == NULL) return -1; strcpy(buf, p+1); } else { close(sock_fd); return -1; } }while(loops < 2); return -1; } int main(int argc,char *argv[]) { if(argc != 2) { printf("please input ip\n"); return -1; } const char *redis_ip = "127.0.0.1:7002"; const char *domain = "test.com"; char exist_pro[128] = {0}; char get_pro[128] = {0}; snprintf(exist_pro,sizeof(exist_pro),"EXISTS test|%s|%s\r\n",domain,"127.0.0.1"); snprintf(get_pro,sizeof(get_pro),"GET test_%s\r\n",domain); int loops = 0; int ret = 0; do { loops ++; ret = check_ip_in_redis(redis_ip, argv[1],exist_pro); if(ret == 0) ret = check_ip_in_redis(redis_ip, argv[1],get_pro); }while(loops < 3 && ret < 0); printf("line = %d, ret = %d\n",__LINE__,ret); return ret; }
c_redis_cli.c
主要看這個check_ip_in_redis函數就行了,其它都是一些socket的封裝。
python實現redis客戶端
#!/usr/bin/python import sys import socket def main(argv): if(len(argv) != 3): print "please input domain ip!" return host = "192.168.188.47" port = 7002 while 1: s = socket.socket() s.connect((host, port)) cmd = 'set %s_white_ip %s\r\n' % (argv[1],argv[2]) s.send(cmd) res = s.recv(32) s.close() if res[0] == "+": print "set domain white ip suc!" return elif res[0:6] == "-MOVED": list = res.split(" ") ip_list = list[2].split(":") host = ip_list[0] port = int(ip_list[1]) else: print "set domain white ip error!" return if __name__ == "__main__": main(sys.argv)
總結
多去學一些,想想redis客戶端怎麼實現的,對redis的理解會更加深入,寫了這部分之後,對redis集群有了更加深入的了解了