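HugeGraph data storage, retrieval, and parsing
HugeGraph is Baidu's open-source graph database. It supports HBase, MySQL, RocksDB, and others as storage backends. This article uses EDGE storage with HBase as the backend to explore how HugeGraph stores and reads data.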
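Storing data
Serialization
Data must first be serialized. For HBase, HugeGraph uses BinarySerializer (through the HbaseSerializer subclass below), configured with:
- keyWithIdPrefix = false (the first constructor argument); indexWithIdPrefix, the second argument, is true
keyWithIdPrefix matters later.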
```java
public class HbaseSerializer extends BinarySerializer {

    public HbaseSerializer() {
        super(false, true);
    }
}
```
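To be written to the database, an element is first serialized into a BackendEntry, the transfer object between the graph database and the backend store. For HBase this is BinaryBackendEntry: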
```java
public class BinaryBackendEntry implements BackendEntry {

    private static final byte[] EMPTY_BYTES = new byte[]{};

    private final HugeType type;
    private final BinaryId id;
    private Id subId;
    private final List<BackendColumn> columns;
    private long ttl;

    public BinaryBackendEntry(HugeType type, byte[] bytes) {
        this(type, BytesBuffer.wrap(bytes).parseId(type));
    }

    public BinaryBackendEntry(HugeType type, BinaryId id) {
        this.type = type;
        this.id = id;
        this.subId = null;
        this.columns = new ArrayList<>();
        this.ttl = 0L;
    }

    // ... other methods omitted
}
```
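Now the serialization itself. Serializing essentially means putting the data into the entry's columns. For HBase, keyWithIdPrefix is false, so the column name is EMPTY_BYTES rather than formatEdgeName(edge): the name carries no part of the edge id, not even ownerVertexId. The full edge id (compare the EdgeId fields below, which start with ownerVertexId) lives in the entry id, which newBackendEntry(edge) builds from formatEdgeName(edge).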
```java
public BackendEntry writeEdge(HugeEdge edge) {
    BinaryBackendEntry entry = newBackendEntry(edge);
    byte[] name = this.keyWithIdPrefix ?
                  this.formatEdgeName(edge) : EMPTY_BYTES;
    byte[] value = this.formatEdgeValue(edge);
    entry.column(name, value);

    if (edge.hasTtl()) {
        entry.ttl(edge.ttl());
    }
    return entry;
}
```
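The name/rowkey split in writeEdge can be modeled in a few lines. EdgeEntrySketch below is a hypothetical stand-in for BinaryBackendEntry, not HugeGraph code; it only shows that with keyWithIdPrefix false the single column gets an empty name while the full edge name stays in the entry id (the future rowkey).

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the column-name choice in BinarySerializer.writeEdge.
// EdgeEntrySketch is hypothetical; it is not a HugeGraph class.
final class EdgeEntrySketch {
    static final byte[] EMPTY_BYTES = new byte[0];

    final byte[] rowKey;                              // full edge name bytes
    final List<byte[][]> columns = new ArrayList<>(); // {name, value} pairs

    private EdgeEntrySketch(byte[] rowKey) {
        this.rowKey = rowKey;
    }

    // Mirrors: name = keyWithIdPrefix ? <edge name> : EMPTY_BYTES
    static EdgeEntrySketch writeEdge(byte[] edgeName, byte[] edgeValue,
                                     boolean keyWithIdPrefix) {
        EdgeEntrySketch entry = new EdgeEntrySketch(edgeName);
        byte[] name = keyWithIdPrefix ? edgeName : EMPTY_BYTES;
        entry.columns.add(new byte[][]{name, edgeValue});
        return entry;
    }
}
```

With keyWithIdPrefix = false (the HBase setting), `columns.get(0)[0]` is empty and only `rowKey` identifies the edge.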
EdgeId fields:
```java
private final Id ownerVertexId;
private final Directions direction;
private final Id edgeLabelId;
private final String sortValues;
private final Id otherVertexId;
private final boolean directed;
private String cache;
```
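To make the edge id layout concrete, here is a sketch that encodes the fields in the order given in parseEdge's comment (owner-vertex + dir + edge-label + sort-values + other-vertex) and then strips the owner-vertex prefix. The encoding (one length byte per id, a 0xFF byte ending the sort values, direction codes 1 and 2) and the class EdgeIdLayout are hypothetical simplifications, not HugeGraph's BytesBuffer format.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical, simplified edge id layout:
// owner-vertex + dir + edge-label + sort-values + other-vertex.
final class EdgeIdLayout {
    static final byte DIR_OUT = 1;                 // hypothetical direction codes
    static final byte DIR_IN = 2;
    static final byte STRING_ENDING = (byte) 0xFF; // hypothetical terminator

    // Each id: 1 length byte followed by its UTF-8 bytes (assumption).
    static void writeId(ByteArrayOutputStream out, String id) {
        byte[] b = id.getBytes(StandardCharsets.UTF_8);
        out.write(b.length);
        out.write(b, 0, b.length);
    }

    static byte[] encode(String owner, byte dir, String label,
                         String sortValues, String other) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeId(out, owner);
        out.write(dir);
        writeId(out, label);
        byte[] sv = sortValues.getBytes(StandardCharsets.UTF_8);
        out.write(sv, 0, sv.length);
        out.write(STRING_ENDING);
        writeId(out, other);
        return out.toByteArray();
    }

    // Drops the leading owner-vertex id: what remains starts with the dir byte.
    static byte[] withoutOwner(byte[] edgeId) {
        int ownerLen = 1 + (edgeId[0] & 0xFF);
        return Arrays.copyOfRange(edgeId, ownerLen, edgeId.length);
    }
}
```

For example, `encode("v1", DIR_OUT, "knows", "", "v2")` yields 14 bytes, and `withoutOwner` of that returns the last 11, beginning with the direction byte.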
Backend storage
Once the BackendEntry is generated, the store mechanism hands it to the backend for storage.
Saving an EDGE is handled by HbaseTables.Edge:
```java
public static class Edge extends HbaseTable {

    @Override
    public void insert(Session session, BackendEntry entry) {
        long ttl = entry.ttl();
        if (ttl == 0L) {
            session.put(this.table(), CF, entry.id().asBytes(),
                        entry.columns());
        } else {
            session.put(this.table(), CF, entry.id().asBytes(),
                        entry.columns(), ttl);
        }
    }
}
```
CF, the column family, is always f:
```java
protected static final byte[] CF = "f".getBytes();
```
session.put is implemented as:
```java
@Override
public void put(String table, byte[] family, byte[] rowkey,
                Collection<BackendColumn> columns) {
    Put put = new Put(rowkey);
    for (BackendColumn column : columns) {
        put.addColumn(family, column.name, column.value);
    }
    this.batch(table, put);
}
```
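So on write, the edge id (formatEdgeName(edge), which starts with ownerVertexId) becomes the rowkey, and the column qualifier is empty. The name that parseColumn works with later (the edge id minus ownerVertexId) is therefore not stored as a qualifier; it has to be reconstructed from the rowkey when the row is read back.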
Reading EDGEs
Reading a BackendEntry from the backend
Reading means fetching Results from HBase, converting them into BinaryBackendEntry objects, and then into Edges.
Reading is a scan:
```java
/**
 * Inner scan: send scan request to HBase and get iterator
 */
@Override
public RowIterator scan(String table, Scan scan) {
    assert !this.hasChanges();
    try (Table htable = table(table)) {
        return new RowIterator(htable.getScanner(scan));
    } catch (IOException e) {
        throw new BackendException(e);
    }
}
```
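Because HBase keeps rows sorted by rowkey bytes and every edge rowkey begins with its owner vertex id, all edges of one vertex sit next to each other in a scan, which is what lets the iterator merge consecutive rows. The sketch below models this with a TreeMap ordered by unsigned bytes (Arrays.compareUnsigned, Java 9+). SortedRowsSketch is a hypothetical in-memory stand-in, not the HBase client API, and how HugeGraph builds its actual Scan is not covered here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory stand-in for an HBase table (not the HBase client API):
// rows are kept sorted in unsigned byte order, as HBase sorts rowkeys.
final class SortedRowsSketch {
    final TreeMap<byte[], byte[]> rows =
            new TreeMap<byte[], byte[]>(Arrays::compareUnsigned);

    void put(byte[] rowKey, byte[] value) {
        rows.put(rowKey, value);
    }

    // Returns all rowkeys starting with prefix; they form one contiguous run.
    List<byte[]> scanPrefix(byte[] prefix) {
        List<byte[]> result = new ArrayList<>();
        for (Map.Entry<byte[], byte[]> e : rows.tailMap(prefix, true).entrySet()) {
            byte[] key = e.getKey();
            if (key.length < prefix.length ||
                !Arrays.equals(key, 0, prefix.length, prefix, 0, prefix.length)) {
                break; // first key outside the prefix range ends the scan
            }
            result.add(key);
        }
        return result;
    }
}
```

Stopping at the first non-matching key is safe precisely because the keys are sorted: nothing with the same prefix can appear later.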
After the scan, the rows are wrapped in a BackendEntryIterator:
```java
protected BackendEntryIterator newEntryIterator(Query query,
                                                RowIterator rows) {
    return new BinaryEntryIterator<>(rows, query, (entry, row) -> {
        E.checkState(!row.isEmpty(), "Can't parse empty HBase result");
        byte[] id = row.getRow();
        if (entry == null || !Bytes.prefixWith(id, entry.id().asBytes())) {
            HugeType type = query.resultType();
            // NOTE: only support BinaryBackendEntry currently
            entry = new BinaryBackendEntry(type, id);
        }
        try {
            this.parseRowColumns(row, entry, query);
        } catch (IOException e) {
            throw new BackendException("Failed to read HBase columns", e);
        }
        return entry;
    });
}
```
Note that when new BinaryBackendEntry(type, id) is called, the BinaryBackendEntry's id is not the rowkey itself; the rowkey is first processed by parseId:
```java
public BinaryId parseId(HugeType type) {
    if (type.isIndex()) {
        return this.readIndexId(type);
    }
    // Parse id from bytes
    int start = this.buffer.position();
    /*
     * Since edge id in edges table doesn't prefix with leading 0x7e,
     * so readId() will return the source vertex id instead of edge id,
     * can't call: type.isEdge() ? this.readEdgeId() : this.readId();
     */
    Id id = this.readId();
    int end = this.buffer.position();
    int len = end - start;
    byte[] bytes = new byte[len];
    System.arraycopy(this.array(), start, bytes, 0, len);
    return new BinaryId(bytes, id);
}
```
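Here readId() consumes only the ownerVertexId at the head of the rowkey, and bytes is copied from start to end, that is, exactly the bytes readId() consumed. Both halves of the resulting BinaryId (the Id and the bytes) are therefore the owner vertex id; the rest of the rowkey is not part of the entry id. This differs from serialization. Why the design? Because whether a row holds a vertex or an edge, it is read back as a Vertex.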
```java
// BinarySerializer
protected final BinaryBackendEntry newBackendEntry(HugeEdge edge) {
    BinaryId id = new BinaryId(formatEdgeName(edge),
                               edge.idWithDirection());
    return newBackendEntry(edge.type(), id);
}

// EdgeId
public EdgeId directed(boolean directed) {
    return new EdgeId(this.ownerVertexId, this.direction, this.edgeLabelId,
                      this.sortValues, this.otherVertexId, directed);
}
```
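At serialization time, by contrast, the BinaryId wraps the full edge name (formatEdgeName(edge)) together with an EdgeId (edge.idWithDirection()).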
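The read-side id can be sketched as follows. ParseIdSketch is hypothetical and assumes a simplified encoding in which each id is one length byte followed by the id bytes. It shows that parseId keeps only the bytes it consumed for the owner vertex id, and that the iterator's prefixWith test then groups rows by owner vertex.

```java
import java.util.Arrays;

// Sketch of parseId for an edge rowkey under a hypothetical id encoding
// (1 length byte + id bytes). ParseIdSketch is not HugeGraph code.
final class ParseIdSketch {
    // Reads the leading owner-vertex id and returns exactly the bytes that
    // were consumed (start..end), like the System.arraycopy in parseId.
    static byte[] parseOwnerIdBytes(byte[] rowKey) {
        int start = 0;
        int end = start + 1 + (rowKey[start] & 0xFF);
        return Arrays.copyOfRange(rowKey, start, end);
    }

    // The rest of the rowkey is not part of the entry id: it is the
    // dir + label + sort-values + other-vertex part of the edge id.
    static byte[] remainder(byte[] rowKey) {
        int idLen = parseOwnerIdBytes(rowKey).length;
        return Arrays.copyOfRange(rowKey, idLen, rowKey.length);
    }

    // Same idea as !Bytes.prefixWith(id, entry.id().asBytes()), negated:
    // does this row share the current entry's owner vertex?
    static boolean sameOwner(byte[] rowKey, byte[] entryIdBytes) {
        return rowKey.length >= entryIdBytes.length &&
               Arrays.equals(rowKey, 0, entryIdBytes.length,
                             entryIdBytes, 0, entryIdBytes.length);
    }
}
```

Under this length-prefixed encoding, the bytes for "v1" can never be a prefix of the bytes for "v10", because the length bytes differ; a raw byte prefix check is only safe when the id encoding guarantees that.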
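BackendEntryIterator supports merging results. In the code above, !Bytes.prefixWith(id, entry.id().asBytes()) checks whether the row belongs to the same owner vertex as the current entry; if it does, the row's columns are added to the same BackendEntry. The merge loop lives in BinaryEntryIterator.fetch():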
```java
public BinaryEntryIterator(BackendIterator<Elem> results, Query query,
                           BiFunction<BackendEntry, Elem, BackendEntry> m) {
    // ...
}

@Override
protected final boolean fetch() {
    assert this.current == null;
    if (this.next != null) {
        this.current = this.next;
        this.next = null;
    }

    while (this.results.hasNext()) {
        Elem elem = this.results.next();
        BackendEntry merged = this.merger.apply(this.current, elem);
        E.checkState(merged != null, "Error when merging entry");
        if (this.current == null) {
            // The first time to read
            this.current = merged;
        } else if (merged == this.current) {
            // The next entry belongs to the current entry
            assert this.current != null;
            if (this.sizeOf(this.current) >= INLINE_BATCH_SIZE) {
                break;
            }
        } else {
            // New entry
            assert this.next == null;
            this.next = merged;
            break;
        }

        // When limit exceed, stop fetching
        if (this.reachLimit(this.fetched() - 1)) {
            // Need remove last one because fetched limit + 1 records
            this.removeLastRecord();
            this.results.close();
            break;
        }
    }
    return this.current != null;
}
```
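The merge loop in fetch() can be reduced to the following sketch, which is not HugeGraph code: MergingIterator, Row, and Entry are hypothetical (records, so Java 16+), owners are plain strings, and batchSize plays the role of INLINE_BATCH_SIZE. The limit handling (reachLimit and removeLastRecord) is left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Consecutive rows with the same owner are folded into one entry; an entry
// is emitted early once it holds batchSize columns, and a row with a new
// owner is held back (like this.next) to start the following entry.
final class MergingIterator implements Iterator<MergingIterator.Entry> {
    record Row(String owner, String column) {}
    record Entry(String owner, List<String> columns) {}

    private final Iterator<Row> rows;
    private final int batchSize;
    private Row pending; // first row of the next entry

    MergingIterator(Iterator<Row> rows, int batchSize) {
        this.rows = rows;
        this.batchSize = batchSize;
    }

    @Override
    public boolean hasNext() {
        return pending != null || rows.hasNext();
    }

    @Override
    public Entry next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        Row first = pending != null ? pending : rows.next();
        pending = null;
        Entry current = new Entry(first.owner(), new ArrayList<>());
        current.columns().add(first.column());
        while (current.columns().size() < batchSize && rows.hasNext()) {
            Row row = rows.next();
            if (!row.owner().equals(current.owner())) {
                pending = row; // new owner: it starts the next entry
                break;
            }
            current.columns().add(row.column());
        }
        return current;
    }
}
```

With rows (v1,e1), (v1,e2), (v1,e3), (v2,e4) and batchSize 2, this yields three entries: v1 with [e1, e2], v1 with [e3], and v2 with [e4]. As in fetch(), a full batch splits one owner's edges across entries.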
Converting a BackendEntry into an edge
Now look at how the data is read in readVertex. As mentioned above, even an edge is read as a vertex:
```java
@Override
public HugeVertex readVertex(HugeGraph graph, BackendEntry bytesEntry) {
    if (bytesEntry == null) {
        return null;
    }
    BinaryBackendEntry entry = this.convertEntry(bytesEntry);

    // Parse id
    Id id = entry.id().origin();
    Id vid = id.edge() ? ((EdgeId) id).ownerVertexId() : id;
    HugeVertex vertex = new HugeVertex(graph, vid, VertexLabel.NONE);

    // Parse all properties and edges of a Vertex
    for (BackendColumn col : entry.columns()) {
        if (entry.type().isEdge()) {
            // NOTE: the entry id type is vertex even if entry type is edge
            // Parse vertex edges
            this.parseColumn(col, vertex);
        } else {
            assert entry.type().isVertex();
            // Parse vertex properties
            assert entry.columnsSize() == 1 : entry.columnsSize();
            this.parseVertex(col.value, vertex);
        }
    }
    return vertex;
}
```
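The logic:
- First the ownerVertexId is read and a HugeVertex is created. At this point only the id is known, not the vertex label, so the label is set to VertexLabel.NONE.
- Then the BackendColumns are read: one column per edge, whose name is the edge id minus ownerVertexId and whose value is the edge data.
Each column is parsed in parseColumn: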
```java
protected void parseColumn(BackendColumn col, HugeVertex vertex) {
    BytesBuffer buffer = BytesBuffer.wrap(col.name);
    Id id = this.keyWithIdPrefix ? buffer.readId() : vertex.id();
    E.checkState(buffer.remaining() > 0, "Missing column type");
    byte type = buffer.read();
    // Parse property
    if (type == HugeType.PROPERTY.code()) {
        Id pkeyId = buffer.readId();
        this.parseProperty(pkeyId, BytesBuffer.wrap(col.value), vertex);
    }
    // Parse edge
    else if (type == HugeType.EDGE_IN.code() ||
             type == HugeType.EDGE_OUT.code()) {
        this.parseEdge(col, vertex, vertex.graph());
    }
    // Parse system property
    else if (type == HugeType.SYS_PROPERTY.code()) {
        // pass
    }
    // Invalid entry
    else {
        E.checkState(false, "Invalid entry(%s) with unknown type(%s): 0x%s",
                     id, type & 0xff, Bytes.toHex(col.name));
    }
}
```
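readVertex and parseColumn together can be sketched as below for the keyWithIdPrefix == false case. The type codes, the Vertex class, and the string label "NONE" are hypothetical stand-ins (HugeType's real codes are not shown in the source); properties are only counted and edges only collected, where the real code calls parseProperty and parseEdge.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of readVertex + parseColumn with keyWithIdPrefix == false.
// All names and type codes here are hypothetical, not HugeGraph's.
final class ParseColumnSketch {
    static final byte PROPERTY = 1, EDGE_OUT = 2, EDGE_IN = 3, SYS_PROPERTY = 4;

    static final class Vertex {
        final String id;
        final String label = "NONE";   // label unknown, like VertexLabel.NONE
        final List<byte[]> edges = new ArrayList<>();
        int properties;

        Vertex(String id) {
            this.id = id;
        }
    }

    // The vertex is built from the owner id alone; each column is one edge.
    static Vertex readVertex(String ownerId, List<byte[]> columnNames) {
        Vertex vertex = new Vertex(ownerId);
        for (byte[] name : columnNames) {
            parseColumn(name, vertex);
        }
        return vertex;
    }

    // Without an id prefix, the column name starts directly with the type byte.
    static void parseColumn(byte[] name, Vertex vertex) {
        if (name.length == 0) {
            throw new IllegalStateException("Missing column type");
        }
        byte type = name[0];
        if (type == PROPERTY) {
            vertex.properties++;
        } else if (type == EDGE_OUT || type == EDGE_IN) {
            vertex.edges.add(name);    // real code: parseEdge(col, vertex, graph)
        } else if (type == SYS_PROPERTY) {
            // ignored, as in the original
        } else {
            throw new IllegalStateException(
                    "Invalid entry with unknown type: " + (type & 0xff));
        }
    }
}
```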
parseColumn reads the type byte from col.name; for an edge (EDGE_IN or EDGE_OUT) it calls parseEdge:
```java
protected void parseEdge(BackendColumn col, HugeVertex vertex,
                         HugeGraph graph) {
    // owner-vertex + dir + edge-label + sort-values + other-vertex
    BytesBuffer buffer = BytesBuffer.wrap(col.name);
    if (this.keyWithIdPrefix) {
        // Consume owner-vertex id
        buffer.readId();
    }
    byte type = buffer.read();
    Id labelId = buffer.readId();
    String sortValues = buffer.readStringWithEnding();
    Id otherVertexId = buffer.readId();

    boolean direction = EdgeId.isOutDirectionFromCode(type);
    EdgeLabel edgeLabel = graph.edgeLabelOrNone(labelId);

    // Construct edge
    HugeEdge edge = HugeEdge.constructEdge(vertex, direction, edgeLabel,
                                           sortValues, otherVertexId);

    // Parse edge-id + edge-properties
    buffer = BytesBuffer.wrap(col.value);
    //Id id = buffer.readId();

    // Parse edge properties
    this.parseProperties(buffer, edge);

    // Parse edge expired time if needed
    if (edge.hasTtl()) {
        this.parseExpiredTime(buffer, edge);
    }
}
```
From col.name, parseEdge reads type (the direction), labelId, sortValues, and otherVertexId, in that order:
```java
byte type = buffer.read();
Id labelId = buffer.readId();
String sortValues = buffer.readStringWithEnding();
Id otherVertexId = buffer.readId();
```
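The read order is easiest to see with a small decoder. EdgeNameDecoder is a hypothetical sketch: it assumes one length byte per id, a 0xFF byte terminating sortValues, and direction code 1 for OUT, none of which is HugeGraph's actual BytesBuffer format.

```java
import java.nio.charset.StandardCharsets;

// Decodes a column name laid out as dir + label id + sort-values + ending
// + other-vertex id (keyWithIdPrefix == false, so no owner id up front).
// The encoding is a hypothetical simplification of BytesBuffer.
final class EdgeNameDecoder {
    static final byte STRING_ENDING = (byte) 0xFF; // hypothetical terminator
    static final byte DIR_OUT = 1;                 // hypothetical direction code

    final byte type;
    final String labelId;
    final String sortValues;
    final String otherVertexId;

    private int pos;

    EdgeNameDecoder(byte[] name) {
        this.type = name[pos++];
        this.labelId = readId(name);
        this.sortValues = readStringWithEnding(name);
        this.otherVertexId = readId(name);
    }

    boolean isOutDirection() {
        return type == DIR_OUT;
    }

    // 1 length byte followed by UTF-8 bytes (assumption)
    private String readId(byte[] b) {
        int len = b[pos++] & 0xFF;
        String s = new String(b, pos, len, StandardCharsets.UTF_8);
        pos += len;
        return s;
    }

    // Reads up to the ending byte, then skips it
    private String readStringWithEnding(byte[] b) {
        int start = pos;
        while (b[pos] != STRING_ENDING) {
            pos++;
        }
        String s = new String(b, start, pos - start, StandardCharsets.UTF_8);
        pos++;
        return s;
    }
}
```

An ending byte is needed for sortValues because, unlike the ids, it carries no length in this layout; 0xFF is a convenient terminator here since it never occurs inside UTF-8 text.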
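It then looks up the edge label with EdgeLabel edgeLabel = graph.edgeLabelOrNone(labelId), constructs the edge with HugeEdge.constructEdge, and parses the edge properties from col.value with parseProperties. Finally, if the edge has a TTL, parseExpiredTime reads its expired time; expired data is filtered out when the results are processed.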
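Expiry filtering can be sketched as a comparison against the current time. EdgeTtlSketch is hypothetical; it assumes the expired time is the creation time plus the TTL in milliseconds, with a TTL of 0 meaning no expiry (matching the ttl == 0L check in HbaseTables.Edge.insert). Where exactly HugeGraph applies the filter is not shown in the source.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical model of TTL-based filtering of read results.
final class EdgeTtlSketch {
    final long ttlMillis;          // 0 means "no TTL"
    final long expiredTimeMillis;  // assumption: creation time + TTL

    EdgeTtlSketch(long createdAtMillis, long ttlMillis) {
        this.ttlMillis = ttlMillis;
        this.expiredTimeMillis = ttlMillis > 0L ? createdAtMillis + ttlMillis : 0L;
    }

    boolean hasTtl() {
        return ttlMillis > 0L;
    }

    boolean expired(long nowMillis) {
        return hasTtl() && expiredTimeMillis <= nowMillis;
    }

    // Keeps only edges that have not expired at nowMillis
    static List<EdgeTtlSketch> alive(List<EdgeTtlSketch> edges, long nowMillis) {
        return edges.stream()
                    .filter(e -> !e.expired(nowMillis))
                    .collect(Collectors.toList());
    }
}
```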