Skip to content

Commit

Permalink
Merge pull request #1 from kescherCode/master
Browse files Browse the repository at this point in the history
Quality of life pull request
  • Loading branch information
vvidic authored Jul 19, 2020
2 parents 2246c65 + 158fa25 commit ed601a7
Showing 1 changed file with 64 additions and 53 deletions.
117 changes: 64 additions & 53 deletions mjpeg-proxy.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
/*
* mjpeg-proxy -- Republish a MJPEG HTTP image stream using a server in Go
*
*
* Copyright (C) 2015, Valentin Vidic
*
* This program is free software: you can redistribute it and/or modify
Expand All @@ -19,48 +19,52 @@

package main

import "io"
import "bufio"
import "flag"
import "fmt"
import "strings"
import "strconv"
import "net/http"
import "errors"
import (
"bufio"
"errors"
"flag"
"fmt"
"io"
"log"
"net/http"
"strconv"
"strings"
)

/* Sample source stream starts like this:
HTTP/1.1 200 OK
Server: nginx/1.2.1
Date: Mon, 13 Apr 2015 14:02:59 GMT
Content-Type: multipart/x-mixed-replace;boundary=myboundary
Transfer-Encoding: chunked
Connection: keep-alive
Cache-Control: no-store
Cache-Control: no-cache
Pragma: no-cache
Content-Language: en
--myboundary
Content-Type: image/jpeg
Content-Length: 36291
JPEG data...
*/

