diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..47667cc --- /dev/null +++ b/.travis.yml @@ -0,0 +1,36 @@ +language: perl +perl: + - "5.24" + - "5.22" + - "5.20" + - "5.18" + - "5.16" + - "5.14" + - "5.12" + - "5.10" + +sudo: false + +before_install: + - git clone git://github.com/travis-perl/helpers ~/travis-perl-helpers + - source ~/travis-perl-helpers/init + - build-perl + - perl -V + - build-dist + - cd $BUILD_DIR + +install: + - cpanm --quiet --notest Devel::Cover::Report::Coveralls + - cpanm --quiet --notest --installdeps . + +script: + - perl Makefile.PL + - make + - PERL5OPT=-MDevel::Cover=-coverage,statement,branch,condition,path,subroutine prove -b -r -s t + +after_success: + - cover -test -report coveralls + +branches: + only: + - master diff --git a/CHANGES b/CHANGES index 0221b8e..6eac657 100644 --- a/CHANGES +++ b/CHANGES @@ -1,8 +1,24 @@ +1.14_001 2017-03-14 + * no more support for text command shutdown + * more tests + +1.13.002 2016-07-11 + * pod links to gearmand repaired + +1.13.001 2016-07-11 + * Feature Request: #115368 for Gearman-Server: Allow bind to specific IP address + * Fix bug #115458 Distributions contain no tests. + Actually only use ok and version tests + * Fix bug #89033 typo fixes + * Fix bug #107045 [PATCH] fix pod whatis + * Fix bug #115350 Uses old port by default (7003) should use 4730 + * pod usage + 1.12 2014-12-14 * Add HACKING file - * Fix memory leak when clients disconnect (sleeper list isn't pruned). + * Fix bug #70728 memory leak when clients disconnect (sleeper list isn't pruned). Fixes CPAN RT 70728 (Marsh Yamazaki) 1.11 2010-01-17 diff --git a/HACKING b/HACKING deleted file mode 100644 index d1f7724..0000000 --- a/HACKING +++ /dev/null @@ -1,3 +0,0 @@ -http://contributing.appspot.com/gearman - -Please submit patches to the mailing list diff --git a/MANIFEST b/MANIFEST index 499079b..6faf67a 100644 --- a/MANIFEST +++ b/MANIFEST @@ -1,9 +1,17 @@ CHANGES -gearmand +MANIFEST This list of files +MANIFEST.SKIP +Makefile.PL +README.md +bin/gearmand lib/Gearman/Server.pm lib/Gearman/Server/Client.pm lib/Gearman/Server/Job.pm lib/Gearman/Server/Listener.pm +t/00-use.t +t/01-gearman-server.t +t/02-gearman-server-client.t +t/03-maxqueue.t Makefile.PL MANIFEST This list of files MANIFEST.SKIP diff --git a/Makefile.PL b/Makefile.PL index d8ce29d..0cd7ef8 100644 --- a/Makefile.PL +++ b/Makefile.PL @@ -1,14 +1,44 @@ use ExtUtils::MakeMaker; + # See lib/ExtUtils/MakeMaker.pm for details of how to influence # the contents of the Makefile that is written. WriteMakefile( - NAME => 'Gearman::Server', - VERSION_FROM => 'lib/Gearman/Server.pm', - ABSTRACT_FROM => 'lib/Gearman/Server.pm', - EXE_FILES => ['gearmand'], - PREREQ_PM => { - 'Gearman::Util' => 0, - 'Danga::Socket' => 1.52, - }, - AUTHOR => 'Brad Fitzpatrick (brad@danga.com), Brad Whitaker (whitaker@danga.com)', - ); + NAME => 'Gearman::Server', + VERSION_FROM => 'lib/Gearman/Server.pm', + ABSTRACT_FROM => 'lib/Gearman/Server.pm', + EXE_FILES => ['bin/gearmand'], + BUILD_REQUIRES => { + "File::Spec" => 0, + "FindBin" => 0, + "IO::Socket::INET" => 0, + "Net::EmptyPort" => 0, + "Proc::Guard" => 0, + "Socket" => 0, + "Sys::Hostname" => 0, + "Test::Exception" => 0, + "Test::More" => 0, + "Test::Script" => 1.12, + "Test::TCP" => 0, + "version" => 0, + }, + PREREQ_PM => { + "Danga::Socket" => 1.52, + "Gearman::Util" => 0, + "IO::Socket::INET" => 0, + "Sys::Hostname" => 0, + "version" => 0, + }, + AUTHOR => + 'Brad Fitzpatrick (brad@danga.com), Brad Whitaker (whitaker@danga.com)', + META_MERGE => { + 'meta-spec' => { version => 2 }, + resources => { + repository => { + type => 'git', + url => 'https://github.com/p-alik/Gearman-Server.git', + web => 'https://github.com/p-alik/Gearman-Server', + }, + }, + }, + +); diff --git a/README.md b/README.md new file mode 100644 index 0000000..fc4a44c --- /dev/null +++ b/README.md @@ -0,0 +1,14 @@ +Gearman::Server +=================== + +[![CPAN version](https://badge.fury.io/pl/Gearman-Server.png)](https://badge.fury.io/pl/Gearman-Server) +[![Build Status](https://travis-ci.org/p-alik/Gearman-Server.png)](https://travis-ci.org/p-alik/Gearman-Server) +[![Coverage Status](https://coveralls.io/repos/github/p-alik/Gearman-Server/badge.png)](https://coveralls.io/github/p-alik/Gearman-Server) + +This repository contains perl implementation of [Gearman](http://gearman.org) daemon + +see also +------------ +* [Gearman::Client](https://metacpan.org/pod/Gearman::Client) - Client for gearman distributed job system +* [Gearman::Worker](https://metacpan.org/pod/Gearman::Worker) - Worker for gearman distributed job system + diff --git a/gearmand b/bin/gearmand similarity index 58% rename from gearmand rename to bin/gearmand index 106c6bf..4c5192b 100755 --- a/gearmand +++ b/bin/gearmand @@ -1,4 +1,4 @@ -#!/usr/bin/perl +#!/usr/bin/env perl =head1 NAME @@ -23,15 +23,21 @@ daemonization. Make the daemon run in the background (good for init.d scripts, bad for running under daemontools/supervise). -=item --port=7003 / -p 7003 +=item --port=4730 / -p 4730 -Set the port number, defaults to 7003. +Set the port number, default to 4730. + +=item --listen hostname / -L hostname + +Address the server should listen on. + +Default is =item --pidfile=/some/dir/gearmand.pid Write a pidfile when starting up -=item --debug=1 +=item --debug Enable debugging (currently the only debug output is when a client or worker connects). @@ -59,9 +65,13 @@ the queue. Zero (0) means go as fast as possible, but not all at the same time. Similar to -1 on --wakeup, but is more cooperative in gearmand's multitasking model. -Negative One (-1) means that this event won't happe, so only the initial workers will be woken up to +Negative One (-1) means that this event won't happen, so only the initial workers will be woken up to handle jobs in the queue. +=item --version + +Display the version and exit. + =back =head1 COPYRIGHT @@ -95,59 +105,61 @@ L package Gearmand; use strict; use warnings; + BEGIN { - $^P = 0x200; # Provide informative names to anonymous subroutines + # Provide informative names to anonymous subroutines + $^P = 0x200; } -use FindBin; -use lib "$FindBin::Bin/lib"; + use Gearman::Server; -use Getopt::Long; use Carp; use Danga::Socket 1.52; -use IO::Socket::INET; +use Getopt::Long; use POSIX (); -use Gearman::Util; -use vars qw($DEBUG); +use Pod::Usage; use Scalar::Util (); -$DEBUG = 0; +use vars qw($DEBUG); -my ( - $daemonize, - $nokeepalive, - $notify_pid, - $opt_pidfile, - $accept, - $wakeup, - $wakeup_delay, - ); -my $conf_port = 7003; - -Getopt::Long::GetOptions( - 'd|daemonize' => \$daemonize, - 'p|port=i' => \$conf_port, - 'debug=i' => \$DEBUG, - 'pidfile=s' => \$opt_pidfile, - 'accept=i' => \$accept, - 'wakeup=i' => \$wakeup, - 'wakeup-delay=f' => \$wakeup_delay, - 'notifypid|n=i' => \$notify_pid, # for test suite only. - ); +$DEBUG = 0; +my $port = 4730; + +GetOptions( + 'accept=i' => \my $accept, + 'debug' => \$DEBUG, + 'd|daemonize' => \my $daemonize, + 'listen|L=s' => \my $listen, + 'pidfile=s' => \my $opt_pidfile, + 'p|port=i' => \$port, + 'wakeup-delay=f' => \my $wakeup_delay, + 'wakeup=i' => \my $wakeup, + 'version|V' => sub { + print "Gearman::Server $Gearman::Server::VERSION$/"; + exit; + }, + 'help|?' => sub { + pod2usage(-verbose => 1); + exit; + }, + + # for test suite only. + 'notifypid|n=i' => \my $notify_pid, +); daemonize() if $daemonize; -# true if we've closed listening socket, and we're waiting for a -# convenient place to kill the process -our $graceful_shutdown = 0; - -$SIG{'PIPE'} = "IGNORE"; # handled manually - +# handled manually +$SIG{'PIPE'} = "IGNORE"; my $server = Gearman::Server->new( - wakeup => $wakeup, - wakeup_delay => $wakeup_delay, - ); -my $ssock = $server->create_listening_sock($conf_port, accept_per_loop => $accept); + wakeup => $wakeup, + wakeup_delay => $wakeup_delay, +); +my $ssock = $server->create_listening_sock( + $port, + accept_per_loop => $accept, + local_addr => $listen +); if ($opt_pidfile) { open my $fh, '>', $opt_pidfile or die "Could not open $opt_pidfile: $!"; @@ -155,23 +167,8 @@ if ($opt_pidfile) { close $fh; } -sub shutdown_graceful { - return if $graceful_shutdown; - - my $ofds = Danga::Socket->OtherFds; - delete $ofds->{fileno($ssock)}; - $ssock->close; - $graceful_shutdown = 1; - shutdown_if_calm(); -} - -sub shutdown_if_calm { - exit 0 unless $server->jobs_outstanding; -} - sub daemonize { - my ($pid, $sess_id, $i); - + my ($pid, $sess_id); ## Fork and exit parent if ($pid = fork) { exit 0; } @@ -179,32 +176,27 @@ sub daemonize { croak "Cannot detach from controlling terminal" unless $sess_id = POSIX::setsid(); - ## Prevent possibility of acquiring a controling terminal + ## Prevent possibility of acquiring a controlling terminal $SIG{'HUP'} = 'IGNORE'; if ($pid = fork) { exit 0; } ## Change working directory - chdir "/"; + chdir("/") || croak "can't chdir to /: $!"; ## Clear file creation mask umask 0; ## Close open file descriptors - close(STDIN); - close(STDOUT); - close(STDERR); + close(STDIN) || croak "can't close STDIN: $!"; + close(STDOUT) || croak "can't close STDOUT: $!"; + close(STDERR) || croak "can't close STDERR: $!"; ## Reopen stderr, stdout, stdin to /dev/null - open(STDIN, "+>/dev/null"); - open(STDOUT, "+>&STDIN"); - open(STDERR, "+>&STDIN"); -} + open(STDIN, "+>/dev/null") || croak "can't write to /dev/null: $!"; + open(STDOUT, "+>&STDIN") || croak "can't dup STDOUT to STDIN: $!"; + open(STDERR, "+>&STDIN") || croak "can't dup STDERR to STDIN: $!"; +} ## end sub daemonize kill 'USR1', $notify_pid if $notify_pid; Danga::Socket->EventLoop(); -# Local Variables: -# mode: perl -# c-basic-indent: 4 -# indent-tabs-mode: nil -# End: diff --git a/lib/Gearman/Server.pm b/lib/Gearman/Server.pm index 9f64c65..d938768 100644 --- a/lib/Gearman/Server.pm +++ b/lib/Gearman/Server.pm @@ -1,4 +1,9 @@ package Gearman::Server; +use version (); +$Gearman::Server::VERSION = version->declare("1.140_001"); + +use strict; +use warnings; =head1 NAME @@ -23,32 +28,39 @@ script, and not use Gearman::Server directly. =cut -use strict; +use Carp qw(croak); use Gearman::Server::Client; use Gearman::Server::Listener; use Gearman::Server::Job; -use Socket qw(IPPROTO_TCP SOL_SOCKET SOCK_STREAM AF_UNIX SOCK_STREAM PF_UNSPEC); -use Carp qw(croak); -use Sys::Hostname (); +use Gearman::Util; +use IO::Socket::INET; use IO::Handle (); +use Socket qw/ + IPPROTO_TCP + SOL_SOCKET + SOCK_STREAM + AF_UNIX + SOCK_STREAM + PF_UNSPEC + /; +use Sys::Hostname (); use fields ( - 'client_map', # fd -> Client - 'sleepers', # func -> { "Client=HASH(0xdeadbeef)" => Client } - 'sleepers_list', # func -> [ Client, ... ], ... - 'job_queue', # job_name -> [Job, Job*] (key only exists if non-empty) - 'job_of_handle', # handle -> Job - 'max_queue', # func -> configured max jobqueue size - 'job_of_uniq', # func -> uniq -> Job - 'handle_ct', # atomic counter - 'handle_base', # atomic counter - 'listeners', # arrayref of listener objects - 'wakeup', # number of workers to wake - 'wakeup_delay', # seconds to wait before waking more workers - 'wakeup_timers', # func -> timer, timer to be canceled or adjusted when job grab/inject is called - ); - -our $VERSION = "1.12"; + 'client_map', # fd -> Client + 'sleepers', # func -> { "Client=HASH(0xdeadbeef)" => Client } + 'sleepers_list', # func -> [ Client, ... ], ... + 'job_queue', # job_name -> [Job, Job*] (key only exists if non-empty) + 'job_of_handle', # handle -> Job + 'max_queue', # func -> configured max jobqueue size + 'job_of_uniq', # func -> uniq -> Job + 'handle_ct', # atomic counter + 'handle_base', # atomic counter + 'listeners', # arrayref of listener objects + 'wakeup', # number of workers to wake + 'wakeup_delay', # seconds to wait before waking more workers + 'wakeup_timers', # func -> timer, timer to be canceled or adjusted + # when job grab/inject is called +); =head1 METHODS @@ -56,15 +68,46 @@ our $VERSION = "1.12"; $server_object = Gearman::Server->new( %options ) -Creates and returns a new Gearman::Server object, which attaches itself to the Danga::Socket event loop. The server will begin operating when the Danga::Socket runloop is started. This means you need to start up the runloop before anything will happen. +Creates and returns a new Gearman::Server object, which attaches itself to the +L event loop. The server will begin operating when the +L runloop is started. This means you need to start up the +runloop before anything will happen. Options: =over -=item port +=item + +port + +Specify a port which you would like the B to listen on for TCP connections (not necessary, but useful) + +=item + +wakeup + +Number of workers to wake up per job inserted into the queue. + +Zero (0) is a perfectly acceptable answer, and can be used if you don't care much about job latency. +This would bank on the base idea of a worker checking in with the server every so often. + +Negative One (-1) indicates that all sleeping workers should be woken up. + +All other negative numbers will cause the server to throw exception and not start. -Specify a port which you would like the Gearman::Server to listen on for TCP connections (not necessary, but useful) +=item + +wakeup_delay + +Time interval before waking up more workers (the value specified by B) when jobs are still in +the queue. + +Zero (0) means go as fast as possible, but not all at the same time. Similar to -1 on B, but +is more cooperative in gearmand's multitasking model. + +Negative One (-1) means that this event won't happen, so only the initial workers will be woken up to +handle jobs in the queue. =back @@ -72,27 +115,29 @@ Specify a port which you would like the Gearman::Server to listen on for TCP con sub new { my ($class, %opts) = @_; - my $self = ref $class ? $class : fields::new($class); - - $self->{client_map} = {}; - $self->{sleepers} = {}; - $self->{sleepers_list} = {}; - $self->{job_queue} = {}; - $self->{job_of_handle} = {}; - $self->{max_queue} = {}; - $self->{job_of_uniq} = {}; - $self->{listeners} = []; - $self->{wakeup} = 3; - $self->{wakeup_delay} = .1; - $self->{wakeup_timers} = {}; - - $self->{handle_ct} = 0; + my $self = ref($class) ? $class : fields::new($class); + + $self->{$_} = {} for qw/ + client_map + sleepers + sleepers_list + job_queue + job_of_handle + max_queue + job_of_uniq + wakeup_timers + /; + + $self->{listeners} = []; + $self->{wakeup} = 3; + $self->{wakeup_delay} = .1; + + $self->{handle_ct} = 0; $self->{handle_base} = "H:" . Sys::Hostname::hostname() . ":"; my $port = delete $opts{port}; my $wakeup = delete $opts{wakeup}; - if (defined $wakeup) { die "Invalid value passed in wakeup option" if $wakeup < 0 && $wakeup != -1; @@ -100,7 +145,6 @@ sub new { } my $wakeup_delay = delete $opts{wakeup_delay}; - if (defined $wakeup_delay) { die "Invalid value passed in wakeup_delay option" if $wakeup_delay < 0 && $wakeup_delay != -1; @@ -108,21 +152,31 @@ sub new { } croak("Unknown options") if %opts; + $self->create_listening_sock($port); return $self; -} +} ## end sub new sub debug { my ($self, $msg) = @_; - #warn "$msg\n"; + + warn "$msg\n"; } -=head2 create_listening_sock +=head2 create_listening_sock($portnum, %options) + +Add a TCP port listener for incoming Gearman worker and client connections. Options: + +=over 4 + +=item accept_per_loop - $server_object->create_listening_sock( $portnum ) +=item local_addr -Add a TCP port listener for incoming Gearman worker and client connections. +Bind socket to only this address. + +=back =cut @@ -130,50 +184,81 @@ sub create_listening_sock { my ($self, $portnum, %opts) = @_; my $accept_per_loop = delete $opts{accept_per_loop}; + my $local_addr = delete $opts{local_addr}; - warn "Extra options passed into create_listening_sock: " . join(', ', keys %opts) . "\n" + warn "Extra options passed into create_listening_sock: " + . join(', ', keys %opts) . "\n" if keys %opts; - my $ssock = IO::Socket::INET->new(LocalPort => $portnum, - Type => SOCK_STREAM, - Proto => IPPROTO_TCP, - Blocking => 0, - Reuse => 1, - Listen => 1024 ) - or die "Error creating socket: $@\n"; + my $ssock = IO::Socket::INET->new( + LocalPort => $portnum, + Type => SOCK_STREAM, + Proto => IPPROTO_TCP, + Blocking => 0, + Reuse => 1, + Listen => 1024, + ($local_addr ? (LocalAddr => $local_addr) : ()) + ) or die "Error creating socket: $@\n"; my $listeners = $self->{listeners}; - push @$listeners, Gearman::Server::Listener->new($ssock, $self, accept_per_loop => $accept_per_loop); + push @$listeners, + Gearman::Server::Listener->new($ssock, $self, + accept_per_loop => $accept_per_loop); return $ssock; -} +} ## end sub create_listening_sock + +=head2 new_client($sock) + +init new L object and add it to internal clients map + +=cut sub new_client { my ($self, $sock) = @_; my $client = Gearman::Server::Client->new($sock, $self); $client->watch_read(1); - $self->{client_map}{$client->{fd}} = $client; -} + $self->{client_map}{ $client->{fd} } = $client; +} ## end sub new_client + +=head2 note_disconnected_client($client) + +delete the client from internal clients map + +B deleted object + +=cut sub note_disconnected_client { my ($self, $client) = @_; - delete $self->{client_map}{$client->{fd}}; + delete $self->{client_map}{ $client->{fd} }; } +=head2 clients() + +B internal clients map + +=cut + sub clients { my $self = shift; return values %{ $self->{client_map} }; } -# Returns a socket that is connected to the server, we can then use this -# socket with a Gearman::Client::Async object to run clients and servers in the -# same thread. +=head2 to_inprocess_server() + +Returns a socket that is connected to the server, we can then use this +socket with a Gearman::Client::Async object to run clients and servers in the +same thread. + +=cut + sub to_inprocess_server { my $self = shift; my ($psock, $csock); socketpair($csock, $psock, AF_UNIX, SOCK_STREAM, PF_UNSPEC) - or die "socketpair: $!"; + or die "socketpair: $!"; $csock->autoflush(1); $psock->autoflush(1); @@ -184,14 +269,14 @@ sub to_inprocess_server { my $client = Gearman::Server::Client->new($csock, $self); my ($package, $file, $line) = caller; - $client->{peer_ip} = "[$package|$file|$line]"; + $client->{peer_ip} = "[$package|$file|$line]"; $client->watch_read(1); - $self->{client_map}{$client->{fd}} = $client; + $self->{client_map}{ $client->{fd} } = $client; return $psock; -} +} ## end sub to_inprocess_server -=head2 start_worker +=head2 start_worker($prog) $pid = $server_object->start_worker( $prog ) @@ -206,7 +291,7 @@ sub start_worker { my ($psock, $csock); socketpair($csock, $psock, AF_UNIX, SOCK_STREAM, PF_UNSPEC) - or die "socketpair: $!"; + or die "socketpair: $!"; $csock->autoflush(1); $psock->autoflush(1); @@ -222,15 +307,20 @@ sub start_worker { local $ENV{'GEARMAN_WORKER_USE_STDIO'} = 1; close(STDIN); close(STDOUT); - open(STDIN, '<&', $psock) or die "Unable to dup socketpair to STDIN: $!"; - open(STDOUT, '>&', $psock) or die "Unable to dup socketpair to STDOUT: $!"; + open(STDIN, '<&', $psock) + or die "Unable to dup socketpair to STDIN: $!"; + open(STDOUT, '>&', $psock) + or die "Unable to dup socketpair to STDOUT: $!"; if (UNIVERSAL::isa($prog, "CODE")) { $prog->(); - exit 0; # shouldn't get here. subref should exec. - } + + # shouldn't get here. subref should exec. + exit 0; + } ## end if (UNIVERSAL::isa($prog...)) + exec $prog; die "Exec failed: $!"; - } + } ## end unless ($pid) close($psock); @@ -239,34 +329,46 @@ sub start_worker { my $client = Gearman::Server::Client->new($sock, $self); - $client->{peer_ip} = "[gearman_child]"; + $client->{peer_ip} = "[gearman_child]"; $client->watch_read(1); - $self->{client_map}{$client->{fd}} = $client; + $self->{client_map}{ $client->{fd} } = $client; return wantarray ? ($pid, $client) : $pid; -} +} ## end sub start_worker + +=head2 enqueue_job() + +=cut sub enqueue_job { my ($self, $job, $highpri) = @_; - my $jq = ($self->{job_queue}{$job->{func}} ||= []); + my $jq = ($self->{job_queue}{ $job->{func} } ||= []); - if (defined (my $max_queue_size = $self->{max_queue}{$job->{func}})) { - $max_queue_size--; # Subtract one, because we're about to add one more below. + if (defined(my $max_queue_size = $self->{max_queue}{ $job->{func} })) { + + # Subtract one, because we're about to add one more below. + $max_queue_size--; while (@$jq > $max_queue_size) { my $delete_job = pop @$jq; - my $msg = Gearman::Util::pack_res_command("work_fail", $delete_job->handle); + my $msg = Gearman::Util::pack_res_command("work_fail", + $delete_job->handle); $delete_job->relay_to_listeners($msg); $delete_job->note_finished; - } - } + } ## end while (@$jq > $max_queue_size) + } ## end if (defined(my $max_queue_size...)) if ($highpri) { unshift @$jq, $job; - } else { + } + else { push @$jq, $job; } - $self->{job_of_handle}{$job->{'handle'}} = $job; -} + $self->{job_of_handle}{ $job->{'handle'} } = $job; +} ## end sub enqueue_job + +=head2 wake_up_sleepers($func) + +=cut sub wake_up_sleepers { my ($self, $func) = @_; @@ -285,21 +387,24 @@ sub wake_up_sleepers { # If we're only going to wakeup 0 workers anyways, don't set up a timer. return if $self->{wakeup} == 0; - my $timer = Danga::Socket->AddTimer($delay, sub { - # Be sure to not wake up more sleepers if we have no jobs in the queue. - # I know the object definition above says I can trust the func element to determine - # if there are items in the list, but I'm just gonna be safe, rather than sorry. - return unless @{$self->{job_queue}{$func} || []}; - $self->wake_up_sleepers($func) - }); + my $timer = Danga::Socket->AddTimer( + $delay, + sub { + # Be sure to not wake up more sleepers if we have no jobs in the queue. + # I know the object definition above says I can trust the func element to determine + # if there are items in the list, but I'm just gonna be safe, rather than sorry. + return unless @{ $self->{job_queue}{$func} || [] }; + $self->wake_up_sleepers($func); + } + ); $self->{wakeup_timers}->{$func} = $timer; -} +} ## end sub wake_up_sleepers # Returns true when there are still more workers to wake up # False if there are no sleepers sub _wake_up_some { my ($self, $func) = @_; - my $sleepmap = $self->{sleepers}{$func} or return; + my $sleepmap = $self->{sleepers}{$func} or return; my $sleeporder = $self->{sleepers_list}{$func} or return; # TODO SYNC UP STATE HERE IN CASE TWO LISTS END UP OUT OF SYNC @@ -308,7 +413,7 @@ sub _wake_up_some { while (@$sleeporder) { my Gearman::Server::Client $c = shift @$sleeporder; - next if $c->{closed} || ! $c->{sleeping}; + next if $c->{closed} || !$c->{sleeping}; if ($max-- <= 0) { unshift @$sleeporder, $c; return 1; @@ -316,18 +421,23 @@ sub _wake_up_some { delete $sleepmap->{"$c"}; $c->res_packet("noop"); $c->{sleeping} = 0; - } + } ## end while (@$sleeporder) delete $self->{sleepers}{$func}; delete $self->{sleepers_list}{$func}; return; -} +} ## end sub _wake_up_some + +=head2 on_client_sleep($client) + +=cut sub on_client_sleep { my $self = shift; my Gearman::Server::Client $cl = shift; - foreach my $cd (@{$cl->{can_do_list}}) { + foreach my $cd (@{ $cl->{can_do_list} }) { + # immediately wake the sleeper up if there are things to be done if ($self->{job_queue}{$cd}) { $cl->res_packet("noop"); @@ -348,32 +458,49 @@ sub on_client_sleep { if ($jobs_done) { unshift @$sleeporder, $cl; - } else { + } + else { push @$sleeporder, $cl; } $cl->{jobs_done_since_sleep} = 0; - } -} + } ## end foreach my $cd (@{ $cl->{can_do_list...}}) +} ## end sub on_client_sleep + +=head2 jobs_outstanding() + +=cut sub jobs_outstanding { my Gearman::Server $self = shift; return scalar keys %{ $self->{job_queue} }; } +=head2 jobs() + +=cut + sub jobs { my Gearman::Server $self = shift; return values %{ $self->{job_of_handle} }; } +=head2 jobs_by_handle($ahndle) + +=cut + sub job_by_handle { my ($self, $handle) = @_; return $self->{job_of_handle}{$handle}; } +=head2 note_job_finished($job) + +=cut + sub note_job_finished { - my Gearman::Server $self = shift; + my Gearman::Server $self = shift; my Gearman::Server::Job $job = shift; if (my Gearman::Server::Client $worker = $job->worker) { @@ -381,38 +508,74 @@ sub note_job_finished { } if (length($job->{uniq})) { - delete $self->{job_of_uniq}{$job->{func}}{$job->{uniq}}; + delete $self->{job_of_uniq}{ $job->{func} }{ $job->{uniq} }; } - delete $self->{job_of_handle}{$job->{handle}}; -} + delete $self->{job_of_handle}{ $job->{handle} }; +} ## end sub note_job_finished + +=head2 set_max_queue($func, $max) + +=over + +=item + +$func + +function name + +=item + +$max + +0/undef/"" to reset. else integer max depth. + +=back + +=cut -# <0/undef/"" to reset. else integer max depth. sub set_max_queue { my ($self, $func, $max) = @_; - if (defined $max && length $max && $max >= 0) { + if (defined($max) && length($max) && $max > 0) { $self->{max_queue}{$func} = int($max); - } else { + } + else { delete $self->{max_queue}{$func}; } -} +} ## end sub set_max_queue + +=head2 new_job_handle() + +=cut sub new_job_handle { my $self = shift; return $self->{handle_base} . (++$self->{handle_ct}); } +=head2 job_of_unique($func, $uniq) + +=cut + sub job_of_unique { my ($self, $func, $uniq) = @_; return undef unless $self->{job_of_uniq}{$func}; return $self->{job_of_uniq}{$func}{$uniq}; } +=head2 set_unique_job($func, $uniq, $job) + +=cut + sub set_unique_job { my ($self, $func, $uniq, $job) = @_; $self->{job_of_uniq}{$func} ||= {}; $self->{job_of_uniq}{$func}{$uniq} = $job; } +=head2 grab_job($func) + +=cut + sub grab_job { my ($self, $func) = @_; return undef unless $self->{job_queue}{$func}; @@ -424,17 +587,16 @@ sub grab_job { my Gearman::Server::Job $job; while (1) { - $job = shift @{$self->{job_queue}{$func}}; + $job = shift @{ $self->{job_queue}{$func} }; return $empty->() unless $job; return $job unless $job->require_listener; - foreach my Gearman::Server::Client $c (@{$job->{listeners}}) { - return $job if $c && ! $c->{closed}; + foreach my Gearman::Server::Client $c (@{ $job->{listeners} }) { + return $job if $c && !$c->{closed}; } $job->note_finished(0); - } -} - + } ## end while (1) +} ## end sub grab_job 1; __END__ diff --git a/lib/Gearman/Server/Client.pm b/lib/Gearman/Server/Client.pm index 362304f..61e360b 100644 --- a/lib/Gearman/Server/Client.pm +++ b/lib/Gearman/Server/Client.pm @@ -1,10 +1,15 @@ package Gearman::Server::Client; +use version (); +$Gearman::Server::Client::VERSION = version->declare("1.140_001"); + +use strict; +use warnings; =head1 NAME -Gearman::Server::Client +Gearman::Server::Client - client for L based on L -=head1 NAME +=head1 DESCRIPTION Used by L to instantiate connections from clients. Clients speak either a binary protocol, for normal operation (calling @@ -18,30 +23,44 @@ and L, if that's any consolation. The line-based administrative commands are documented below. +=head1 METHODS + =cut -use strict; +use Gearman::Util; use Danga::Socket; use base 'Danga::Socket'; use fields ( - 'can_do', # { $job_name => $timeout } $timeout can be undef indicating no timeout - 'can_do_list', - 'can_do_iter', - 'fast_read', - 'fast_buffer', - 'read_buf', - 'sleeping', # 0/1: they've said they're sleeping and we haven't woken them up - 'timer', # Timer for job cancellation - 'doing', # { $job_handle => Job } - 'client_id', # opaque string, no whitespace. workers give this so checker scripts - # can tell apart the same worker connected to multiple jobservers. - 'server', # pointer up to client's server - 'options', - 'jobs_done_since_sleep', - ); + + # { $job_name => $timeout } $timeout can be undef indicating no timeout + 'can_do', + 'can_do_list', + 'can_do_iter', + 'fast_read', + 'fast_buffer', + 'read_buf', + + # 0/1: they've said they're sleeping and we haven't woken them up + 'sleeping', + + # Timer for job cancellation + 'timer', + + # { $job_handle => Job } + 'doing', + + # opaque string, no whitespace. workers give this so checker scripts + # can tell apart the same worker connected to multiple jobservers. + 'client_id', + + # pointer up to client's server + 'server', + 'options', + 'jobs_done_since_sleep', +); # 60k read buffer default, similar to perlbal's backend read. -use constant READ_SIZE => 60 * 1024; +use constant READ_SIZE => 60 * 1024; use constant MAX_READ_SIZE => 512 * 1024; # Class Method: @@ -51,28 +70,43 @@ sub new { $self = fields::new($self) unless ref $self; $self->SUPER::new($sock); - $self->{fast_read} = undef; # Number of bytes to read as fast as we can (don't try to process them) - $self->{fast_buffer} = []; # Array of buffers used during fast read operation + # Number of bytes to read as fast as we can (don't try to process them) + $self->{fast_read} = undef; + + # Array of buffers used during fast read operation + $self->{fast_buffer} = []; $self->{read_buf} = ''; $self->{sleeping} = 0; $self->{can_do} = {}; - $self->{doing} = {}; # handle -> Job + + # handle -> Job + $self->{doing} = {}; $self->{can_do_list} = []; - $self->{can_do_iter} = 0; # numeric iterator for where we start looking for jobs - $self->{client_id} = "-"; - $self->{server} = $server; - $self->{options} = {}; + + # numeric iterator for where we start looking for jobs + $self->{can_do_iter} = 0; + $self->{client_id} = "-"; + $self->{server} = $server; + $self->{options} = {}; $self->{jobs_done_since_sleep} = 0; return $self; -} +} ## end sub new + +=head2 option($option) + +=cut sub option { my Gearman::Server::Client $self = shift; my $option = shift; return $self->{options}->{$option}; -} +} ## end sub option + +=head2 close() + +=cut sub close { my Gearman::Server::Client $self = shift; @@ -91,34 +125,40 @@ sub close { # Remove self from sleepers, otherwise it will be leaked if another worker # for the job never connects. - my $sleepers = $self->{server}{sleepers}; + my $sleepers = $self->{server}{sleepers}; my $sleepers_list = $self->{server}{sleepers_list}; for my $job (@{ $self->{can_do_list} }) { my $sleeping = $sleepers->{$job}; delete $sleeping->{$self}; my $new_sleepers_list; - for my $client (@{$sleepers_list->{$job}}) { + for my $client (@{ $sleepers_list->{$job} }) { next unless $client; push @{$new_sleepers_list}, $client unless $sleeping->{$client}; } if ($new_sleepers_list) { $self->{server}{sleepers_list}->{$job} = $new_sleepers_list; - } else { + } + else { delete $self->{server}{sleepers_list}->{$job}; } delete $sleepers->{$job} unless %$sleeping; - } + } ## end for my $job (@{ $self->...}) $self->{server}->note_disconnected_client($self); $self->CMD_reset_abilities; $self->SUPER::close; -} +} ## end sub close + +=head2 event_read() + +read from socket + +=cut -# Client sub event_read { my Gearman::Server::Client $self = shift; @@ -127,22 +167,23 @@ sub event_read { # Delay close till after buffers are written on EOF. If we are unable # to write 'err' or 'hup' will be thrown and we'll close faster. - return $self->write(sub { $self->close } ) unless defined $bref; + return $self->write(sub { $self->close }) unless defined $bref; if ($self->{fast_read}) { - push @{$self->{fast_buffer}}, $$bref; + push @{ $self->{fast_buffer} }, $$bref; $self->{fast_read} -= length($$bref); # If fast_read is still positive, then we need to read more data return if ($self->{fast_read} > 0); # Append the whole giant read buffer to our main read buffer - $self->{read_buf} .= join('', @{$self->{fast_buffer}}); + $self->{read_buf} .= join('', @{ $self->{fast_buffer} }); # Reset the fast read state for next time. $self->{fast_buffer} = []; - $self->{fast_read} = undef; - } else { + $self->{fast_read} = undef; + } ## end if ($self->{fast_read}) + else { # Exact read size length likely means we have more sitting on the # socket. Buffer up to half a meg in one go. if (length($$bref) == READ_SIZE) { @@ -153,9 +194,9 @@ sub event_read { last if (length($$cref) < READ_SIZE || $limit-- < 1); } $bref = \join('', @crefs); - } + } ## end if (length($$bref) == ...) $self->{read_buf} .= $$bref; - } + } ## end else [ if ($self->{fast_read})] my $found_cmd; do { @@ -165,26 +206,34 @@ sub event_read { if ($self->{read_buf} =~ /^\0REQ(.{8,8})/s) { my ($cmd, $len) = unpack("NN", $1); if ($blen < $len + 12) { + # Start a fast read loop to get all the data we need, less # what we already have in the buffer. $self->{fast_read} = $len + 12 - $blen; return; - } + } ## end if ($blen < $len + 12) $self->process_cmd($cmd, substr($self->{read_buf}, 12, $len)); # and slide down buf: - $self->{read_buf} = substr($self->{read_buf}, 12+$len); + $self->{read_buf} = substr($self->{read_buf}, 12 + $len); + + } ## end if ($self->{read_buf} ...) + elsif ($self->{read_buf} =~ s/^(\w.+?)?\r?\n//) { - } elsif ($self->{read_buf} =~ s/^(\w.+?)?\r?\n//) { # ASCII command case (useful for telnetting in) my $line = $1; $self->process_line($line); - } else { + } ## end elsif ($self->{read_buf} ...) + else { $found_cmd = 0; } } while ($found_cmd); -} +} ## end sub event_read + +=head2 event_write() + +=cut sub event_write { my $self = shift; @@ -192,7 +241,12 @@ sub event_write { $self->watch_write(0) if $done; } -# Line based command processor +=head2 process_line($line) + +Line based command processor + +=cut + sub process_line { my Gearman::Server::Client $self = shift; my $line = shift; @@ -205,10 +259,10 @@ sub process_line { $code->($self, $args); return; } - } + } ## end if ($line && $line =~ ...) return $self->err_line('unknown_command'); -} +} ## end sub process_line =head1 Binary Protocol Structure @@ -239,7 +293,7 @@ sub CMD_echo_req { my $blobref = shift; return $self->res_packet("echo_res", $$blobref); -} +} ## end sub CMD_echo_req sub CMD_work_status { my Gearman::Server::Client $self = shift; @@ -247,13 +301,14 @@ sub CMD_work_status { my ($handle, $nu, $de) = split(/\0/, $$ar); my $job = $self->{doing}{$handle}; - return $self->error_packet("not_worker") unless $job && $job->worker == $self; + return $self->error_packet("not_worker") + unless $job && $job->worker == $self; my $msg = Gearman::Util::pack_res_command("work_status", $$ar); $job->relay_to_listeners($msg); $job->status([$nu, $de]); return 1; -} +} ## end sub CMD_work_status sub CMD_work_complete { my Gearman::Server::Client $self = shift; @@ -263,9 +318,11 @@ sub CMD_work_complete { my $handle = $1; my $job = delete $self->{doing}{$handle}; - return $self->error_packet("not_worker") unless $job && $job->worker == $self; + return $self->error_packet("not_worker") + unless $job && $job->worker == $self; - my $msg = Gearman::Util::pack_res_command("work_complete", join("\0", $handle, $$ar)); + my $msg = Gearman::Util::pack_res_command("work_complete", + join("\0", $handle, $$ar)); $job->relay_to_listeners($msg); $job->note_finished(1); if (my $timer = $self->{timer}) { @@ -274,14 +331,15 @@ sub CMD_work_complete { } return 1; -} +} ## end sub CMD_work_complete sub CMD_work_fail { my Gearman::Server::Client $self = shift; - my $ar = shift; - my $handle = $$ar; - my $job = delete $self->{doing}{$handle}; - return $self->error_packet("not_worker") unless $job && $job->worker == $self; + my $ar = shift; + my $handle = $$ar; + my $job = delete $self->{doing}{$handle}; + return $self->error_packet("not_worker") + unless $job && $job->worker == $self; my $msg = Gearman::Util::pack_res_command("work_fail", $handle); $job->relay_to_listeners($msg); @@ -292,7 +350,7 @@ sub CMD_work_fail { } return 1; -} +} ## end sub CMD_work_fail sub CMD_work_exception { my Gearman::Server::Client $self = shift; @@ -300,28 +358,30 @@ sub CMD_work_exception { $$ar =~ s/^(.+?)\0//; my $handle = $1; - my $job = $self->{doing}{$handle}; + my $job = $self->{doing}{$handle}; - return $self->error_packet("not_worker") unless $job && $job->worker == $self; + return $self->error_packet("not_worker") + unless $job && $job->worker == $self; - my $msg = Gearman::Util::pack_res_command("work_exception", join("\0", $handle, $$ar)); + my $msg = Gearman::Util::pack_res_command("work_exception", + join("\0", $handle, $$ar)); $job->relay_to_option_listeners($msg, "exceptions"); return 1; -} +} ## end sub CMD_work_exception sub CMD_pre_sleep { my Gearman::Server::Client $self = shift; $self->{'sleeping'} = 1; $self->{server}->on_client_sleep($self); return 1; -} +} ## end sub CMD_pre_sleep sub CMD_grab_job { my Gearman::Server::Client $self = shift; my $job; - my $can_do_size = scalar @{$self->{can_do_list}}; + my $can_do_size = scalar @{ $self->{can_do_list} }; unless ($can_do_size) { $self->res_packet("no_job"); @@ -341,31 +401,32 @@ sub CMD_grab_job { or next; $job->worker($self); - $self->{doing}{$job->handle} = $job; + $self->{doing}{ $job->handle } = $job; my $timeout = $self->{can_do}->{$job_to_grab}; if (defined $timeout) { - my $timer = Danga::Socket->AddTimer($timeout, sub { - return $self->error_packet("not_worker") unless $job->worker == $self; - - my $msg = Gearman::Util::pack_res_command("work_fail", $job->handle); - $job->relay_to_listeners($msg); - $job->note_finished(1); - $job->clear_listeners; - $self->{timer} = undef; - }); + my $timer = Danga::Socket->AddTimer( + $timeout, + sub { + return $self->error_packet("not_worker") + unless $job->worker == $self; + + my $msg = Gearman::Util::pack_res_command("work_fail", + $job->handle); + $job->relay_to_listeners($msg); + $job->note_finished(1); + $job->clear_listeners; + $self->{timer} = undef; + } + ); $self->{timer} = $timer; - } + } ## end if (defined $timeout) return $self->res_packet("job_assign", - join("\0", - $job->handle, - $job->func, - ${$job->argref}, - )); - } + join("\0", $job->handle, $job->func, ${ $job->argref },)); + } ## end while ($tried < $can_do_size) $self->res_packet("no_job"); -} +} ## end sub CMD_grab_job sub CMD_can_do { my Gearman::Server::Client $self = shift; @@ -373,7 +434,7 @@ sub CMD_can_do { $self->{can_do}->{$$ar} = undef; $self->_setup_can_do_list; -} +} ## end sub CMD_can_do sub CMD_can_do_timeout { my Gearman::Server::Client $self = shift; @@ -383,12 +444,13 @@ sub CMD_can_do_timeout { if (defined $timeout) { $self->{can_do}->{$task} = $timeout; - } else { + } + else { $self->{can_do}->{$task} = undef; } $self->_setup_can_do_list; -} +} ## end sub CMD_can_do_timeout sub CMD_option_req { my Gearman::Server::Client $self = shift; @@ -404,7 +466,7 @@ sub CMD_option_req { } return $self->error_packet("unknown_option"); -} +} ## end sub CMD_option_req sub CMD_set_client_id { my Gearman::Server::Client $self = shift; @@ -413,7 +475,7 @@ sub CMD_set_client_id { $self->{client_id} = $$ar; $self->{client_id} =~ s/\s+//g; $self->{client_id} = "-" unless length $self->{client_id}; -} +} ## end sub CMD_set_client_id sub CMD_cant_do { my Gearman::Server::Client $self = shift; @@ -421,18 +483,18 @@ sub CMD_cant_do { delete $self->{can_do}->{$$ar}; $self->_setup_can_do_list; -} +} ## end sub CMD_cant_do sub CMD_get_status { my Gearman::Server::Client $self = shift; - my $ar = shift; - my $job = $self->{server}->job_by_handle($$ar); + my $ar = shift; + my $job = $self->{server}->job_by_handle($$ar); # handles can't contain nulls return if $$ar =~ /\0/; my ($known, $running, $num, $den); - $known = 0; + $known = 0; $running = 0; if ($job) { $known = 1; @@ -440,87 +502,83 @@ sub CMD_get_status { if (my $stat = $job->status) { ($num, $den) = @$stat; } - } + } ## end if ($job) $num = '' unless defined $num; $den = '' unless defined $den; - $self->res_packet("status_res", join("\0", - $$ar, - $known, - $running, - $num, - $den)); -} + $self->res_packet("status_res", + join("\0", $$ar, $known, $running, $num, $den)); +} ## end sub CMD_get_status sub CMD_reset_abilities { my Gearman::Server::Client $self = shift; $self->{can_do} = {}; $self->_setup_can_do_list; -} +} ## end sub CMD_reset_abilities sub _setup_can_do_list { my Gearman::Server::Client $self = shift; - $self->{can_do_list} = [ keys %{$self->{can_do}} ]; + $self->{can_do_list} = [keys %{ $self->{can_do} }]; $self->{can_do_iter} = 0; } -sub CMD_submit_job { push @_, 1; &_cmd_submit_job; } -sub CMD_submit_job_bg { push @_, 0; &_cmd_submit_job; } -sub CMD_submit_job_high { push @_, 1, 1; &_cmd_submit_job; } +sub CMD_submit_job { push @_, 1; &_cmd_submit_job; } +sub CMD_submit_job_bg { push @_, 0; &_cmd_submit_job; } +sub CMD_submit_job_high { push @_, 1, 1; &_cmd_submit_job; } sub _cmd_submit_job { my Gearman::Server::Client $self = shift; - my $ar = shift; - my $subscribe = shift; - my $high_pri = shift; + my $ar = shift; + my $subscribe = shift; + my $high_pri = shift; return $self->error_packet("invalid_args", "No func/uniq header [$$ar].") unless $$ar =~ s/^(.+?)\0(.*?)\0//; my ($func, $uniq) = ($1, $2); - my $job = Gearman::Server::Job->new($self->{server}, $func, $uniq, $ar, $high_pri); + my $job = Gearman::Server::Job->new($self->{server}, $func, $uniq, $ar, + $high_pri); if ($subscribe) { $job->add_listener($self); - } else { + } + else { # background mode $job->require_listener(0); } $self->res_packet("job_created", $job->handle); $self->{server}->wake_up_sleepers($func); -} +} ## end sub _cmd_submit_job sub res_packet { my Gearman::Server::Client $self = shift; my ($code, $arg) = @_; $self->write(Gearman::Util::pack_res_command($code, $arg)); return 1; -} +} ## end sub res_packet sub error_packet { my Gearman::Server::Client $self = shift; my ($code, $msg) = @_; $self->write(Gearman::Util::pack_res_command("error", "$code\0$msg")); return 0; -} +} ## end sub error_packet sub process_cmd { my Gearman::Server::Client $self = shift; - my $cmd = shift; - my $blob = shift; + my $cmd = shift; + my $blob = shift; my $cmd_name = "CMD_" . Gearman::Util::cmd_name($cmd); - my $ret = eval { - $self->$cmd_name(\$blob); - }; + my $ret = eval { $self->$cmd_name(\$blob); }; return $ret unless $@; warn "Error: $@\n"; return $self->error_packet("server_error", $@); -} +} ## end sub process_cmd sub event_err { my $self = shift; $self->close; } sub event_hup { my $self = shift; $self->close; } @@ -529,11 +587,15 @@ sub event_hup { my $self = shift; $self->close; } =head1 Line based commands -These commands are used for administrative or statistic tasks to be done on the gearman server. They can be entered using a line based client (telnet, etc.) by connecting to the listening port (7003) and are also intended to be machine parsable. +These commands are used for administrative or statistic tasks to be done on the +gearman server. They can be entered using a line based client (telnet, etc.) by +connecting to the listening port (4730) and are also intended to be machine +parsable. =head2 "workers" -Emits list of registered workers, their fds, IPs, client ids, and list of registered abilities (function names they can do). Of format: +Emits list of registered workers, their fds, IPs, client ids, and list of +registered abilities (function names they can do). Of format: fd ip.x.y.z client_id : func_a func_b func_c fd ip.x.y.z client_id : func_a func_b func_c @@ -549,15 +611,19 @@ sub TXTCMD_workers { foreach my $cl (sort { $a->{fd} <=> $b->{fd} } $self->{server}->clients) { my $fd = $cl->{fd}; - $self->write("$fd " . $cl->peer_ip_string . " $cl->{client_id} : @{$cl->{can_do_list}}\n"); + $self->write("$fd " + . $cl->peer_ip_string + . " $cl->{client_id} : @{$cl->{can_do_list}}\n"); - } + } ## end foreach my $cl (sort { $a->...}) $self->write(".\n"); -} +} ## end sub TXTCMD_workers =head2 "status" -The output format of this function is tab separated columns as follows, followed by a line consisting of a fullstop and a newline (".\n") to indicate the end of output. +The output format of this function is tab separated columns as follows, +followed by a line consisting of a fullstop and a newline (".\n") to indicate +the end of output. =over @@ -567,7 +633,8 @@ A string denoting the name of the function of the job =item Number in queue -A positive integer indicating the total number of jobs for this function in the queue. This includes currently running ones as well (next column) +A positive integer indicating the total number of jobs for this function in the +queue. This includes currently running ones as well (next column) =item Number of jobs running @@ -575,7 +642,9 @@ A positive integer showing how many jobs of this function are currently running =item Number of capable workers -A positive integer denoting the maximum possible count of workers that could be doing this job. Though they may not all be working on it due to other tasks holding them busy. +A positive integer denoting the maximum possible count of workers that could be +doing this job. Though they may not all be working on it due to other tasks +holding them busy. =back @@ -584,16 +653,16 @@ A positive integer denoting the maximum possible count of workers that could be sub TXTCMD_status { my Gearman::Server::Client $self = shift; - my %funcs; # func -> 1 (set of all funcs to display) + my %funcs; # func -> 1 (set of all funcs to display) # keep track of how many workers can do which functions my %can; foreach my $client ($self->{server}->clients) { - foreach my $func (@{$client->{can_do_list}}) { + foreach my $func (@{ $client->{can_do_list} }) { $can{$func}++; $funcs{$func} = 1; } - } + } ## end foreach my $client ($self->...) my %queued_funcs; my %running_funcs; @@ -604,7 +673,7 @@ sub TXTCMD_status { if ($job->worker) { $running_funcs{$func}++; } - } + } ## end foreach my $job ($self->{server...}) # also include queued functions (even if there aren't workers) # in our list of funcs to show. @@ -614,11 +683,11 @@ sub TXTCMD_status { my $queued = $queued_funcs{$func} || 0; my $running = $running_funcs{$func} || 0; my $can = $can{$func} || 0; - $self->write( "$func\t$queued\t$running\t$can\n" ); - } + $self->write("$func\t$queued\t$running\t$can\n"); + } ## end foreach my $func (sort keys...) - $self->write( ".\n" ); -} + $self->write(".\n"); +} ## end sub TXTCMD_status =head2 "jobs" @@ -639,8 +708,8 @@ sub TXTCMD_jobs { my Gearman::Server::Client $self = shift; foreach my $job ($self->{server}->jobs) { - my $func = $job->func; - my $uniq = $job->uniq; + my $func = $job->func; + my $uniq = $job->uniq; my $worker_addr = "-"; if (my $worker = $job->worker) { @@ -650,10 +719,10 @@ sub TXTCMD_jobs { my $listeners = $job->listeners; $self->write("$func\t$uniq\t$worker_addr\t$listeners\n"); - } + } ## end foreach my $job ($self->{server...}) $self->write(".\n"); -} +} ## end sub TXTCMD_jobs =head2 "clients" @@ -690,7 +759,7 @@ sub TXTCMD_clients { my $ent = $jobs_by_client{$client} ||= []; push @$ent, $job; } - } + } ## end foreach my $job ($self->{server...}) foreach my $client ($self->{server}->clients) { my $client_addr = $client->peer_addr_string; @@ -698,20 +767,20 @@ sub TXTCMD_clients { my $jobs = $jobs_by_client{$client} || []; foreach my $job (@$jobs) { - my $func = $job->func; - my $uniq = $job->uniq; + my $func = $job->func; + my $uniq = $job->uniq; my $worker_addr = "-"; if (my $worker = $job->worker) { $worker_addr = $worker->peer_addr_string; } $self->write("\t$func\t$uniq\t$worker_addr\n"); - } + } ## end foreach my $job (@$jobs) - } + } ## end foreach my $client ($self->...) $self->write(".\n"); -} +} ## end sub TXTCMD_clients sub TXTCMD_gladiator { my Gearman::Server::Client $self = shift; @@ -721,28 +790,31 @@ sub TXTCMD_gladiator { my $all = Devel::Gladiator::walk_arena(); my %ct; foreach my $it (@$all) { - $ct{ref $it}++; + $ct{ ref $it }++; if (ref $it eq "CODE") { my $name = Devel::Peek::CvGV($it); $ct{$name}++ if $name =~ /ANON/; } - } - $all = undef; # required to free memory + } ## end foreach my $it (@$all) + $all = undef; # required to free memory foreach my $n (sort { $ct{$a} <=> $ct{$b} } keys %ct) { next unless $ct{$n} > 1 || $args eq "all"; $self->write(sprintf("%7d $n\n", $ct{$n})); } - } + } ## end if ($has_gladiator) $self->write(".\n"); -} +} ## end sub TXTCMD_gladiator =head2 "maxqueue" function [max_queue_size] -For a given function of job, the maximum queue size is adjusted to be max_queue_size jobs long. A negative value indicates unlimited queue size. +For a given function of job, the maximum queue size is adjusted to be +max_queue_size jobs long. A negative value indicates unlimited queue size. -If the max_queue_size value is not supplied then it is unset (and the default maximum queue size will apply to this function). +If the max_queue_size value is not supplied then it is unset (and the default +maximum queue size will apply to this function). -This function will return OK upon success, and will return ERR incomplete_args upon an invalid number of arguments. +This function will return OK upon success, and will return ERR incomplete_args +upon an invalid number of arguments. =cut @@ -757,27 +829,7 @@ sub TXTCMD_maxqueue { $self->{server}->set_max_queue($func, $max); $self->write("OK\n"); -} - -=head2 "shutdown" ["graceful"] - -Close the server. Or "shutdown graceful" to close the listening socket, then close the server when traffic has died away. - -=cut - -sub TXTCMD_shutdown { - my Gearman::Server::Client $self = shift; - my $args = shift; - if ($args eq "graceful") { - $self->write("OK\n"); - Gearmand::shutdown_graceful(); - } elsif (! $args) { - $self->write("OK\n"); - exit 0; - } else { - $self->err_line('unknown_args'); - } -} +} ## end sub TXTCMD_maxqueue =head2 "version" @@ -792,22 +844,30 @@ sub TXTCMD_version { sub err_line { my Gearman::Server::Client $self = shift; - my $err_code = shift; - my $err_text = { - 'unknown_command' => "Unknown server command", - 'unknown_args' => "Unknown arguments to server command", - 'incomplete_args' => "An incomplete set of arguments was sent to this command", - }->{$err_code}; - - $self->write("ERR $err_code " . eurl($err_text) . "\r\n"); + my $err_code = shift; + my %err_text = ( + + # numeric iterator for where we start looking for jobl + unknown_command => "Unknown server command", + unknown_args => "Unknown arguments to server command", + incomplete_args => + "An incomplete set of arguments was sent to this command", + ); + + $self->write( + join '', + "ERR $err_code ", + eurl($err_text{$err_code}) || '', "\r\n" + ); return 0; -} +} ## end sub err_line sub eurl { my $a = $_[0]; + $a || return; $a =~ s/([^a-zA-Z0-9_\,\-.\/\\\: ])/uc sprintf("%%%02x",ord($1))/eg; $a =~ tr/ /+/; return $a; -} +} ## end sub eurl 1; diff --git a/lib/Gearman/Server/Job.pm b/lib/Gearman/Server/Job.pm index 27124cb..eb2f59e 100644 --- a/lib/Gearman/Server/Job.pm +++ b/lib/Gearman/Server/Job.pm @@ -1,19 +1,41 @@ package Gearman::Server::Job; +use version (); +$Gearman::Server::Job::VERSION = version->declare("1.140_001"); + use strict; +use warnings; + +=head1 NAME + +Gearman::Server::Job - job representation of L + +=head1 DESCRIPTION + +=head1 METHODS + +=cut + +use Gearman::Server::Client; use Scalar::Util; use Sys::Hostname; use fields ( - 'func', - 'uniq', - 'argref', - 'listeners', # arrayref of interested Clients - 'worker', - 'handle', - 'status', # [1, 100] - 'require_listener', - 'server', # Gearman::Server that owns us - ); + 'func', + 'uniq', + 'argref', + + # arrayref of interested Clients + 'listeners', + 'worker', + 'handle', + + # [1, 100] + 'status', + 'require_listener', + + # Gearman::Server that owns us + 'server', +); sub new { my Gearman::Server::Job $self = shift; @@ -24,80 +46,111 @@ sub new { # if they specified a uniq, see if we have a dup job running already # to merge with if (length($uniq)) { + # a unique value of "-" means "use my args as my unique key" $uniq = $$argref if $uniq eq "-"; if (my $job = $server->job_of_unique($func, $uniq)) { + # found a match return $job; } + # create a new key $server->set_unique_job($func, $uniq => $self); - } + } ## end if (length($uniq)) - $self->{'server'} = $server; - $self->{'func'} = $func; - $self->{'uniq'} = $uniq; - $self->{'argref'} = $argref; + $self->{'server'} = $server; + $self->{'func'} = $func; + $self->{'uniq'} = $uniq; + $self->{'argref'} = $argref; $self->{'require_listener'} = 1; - $self->{'listeners'} = []; - $self->{'handle'} = $server->new_job_handle; + $self->{'listeners'} = []; + $self->{'handle'} = $server->new_job_handle; $server->enqueue_job($self, $highpri); return $self; -} +} ## end sub new + +=head2 add_listener($client) + +=cut sub add_listener { - my Gearman::Server::Job $self = shift; + my Gearman::Server::Job $self = shift; my Gearman::Server::Client $li = shift; - push @{$self->{listeners}}, $li; + push @{ $self->{listeners} }, $li; Scalar::Util::weaken($self->{listeners}->[-1]); -} +} ## end sub add_listener + +=head2 relay_to_listeners($msg) + +=cut sub relay_to_listeners { my Gearman::Server::Job $self = shift; - foreach my Gearman::Server::Client $c (@{$self->{listeners}}) { + foreach my Gearman::Server::Client $c (@{ $self->{listeners} }) { next if !$c || $c->{closed}; $c->write($_[0]); } -} +} ## end sub relay_to_listeners + +=head2 relay_to_option_listeners($msg, [$option]) + +=cut sub relay_to_option_listeners { my Gearman::Server::Job $self = shift; my $option = $_[1]; - foreach my Gearman::Server::Client $c (@{$self->{listeners}}) { + foreach my Gearman::Server::Client $c (@{ $self->{listeners} }) { next if !$c || $c->{closed}; next unless $c->option($option); $c->write($_[0]); } -} +} ## end sub relay_to_option_listeners + +=head2 clear_listeners() + +=cut sub clear_listeners { my Gearman::Server::Job $self = shift; $self->{listeners} = []; } +=head2 listeners() + +=cut + sub listeners { my Gearman::Server::Job $self = shift; - return @{$self->{listeners}}; + return @{ $self->{listeners} }; } +=head2 uniq() + +=cut + sub uniq { my Gearman::Server::Job $self = shift; return $self->{uniq}; } +=head2 note_finished($success) + +=cut + sub note_finished { my Gearman::Server::Job $self = shift; my $success = shift; $self->{server}->note_job_finished($self); +} ## end sub note_finished - if ($Gearmand::graceful_shutdown) { - Gearmand::shutdown_if_calm(); - } -} +=head2 worker() + +=cut # accessors: sub worker { @@ -105,12 +158,21 @@ sub worker { return $self->{'worker'} unless @_; return $self->{'worker'} = shift; } + +=head2 require_listener([$require]) + +=cut + sub require_listener { my Gearman::Server::Job $self = shift; return $self->{'require_listener'} unless @_; return $self->{'require_listener'} = shift; } +=head2 status([numerator,denominator]) + +=cut + # takes arrayref of [numerator,denominator] sub status { my Gearman::Server::Job $self = shift; @@ -118,16 +180,28 @@ sub status { return $self->{'status'} = shift; } +=head2 handle() + +=cut + sub handle { my Gearman::Server::Job $self = shift; return $self->{'handle'}; } +=head2 func() + +=cut + sub func { my Gearman::Server::Job $self = shift; return $self->{'func'}; } +=head2 argref() + +=cut + sub argref { my Gearman::Server::Job $self = shift; return $self->{'argref'}; diff --git a/lib/Gearman/Server/Listener.pm b/lib/Gearman/Server/Listener.pm index d1fa59b..076d553 100644 --- a/lib/Gearman/Server/Listener.pm +++ b/lib/Gearman/Server/Listener.pm @@ -1,16 +1,41 @@ package Gearman::Server::Listener; +use version (); +$Gearman::Server::Listener::VERSION = version->declare("1.140_001"); use strict; +use warnings; + +=head1 NAME + +Gearman::Server::Listener - a listener for L + +=head1 DESCRIPTION + +Based on L + +=cut + use base 'Danga::Socket'; -use fields qw(server accept_per_loop); +use fields qw/ + server + accept_per_loop + /; use Errno qw(EAGAIN); -use Socket qw(IPPROTO_TCP TCP_NODELAY SOL_SOCKET SO_ERROR); +use Socket qw/ + IPPROTO_TCP + TCP_NODELAY + SOL_SOCKET + SO_ERROR + /; + +=head1 METHODS +=cut sub new { my Gearman::Server::Listener $self = shift; - my $sock = shift; - my $server = shift; + my $sock = shift; + my $server = shift; my %opts = @_; @@ -19,7 +44,8 @@ sub new { warn "Extra options passed into new: " . join(', ', keys %opts) . "\n" if keys %opts; - $accept_per_loop = 10 unless defined $accept_per_loop and $accept_per_loop >= 1; + $accept_per_loop = 10 + unless defined $accept_per_loop and $accept_per_loop >= 1; $self = fields::new($self) unless ref $self; @@ -28,14 +54,19 @@ sub new { $self->SUPER::new($sock); - $self->{server} = $server; + $self->{server} = $server; $self->{accept_per_loop} = int($accept_per_loop); $self->watch_read(1); return $self; -} +} ## end sub new + +=head2 event_read() + +wait for connection +=cut sub event_read { my Gearman::Server::Listener $self = shift; @@ -52,10 +83,11 @@ sub event_read { my $server = $self->{server}; - $server->debug(sprintf("Listen child making a Client for %d.", fileno($csock))); + $server->debug( + sprintf("Listen child making a Client for %d.", fileno($csock))); $server->new_client($csock); return unless $remaining-- > 0; - } + } ## end while (my $csock = $listen_sock...) return if $! == EAGAIN; @@ -63,9 +95,12 @@ sub event_read { $self->watch_read(0); - Danga::Socket->AddTimer( .1, sub { - $self->watch_read(1); - }); -} + Danga::Socket->AddTimer( + .1, + sub { + $self->watch_read(1); + } + ); +} ## end sub event_read 1; diff --git a/t/00-use.t b/t/00-use.t new file mode 100644 index 0000000..8a92511 --- /dev/null +++ b/t/00-use.t @@ -0,0 +1,26 @@ +use strict; +use warnings; + +use version (); +use Test::More; +use Test::Script; + +my @mn = qw/ + Gearman::Server + Gearman::Server::Client + Gearman::Server::Listener + Gearman::Server::Job + /; + +my $v = version->declare("1.140_001"); + +foreach my $n (@mn) { + use_ok($n); + my $_v = eval '$' . $n . '::VERSION'; + is($_v, $v, "$n version is $v"); +} + +script_compiles_ok("bin/gearmand"); + +done_testing; + diff --git a/t/01-gearman-server.t b/t/01-gearman-server.t new file mode 100644 index 0000000..8dec9bc --- /dev/null +++ b/t/01-gearman-server.t @@ -0,0 +1,134 @@ +use strict; +use warnings; + +use IO::Socket::INET; +use Socket qw/ + IPPROTO_TCP + SOCK_STREAM + /; + +use Net::EmptyPort qw/ empty_port /; +use Sys::Hostname (); +use Test::Exception; +use Test::More; + +my $mn = qw/ + Gearman::Server + /; + +use_ok($mn); + +can_ok $mn, qw/ + create_listening_sock + new_client + note_disconnected_client + clients + to_inprocess_server + start_worker + enqueue_job + wake_up_sleepers + _wake_up_some + on_client_sleep + jobs_outstanding + jobs + job_by_handle + note_job_finished + set_max_queue + new_job_handle + job_of_unique + set_unique_job + grab_job + /; + +subtest "new", sub { + my $gs = new_ok($mn); + + my @khr = qw/ + client_map + sleepers + sleepers_list + job_queue + job_of_handle + max_queue + job_of_uniq + wakeup_timers + /; + + foreach (@khr) { + is(ref($gs->{$_}), "HASH", join "->", $mn, "{$_} is hash ref") + && is(keys(%{ $gs->{$_} }), 0, join "->", $mn, "{$_} empty"); + } + + is(ref($gs->{listeners}), + "ARRAY", join "->", $mn, "{listeners} is array ref"); + + # && is(@{$gs->{listeners}}, 0, join "->", $mn, "{listeners} empty") + + is($gs->{wakeup}, 3, "wakeup 3"); + is($gs->{wakeup_delay}, .1, "wakeup_delay .1"); + is($gs->{handle_ct}, 0, "handle_ct"); + is($gs->{handle_base}, "H:" . Sys::Hostname::hostname() . ":", + "handle_base"); + + $gs = new_ok($mn, [wakeup => -1, wakeup_delay => -1]); + is($gs->{wakeup}, -1, "wakeup -1"); + is($gs->{wakeup}, -1, "wakeup_delay -1"); + + for (qw/wakeup wakeup_delay/) { + throws_ok { $mn->new($_ => -2) } qr/Invalid value passed in $_ option/, + "Invalid value passed in $_ option"; + } + + throws_ok { $mn->new(foo => 1) } qr/Unknown options/, "Unknown options"; +}; + +subtest "create listening sock/new client", sub { + my $gs = new_ok($mn); + my ($la, $port, $accept) = ("127.0.0.1", empty_port()); + ok( + my $sock = $gs->create_listening_sock( + $port, + accept_per_loop => $accept, + local_addr => $la + ) + ); + isa_ok($sock, "IO::Socket::INET"); +}; + +subtest "client", sub { + my $port = empty_port(); + $port || plan skip_all => "couldn't find free port"; + + my $sock = new_ok( + "IO::Socket::INET", + [ + LocalPort => $port, + Type => SOCK_STREAM, + Proto => IPPROTO_TCP, + Blocking => 0, + Reuse => 1, + Listen => 1024, + ] + ); + + my $gs = new_ok($mn); + ok(my $nc = $gs->new_client($sock), "new_client"); + isa_ok($nc, "Gearman::Server::Client"); + ok(my @cl = $gs->clients, "clients"); + is(@cl, 1, "clients count"); + is($cl[0], $nc, "same client"); + ok($gs->note_disconnected_client($nc), "note_disocnnected_client"); +}; + +subtest "maxqueue", sub { + my $gs = new_ok($mn); + my ($f, $c) = ("foo", int(rand(5) + 1)); + ok($gs->set_max_queue($f, $c), "set_max_queue($f, $c)"); + is($gs->{max_queue}{$f}, $c, "max_queue $f = $c"); + $c = 0; + ok($gs->set_max_queue($f, $c), "set_max_queue($f, $c)"); + is($gs->{max_queue}{$f}, undef, "max_queue $f = $c"); +}; + +done_testing; + diff --git a/t/02-gearman-server-client.t b/t/02-gearman-server-client.t new file mode 100644 index 0000000..8703a03 --- /dev/null +++ b/t/02-gearman-server-client.t @@ -0,0 +1,95 @@ +use strict; +use warnings; + +use IO::Socket::INET; +use Net::EmptyPort qw/ empty_port /; +use Socket qw/ + IPPROTO_TCP + SOCK_STREAM + /; +use Test::More; + +my $mn = "Gearman::Server::Client"; +use_ok("Gearman::Server"); +use_ok($mn); +isa_ok($mn, "Danga::Socket"); + +can_ok( + $mn, qw/ + CMD_can_do + CMD_can_do_timeout + CMD_cant_do + CMD_echo_req + CMD_get_status + CMD_grab_job + CMD_option_req + CMD_pre_sleep + CMD_reset_abilities + CMD_set_client_id + CMD_submit_job + CMD_submit_job_bg + CMD_submit_job_high + CMD_work_complete + CMD_work_exception + CMD_work_fail + CMD_work_status + TXTCMD_clients + TXTCMD_gladiator + TXTCMD_jobs + TXTCMD_maxqueue + TXTCMD_status + TXTCMD_version + TXTCMD_workers + _cmd_submit_job + _setup_can_do_list + close + error_packet + eurl + event_err + event_hup + event_read + event_write + option + process_line + process_cmd + res_packet + / +); +my ($gs, $gc) = (new_ok("Gearman::Server")); + +subtest "new", sub { + my $port = empty_port(); + my $sock = new_ok( + "IO::Socket::INET", + [ + LocalPort => $port, + Type => SOCK_STREAM, + Proto => IPPROTO_TCP, + Blocking => 0, + Reuse => 1, + Listen => 1024, + ] + ); + $gc = new_ok($mn, [$sock, $gs]); + + foreach (qw/fast_buffer can_do_list/) { + isa_ok($gc->{$_}, "ARRAY", $_) && is(@{ $gc->{$_} }, 0, "$_ empty"); + } + + foreach (qw/can_do doing options/) { + isa_ok($gc->{$_}, "HASH", $_) + && is(keys(%{ $gc->{$_} }), 0, "$_ empty"); + } + + foreach (qw/sleeping can_do_iter jobs_done_since_sleep/) { + is($gc->{$_}, 0, "$_ = 0"); + } + + is($gc->{fast_read}, undef, "fast_read"); + is($gc->{read_buf}, '', "read_buf"); + is($gc->{client_id}, '-', "client_id"); + is($gc->{server}, $gs, "server"); +}; + +done_testing; + diff --git a/t/03-maxqueue.t b/t/03-maxqueue.t new file mode 100644 index 0000000..a6cae4b --- /dev/null +++ b/t/03-maxqueue.t @@ -0,0 +1,40 @@ +use strict; +use warnings; + +use File::Spec; +use FindBin (); +use IO::Socket::INET; +use Net::EmptyPort (); +use Test::More; +use Test::TCP; + +my $host = "127.0.0.1"; +Net::EmptyPort::can_bind($host) || plan skip_all => "can not bind to $host"; + +my $dir = File::Spec->catdir($FindBin::Bin, File::Spec->updir()); +my $bin = File::Spec->catdir($dir, "bin", "gearmand"); +-e $bin || plan skip_all => "no gearmand"; + +my $gs = Test::TCP->new( + host => $host, + code => sub { + my ($port) = @_; + exec $^X, join('', "-I", File::Spec->catdir($dir, "lib")), $bin, + join('=', "--port", $port); + } +); + +my ($func, $count) = ("doit", int(rand(3) + 1)); +my $peer_addr = join(':', $host, $gs->port); + +subtest "set maxqueue", sub { + my $sock + = new_ok("IO::Socket::INET", [PeerAddr => $peer_addr, Timeout => 2]); + my $k = "MAXQUEUE"; + my $cmd = join(' ', $k, $func, $count); + ok($sock->write($cmd . $/), "write($cmd)"); + ok(my $r = $sock->getline(), "getline"); + ok($r =~ m/^OK\b/i, "match OK"); +}; + +done_testing();