-
Notifications
You must be signed in to change notification settings - Fork 85
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[server][dvc] Drop partitions asynchronously #1310
base: main
Are you sure you want to change the base?
Changes from all commits
682cf65
b49d9c4
88a78a5
772166b
ee2591d
41836ec
de4d3d0
7db261b
fee2146
483fa9d
13a1de8
04d43f1
fb034c8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,8 +29,8 @@ | |
import com.linkedin.davinci.stats.AggVersionedIngestionStats; | ||
import com.linkedin.davinci.stats.ParticipantStoreConsumptionStats; | ||
import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService; | ||
import com.linkedin.davinci.storage.StorageEngineRepository; | ||
import com.linkedin.davinci.storage.StorageMetadataService; | ||
import com.linkedin.davinci.storage.StorageService; | ||
import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend; | ||
import com.linkedin.davinci.store.view.VeniceViewWriterFactory; | ||
import com.linkedin.venice.SSLConfig; | ||
|
@@ -130,6 +130,7 @@ public class KafkaStoreIngestionService extends AbstractVeniceService implements | |
private static final String GROUP_ID_FORMAT = "%s_%s"; | ||
|
||
private static final Logger LOGGER = LogManager.getLogger(KafkaStoreIngestionService.class); | ||
private final StorageService storageService; | ||
|
||
private final VeniceConfigLoader veniceConfigLoader; | ||
|
||
|
@@ -190,7 +191,7 @@ public class KafkaStoreIngestionService extends AbstractVeniceService implements | |
private final ExecutorService aaWCWorkLoadProcessingThreadPool; | ||
|
||
public KafkaStoreIngestionService( | ||
StorageEngineRepository storageEngineRepository, | ||
StorageService storageService, | ||
VeniceConfigLoader veniceConfigLoader, | ||
StorageMetadataService storageMetadataService, | ||
ClusterInfoProvider clusterInfoProvider, | ||
|
@@ -212,6 +213,7 @@ public KafkaStoreIngestionService( | |
PubSubClientsFactory pubSubClientsFactory, | ||
Optional<SSLFactory> sslFactory, | ||
HeartbeatMonitoringService heartbeatMonitoringService) { | ||
this.storageService = storageService; | ||
this.cacheBackend = cacheBackend; | ||
this.recordTransformerFunction = recordTransformerFunction; | ||
this.storageMetadataService = storageMetadataService; | ||
|
@@ -448,7 +450,7 @@ public void handleStoreDeleted(Store store) { | |
|
||
ingestionTaskFactory = StoreIngestionTaskFactory.builder() | ||
.setVeniceWriterFactory(veniceWriterFactory) | ||
.setStorageEngineRepository(storageEngineRepository) | ||
.setStorageEngineRepository(storageService.getStorageEngineRepository()) | ||
.setStorageMetadataService(storageMetadataService) | ||
.setLeaderFollowerNotifiersQueue(leaderFollowerNotifiers) | ||
.setSchemaRepository(schemaRepo) | ||
|
@@ -519,6 +521,7 @@ private StoreIngestionTask createStoreIngestionTask( | |
}; | ||
|
||
return ingestionTaskFactory.getNewIngestionTask( | ||
storageService, | ||
store, | ||
version, | ||
getKafkaConsumerProperties(veniceStoreVersionConfig), | ||
|
@@ -920,6 +923,37 @@ public void stopConsumptionAndWait( | |
} | ||
} | ||
|
||
/** | ||
* Drops the corresponding Venice Partition gracefully. | ||
* This should only be called after {@link #stopConsumptionAndWait} has been called | ||
* @param veniceStore Venice Store for the partition. | ||
* @param partitionId Venice partition's id. | ||
*/ | ||
public void dropStoragePartitionGracefully(VeniceStoreVersionConfig veniceStore, int partitionId) { | ||
final String topic = veniceStore.getStoreVersionName(); | ||
|
||
if (isPartitionConsuming(topic, partitionId)) { | ||
throw new VeniceException("Tried to drop storage partition that is still consuming"); | ||
} | ||
|
||
try (AutoCloseableLock ignore = topicLockManager.getLockForResource(topic)) { | ||
StoreIngestionTask ingestionTask = topicNameToIngestionTaskMap.get(topic); | ||
if (ingestionTask != null && ingestionTask.isRunning()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am thinking of a race condition that after we add |
||
LOGGER.info( | ||
"Ingestion task is still running for Topic {}. Dropping partition {} asynchronously", | ||
veniceStore.getStoreVersionName(), | ||
partitionId); | ||
ingestionTask.dropPartition(new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(topic), partitionId)); | ||
} else { | ||
LOGGER.info( | ||
"Ingestion task isn't running for Topic {}. Dropping partition {} synchronously", | ||
veniceStore.getStoreVersionName(), | ||
partitionId); | ||
this.storageService.dropStorePartition(veniceStore, partitionId, true); | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* This function will try to kill the ingestion tasks belonging to non-current versions. | ||
* And this is mainly being used by memory limiter feature to free up resources when encountering memory | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This exception could cause the ST to be in the ERROR state, is that right? I read function
stopConsumptionAndWait
and, today, we simply log a warning message if consumption couldn't be stopped in time. This sounds like a behavior change in the new PR and we probably want to be careful about it.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
STANDBY->OFFLINE
issues anUNSUBSCRIBE
message, and so willstopConsumptionAndWait
.By the time
SIT
processes theDROP_PARTITION
message, it should have been already unsubscribed.I think it's safe to remove this check. What do you think?