Skip to content

Commit

Permalink
Fix: Moved from see to gorilla websocket
Browse files Browse the repository at this point in the history
  • Loading branch information
khades committed Jan 14, 2018
1 parent a811fcd commit 191bb2e
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 42 deletions.
20 changes: 13 additions & 7 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion bot/subHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func subHandler(message *irc.Message, ircClient *ircClient.IrcClient) {

log.Printf("Channel %v: %v subbed for %v months\n", channel, user, subCount)

eventbus.EventBus.Trigger(eventbus.EventSub(&channelID))
eventbus.EventBus.Publish(eventbus.EventSub(&channelID))
}
}
}
6 changes: 3 additions & 3 deletions eventbus/eventBus.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package eventbus

import observable "github.com/GianlucaGuarini/go-observable"
import evbus "github.com/asaskevich/EventBus"

var EventBus = observable.New()
var EventBus = evbus.New()

func EventSub(channelID *string) string {
return "sub:" + *channelID
}

var SubtrainBus = observable.New()
var SubtrainBus = evbus.New()
63 changes: 51 additions & 12 deletions httpbackend/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,22 @@ import (
"strconv"
"time"

"github.com/gorilla/websocket"
"goji.io/pat"

"github.com/JanBerktold/sse"
"github.com/khades/servbot/eventbus"
"github.com/khades/servbot/models"
"github.com/khades/servbot/repos"
)


type subscriptionEvent struct {
Subscription models.SubscriptionInfo `json:"subscription"`
CurrentCallTime time.Time `json:"currentCallTimetime"`
PreviousCallTime time.Time `json:"previousCallTimetime"`
}

func subscriptions(w http.ResponseWriter, r *http.Request, s *models.HTTPSession, channelID *string, channelName *string) {
result, _:= repos.GetSubsForChannel(channelID)
result, _ := repos.GetSubsForChannel(channelID)
json.NewEncoder(w).Encode(*result)
}

Expand All @@ -46,18 +45,58 @@ func subscriptionsWithLimit(w http.ResponseWriter, r *http.Request, s *models.HT
json.NewEncoder(w).Encode(*result)
}
func subscriptionEvents(w http.ResponseWriter, r *http.Request, s *models.HTTPSession, channelID *string, channelName *string) {
log.Println("Staring ws")
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool { return true }}

conn, err := upgrader.Upgrade(w, r, nil)
pongWait := 40 * time.Second

conn.SetReadDeadline(time.Now().Add(pongWait))
conn.SetPongHandler(func(string) error {
log.Println("Got pong")
conn.SetReadDeadline(time.Now().Add(pongWait))
return nil
})
if err != nil {
log.Println(err)
return
}

ping := func(value string) {
log.Println(value)
if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
log.Println(err)

return
}
}

conn, _ := sse.Upgrade(w, r)
channel := make(chan string)
write := func(value string) {
channel <- value
log.Println(value)

if err := conn.WriteMessage(websocket.TextMessage, []byte(value)); err != nil {
log.Println(err)
return
}
}
eventbus.EventBus.On("ping "+eventbus.EventSub(channelID), write)
for conn.IsOpen() {
msg := <-channel
conn.WriteString(msg)
eventbus.EventBus.Subscribe("ping", ping)

eventbus.EventBus.Subscribe(eventbus.EventSub(channelID), write)

for {
_, _, err := conn.ReadMessage()
if err != nil {
conn.Close()
break
}
}
defer eventbus.EventBus.Off("ping "+eventbus.EventSub(channelID), write)
defer log.Println("Disconnecting Subscription SSE")

defer eventbus.EventBus.Unsubscribe(eventbus.EventSub(channelID), write)
defer eventbus.EventBus.Unsubscribe("ping", ping)

defer log.Println("Disconnecting Subscription Socket")

}
4 changes: 2 additions & 2 deletions index.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,12 @@ func main() {
// wg.Done()
// }
// }(&wg)
twentyticker := time.NewTicker(time.Second * 20)
twentyticker := time.NewTicker(time.Second * 30)

go func() {
for {
<-twentyticker.C
eventbus.EventBus.Trigger("ping")
eventbus.EventBus.Publish("ping", "ping")
}
}()

Expand Down
11 changes: 4 additions & 7 deletions repos/autoMessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,14 @@ func DecrementAutoMessages(channelID *string) {
bson.M{"game": bson.M{"$exists": false}}}},
bson.M{"$inc": bson.M{"messagethreshold": -1}})
}
func RemoveInactiveAutoMessages(channelID *string) (*[]models.AutoMessage, error) {
var result []models.AutoMessage
error := Db.C(autoMessageCollectionName).Find(bson.M{
func RemoveInactiveAutoMessages(channelID *string) {
Db.C(autoMessageCollectionName).Remove(bson.M{
"channelid": *channelID,
"message": "",
"history.date": []bson.M{

bson.M{"$not": bson.M{"$gte": time.Now().Add(24 * -7 * time.Hour)}}}},
).All(&result)
return &result, error
bson.M{"$not": bson.M{"$gte": time.Now().Add(24 * -7 * time.Hour)}}}} )
}

