Skip to content

Commit

Permalink
Checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
bretambrose committed Mar 30, 2024
1 parent b35eb69 commit e98c217
Show file tree
Hide file tree
Showing 6 changed files with 908 additions and 61 deletions.
10 changes: 8 additions & 2 deletions include/aws/mqtt/private/request-response/subscription_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ enum aws_rr_subscription_event_type {
* Under normal circumstances this can happen when
*
* (1) failure to rejoin a session
* (2) a successful unsubscribe when the subscription is no longer needed
*/
ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIPTION_ENDED,

Expand All @@ -57,7 +56,14 @@ enum aws_rr_subscription_event_type {
* (1) Permission failures
* (2) Invalid topic filter
*/
ARRSET_STREAMING_SUBSCRIPTION_HALTED
ARRSET_STREAMING_SUBSCRIPTION_HALTED,

/*
* A subscription has been unsubscribed from
*
* This particular event is global, with an operation id of 0.
*/
ARRSET_UNSUBSCRIBE_COMPLETE,
};

struct aws_rr_subscription_status_event {
Expand Down
49 changes: 38 additions & 11 deletions source/request-response/request_response_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,10 @@ static void s_complete_request_operation_with_failure(struct aws_mqtt_rr_client_
aws_mqtt_rr_client_operation_release(operation);
}

static void s_streaming_operation_emit_streaming_subscription_event(struct aws_mqtt_rr_client_operation *operation, enum aws_rr_streaming_subscription_event_type event_type, int error_code) {
static void s_streaming_operation_emit_streaming_subscription_event(
struct aws_mqtt_rr_client_operation *operation,
enum aws_rr_streaming_subscription_event_type event_type,
int error_code) {
aws_mqtt_streaming_operation_subscription_status_fn *subscription_status_callback =
operation->storage.streaming_storage.options.subscription_status_callback;

Expand Down Expand Up @@ -740,7 +743,10 @@ static void s_aws_rr_subscription_status_event_task_delete(struct aws_rr_subscri
aws_mem_release(task->allocator, task);
}

static void s_on_streaming_operation_subscription_status_event(struct aws_mqtt_rr_client_operation *operation, struct aws_byte_cursor topic_filter, enum aws_rr_subscription_event_type event_type) {
static void s_on_streaming_operation_subscription_status_event(
struct aws_mqtt_rr_client_operation *operation,
struct aws_byte_cursor topic_filter,
enum aws_rr_subscription_event_type event_type) {
(void)topic_filter;

switch (event_type) {
Expand All @@ -749,11 +755,13 @@ static void s_on_streaming_operation_subscription_status_event(struct aws_mqtt_r
s_change_operation_state(operation, AWS_MRROS_SUBSCRIBED);
}

s_streaming_operation_emit_streaming_subscription_event(operation, ARRSSET_SUBSCRIPTION_ESTABLISHED, AWS_ERROR_SUCCESS);
s_streaming_operation_emit_streaming_subscription_event(
operation, ARRSSET_SUBSCRIPTION_ESTABLISHED, AWS_ERROR_SUCCESS);
break;

case ARRSET_STREAMING_SUBSCRIPTION_LOST:
s_streaming_operation_emit_streaming_subscription_event(operation, ARRSSET_SUBSCRIPTION_LOST, AWS_ERROR_SUCCESS);
s_streaming_operation_emit_streaming_subscription_event(
operation, ARRSSET_SUBSCRIPTION_LOST, AWS_ERROR_SUCCESS);
break;

case ARRSET_STREAMING_SUBSCRIPTION_HALTED:
Expand All @@ -774,8 +782,14 @@ static void s_handle_subscription_status_event_task(struct aws_task *task, void
goto done;
}

if (event_task->type == ARRSET_UNSUBSCRIBE_COMPLETE) {
s_mqtt_request_response_client_wake_service(event_task->rr_client);
goto done;
}

struct aws_hash_element *element = NULL;
if (aws_hash_table_find(&event_task->rr_client->operations, &event_task->operation_id, &element) || element == NULL) {
if (aws_hash_table_find(&event_task->rr_client->operations, &event_task->operation_id, &element) ||
element == NULL) {
goto done;
}

Expand All @@ -791,7 +805,11 @@ static void s_handle_subscription_status_event_task(struct aws_task *task, void
case ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED:
case ARRSET_STREAMING_SUBSCRIPTION_LOST:
case ARRSET_STREAMING_SUBSCRIPTION_HALTED:
s_on_streaming_operation_subscription_status_event(operation, aws_byte_cursor_from_buf(&event_task->topic_filter), event_task->type);
s_on_streaming_operation_subscription_status_event(
operation, aws_byte_cursor_from_buf(&event_task->topic_filter), event_task->type);
break;

default:
break;
}

Expand All @@ -800,8 +818,12 @@ static void s_handle_subscription_status_event_task(struct aws_task *task, void
s_aws_rr_subscription_status_event_task_delete(event_task);
}

static struct aws_rr_subscription_status_event_task *s_aws_rr_subscription_status_event_task_new(struct aws_allocator *allocator, struct aws_mqtt_request_response_client *rr_client, const struct aws_rr_subscription_status_event *event) {
struct aws_rr_subscription_status_event_task *task = aws_mem_calloc(allocator, 1, sizeof(struct aws_rr_subscription_status_event_task));
static struct aws_rr_subscription_status_event_task *s_aws_rr_subscription_status_event_task_new(
struct aws_allocator *allocator,
struct aws_mqtt_request_response_client *rr_client,
const struct aws_rr_subscription_status_event *event) {
struct aws_rr_subscription_status_event_task *task =
aws_mem_calloc(allocator, 1, sizeof(struct aws_rr_subscription_status_event_task));

task->allocator = allocator;
task->type = event->type;
Expand Down Expand Up @@ -833,7 +855,8 @@ static void s_aws_rr_client_subscription_status_event_callback(
AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(rr_client->loop));
AWS_FATAL_ASSERT(rr_client->state != AWS_RRCS_SHUTTING_DOWN);

struct aws_rr_subscription_status_event_task *task = s_aws_rr_subscription_status_event_task_new(rr_client->allocator, rr_client, event);
struct aws_rr_subscription_status_event_task *task =
s_aws_rr_subscription_status_event_task_new(rr_client->allocator, rr_client, event);

aws_event_loop_schedule_task_now(rr_client->loop, &task->task);
}
Expand Down Expand Up @@ -1198,7 +1221,8 @@ static void s_handle_operation_subscribe_result(

if (operation->type == AWS_MRROT_STREAMING) {
s_change_operation_state(operation, AWS_MRROS_SUBSCRIBED);
s_streaming_operation_emit_streaming_subscription_event(operation, ARRSSET_SUBSCRIPTION_ESTABLISHED, AWS_ERROR_SUCCESS);
s_streaming_operation_emit_streaming_subscription_event(
operation, ARRSSET_SUBSCRIPTION_ESTABLISHED, AWS_ERROR_SUCCESS);
} else {
s_make_mqtt_request(client, operation);
}
Expand Down Expand Up @@ -1533,7 +1557,10 @@ static void s_mqtt_rr_client_submit_operation(struct aws_task *task, void *arg,
// NYI other tables

// add to timeout priority queue
aws_priority_queue_push_ref(&client->operations_by_timeout, (void *)&operation, &operation->priority_queue_node);
if (operation->type == AWS_MRROT_REQUEST) {
aws_priority_queue_push_ref(
&client->operations_by_timeout, (void *)&operation, &operation->priority_queue_node);
}

// enqueue
aws_linked_list_push_back(&operation->client_internal_ref->operation_queue, &operation->node);
Expand Down
37 changes: 31 additions & 6 deletions source/request-response/subscription_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ static struct aws_rr_subscription_record *s_get_subscription_record(
struct aws_subscription_stats {
size_t request_response_subscriptions;
size_t event_stream_subscriptions;
size_t unsubscribing_event_stream_subscriptions;
};

static int s_rr_subscription_count_foreach_wrap(void *context, struct aws_hash_element *elem) {
Expand All @@ -210,6 +211,9 @@ static int s_rr_subscription_count_foreach_wrap(void *context, struct aws_hash_e

if (subscription->type == ARRST_EVENT_STREAM) {
++stats->event_stream_subscriptions;
if (subscription->pending_action == ARRSPAT_UNSUBSCRIBING) {
++stats->unsubscribing_event_stream_subscriptions;
}
} else {
++stats->request_response_subscriptions;
}
Expand All @@ -227,9 +231,10 @@ static void s_get_subscription_stats(
AWS_LOGF_DEBUG(
AWS_LS_MQTT_REQUEST_RESPONSE,
"request-response subscription manager current stats: %d event stream sub records, %d request-response sub "
"records",
"records, %d unsubscribing event stream subscriptions",
(int)stats->event_stream_subscriptions,
(int)stats->request_response_subscriptions);
(int)stats->request_response_subscriptions,
(int)stats->unsubscribing_event_stream_subscriptions);
}

static void s_remove_listener_from_subscription_record(
Expand Down Expand Up @@ -326,6 +331,9 @@ static const char *s_rr_subscription_event_type_to_c_str(enum aws_rr_subscriptio

case ARRSET_STREAMING_SUBSCRIPTION_HALTED:
return "StreamingSubscriptionHalted";

case ARRSET_UNSUBSCRIBE_COMPLETE:
return "UnsubscribeComplete";
}

return "Unknown";
Expand All @@ -344,9 +352,10 @@ static bool s_subscription_type_matches_event_type(
case ARRSET_STREAMING_SUBSCRIPTION_LOST:
case ARRSET_STREAMING_SUBSCRIPTION_HALTED:
return subscription_type == ARRST_EVENT_STREAM;
}

return false;
default:
return true;
}
}

static void s_emit_subscription_event(
Expand Down Expand Up @@ -447,7 +456,8 @@ enum aws_acquire_subscription_result_type aws_rr_subscription_manager_acquire_su

if (!space_for_subscription) {
// could space eventually free up?
if (options->type == ARRST_REQUEST_RESPONSE || stats.request_response_subscriptions > 1) {
if (options->type == ARRST_REQUEST_RESPONSE || stats.request_response_subscriptions > 1 ||
stats.unsubscribing_event_stream_subscriptions > 0) {
AWS_LOGF_DEBUG(
AWS_LS_MQTT_REQUEST_RESPONSE,
"request-response subscription manager - acquire subscription for ('" PRInSTR
Expand Down Expand Up @@ -568,7 +578,14 @@ static void s_handle_protocol_adapter_request_subscription_event(

if (event->error_code == AWS_ERROR_SUCCESS) {
record->status = ARRSST_NOT_SUBSCRIBED;
s_emit_subscription_event(manager, record, ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIPTION_ENDED);

struct aws_rr_subscription_status_event event = {
.type = ARRSET_UNSUBSCRIBE_COMPLETE,
.topic_filter = record->topic_filter_cursor,
.operation_id = 0,
};

(*manager->config.subscription_status_callback)(&event, manager->config.userdata);
}
}
}
Expand Down Expand Up @@ -599,6 +616,14 @@ static void s_handle_protocol_adapter_streaming_subscription_event(

if (event->error_code == AWS_ERROR_SUCCESS) {
record->status = ARRSST_NOT_SUBSCRIBED;

struct aws_rr_subscription_status_event event = {
.type = ARRSET_UNSUBSCRIBE_COMPLETE,
.topic_filter = record->topic_filter_cursor,
.operation_id = 0,
};

(*manager->config.subscription_status_callback)(&event, manager->config.userdata);
}
}
}
Expand Down
17 changes: 9 additions & 8 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -535,14 +535,15 @@ add_test_case(rrc_submit_request_operation_failure_by_shutdown)
add_test_case(rrc_submit_streaming_operation_and_shutdown)
add_test_case(rrc_submit_request_operation_failure_by_timeout)

add_test_case(rrc_streaming_operation_success)
#add_test_case(rrc_streaming_operation_success_overlapping)
#add_test_case(rrc_streaming_operation_clean_session_reestablish_subscription)
#add_test_case(rrc_streaming_operation_resume_session)
#add_test_case(rrc_streaming_operation_first_subscribe_times_out_resub_succeeds)
#add_test_case(rrc_streaming_operation_first_subscribe_retryable_failure_resub_succeeds)
#add_test_case(rrc_streaming_operation_subscribe_unretryable_failure)
#add_test_case(rrc_streaming_operation_failure_exceeds_subscription_budget)
add_test_case(rrc_streaming_operation_success_single)
add_test_case(rrc_streaming_operation_success_overlapping)
add_test_case(rrc_streaming_operation_success_starting_offline)
add_test_case(rrc_streaming_operation_clean_session_reestablish_subscription)
add_test_case(rrc_streaming_operation_resume_session)
add_test_case(rrc_streaming_operation_first_subscribe_times_out_resub_succeeds)
add_test_case(rrc_streaming_operation_first_subscribe_retryable_failure_resub_succeeds)
add_test_case(rrc_streaming_operation_subscribe_unretryable_failure)
add_test_case(rrc_streaming_operation_failure_exceeds_subscription_budget)
#add_test_case(rrc_streaming_operation_success_delayed_by_request_operations)
#add_test_case(rrc_streaming_operation_success_sandwiched_by_request_operations)

Expand Down
Loading

0 comments on commit e98c217

Please sign in to comment.