func chunker(body io.ReadCloser, boundary string, pubChan chan []byte, stopChan chan bool) {
func dclose(c io.Closer) {
if err := c.Close(); err != nil {
log.Fatal(err)
}
}

func chunker(body io.ReadCloser, pubChan chan []byte, stopChan chan bool) {
fmt.Print("Chunker: starting\n")

reader := bufio.NewReader(body)
defer body.Close()
defer dclose(body)

defer close(pubChan)
defer close(stopChan)

var failure error;
var failure error

ChunkLoop:
ChunkLoop:
for {
head, size, err := readChunkHeader(reader, boundary)
head, size, err := readChunkHeader(reader)
if err != nil {
failure = err
break ChunkLoop
Expand Down Expand Up @@ -91,7 +95,7 @@ func chunker(body io.ReadCloser, boundary string, pubChan chan []byte, stopChan
fmt.Print("Chunker: stopping\n")
}

func readChunkHeader(reader *bufio.Reader, boundary string) (head []byte, size int, err error) {
func readChunkHeader(reader *bufio.Reader) (head []byte, size int, err error) {
head = make([]byte, 0)
size = -1
err = nil
Expand All @@ -102,11 +106,10 @@ func readChunkHeader(reader *bufio.Reader, boundary string) (head []byte, size i
if err != nil {
return
}
if bl := strings.TrimRight(string(line), "\r\n"); bl != boundary {
err_str := fmt.Sprintf("Invalid boundary received (%s)", bl)
err = errors.New(err_str)
return
}

/* don't check for valid boundary in this function; a lot of webcams
(such as those by AXIS) seem to provide improper boundaries. */

head = append(head, line...)

// read header
Expand All @@ -118,16 +121,16 @@ func readChunkHeader(reader *bufio.Reader, boundary string) (head []byte, size i
head = append(head, line...)

// empty line marks end of header
line_str := strings.TrimRight(string(line), "\r\n")
if len(line_str) == 0 {
lineStr := strings.TrimRight(string(line), "\r\n")
if len(lineStr) == 0 {
break
}

// find data size
parts := strings.SplitN(line_str, ": ", 2)
parts := strings.SplitN(lineStr, ": ", 2)
if strings.EqualFold(parts[0], "Content-Length") {
var n int
n, err = strconv.Atoi(string(parts[1]))
n, err = strconv.Atoi(parts[1])
if err != nil {
return
}
Expand Down Expand Up @@ -162,15 +165,23 @@ func readChunkData(reader *bufio.Reader, size int) (buf []byte, err error) {
}

func getBoundary(resp http.Response) (string, error) {
ct := resp.Header.Get("Content-Type")
prefix := "multipart/x-mixed-replace;boundary="
if !strings.HasPrefix(ct, prefix) {
err_str := fmt.Sprintf("Content-Type is invalid (%s)", ct)
return "", errors.New(err_str)
}
ct := strings.Split(resp.Header.Get("Content-Type"), ";")
fixedCt := ""
fixedPrefix := "multipart/x-mixed-replace;boundary="

boundary := "--" + strings.TrimPrefix(ct, prefix)
return boundary, nil;
if len(ct) < 2 || !strings.HasPrefix(ct[0], "multipart/x-mixed-replace") || !strings.HasPrefix(strings.TrimPrefix(ct[1], " "), "boundary=") {
errStr := fmt.Sprintf("Content-Type is invalid (%s)", strings.Join(ct, ";"))
return "", errors.New(errStr)
}
// Build normalized Content-Type string
builder := strings.Builder{}
builder.WriteString(ct[0])
builder.WriteString(";")
builder.WriteString(strings.TrimPrefix(ct[1], " "))
fixedCt = builder.String()

boundary := "--" + strings.TrimPrefix(fixedCt, fixedPrefix)
return boundary, nil
}

func connectChunker(url, username, password string) (*http.Response, string, error) {
Expand All @@ -190,18 +201,18 @@ func connectChunker(url, username, password string) (*http.Response, string, err
}

if resp.StatusCode != http.StatusOK {
resp.Body.Close()
err_str := fmt.Sprintf("Request failed (%s)", resp.Status)
return nil, "", errors.New(err_str)
dclose(resp.Body)
errStr := fmt.Sprintf("Request failed (%s)", resp.Status)
return nil, "", errors.New(errStr)
}

boundary, err := getBoundary(*resp)
if err != nil {
resp.Body.Close()
dclose(resp.Body)
return nil, "", err
}

return resp, boundary, nil;
return resp, boundary, nil
}

type PubSub struct {
Expand Down Expand Up @@ -270,7 +281,7 @@ func (pubsub *PubSub) loop() {
func (pubsub *PubSub) doPublish(data []byte) {
subs := pubsub.subscribers

for s, _ := range subs {
for s := range subs {
select {
case s.ChunkChannel <- data: // try to send
default: // or skip this frame
Expand All @@ -284,7 +295,7 @@ func (pubsub *PubSub) doSubscribe(s *Subscriber) {
fmt.Printf("PubSub: subscriber %v added (total=%d)\n",
s, len(pubsub.subscribers))

if (len(pubsub.subscribers) == 1) {
if len(pubsub.subscribers) == 1 {
if err := pubsub.startPublisher(); err != nil {
fmt.Printf("PubSub: failed to start publisher (%s)\n", err)
pubsub.stopSubscribers()
Expand All @@ -293,7 +304,7 @@ func (pubsub *PubSub) doSubscribe(s *Subscriber) {
}

func (pubsub *PubSub) stopSubscribers() {
for s, _ := range pubsub.subscribers {
for s := range pubsub.subscribers {
close(s.ChunkChannel)
}
}
Expand All @@ -304,15 +315,15 @@ func (pubsub *PubSub) doUnsubscribe(s *Subscriber) {
fmt.Printf("PubSub: subscriber %v removed (total=%d)\n",
s, len(pubsub.subscribers))

if (len(pubsub.subscribers) == 0) {
if len(pubsub.subscribers) == 0 {
pubsub.stopPublisher()
}
}

func (pubsub *PubSub) startPublisher() error {
fmt.Printf("PubSub: starting publisher for %s\n", pubsub.url)

resp, boundary, err := connectChunker(pubsub.url, pubsub.username, pubsub.password)
resp, _, err := connectChunker(pubsub.url, pubsub.username, pubsub.password)
if err != nil {
return err
}
Expand All @@ -321,7 +332,7 @@ func (pubsub *PubSub) startPublisher() error {
pubsub.pubChan = make(chan []byte)
pubsub.stopChan = make(chan bool)

go chunker(resp.Body, boundary, pubsub.pubChan, pubsub.stopChan)
go chunker(resp.Body, pubsub.pubChan, pubsub.stopChan)

return nil
}
Expand Down Expand Up @@ -369,7 +380,7 @@ func makeHandler(pubsub *PubSub) http.HandlerFunc {
for {
// wait for next chunk
data, ok := <-sub.ChunkChannel
if (!ok) {
if !ok {
break
}

Expand Down Expand Up @@ -404,7 +415,7 @@ func main() {
passwordPtr := flag.String("password", "", "source mjpg password")

bindPtr := flag.String("bind", ":8080", "proxy bind address")
urlPtr := flag.String("url", "/img.mjpg", "proxy serve url")
urlPtr := flag.String("url", "/", "proxy serve url")

flag.Parse()

Expand Down

0 comments on commit ed601a7

Please sign in to comment.