From bf84fcd908b1314dc32b4aaf3cc7d949f5864cf8 Mon Sep 17 00:00:00 2001 From: Xackery Date: Sat, 10 Mar 2018 18:37:48 -0800 Subject: [PATCH] Added helloworld go example, repaired SendChannelMessage in world --- utils/nats/helloworld/helloworld.go | 82 +++++++++++++++++++++++++++++ world/nats_manager.cpp | 17 ++++-- 2 files changed, 94 insertions(+), 5 deletions(-) create mode 100644 utils/nats/helloworld/helloworld.go diff --git a/utils/nats/helloworld/helloworld.go b/utils/nats/helloworld/helloworld.go new file mode 100644 index 000000000..0b5627343 --- /dev/null +++ b/utils/nats/helloworld/helloworld.go @@ -0,0 +1,82 @@ +package main + +import ( + "fmt" + "log" + "time" + + "github.com/golang/protobuf/proto" + "github.com/nats-io/go-nats" + "github.com/xackery/rebuildeq/go/eqproto" +) + +func main() { + fmt.Println("Starting...") + var nc *nats.Conn + var err error + + //create a nats connection, by default 127.0.0.1 + if nc, err = nats.Connect(nats.DefaultURL); err != nil { + log.Fatal(err) + } + defer nc.Close() + + //listen for any channel messages from game + go asyncChannelMessageSubscriber(nc) //async is recommended + //go syncChannelMessageSubscriber() //sync is here as example + + //send a channel message to broadcast channel + go testBroadcastMessage(nc, "Hello, World!") + + time.Sleep(20 * time.Second) + fmt.Println("Exited after 20 seconds") +} + +// asyncChannelMessageSubscriber is an example of how to subscribe +// and invoke a function when a message is received +func asyncChannelMessageSubscriber(nc *nats.Conn) { + nc.Subscribe("world.channel_message", func(m *nats.Msg) { + message := &eqproto.ChannelMessage{} + proto.Unmarshal(m.Data, message) + log.Println(message) + }) + log.Println("Waiting on async messages...") + time.Sleep(10 * time.Second) + log.Println("Timed out after 10 seconds") +} + +// syncChannelMessageSubscriber is an example of how to subscribe +// and poll for messages syncronously +func syncChannelMessageSubscriber(nc *nats.Conn) { + + sub, err := nc.SubscribeSync("world.channel_message") + if err != nil { + log.Fatal(err) + } + var m *nats.Msg + if m, err = sub.NextMsg(10 * time.Second); err != nil { + log.Println("Timed out after 10 seconds", err.Error()) + return + } + + message := &eqproto.ChannelMessage{} + proto.Unmarshal(m.Data, message) + log.Println("Got message", message) +} + +func testBroadcastMessage(nc *nats.Conn, msg string) { + message := &eqproto.ChannelMessage{ + From: "go", + Message: msg, + ChanNum: 5, //5 is ooc, 6 is bc + } + d, err := proto.Marshal(message) + if err != nil { + log.Fatal(err) + } + if err = nc.Publish("world.channel_message", d); err != nil { + log.Println("Failed to publish:", err.Error()) + return + } + log.Println("Sending message", message) +} diff --git a/world/nats_manager.cpp b/world/nats_manager.cpp index d5b863bc0..68b5d906a 100644 --- a/world/nats_manager.cpp +++ b/world/nats_manager.cpp @@ -38,8 +38,11 @@ NatsManager::~NatsManager() bool NatsManager::connect() { auto ncs = natsConnection_Status(conn); - if (ncs == CONNECTED) return true; - if (nats_timer.Enabled() && !nats_timer.Check()) return false; + if (ncs == CONNECTED) + return true; + if (nats_timer.Enabled() && !nats_timer.Check()) + return false; + natsOptions *opts = NULL; natsOptions_Create(&opts); natsOptions_SetMaxReconnect(opts, 0); @@ -50,7 +53,9 @@ bool NatsManager::connect() { //but since NATS is a second priority I wanted server impact minimum. natsOptions_SetTimeout(opts, 100); std::string connection = StringFormat("nats://%s:%d", worldConfig->NATSHost.c_str(), worldConfig->NATSPort); - if (worldConfig->NATSHost.length() == 0) connection = "nats://localhost:4222"; + if (worldConfig->NATSHost.length() == 0) + connection = "nats://localhost:4222"; + natsOptions_SetURL(opts, connection.c_str()); s = natsConnection_Connect(&conn, opts); natsOptions_Destroy(opts); @@ -61,6 +66,7 @@ bool NatsManager::connect() { nats_timer.SetTimer(20000); return false; } + Log(Logs::General, Logs::NATS, "connected to %s", connection.c_str()); nats_timer.Disable(); return true; @@ -70,7 +76,8 @@ bool NatsManager::connect() { void NatsManager::Process() { natsMsg *msg = NULL; - if (!connect()) return; + if (!connect()) + return; s = NATS_OK; for (int count = 0; (s == NATS_OK) && count < 5; count++) { @@ -163,7 +170,7 @@ void NatsManager::SendChannelMessage(eqproto::ChannelMessage* message) { Log(Logs::General, Logs::NATS, "Failed to serialize message to string"); return; } - s = natsConnection_PublishString(conn, "world.command_message", pubMessage.c_str()); + s = natsConnection_PublishString(conn, "world.channel_message", pubMessage.c_str()); if (s != NATS_OK) { Log(Logs::General, Logs::NATS, "Failed to send world.command_message"); }