diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..68f1257 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +*.gem +.bundle +Gemfile.lock +pkg/* +*.swp diff --git a/Gemfile b/Gemfile new file mode 100644 index 0000000..5a67c38 --- /dev/null +++ b/Gemfile @@ -0,0 +1,4 @@ +source 'https://rubygems.org' + +# Specify your gem's dependencies in fluent-plugin-azuresearch.gemspec +gemspec diff --git a/README.md b/README.md new file mode 100644 index 0000000..d0a9128 --- /dev/null +++ b/README.md @@ -0,0 +1,214 @@ +# Azure Search output plugin for Fluentd + +fluent-plugin-azuresearch is a fluent plugin to output to Azure Search + +## Installation + + $ gem install fluent-plugin-azuresearch + +## Configuration + +### Azure Search + +To use Microsoft Azure Search, you must create an Azure Search service in the Azure Portal. Also you must have an index, persisted storage of documents to which fluent-plugin-azuresearch writes event stream out. Here are instructions: + + * [Create a service](https://azure.microsoft.com/en-us/documentation/articles/search-create-service-portal/) + * [Create an index](https://azure.microsoft.com/en-us/documentation/articles/search-what-is-an-index/) + + +### Fluentd - fluent.conf + + + type azuresearch + endpoint https://AZURE_SEARCH_ACCOUNT.search.windows.net + api_key AZURE_SEARCH_API_KEY + search_index messages + column_names id,user_name,message,tag,created_at + key_names postid,user,content,tag,posttime + + + * **endpoint (required)** - Azure Search service endpoint URI + * **api\_key (required)** - Azure Search API key + * **search\_index (required)** - Azure Search Index name to insert records + * **column\_names (required)** - Column names in a target Azure search index. Each column needs to be separated by a comma. + * **key\_names (optional)** - Default:nil. Key names in incomming record to insert. Each key needs to be separated by a comma. 
${time} is placeholder for Time.at(time).strftime("%Y-%m-%dT%H:%M:%SZ"), and ${tag} is placeholder for tag. By default, **key\_names** is as same as **column\_names** + +## Sample Configurations +### Case1 - column_names is as same as key_names + +Suppose you have the following fluent.conf and azure search index schema: + +fluent.conf + + + type azuresearch + endpoint https://yoichidemo.search.windows.net + api_key 2XX3D2456052A9AD21E54CB03C3ABF6A(dummy) + search_index messages + column_names id,user_name,message,created_at + + +Azure Search Schema: messages + + { + "name": "messages", + "fields": [ + { "name":"id", "type":"Edm.String", "key": true, "searchable": false }, + { "name":"user_name", "type":"Edm.String" }, + { "name":"message", "type":"Edm.String", "filterable":false, "sortable":false, "facetable":false, "analyzer":"en.lucene" }, + { "name":"created_at", "type":"Edm.DateTimeOffset", "facetable":false} + ] + } + +The plugin will write event stream out to Azure Ssearch like this: + +Input event stream + + { "id": "1", "user_name": "taylorswift13", "message":"post by taylorswift13", "created_at":"2016-01-29T00:00:00Z" }, + { "id": "2", "user_name": "katyperry", "message":"post by katyperry", "created_at":"2016-01-30T00:00:00Z" }, + { "id": "3", "user_name": "ladygaga", "message":"post by ladygaga", "created_at":"2016-01-31T00:00:00Z" } + + +Search results + + "value": [ + { "@search.score": 1, "id": "1", "user_name": "taylorswift13", "message": "post by taylorswift13", "created_at": "2016-01-29T00:00:00Z" }, + { "@search.score": 1, "id": "2", "user_name": "katyperry", "message": "post by katyperry", "created_at": "2016-01-30T00:00:00Z" }, + { "@search.score": 1, "id": "3", "user_name": "ladygaga", "message": "post by ladygaga", "created_at": "2016-01-31T00:00:00Z" } + ] + + +### Case2 - column_names is NOT as same as key_names + +Suppose you have the following fluent.conf and azure search index schema: + +fluent.conf + + + type azuresearch + endpoint 
https://yoichidemo.search.windows.net + api_key 2XX3D2456052A9AD21E54CB03C3ABF6A(dummy) + search_index messages + column_names id,user_name,message,created_at + key_names postid,user,content,posttime + + +Azure Search Schema: messages + + { + "name": "messages", + "fields": [ + { "name":"id", "type":"Edm.String", "key": true, "searchable": false }, + { "name":"user_name", "type":"Edm.String" }, + { "name":"message", "type":"Edm.String", "filterable":false, "sortable":false, "facetable":false, "analyzer":"en.lucene" }, + { "name":"created_at", "type":"Edm.DateTimeOffset", "facetable":false} + ] + } + +The plugin will write event stream out to Azure Ssearch like this: + +Input event stream + + { "postid": "1", "user": "taylorswift13", "content":"post by taylorswift13", "posttime":"2016-01-29T00:00:00Z" }, + { "postid": "2", "user": "katyperry", "content":"post by katyperry", "posttime":"2016-01-30T00:00:00Z" }, + { "postid": "3", "user": "ladygaga", "content":"post by ladygaga", "posttime":"2016-01-31T00:00:00Z" } + + +Search results + + "value": [ + { "@search.score": 1, "id": "1", "user_name": "taylorswift13", "message": "post by taylorswift13", "created_at": "2016-01-29T00:00:00Z" }, + { "@search.score": 1, "id": "2", "user_name": "katyperry", "message": "post by katyperry", "created_at": "2016-01-30T00:00:00Z" }, + { "@search.score": 1, "id": "3", "user_name": "ladygaga", "message": "post by ladygaga", "created_at": "2016-01-31T00:00:00Z" } + ] + + +### Case3 - column_names is NOT as same as key_names, Plus, key_names includes ${time} and ${tag} + +fluent.conf + + + type azuresearch + endpoint https://yoichidemo.search.windows.net + api_key 2XX3D2456052A9AD21E54CB03C3ABF6A(dummy) + search_index messages + column_names id,user_name,message,tag,created_at + key_names postid,user,content,${tag},${time} + + +Azure Search Schema: messages + + { + "name": "messages", + "fields": [ + { "name":"id", "type":"Edm.String", "key": true, "searchable": false }, + { 
"name":"user_name", "type":"Edm.String" }, + { "name":"message", "type":"Edm.String", "filterable":false, "sortable":false, "facetable":false, "analyzer":"en.lucene" }, + { "name":"created_at", "type":"Edm.DateTimeOffset", "facetable":false} + ] + } + +The plugin will write event stream out to Azure Ssearch like this: + +Input event stream + + { "id": "1", "user_name": "taylorswift13", "message":"post by taylorswift13" }, + { "id": "2", "user_name": "katyperry", "message":"post by katyperry" }, + { "id": "3", "user_name": "ladygaga", "message":"post by ladygaga" } + +Search results + + "value": [ + { "@search.score": 1, "id": "1", "user_name": "taylorswift13", "message": "post by taylorswift13", "tag": "azuresearch.msg", "created_at": "2016-01-31T21:03:41Z" }, + { "@search.score": 1, "id": "2", "user_name": "katyperry", "message": "post by katyperry", "tag": "azuresearch.msg", "created_at": "2016-01-31T21:03:41Z" }, + { "@search.score": 1, "id": "3", "user_name": "ladygaga", "message": "post by ladygaga", "tag": "azuresearch.msg", "created_at": "2016-01-31T21:03:41Z" } + ] +[note] the value of created_at above is the time when fluentd actually recieves its corresponding input event. 
+ + +## Tests +### Running test code + $ git clone https://github.com/yokawasa/fluent-plugin-azuresearch.git + $ cd fluent-plugin-azuresearch + + # edit CONFIG params of test/plugin/test_azuresearch.rb + $ vi test/plugin/test_azuresearch.rb + + # run test + $ rake test + +### Creating package, running and testing locally + $ rake build + $ rake install:local + + # running fluentd with your fluent.conf + $ fluentd -c fluent.conf -vv & + + # send test input event to test plugin using fluent-cat + $ echo ' { "postid": "100", "user": "ladygaga", "content":"post by ladygaga"}' | fluent-cat azuresearch.msg + +Please don't forget that you need forward input configuration to receive the message from fluent-cat + + + type forward + + + +## TODOs + * Input validation for Azure Search - check total size of columns to add + +## Contributing + +Bug reports and pull requests are welcome on GitHub at https://github.com/yokawasa/fluent-plugin-azuresearch. + +## Copyright + + + + + + + + +
Copyright: Copyright (c) 2016- Yoichi Kawasaki
License: Apache License, Version 2.0
+ diff --git a/Rakefile b/Rakefile new file mode 100644 index 0000000..5d33ac6 --- /dev/null +++ b/Rakefile @@ -0,0 +1,13 @@ +#!/usr/bin/env rake + +require "bundler/gem_tasks" +require "rake/testtask" + +Rake::TestTask.new(:test) do |test| + test.libs << 'lib' << 'test' + test.pattern = 'test/**/test_*.rb' + test.verbose = true +end + +task :default => :test + diff --git a/VERSION b/VERSION new file mode 100644 index 0000000..6e8bf73 --- /dev/null +++ b/VERSION @@ -0,0 +1 @@ +0.1.0 diff --git a/fluent-plugin-azuresearch.gemspec b/fluent-plugin-azuresearch.gemspec new file mode 100644 index 0000000..336ad35 --- /dev/null +++ b/fluent-plugin-azuresearch.gemspec @@ -0,0 +1,27 @@ +# coding: utf-8 +lib = File.expand_path('../lib', __FILE__) +$LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib) + +Gem::Specification.new do |gem| + gem.name = "fluent-plugin-azuresearch" + gem.version = File.read("VERSION").strip + gem.authors = ["Yoichi Kawasaki"] + gem.email = ["yoichi.kawasaki@outlook.com"] + gem.summary = %q{Azure Search output plugin for Fluentd} + gem.description = gem.summary + gem.homepage = "http://github.com/yokawasa/fluent-plugin-azuresearch" + gem.license = "Apache-2.0" + gem.has_rdoc = false + + gem.files = `git ls-files`.split("\n") + gem.executables = gem.files.grep(%r{^bin/}) { |f| File.basename(f) } + gem.test_files = gem.files.grep(%r{^(test|gem|features)/}) + gem.require_paths = ["lib"] + + gem.add_development_dependency "bundler", "~> 1.11" + gem.add_development_dependency "rake", "~> 10.0" + gem.add_development_dependency "test-unit" + gem.add_development_dependency "rest-client" + gem.add_runtime_dependency "fluentd" +end + diff --git a/lib/fluent/plugin/azuresearch/client.rb b/lib/fluent/plugin/azuresearch/client.rb new file mode 100644 index 0000000..c250b95 --- /dev/null +++ b/lib/fluent/plugin/azuresearch/client.rb @@ -0,0 +1,38 @@ +module Fluent + module AzureSearch + class Client + + def initialize (api_url, api_key, 
api_version="2015-02-28") + require 'rest-client' + require 'json' + @api_url = api_url + @api_version = api_version + @headers = { + 'Content-Type' => "application/json; charset=UTF-8", + 'Api-Key' => api_key, + 'Accept' => "application/json", + 'Accept-Charset' => "UTF-8" + } + end + + def add_documents(index_name, documents, merge=true) + raise ConfigError, 'no index_name' if index_name.empty? + raise ConfigError, 'no documents' if documents.empty? + action = merge ? 'mergeOrUpload' : 'upload' + for document in documents + document['@search.action'] = action + end + req_body = { :value => documents }.to_json + # p "REQ_BODY= #{req_body}" + # p "URI= #{@api_url}/indexes/#{index_name}/docs/index?api-version=#{@api_version}" + res = RestClient.post( + "#{@api_url}/indexes/#{index_name}/docs/index?api-version=#{@api_version}", + req_body, + @headers) + res + end + + end + end +end + diff --git a/lib/fluent/plugin/out_azuresearch.rb b/lib/fluent/plugin/out_azuresearch.rb new file mode 100644 index 0000000..a95ddde --- /dev/null +++ b/lib/fluent/plugin/out_azuresearch.rb @@ -0,0 +1,88 @@ +# -*- coding: utf-8 -*- + +module Fluent + class AzureSearchOutput < BufferedOutput + Plugin.register_output('azuresearch', self) + + def initialize + super + require 'msgpack' + require 'time' + require 'fluent/plugin/azuresearch/client' + end + + config_param :endpoint, :string, + :desc => "Azure Search Endpoint URL" + config_param :api_key, :string, + :desc => "Azure Search API key" + config_param :search_index, :string, + :desc => "Azure Search Index name to insert records" + config_param :column_names, :string, + :desc => "Column names in a target Azure search index (comman separated)" + config_param :key_names, :string, default: nil, + :desc => <<-DESC +Key names in incomming record to insert (comman separated). 
+${time} is placeholder for Time.at(time).strftime("%Y-%m-%dT%H:%M:%SZ"), +and ${tag} is placeholder for tag +DESC + + def configure(conf) + super + raise ConfigError, 'no endpoint' if @endpoint.empty? + raise ConfigError, 'no api_key' if @api_key.empty? + raise ConfigError, 'no search_index' if @search_index.empty? + raise ConfigError, 'no column_names' if @column_names.empty? + + @column_names = @column_names.split(',') + @key_names = @key_names.nil? ? @column_names : @key_names.split(',') + raise ConfigError, 'NOT match keys number: column_names and key_names' \ + if @key_names.length != @column_names.length + end + + def start + super + # start + @client=Fluent::AzureSearch::Client::new( @endpoint, @api_key ) + end + + def shutdown + super + # destroy + end + + def format(tag, time, record) + values = [] + @key_names.each_with_index do |key, i| + if key == '${time}' + value = Time.at(time).strftime('%Y-%m-%dT%H:%M:%SZ') + elsif key == '${tag}' + value = tag + else + value = record.include?(key) ? 
record[key] : '' + end + values << value + end + [tag, time, values].to_msgpack + end + + def write(chunk) + documents = [] + chunk.msgpack_each do |tag, time, values| + document = {} + @column_names.each_with_index do|k, i| + document[k] = values[i] + end + documents.push(document) + end + + begin + res = @client.add_documents(@search_index, documents) + puts res + rescue Exception => ex + $log.fatal "UnknownError: '#{ex}'" + + ", data=>" + (documents.to_json).to_s + end + end + + end +end diff --git a/test/helper.rb b/test/helper.rb new file mode 100644 index 0000000..d762e1b --- /dev/null +++ b/test/helper.rb @@ -0,0 +1,29 @@ +require 'rubygems' +require 'bundler' + +begin + Bundler.setup(:default, :development) +rescue Bundler::BundlerError => e + $stderr.puts e.message + $stderr.puts "Run `bundle install` to install missing gems" + exit e.status_code +end +require 'test/unit' + +$LOAD_PATH.unshift(File.join(File.dirname(__FILE__), '..', 'lib')) +$LOAD_PATH.unshift(File.dirname(__FILE__)) +require 'fluent/test' +unless ENV.has_key?('VERBOSE') + nulllogger = Object.new + nulllogger.instance_eval {|obj| + def method_missing(method, *args) + # pass + end + } + $log = nulllogger +end + +require 'fluent/plugin/out_azuresearch' + +class Test::Unit::TestCase +end diff --git a/test/plugin/test_azuresearch.rb b/test/plugin/test_azuresearch.rb new file mode 100644 index 0000000..efc0da5 --- /dev/null +++ b/test/plugin/test_azuresearch.rb @@ -0,0 +1,79 @@ +require 'helper' + +class AzureSearchOutputTest < Test::Unit::TestCase + def setup + Fluent::Test.setup + end + + CONFIG = %[ + endpoint https://AZURE_SEARCH_ACCOUNT.search.windows.net + api_key AZURE_SEARCH_API_KEY + search_index messages + column_names id,user_name,message,tag,created_at + key_names postid,user,content,tag,posttime + ] + # CONFIG = %[ + # path #{TMP_DIR}/out_file_test + # compress gz + # utc + # ] + + def create_driver(conf = CONFIG, tag='azuresearch.test') + 
Fluent::Test::BufferedOutputTestDriver.new(Fluent::AzureSearchOutput, tag).configure(conf) + end + + def test_configure + #### set configurations + # d = create_driver %[ + # path test_path + # compress gz + # ] + #### check configurations + # assert_equal 'test_path', d.instance.path + # assert_equal :gz, d.instance.compress + end + + def test_format + d = create_driver + + # time = Time.parse("2011-01-02 13:14:15 UTC").to_i + # d.emit({"a"=>1}, time) + # d.emit({"a"=>2}, time) + + # d.expect_format %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] + # d.expect_format %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n] + + # d.run + end + + def test_write + d = create_driver + + time = Time.parse("2016-01-28 13:14:15 UTC").to_i + d.emit( + { + "postid" => "10001", + "user"=> "ladygaga", + "content" => "post by ladygaga", + "tag" => "azuresearch.msg", + "posttime" =>"2016-01-31T00:00:00Z" + }, time) + + d.emit( + { + "postid" => "10002", + "user"=> "katyperry", + "content" => "post by katyperry", + "tag" => "azuresearch.msg", + "posttime" => "2016-01-31T00:00:00Z" + }, time) + + data = d.run + puts data + # ### FileOutput#write returns path + # path = d.run + # expect_path = "#{TMP_DIR}/out_file_test._0.log.gz" + # assert_equal expect_path, path + end +end +