dynamodb analyst (2)- how to delete data more effective

一  各种删除方法的“坑”点:

先扫一盲,删除一个对象有下面一些风格:

Student.GroupKey id = new Student.GroupKey();
id.setParentId("321123"); //pk
id.setChildId("789");   //sk


//直接用ID删除
studentRepository.deleteById(id);

//直接用ID集批量删
studentRepository.deleteAllById(Arrays.asList(id));


Student student= new Student();
student.setId(id);

//直接删除"对象"

studentRepository.delete(student);


//批量删除"对象"集合
studentRepository.deleteAll(Arrays.asList(student));

这中间有3个关键点要注意:

(1)慎用studentRepository.deleteById(id);

因为它会先根据这个id来Query出对象,然后删除,如果查不到这个对象,则直接报错,但是我们的删除往往不需要这么低效,而且我们也很少关心要删除的对象存在与否,所以要慎用。参考代码:org.socialsignin.spring.data.dynamodb.repository.support.SimpleDynamoDBCrudRepository#deleteById

@Override
public void deleteById(ID id) {

   Assert.notNull(id, "The given id must not be null!");

   Optional<T> entity = findById(id);

   if (entity.isPresent()) {
      dynamoDBOperations.delete(entity.get());

   } else {
      throw new EmptyResultDataAccessException(String.format("No %s entity with id %s exists!", domainType, id),
            1);
   }
}

(2)studentRepository.deleteAllById(Arrays.asList(id));虽然有接口,但是还尚未实现,至少目前如此,巨坑。具体的错误如下:

2021-08-02 14:00:22.901 INFO 16600 --- [ main] e.d.DynamodbApplicationForDeleteImproved : Started DynamodbApplicationForDeleteImproved in 1.919 seconds (JVM running for 2.407)
Exception in thread "main" java.lang.IllegalArgumentException: No fragment found for method public abstract void org.springframework.data.repository.CrudRepository.deleteAllById(java.lang.Iterable)
at org.springframework.data.repository.core.support.RepositoryComposition$RepositoryFragments.lambda$findImplementationFragment$2(RepositoryComposition.java:537)
at java.base/java.util.Optional.orElseThrow(Optional.java:403)
at org.springframework.data.repository.core.support.RepositoryComposition$RepositoryFragments.findImplementationFragment(RepositoryComposition.java:537)
at java.base/java.util.concurrent.ConcurrentMap.computeIfAbsent(ConcurrentMap.java:330)
at org.springframework.data.repository.core.support.RepositoryComposition$RepositoryFragments.invoke(RepositoryComposition.java:513)
at org.springframework.data.repository.core.support.RepositoryComposition.invoke(RepositoryComposition.java:285)
at org.springframework.data.repository.core.support.RepositoryFactorySupport$ImplementationMethodExecutionInterceptor.invoke(RepositoryFactorySupport.java:599)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.doInvoke(QueryExecutorMethodInterceptor.java:163)

(3)studentRepository.delete(student); 貌似很不好的方法,因为看起来需要先查询出整个student对象,实际并非如此,因为只需要构建出对象,把相关的Id传进去就可以了。实际删除请求构建的时候,压根不需要其他属性。参考下面的代码:

com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBMapper#delete

@Override
public <T> void delete(T object, DynamoDBDeleteExpression deleteExpression, DynamoDBMapperConfig config) {
    config = mergeConfig(config);

    @SuppressWarnings("unchecked")
    Class<T> clazz = (Class<T>) object.getClass();
    final DynamoDBMapperTableModel<T> model = getTableModel(clazz, config);

    String tableName = getTableName(clazz, object, config);

    Map<String, AttributeValue> key = model.convertKey(object);
 
    //......
    DeleteItemRequest req = new DeleteItemRequest().withKey(key)
            .withTableName(tableName).withExpected(internalAssertions)
            .withRequestMetricCollector(config.getRequestMetricCollector());

    //......
    db.deleteItem(applyUserAgent(req));
}

其中“model.convertKey(object)”的实现如下:

public <H,R> Map<String,AttributeValue> convertKey(final T key) {
    final DynamoDBMapperFieldModel<T,H> hk = this.<H>hashKey();
    final DynamoDBMapperFieldModel<T,R> rk = this.<R>rangeKeyIfExists();
    return this.<H,R>convertKey(hk.get(key), (rk == null ? (R)null : rk.get(key)));
}

可见根本不需要其他属性。

