Skip to content

Commit

Permalink
Merge pull request #1045 from bodo-hugo-barwich/no-47_wrong-index
Browse files Browse the repository at this point in the history
ElasticSearch Availabilty Check and Mapping Self-Check
  • Loading branch information
mickeyn authored Feb 19, 2022
2 parents 50332b1 + 2ea1921 commit 61caf58
Show file tree
Hide file tree
Showing 2 changed files with 464 additions and 18 deletions.
249 changes: 245 additions & 4 deletions lib/MetaCPAN/Role/Script.pm
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use ElasticSearchX::Model::Document::Types qw(:all);
use Git::Helpers qw( checkout_root );
use Log::Contextual qw( :log :dlog );
use MetaCPAN::Model ();
use MetaCPAN::Types::TypeTiny qw( Bool Int Path Str );
use MetaCPAN::Types::TypeTiny qw( Bool HashRef Int Path Str );
use Mojo::Server ();
use Term::ANSIColor qw( colored );
use IO::Interactive qw( is_interactive );
Expand Down Expand Up @@ -35,6 +35,22 @@ has die_on_error => (
documentation => 'Die on errors instead of simply logging',
);

has exit_code => (
isa => Int,
is => 'rw',
default => 0,
documentation => 'Exit Code to be returned on termination',
);

has arg_await_timeout => (
init_arg => 'await',
is => 'ro',
isa => Int,
default => 15,
documentation =>
'seconds before connection is considered failed with timeout',
);

