diff --git a/include/aws/mqtt/private/request-response/subscription_manager.h b/include/aws/mqtt/private/request-response/subscription_manager.h index a08bf09f..4d644af2 100644 --- a/include/aws/mqtt/private/request-response/subscription_manager.h +++ b/include/aws/mqtt/private/request-response/subscription_manager.h @@ -35,7 +35,6 @@ enum aws_rr_subscription_event_type { * Under normal circumstances this can happen when * * (1) failure to rejoin a session - * (2) a successful unsubscribe when the subscription is no longer needed */ ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIPTION_ENDED, @@ -57,7 +56,14 @@ enum aws_rr_subscription_event_type { * (1) Permission failures * (2) Invalid topic filter */ - ARRSET_STREAMING_SUBSCRIPTION_HALTED + ARRSET_STREAMING_SUBSCRIPTION_HALTED, + + /* + * A subscription has been unsubscribed from + * + * This particular event is global, with an operation id of 0. + */ + ARRSET_UNSUBSCRIBE_COMPLETE, }; struct aws_rr_subscription_status_event { diff --git a/source/request-response/request_response_client.c b/source/request-response/request_response_client.c index 6f120ede..a4c3819f 100644 --- a/source/request-response/request_response_client.c +++ b/source/request-response/request_response_client.c @@ -606,7 +606,10 @@ static void s_complete_request_operation_with_failure(struct aws_mqtt_rr_client_ aws_mqtt_rr_client_operation_release(operation); } -static void s_streaming_operation_emit_streaming_subscription_event(struct aws_mqtt_rr_client_operation *operation, enum aws_rr_streaming_subscription_event_type event_type, int error_code) { +static void s_streaming_operation_emit_streaming_subscription_event( + struct aws_mqtt_rr_client_operation *operation, + enum aws_rr_streaming_subscription_event_type event_type, + int error_code) { aws_mqtt_streaming_operation_subscription_status_fn *subscription_status_callback = operation->storage.streaming_storage.options.subscription_status_callback; @@ -740,7 +743,10 @@ static void s_aws_rr_subscription_status_event_task_delete(struct aws_rr_subscri aws_mem_release(task->allocator, task); } -static void s_on_streaming_operation_subscription_status_event(struct aws_mqtt_rr_client_operation *operation, struct aws_byte_cursor topic_filter, enum aws_rr_subscription_event_type event_type) { +static void s_on_streaming_operation_subscription_status_event( + struct aws_mqtt_rr_client_operation *operation, + struct aws_byte_cursor topic_filter, + enum aws_rr_subscription_event_type event_type) { (void)topic_filter; switch (event_type) { @@ -749,11 +755,13 @@ static void s_on_streaming_operation_subscription_status_event(struct aws_mqtt_r s_change_operation_state(operation, AWS_MRROS_SUBSCRIBED); } - s_streaming_operation_emit_streaming_subscription_event(operation, ARRSSET_SUBSCRIPTION_ESTABLISHED, AWS_ERROR_SUCCESS); + s_streaming_operation_emit_streaming_subscription_event( + operation, ARRSSET_SUBSCRIPTION_ESTABLISHED, AWS_ERROR_SUCCESS); break; case ARRSET_STREAMING_SUBSCRIPTION_LOST: - s_streaming_operation_emit_streaming_subscription_event(operation, ARRSSET_SUBSCRIPTION_LOST, AWS_ERROR_SUCCESS); + s_streaming_operation_emit_streaming_subscription_event( + operation, ARRSSET_SUBSCRIPTION_LOST, AWS_ERROR_SUCCESS); break; case ARRSET_STREAMING_SUBSCRIPTION_HALTED: @@ -774,8 +782,14 @@ static void s_handle_subscription_status_event_task(struct aws_task *task, void goto done; } + if (event_task->type == ARRSET_UNSUBSCRIBE_COMPLETE) { + s_mqtt_request_response_client_wake_service(event_task->rr_client); + goto done; + } + struct aws_hash_element *element = NULL; - if (aws_hash_table_find(&event_task->rr_client->operations, &event_task->operation_id, &element) || element == NULL) { + if (aws_hash_table_find(&event_task->rr_client->operations, &event_task->operation_id, &element) || + element == NULL) { goto done; } @@ -791,7 +805,11 @@ static void s_handle_subscription_status_event_task(struct aws_task *task, void case ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED: case ARRSET_STREAMING_SUBSCRIPTION_LOST: case ARRSET_STREAMING_SUBSCRIPTION_HALTED: - s_on_streaming_operation_subscription_status_event(operation, aws_byte_cursor_from_buf(&event_task->topic_filter), event_task->type); + s_on_streaming_operation_subscription_status_event( + operation, aws_byte_cursor_from_buf(&event_task->topic_filter), event_task->type); + break; + + default: break; } @@ -800,8 +818,12 @@ static void s_handle_subscription_status_event_task(struct aws_task *task, void s_aws_rr_subscription_status_event_task_delete(event_task); } -static struct aws_rr_subscription_status_event_task *s_aws_rr_subscription_status_event_task_new(struct aws_allocator *allocator, struct aws_mqtt_request_response_client *rr_client, const struct aws_rr_subscription_status_event *event) { - struct aws_rr_subscription_status_event_task *task = aws_mem_calloc(allocator, 1, sizeof(struct aws_rr_subscription_status_event_task)); +static struct aws_rr_subscription_status_event_task *s_aws_rr_subscription_status_event_task_new( + struct aws_allocator *allocator, + struct aws_mqtt_request_response_client *rr_client, + const struct aws_rr_subscription_status_event *event) { + struct aws_rr_subscription_status_event_task *task = + aws_mem_calloc(allocator, 1, sizeof(struct aws_rr_subscription_status_event_task)); task->allocator = allocator; task->type = event->type; @@ -833,7 +855,8 @@ static void s_aws_rr_client_subscription_status_event_callback( AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(rr_client->loop)); AWS_FATAL_ASSERT(rr_client->state != AWS_RRCS_SHUTTING_DOWN); - struct aws_rr_subscription_status_event_task *task = s_aws_rr_subscription_status_event_task_new(rr_client->allocator, rr_client, event); + struct aws_rr_subscription_status_event_task *task = + s_aws_rr_subscription_status_event_task_new(rr_client->allocator, rr_client, event); aws_event_loop_schedule_task_now(rr_client->loop, &task->task); } @@ -1198,7 +1221,8 @@ static void s_handle_operation_subscribe_result( if (operation->type == AWS_MRROT_STREAMING) { s_change_operation_state(operation, AWS_MRROS_SUBSCRIBED); - s_streaming_operation_emit_streaming_subscription_event(operation, ARRSSET_SUBSCRIPTION_ESTABLISHED, AWS_ERROR_SUCCESS); + s_streaming_operation_emit_streaming_subscription_event( + operation, ARRSSET_SUBSCRIPTION_ESTABLISHED, AWS_ERROR_SUCCESS); } else { s_make_mqtt_request(client, operation); } @@ -1533,7 +1557,10 @@ static void s_mqtt_rr_client_submit_operation(struct aws_task *task, void *arg, // NYI other tables // add to timeout priority queue - aws_priority_queue_push_ref(&client->operations_by_timeout, (void *)&operation, &operation->priority_queue_node); + if (operation->type == AWS_MRROT_REQUEST) { + aws_priority_queue_push_ref( + &client->operations_by_timeout, (void *)&operation, &operation->priority_queue_node); + } // enqueue aws_linked_list_push_back(&operation->client_internal_ref->operation_queue, &operation->node); diff --git a/source/request-response/subscription_manager.c b/source/request-response/subscription_manager.c index 98a7206c..d025d765 100644 --- a/source/request-response/subscription_manager.c +++ b/source/request-response/subscription_manager.c @@ -202,6 +202,7 @@ static struct aws_rr_subscription_record *s_get_subscription_record( struct aws_subscription_stats { size_t request_response_subscriptions; size_t event_stream_subscriptions; + size_t unsubscribing_event_stream_subscriptions; }; static int s_rr_subscription_count_foreach_wrap(void *context, struct aws_hash_element *elem) { @@ -210,6 +211,9 @@ static int s_rr_subscription_count_foreach_wrap(void *context, struct aws_hash_e if (subscription->type == ARRST_EVENT_STREAM) { ++stats->event_stream_subscriptions; + if (subscription->pending_action == ARRSPAT_UNSUBSCRIBING) { + ++stats->unsubscribing_event_stream_subscriptions; + } } else { ++stats->request_response_subscriptions; } @@ -227,9 +231,10 @@ static void s_get_subscription_stats( AWS_LOGF_DEBUG( AWS_LS_MQTT_REQUEST_RESPONSE, "request-response subscription manager current stats: %d event stream sub records, %d request-response sub " - "records", + "records, %d unsubscribing event stream subscriptions", (int)stats->event_stream_subscriptions, - (int)stats->request_response_subscriptions); + (int)stats->request_response_subscriptions, + (int)stats->unsubscribing_event_stream_subscriptions); } static void s_remove_listener_from_subscription_record( @@ -326,6 +331,9 @@ static const char *s_rr_subscription_event_type_to_c_str(enum aws_rr_subscriptio case ARRSET_STREAMING_SUBSCRIPTION_HALTED: return "StreamingSubscriptionHalted"; + + case ARRSET_UNSUBSCRIBE_COMPLETE: + return "UnsubscribeComplete"; } return "Unknown"; @@ -344,9 +352,10 @@ static bool s_subscription_type_matches_event_type( case ARRSET_STREAMING_SUBSCRIPTION_LOST: case ARRSET_STREAMING_SUBSCRIPTION_HALTED: return subscription_type == ARRST_EVENT_STREAM; - } - return false; + default: + return true; + } } static void s_emit_subscription_event( @@ -447,7 +456,8 @@ enum aws_acquire_subscription_result_type aws_rr_subscription_manager_acquire_su if (!space_for_subscription) { // could space eventually free up? - if (options->type == ARRST_REQUEST_RESPONSE || stats.request_response_subscriptions > 1) { + if (options->type == ARRST_REQUEST_RESPONSE || stats.request_response_subscriptions > 1 || + stats.unsubscribing_event_stream_subscriptions > 0) { AWS_LOGF_DEBUG( AWS_LS_MQTT_REQUEST_RESPONSE, "request-response subscription manager - acquire subscription for ('" PRInSTR @@ -568,7 +578,14 @@ static void s_handle_protocol_adapter_request_subscription_event( if (event->error_code == AWS_ERROR_SUCCESS) { record->status = ARRSST_NOT_SUBSCRIBED; - s_emit_subscription_event(manager, record, ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIPTION_ENDED); + + struct aws_rr_subscription_status_event event = { + .type = ARRSET_UNSUBSCRIBE_COMPLETE, + .topic_filter = record->topic_filter_cursor, + .operation_id = 0, + }; + + (*manager->config.subscription_status_callback)(&event, manager->config.userdata); } } } @@ -599,6 +616,14 @@ static void s_handle_protocol_adapter_streaming_subscription_event( if (event->error_code == AWS_ERROR_SUCCESS) { record->status = ARRSST_NOT_SUBSCRIBED; + + struct aws_rr_subscription_status_event event = { + .type = ARRSET_UNSUBSCRIBE_COMPLETE, + .topic_filter = record->topic_filter_cursor, + .operation_id = 0, + }; + + (*manager->config.subscription_status_callback)(&event, manager->config.userdata); } } } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index aef4955a..91c6e310 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -535,14 +535,15 @@ add_test_case(rrc_submit_request_operation_failure_by_shutdown) add_test_case(rrc_submit_streaming_operation_and_shutdown) add_test_case(rrc_submit_request_operation_failure_by_timeout) -add_test_case(rrc_streaming_operation_success) -#add_test_case(rrc_streaming_operation_success_overlapping) -#add_test_case(rrc_streaming_operation_clean_session_reestablish_subscription) -#add_test_case(rrc_streaming_operation_resume_session) -#add_test_case(rrc_streaming_operation_first_subscribe_times_out_resub_succeeds) -#add_test_case(rrc_streaming_operation_first_subscribe_retryable_failure_resub_succeeds) -#add_test_case(rrc_streaming_operation_subscribe_unretryable_failure) -#add_test_case(rrc_streaming_operation_failure_exceeds_subscription_budget) +add_test_case(rrc_streaming_operation_success_single) +add_test_case(rrc_streaming_operation_success_overlapping) +add_test_case(rrc_streaming_operation_success_starting_offline) +add_test_case(rrc_streaming_operation_clean_session_reestablish_subscription) +add_test_case(rrc_streaming_operation_resume_session) +add_test_case(rrc_streaming_operation_first_subscribe_times_out_resub_succeeds) +add_test_case(rrc_streaming_operation_first_subscribe_retryable_failure_resub_succeeds) +add_test_case(rrc_streaming_operation_subscribe_unretryable_failure) +add_test_case(rrc_streaming_operation_failure_exceeds_subscription_budget) #add_test_case(rrc_streaming_operation_success_delayed_by_request_operations) #add_test_case(rrc_streaming_operation_success_sandwiched_by_request_operations) diff --git a/tests/request-response/request_response_client_tests.c b/tests/request-response/request_response_client_tests.c index 04ffc16e..12408d99 100644 --- a/tests/request-response/request_response_client_tests.c +++ b/tests/request-response/request_response_client_tests.c @@ -1070,8 +1070,11 @@ static int s_rrc_submit_request_operation_failure_by_timeout_fn(struct aws_alloc AWS_TEST_CASE(rrc_submit_request_operation_failure_by_timeout, s_rrc_submit_request_operation_failure_by_timeout_fn) -static struct aws_mqtt_rr_client_operation *s_create_streaming_operation(struct aws_rr_client_test_fixture *fixture, struct aws_byte_cursor record_key, struct aws_byte_cursor topic_filter) { - struct aws_rr_client_fixture_streaming_record *record = s_rrc_fixture_add_streaming_record(&fixture, record_key); +static struct aws_mqtt_rr_client_operation *s_create_streaming_operation( + struct aws_rr_client_test_fixture *fixture, + struct aws_byte_cursor record_key, + struct aws_byte_cursor topic_filter) { + struct aws_rr_client_fixture_streaming_record *record = s_rrc_fixture_add_streaming_record(fixture, record_key); struct aws_mqtt_streaming_operation_options streaming_options = { .topic_filter = topic_filter, @@ -1081,58 +1084,626 @@ static struct aws_mqtt_rr_client_operation *s_create_streaming_operation(struct streaming_options.terminated_callback = s_rrc_fixture_streaming_operation_terminated_callback; streaming_options.user_data = record; - struct aws_mqtt_rr_client_operation *streaming_operation = - aws_mqtt_request_response_client_create_streaming_operation(fixture->rr_client, &streaming_options); - ASSERT_NOT_NULL(streaming_operation); + return aws_mqtt_request_response_client_create_streaming_operation(fixture->rr_client, &streaming_options); +} - return streaming_operation; +static int s_rrc_publish_5( + struct aws_mqtt5_client *client, + struct aws_byte_cursor topic, + struct aws_byte_cursor payload) { + struct aws_mqtt5_packet_publish_view publish_options = { + .topic = topic, + .qos = AWS_MQTT5_QOS_AT_LEAST_ONCE, + .payload = payload, + }; + + struct aws_mqtt5_publish_completion_options completion_options; + AWS_ZERO_STRUCT(completion_options); + + return aws_mqtt5_client_publish(client, &publish_options, &completion_options); +} + +static int s_rrc_publish_311( + struct aws_mqtt_client_connection *connection, + struct aws_byte_cursor topic, + struct aws_byte_cursor payload) { + return aws_mqtt_client_connection_publish( + connection, &topic, AWS_MQTT_QOS_AT_LEAST_ONCE, false, &payload, NULL, NULL); +} + +static int s_rrc_protocol_client_publish( + struct aws_rr_client_test_fixture *fixture, + struct aws_byte_cursor topic, + struct aws_byte_cursor payload) { + + if (fixture->test_protocol == RRCP_MQTT311) { + return s_rrc_publish_311(fixture->client_test_fixture.mqtt311_test_fixture.mqtt_connection, topic, payload); + } else { + return s_rrc_publish_5(fixture->client_test_fixture.mqtt5_test_fixture.client, topic, payload); + } } -static int s_rrc_publish_5(struct aws_mqtt5_client *client, struct aws_byte_cursor topic, struct aws_byte_cursor payload) { +typedef void(modify_fixture_options_fn)( + struct aws_mqtt_request_response_client_options *fixture_options, + struct mqtt5_client_test_options *client_test_options); + +static int s_init_fixture_streaming_operation_success( + struct aws_rr_client_test_fixture *fixture, + struct mqtt5_client_test_options *client_test_options, + struct aws_allocator *allocator, + modify_fixture_options_fn *config_modifier, + void *user_data) { + aws_mqtt5_client_test_init_default_options(client_test_options); + client_test_options->server_function_table.packet_handlers[AWS_MQTT5_PT_SUBSCRIBE] = + aws_mqtt5_server_send_suback_on_subscribe; + client_test_options->server_function_table.packet_handlers[AWS_MQTT5_PT_PUBLISH] = + aws_mqtt5_mock_server_handle_publish_puback_and_forward; + + struct aws_mqtt5_client_mqtt5_mock_test_fixture_options client_test_fixture_options = { + .client_options = &client_test_options->client_options, + .server_function_table = &client_test_options->server_function_table, + .mock_server_user_data = user_data, + }; + + struct aws_mqtt_request_response_client_options rr_client_options = { + .max_subscriptions = 2, + .operation_timeout_seconds = 2, + }; + + if (config_modifier != NULL) { + (*config_modifier)(&rr_client_options, client_test_options); + } + + ASSERT_SUCCESS(s_aws_rr_client_test_fixture_init_from_mqtt5( + fixture, allocator, &rr_client_options, &client_test_fixture_options, NULL)); return AWS_OP_SUCCESS; } -static int s_rrc_publish_311(struct aws_mqtt_client_connection *connection, struct aws_byte_cursor topic, struct aws_byte_cursor payload) { +/* + * Minimal success test: + * + * Create a streaming operation, verify subscription established, inject several publishes to the operation's topic, + * verify publishes received and routed through callbacks + */ +static int s_rrc_streaming_operation_success_single_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + aws_mqtt_library_init(allocator); + + struct mqtt5_client_test_options client_test_options; + struct aws_rr_client_test_fixture fixture; + ASSERT_SUCCESS(s_init_fixture_streaming_operation_success(&fixture, &client_test_options, allocator, NULL, NULL)); + + struct aws_byte_cursor record_key1 = aws_byte_cursor_from_c_str("key1"); + struct aws_byte_cursor topic_filter1 = aws_byte_cursor_from_c_str("topic/1"); + struct aws_mqtt_rr_client_operation *operation = s_create_streaming_operation(&fixture, record_key1, topic_filter1); + + s_rrc_wait_for_n_streaming_subscription_events(&fixture, record_key1, 1); + + struct aws_rr_client_fixture_streaming_record_subscription_event expected_events[] = { + { + .status = ARRSSET_SUBSCRIPTION_ESTABLISHED, + .error_code = AWS_ERROR_SUCCESS, + }, + }; + ASSERT_SUCCESS(s_rrc_verify_streaming_record_subscription_events( + &fixture, record_key1, AWS_ARRAY_SIZE(expected_events), expected_events)); + + // two publishes on the mqtt client that get reflected into our subscription topic + struct aws_byte_cursor payload1 = aws_byte_cursor_from_c_str("Payload1"); + struct aws_byte_cursor payload2 = aws_byte_cursor_from_c_str("Payload2"); + ASSERT_SUCCESS(s_rrc_protocol_client_publish(&fixture, topic_filter1, payload1)); + ASSERT_SUCCESS(s_rrc_protocol_client_publish(&fixture, topic_filter1, payload2)); + + s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 2); + + struct aws_byte_cursor expected_publishes[] = { + payload1, + payload2, + }; + ASSERT_SUCCESS(s_rrc_verify_streaming_publishes( + &fixture, record_key1, AWS_ARRAY_SIZE(expected_publishes), expected_publishes)); + + aws_mqtt_rr_client_operation_release(operation); + + s_aws_rr_client_test_fixture_clean_up(&fixture); + + aws_mqtt_library_clean_up(); - return AWS_OP_SUCCESS; } -static int s_rrc_protocol_client_publish(struct aws_rr_client_test_fixture *fixture, struct aws_byte_cursor topic, struct aws_byte_cursor payload) { +AWS_TEST_CASE(rrc_streaming_operation_success_single, s_rrc_streaming_operation_success_single_fn) - if (fixture->test_protocol == RRCP_MQTT311) { - return s_rrc_publish_5(fixture->client_test_fixture.mqtt311_test_fixture.mqtt_connection, topic, payload); - } else { - return s_rrc_publish_5(fixture->client_test_fixture.mqtt5_test_fixture.client, topic, payload); +/* + * Variant of the minimal success test where we create two operations on the same topic filter, verify they both + * get subscriptions established and publishes, then close one, send another publish and verify only the still-open + * operation received it. + */ +static int s_rrc_streaming_operation_success_overlapping_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + aws_mqtt_library_init(allocator); + + struct mqtt5_client_test_options client_test_options; + struct aws_rr_client_test_fixture fixture; + ASSERT_SUCCESS(s_init_fixture_streaming_operation_success(&fixture, &client_test_options, allocator, NULL, NULL)); + + struct aws_byte_cursor record_key1 = aws_byte_cursor_from_c_str("key1"); + struct aws_byte_cursor topic_filter1 = aws_byte_cursor_from_c_str("topic/1"); + struct aws_mqtt_rr_client_operation *operation1 = + s_create_streaming_operation(&fixture, record_key1, topic_filter1); + + struct aws_byte_cursor record_key2 = aws_byte_cursor_from_c_str("key2"); + struct aws_mqtt_rr_client_operation *operation2 = + s_create_streaming_operation(&fixture, record_key2, topic_filter1); + + s_rrc_wait_for_n_streaming_subscription_events(&fixture, record_key1, 1); + s_rrc_wait_for_n_streaming_subscription_events(&fixture, record_key2, 1); + + struct aws_rr_client_fixture_streaming_record_subscription_event expected_events[] = { + { + .status = ARRSSET_SUBSCRIPTION_ESTABLISHED, + .error_code = AWS_ERROR_SUCCESS, + }, + }; + ASSERT_SUCCESS(s_rrc_verify_streaming_record_subscription_events( + &fixture, record_key1, AWS_ARRAY_SIZE(expected_events), expected_events)); + ASSERT_SUCCESS(s_rrc_verify_streaming_record_subscription_events( + &fixture, record_key2, AWS_ARRAY_SIZE(expected_events), expected_events)); + + // two publishes on the mqtt client that get reflected into our subscription topic + struct aws_byte_cursor payload1 = aws_byte_cursor_from_c_str("Payload1"); + struct aws_byte_cursor payload2 = aws_byte_cursor_from_c_str("Payload2"); + struct aws_byte_cursor payload3 = aws_byte_cursor_from_c_str("Payload3"); + ASSERT_SUCCESS(s_rrc_protocol_client_publish(&fixture, topic_filter1, payload1)); + ASSERT_SUCCESS(s_rrc_protocol_client_publish(&fixture, topic_filter1, payload2)); + + s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 2); + s_rrc_wait_for_n_streaming_publishes(&fixture, record_key2, 2); + + struct aws_byte_cursor expected_publishes[] = { + payload1, + payload2, + payload3, + }; + ASSERT_SUCCESS(s_rrc_verify_streaming_publishes(&fixture, record_key1, 2, expected_publishes)); + ASSERT_SUCCESS(s_rrc_verify_streaming_publishes(&fixture, record_key2, 2, expected_publishes)); + + // close the first, wait for terminate + aws_mqtt_rr_client_operation_release(operation1); + s_rrc_wait_on_streaming_termination(&fixture, record_key1); + + // publish again + ASSERT_SUCCESS(s_rrc_protocol_client_publish(&fixture, topic_filter1, payload3)); + + // verify second operation got the new publish + s_rrc_wait_for_n_streaming_publishes(&fixture, record_key2, 3); + ASSERT_SUCCESS(s_rrc_verify_streaming_publishes( + &fixture, record_key2, AWS_ARRAY_SIZE(expected_publishes), expected_publishes)); + + // verify first operation did not + ASSERT_SUCCESS(s_rrc_verify_streaming_publishes(&fixture, record_key1, 2, expected_publishes)); + + aws_mqtt_rr_client_operation_release(operation2); + + s_aws_rr_client_test_fixture_clean_up(&fixture); + + aws_mqtt_library_clean_up(); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE(rrc_streaming_operation_success_overlapping, s_rrc_streaming_operation_success_overlapping_fn) + +/* + * Variant of the simple test where we start the protocol client offline. In addition to the normal verifies, we also + * verify nothing happens event wise until we start the client. + */ +static int s_rrc_streaming_operation_success_starting_offline_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + aws_mqtt_library_init(allocator); + + struct mqtt5_client_test_options client_test_options; + struct aws_rr_client_test_fixture fixture; + ASSERT_SUCCESS(s_init_fixture_streaming_operation_success(&fixture, &client_test_options, allocator, NULL, NULL)); + + /* stop and start the underlying client */ + aws_mqtt5_client_stop(fixture.client_test_fixture.mqtt5_test_fixture.client, NULL, NULL); + aws_wait_for_stopped_lifecycle_event(&fixture.client_test_fixture.mqtt5_test_fixture); + + struct aws_byte_cursor record_key1 = aws_byte_cursor_from_c_str("key1"); + struct aws_byte_cursor topic_filter1 = aws_byte_cursor_from_c_str("topic/1"); + struct aws_mqtt_rr_client_operation *operation = s_create_streaming_operation(&fixture, record_key1, topic_filter1); + + /* wait a while (longer than request timeout) to see if anything happens */ + aws_thread_current_sleep(aws_timestamp_convert(3, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL)); + + /* fails the test if any subscription events have been emitted */ + ASSERT_SUCCESS(s_rrc_verify_streaming_record_subscription_events(&fixture, record_key1, 0, NULL)); + + /* start the protocol client */ + aws_mqtt5_client_start(fixture.client_test_fixture.mqtt5_test_fixture.client); + + s_rrc_wait_for_n_streaming_subscription_events(&fixture, record_key1, 1); + + struct aws_rr_client_fixture_streaming_record_subscription_event expected_events[] = { + { + .status = ARRSSET_SUBSCRIPTION_ESTABLISHED, + .error_code = AWS_ERROR_SUCCESS, + }, + }; + ASSERT_SUCCESS(s_rrc_verify_streaming_record_subscription_events( + &fixture, record_key1, AWS_ARRAY_SIZE(expected_events), expected_events)); + + // two publishes on the mqtt client that get reflected into our subscription topic + struct aws_byte_cursor payload1 = aws_byte_cursor_from_c_str("Payload1"); + struct aws_byte_cursor payload2 = aws_byte_cursor_from_c_str("Payload2"); + ASSERT_SUCCESS(s_rrc_protocol_client_publish(&fixture, topic_filter1, payload1)); + ASSERT_SUCCESS(s_rrc_protocol_client_publish(&fixture, topic_filter1, payload2)); + + s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 2); + + struct aws_byte_cursor expected_publishes[] = { + payload1, + payload2, + }; + ASSERT_SUCCESS(s_rrc_verify_streaming_publishes( + &fixture, record_key1, AWS_ARRAY_SIZE(expected_publishes), expected_publishes)); + + aws_mqtt_rr_client_operation_release(operation); + + s_aws_rr_client_test_fixture_clean_up(&fixture); + + aws_mqtt_library_clean_up(); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE(rrc_streaming_operation_success_starting_offline, s_rrc_streaming_operation_success_starting_offline_fn) + +static void s_rrc_force_clean_session_config( + struct aws_mqtt_request_response_client_options *fixture_options, + struct mqtt5_client_test_options *client_test_options) { + client_test_options->client_options.session_behavior = AWS_MQTT5_CSBT_CLEAN; +} + +/* + * Verifies that a streaming operation recovers properly from a clean session resumption (by resubscribing), emitting + * the proper subscription events, and receiving expected publishes once the subscription is re-established. + */ +static int s_rrc_streaming_operation_clean_session_reestablish_subscription_fn( + struct aws_allocator *allocator, + void *ctx) { + (void)ctx; + + aws_mqtt_library_init(allocator); + + struct mqtt5_client_test_options client_test_options; + struct aws_rr_client_test_fixture fixture; + ASSERT_SUCCESS(s_init_fixture_streaming_operation_success( + &fixture, &client_test_options, allocator, s_rrc_force_clean_session_config, NULL)); + + struct aws_byte_cursor record_key1 = aws_byte_cursor_from_c_str("key1"); + struct aws_byte_cursor topic_filter1 = aws_byte_cursor_from_c_str("topic/1"); + struct aws_mqtt_rr_client_operation *operation = s_create_streaming_operation(&fixture, record_key1, topic_filter1); + + s_rrc_wait_for_n_streaming_subscription_events(&fixture, record_key1, 1); + + struct aws_rr_client_fixture_streaming_record_subscription_event expected_events[] = { + { + .status = ARRSSET_SUBSCRIPTION_ESTABLISHED, + .error_code = AWS_ERROR_SUCCESS, + }, + { + .status = ARRSSET_SUBSCRIPTION_LOST, + .error_code = AWS_ERROR_SUCCESS, + }, + { + .status = ARRSSET_SUBSCRIPTION_ESTABLISHED, + .error_code = AWS_ERROR_SUCCESS, + }, + }; + ASSERT_SUCCESS(s_rrc_verify_streaming_record_subscription_events(&fixture, record_key1, 1, expected_events)); + + // two publishes on the mqtt client that get reflected into our subscription topic + struct aws_byte_cursor payload1 = aws_byte_cursor_from_c_str("Payload1"); + struct aws_byte_cursor payload2 = aws_byte_cursor_from_c_str("Payload2"); + ASSERT_SUCCESS(s_rrc_protocol_client_publish(&fixture, topic_filter1, payload1)); + + s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 1); + + struct aws_byte_cursor expected_publishes[] = { + payload1, + payload2, + }; + ASSERT_SUCCESS(s_rrc_verify_streaming_publishes(&fixture, record_key1, 1, expected_publishes)); + + /* stop and start the underlying client, this will force a resubscribe since it's a clean session */ + aws_mqtt5_client_stop(fixture.client_test_fixture.mqtt5_test_fixture.client, NULL, NULL); + aws_wait_for_stopped_lifecycle_event(&fixture.client_test_fixture.mqtt5_test_fixture); + + aws_mqtt5_client_start(fixture.client_test_fixture.mqtt5_test_fixture.client); + + s_rrc_wait_for_n_streaming_subscription_events(&fixture, record_key1, 3); + ASSERT_SUCCESS(s_rrc_verify_streaming_record_subscription_events(&fixture, record_key1, 3, expected_events)); + + ASSERT_SUCCESS(s_rrc_protocol_client_publish(&fixture, topic_filter1, payload2)); + + s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 2); + ASSERT_SUCCESS(s_rrc_verify_streaming_publishes(&fixture, record_key1, 2, expected_publishes)); + + aws_mqtt_rr_client_operation_release(operation); + + s_aws_rr_client_test_fixture_clean_up(&fixture); + + aws_mqtt_library_clean_up(); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE( + rrc_streaming_operation_clean_session_reestablish_subscription, + s_rrc_streaming_operation_clean_session_reestablish_subscription_fn) + +static void s_rrc_force_resume_session_config( + struct aws_mqtt_request_response_client_options *fixture_options, + struct mqtt5_client_test_options *client_test_options) { + client_test_options->server_function_table.packet_handlers[AWS_MQTT5_PT_CONNECT] = + aws_mqtt5_mock_server_handle_connect_honor_session_unconditional; + client_test_options->client_options.session_behavior = AWS_MQTT5_CSBT_REJOIN_ALWAYS; +} + +/* + * Variant of the clean session test where instead we always resume a session. Verify we don't get subscription + * lost/established events afterwards and can still receive messages. + */ +static int s_rrc_streaming_operation_resume_session_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + aws_mqtt_library_init(allocator); + + struct mqtt5_client_test_options client_test_options; + struct aws_rr_client_test_fixture fixture; + ASSERT_SUCCESS(s_init_fixture_streaming_operation_success( + &fixture, &client_test_options, allocator, s_rrc_force_resume_session_config, NULL)); + + struct aws_byte_cursor record_key1 = aws_byte_cursor_from_c_str("key1"); + struct aws_byte_cursor topic_filter1 = aws_byte_cursor_from_c_str("topic/1"); + struct aws_mqtt_rr_client_operation *operation = s_create_streaming_operation(&fixture, record_key1, topic_filter1); + + s_rrc_wait_for_n_streaming_subscription_events(&fixture, record_key1, 1); + + struct aws_rr_client_fixture_streaming_record_subscription_event expected_events[] = { + { + .status = ARRSSET_SUBSCRIPTION_ESTABLISHED, + .error_code = AWS_ERROR_SUCCESS, + }, + }; + ASSERT_SUCCESS(s_rrc_verify_streaming_record_subscription_events(&fixture, record_key1, 1, expected_events)); + + // two publishes on the mqtt client that get reflected into our subscription topic + struct aws_byte_cursor payload1 = aws_byte_cursor_from_c_str("Payload1"); + struct aws_byte_cursor payload2 = aws_byte_cursor_from_c_str("Payload2"); + ASSERT_SUCCESS(s_rrc_protocol_client_publish(&fixture, topic_filter1, payload1)); + + s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 1); + + struct aws_byte_cursor expected_publishes[] = { + payload1, + payload2, + }; + ASSERT_SUCCESS(s_rrc_verify_streaming_publishes(&fixture, record_key1, 1, expected_publishes)); + + /* stop and start the underlying client */ + aws_mqtt5_client_stop(fixture.client_test_fixture.mqtt5_test_fixture.client, NULL, NULL); + aws_wait_for_stopped_lifecycle_event(&fixture.client_test_fixture.mqtt5_test_fixture); + + aws_mqtt5_client_start(fixture.client_test_fixture.mqtt5_test_fixture.client); + + ASSERT_SUCCESS(s_rrc_protocol_client_publish(&fixture, topic_filter1, payload2)); + + s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 2); + ASSERT_SUCCESS(s_rrc_verify_streaming_publishes(&fixture, record_key1, 2, expected_publishes)); + + /* check events after the publish has completed, that shows nothing happened subscription-wise */ + s_rrc_wait_for_n_streaming_subscription_events(&fixture, record_key1, 1); + ASSERT_SUCCESS(s_rrc_verify_streaming_record_subscription_events(&fixture, record_key1, 1, expected_events)); + + aws_mqtt_rr_client_operation_release(operation); + + s_aws_rr_client_test_fixture_clean_up(&fixture); + + aws_mqtt_library_clean_up(); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE(rrc_streaming_operation_resume_session, s_rrc_streaming_operation_resume_session_fn) + +struct rrc_subscribe_handler_context { + struct aws_rr_client_test_fixture *fixture; + + size_t subscribes_received; +}; + +static enum aws_mqtt5_suback_reason_code s_rrc_success_suback_rcs[] = { + AWS_MQTT5_SARC_GRANTED_QOS_1, +}; + +int s_handle_subscribe_with_initial_timeout( + void *packet, + struct aws_mqtt5_server_mock_connection_context *connection, + void *user_data) { + (void)packet; + + struct rrc_subscribe_handler_context *context = user_data; + + bool should_respond = false; + aws_mutex_lock(&context->fixture->lock); + should_respond = context->subscribes_received > 0; + ++context->subscribes_received; + aws_mutex_unlock(&context->fixture->lock); + + if (!should_respond) { + return AWS_OP_SUCCESS; } + + struct aws_mqtt5_packet_subscribe_view *subscribe_packet = packet; + + struct aws_mqtt5_packet_suback_view suback_view = { + .packet_id = subscribe_packet->packet_id, + .reason_code_count = 1, + .reason_codes = s_rrc_success_suback_rcs, + }; + + return aws_mqtt5_mock_server_send_packet(connection, AWS_MQTT5_PT_SUBACK, &suback_view); +} + +static void s_rrc_initial_subscribe_timeout_config( + struct aws_mqtt_request_response_client_options *fixture_options, + struct mqtt5_client_test_options *client_test_options) { + client_test_options->server_function_table.packet_handlers[AWS_MQTT5_PT_SUBSCRIBE] = + s_handle_subscribe_with_initial_timeout; } -static int s_rrc_streaming_operation_success_fn(struct aws_allocator *allocator, void *ctx) { +/* + * Variant of the basic success test where the first subscribe is ignored, causing it to timeout. Verify the + * client sends a second subscribe (which succeeds) after which everything is fine. + */ +static int s_rrc_streaming_operation_first_subscribe_times_out_resub_succeeds_fn( + struct aws_allocator *allocator, + void *ctx) { (void)ctx; aws_mqtt_library_init(allocator); struct mqtt5_client_test_options client_test_options; - aws_mqtt5_client_test_init_default_options(&client_test_options); + struct aws_rr_client_test_fixture fixture; - client_test_options.server_function_table.packet_handlers[AWS_MQTT5_PT_SUBSCRIBE] = ??; - client_test_options.server_function_table.packet_handlers[AWS_MQTT5_PT_PUBLISH] = ??; + struct rrc_subscribe_handler_context subscribe_context = { + .fixture = &fixture, + .subscribes_received = 0, + }; + ASSERT_SUCCESS(s_init_fixture_streaming_operation_success( + &fixture, &client_test_options, allocator, s_rrc_initial_subscribe_timeout_config, &subscribe_context)); - struct aws_mqtt5_client_mqtt5_mock_test_fixture_options client_test_fixture_options = { - .client_options = &client_test_options.client_options, - .server_function_table = &client_test_options.server_function_table, + struct aws_byte_cursor record_key1 = aws_byte_cursor_from_c_str("key1"); + struct aws_byte_cursor topic_filter1 = aws_byte_cursor_from_c_str("topic/1"); + struct aws_mqtt_rr_client_operation *operation = s_create_streaming_operation(&fixture, record_key1, topic_filter1); + + s_rrc_wait_for_n_streaming_subscription_events(&fixture, record_key1, 1); + + struct aws_rr_client_fixture_streaming_record_subscription_event expected_events[] = { + { + .status = ARRSSET_SUBSCRIPTION_ESTABLISHED, + .error_code = AWS_ERROR_SUCCESS, + }, }; + ASSERT_SUCCESS(s_rrc_verify_streaming_record_subscription_events( + &fixture, record_key1, AWS_ARRAY_SIZE(expected_events), expected_events)); - struct aws_mqtt_request_response_client_options rr_client_options = { - .max_subscriptions = 2, - .operation_timeout_seconds = 2, + // verify we ignored the first subscribe, triggering a second + aws_mutex_lock(&fixture.lock); + ASSERT_INT_EQUALS(2, subscribe_context.subscribes_received); + aws_mutex_unlock(&fixture.lock); + + // two publishes on the mqtt client that get reflected into our subscription topic + struct aws_byte_cursor payload1 = aws_byte_cursor_from_c_str("Payload1"); + struct aws_byte_cursor payload2 = aws_byte_cursor_from_c_str("Payload2"); + ASSERT_SUCCESS(s_rrc_protocol_client_publish(&fixture, topic_filter1, payload1)); + ASSERT_SUCCESS(s_rrc_protocol_client_publish(&fixture, topic_filter1, payload2)); + + s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 2); + + struct aws_byte_cursor expected_publishes[] = { + payload1, + payload2, }; + ASSERT_SUCCESS(s_rrc_verify_streaming_publishes( + &fixture, record_key1, AWS_ARRAY_SIZE(expected_publishes), expected_publishes)); + + aws_mqtt_rr_client_operation_release(operation); + + s_aws_rr_client_test_fixture_clean_up(&fixture); + aws_mqtt_library_clean_up(); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE( + rrc_streaming_operation_first_subscribe_times_out_resub_succeeds, + s_rrc_streaming_operation_first_subscribe_times_out_resub_succeeds_fn) + +static enum aws_mqtt5_suback_reason_code s_rrc_retryable_suback_rcs[] = { + AWS_MQTT5_SARC_UNSPECIFIED_ERROR, +}; + +int s_handle_subscribe_with_initial_retryable_failure( + void *packet, + struct aws_mqtt5_server_mock_connection_context *connection, + void *user_data) { + (void)packet; + + struct rrc_subscribe_handler_context *context = user_data; + + bool should_succeed = false; + aws_mutex_lock(&context->fixture->lock); + should_succeed = context->subscribes_received > 0; + ++context->subscribes_received; + aws_mutex_unlock(&context->fixture->lock); + + struct aws_mqtt5_packet_subscribe_view *subscribe_packet = packet; + + struct aws_mqtt5_packet_suback_view suback_view = { + .packet_id = subscribe_packet->packet_id, + .reason_code_count = 1, + }; + + if (should_succeed) { + suback_view.reason_codes = s_rrc_success_suback_rcs; + } else { + suback_view.reason_codes = s_rrc_retryable_suback_rcs; + } + + return aws_mqtt5_mock_server_send_packet(connection, AWS_MQTT5_PT_SUBACK, &suback_view); +} + +static void s_rrc_initial_subscribe_retryable_failure_config( + struct aws_mqtt_request_response_client_options *fixture_options, + struct mqtt5_client_test_options *client_test_options) { + client_test_options->server_function_table.packet_handlers[AWS_MQTT5_PT_SUBSCRIBE] = + s_handle_subscribe_with_initial_retryable_failure; +} + +/* + * Variant of the basic success test where the first subscribe triggers a retryable suback failure. Verify the + * client sends a second subscribe (which succeeds) after which everything is fine. + */ +static int s_rrc_streaming_operation_first_subscribe_retryable_failure_resub_succeeds_fn( + struct aws_allocator *allocator, + void *ctx) { + (void)ctx; + + aws_mqtt_library_init(allocator); + + struct mqtt5_client_test_options client_test_options; struct aws_rr_client_test_fixture fixture; - ASSERT_SUCCESS(s_aws_rr_client_test_fixture_init_from_mqtt5( - &fixture, allocator, &rr_client_options, &client_test_fixture_options, NULL)); + + struct rrc_subscribe_handler_context subscribe_context = { + .fixture = &fixture, + .subscribes_received = 0, + }; + ASSERT_SUCCESS(s_init_fixture_streaming_operation_success( + &fixture, + &client_test_options, + allocator, + s_rrc_initial_subscribe_retryable_failure_config, + &subscribe_context)); struct aws_byte_cursor record_key1 = aws_byte_cursor_from_c_str("key1"); struct aws_byte_cursor topic_filter1 = aws_byte_cursor_from_c_str("topic/1"); @@ -1146,13 +1717,19 @@ static int s_rrc_streaming_operation_success_fn(struct aws_allocator *allocator, .error_code = AWS_ERROR_SUCCESS, }, }; - s_rrc_verify_streaming_record_subscription_events(&fixture, record_key1, AWS_ARRAY_SIZE(expected_events), expected_events); + ASSERT_SUCCESS(s_rrc_verify_streaming_record_subscription_events( + &fixture, record_key1, AWS_ARRAY_SIZE(expected_events), expected_events)); + + // verify we ignored the first subscribe, triggering a second + aws_mutex_lock(&fixture.lock); + ASSERT_INT_EQUALS(2, subscribe_context.subscribes_received); + aws_mutex_unlock(&fixture.lock); // two publishes on the mqtt client that get reflected into our subscription topic struct aws_byte_cursor payload1 = aws_byte_cursor_from_c_str("Payload1"); - struct aws_byte_cursor payload2 = aws_byte_cursor_from_c_str("Payload1"); - s_rrc_protocol_client_publish(&fixture, topic_filter1, payload1); - s_rrc_protocol_client_publish(&fixture, topic_filter1, payload2); + struct aws_byte_cursor payload2 = aws_byte_cursor_from_c_str("Payload2"); + ASSERT_SUCCESS(s_rrc_protocol_client_publish(&fixture, topic_filter1, payload1)); + ASSERT_SUCCESS(s_rrc_protocol_client_publish(&fixture, topic_filter1, payload2)); s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 2); @@ -1160,7 +1737,8 @@ static int s_rrc_streaming_operation_success_fn(struct aws_allocator *allocator, payload1, payload2, }; - s_rrc_verify_streaming_publishes(&fixture, record_key1, AWS_ARRAY_SIZE(expected_publishes), expected_publishes); + ASSERT_SUCCESS(s_rrc_verify_streaming_publishes( + &fixture, record_key1, AWS_ARRAY_SIZE(expected_publishes), expected_publishes)); aws_mqtt_rr_client_operation_release(operation); @@ -1171,4 +1749,208 @@ static int s_rrc_streaming_operation_success_fn(struct aws_allocator *allocator, return AWS_OP_SUCCESS; } -AWS_TEST_CASE(rrc_streaming_operation_success, s_rrc_streaming_operation_success_fn) +AWS_TEST_CASE( + rrc_streaming_operation_first_subscribe_retryable_failure_resub_succeeds, + s_rrc_streaming_operation_first_subscribe_retryable_failure_resub_succeeds_fn) + +static enum aws_mqtt5_suback_reason_code s_rrc_unretryable_suback_rcs[] = { + AWS_MQTT5_SARC_NOT_AUTHORIZED, +}; + +int s_handle_subscribe_with_terminal_failure( + void *packet, + struct aws_mqtt5_server_mock_connection_context *connection, + void *user_data) { + (void)packet; + + struct rrc_subscribe_handler_context *context = user_data; + + aws_mutex_lock(&context->fixture->lock); + ++context->subscribes_received; + aws_mutex_unlock(&context->fixture->lock); + + struct aws_mqtt5_packet_subscribe_view *subscribe_packet = packet; + + struct aws_mqtt5_packet_suback_view suback_view = { + .packet_id = subscribe_packet->packet_id, + .reason_code_count = 1, + .reason_codes = s_rrc_unretryable_suback_rcs, + }; + + return aws_mqtt5_mock_server_send_packet(connection, AWS_MQTT5_PT_SUBACK, &suback_view); +} + +static void s_rrc_subscribe_terminal_failure_config( + struct aws_mqtt_request_response_client_options *fixture_options, + struct mqtt5_client_test_options *client_test_options) { + client_test_options->server_function_table.packet_handlers[AWS_MQTT5_PT_SUBSCRIBE] = + s_handle_subscribe_with_terminal_failure; +} + +/* + * Failure variant where the subscribe triggers a non-retryable suback failure. Verify the + * operation gets halted. + */ +static int s_rrc_streaming_operation_subscribe_unretryable_failure_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + aws_mqtt_library_init(allocator); + + struct mqtt5_client_test_options client_test_options; + struct aws_rr_client_test_fixture fixture; + + struct rrc_subscribe_handler_context subscribe_context = { + .fixture = &fixture, + .subscribes_received = 0, + }; + ASSERT_SUCCESS(s_init_fixture_streaming_operation_success( + &fixture, &client_test_options, allocator, s_rrc_subscribe_terminal_failure_config, &subscribe_context)); + + struct aws_byte_cursor record_key1 = aws_byte_cursor_from_c_str("key1"); + struct aws_byte_cursor topic_filter1 = aws_byte_cursor_from_c_str("topic/1"); + struct aws_mqtt_rr_client_operation *operation = s_create_streaming_operation(&fixture, record_key1, topic_filter1); + + // wait an extra amount just for fun + aws_thread_current_sleep(aws_timestamp_convert(2, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL)); + + s_rrc_wait_for_n_streaming_subscription_events(&fixture, record_key1, 1); + + struct aws_rr_client_fixture_streaming_record_subscription_event expected_events[] = { + { + .status = ARRSSET_SUBSCRIPTION_HALTED, + .error_code = AWS_ERROR_MQTT_REQUEST_RESPONSE_SUBSCRIBE_FAILURE, + }, + }; + ASSERT_SUCCESS(s_rrc_verify_streaming_record_subscription_events( + &fixture, record_key1, AWS_ARRAY_SIZE(expected_events), expected_events)); + + aws_mutex_lock(&fixture.lock); + ASSERT_INT_EQUALS(1, subscribe_context.subscribes_received); + aws_mutex_unlock(&fixture.lock); + + aws_mqtt_rr_client_operation_release(operation); + + s_aws_rr_client_test_fixture_clean_up(&fixture); + + aws_mqtt_library_clean_up(); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE( + rrc_streaming_operation_subscribe_unretryable_failure, + s_rrc_streaming_operation_subscribe_unretryable_failure_fn) + +static void s_rrc_unsubscribe_success_config( + struct aws_mqtt_request_response_client_options *fixture_options, + struct mqtt5_client_test_options *client_test_options) { + client_test_options->server_function_table.packet_handlers[AWS_MQTT5_PT_UNSUBSCRIBE] = + aws_mqtt5_mock_server_handle_unsubscribe_unsuback_success; +} + +/* + * Multi-operation variant where we exceed the streaming subscription budget, release everything and then verify + * we can successfully establish a new streaming operation after everything cleans up. + */ +static int s_rrc_streaming_operation_failure_exceeds_subscription_budget_fn( + struct aws_allocator *allocator, + void *ctx) { + (void)ctx; + + aws_mqtt_library_init(allocator); + + struct mqtt5_client_test_options client_test_options; + struct aws_rr_client_test_fixture fixture; + ASSERT_SUCCESS(s_init_fixture_streaming_operation_success( + &fixture, &client_test_options, allocator, s_rrc_unsubscribe_success_config, NULL)); + + struct aws_byte_cursor record_key1 = aws_byte_cursor_from_c_str("key1"); + struct aws_byte_cursor topic_filter1 = aws_byte_cursor_from_c_str("topic/1"); + struct aws_mqtt_rr_client_operation *operation1 = + s_create_streaming_operation(&fixture, record_key1, topic_filter1); + + struct aws_byte_cursor record_key2 = aws_byte_cursor_from_c_str("key2"); + struct aws_byte_cursor topic_filter2 = aws_byte_cursor_from_c_str("topic/2"); + struct aws_mqtt_rr_client_operation *operation2 = + s_create_streaming_operation(&fixture, record_key2, topic_filter2); + + s_rrc_wait_for_n_streaming_subscription_events(&fixture, record_key1, 1); + s_rrc_wait_for_n_streaming_subscription_events(&fixture, record_key2, 1); + + struct aws_rr_client_fixture_streaming_record_subscription_event expected_success_events[] = { + { + .status = ARRSSET_SUBSCRIPTION_ESTABLISHED, + .error_code = AWS_ERROR_SUCCESS, + }, + }; + ASSERT_SUCCESS(s_rrc_verify_streaming_record_subscription_events( + &fixture, record_key1, AWS_ARRAY_SIZE(expected_success_events), expected_success_events)); + + struct aws_rr_client_fixture_streaming_record_subscription_event expected_failure_events[] = { + { + .status = ARRSSET_SUBSCRIPTION_HALTED, + .error_code = AWS_ERROR_MQTT_REQUEST_RESPONSE_NO_SUBSCRIPTION_CAPACITY, + }, + }; + ASSERT_SUCCESS(s_rrc_verify_streaming_record_subscription_events( + &fixture, record_key2, AWS_ARRAY_SIZE(expected_failure_events), expected_failure_events)); + + // two publishes on the mqtt client that get reflected into our subscription topic1 + struct aws_byte_cursor payload1 = aws_byte_cursor_from_c_str("Payload1"); + struct aws_byte_cursor payload2 = aws_byte_cursor_from_c_str("Payload2"); + ASSERT_SUCCESS(s_rrc_protocol_client_publish(&fixture, topic_filter1, payload1)); + ASSERT_SUCCESS(s_rrc_protocol_client_publish(&fixture, topic_filter1, payload2)); + + s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 2); + + struct aws_byte_cursor expected_publishes[] = { + payload1, + payload2, + }; + ASSERT_SUCCESS(s_rrc_verify_streaming_publishes(&fixture, record_key1, 2, expected_publishes)); + + // close the first, wait for terminate + aws_mqtt_rr_client_operation_release(operation1); + s_rrc_wait_on_streaming_termination(&fixture, record_key1); + + // close the second, wait for terminate + aws_mqtt_rr_client_operation_release(operation2); + s_rrc_wait_on_streaming_termination(&fixture, record_key2); + + // let the unsubscribe resolve + aws_thread_current_sleep(aws_timestamp_convert(1, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL)); + + // make a third using topic filter 2 + struct aws_byte_cursor record_key3 = aws_byte_cursor_from_c_str("key3"); + struct aws_mqtt_rr_client_operation *operation3 = + s_create_streaming_operation(&fixture, record_key3, topic_filter2); + + s_rrc_wait_for_n_streaming_subscription_events(&fixture, record_key3, 1); + ASSERT_SUCCESS(s_rrc_verify_streaming_record_subscription_events( + &fixture, record_key3, AWS_ARRAY_SIZE(expected_success_events), expected_success_events)); + + // publish again + struct aws_byte_cursor payload3 = aws_byte_cursor_from_c_str("payload3"); + ASSERT_SUCCESS(s_rrc_protocol_client_publish(&fixture, topic_filter2, payload3)); + + // verify third operation got the new publish + s_rrc_wait_for_n_streaming_publishes(&fixture, record_key3, 1); + + struct aws_byte_cursor third_expected_publishes[] = { + payload3, + }; + ASSERT_SUCCESS(s_rrc_verify_streaming_publishes( + &fixture, record_key3, AWS_ARRAY_SIZE(third_expected_publishes), third_expected_publishes)); + + aws_mqtt_rr_client_operation_release(operation3); + + s_aws_rr_client_test_fixture_clean_up(&fixture); + + aws_mqtt_library_clean_up(); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE( + rrc_streaming_operation_failure_exceeds_subscription_budget, + s_rrc_streaming_operation_failure_exceeds_subscription_budget_fn) diff --git a/tests/v5/mqtt5_client_tests.c b/tests/v5/mqtt5_client_tests.c index b876adda..690bf711 100644 --- a/tests/v5/mqtt5_client_tests.c +++ b/tests/v5/mqtt5_client_tests.c @@ -5353,7 +5353,10 @@ static int s_aws_mqtt5_server_send_aliased_publish_sequence( struct aws_mqtt5_packet_subscribe_view *subscribe_view = packet; struct aws_mqtt5_packet_suback_view suback_view = { - .packet_id = subscribe_view->packet_id, .reason_code_count = 1, .reason_codes = s_alias_reason_codes}; + .packet_id = subscribe_view->packet_id, + .reason_code_count = 1, + .reason_codes = s_alias_reason_codes, + }; // just to be thorough, send a suback if (aws_mqtt5_mock_server_send_packet(connection, AWS_MQTT5_PT_SUBACK, &suback_view)) { @@ -5548,7 +5551,10 @@ static int s_aws_mqtt5_server_send_aliased_publish_failure( struct aws_mqtt5_packet_subscribe_view *subscribe_view = packet; struct aws_mqtt5_packet_suback_view suback_view = { - .packet_id = subscribe_view->packet_id, .reason_code_count = 1, .reason_codes = s_alias_reason_codes}; + .packet_id = subscribe_view->packet_id, + .reason_code_count = 1, + .reason_codes = s_alias_reason_codes, + }; // just to be thorough, send a suback if (aws_mqtt5_mock_server_send_packet(connection, AWS_MQTT5_PT_SUBACK, &suback_view)) {