Skip to content

Commit

Permalink
support udpate
Browse files Browse the repository at this point in the history
  • Loading branch information
qiyuewuyi committed Mar 22, 2018
1 parent b7fd14a commit 10dc30d
Show file tree
Hide file tree
Showing 12 changed files with 811 additions and 20 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

# Log file
*.log
logs

# BlueJ files
*.ctxt
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.aliyun.openservices.tablestore</groupId>
<artifactId>Timeline</artifactId>
<version>1.1.0</version>
<version>1.2.1</version>
<build>
<plugins>
<plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public TimelineEntry store(IMessage message) {
* 异步写入消息接口。
* @param message 消息对象,需实现IMessage接口。
* @param callback 回调函数。
* @return Future对象,异步模式下,Future和callback需要二选一
* @return Future对象。
*/
public Future<TimelineEntry> storeAsync(IMessage message, TimelineCallback<IMessage> callback) {
if (message == null) {
Expand All @@ -92,6 +92,47 @@ public void batch(IMessage message) {
this.store.batch(this.timelineID, message);
}

/**
* 同步更新消息。
* @param sequenceID 消息顺序ID,和TimelineID一起唯一确定一条消息。
* @param message 消息对象,需实现IMessage接口。
* @return 完整的TimelineEntry,包括消息和顺序ID。
*/
public TimelineEntry update(Long sequenceID, IMessage message) {
if (message == null) {
throw new TimelineException(TimelineExceptionType.INVALID_USE,
"store parameter message is null");
}

if (sequenceID == null) {
throw new TimelineException(TimelineExceptionType.INVALID_USE,
"store parameter sequenceID is null");
}

return this.store.update(this.timelineID, sequenceID, message);
}

/**
* 异步更新消息接口。
* @param sequenceID 消息顺序ID,和TimelineID一起唯一确定一条消息。
* @param message 消息对象,需实现IMessage接口。
* @param callback 回调函数。
* @return Future对象。
*/
public Future<TimelineEntry> updateAsync(Long sequenceID, IMessage message, TimelineCallback<IMessage> callback) {
if (message == null) {
throw new TimelineException(TimelineExceptionType.INVALID_USE,
"store parameter message is null");
}

if (sequenceID == null) {
throw new TimelineException(TimelineExceptionType.INVALID_USE,
"store parameter sequenceID is null");
}

return this.store.updateAsync(this.timelineID, sequenceID, message, callback);
}

/**
* 同步读取接口,通过制定一个唯一的顺序ID读取目标TimelineEntry。
* @param sequenceID 顺序ID。
Expand All @@ -110,7 +151,7 @@ public TimelineEntry get(Long sequenceID) {
* 异步读取接口,通过制定一个唯一的顺序ID读取目标TimelineEntry。
* @param sequenceID 顺序ID。
* @param callback 读取结束后的回调函数。
* @return Future对象,异步模式下,Future和Callback需要二选一
* @return Future对象。
*/
public Future<TimelineEntry> getAsync(Long sequenceID, TimelineCallback<Long> callback) {
if (sequenceID == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ public interface IMessage {
*/
void addAttribute(String name, String value);

/**
* 更新属性列的值。
* @param name 属性名。
* @param newValue 属性值。
*/
void updateAttribute(String name, String newValue);

/**
* 获取属性列的列名和列值。
* @return 属性的名字和值。
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,15 @@ public void addAttribute(String name, String value) {
attributes.put(name, value);
}

@Override
public void updateAttribute(String name, String newValue) {
if (!attributes.containsKey(name)) {
throw new TimelineException(TimelineExceptionType.INVALID_USE,
"Attribute[" + name + "] is not exist.");
}
addAttribute(name, newValue);
}

@Override
public Map<String, String> getAttributes() {
return Collections.unmodifiableMap(this.attributes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public class DistributeTimelineConfig {
*/
private String messageContentSuffix = "content";

private String messageContentCountSuffix = "column_count";

/**
* 消息内容的crc32值,用于校验数据是否完整。
*/
Expand Down Expand Up @@ -367,4 +369,20 @@ public WriterConfig getWriterConfig() {
public void setWriterConfig(WriterConfig writerConfig) {
this.writerConfig = writerConfig;
}

/**
* 获取消息内容列的个数,主要是为了避免update内容后,内容列变短带来的影响。
* @return 消息内容列个数的后缀
*/
public String getMessageContentCountSuffix() {
return messageContentCountSuffix;
}

/**
* 设置消息内容列的个数,主要是为了避免update内容后,内容列变短带来的影响。
* @param messageContentCountSuffix 消息内容列个数的后缀
*/
public void setMessageContentCountSuffix(String messageContentCountSuffix) {
this.messageContentCountSuffix = messageContentCountSuffix;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* 基于表格存储(Table Store)的分布式存储层实现.
Expand Down Expand Up @@ -55,11 +56,14 @@ public TimelineEntry write(String timelineID, IMessage message) {
@Override
public void batch(String timelineID, IMessage message) {
if (tableStoreWriter == null) {
ExecutorService executor = Executors.newFixedThreadPool(config.getClientConfiguration().getIoThreadCount());
tableStoreWriter = new DefaultTableStoreWriter(tableStore, config.getTableName(),
config.getWriterConfig(), null, executor);
synchronized(this) {
if (tableStoreWriter == null) {
ExecutorService executor = Executors.newFixedThreadPool(config.getClientConfiguration().getIoThreadCount());
tableStoreWriter = new DefaultTableStoreWriter(tableStore, config.getTableName(),
config.getWriterConfig(), null, executor);
}
}
}

tableStoreWriter.addRowChange(createPutRowRequest(timelineID, message).getRowChange());
}

Expand All @@ -78,6 +82,39 @@ public Future<TimelineEntry> writeAsync(final String timelineID,
}
}

@Override
public TimelineEntry update(String timelineID,
Long sequenceID,
IMessage message)
{
try {
Future<TimelineEntry> res = updateAsync(timelineID, sequenceID, message, null);
return Utils.waitForFuture(res);
} catch (TableStoreException ex) {
throw handleTableStoreException(ex, timelineID, "update");
} catch (ClientException ex) {
throw new TimelineException(TimelineExceptionType.INVALID_USE,
"Parameter is invalid, reason:" + ex.getMessage(), ex);
}
}

@Override
public Future<TimelineEntry> updateAsync(String timelineID,
Long sequenceID,
IMessage message,
TimelineCallback<IMessage> callback)
{
try {
UpdateRowRequest request = createUpdateRowRequest(timelineID, sequenceID, message);
return doUpdateAsync(timelineID, message, callback, request);
} catch (TableStoreException ex) {
throw handleTableStoreException(ex, timelineID, "update");
} catch (ClientException ex) {
throw new TimelineException(TimelineExceptionType.INVALID_USE,
"Parameter is invalid, reason:" + ex.getMessage(), ex);
}
}

@Override
public TimelineEntry read(String timelineID, Long sequenceID) {
try {
Expand Down Expand Up @@ -249,9 +286,9 @@ private PutRowRequest createPutRowRequest(String timelineID, IMessage message) {
putChange.setReturnType(ReturnType.RT_PK);

byte[] content = message.serialize();
if (content.length > 1000 * 1024 * 1024) {
if (content.length > 2 * 1024 * 1024) {
throw new TimelineException(TimelineExceptionType.INVALID_USE,
String.format("Message Content must less than 1GB, current:%s", String.valueOf(content.length)));
String.format("Message Content must less than 2MB, current:%s", String.valueOf(content.length)));
}

/**
Expand All @@ -271,6 +308,14 @@ private PutRowRequest createPutRowRequest(String timelineID, IMessage message) {
pos += columnValue.length;
}

/**
* Write Message Content Count
*/
{
String columnName = Utils.SYSTEM_COLUMN_NAME_PREFIX + config.getMessageContentCountSuffix();
putChange.addColumn(columnName, ColumnValue.fromLong(index - Utils.CONTENT_COLUMN_START_ID));
}

/**
* Write CRC32.
*/
Expand Down Expand Up @@ -302,6 +347,78 @@ private PutRowRequest createPutRowRequest(String timelineID, IMessage message) {
return request;
}

private UpdateRowRequest createUpdateRowRequest(String timelineID, Long sequenceID, IMessage message) {
UpdateRowRequest request = new UpdateRowRequest();
RowUpdateChange updateChange = new RowUpdateChange(config.getTableName());

PrimaryKeyColumn firstPK = new PrimaryKeyColumn(config.getFirstPKName(), PrimaryKeyValue.fromString(timelineID));
PrimaryKeyColumn secondPK = new PrimaryKeyColumn(config.getSecondPKName(), PrimaryKeyValue.fromLong(sequenceID));
updateChange.setPrimaryKey(PrimaryKeyBuilder.createPrimaryKeyBuilder().
addPrimaryKeyColumn(firstPK).addPrimaryKeyColumn(secondPK).build());
updateChange.setReturnType(ReturnType.RT_PK);

/**
* Write message content.
*/
byte[] content = message.serialize();
if (content.length > 2 * 1024 * 1024) {
throw new TimelineException(TimelineExceptionType.INVALID_USE,
String.format("Message Content must less than 2MB, current:%s", String.valueOf(content.length)));
}

int pos = 0;
int index = Utils.CONTENT_COLUMN_START_ID;
while (pos < content.length) {
byte[] columnValue;
if (pos + config.getColumnMaxLength() < content.length) {
columnValue = Arrays.copyOfRange(content, pos, pos + config.getColumnMaxLength());
} else {
columnValue = Arrays.copyOfRange(content, pos, content.length);
}
String columnName = Utils.SYSTEM_COLUMN_NAME_PREFIX + config.getMessageContentSuffix() + String.valueOf(index++);
updateChange.put(String.valueOf(columnName), ColumnValue.fromBinary(columnValue));
pos += columnValue.length;
}

/**
* Write Message Content Count
*/
{
String columnName = Utils.SYSTEM_COLUMN_NAME_PREFIX + config.getMessageContentCountSuffix();
updateChange.put(columnName, ColumnValue.fromLong(index - Utils.CONTENT_COLUMN_START_ID));
}

/**
* Write CRC32.
*/
if (config.getColumnNameOfMessageCrc32Suffix() != null && !config.getColumnNameOfMessageCrc32Suffix().isEmpty()) {
long crc32 = Utils.crc32(content);
String columnName = Utils.SYSTEM_COLUMN_NAME_PREFIX + config.getColumnNameOfMessageCrc32Suffix();
updateChange.put(columnName, ColumnValue.fromLong(crc32));
}

/**
* Write message ID.
*/
updateChange.put(Utils.SYSTEM_COLUMN_NAME_PREFIX + config.getMessageIDColumnNameSuffix(), ColumnValue.fromString(message.getMessageID()));

/**
* Write message attributes.
*/
Map<String, String> attributes = message.getAttributes();
for (String key : attributes.keySet()) {
if (key.startsWith(Utils.SYSTEM_COLUMN_NAME_PREFIX))
{
throw new TimelineException(TimelineExceptionType.INVALID_USE,
String.format("Attribute name:%s can not start with %s", key, Utils.SYSTEM_COLUMN_NAME_PREFIX));
}
updateChange.put(key, ColumnValue.fromString(attributes.get(key)));
}

request.setRowChange(updateChange);
return request;
}

private GetRowRequest createGetRowRequest(String timelineID, Long sequenceID) {
GetRowRequest request = new GetRowRequest();
PrimaryKeyColumn firstPK = new PrimaryKeyColumn(config.getFirstPKName(), PrimaryKeyValue.fromString(timelineID));
Expand Down Expand Up @@ -445,6 +562,72 @@ public TimelineEntry get(long timeout, TimeUnit unit) throws InterruptedExceptio
};
}

private Future<TimelineEntry> doUpdateAsync(final String timelineID,
final IMessage message,
final TimelineCallback<IMessage> callback,
UpdateRowRequest request)
{
final TableStoreCallback<UpdateRowRequest, UpdateRowResponse> tablestoreCallback = new TableStoreCallback<UpdateRowRequest, UpdateRowResponse>() {
@Override
public void onCompleted(UpdateRowRequest request, UpdateRowResponse response) {
long sequenceID = response.getRow().getPrimaryKey().getPrimaryKeyColumn(config.getSecondPKName()).getValue().asLong();
TimelineEntry timelineEntry = new TimelineEntry(sequenceID, message);
callback.onCompleted(timelineID, message, timelineEntry);
}

@Override
public void onFailed(UpdateRowRequest putRowRequest, Exception e) {
e = createException(e, timelineID, "update");

callback.onFailed(timelineID, message, e);
}
};

final Future<UpdateRowResponse> future = tableStore.updateRow(request, tablestoreCallback);
return new Future<TimelineEntry>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return future.cancel(mayInterruptIfRunning);
}

@Override
public boolean isCancelled() {
return future.isCancelled();
}

@Override
public boolean isDone() {
return future.isDone();
}

@Override
public TimelineEntry get() throws InterruptedException, ExecutionException {
try {
UpdateRowResponse response = future.get();
return Utils.toTimelineEntry(response, message);
} catch (TableStoreException ex) {
throw handleTableStoreException(ex, timelineID, "update");
} catch (ClientException ex) {
throw new TimelineException(TimelineExceptionType.INVALID_USE,
"Drop store failed, reason:" + ex.getMessage(), ex);
}
}

@Override
public TimelineEntry get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
try {
UpdateRowResponse response = future.get(timeout, unit);
return Utils.toTimelineEntry(response, message);
} catch (TableStoreException ex) {
throw handleTableStoreException(ex, timelineID, "update");
} catch (ClientException ex) {
throw new TimelineException(TimelineExceptionType.INVALID_USE,
"Drop store failed, reason:" + ex.getMessage(), ex);
}
}
};
}

private TimelineException handleTableStoreException(TableStoreException ex, String timelineID, String type) {
if (ex.getErrorCode().equals("OTSObjectNotExist")) {
return new TimelineException(TimelineExceptionType.INVALID_USE,
Expand Down
Loading

0 comments on commit 10dc30d

Please sign in to comment.