has ua => (
is => 'ro',
lazy => 1,
Expand Down Expand Up @@ -71,6 +87,27 @@ has index => (
'Index to use, defaults to "cpan" (when used: also export ES_SCRIPT_INDEX)',
);

has cluster_info => (
isa => HashRef,
traits => ['Hash'],
is => 'rw',
default => sub { {} },
);

has indices_info => (
isa => HashRef,
traits => ['Hash'],
is => 'rw',
default => sub { {} },
);

has aliases_info => (
isa => HashRef,
traits => ['Hash'],
is => 'rw',
default => sub { {} },
);

has port => (
isa => Int,
is => 'ro',
Expand Down Expand Up @@ -123,13 +160,27 @@ sub BUILDARGS {
}

sub handle_error {
my ( $self, $error ) = @_;
my ( $self, $error, $die_always ) = @_;

# Die if configured (for the test suite).
$die_always = $self->die_on_error unless defined $die_always;

# Always log.
log_fatal {$error};

# Die if configured (for the test suite).
Carp::croak $error if $self->die_on_error;
$! = $self->exit_code if ( $self->exit_code != 0 );

Carp::croak $error if $die_always;
}

sub print_error {
my ( $self, $error ) = @_;

# Always log.
log_error {$error};

# Display Error in red
print colored( ['bold red'], "*** ERROR ***: $error" ), "\n";
}

sub index {
Expand Down Expand Up @@ -195,6 +246,122 @@ before run => sub {
#Dlog_debug {"Connected to $_"} $self->remote;
};

sub _get_indices_info {
my ( $self, $irefresh ) = @_;

if ( $irefresh || scalar( keys %{ $self->indices_info } ) == 0 ) {
my $sinfo_rs = $self->es->cat->indices( h => [ 'index', 'health' ] );
my $sindices_parsing = qr/^([^[:space:]]+) +([^[:space:]]+)/m;

$self->indices_info( {} );

while ( $sinfo_rs =~ /$sindices_parsing/g ) {
$self->indices_info->{$1}
= { 'index_name' => $1, 'health' => $2 };
}
}
}

sub _get_aliases_info {
my ( $self, $irefresh ) = @_;

if ( $irefresh || scalar( keys %{ $self->aliases_info } ) == 0 ) {
my $sinfo_rs = $self->es->cat->aliases( h => [ 'alias', 'index' ] );
my $saliases_parsing = qr/^([^[:space:]]+) +([^[:space:]]+)/m;

$self->aliases_info( {} );

while ( $sinfo_rs =~ /$saliases_parsing/g ) {
$self->aliases_info->{$1} = { 'alias_name' => $1, 'index' => $2 };
}
}
}

sub check_health {
my ( $self, $irefresh ) = @_;
my $ihealth = 0;

$irefresh = 0 unless ( defined $irefresh );

$ihealth = $self->await;

if ($ihealth) {
$self->_get_indices_info($irefresh);

foreach ( keys %{ $self->indices_info } ) {
$ihealth = 0
if ( $self->indices_info->{$_}->{'health'} eq 'red' );
}
}

if ($ihealth) {
$self->_get_aliases_info($irefresh);

$ihealth = 0 if ( scalar( keys %{ $self->aliases_info } ) == 0 );
}

return $ihealth;
}

sub await {
my $self = $_[0];
my $iready = 0;

if ( scalar( keys %{ $self->cluster_info } ) == 0 ) {
my $es = $self->es;
my $iseconds = 0;

log_info {"Awaiting Elasticsearch ..."};

do {
eval {
$iready = $es->ping;

if ($iready) {
log_info {
"Awaiting $iseconds / "
. $self->arg_await_timeout
. " : ready"
};

$self->cluster_info( \%{ $es->info } );
}
};

if ($@) {
if ( $iseconds < $self->arg_await_timeout ) {
log_info {
"Awaiting $iseconds / "
. $self->arg_await_timeout
. " : unavailable - sleeping ..."
};

sleep(1);

$iseconds++;
}
else {
log_error {
"Awaiting $iseconds / "
. $self->arg_await_timeout
. " : unavailable - timeout!"
};

#Set System Error: 112 - EHOSTDOWN - Host is down
$self->exit_code(112);
$self->handle_error( $@, 1 );
}
}
} while ( !$iready && $iseconds <= $self->arg_await_timeout );
}
else {
#ElasticSearch Service is available
$iready = 1;
}

return $iready;
}

sub are_you_sure {
my ( $self, $msg ) = @_;

Expand All @@ -216,8 +383,82 @@ __END__
=pod
=head1 NAME
MetaCPAN::Role::Script - Base Role which is used by many command line applications
=head1 SYNOPSIS
Roles which should be available to all modules.
=head1 OPTIONS
This Role makes the command line application accept the following options
=over 4
=item Option C<--await 15>
This option will set the I<ElasticSearch Availability Check Timeout>.
After C<await> seconds the Application will fail with an Exception and the Exit Code [112]
(C<112 - EHOSTDOWN - Host is down>) will be returned
bin/metacpan <script_name> --await 15
See L<Method C<await()>>
=back
=head1 METHODS
This Role provides the following methods
=over 4
=item C<await()>
This method uses the
L<C<Search::Elasticsearch::Client::2_0::Direct::ping()>|https://metacpan.org/pod/Search::Elasticsearch::Client::2_0::Direct#ping()>
method to verify the service availabilty and wait for C<arg_await_timeout> seconds.
When the service does not become available within C<arg_await_timeout> seconds it re-throws the
Exception from the C<Search::Elasticsearch::Client> and sets C< $! > to C< 112 >.
The C<Search::Elasticsearch::Client> generates a C<"Search::Elasticsearch::Error::NoNodes"> Exception.
When the service is available it will populate the C<cluster_info> C<HASH> structure with the basic information
about the cluster.
See L<Option C<--await 15>>
See L<Method C<check_health()>>
=item C<check_health( [ refresh ] )>
This method uses the
L<C<Search::Elasticsearch::Client::2_0::Direct::cat()>|https://metacpan.org/pod/Search::Elasticsearch::Client::2_0::Direct#cat()>
method to collect basic data about the cluster structure as the general information,
the health state of the indices and the created aliases.
This information is stored in C<cluster_info>, C<indices_info> and C<aliases_info> as C<HASH> structures.
If the parameter C<refresh> is set to C< 1 > the structures C<indices_info> and C<aliases_info> will always
be updated.
If the C<cluster_info> structure is empty it calls first the C<await()> method.
If the service is unavailable the C<await()> method will produce an exception and the structures will be empty
The method returns C< 1 > when the C<cluster_info> is populated, none of the indices in C<indices_info> has
the Health State I<red> and at least one alias is created in C<aliases_info>
otherwise the method returns C< 0 >
=item C<are_you_sure()>
Requests the user to confirm the operation with "I< YES >"
=item C<handle_error( error_message[, die_always ] )>
Logs the string C<error_message> with the log function as fatal error.
If C<exit_code> is not equel C< 0 > sets its value in C< $! >.
If the option C<--die_on_error> is enabled it throws an Exception with C<error_message>.
If the parameter C<die_always> is set it overrides the option C<--die_on_error>.
=item C<print_error( error_message )>
Logs the string C<error_message> with the log function and displays it in red.
But it does not end the application.
=back
=cut
Loading

0 comments on commit 61caf58

Please sign in to comment.