@@ -86,18 +86,17 @@ public List<Metadata> getUnprocessedCpcPlusMetaData(String orgAttribute) {
86
86
Map <String , AttributeValue > valueMap = new HashMap <>();
87
87
valueMap .put (":cpcValue" , new AttributeValue ().withS (Constants .CPC_DYNAMO_PARTITION_START + partition ));
88
88
valueMap .put (":cpcProcessedValue" , new AttributeValue ().withS ("false#" +year ));
89
- valueMap .put (":createDate" , new AttributeValue ().withS (cpcConversionStartDate ));
90
89
91
90
DynamoDBQueryExpression <Metadata > metadataQuery = new DynamoDBQueryExpression <Metadata >()
92
91
.withIndexName (indexName )
93
92
.withKeyConditionExpression (Constants .DYNAMO_CPC_ATTRIBUTE + " = :cpcValue and begins_with("
94
93
+ orgAttribute + ", :cpcProcessedValue)" )
95
- .withFilterExpression (Constants .DYNAMO_CREATE_DATE_ATTRIBUTE + " > :createDate" )
96
94
.withExpressionAttributeValues (valueMap )
97
95
.withConsistentRead (false );
98
96
99
- return mapper .get ().query (Metadata .class , metadataQuery ).stream ();
100
- }).flatMap (Function .identity ()).collect (Collectors .toList ());
97
+ return mapper .get ().query (Metadata .class , metadataQuery ).stream ().limit (10 );
98
+ })
99
+ .flatMap (Function .identity ()).collect (Collectors .toList ());
101
100
} else {
102
101
API_LOG .warn ("Could not get unprocessed CPC+ metadata because the dynamodb mapper is absent" );
103
102
return Collections .emptyList ();
@@ -123,17 +122,15 @@ public List<Metadata> getUnprocessedPcfMetaData(String orgAttribute) {
123
122
Map <String , AttributeValue > valueMap = new HashMap <>();
124
123
valueMap .put (":pcfValue" , new AttributeValue ().withS (Constants .PCF_DYNAMO_PARTITION_START + partition ));
125
124
valueMap .put (":pcfProcessedValue" , new AttributeValue ().withS ("false#" +year ));
126
- valueMap .put (":createDate" , new AttributeValue ().withS (cpcConversionStartDate ));
127
125
128
126
DynamoDBQueryExpression <Metadata > metadataQuery = new DynamoDBQueryExpression <Metadata >()
129
127
.withIndexName (indexName )
130
128
.withKeyConditionExpression (Constants .DYNAMO_PCF_ATTRIBUTE + " = :pcfValue and begins_with("
131
129
+ orgAttribute + ", :pcfProcessedValue)" )
132
- .withFilterExpression (Constants .DYNAMO_CREATE_DATE_ATTRIBUTE + " > :createDate" )
133
130
.withExpressionAttributeValues (valueMap )
134
131
.withConsistentRead (false );
135
132
136
- return mapper .get ().query (Metadata .class , metadataQuery ).stream ();
133
+ return mapper .get ().query (Metadata .class , metadataQuery ).stream (). limit ( 10 ) ;
137
134
}).flatMap (Function .identity ()).collect (Collectors .toList ());
138
135
} else {
139
136
API_LOG .warn ("Could not get unprocessed PCF metadata because the dynamodb mapper is absent" );
0 commit comments