zone/natsmanager now properly uses byte for serializing

This commit is contained in:
Xackery 2018-03-14 12:02:39 -07:00
parent e0d4f9ecd1
commit 122f1cd02d
2 changed files with 115 additions and 99 deletions

View File

@ -49,7 +49,6 @@ void NatsManager::Process()
return;
natsMsg *msg = NULL;
std::string pubMessage;
s = NATS_OK;
for (int count = 0; (s == NATS_OK) && count < 5; count++)
@ -155,16 +154,17 @@ void NatsManager::SendChannelMessage(eqproto::ChannelMessage* message, const cha
if (!connect())
return;
std::string pubMessage;
if (!message->SerializeToString(&pubMessage)) {
Log(Logs::General, Logs::NATS, "world.channel_message.out: failed to serialize message to string");
size_t size = message->ByteSizeLong();
void *buffer = malloc(size);
if (!message->SerializeToArray(buffer, size)) {
Log(Logs::General, Logs::NATS, "world.channel_message.out: failed to serialize");
return;
}
if (reply)
s = natsConnection_Publish(conn, reply, (const void*)pubMessage.c_str(), pubMessage.length());
s = natsConnection_Publish(conn, reply, buffer, size);
else
s = natsConnection_Publish(conn, StringFormat("zone.%s.%d.channel_message.out", subscribedZoneName.c_str(), subscribedZoneInstance).c_str(), (const void*)pubMessage.c_str(), pubMessage.length());
s = natsConnection_Publish(conn, StringFormat("zone.%s.%d.channel_message.out", subscribedZoneName.c_str(), subscribedZoneInstance).c_str(), buffer, size);
if (s != NATS_OK) {
@ -502,16 +502,18 @@ void NatsManager::SendCommandMessage(eqproto::CommandMessage* message, const cha
message->set_result("Failed to parse command.");
std::string pubMessage;
if (!message->SerializeToString(&pubMessage)) {
Log(Logs::General, Logs::NATS, "zone.%s.%d.command_message.in: failed to serialize to string", subscribedZoneName.c_str(), subscribedZoneInstance);
size_t size = message->ByteSizeLong();
void *buffer = malloc(size);
if (!message->SerializeToArray(buffer, size)) {
Log(Logs::General, Logs::NATS, "zone.%s.%d.command_message.in: failed to serialize", subscribedZoneName.c_str(), subscribedZoneInstance);
SendCommandMessage(message, reply);
return;
}
if (reply)
s = natsConnection_Publish(conn, reply, (const void*)pubMessage.c_str(), pubMessage.length());
s = natsConnection_Publish(conn, reply, buffer, size);
else
s = natsConnection_Publish(conn, StringFormat("zone.%s.%d.command_message.out", subscribedZoneName.c_str(), subscribedZoneInstance).c_str(), (const void*)pubMessage.c_str(), pubMessage.length());
s = natsConnection_Publish(conn, StringFormat("zone.%s.%d.command_message.out", subscribedZoneName.c_str(), subscribedZoneInstance).c_str(), buffer, size);
if (s != NATS_OK) {
@ -565,19 +567,21 @@ void NatsManager::Unregister()
return;
}
void NatsManager::SendEvent(eqproto::OpCode op, uint32 entity_id, std::string pubMessage) {
void NatsManager::SendEvent(eqproto::OpCode op, uint32 entity_id, void * buffer, size_t size) {
eqproto::Event* finalEvent = google::protobuf::Arena::CreateMessage<eqproto::Event>(&the_arena);
finalEvent->set_payload(pubMessage.c_str());
finalEvent->set_op(op);
finalEvent->set_entity_id(entity_id);
eqproto::Event* event = google::protobuf::Arena::CreateMessage<eqproto::Event>(&the_arena);
event->set_payload(buffer, size);
event->set_op(op);
event->set_entity_id(entity_id);
if (!finalEvent->SerializeToString(&pubMessage)) {
Log(Logs::General, Logs::NATS, "zone.%s.%d.entity.%d.event.out: (OP: %d) failed to serialize message to string", subscribedZoneName.c_str(), subscribedZoneInstance, entity_id, op);
size_t event_size = event->ByteSizeLong();
void *event_buffer = malloc(event_size);
if (!event->SerializeToArray(event_buffer, event_size)) {
Log(Logs::General, Logs::NATS, "zone.%s.%d.entity.%d.event.out: (OP: %d) failed to serialize message", subscribedZoneName.c_str(), subscribedZoneInstance, entity_id, op);
return;
}
s = natsConnection_Publish(conn, StringFormat("zone.%s.%d.entity.%d.event.out", subscribedZoneName.c_str(), subscribedZoneInstance, entity_id).c_str(), (const void*)pubMessage.c_str(), pubMessage.length());
s = natsConnection_Publish(conn, StringFormat("zone.%s.%d.entity.%d.event.out", subscribedZoneName.c_str(), subscribedZoneInstance, entity_id).c_str(), event_buffer, event_size);
if (s != NATS_OK)
Log(Logs::General, Logs::NATS, "zone.%s.%d.entity.%d.event.out: (OP: %d) failed to send: %s", subscribedZoneName.c_str(), subscribedZoneInstance, entity_id, op, nats_GetLastError(&s));
@ -640,13 +644,16 @@ void NatsManager::SendAdminMessage(std::string adminMessage) {
eqproto::ChannelMessage* message = google::protobuf::Arena::CreateMessage<eqproto::ChannelMessage>(&the_arena);
message->set_message(adminMessage.c_str());
std::string pubMessage;
if (!message->SerializeToString(&pubMessage)) {
Log(Logs::General, Logs::NATS, "global.admin_message.out: failed to serialize message to string");
size_t size = message->ByteSizeLong();
void *buffer = malloc(size);
if (!message->SerializeToArray(buffer, size)) {
Log(Logs::General, Logs::NATS, "global.admin_message.out: failed to serialize");
return;
}
s = natsConnection_Publish(conn, "global.admin_message.out", (const void*)pubMessage.c_str(), pubMessage.length());
s = natsConnection_Publish(conn, "global.admin_message.out", buffer, size);
if (s != NATS_OK) {
Log(Logs::General, Logs::NATS, "global.admin_message.out: failed: %s", nats_GetLastError(&s));
return;
@ -716,6 +723,7 @@ void NatsManager::EncodeEntity(Entity * entity, eqproto::Entity * out)
if (entity->IsMob()) {
out->set_type(eqproto::EntityType::Mob);
Mob* mob = entity->CastToMob();
out->set_name(mob->GetName());
out->set_hp(mob->GetHP());
out->set_level(mob->GetLevel());
eqproto::Position* pos = google::protobuf::Arena::CreateMessage<eqproto::Position>(&the_arena);
@ -834,7 +842,7 @@ void NatsManager::OnDeathEvent(Death_Struct* d) {
return;
auto op = eqproto::OP_Death;
std::string pubMessage;
eqproto::DeathEvent* event = google::protobuf::Arena::CreateMessage<eqproto::DeathEvent>(&the_arena);
event->set_spawn_id(d->spawn_id);
@ -844,13 +852,14 @@ void NatsManager::OnDeathEvent(Death_Struct* d) {
event->set_attack_skill_id(d->attack_skill);
event->set_damage(d->damage);
if (!event->SerializeToString(&pubMessage)) {
Log(Logs::General, Logs::NATS, "zone.%s.%d.entity.%d.event.out: (OP: %d) failed to serialize message to string", subscribedZoneName.c_str(), subscribedZoneInstance, d->spawn_id, op);
return;
size_t event_size = event->ByteSizeLong();
void *event_buffer = malloc(event_size);
if (!event->SerializeToArray(event_buffer, event_size)) {
Log(Logs::General, Logs::NATS, "zone.%s.%d.entity.%d.event.out: (OP: %d) failed to serialize message", subscribedZoneName.c_str(), subscribedZoneInstance, d->spawn_id, op);
return;
}
SendEvent(op, d->spawn_id, pubMessage);
SendEvent(op, d->killer_id, pubMessage);
SendEvent(op, d->spawn_id, event_buffer, event_size);
SendEvent(op, d->killer_id, event_buffer, event_size);
}
@ -863,8 +872,7 @@ void NatsManager::OnChannelMessageEvent(uint32 entity_id, ChannelMessage_Struct*
if (!isEntityEventAllEnabled && !isEntitySubscribed(entity_id))
return;
std::string pubMessage;
auto op = eqproto::OP_ChannelMessage;
eqproto::ChannelMessageEvent* event = google::protobuf::Arena::CreateMessage<eqproto::ChannelMessageEvent>(&the_arena);
event->set_target_name(cm->targetname);
@ -875,11 +883,14 @@ void NatsManager::OnChannelMessageEvent(uint32 entity_id, ChannelMessage_Struct*
event->set_skill_in_language(cm->skill_in_language);
event->set_message(cm->message);
if (!event->SerializeToString(&pubMessage)) {
Log(Logs::General, Logs::NATS, "zone.%s.%d.channel_message.out: failed to serialize message to string", subscribedZoneName.c_str(), subscribedZoneInstance);
return;
}
size_t event_size = event->ByteSizeLong();
void *event_buffer = malloc(event_size);
if (!event->SerializeToArray(event_buffer, event_size)) {
Log(Logs::General, Logs::NATS, "zone.%s.%d.channel_message.out: failed to serialize", subscribedZoneName.c_str(), subscribedZoneInstance);
return;
}
SendEvent(op, entity_id, event_buffer, event_size);
}
void NatsManager::OnSpecialMessageEvent(uint32 entity_id, SpecialMesg_Struct* sm) {
@ -892,7 +903,6 @@ void NatsManager::OnSpecialMessageEvent(uint32 entity_id, SpecialMesg_Struct* sm
auto op = eqproto::OP_SpecialMesg;
std::string pubMessage;
eqproto::SpecialMessageEvent* event = google::protobuf::Arena::CreateMessage<eqproto::SpecialMessageEvent>(&the_arena);
event->set_header(sm->header);
event->set_number(static_cast<eqproto::MessageType>(sm->msg_type));
@ -900,13 +910,15 @@ void NatsManager::OnSpecialMessageEvent(uint32 entity_id, SpecialMesg_Struct* sm
event->set_sayer(sm->sayer);
event->set_unknown12(*sm->unknown12);
event->set_message(sm->message);
if (!event->SerializeToString(&pubMessage)) {
Log(Logs::General, Logs::NATS, "zone.%s.%d.entity.%d.event.out: (OP: %d) failed to serialize message to string", subscribedZoneName.c_str(), subscribedZoneInstance, entity_id, op);
size_t event_size = event->ByteSizeLong();
void *event_buffer = malloc(event_size);
if (!event->SerializeToArray(event_buffer, event_size)) {
Log(Logs::General, Logs::NATS, "zone.%s.%d.entity.%d.event.out: (OP: %d) failed to serialize message", subscribedZoneName.c_str(), subscribedZoneInstance, entity_id, op);
return;
}
SendEvent(op, entity_id, pubMessage);
SendEvent(op, entity_id, event_buffer, event_size);
}
void NatsManager::OnEntityEvent(const EmuOpcode op, uint32 entity_id, uint32 target_id) {
@ -919,21 +931,10 @@ void NatsManager::OnEntityEvent(const EmuOpcode op, uint32 entity_id, uint32 tar
if (!isEntityEventAllEnabled && !isEntitySubscribed(entity_id))
return;
std::string pubMessage;
eqproto::EntityEvent* event = google::protobuf::Arena::CreateMessage<eqproto::EntityEvent>(&the_arena);
event->set_entity_id(entity_id);
event->set_target_id(target_id);
if (!event->SerializeToString(&pubMessage)) {
Log(Logs::General, Logs::NATS, "zone.%s.%d.entity.%d.event.out: (OP: %d) failed to serialize message to string", subscribedZoneName.c_str(), subscribedZoneInstance, entity_id, op);
return;
}
eqproto::Event* finalEvent = google::protobuf::Arena::CreateMessage<eqproto::Event>(&the_arena);
finalEvent->set_payload(pubMessage.c_str());
auto eqop = eqproto::OP_Camp;
if (op == OP_Camp) {
eqop = eqproto::OP_Camp;
@ -945,8 +946,16 @@ void NatsManager::OnEntityEvent(const EmuOpcode op, uint32 entity_id, uint32 tar
Log(Logs::General, Logs::NATS, "unhandled op type passed: %i", op);
return;
}
SendEvent(eqop, entity_id, pubMessage);
SendEvent(eqop, target_id, pubMessage);
size_t event_size = event->ByteSizeLong();
void *event_buffer = malloc(event_size);
if (!event->SerializeToArray(event_buffer, event_size)) {
Log(Logs::General, Logs::NATS, "zone.%s.%d.entity.%d.event.out: (OP: %d) failed to serialize message", subscribedZoneName.c_str(), subscribedZoneInstance, entity_id, op);
return;
}
SendEvent(eqop, entity_id, event_buffer, event_size);
SendEvent(eqop, target_id, event_buffer, event_size);
}
@ -960,8 +969,6 @@ void NatsManager::OnSpawnEvent(const EmuOpcode op, uint32 entity_id, Spawn_Struc
if (!isEntityEventAllEnabled && !isEntitySubscribed(entity_id))
return;
std::string pubMessage;
eqproto::SpawnEvent* event = google::protobuf::Arena::CreateMessage<eqproto::SpawnEvent>(&the_arena);
event->set_unknown0000(spawn->unknown0000);
@ -1070,11 +1077,6 @@ void NatsManager::OnSpawnEvent(const EmuOpcode op, uint32 entity_id, Spawn_Struc
event->set_targetable_with_hotkey(spawn->targetable_with_hotkey);
event->set_show_name(spawn->show_name);
if (!event->SerializeToString(&pubMessage)) {
Log(Logs::General, Logs::NATS, "zone.%s.%d.entity.%d.event.out: (OP: %d) failed to serialize message to string", subscribedZoneName.c_str(), subscribedZoneInstance, entity_id, op);
return;
}
auto eqop = eqproto::OP_ZoneEntry;
if (op == OP_ZoneEntry)
eqop = eqproto::OP_ZoneEntry;
@ -1084,7 +1086,14 @@ void NatsManager::OnSpawnEvent(const EmuOpcode op, uint32 entity_id, Spawn_Struc
Log(Logs::General, Logs::NATS, "zone.%s.%d.entity.%d.event.out: (OP: %d) unhandled opcode passed", subscribedZoneName.c_str(), subscribedZoneInstance, entity_id, op);
return;
}
SendEvent(eqop, entity_id, pubMessage);
size_t event_size = event->ByteSizeLong();
void *event_buffer = malloc(event_size);
if (!event->SerializeToArray(event_buffer, event_size)) {
Log(Logs::General, Logs::NATS, "zone.%s.%d.entity.%d.event.out: (OP: %d) failed to serialize message", subscribedZoneName.c_str(), subscribedZoneInstance, entity_id, op);
return;
}
SendEvent(eqop, entity_id, event_buffer, event_size);
}
@ -1097,7 +1106,7 @@ void NatsManager::OnWearChangeEvent(uint32 entity_id, WearChange_Struct *wc) {
return;
auto op = eqproto::OP_WearChange;
std::string pubMessage;
eqproto::WearChangeEvent* event = google::protobuf::Arena::CreateMessage<eqproto::WearChangeEvent>(&the_arena);
event->set_spawn_id(wc->spawn_id);
event->set_material(wc->material);
@ -1108,12 +1117,13 @@ void NatsManager::OnWearChangeEvent(uint32 entity_id, WearChange_Struct *wc) {
//event->set_color(wc->color); //tint
event->set_wear_slot_id(wc->wear_slot_id);
if (!event->SerializeToString(&pubMessage)) {
Log(Logs::General, Logs::NATS, "zone.%s.%d.entity.%d.event.out: (OP: %d) failed to serialize message to string", subscribedZoneName.c_str(), subscribedZoneInstance, entity_id, op);
return;
size_t event_size = event->ByteSizeLong();
void *event_buffer = malloc(event_size);
if (!event->SerializeToArray(event_buffer, event_size)) {
Log(Logs::General, Logs::NATS, "zone.%s.%d.entity.%d.event.out: (OP: %d) failed to serialize message", subscribedZoneName.c_str(), subscribedZoneInstance, entity_id, op);
return;
}
SendEvent(op, entity_id, pubMessage);
SendEvent(op, entity_id, event_buffer, event_size);
}
void NatsManager::OnDeleteSpawnEvent(uint32 entity_id, DeleteSpawn_Struct *ds) {
@ -1124,18 +1134,19 @@ void NatsManager::OnDeleteSpawnEvent(uint32 entity_id, DeleteSpawn_Struct *ds) {
if (!isEntityEventAllEnabled && !isEntitySubscribed(entity_id))
return;
std::string pubMessage;
auto op = eqproto::OP_DeleteSpawn;
eqproto::DeleteSpawnEvent* event = google::protobuf::Arena::CreateMessage<eqproto::DeleteSpawnEvent>(&the_arena);
event->set_spawn_id(ds->spawn_id);
event->set_decay(ds->Decay);
if (!event->SerializeToString(&pubMessage)) {
Log(Logs::General, Logs::NATS, "zone.%s.%d.entity.%d.event.out: (OP: %d) failed to serialize message to string", subscribedZoneName.c_str(), subscribedZoneInstance, entity_id, op);
return;
size_t event_size = event->ByteSizeLong();
void *event_buffer = malloc(event_size);
if (!event->SerializeToArray(event_buffer, event_size)) {
Log(Logs::General, Logs::NATS, "zone.%s.%d.entity.%d.event.out: (OP: %d) failed to serialize message", subscribedZoneName.c_str(), subscribedZoneInstance, entity_id, op);
return;
}
SendEvent(op, entity_id, pubMessage);
SendEvent(op, entity_id, event_buffer, event_size);
}
void NatsManager::OnHPEvent(const EmuOpcode op, uint32 entity_id, uint32 cur_hp, uint32 max_hp) {
@ -1147,7 +1158,6 @@ void NatsManager::OnHPEvent(const EmuOpcode op, uint32 entity_id, uint32 cur_hp,
return;
if (cur_hp == max_hp)
return;
std::string pubMessage;
eqproto::HPEvent* event = google::protobuf::Arena::CreateMessage<eqproto::HPEvent>(&the_arena);
@ -1155,10 +1165,6 @@ void NatsManager::OnHPEvent(const EmuOpcode op, uint32 entity_id, uint32 cur_hp,
event->set_cur_hp(cur_hp);
event->set_max_hp(max_hp);
if (!event->SerializeToString(&pubMessage)) {
Log(Logs::General, Logs::NATS, "zone.%s.%d.entity.%d.event.out: (OP: %d) failed to serialize message to string", subscribedZoneName.c_str(), subscribedZoneInstance, entity_id, op);
return;
}
auto eqop = eqproto::OP_MobHealth;
if (op == OP_MobHealth)
eqop = eqproto::OP_MobHealth;
@ -1168,7 +1174,14 @@ void NatsManager::OnHPEvent(const EmuOpcode op, uint32 entity_id, uint32 cur_hp,
Log(Logs::General, Logs::NATS, "unhandled op type passed: %i", op);
return;
}
SendEvent(eqop, entity_id, pubMessage);
size_t event_size = event->ByteSizeLong();
void *event_buffer = malloc(event_size);
if (!event->SerializeToArray(event_buffer, event_size)) {
Log(Logs::General, Logs::NATS, "zone.%s.%d.entity.%d.event.out: (OP: %d) failed to serialize message", subscribedZoneName.c_str(), subscribedZoneInstance, entity_id, op);
return;
}
SendEvent(eqop, entity_id, event_buffer, event_size);
}
void NatsManager::OnDamageEvent(uint32 entity_id, CombatDamage_Struct *cd) {
@ -1179,7 +1192,6 @@ void NatsManager::OnDamageEvent(uint32 entity_id, CombatDamage_Struct *cd) {
if (!isEntityEventAllEnabled && !isEntitySubscribed(entity_id))
return;
std::string pubMessage;
eqproto::DamageEvent* event = google::protobuf::Arena::CreateMessage<eqproto::DamageEvent>(&the_arena);
event->set_target(cd->target);
@ -1192,13 +1204,15 @@ void NatsManager::OnDamageEvent(uint32 entity_id, CombatDamage_Struct *cd) {
event->set_meleepush_z(cd->hit_pitch);
auto op = eqproto::OP_Damage;
if (!event->SerializeToString(&pubMessage)) {
Log(Logs::General, Logs::NATS, "Failed to serialize message to string");
return;
size_t event_size = event->ByteSizeLong();
void *event_buffer = malloc(event_size);
if (!event->SerializeToArray(event_buffer, event_size)) {
Log(Logs::General, Logs::NATS, "zone.%s.%d.entity.%d.event.out: (OP: %d) failed to serialize message", subscribedZoneName.c_str(), subscribedZoneInstance, entity_id, op);
return;
}
SendEvent(op, entity_id, pubMessage);
SendEvent(op, entity_id, event_buffer, event_size);
}
void NatsManager::OnClientUpdateEvent(uint32 entity_id, PlayerPositionUpdateServer_Struct * spu) {
@ -1211,7 +1225,6 @@ void NatsManager::OnClientUpdateEvent(uint32 entity_id, PlayerPositionUpdateServ
if (!isEntityEventAllEnabled && !isEntitySubscribed(entity_id))
return;
std::string pubMessage;
eqproto::PlayerPositionUpdateEvent* event = google::protobuf::Arena::CreateMessage<eqproto::PlayerPositionUpdateEvent>(&the_arena);
event->set_spawn_id(spu->spawn_id);
@ -1227,13 +1240,16 @@ void NatsManager::OnClientUpdateEvent(uint32 entity_id, PlayerPositionUpdateServ
event->set_heading(spu->heading);
event->set_padding0014(spu->padding0014);
event->set_delta_z(spu->delta_z);
event->set_padding0018(spu->padding0018);
if (!event->SerializeToString(&pubMessage)) {
Log(Logs::General, Logs::NATS, "zone.%s.%d.entity.%d.event.out: (OP: %d) failed to serialize message to string", subscribedZoneName.c_str(), subscribedZoneInstance, entity_id, op);
return;
event->set_padding0018(spu->padding0018);
size_t event_size = event->ByteSizeLong();
void *event_buffer = malloc(event_size);
if (!event->SerializeToArray(event_buffer, event_size)) {
Log(Logs::General, Logs::NATS, "zone.%s.%d.entity.%d.event.out: (OP: %d) failed to serialize message", subscribedZoneName.c_str(), subscribedZoneInstance, entity_id, op);
return;
}
SendEvent(op, entity_id, pubMessage);
SendEvent(op, entity_id, event_buffer, event_size);
}
@ -1245,7 +1261,6 @@ void NatsManager::OnAnimationEvent(uint32 entity_id, Animation_Struct *anim) {
if (!isEntityEventAllEnabled && !isEntitySubscribed(entity_id))
return;
std::string pubMessage;
auto op = eqproto::OP_Animation;
@ -1253,10 +1268,11 @@ void NatsManager::OnAnimationEvent(uint32 entity_id, Animation_Struct *anim) {
event->set_spawnid(anim->spawnid);
event->set_speed(anim->speed);
event->set_action(anim->action);
if (!event->SerializeToString(&pubMessage)) {
Log(Logs::General, Logs::NATS, "zone.%s.%d.entity.%d.event.out: (OP: %d) failed to serialize message to string", subscribedZoneName.c_str(), subscribedZoneInstance, entity_id, op);
return;
size_t event_size = event->ByteSizeLong();
void *event_buffer = malloc(event_size);
if (!event->SerializeToArray(event_buffer, event_size)) {
Log(Logs::General, Logs::NATS, "zone.%s.%d.entity.%d.event.out: (OP: %d) failed to serialize message", subscribedZoneName.c_str(), subscribedZoneInstance, entity_id, op);
return;
}
SendEvent(op, entity_id, pubMessage);
SendEvent(op, entity_id, event_buffer, event_size);
}

View File

@ -29,7 +29,7 @@ public:
void GetCommandMessage(eqproto::CommandMessage* message, const char* reply = nullptr);
void SendCommandMessage(eqproto::CommandMessage* message, const char* reply = nullptr);
void SendAdminMessage(std::string adminMessage);
void SendEvent(eqproto::OpCode op, uint32 entity_id, std::string pubMessage);
void SendEvent(eqproto::OpCode op, uint32 entity_id, void * buffer, size_t size);
void OnChannelMessageEvent(uint32 entity_id, ChannelMessage_Struct * cm);
void OnSpecialMessageEvent(uint32 entity_id, SpecialMesg_Struct *sm);