(4)studentRepository.deleteAll(Arrays.asList(student)); 批量删除,没有啥好说的。同样,student仅需要传入Id就行了,无须其他属性,所以不需要先执行查询构建student这种操作(假设已经知道Id)。

总结:上述四种删除,很明显,假设只知道Id的情况下,单个删除用studentRepository.delete(student);多个删除用studentRepository.deleteAll(Arrays.asList(student)); 其他两种一种不能用,一种慎用。

二:假设一个表有2个key(pk + sk),如何仅仅用pk删除所有数据

实际上,这个问题对于其他数据库,例如Cassandra,不存在这个问题,但是对于Dynamodb本身,遇到双Key的表仅带Pk是没有办法删除数据的(不要问为啥,因为闭源),直接报错。此时如果想删除怎么办尼?只能先根据PK来找出所有的SK,然后批量使用PK + SK来删除对象。那这种通用的需求肯定已经有人实现过了。是的。在高版本的spring-data-dynamodb(低版本不支持的)中存在这样的删除方法支持。

你需要额外定义一个JPA方法:

void deleteByXXX(String id) //XXX是PK对应的变量名。

为什么需要额外定义:因为默认的org.springframework.data.repository.CrudRepository没有内置这个方法。

当我们定义后,一旦调用,会自动创建一个Query然后查询出数据集合,然后批量调用删除。这里可以可以参考关键代码:

class DeleteExecution implements QueryExecution<T, ID> {

@Override
public Object execute(AbstractDynamoDBQuery<T, ID> dynamoDBQuery, Object[] values) throws BatchDeleteException {
    List entities = dynamoDBQuery.doCreateQueryWithPermissions(values).getResultList(); //paged,有效的page
    List<DynamoDBMapper.FailedBatch> failedBatches = dynamoDBOperations.batchDelete(entities);
    //...... 
}

说到这里,你可能觉得这不就完事了么?但是这个是真正高效的删除方法么?考虑这点,我们需要衡量两个因素:
(1)在前文我们提到,删除需要知道所有的数据。而实际上仅仅知道PK和SK就足够了,不需要其他属性。而实际上一条记录,“大头”都是非key部分。所以衡量是否高效,我们要看它的查询是不是仅返回PK和SK就够了。实际上仅SK就够了。因为你是知道PK的不是么?
(2)当遇到大量数据的时候,我们需要衡量是不是上来把所有的数据全部加载到内存了再去删除。特别是在你没有查询结果仅包含ID的情况下。

了解了上面两个因素,我们来衡量下上面提及的方法是不是最好的。
(1)是不是查询仅返回rankey,其实可以@Query注解来约束,所以是我们自己要做的事情。如果不约束,很明显会读取所有的额外属性值的。可以参考下面的方法来设置:

   @Query(fields = "rangekeyname")
   void deleteByXXX(String id) //XXX是PK对应的变量名。

实现原理在于,当设置fields后,在发送请求时,会带上参数”ProjectionExpression”来告诉dynamodb仅返回指定的属性。
(2)遇到大量数据时,是否分批来删除,如果单纯看上述的“DeleteExecution”实现,你会觉得会分批的,毕竟“ dynamoDBQuery.doCreateQueryWithPermissions(values).getResultList()”正常返回的是PaginatedQueryList类型。
所以后面的删除会遍历这个PaginatedQueryList。而一旦遍历,就会按需自动加载后面的“页”。实际上就算不是遍历,直接get(index)也可能会自动按需加载后面的页:

    @Override
    public T get(int n) {
        checkUnsupportedOperationForIterationOnlyMode("get(int n)");
        
        while ( allResults.size() <= n && nextResultsAvailable() ) {
            moveNextResults(false);
        }

        return allResults.get(n);
    }

所以至少表明看,当删除大数据量内容,看起来是满足之前描述的要求的:分批获取rangeId,分批去删除,避免一次加载所有。但是实际测试、调试你会发现,事与愿违。可以查看下面的代码:

	@Override
	public List batchDelete(Iterable entities) {
		entities.forEach(it -> maybeEmitEvent(it, BeforeDeleteEvent::new)); ////这个地方把内容加载完毕了。

		List result = dynamoDBMapper.batchDelete(entities);

		entities.forEach(it -> maybeEmitEvent(it, AfterDeleteEvent::new));
		return result;
	}

在真正删除之前,会调用"entities.forEach(it -> maybeEmitEvent(it, BeforeDeleteEvent::new));", 它本来是好心做一些“发布事件”的。但是越误打误撞,在删除之前就直接把所有的内容全部加载完了。

所以综合起来看,spring-data-dynamodb提供的这个方法是不满足我们的理想条件的。

那么上述方法不符合条件时,我们是不是可以自己去实现,例如我们自己实现一个删除。不过此时,我们还需要实现一个get操作。所以大概率我们可能会直接使用JPA定义如下一个get方法来获取所有的rangekey集合:

