Reservation

Once the payment callback is fired, we need to create a reservation entity in our booking database. The booking workflow will go through various stages, and this persistent entity will be responsible for maintaining the reservation status for all the microservices involved in the workflow.

The booking service is a REST API, and we will implement it using Gin (which we covered in Chapter 5, Going Distributed). The createReservation API, which is called during the Payment Callback, is defined in the following code:

func createReservation(c *gin.Context) {
	var reservationDTO HotelReservationDTO

	if err := c.ShouldBindJSON(&reservationDTO); err != nil {
		// some input parameters are not correct
		c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
		return
	}

	fmt.Printf("In createReservation : %+v ", reservationDTO)
	if err := persistReservation(&reservationDTO); err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}
	sendMessageToPerformBooking(&reservationDTO)

	// return Accepted - the rest of the booking happens asynchronously
	c.JSON(http.StatusAccepted, gin.H{"status": "created"})
}

The HotelReservationDTO is a data-transfer object: a representation of the reservation that both the client and the server understand. It describes the details of the reservation:

type HotelReservationDTO struct {
	CustomerId        uint            `json:"customer_id"`
	PaymentIdentifier uint            `json:"payment_identifier"`
	SKU               uint            `json:"entity_id"`
	RoomId            uint            `json:"room_id"`
	CheckIn           ReservationTime `json:"check_in" gorm:"type:datetime"`
	CheckOut          ReservationTime `json:"check_out" gorm:"type:datetime"`
}
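For reference, a request body for this DTO might look as follows. The field names come from the json tags above (note that entity_id maps to the SKU field); the values here are purely illustrative:

```json
{
  "customer_id": 42,
  "payment_identifier": 9876,
  "entity_id": 101,
  "room_id": 12,
  "check_in": "2018-12-07",
  "check_out": "2018-12-10"
}
```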

You might be wondering why we used ReservationTime instead of the standard time.Time. ReservationTime is just a simple wrapper over time.Time, and it is needed so that the encoding/json package knows exactly how to serialize and deserialize the time. By default, the package only accepts time in RFC 3339 format (for example, "2018-11-01T22:08:41+00:00"), which is unnecessarily inconvenient for us when we want to pass a plain date such as 2018-12-07. The idiomatic workaround in Go is a wrapper struct, as shown here:

const reservationDateFormat = "2006-01-02"

type ReservationTime time.Time

func (t *ReservationTime) UnmarshalJSON(bytes []byte) error {
	// strip the surrounding quotes before parsing the date
	rawT, err := time.Parse(reservationDateFormat, strings.Replace(
		string(bytes),
		"\"",
		"",
		-1,
	))
	if err != nil {
		return err
	}

	*t = ReservationTime(rawT)
	return nil
}

func (t *ReservationTime) MarshalJSON() ([]byte, error) {
	buf := fmt.Sprintf("\"%s\"", time.Time(*t).Format(reservationDateFormat))
	return []byte(buf), nil
}

Going back to the createReservation handler, after binding the request it does two things:

  • It creates the reservation entity in the database using persistReservation().
  • It then sends a Kafka message to the Seller Proxy to actually perform the booking using sendMessageToPerformBooking().

For persistence, we will use MySQL as the relational database. To avoid boilerplate code, we will use an Object Relational Mapper (ORM), specifically gorm (https://github.com/jinzhu/gorm). The persistReservation() function is defined here:

func persistReservation(res *HotelReservationDTO) error {
	// Note the use of tx as the database handle once you are within a transaction
	tx := db.Begin()
	defer func() {
		if r := recover(); r != nil {
			tx.Rollback()
		}
	}()

	if tx.Error != nil {
		return tx.Error
	}

	// TODO : check that there is no overlapping reservation
	if err := tx.Create(&HotelReservation{
		CustomerId:        res.CustomerId,
		PaymentIdentifier: res.PaymentIdentifier,
		SKU:               res.SKU,
		RoomId:            res.RoomId,
		CheckIn:           time.Time(res.CheckIn),
		CheckOut:          time.Time(res.CheckOut),
		Id:                makeId(res),
		Status:            Initial}).Error; err != nil {
		tx.Rollback()
		return err
	}

	fmt.Println("created hotel reservation..")

	// update the entry for the availability threshold
	var threshold AvailabilityThreshold
	tx.Where("entity_id = ? AND room_id = ?", res.SKU, res.RoomId).First(&threshold)

	fmt.Printf(" threshold = %+v ", threshold)
	tx.Model(&threshold).Where("id = ?", threshold.ID).
		Update("availability", threshold.Availability-1)

	// NOTE : availability is just a threshold for update here.
	// Even if availability is 0, the reservation is forwarded to the Seller,
	// and availability > 0 in the thresholds DB is not a guarantee of reservation.
	if threshold.Availability <= 1 {
		// we have reached the threshold
		sendInvalidationMessageToPriceStore(threshold.SKU, threshold.RoomId)
	}

	return tx.Commit().Error
}

The function starts by opening a transaction. This is important because we will be updating more than one table, and we need ACID semantics. The two main tables being updated are these:

  • hotel_reservations: details about a reservation, including the status.
  • availability_thresholds: holds triggers; when the availability trigger fires, the caches in the price store need to be invalidated, to avoid stale data.

The availability_thresholds table is one way to keep the price store caches fresh. After a few bookings are made, the booking service sends a message to the price store. The price store then drops the caches for which the availability threshold (the availability field) has reached 0. Once fresh data is loaded, the price store service sends another message to the booking service to update availability.

When the reservation is inserted, a key attribute is its status. In the preceding code, the status starts in the Initial state and evolves as the workflow progresses. The status is modeled as an enum:

type Status int

const (
	Initial     Status = 0
	BookingMade Status = 1
	EmailSent   Status = 2
)

sendMessageToPerformBooking() sends a Kafka message to the create_booking topic to trigger the next stage of the workflow: the seller proxy. It is shown in the following code:

func sendMessageToPerformBooking(reservationDTO *HotelReservationDTO) {
	log.Println("sending message to kickstart booking for ", reservationDTO)
	bytes, err := json.Marshal(reservationDTO)
	if err != nil {
		log.Println("error sending message to Kafka ", err)
		return
	}

	// We are not setting a message key, which means that all messages will
	// be distributed randomly over the different partitions.
	msg := &sarama.ProducerMessage{
		Topic: "create_booking",
		Value: sarama.ByteEncoder(bytes),
	}
	partition, offset, err := kafkaProducer.SendMessage(msg)
	if err != nil {
		fmt.Printf("FAILED to publish message: %s ", err)
	} else {
		fmt.Printf("message sent | partition(%d)/offset(%d) ", partition, offset)
	}
}

The seller proxy microservice consumes the message sent by sendMessageToPerformBooking() and performs the actual booking with the seller. The seller proxy code starts off by doing a few initializations, the main one being registering as a consumer on the create_booking topic.

We use the sarama-cluster library (https://github.com/bsm/sarama-cluster) for the Kafka high-level consumer API. The brokers heartbeat individual consumer instances and distribute partitions of the Kafka topic among the healthy instances. The init() code is as follows:

func init() {
	// setup config, enable errors and notifications
	config := cluster.NewConfig()
	config.Consumer.Return.Errors = true
	config.Group.Mode = cluster.ConsumerModePartitions
	config.Group.Return.Notifications = true

	// specify broker coordinates and topics of interest
	// should be done from config
	brokers := []string{"localhost:9092"}
	topics := []string{"create_booking"}

	// trap SIGINT to trigger a shutdown
	signals = make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	// connect, and register specifying the consumer group name
	consumer, err := cluster.NewConsumer(brokers, "booking-service", topics, config)
	if err != nil {
		panic(err)
	}

	// process errors
	go func() {
		for err := range consumer.Errors() {
			log.Printf("Error: %s ", err.Error())
		}
	}()

	// process notifications
	go func() {
		for ntf := range consumer.Notifications() {
			log.Printf("Rebalanced: %+v ", ntf)
		}
	}()

	// start the listener goroutine
	go handleCreateBookingMessage(consumer)
}

The actual work is done by the handleCreateBookingMessage() function, which is spawned as a goroutine at the end:

func handleCreateBookingMessage(consumer *cluster.Consumer) {
	for {
		select {
		case partition, ok := <-consumer.Partitions():
			if !ok {
				panic("kafka consumer : error getting partitions..")
			}

			// start a separate goroutine to consume messages
			go func(pc cluster.PartitionConsumer) {
				for msg := range pc.Messages() {
					var reservationDTO HotelReservationDTO
					if err := json.Unmarshal(msg.Value, &reservationDTO); err != nil {
						fmt.Println("unmarshalling error", err)
						// commit even on error to avoid poison pills
						consumer.MarkOffset(msg, "")
						continue
					}

					// make actual booking with seller - here :)

					// update status in DB
					updateReservationStatus(&reservationDTO, BookingMade)
					fmt.Printf("processed create booking message %s-%d-%d-%s-%s ",
						msg.Topic,
						msg.Partition,
						msg.Offset,
						msg.Key,
						msg.Value)
					consumer.MarkOffset(msg, "") // commit offset for this message
				}
			}(partition)

		case <-signals:
			fmt.Println("consumer killed..")
			return
		}
	}
}

It listens for incoming Kafka messages; when a message is received, it does the following:

  • De-serializes the payload
  • Makes the actual reservation with the seller
  • Updates the status in the hotel_reservations DB using updateReservationStatus(&reservationDTO, BookingMade)
  • Marks the message as read
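The updateReservationStatus() helper is not shown in the chapter. A minimal sketch using the gorm v1 API, assuming db is the same global *gorm.DB handle and makeId() is the same key function used in persistReservation(), might look like this:

```go
// updateReservationStatus is a sketch; the actual implementation is not shown
// in the chapter.
func updateReservationStatus(res *HotelReservationDTO, status Status) error {
	return db.Model(&HotelReservation{}).
		Where("id = ?", makeId(res)).
		Update("status", status).Error
}
```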

Although not shown, on a successful booking, it will also put a trigger message for the mailer service, which will notify the customer of the successful booking.
