diff --git a/lib/logstash/outputs/influxdb.rb b/lib/logstash/outputs/influxdb.rb index cc170f1..b4390f9 100644 --- a/lib/logstash/outputs/influxdb.rb +++ b/lib/logstash/outputs/influxdb.rb @@ -39,6 +39,12 @@ class LogStash::Outputs::InfluxDB < LogStash::Outputs::Base # Measurement name - supports sprintf formatting config :measurement, :validate => :string, :default => "logstash" + + # Allow measurement from field - allow override of measurement name + config :allow_measurement_override, :validate => :boolean, :default => false + + # Measurement from field - use measurement from field name + config :measurement_from_field, :validate => :string, :default => "measurement" # Hash of key/value pairs representing data points to send to the named database # Example: `{'column1' => 'value1', 'column2' => 'value2'}` @@ -90,6 +96,17 @@ class LogStash::Outputs::InfluxDB < LogStash::Outputs::Base # request should be considered metadata and given as tags. config :send_as_tags, :validate => :array, :default => ["host"] + # An array containing the names of fields to send to Influxdb as fields instead + # of tags. Influxdb 0.9 convention is that values that do not change every + # request should be considered metadata and given as tags. Enable with the + # prefer_tags option. + config :send_as_fields, :validate => :array, :default => ["value"] + + # This setting sends fields as tags which may then be overridden per field + # using the send_as_fields setting. Influx 0.9 preference to tags indicates + # that more fields will be sent as tags than as fields. + config :prefer_tags, :validate => :boolean, :default => false + # This setting controls how many events will be buffered before sending a batch # of events. Note that these are only batched for the same measurement config :flush_size, :validate => :number, :default => 100 @@ -149,6 +166,14 @@ def receive(event) time = point.delete("time") end end + + if @allow_measurement_override + unless point.has_key?(@measurement_from_field) + logger.error("Cannot override measurement, field specified does not exist. Using default.") + else + @measurement = point.delete(@measurement_from_field) + end + end exclude_fields!(point) coerce_values!(point) @@ -286,13 +311,20 @@ def exclude_fields!(event_data) # Returns a tuple containing a hash of tags (as configured by send_as_tags) # and a hash of fields that exclude the tags. If fields contains a key # "tags" with an array, they will be moved to the tags hash (and each will be - # given a value of true) + # given a value of true). If prefer_tags is true, will instead move all values + # in fields to tags array except for values specificed in the send_as_fields + # array. # # Example: # # Given send_as_tags: ["bar"] # original_fields = {"foo" => 1, "bar" => 2, "tags" => ["tag"]} # tags, fields = extract_tags(original_fields) # # tags: {"bar" => 2, "tag" => "true"} and fields: {"foo" => 1} + # Example: + # # Given send_as_fields: ["bar"] and prefer_tags = true + # original_fields = {"foo" => 1, "bar" => 2, "foobar" => 3} + # tags, fields = extract_tags(original_fields) + # # tags: {"foo" => 1, "foobar" => 3} and fields: {"bar" => 2} def extract_tags(fields) remainder = fields.dup @@ -301,8 +333,12 @@ def extract_tags(fields) else {} end - - @send_as_tags.each { |key| (tags[key] = remainder.delete(key)) if remainder.has_key?(key) } + + if @prefer_tags + remainder.each_pair { |key,value| (tags[key] = remainder.delete(key)) unless @send_as_fields.include?(key) } + else + @send_as_tags.each { |key| (tags[key] = remainder.delete(key)) if remainder.has_key?(key) } + end tags.delete_if { |key,value| value.nil? || value == "" } remainder.delete_if { |key,value| value.nil? || value == "" } diff --git a/spec/outputs/influxdb_spec.rb b/spec/outputs/influxdb_spec.rb index 31d2c1b..86e1af7 100644 --- a/spec/outputs/influxdb_spec.rb +++ b/spec/outputs/influxdb_spec.rb @@ -115,6 +115,79 @@ end end + context "sending some fields as Influxdb fields" do + let(:config) do <<-CONFIG + input { + generator { + message => "foo=1 bar=2 baz=3 time=4" + count => 1 + type => "generator" + } + } + + filter { + kv { } + } + + output { + influxdb { + host => "localhost" + measurement => "my_series" + allow_time_override => true + prefer_tags => true + use_event_fields_for_data_points => true + exclude_fields => ["@version", "@timestamp", "sequence", "message", "type", "host"] + send_as_fields => ["foo"] + } + } + CONFIG + end + + let(:expected_url) { 'http://localhost:8086/write?db=statistics&rp=default&precision=ms&u=&p='} + let(:expected_body) { 'my_series,bar=2,baz=3 foo="1" 4' } + + it "should send the specified fields as fields" do + expect_any_instance_of(Manticore::Client).to receive(:post!).with(expected_url, body: expected_body) + pipeline.run + end + end + + context "sending field as Influxdb measurement" do + let(:config) do <<-CONFIG + input { + generator { + message => "foo=1 bar=2 baz=3 time=4" + count => 1 + type => "generator" + } + } + + filter { + kv { } + } + + output { + influxdb { + host => "localhost" + allow_time_override => true + use_event_fields_for_data_points => true + exclude_fields => ["@version", "@timestamp", "sequence", "message", "type", "host"] + allow_measurement_override => true + measurement_from_field => "bar" + } + } + CONFIG + end + + let(:expected_url) { 'http://localhost:8086/write?db=statistics&rp=default&precision=ms&u=&p='} + let(:expected_body) { 'bar baz="3",foo="1" 4' } + + it "should send the specified fields as fields" do + expect_any_instance_of(Manticore::Client).to receive(:post!).with(expected_url, body: expected_body) + pipeline.run + end + end + context "when fields data contains a list of tags" do let(:config) do <<-CONFIG input {