Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
yokawasa committed Feb 11, 2016
0 parents commit ee44935
Show file tree
Hide file tree
Showing 10 changed files with 498 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
*.gem
.bundle
Gemfile.lock
pkg/*
*.swp
4 changes: 4 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
source 'https://rubygems.org'

# Specify your gem's dependencies in fluent-plugin-azuresearch.gemspec
gemspec
214 changes: 214 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
# Azure Search output plugin for Fluentd

fluent-plugin-azuresearch is a fluent plugin to output to Azure Search

## Installation

$ gem install fluent-plugin-azuresearch

## Configuration

### Azure Search

To use Microsoft Azure Search, you must create an Azure Search service in the Azure Portal. Also you must have an index, persisted storage of documents to which fluent-plugin-azuresearch writes event stream out. Here are instructions:

* [Create a service](https://azure.microsoft.com/en-us/documentation/articles/search-create-service-portal/)
* [Create an index](https://azure.microsoft.com/en-us/documentation/articles/search-what-is-an-index/)


### Fluentd - fluent.conf

<match azuresearch.*>
type azuresearch
endpoint https://AZURE_SEARCH_ACCOUNT.search.windows.net
api_key AZURE_SEARCH_API_KEY
search_index messages
column_names id,user_name,message,tag,created_at
key_names postid,user,content,tag,posttime
</match>

* **endpoint (required)** - Azure Search service endpoint URI
* **api\_key (required)** - Azure Search API key
* **search\_index (required)** - Azure Search Index name to insert records
* **column\_names (required)** - Column names in a target Azure search index. Each column needs to be separated by a comma.
* **key\_names (optional)** - Default:nil. Key names in incomming record to insert. Each key needs to be separated by a comma. ${time} is placeholder for Time.at(time).strftime("%Y-%m-%dT%H:%M:%SZ"), and ${tag} is placeholder for tag. By default, **key\_names** is as same as **column\_names**

## Sample Configurations
### Case1 - column_names is as same as key_names

Suppose you have the following fluent.conf and azure search index schema:

<u>fluent.conf</u>

<match azuresearch.*>
type azuresearch
endpoint https://yoichidemo.search.windows.net
api_key 2XX3D2456052A9AD21E54CB03C3ABF6A(dummy)
search_index messages
column_names id,user_name,message,created_at
</match>

<u>Azure Search Schema: messages</u>

{
"name": "messages",
"fields": [
{ "name":"id", "type":"Edm.String", "key": true, "searchable": false },
{ "name":"user_name", "type":"Edm.String" },
{ "name":"message", "type":"Edm.String", "filterable":false, "sortable":false, "facetable":false, "analyzer":"en.lucene" },
{ "name":"created_at", "type":"Edm.DateTimeOffset", "facetable":false}
]
}

The plugin will write event stream out to Azure Ssearch like this:

<u>Input event stream</u>

{ "id": "1", "user_name": "taylorswift13", "message":"post by taylorswift13", "created_at":"2016-01-29T00:00:00Z" },
{ "id": "2", "user_name": "katyperry", "message":"post by katyperry", "created_at":"2016-01-30T00:00:00Z" },
{ "id": "3", "user_name": "ladygaga", "message":"post by ladygaga", "created_at":"2016-01-31T00:00:00Z" }


<u>Search results</u>

"value": [
{ "@search.score": 1, "id": "1", "user_name": "taylorswift13", "message": "post by taylorswift13", "created_at": "2016-01-29T00:00:00Z" },
{ "@search.score": 1, "id": "2", "user_name": "katyperry", "message": "post by katyperry", "created_at": "2016-01-30T00:00:00Z" },
{ "@search.score": 1, "id": "3", "user_name": "ladygaga", "message": "post by ladygaga", "created_at": "2016-01-31T00:00:00Z" }
]


### Case2 - column_names is NOT as same as key_names

Suppose you have the following fluent.conf and azure search index schema:

<u>fluent.conf</u>

<match azuresearch.*>
type azuresearch
endpoint https://yoichidemo.search.windows.net
api_key 2XX3D2456052A9AD21E54CB03C3ABF6A(dummy)
search_index messages
column_names id,user_name,message,created_at
key_names postid,user,content,posttime
</match>

<u>Azure Search Schema: messages</u>

{
"name": "messages",
"fields": [
{ "name":"id", "type":"Edm.String", "key": true, "searchable": false },
{ "name":"user_name", "type":"Edm.String" },
{ "name":"message", "type":"Edm.String", "filterable":false, "sortable":false, "facetable":false, "analyzer":"en.lucene" },
{ "name":"created_at", "type":"Edm.DateTimeOffset", "facetable":false}
]
}

The plugin will write event stream out to Azure Ssearch like this:

<u>Input event stream</u>

{ "postid": "1", "user": "taylorswift13", "content":"post by taylorswift13", "posttime":"2016-01-29T00:00:00Z" },
{ "postid": "2", "user": "katyperry", "content":"post by katyperry", "posttime":"2016-01-30T00:00:00Z" },
{ "postid": "3", "user": "ladygaga", "content":"post by ladygaga", "posttime":"2016-01-31T00:00:00Z" }


<u>Search results</u>

"value": [
{ "@search.score": 1, "id": "1", "user_name": "taylorswift13", "message": "post by taylorswift13", "created_at": "2016-01-29T00:00:00Z" },
{ "@search.score": 1, "id": "2", "user_name": "katyperry", "message": "post by katyperry", "created_at": "2016-01-30T00:00:00Z" },
{ "@search.score": 1, "id": "3", "user_name": "ladygaga", "message": "post by ladygaga", "created_at": "2016-01-31T00:00:00Z" }
]


### Case3 - column_names is NOT as same as key_names, Plus, key_names includes ${time} and ${tag}

<u>fluent.conf</u>

<match azuresearch.*>
type azuresearch
endpoint https://yoichidemo.search.windows.net
api_key 2XX3D2456052A9AD21E54CB03C3ABF6A(dummy)
search_index messages
column_names id,user_name,message,tag,created_at
key_names postid,user,content,${tag},${time}
</match>

<u>Azure Search Schema: messages</u>

{
"name": "messages",
"fields": [
{ "name":"id", "type":"Edm.String", "key": true, "searchable": false },
{ "name":"user_name", "type":"Edm.String" },
{ "name":"message", "type":"Edm.String", "filterable":false, "sortable":false, "facetable":false, "analyzer":"en.lucene" },
{ "name":"created_at", "type":"Edm.DateTimeOffset", "facetable":false}
]
}

The plugin will write event stream out to Azure Ssearch like this:

<u>Input event stream</u>

{ "id": "1", "user_name": "taylorswift13", "message":"post by taylorswift13" },
{ "id": "2", "user_name": "katyperry", "message":"post by katyperry" },
{ "id": "3", "user_name": "ladygaga", "message":"post by ladygaga" }

<u>Search results</u>

"value": [
{ "@search.score": 1, "id": "1", "user_name": "taylorswift13", "message": "post by taylorswift13", "tag": "azuresearch.msg", "created_at": "2016-01-31T21:03:41Z" },
{ "@search.score": 1, "id": "2", "user_name": "katyperry", "message": "post by katyperry", "tag": "azuresearch.msg", "created_at": "2016-01-31T21:03:41Z" },
{ "@search.score": 1, "id": "3", "user_name": "ladygaga", "message": "post by ladygaga", "tag": "azuresearch.msg", "created_at": "2016-01-31T21:03:41Z" }
]
[note] the value of created_at above is the time when fluentd actually recieves its corresponding input event.


## Tests
### Running test code
$ git clone https://github.com/yokawasa/fluent-plugin-azuresearch.git
$ cd fluent-plugin-azuresearch

# edit CONFIG params of test/plugin/test_azuresearch.rb
$ vi test/plugin/test_azuresearch.rb

# run test
$ rake test

### Creating package, running and testing locally
$ rake build
$ rake install:local
# running fluentd with your fluent.conf
$ fluentd -c fluent.conf -vv &
# send test input event to test plugin using fluent-cat
$ echo ' { "postid": "100", "user": "ladygaga", "content":"post by ladygaga"}' | fluent-cat azuresearch.msg

Please don't forget that you need forward input configuration to receive the message from fluent-cat

<source>
type forward
</source>


## TODOs
* Input validation for Azure Search - check total size of columns to add

## Contributing

Bug reports and pull requests are welcome on GitHub at https://github.com/yokawasa/fluent-plugin-azuresearch.

## Copyright

<table>
<tr>
<td>Copyright</td><td>Copyright (c) 2016- Yoichi Kawasaki</td>
</tr>
<tr>
<td>License</td><td>Apache License, Version 2.0</td>
</tr>
</table>

13 changes: 13 additions & 0 deletions Rakefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/usr/bin/env rake

require "bundler/gem_tasks"
require "rake/testtask"

Rake::TestTask.new(:test) do |test|
test.libs << 'lib' << 'test'
test.pattern = 'test/**/test_*.rb'
test.verbose = true
end