func GetCurrentAutoMessages() (*[]models.AutoMessage, error) {
//log.Println("AutoMessage: Getting Current AutoMessages")
var result []models.AutoMessage
Expand Down
20 changes: 10 additions & 10 deletions services/checkVK.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"encoding/json"
"errors"
"fmt"
"log"
//"log"
"strings"
"time"
"unicode/utf8"
Expand Down Expand Up @@ -40,9 +40,9 @@ func Short(s string, i int) string {
}

func CheckVK() {
log.Println("Checking VK")
//log.Println("Checking VK")
if repos.Config.VkClientKey == "" {
log.Println("VK key is not set")
// log.Println("VK key is not set")
return
}
channels, error := repos.GetVKEnabledChannels()
Expand All @@ -54,10 +54,10 @@ func CheckVK() {
}
}
func checkOne(channel *models.ChannelInfo) {
log.Println("Checking group " + channel.VkGroupInfo.GroupName)
// log.Println("Checking group " + channel.VkGroupInfo.GroupName)
result, parseError := ParseVK(&channel.VkGroupInfo)
if parseError != nil {
log.Println("ParseError " + parseError.Error())
// log.Println("ParseError " + parseError.Error())
return
}
if result.LastMessageID == channel.VkGroupInfo.LastMessageID {
Expand All @@ -70,7 +70,7 @@ func checkOne(channel *models.ChannelInfo) {
channelName, channelNameError := repos.GetUsernameByID(&channel.ChannelID)

if channelNameError == nil && *channelName != "" {
log.Println("SENDING MESSAGE")
// log.Println("SENDING MESSAGE")
bot.IrcClientInstance.SendPublic(&models.OutgoingMessage{
Channel: *channelName,
Body: "[VK https://vk.com/" + channel.VkGroupInfo.GroupName + "] " + result.LastMessageBody + " " + result.LastMessageURL})
Expand All @@ -86,10 +86,10 @@ func ParseVK(vkInputGroupInfo *models.VkGroupInfo) (*models.VkGroupInfo, error)
url = "https://api.vk.com/method/wall.get?owner_id=-" + strings.Replace(vkInputGroupInfo.GroupName, "club", "", -1) + "&filter=owner&count=2&v=5.60"

}
log.Println("URL: " + url)
// log.Println("URL: " + url)
resp, error := httpclient.Get(url + "&access_token=" + repos.Config.VkClientKey)
if error != nil {
log.Println(error)
// log.Println(error)
return &vkGroupInfo, error
}
if resp != nil {
Expand All @@ -98,7 +98,7 @@ func ParseVK(vkInputGroupInfo *models.VkGroupInfo) (*models.VkGroupInfo, error)
vkResp := vkResponse{}
marshallError := json.NewDecoder(resp.Body).Decode(&vkResp)
if marshallError != nil {
log.Println(marshallError)
//log.Println(marshallError)
return &vkGroupInfo, marshallError
}
if len(vkResp.Response.Items) == 0 {
Expand Down Expand Up @@ -127,6 +127,6 @@ func ParseVK(vkInputGroupInfo *models.VkGroupInfo) (*models.VkGroupInfo, error)
loc, _ := time.LoadLocation("Europe/Moscow")
nowTime := time.Unix(0, int64(vkPost.Date)*1000000000).In(loc)
vkGroupInfo.LastMessageDate = nowTime.Format("Jan _2 15:04 MSK")
log.Println(vkGroupInfo)
// log.Println(vkGroupInfo)
return &vkGroupInfo, nil
}

0 comments on commit 191bb2e

Please sign in to comment.