Skip to content

Commit

Permalink
Add configuration file for sources
Browse files Browse the repository at this point in the history
  • Loading branch information
vvidic committed Jul 19, 2020
1 parent 22cd941 commit 7010c1b
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 28 deletions.
10 changes: 10 additions & 0 deletions config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[
{
"Source" : "http://213.193.89.202/axis-cgi/mjpg/video.cgi",
"Url" : "/source1"
},
{
"Source" : "http://klosterplatz.selfip.info/axis-cgi/mjpg/video.cgi",
"Url" : "/source2"
}
]
113 changes: 85 additions & 28 deletions mjpeg-proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ package main

import (
"bufio"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"net/http"
"net/url"
"os"
Expand All @@ -46,6 +48,7 @@ import (
*/

type Chunker struct {
id string
source string
username string
password string
Expand All @@ -54,7 +57,7 @@ type Chunker struct {
stop chan struct{}
}

func NewChunker(source, username, password string) (*Chunker, error) {
func NewChunker(id, source, username, password string) (*Chunker, error) {
chunker := new(Chunker)

sourceUrl, err := url.Parse(source)
Expand All @@ -65,6 +68,7 @@ func NewChunker(source, username, password string) (*Chunker, error) {
return nil, fmt.Errorf("url is not absolute: %s", source)
}

chunker.id = id
chunker.source = source
chunker.username = username
chunker.password = password
Expand All @@ -73,7 +77,7 @@ func NewChunker(source, username, password string) (*Chunker, error) {
}

func (chunker *Chunker) Connect() error {
fmt.Println("chunker: connecting to", chunker.source)
fmt.Printf("chunker[%s]: connecting to %s\n", chunker.id, chunker.source)

req, err := http.NewRequest("GET", chunker.source, nil)
if err != nil {
Expand Down Expand Up @@ -112,14 +116,14 @@ func (chunker *Chunker) GetHeader() http.Header {
}

func (chunker *Chunker) Start(pubChan chan []byte) {
fmt.Println("chunker: started")
fmt.Printf("chunker[%s]: started\n", chunker.id)

body := chunker.resp.Body
reader := bufio.NewReader(body)
defer func() {
err := body.Close()
if err != nil {
fmt.Println("chunker: body close failed:", err)
fmt.Printf("chunker[%s]: body close failed: %s\n", chunker.id, err)
}
}()
defer close(pubChan)
Expand Down Expand Up @@ -153,14 +157,14 @@ ChunkLoop:
}

if failure != nil {
fmt.Println("chunker: failed: ", failure)
fmt.Printf("chunker[%s]: failed: %s\n", chunker.id, failure)
} else {
fmt.Println("chunker: stopped")
fmt.Printf("chunker[%s]: stopped\n", chunker.id)
}
}

func (chunker *Chunker) Stop() {
fmt.Println("chunker: stopping")
fmt.Printf("chunker[%s]: stopping\n", chunker.id)
close(chunker.stop)
}

Expand Down Expand Up @@ -251,16 +255,18 @@ func getBoundary(resp http.Response) (string, error) {
}

type PubSub struct {
id string
chunker *Chunker
pubChan chan []byte
subChan chan *Subscriber
unsubChan chan *Subscriber
subscribers map[*Subscriber]bool
}

func NewPubSub(chunker *Chunker) *PubSub {
func NewPubSub(id string, chunker *Chunker) *PubSub {
pubsub := new(PubSub)

pubsub.id = id
pubsub.chunker = chunker
pubsub.subChan = make(chan *Subscriber)
pubsub.unsubChan = make(chan *Subscriber)
Expand Down Expand Up @@ -315,12 +321,13 @@ func (pubsub *PubSub) doPublish(data []byte) {
func (pubsub *PubSub) doSubscribe(s *Subscriber) {
pubsub.subscribers[s] = true

fmt.Printf("pubsub: added subscriber %s (total=%d)\n",
s.RemoteAddr, len(pubsub.subscribers))
fmt.Printf("pubsub[%s]: added subscriber %s (total=%d)\n",
pubsub.id, s.RemoteAddr, len(pubsub.subscribers))

if len(pubsub.subscribers) == 1 {
if err := pubsub.startChunker(); err != nil {
fmt.Println("pubsub: failed to start chunker:", err)
fmt.Printf("pubsub[%s]: failed to start chunker: %s\n",
pubsub.id, err)
pubsub.stopSubscribers()
}
}
Expand All @@ -335,8 +342,8 @@ func (pubsub *PubSub) stopSubscribers() {
func (pubsub *PubSub) doUnsubscribe(s *Subscriber) {
delete(pubsub.subscribers, s)

fmt.Printf("pubsub: removed subscriber %s (total=%d)\n",
s.RemoteAddr, len(pubsub.subscribers))
fmt.Printf("pubsub[%s]: removed subscriber %s (total=%d)\n",
pubsub.id, s.RemoteAddr, len(pubsub.subscribers))

if len(pubsub.subscribers) == 0 {
pubsub.stopChunker()
Expand Down Expand Up @@ -381,8 +388,8 @@ func (pubsub *PubSub) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// prepare response for flushing
flusher, ok := w.(http.Flusher)
if !ok {
fmt.Printf("server: client %s could not be flushed",
r.RemoteAddr)
fmt.Printf("server[%s]: client %s could not be flushed\n",
pubsub.id, r.RemoteAddr)
return
}

Expand Down Expand Up @@ -417,36 +424,86 @@ func (pubsub *PubSub) ServeHTTP(w http.ResponseWriter, r *http.Request) {

// check for client close
if err != nil {
fmt.Printf("server: client %s failed: %s\n",
r.RemoteAddr, err)
fmt.Printf("server[%s]: client %s failed: %s\n",
pubsub.id, r.RemoteAddr, err)
break
}
}
}

func startSource(source, username, password, proxyUrl string) error {
chunker, err := NewChunker(proxyUrl, source, username, password)
if err != nil {
return fmt.Errorf("chunker[%s]: create failed: %s", proxyUrl, err)
}
pubsub := NewPubSub(proxyUrl, chunker)
pubsub.Start()

fmt.Printf("chunker[%s]: serving from %s\n", proxyUrl, source)
http.Handle(proxyUrl, pubsub)

return nil
}

type configSource struct {
Source string
Username string
Password string
Url string
}

func loadConfig(config string) error {
file, err := os.Open(config)
if err != nil {
return err
}
defer file.Close()

sources := make([]configSource, 0)
dec := json.NewDecoder(file)
err = dec.Decode(&sources)
if err != nil && err != io.EOF {
return err
}

exists := make(map[string]bool)
for _, conf := range sources {
if exists[conf.Url] {
return fmt.Errorf("duplicate proxy url: %s", conf.Url)
}

err = startSource(conf.Source, conf.Username, conf.Password, conf.Url)
if err != nil {
return err
}

exists[conf.Url] = true
}

return nil
}

func main() {
// check parameters
source := flag.String("source", "http://example.com/img.mjpg", "source mjpg url")
username := flag.String("username", "", "source mjpg username")
password := flag.String("password", "", "source mjpg password")
url := flag.String("url", "/", "proxy serve url")

config := flag.String("config", "", "JSON configuration file to load")
bind := flag.String("bind", ":8080", "proxy bind address")

flag.Parse()

// start pubsub client connector
chunker, err := NewChunker(*source, *username, *password)
var err error
if *config != "" {
err = loadConfig(*config)
} else {
err = startSource(*source, *username, *password, *url)
}
if err != nil {
fmt.Println("chunker: create failed:", err)
fmt.Println("config: ", err)
os.Exit(1)
}
pubsub := NewPubSub(chunker)
http.Handle(*url, pubsub)

// start serving
pubsub.Start()
fmt.Printf("server: starting on address %s with url %s\n", *bind, *url)
fmt.Printf("server: starting on address %s\n", *bind)
err = http.ListenAndServe(*bind, nil)
if err != nil {
fmt.Println("server: failed to start:", err)
Expand Down

0 comments on commit 7010c1b

Please sign in to comment.