   @Query(fields = "rangekeyname")
   List getByXXX(String id) //XXX是PK对应的变量名。

在上述代码中,返回的List实际也是PaginatedQueryList。此时我们再手工写一个不提前遍历的删除,例如如下:

	@Override
	public List batchDelete(Iterable entities) {
		List result = dynamoDBMapper.batchDelete(entities);
		return result;
	}

这样看起来好像实现了,前面获取一个PaginatedQueryList,然后中途没有遍历,所以没有提前记载,到删除的时候,会遍历,从而批量加载,然后以每25个元素一批来批量删除。但是实际上,如果你测试,你会发现又事与愿违,还是会在具体删除之前就所有的都加载了。那么问题出在那了?这里直接说下答案:List getByXXX(String id)会返回PaginatedQueryList,但是会提前把内容加载完,为啥?参考下面的代码:

org.springframework.data.repository.core.support.QueryExecutionResultHandler#requiresConversion

for (Object o : collection) 
	private boolean requiresConversion(Collection collection, Class expectedReturnType,
			@Nullable TypeDescriptor elementDescriptor) {

		if (Streamable.class.isAssignableFrom(expectedReturnType) || !expectedReturnType.isInstance(collection)) {
			return true;
		}

		if (elementDescriptor == null || !Iterable.class.isAssignableFrom(expectedReturnType)) {
			return false;
		}

		Class type = elementDescriptor.getType();

		for (Object o : collection) {  //提前一次性把内容加载完了。foreach本质也是调用迭代器。

			if (!type.isInstance(o)) {   
				return true;
			}
		}

		return false;
	}

当调用getByXXX时,我们在底层确实拿到了一个分页的结果集,当时spring data会把这个结果和我们方法定义的类型(这里就是List)进行比较,并按需转化。而即使你定义的也是集合类型,看道理不需要转,但是实际上,要不要转,对于集合类型而言,判断很特殊,即还要判断每个子元素要不要转。而这又会遍历PaginatedQueryList。导致提前把内容在删除前加载完了。

所以我们上述写的方案又被我们否决了。接下来的重点来了,那么我们怎么实现尼?我们可以避免完全使用spring-data-dynamodb来实现一个最优的版本:

首先获取结果参考下面的代码:

    private QueryRequest createQuery(String tableName, String rangeKey, HashMap keyConditions, int limit){
        QueryRequest queryRequest = new QueryRequest();
        queryRequest.setTableName(tableName);
        queryRequest.setKeyConditions(keyConditions);

/*        // old aws service interface, not allowed to co-existed with followed statements
        queryRequest.withAttributesToGet(rangeKey);*/

        // new aws service interface
        queryRequest.setSelect(Select.SPECIFIC_ATTRIBUTES);
        queryRequest.withProjectionExpression(rangeKey);

        queryRequest.withLimit(limit);

        return queryRequest;
    }

上面代码创建了一个QueryRequest,指定了仅获取rangekey。然后使用下面的代码来执行,并按批执行删除。

        QueryRequest queryRequest = createQuery(tableName, rangeKey, keyConditions, 200);
        QueryResult queryResult = amazonDynamoDB.query(queryRequest);

        deleteByQueryResult(hashKeyValue, rangeKey, queryResult);

        if (queryResult.getLastEvaluatedKey() == null) {
            return;
        }

        // handle for multi pages;
        while (queryResult.getLastEvaluatedKey() != null) {
            queryRequest.setExclusiveStartKey(queryResult.getLastEvaluatedKey());
            queryResult = amazonDynamoDB.query(queryRequest);
            deleteByQueryResult(hashKeyValue, rangeKey, queryResult);
        }

其中删除代码参考下面的:

