HareGo: When RabbitMQ meets Go

(Arsham Shirvani) (18 december 2020)

Voorwoord

Bij Blokur gebruiken we RabbitMQ voor asynchrone communicatie tussen microservices en om banen te beheren. AMQP -bibliotheek is de de-facto standaard voor Go-applicaties, maar het niveau is erg laag waardoor het moeilijk is om flexibele en testbare code te schrijven.

In de wereld van Go zijn er een handvol bibliotheken op hoog niveau die de communicatie met RabbitMQ vergemakkelijken. Helaas voldeden ze niet aan al onze use-cases, of ze waren erg specifiek over wat ze te bieden hadden, waardoor het erg moeilijk voor ons was om mee te werken. Dus besloten we om onze eigen bibliotheek te bouwen.

We hadden iets nodig waar we snel aan konden beginnen voor het belangrijkste deel van ons dagelijkse werk: problemen oplossen. Alles daarrond is een ontwikkelingskost en past niet in ons model.

HareGo invoeren

We begonnen na te denken over het maken van onze eigen bibliotheek met de volgende kenmerken:

  1. Het zou bijna elk aspect van de AMQP-bibliotheek moeten ondersteunen.
  2. Moet goed getest en gemakkelijk testbaar zijn.
  3. Moet gebruiksvriendelijk en uitbreidbaar zijn.
  4. Moet worden geleverd met gezonde standaardinstellingen die gewoon werken.
  5. Moet gelijktijdig veilig zijn.
  6. Goed om de mogelijkheid te hebben om berichten opnieuw in de wachtrij te plaatsen (niet alleen Nack) en het bericht te kunnen manipuleren voordat ze opnieuw in de wachtrij worden geplaatst.

We zijn verheugd aan te kondigen dat we deze bibliotheek open source hebben gemaakt. Op het moment dat dit artikel wordt geschreven, ondersteunen we Go-versie 1.14 en hoger.

API

De interface van HareGo is heel eenvoudig. U moet een harego.Client -object maken met het handige harego. NewClient () functie. De enige vereisten van de NewClient () -functie is het adres van de RabbitMQ-server. U kunt elk aspect van de client, en dus ook de onderliggende AMQP-client, configureren door harego.ConfigFunc configuratiefuncties geleverd door de bibliotheek.

Standaard is een harego.Client maakt één werknemer aan, die één publicatie- of één consumptieverzoek tegelijk afhandelt. De klant krijgt een willekeurige consumentennaam. Het standaardgedrag stelt de autoDelete en autoAck eigenschappen van alle berichten als false. U kunt ze echter anders instellen als u dat wilt.

U kunt de harego.Workers (3) configuratiefunctie om 3 gelijktijdige werknemers te hebben. Dit geldt voor het publiceren en consumeren van berichten. De bibliotheek garandeert dat het volgende beschikbare bericht wordt afgeleverd bij de volgende beschikbare werkplaats.

Het is een goed idee om een ​​ harego.QueueName () config om de cliënt te laten weten met welke wachtrij hij moet communiceren. De standaard wachtrijnaam is “harego”.

De client is een interface met de volgende specificaties:

Berichten publiceren

Om berichten te publiceren, kunt u een * amqp.Publishing object naar de Publiceer methode. Het bericht wordt verwerkt door de volgende beschikbare medewerker.

Berichten consumeren

De Consume methode is een blokkerende oproep die een handler zou gebruiken telkens wanneer een nieuw bericht van de makelaar wordt ontvangen. Het verbruikt niet meer wanneer de context wordt geannuleerd of de client wordt gesloten.

Een handler moet twee waarden retourneren: de AckType waarde en een vertraging. De vertraging geeft aan hoe lang het moet slapen voordat het reageert op de makelaar, en de AckType-waarde specificeert wat er met het bericht moet gebeuren.

AckType-waarden

Er is een beslissing genomen om de “Multiple” waarde van de ack naar false om slechts op één bericht te reageren in het geval van meerdere workers. Om het werkersysteem te laten werken, moeten we de prefetch-waarde instellen op ten minste het aantal werkers.

Er zijn vier waarden voor dit type:

  1. AckTypeAck : antwoordt met msg.Ack.
  2. AckTypeNack : antwoordt met msg.Nack.
  3. AckTypeReject : antwoordt met msg.Reject.
  4. AckTypeRequeue : stuurt het bericht terug naar de wachtrij en antwoordt met msg.Ack. Zie de sectie Berichten opnieuw in wachtrij plaatsen voor meer informatie.

Berichten opnieuw in wachtrij plaatsen

Met de AMQP-bibliotheek kun je Nack berichten en ze zullen worden afgeleverd bij de volgende beschikbare consument. Soms is dit niet wat u verwacht en heeft u het bericht nodig om terug te gaan naar het einde van de wachtrij en opnieuw te worden bezorgd. Met HareGo is het niet alleen mogelijk, maar u kunt het bericht ook manipuleren voordat u opnieuw in de wachtrij komt te staan. Hier is een voorbeeld van maximaal 5 keer opnieuw in de wachtrij plaatsen.

Tips en trucs

Meerdere verbindingen

Wanneer u deze bibliotheek gebruikt, maakt u meerdere harego.Client objecten aan om te adresseren verschillende aspecten van uw logica. Stel dat u uit één wachtrij leest en naar twee wachtrijen schrijft, maak drie objecten en bel de twee uitgevers binnen de gebruiker van de eerste:

Testbare code

Om het gemakkelijk te maken om de harego.Client , u kunt een functietype maken dat een nieuwe client en een fout zou opleveren. Je kunt een nepgenererende bibliotheek gebruiken om een ​​mock-up te maken van de harego.Client interface en deze gebruiken in je tests.

In het volgende fragment gebruiken we de mockery -bibliotheek om de spot te genereren, en getuigen bibliotheek voor het beweren van spot. U kunt natuurlijk de bibliotheek kiezen waarmee u zich het prettigst voelt.

Laatste woorden

Ga voor meer informatie naar de repository van de bibliotheek. Als je een functieverzoek nodig hebt of een probleem met de bibliotheek hebt gevonden, maak dan een probleem aan in de github-portal.

Geïnteresseerd om lid te worden van onze team? Ga naar deze link en solliciteer.

Geef een reactie

Het e-mailadres wordt niet gepubliceerd. Vereiste velden zijn gemarkeerd met *