task :default => :test

1 change: 1 addition & 0 deletions VERSION
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
0.1.0
27 changes: 27 additions & 0 deletions fluent-plugin-azuresearch.gemspec
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# coding: utf-8
lib = File.expand_path('../lib', __FILE__)
$LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib)

Gem::Specification.new do |gem|
gem.name = "fluent-plugin-azuresearch"
gem.version = File.read("VERSION").strip
gem.authors = ["Yoichi Kawasaki"]
gem.email = ["yoichi.kawasaki@outlook.com"]
gem.summary = %q{Azure Search output plugin for Fluentd}
gem.description = gem.summary
gem.homepage = "http://github.com/yokawasa/fluent-plugin-azuresearch"
gem.license = "Apache-2.0"
gem.has_rdoc = false

gem.files = `git ls-files`.split("\n")
gem.executables = gem.files.grep(%r{^bin/}) { |f| File.basename(f) }
gem.test_files = gem.files.grep(%r{^(test|gem|features)/})
gem.require_paths = ["lib"]

gem.add_development_dependency "bundler", "~> 1.11"
gem.add_development_dependency "rake", "~> 10.0"
gem.add_development_dependency "test-unit"
gem.add_development_dependency "rest-client"
gem.add_runtime_dependency "fluentd"
end