    private void deleteByQueryResult(String hashKeyValue, String rangeKey, QueryResult queryResult) {
        List> items = queryResult.getItems();
        List toDeleteEntities = items.stream().map(getFunctionForConvertingToToDeletedObjects(hashKeyValue, rangeKey)).collect(Collectors.toList());
        deleteAll(toDeleteEntities);   //调用repository.deleteAll(toDeleteEntities);
    }

那么上面的方法,为什么就不会自动提前把所有rangid加载完尼?关键点在于,我们自己的实现,没有任何提前遍历的操作,构建删除对象的时候用的是“queryResult.getItems()”,这个并不会提前加载下一页。而是获取当前获取到的内容。

所以以上方法才是最优化的。不过这里注意下另外一件事,即我们显式设置了“queryRequest.withLimit(limit)”,这样的好处在于分页会更小,如果我们不显示指定的话,默认是以返回内容1M为分页的(AWS DYNAMODB自己控制的)。那说到这个limit设置,其实@Query注解也支持,用法如下:

   @Query(fields = "rangekeyname", limit = 5)
   List getByXXX(String id) //XXX是PK对应的变量名。

那么用上述方法,分页更小了,不过不要忘记,为啥不用上面的方法来获取?因为上述方法虽然分页小一些,但是还记得上面刚提的问题么?它会在删除前把内容全部加载完。也就是比如10000的数据,那最终虽然可能每5个一批获取,但是在删除之前,它还是获取完了这10000个数据的。

最好,再额外补充下,同时分页思想,删除需要的提前获取rangeid为啥不用Pageable这个Spring data自带的功能?例如下面这样:

@Query(fields = "objectId", limit = 5)
Page getByParentId(String parentId, Pageable pageable);

我们可以看下它的实现关键代码:

private Page createPage(List allResults, Pageable pageable, AbstractDynamoDBQuery dynamoDBQuery,
      Object[] values) {

   // Get the result = this list might be a lazy list
   Iterator iterator = allResults.iterator();

   // Check if the pageable request is 'beyond' the result set
   if (!pageable.isUnpaged() && pageable.getOffset() > 0) {
      long processedCount = scanThroughResults(iterator, pageable.getOffset());
      if (processedCount < pageable.getOffset()) {
         return new PageImpl<>(Collections.emptyList());
      }
   }

   // Then Count the result set size
   Query countQuery = dynamoDBQuery.doCreateCountQueryWithPermissions(values, true);
   long count = countQuery.getSingleResult();

   // Finally wrap the result in a page -
   if (!pageable.isUnpaged()) {
      // either seek to the proper part of the result set
      if (getResultsRestrictionIfApplicable() != null) {
         count = Math.min(count, getResultsRestrictionIfApplicable());
      }

      List results = readPageOfResultsRestrictMaxResultsIfNecessary(iterator, pageable.getPageSize());
      return new PageImpl<>(results, pageable, count);
   } else {
      // or treat the whole (lazy) list as the result page if it's unpaged
      return new UnpagedPageImpl<>(allResults, count);
   }
}

它不适合用来删除的“查询”主要有2点:
(1)每次查都会调用count();
(2)其中的scanThroughResults会“调过”之前的“页数”,等于要显示第3页的内容,要先加载完第1和第2页。

		private long scanThroughResults(Iterator iterator, long resultsToScan) {
			long processed = 0;
			while (iterator.hasNext() && processed < resultsToScan) {  //又是一个循环。
				iterator.next();
				processed++;
			}
			return processed;
		}

所以以删除为目的的需求,我们用page的思路明显“太重了”。

所以综合起来看,最高效的方法,目前还是需要自己去实现。不过反思下,为什么不提供出这样的方法尼?因为每一次的提前遍历都有特殊用途:发布事件或者判断要不要转化。而且这些遍历可能不是自己一个项目,例如判断转化是spring data的统一逻辑。
所以一不小心就提前加载了。不过话说回来,或许很少人去思考这些细节,毕竟功能都正常,简单想下看起来很好就结束了。

最后的文末,再说个相关的小插曲:

/*        // old aws service interface, not allowed to co-existed with followed statements
        queryRequest.withAttributesToGet(rangeKey);*/

        // new aws service interface
        queryRequest.setSelect(Select.SPECIFIC_ATTRIBUTES);
        queryRequest.withProjectionExpression(rangeKey);

仅获取部分属性时,上述2个字段都可行,但是不能同时设置,或者报错:

{"__type":"com.amazon.coral.validate#ValidationException","message":"Can not use both expression and non-expression parameters in the same request: Non-expression parameters: {AttributesToGet} Expression parameters: {ProjectionExpression}"}

通过上述研究,可见,删除一个东西在Dynamodb中并不方便,特别是仅有一个PK时,这点远不如Cassandra。