From 3864091ac976abf0e5c8140472096fae79ef235d Mon Sep 17 00:00:00 2001 From: Issac Goldstand Date: Sun, 14 Oct 2012 15:39:09 +0200 Subject: [PATCH] Adds support for WORK_DATA packet --- lib/Gearman/Objects.pm | 1 + lib/Gearman/Task.pm | 10 +++++++++- lib/Gearman/Taskset.pm | 18 ++++++++++++++++++ lib/Gearman/Util.pm | 1 + 4 files changed, 29 insertions(+), 1 deletion(-) diff --git a/lib/Gearman/Objects.pm b/lib/Gearman/Objects.pm index d004fed..a19c1a1 100644 --- a/lib/Gearman/Objects.pm +++ b/lib/Gearman/Objects.pm @@ -48,6 +48,7 @@ use fields ( 'on_exception', 'on_retry', 'on_status', + 'on_data', 'on_post_hooks', # used internally, when other hooks are done running, prior to cleanup 'retry_count', 'timeout', diff --git a/lib/Gearman/Task.pm b/lib/Gearman/Task.pm index 86ec48b..c58a73d 100644 --- a/lib/Gearman/Task.pm +++ b/lib/Gearman/Task.pm @@ -206,11 +206,19 @@ sub complete { sub status { my Gearman::Task $task = shift; return if $task->{is_finished}; - return unless $task->{on_status}; + my ($nu, $de) = @_; $task->{on_status}->($nu, $de); } +sub data { + my Gearman::Task $task = shift; + return if $task->{is_finished}; + my $result_ref = shift; + + $task->{on_data}->($result_ref) if $task->{on_data}; +} + # getter/setter for the fully-qualified handle of form "IP:port//shandle" where # shandle is an opaque handle specific to the job server running on IP:port sub handle { diff --git a/lib/Gearman/Taskset.pm b/lib/Gearman/Taskset.pm index e7bdd1b..eb6edc1 100644 --- a/lib/Gearman/Taskset.pm +++ b/lib/Gearman/Taskset.pm @@ -169,6 +169,7 @@ sub wait { # -- on_complete # -- on_fail # -- on_status +# -- on_data # -- retry_count # -- fail_after_idle # -- high_priority @@ -322,6 +323,23 @@ sub _process_packet { return 1; } + if ($res->{type} eq "work_data") { + ${ $res->{'blobref'} } =~ s/^(.+?)\0// + or die "Bogus work_data from server"; + my $shandle = $1; + + my $task_list = $ts->{waiting}{$shandle} or + die "Uhhhh: got work_data for unknown handle: $shandle\n"; + + my Gearman::Task $task = $task_list->[0] or + die "Uhhhh: task_list is empty on work_data for handle $shandle\n"; + + $task->data($res->{'blobref'}); + + return 1; + } + + if ($res->{type} eq "work_exception") { ${ $res->{'blobref'} } =~ s/^(.+?)\0// or die "Bogus work_exception from server"; diff --git a/lib/Gearman/Util.pm b/lib/Gearman/Util.pm index 5cee850..4a0051c 100644 --- a/lib/Gearman/Util.pm +++ b/lib/Gearman/Util.pm @@ -51,6 +51,7 @@ our %cmd = ( # to one jobserver, so no polls/grabs will take place, and server is free # to push "job_assign" packets back down. 24 => [ 'I', "all_yours" ], # W->J --- + 28 => [ 'IO', "work_data"], # W->J/C: HANDLE[0]RES ); our %num; # name -> num