38 changes: 38 additions & 0 deletions lib/fluent/plugin/azuresearch/client.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
module Fluent
module AzureSearch
class Client

def initialize (api_url, api_key, api_version="2015-02-28")
require 'rest-client'
require 'json'
@api_url = api_url
@api_version = api_version
@headers = {
'Content-Type' => "application/json; charset=UTF-8",
'Api-Key' => api_key,
'Accept' => "application/json",
'Accept-Charset' => "UTF-8"
}
end

def add_documents(index_name, documents, merge=true)
raise ConfigError, 'no index_name' if index_name.empty?
raise ConfigError, 'no documents' if documents.empty?
action = merge ? 'mergeOrUpload' : 'upload'
for document in documents
document['@search.action'] = action
end
req_body = { :value => documents }.to_json
# p "REQ_BODY= #{req_body}"
# p "URI= #{@api_url}/indexes/#{index_name}/docs/index?api-version=#{@api_version}"
res = RestClient.post(
"#{@api_url}/indexes/#{index_name}/docs/index?api-version=#{@api_version}",
req_body,
@headers)
res
end

end
end
end

88 changes: 88 additions & 0 deletions lib/fluent/plugin/out_azuresearch.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# -*- coding: utf-8 -*-

module Fluent
class AzureSearchOutput < BufferedOutput
Plugin.register_output('azuresearch', self)

def initialize
super
require 'msgpack'
require 'time'
require 'fluent/plugin/azuresearch/client'
end

config_param :endpoint, :string,
:desc => "Azure Search Endpoint URL"
config_param :api_key, :string,
:desc => "Azure Search API key"
config_param :search_index, :string,
:desc => "Azure Search Index name to insert records"
config_param :column_names, :string,
:desc => "Column names in a target Azure search index (comman separated)"
config_param :key_names, :string, default: nil,
:desc => <<-DESC
Key names in incomming record to insert (comman separated).
${time} is placeholder for Time.at(time).strftime("%Y-%m-%dT%H:%M:%SZ"),
and ${tag} is placeholder for tag
DESC

def configure(conf)
super
raise ConfigError, 'no endpoint' if @endpoint.empty?
raise ConfigError, 'no api_key' if @api_key.empty?
raise ConfigError, 'no search_index' if @search_index.empty?
raise ConfigError, 'no column_names' if @column_names.empty?

@column_names = @column_names.split(',')
@key_names = @key_names.nil? ? @column_names : @key_names.split(',')
raise ConfigError, 'NOT match keys number: column_names and key_names' \
if @key_names.length != @column_names.length
end

def start
super
# start
@client=Fluent::AzureSearch::Client::new( @endpoint, @api_key )
end

def shutdown
super
# destroy
end

def format(tag, time, record)
values = []
@key_names.each_with_index do |key, i|
if key == '${time}'
value = Time.at(time).strftime('%Y-%m-%dT%H:%M:%SZ')
elsif key == '${tag}'
value = tag
else
value = record.include?(key) ? record[key] : ''
end
values << value
end
[tag, time, values].to_msgpack
end

def write(chunk)
documents = []
chunk.msgpack_each do |tag, time, values|
document = {}
@column_names.each_with_index do|k, i|
document[k] = values[i]
end
documents.push(document)
end

begin
res = @client.add_documents(@search_index, documents)
puts res
rescue Exception => ex
$log.fatal "UnknownError: '#{ex}'"
+ ", data=>" + (documents.to_json).to_s
end
end

end
end
Loading

0 comments on commit ee44935

Please sign in to comment.