ElasticSearch Availabilty Check and Mapping Self-Check #1045

Merged
merged 3 commits into from
Feb 19, 2022
Changes from 2 commits
241 changes: 237 additions & 4 deletions lib/MetaCPAN/Role/Script.pm
@@ -6,7 +6,7 @@ use ElasticSearchX::Model::Document::Types qw(:all);
use Git::Helpers qw( checkout_root );
use Log::Contextual qw( :log :dlog );
use MetaCPAN::Model ();
use MetaCPAN::Types::TypeTiny qw( Bool Int Path Str );
use MetaCPAN::Types::TypeTiny qw( Bool HashRef Int Path Str );
use Mojo::Server ();
use Term::ANSIColor qw( colored );
use IO::Interactive qw( is_interactive );
@@ -35,6 +35,22 @@ has die_on_error => (
documentation => 'Die on errors instead of simply logging',
);

has exit_code => (
isa => Int,
is => 'rw',
default => 0,
documentation => 'Exit Code to be returned on termination',
);

has arg_await_timeout => (
init_arg => 'await',
is => 'ro',
isa => Int,
default => 15,
documentation =>
'seconds before connection is considered failed with timeout',
);

has ua => (
is => 'ro',
lazy => 1,
@@ -71,6 +87,27 @@ has index => (
'Index to use, defaults to "cpan" (when used: also export ES_SCRIPT_INDEX)',
);

has cluster_info => (
isa => HashRef,
traits => ['Hash'],
is => 'rw',
default => sub { {} },
);

has indices_info => (
isa => HashRef,
traits => ['Hash'],
is => 'rw',
default => sub { {} },
);

has aliases_info => (
isa => HashRef,
traits => ['Hash'],
is => 'rw',
default => sub { {} },
);

has port => (
isa => Int,
is => 'ro',
@@ -123,13 +160,27 @@ sub BUILDARGS {
}

sub handle_error {
my ( $self, $error ) = @_;
my ( $self, $error, $die_always ) = @_;

# Die if configured (for the test suite).
$die_always = $self->die_on_error unless defined $die_always;

# Always log.
log_fatal {$error};

# Die if configured (for the test suite).
Carp::croak $error if $self->die_on_error;
$! = $self->exit_code if ( $self->exit_code != 0 );

Carp::croak $error if $die_always;
}

sub print_error {
my ( $self, $error ) = @_;

# Always log.
log_error {$error};

# Display Error in red
print colored( ['bold red'], "*** ERROR ***: $error" ), "\n";
}

sub index {
@@ -195,6 +246,114 @@ before run => sub {
#Dlog_debug {"Connected to $_"} $self->remote;
};

sub check_health {
my ( $self, $irefresh ) = @_;
my $ihealth = 0;

$irefresh = 0 unless ( defined $irefresh );

$ihealth = $self->await;

if ($ihealth) {
if ( $irefresh || scalar( keys %{ $self->indices_info } ) == 0 ) {
my $sinfo_rs
= $self->es->cat->indices( h => [ 'index', 'health' ] );

$self->indices_info( {} );

while ( $sinfo_rs =~ /^([^[:space:]]+) +([^[:space:]]+)/gm ) {
$self->indices_info->{$1}
= { 'index_name' => $1, 'health' => $2 };

$ihealth = 0 if ( $2 eq 'red' );
}
}
else {
foreach ( keys %{ $self->indices_info } ) {
$ihealth = 0
if ( $self->indices_info->{$_}->{'health'} eq 'red' );
}
}
}

if ($ihealth) {
if ( $irefresh || scalar( keys %{ $self->aliases_info } ) == 0 ) {
my $sinfo_rs
= $self->es->cat->aliases( h => [ 'alias', 'index' ] );

$self->aliases_info( {} );

while ( $sinfo_rs =~ /^([^[:space:]]+) +([^[:space:]]+)/gm ) {
$self->aliases_info->{$1}
= { 'alias_name' => $1, 'index' => $2 };
}
}

$ihealth = 0 if ( scalar( keys %{ $self->aliases_info } ) == 0 );
}

return $ihealth;
}

sub await {
my $self = $_[0];
my $iready = 0;

if ( scalar( keys %{ $self->cluster_info } ) == 0 ) {
my $es = $self->es;
my $iseconds = 0;

log_info {"Awaiting Elasticsearch ..."};

do {
eval {
$iready = $es->ping;

if ($iready) {
log_info {
"Awaiting $iseconds / "
. $self->arg_await_timeout
. " : ready"
};

$self->cluster_info( \%{ $es->info } );
}
};

if ($@) {
if ( $iseconds < $self->arg_await_timeout ) {
log_info {
"Awaiting $iseconds / "
. $self->arg_await_timeout
. " : unavailable - sleeping ..."
};

sleep(1);

$iseconds++;
}
else {
log_error {
"Awaiting $iseconds / "
. $self->arg_await_timeout
. " : unavailable - timeout!"
};

#Set System Error: 112 - EHOSTDOWN - Host is down
$self->exit_code(112);
$self->handle_error( $@, 1 );
}
}
} while ( !$iready && $iseconds <= $self->arg_await_timeout );
}
else {
#ElasticSearch Service is available
$iready = 1;
}

return $iready;
}

sub are_you_sure {
my ( $self, $msg ) = @_;

@@ -216,8 +375,82 @@ __END__

=pod

=head1 NAME

MetaCPAN::Role::Script - Base Role which is used by many command line applications

=head1 SYNOPSIS

Roles which should be available to all modules.

=head1 OPTIONS

This Role makes the command line application accept the following options

=over 4

=item Option C<--await 15>

This option sets the I<ElasticSearch Availability Check Timeout>.
After C<await> seconds the application fails with an exception and returns the exit code C<112>
(C<EHOSTDOWN>, "Host is down").

bin/metacpan <script_name> --await 15

See L<Method C<await()>>

=back
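
The retry behaviour behind C<--await> can be illustrated with a minimal stand-alone sketch. It is not the code of this role: C<await_service>, its parameters and the injected C<$sleeper> are hypothetical names chosen for the example, and it assumes a probe that dies on failure, as the Elasticsearch client is described to do. Unlike the role, the sketch also treats a false return value without an exception as a failed attempt.

```perl
use strict;
use warnings;

# Hypothetical stand-alone helper, not part of MetaCPAN::Role::Script.
# $ping    : code ref that returns true when the service answers, dies otherwise
# $timeout : number of one-second waits allowed before giving up
# $sleeper : code ref used to wait between attempts (defaults to sleep 1)
sub await_service {
    my ( $ping, $timeout, $sleeper ) = @_;
    $sleeper //= sub { sleep 1 };

    my $seconds = 0;
    while (1) {
        my $ready = eval { $ping->() };
        return 1 if $ready;

        # Keep the probe's exception, or use a generic message when the
        # probe returned false without dying (assumption of this sketch).
        my $error = $@ || "service did not answer\n";

        if ( $seconds >= $timeout ) {
            # 112 is the value the role documents as EHOSTDOWN.
            $! = 112;
            die $error;
        }

        $sleeper->();
        $seconds++;
    }
}

# Usage with a fake probe that succeeds on the third attempt.
my $attempts = 0;
my $probe    = sub { ++$attempts >= 3 or die "connection refused\n"; 1 };
print "ready after $attempts attempts\n"
    if await_service( $probe, 15, sub { } );
```

Passing the sleeper as a parameter keeps the example fast to run and easy to test; a real script would simply rely on the default one-second sleep.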

=head1 METHODS

This Role provides the following methods

=over 4

=item C<await()>

This method uses the
L<C<Search::Elasticsearch::Client::2_0::Direct::ping()>|https://metacpan.org/pod/Search::Elasticsearch::Client::2_0::Direct#ping()>
method to verify the service availability and waits up to C<arg_await_timeout> seconds.
When the service does not become available within C<arg_await_timeout> seconds it re-throws the
exception from the C<Search::Elasticsearch::Client> and sets C< $! > to C< 112 >.
The C<Search::Elasticsearch::Client> generates a C<"Search::Elasticsearch::Error::NoNodes"> exception.
When the service is available it populates the C<cluster_info> C<HASH> structure with the basic information
about the cluster.

See L<Option C<--await 15>> and L<Method C<check_health()>>.

=item C<check_health( [ refresh ] )>

This method uses the
L<C<Search::Elasticsearch::Client::2_0::Direct::cat()>|https://metacpan.org/pod/Search::Elasticsearch::Client::2_0::Direct#cat()>
method to collect basic data about the cluster structure: the general information,
the health state of the indices and the created aliases.
This information is stored in C<cluster_info>, C<indices_info> and C<aliases_info> as C<HASH> structures.
If the parameter C<refresh> is set to C< 1 > the structures C<indices_info> and C<aliases_info> are always
updated.
If the C<cluster_info> structure is empty it first calls the C<await()> method.
If the service is unavailable the C<await()> method throws an exception and the structures stay empty.
The method returns C< 1 > when the C<cluster_info> is populated, none of the indices in C<indices_info> has
the health state I<red> and at least one alias exists in C<aliases_info>;
otherwise it returns C< 0 >.

=item C<are_you_sure()>

Requests the user to confirm the operation with "I< YES >"

=item C<handle_error( error_message[, die_always ] )>

Logs the string C<error_message> with the log function as a fatal error.
If C<exit_code> is not equal to C< 0 > it sets its value in C< $! >.
If the option C<--die_on_error> is enabled it throws an Exception with C<error_message>.
If the parameter C<die_always> is set it overrides the option C<--die_on_error>.

=item C<print_error( error_message )>

Logs the string C<error_message> with the log function and displays it in red,
but does not end the application.

=back
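
The way C<check_health()> turns the plain-text C<cat> output into a hash and derives a health flag can be sketched in isolation. This is an illustration only: C<parse_indices>, C<indices_healthy> and the sample text are invented for the example, and the sketch assumes the two-column, space-separated layout that the regular expression in C<check_health()> expects.

```perl
use strict;
use warnings;

# Hypothetical helper: parse a two-column "index health" text table,
# one record per line, into a hash keyed by index name.
sub parse_indices {
    my ($text) = @_;
    my %info;

    # Same pattern as in check_health(): two non-space columns per line.
    while ( $text =~ /^([^[:space:]]+) +([^[:space:]]+)/gm ) {
        $info{$1} = { index_name => $1, health => $2 };
    }

    return \%info;
}

# Hypothetical helper: returns 0 as soon as one index reports "red".
sub indices_healthy {
    my ($info) = @_;

    for my $index ( values %$info ) {
        return 0 if $index->{health} eq 'red';
    }

    return 1;
}

# Invented sample data, not output from a real cluster.
my $sample = "first_index green\nsecond_index yellow\nthird_index red\n";
my $info   = parse_indices($sample);

print indices_healthy($info) ? "healthy\n" : "unhealthy\n";
```

Keeping the parsing separate from the health decision mirrors how the method caches C<indices_info> and can re-evaluate the cached data without a new request.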

=cut