Building Resilient and Scalable Architectures

The GoDaddy hosting engineering team develops and supports a wide range of products that host more than 10 million websites. These products include shared hosting, dedicated servers, virtual private servers, and a managed WordPress offering, and together they span thousands of servers. So, how do we make new APIs available for these hosting products to consume in a way that’s both resilient and scalable?

Our team is in the process of implementing a new architecture for hosting products that provides all of the common web services required by new products under development. These web services provide integration with third-party vendors as well as with other GoDaddy products such as domain management, DNS, and SSL certificate management. This new architecture, which we call Hosting Foundation Services (HFS), exposes an API to these web services via a common REST endpoint. Each of these web services is implemented as a micro-service with a single concern that is task-oriented and is independent of any other web service. We also use a shared nothing architecture to avoid introducing dependencies between web services and to reduce complexity.

The initial design for such a product was straightforward enough: stand up a cluster of reverse proxy servers behind a load balancer that can proxy the various API calls for each individual web service, and set up enough application servers to handle the expected load and provide for some redundancy. However, we had some additional constraints that influenced how we designed such an architecture:

  • As each product that would eventually use the HFS architecture began to take shape, the need for additional APIs was identified. We needed to be able to add web services in a dynamic fashion that didn’t require reconfiguration of our reverse proxy servers.
  • We didn’t know exactly how many application servers we would need for each web service, and so we needed a scalable solution that would allow us to add application servers dynamically (and in the future, automatically) to meet demand.
  • We knew that as more and more products depended on the HFS architecture, it needed to be resilient to software, hardware, and network failures in a cloud environment.
  • Developers like having a Swagger interface available for each web service, and we wanted to expose a consolidated Swagger interface for all of the available APIs at a single location.

Our new hosting products are cloud-based, and internally we use OpenStack as our virtualization technology. Therefore, it was no problem to construct the initial tooling that created a VM, applied Puppet updates, and installed any one of the arbitrary web services under development. While this approach was adequate early on in the development process, it didn’t scale in practice. As servers were created manually, it became cumbersome to keep track of which one was associated with each of the various APIs under development. We weren’t sure how many servers we would need to spin up for any given web service, and even if we knew how many servers to provision, we still needed something that would scale the solution automatically for each of the different APIs we wanted to provide. Enter the Master Control Program, or MCP.

MCP was conceived as an orchestration tool that could provision and de-provision application servers automatically to scale with the expected load on the VMs servicing the APIs. Orchestration technologies such as Kubernetes or Docker Swarm provide similar capabilities when using container-based solutions, but we needed MCP to perform similar orchestration functionality on our VM-based implementation. While we expect to eventually migrate some of these web services from standalone VMs to containers, we will still need MCP’s ability to provision additional VMs as Docker hosts in order to scale-out the environment to meet demand.

VM Factory: Speeding up the Server Deployments

During the process of automating server builds, we found that applying system updates and Puppet were taking a considerable amount of time to run. So, we implemented a “VM Factory” process that performs these steps ahead of time and provides a ready pool of VMs that can be used to quickly spin up additional application servers for a specific web service as they are needed.
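
As a rough illustration of the idea, here is what a minimal pool keeper for such a VM Factory could look like using the OpenStack SDK for Python. The image, flavor, network, and metadata names are placeholders rather than our actual tooling:

import uuid
import openstack   # openstacksdk

POOL_TARGET = 10                               # how many warm VMs to keep on hand
conn = openstack.connect(cloud="hfs-dev")      # credentials come from clouds.yaml

def ready_pool():
    """VMs that have been patched/puppetized but not yet claimed by a web service."""
    return [s for s in conn.compute.servers()
            if s.metadata.get("vmfactory") == "ready"]

def top_up_pool():
    shortfall = POOL_TARGET - len(ready_pool())
    for _ in range(max(shortfall, 0)):
        conn.compute.create_server(
            name=f"factory-{uuid.uuid4().hex[:8]}",
            image_id=conn.compute.find_image("hfs-base").id,
            flavor_id=conn.compute.find_flavor("m1.medium").id,
            networks=[{"uuid": conn.network.find_network("hfs-private").id}],
            metadata={"vmfactory": "ready"},
        )
        # system updates and the Puppet run happen here, ahead of time,
        # so claiming a VM for a web service later is nearly instant

if __name__ == "__main__":
    top_up_pool()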

Service Registration: Where was that API Server again?

Each web service retrieves configuration data from a ZooKeeper cluster. For service registration and discovery, each web service registers its location in the ZooKeeper cluster using ephemeral znodes. That way, if the VM hosting that web service dies, the service registration data in ZooKeeper is updated automatically. On the reverse proxy servers, a daemon process uses the service registration data in ZooKeeper to construct nginx configuration directives that expose each API via its own URL endpoint. As web services are added or removed, the reverse proxy configuration in nginx is updated automatically. The ephemeral nature of the service registrations ensures that the reverse proxy configuration is updated for any VMs that are removed. Since the daemon is already handling the creation and removal of service registrations, it can also combine the Swagger specifications provided by each web service to present a consolidated specification to the end-user that will always be current.
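
To make that flow concrete, here is a hedged sketch of the two halves of the pattern using the kazoo ZooKeeper client for Python. The znode paths, service name, and nginx snippet are illustrative only, not the actual HFS conventions:

import json
import socket
from kazoo.client import KazooClient

zk = KazooClient(hosts="zk1:2181,zk2:2181,zk3:2181")
zk.start()

# --- on the application server: register this instance of a web service ---
service, port = "ssl-api", 8443
payload = json.dumps({"host": socket.getfqdn(), "port": port}).encode()
zk.create(f"/hfs/services/{service}/instance-", payload,
          ephemeral=True, sequence=True, makepath=True)
# If this VM dies, its ZooKeeper session expires and the znode disappears.

# --- on the reverse proxy: rebuild nginx config whenever instances change ---
@zk.ChildrenWatch(f"/hfs/services/{service}")
def rebuild_upstream(children):
    servers = []
    for child in children:
        data, _ = zk.get(f"/hfs/services/{service}/{child}")
        info = json.loads(data)
        servers.append(f"    server {info['host']}:{info['port']};")
    conf = "upstream %s {\n%s\n}\n" % (service, "\n".join(servers))
    with open(f"/etc/nginx/conf.d/{service}.conf", "w") as f:
        f.write(conf)
    # the real daemon would follow this with an `nginx -s reload`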

[Figure: HFS scalable architecture diagram]

Making Sure Supply == Demand

Now that we had addressed the requirement to have a dynamic number of web services and application servers, we still wanted the ability to automatically grow or shrink certain portions of the infrastructure based on any number of variables such as demand, health of various system components, and failure of various components. To do this, MCP compares the number of running instances of each web service to a configurable value and either starts or terminates the required number of instances for each web service to ensure that the proper number is running and that all members are healthy. This provides nicely for automatic remediation in the event a server should fail as MCP will detect a mismatch between the configured number of instances and the number of running instances, and then start a new VM when necessary.
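
The heart of that behavior is a simple reconciliation loop. The sketch below captures the idea in Python; the helper functions are hypothetical stand-ins for the real MCP tooling:

import time

DESIRED = {"ssl-api": 4, "dns-api": 6, "vendor-gw": 2}   # configurable targets

def reconcile(desired, list_healthy_instances, start_instance, terminate_instance):
    """Compare configured counts with healthy running instances and close the gap."""
    for service, want in desired.items():
        running = list_healthy_instances(service)    # e.g. read registrations from ZooKeeper
        delta = want - len(running)
        if delta > 0:
            for _ in range(delta):
                start_instance(service)               # claim a pre-built VM Factory VM
        elif delta < 0:
            for vm in running[:abs(delta)]:
                terminate_instance(service, vm)

def run_forever(**helpers):
    while True:
        reconcile(DESIRED, **helpers)
        time.sleep(30)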

Rethinking Upgrades

Software upgrades of a web service are also handled by MCP by specifying how many instances of the new version should be running. MCP will ensure that the specified number of instances is started, and can verify the health of those web services by checking for the expected service registration information to be present in ZooKeeper. Depending on the upgrade scenario, the old version can be de-configured and removed, or can be left configured if two versions of the web service should be left running concurrently for A/B testing or for performance comparison purposes.

Auto-Scaling to Meet Changes in Demand

Now that the count of application servers per web service is configurable, we have the opportunity to make the system scale as needed by measuring things like web traffic and system load, and by adjusting the application server counts automatically. For web services that experience increased traffic, MCP would ensure that additional VMs are spun-up and brought online to meet the higher demand. This process can be extended to cover other parts of the infrastructure as well. For instance, to deal with an increasing amount of traffic, the pool of reverse proxy servers running nginx could be scaled dynamically. And when combined with Load Balancer as a Service (LBaaS), MCP would be able to provision additional reverse proxy servers as needed and automatically add them to the load balancer configuration.

Conclusion

MCP has allowed our Hosting Foundation Services (HFS) to scale application servers even as web services are added dynamically. Developers and end-users have the Swagger specification they need, and access control is simplified by having a single endpoint. We’ve learned to think differently about how the infrastructure is constructed by taking a second look at each part of it and asking, “Is this something that can be provisioned automatically as needed?”

The concepts introduced by MCP could be taken further by bootstrapping a complete environment from only a ZooKeeper cluster and an instance of MCP. In this case, MCP would build out the originally empty environment as needed, including reverse proxy servers, application servers, and any other server type that can be orchestrated via Puppet. With that in place, the whole environment could be recreated from scratch, making MCP an attractive option for developers who need to replicate a complex environment with minimal effort.

Join us!

If you’re interested in working on challenging projects like this, GoDaddy has several positions available. Check out the GoDaddy Jobs page to learn about all our current openings across the company.

Seamlessly Connecting Domains to Services with Domain Connect 2.0

Domain names are meant to be used. At GoDaddy, we want to empower our customers to build their dreams, and one way we can measure that is by the number of active domains that are out there. Any technology or product that promotes usage is a good thing, since active domains also mean renewals. With so many services available online that need domain names, and so many Service Providers offering them (for example, Wix and Squarespace for easy website creation, or Microsoft’s Office 365 email offerings), providing an easy way for our customers to attach domain names to services is a huge win.

What’s the Problem?

Often, configuring just the DNS for a domain to be used by these services can get complex, especially for novice users. If I had a dollar for every time I had to explain to a friend or family member what an “A Record” is, and why they need to change it, I’d be flush in ice cream. Simply explaining the reason that a domain needs to be connected is enough to turn a customer off to doing so. Having a simple way for a customer to “connect” a domain name to a service – or even multiple services – without having to worry about the technical aspects is critical. In other words, presenting customers with opportunities to use their domains without exposing them to the confusion of the underlying infrastructure creates a seamless experience and an expectation that things will “just work” every time.

As that expectation becomes mainstream, future developments in the Internet of Things make the implications even more staggering. Imagine if every home cable router came with an easy way to purchase and configure a domain name for the connected home. Or if the current Augmented Reality craze (as seen with Pokémon Go) let you purchase and configure a discoverable domain as part of the experience.

Domain Connect 2.0 is a simple way to make connecting domains to services as seamless and easy as possible. Through a standardized protocol, implementation at both the Service Provider level and the DNS Provider level is consistent and predictable. Further, by using templates to accomplish the configuration, DNS providers can ensure that changes have a smaller chance of causing problems or exposing unregulated DNS record manipulation to the outside world. Domain Connect 2.0 is a GoDaddy innovation that has been submitted to the IETF as an open standard, and our implementation as a DNS Provider is underway.

Domain Connect has already been implemented as a simpler synchronous web-based offering, and the 2.0 version expands the concept with a better authorization strategy as well as an asynchronous RESTful API. In this blog entry about the new version, the 2.0 is implied. Or, implicit val domainConnect: Double = 2.0 as we say in Scala.

So What Are Templates, Anyway?

I called out templates as a benefit, but what are they and why do we use them? Templates solve a critical problem in configuring DNS: unregulated changes to the DNS and a lack of predictability are bad. In the simplest implementation, a protocol for making changes to the DNS could simply say, “tell me what records you want me to create,” and a Service Provider could pass in information that essentially says, “please create an A Record with a host name of ‘www’ and a value of ‘192.168.42.42’.” This would work, of course, but could cause some foreseeable problems. What if there were already a host with that name? What if that name was reserved? What if the host name requested was known to cause problems? While these issues still exist in the world of templates, it is easier to know ahead of time that there could be a problem and simply not allow such things in a template. Put another way, templates can have the host names pre-defined and sanitized by both the Service Provider as well as the DNS Provider. In fact, creating a human touch-point in onboarding new templates means that someone can eye-ball them for such problems. Once configured, a template would prevent a Service Provider from making arbitrary changes to DNS, either accidentally or deliberately.

Without templates a Service Provider could request the addition or modification of a DNS record of just about any supported type and restrictions would be difficult, at best. With templates, however, if a DNS provider doesn’t wish to allow a specific (or any) Service Provider to create, say, TXT records, the DNS provider can simply not allow them in any templates. Or they can make per-provider exceptions by simply requiring all templates be reviewed and approved before being made available for use.

Finally, templates allow for predictability. A template suitable for setting the records needed to point to a web hosting provider can be used consistently across all connecting domains. The chances of a Service Provider slipping in other record changes are reduced to zero, since those records simply don’t exist in the “host this domain’s web site” template. Standard templates that accomplish common configurations can even be shared between Service Providers, making DNS Providers’ jobs easier. Overall, everyone gets a predictability and reliability boost.

In other words, templates are never gonna give you up, never gonna let you down, and never gonna run around and desert you. They are never gonna make you cry, never gonna say goodbye and they are never gonna tell a lie and hurt you.

Now that I’ve thoroughly convinced you that templates are awesome, how do we use them? While the internal implementation of templates is left up to the DNS provider, a standard JSON format is specified to make implementation consistent for all providers (and to encourage the sharing I mentioned just now). Indeed, a central repository of templates is noted as an improvement for future versions of the specification. Each template is identified by the provider and given a unique ID composed of a composite key that identifies the Service Provider and the service being offered. Templates contain the information needed to make DNS changes. This is done through an array of records or actions for the DNS provider to modify or enact. Records note the DNS record type and any values to modify, including the ability to pass in dynamic data (such as an IP address for an A Record). DNS providers can check templates for suitability to purpose or policy before making them available to be used and can work with service providers to refine templates.
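
As a rough illustration of the idea (not the exact JSON format from the specification), here is a small Python sketch of a template with two records and the variable substitution a DNS provider would perform when applying it:

# Illustrative template structure; field names and the %variable% syntax are
# simplified stand-ins for the format defined in the specification.
TEMPLATE = {
    "providerId": "coolprovider.com",      # composite key: the provider...
    "serviceId": "hosting",                # ...plus the service being offered
    "records": [
        {"type": "A", "host": "www", "pointsTo": "%www%", "ttl": 3600},
        {"type": "A", "host": "m",   "pointsTo": "%m%",   "ttl": 3600},
    ],
}

def render_template(template, variables):
    """Substitute the service provider's name/value pairs into the records."""
    rendered = []
    for record in template["records"]:
        value = record["pointsTo"]
        for name, val in variables.items():
            value = value.replace("%" + name + "%", val)
        rendered.append({**record, "pointsTo": value})
    return rendered

print(render_template(TEMPLATE, {"www": "192.168.42.42", "m": "192.168.42.43"}))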

And as I mentioned, by making this a standard, we allow Service Providers to re-use templates across multiple DNS providers. Similarly, DNS providers can make a template suitable to a particular purpose available to multiple service providers, saving time and effort.

Discoverability

When a customer wishes to connect a domain, the service provider needs to know who the DNS provider is. To do this, Domain Connect specifies that a TXT record be added to the DNS for a domain that specifies a URL that can be called for discovery. The service provider queries the domain for this TXT record (called “DOMAIN_CONNECT”) which, if present, indicates that the domain is served by a DNS provider that supports the Domain Connect protocol. Given the URL, a service provider can call an API endpoint for protocol discovery:

GET v2/{domain}/settings

This will return a JSON structure that contains settings for Domain Connect on the domain name specified, including the provider name and URLs to the two main methods of using Domain Connect. Rather than bloat the DNS with this record, we implement our DNS to inject it in all applicable requests, which also allows for rapid change if necessary without modifying a massive number of DNS zones.
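
Putting the discovery step together, the service provider’s side might look something like the following Python sketch. The TXT record format is simplified here, and the code assumes the dnspython and requests libraries:

import dns.resolver   # pip install dnspython
import requests

def discover_settings(domain):
    api_url = None
    for rdata in dns.resolver.resolve(domain, "TXT"):
        txt = b"".join(rdata.strings).decode()
        if txt.startswith("DOMAIN_CONNECT="):            # assumed record format
            api_url = txt.split("=", 1)[1]
            break
    if api_url is None:
        return None           # this DNS provider doesn't support Domain Connect
    resp = requests.get(f"{api_url}/v2/{domain}/settings", timeout=10)
    resp.raise_for_status()
    return resp.json()        # provider name plus URLs for the two flows

print(discover_settings("example.com"))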

Two Ways to Get It Done – Way the First, the Synchronous Web-Based Flow

Domain Connect provides two ways to get the job of connecting a domain done. The first is via a one-time synchronous web-based HTTP call. This flow is for service providers who want a one-time change to the DNS, and is very similar to how the first version of Domain Connect works. A user identifies the domain they wish to connect, and the service provider determines the DNS provider through the discovery process. Having ensured that the DNS provider supports Domain Connect, as demonstrated previously, the service provider simply calls a known URL and passes in the information necessary to configure the domain to their specification.

v2/domainTemplates/providers/{providerDomain}/services/{serviceName}/apply?[properties]

So a typical call might look like:

https://webconnect.dnsprovider.com/v2/domainTemplates/providers/coolprovider.com/services/hosting/apply?www=192.168.42.42&m=192.168.42.43&domain=example.com

This call indicates that the Service Provider wishes to connect the domain example.com to the service using the template identified by the composite key of the provider (coolprovider.com) and the service owned by them (hosting). In this example, there are two variables in this template, “www” and “m”, which both require values (each requires an IP address). These variables are passed as name/value pairs in the query.
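
For illustration, here is a small Python sketch of how a service provider might assemble that URL from the settings returned by discovery; the settings key used for the web-flow base URL is an assumption:

from urllib.parse import urlencode

def build_apply_url(settings, provider_domain, service, domain, variables):
    base = settings["urlSyncUX"]           # assumed key, e.g. https://webconnect.dnsprovider.com
    query = urlencode({**variables, "domain": domain})
    return (f"{base}/v2/domainTemplates/providers/{provider_domain}"
            f"/services/{service}/apply?{query}")

url = build_apply_url(
    {"urlSyncUX": "https://webconnect.dnsprovider.com"},
    "coolprovider.com", "hosting", "example.com",
    {"www": "192.168.42.42", "m": "192.168.42.43"},
)
# The customer's browser is then redirected to `url` to authenticate and approve.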

Once on the website of the DNS provider, the customer is asked to authenticate and give permission for the DNS changes to be applied. These changes to the DNS are then done via templates, which ensure that the DNS records to be applied are both already known as well as properly constrained. Once the changes are made or any errors handled, the customer is optionally redirected back to the service provider’s site, providing confirmation that all went well (or an indication of an error is passed).

[Figure: the synchronous web-based flow]

Way the Second – The Asynchronous API Flow

The second connection method (and new in Version 2) is an OAuth-based flow combined with a RESTful API. This is intended for service providers who want to make DNS changes asynchronously or with use cases that require multiple steps. This flow begins like the synchronous flow in terms of authenticating the customer, but instead of actually applying any DNS changes, the calling service provider is given an access token allowing them to call API functions to apply or remove a DNS change template (or even multiple templates).

This permission gives the service provider the right to apply (or remove) specific templates for a specific domain owned by a specific customer. The service provider may retain this token to apply or remove the template at any time during the token’s lifetime (or until the customer revokes permission, of course).

Service providers who want to use this flow register as an OAuth client with the DNS provider, providing both the templates that will be used and the callback URLs that specify where customers are redirected after OAuth authorizations are done. Customers are authenticated much like in the web-based flow, and after they give permission to apply a template to a domain, an OAuth authorization token is issued. This token can be used to request or renew a specific access token to perform API calls. The access token is passed in the Authorization header of API requests.

The API is very simple, and contains endpoints to apply a template, remove a template, or revoke access.

Applying a template is done to a single domain, and the domain is part of the authorization. While the provider ID and service ID are also implied in the authorization, they appear on the path for consistency with the synchronous flow. If they do not match what is in the authorization, an error is returned. The API endpoint should look familiar:

https://connect.dnsprovider.com/v2/domainTemplates/providers/coolprovider.com/services/hosting/apply?www=192.168.42.42&m=192.168.42.43

Since this is an API call, HTTP response codes are used to indicate success or error.
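
A hedged sketch of what that apply call could look like from the service provider’s side, using Python’s requests library; the host, token, and choice of HTTP method are placeholders rather than prescriptions from the spec:

import requests

def apply_template(api_base, access_token, provider_domain, service, variables):
    url = (f"{api_base}/v2/domainTemplates/providers/{provider_domain}"
           f"/services/{service}/apply")
    # the access token from the OAuth flow authorizes this specific domain/template
    resp = requests.post(                  # method shown as POST for illustration
        url,
        params=variables,                  # e.g. {"www": "192.168.42.42", "m": "192.168.42.43"}
        headers={"Authorization": f"Bearer {access_token}"},
        timeout=10,
    )
    return resp.status_code                # HTTP response codes indicate success or error

status = apply_template("https://connect.dnsprovider.com", "<access token>",
                        "coolprovider.com", "hosting",
                        {"www": "192.168.42.42", "m": "192.168.42.43"})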

Reverting a template is a similar API call, but under the hood there is a bit more going on, in that the DNS provider has to ensure that the template had been previously applied (you can’t revert what you’ve not actually done!) and that doing so won’t break anything else. The specification leaves such implementation details to the individual DNS providers.

Interesting Alternatives

Another possible flow has the initiation of the connection coming from the DNS provider, such as suggesting to a customer that they might want to connect their domain to a partner service provider. In this case it’s entirely possible that the whole process is completed on the DNS provider’s site, with the service provider either not called at all or called only to be notified of the connection. Some DNS providers are, in essence, also Service Providers, and Domain Connect can be used 100% internally, giving customers a consistent experience with much less effort.

In cases where a template has dynamic elements, the flow could still be initiated by the DNS provider but then handed off to the service provider to inject the appropriate variables, with the flow continuing as usual.

Future Developments

While this is a 2.0 version, it is really the first iteration of this specification that connects all the dots for templated connection of domains between service providers and DNS providers. During its development there were many ideas put forth for future development. Once adopted by industry, Domain Connect has the promise to make the configuration of domains much easier for customers, driving innovation and adoption.

Come Innovate With Us

GoDaddy’s Domain Connect 2.0 is a new innovation and we are looking for awesome engineers to help us build out more cool features for our customers. If you are interested, come join us at GoDaddy Careers.

Delivering the Power of the Cloud to Small Businesses

A few years ago, GoDaddy adopted OpenStack as the software used to run our internal cloud. As a result, our ability to execute and build products has increased tremendously. It has helped change the culture of how we develop and architect products, and it has enabled us to leverage automation across our infrastructure to ensure we meet the demands of our customers. The next logical progression for us was to give our customers access to the same power and scale that GoDaddy uses to run its own business, but in a simple and easy-to-consume product. Enter GoDaddy Cloud Servers. Our goal was to create a simple-to-use product, powered by a consistent and intuitive API, that empowers developers, IT professionals, and small business owners to get up and running quickly and easily. To delight our customers, we understand that our system has to scale, be highly available, and provide industry-leading performance. This article will discuss some of the technologies and tools we used to achieve our goals with the GoDaddy Cloud Servers provisioning system.

In order to make the product intuitive and easy to use, we decided to expose our own API as a shim layer in front of the OpenStack APIs. The thought process behind this is that it allows us to keep the API consistent with other GoDaddy APIs that are exposed externally and to remove some of the complexity involved in learning and consuming the OpenStack APIs. To address the consistency of our APIs, we have adopted Swagger-API as the framework for describing all of the APIs at GoDaddy. This allows developers to easily consume our various products since they all follow a consistent convention. OpenStack has grown quickly over the years and its APIs are not all that consistent. There are also multiple ways to accomplish the same task (Nova versus Neutron and allocating/assigning floating IPs are good examples). We wanted to eliminate this for our end-users by building an API that follows the Unix philosophy of “Do one thing and do it well”.

One of the cool things about GoDaddy Cloud Servers is that it runs on the same platform that it exposes. We are truly eating our own dog food with this product. All of our infrastructure runs on our private OpenStack cloud, while our customers live on our public cloud. As mentioned above, one of the goals we set out to achieve was a product that is highly available and horizontally scalable so that we can meet our customers’ ever-changing demands. To achieve this, we decided to use Apache Kafka at the center of our cloud provisioning system. By using Kafka, we get a centralized messaging system that is scalable, distributed, and durable. Kafka is also built on top of Apache ZooKeeper, which provides many features that are helpful in building distributed systems. GoDaddy Cloud Servers leverages both Kafka and ZooKeeper. Below is a high-level architecture diagram of our system:

[Figure: high-level architecture diagram of the provisioning system]

The main takeaway from the architecture diagram is that we use Kafka to decouple the back-end services from the front-end API. All requests that hit the API end up becoming JSON messages that land in Kafka and are consumed by the back-end services that interact with OpenStack. We also take advantage of OpenStack’s Ceilometer notification service to track changes that occur on the OpenStack side by feeding them back into the same Kafka cluster used by the provisioning system. This lends itself to a much better event-driven architecture versus polling OpenStack for state transitions.
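
As a simplified sketch of that decoupling, here is roughly what the front-end API’s hand-off to Kafka could look like with the kafka-python client; the topic and message fields are illustrative:

import json
from kafka import KafkaProducer   # kafka-python

producer = KafkaProducer(
    bootstrap_servers=["kafka1:9092", "kafka2:9092"],
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    key_serializer=lambda k: k.encode("utf-8"),
    acks="all",                    # wait for replication so no request is lost
)

def enqueue_provisioning_request(customer_id, spec):
    # keyed by customer so related events land on the same partition
    message = {"action": "create_server", "customerId": customer_id, "spec": spec}
    producer.send("provisioning-requests", key=customer_id, value=message)
    producer.flush()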

Kafka has some key features that allow us to scale out our provisioning system horizontally. First, it replicates messages across the cluster. This means that if a node in the cluster goes away, no messages are lost. Kafka also partitions each topic, which allows us to distribute incoming messages to specific partitions. What this buys us is back-end services that can target specific partitions, allowing us to distribute the load of processing messages across multiple nodes. To keep things simple, we have a dedicated VM for each partition in our topic. As the load increases, we simply add a new partition, update our partitioning logic (think buckets), and spin up a new node to consume the new partition. We can do this as many times as needed to scale out. In the future, as we bring on more data centers, we can shard the data appropriately to the specific DC for processing, allowing us to scale out indefinitely.
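
The partition-per-VM model is easy to picture with a short sketch. The topic name and bucketing rule below are illustrative (the producer would need to use the same rule, for example by passing partition=partition_for(key) to send()), and each back-end node pins itself to exactly one partition:

import json
import zlib
from kafka import KafkaConsumer, TopicPartition

NUM_PARTITIONS = 8                                    # grows as we scale out

def partition_for(customer_id):
    # a stable bucketing rule ("think buckets")
    return zlib.crc32(customer_id.encode("utf-8")) % NUM_PARTITIONS

def consume_partition(my_partition):
    consumer = KafkaConsumer(
        bootstrap_servers=["kafka1:9092", "kafka2:9092"],
        value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    )
    # this VM owns exactly one partition of the topic
    consumer.assign([TopicPartition("provisioning-requests", my_partition)])
    for record in consumer:
        handle(record.value)                          # talk to OpenStack here

def handle(message):
    print("processing", message.get("action"))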

At this point, we have a highly scalable system, but what if something fails? Our back-end services come in two flavors. For the first type of service, we only want one instance running at any given time, but we always want one running. For the second flavor of service, we want one running on every application server, but only processing data that belongs to it. To solve the first problem we leverage ZooKeeper’s distributed locking mechanism. The distributed lock allows us to run the service on every application server, but have only one actively working at any given time. If the active node fails, ZooKeeper detects that it is gone very quickly and allows one of the standby services to acquire the lock and start processing. The second flavor of service is responsible for processing data for the primary partition that it reads events from in Kafka. The problem with this approach is that if a particular app server goes down, we have lost the node responsible for the corresponding partition. To get around this, each service is configured with a failover partner that will assume responsibility for the ailing node’s partition until it is brought back online. Again, this is done using ZooKeeper and distributed locks. However, in the future, we are going to be experimenting with consumer groups now that they are available in the latest Python Kafka libraries.
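
For the first flavor of service, the active/standby pattern with a ZooKeeper lock boils down to something like this kazoo-based sketch (paths are illustrative):

import socket
from kazoo.client import KazooClient

zk = KazooClient(hosts="zk1:2181,zk2:2181,zk3:2181")
zk.start()

# every application server runs this process, but only the lock holder does work
lock = zk.Lock("/cloud-servers/locks/singleton-worker", socket.getfqdn())

def do_work():
    pass   # e.g. drain Ceilometer notifications, reconcile state, ...

with lock:               # blocks until this node becomes the active one
    while True:
        do_work()

Because the lock is backed by an ephemeral znode, a crashed active node is detected quickly and one of the blocked standbys takes over automatically.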

Overall, we are very pleased with the performance and uptime of our provisioning system. Both Kafka and ZooKeeper have proven to be very stable and relatively easy to maintain. More importantly, it has been extremely easy for us to recover from planned and unplanned outages that naturally occur in “cloudy” environments.

GoDaddy Cloud Servers is a new product and we are looking for awesome engineers to help us build out more cool features for our customers. If you are interested, come join us at careers.godaddy.net.

 

Taming Cassandra in Node.js with Datastar

by Jarrett Cruger and Charlie Robbins

[Photo of an album]

At GoDaddy, Cassandra is a core system in our infrastructure. Many teams leverage it to ensure redundancy and reliability and to reduce the probability of downtime. With many teams also using Node.js, we experimented with several patterns for working with Cassandra, such as a fluent wrapper around the various drivers that existed. After exploring the Node.js ecosystem, we realized there was a need for a robust, proper Object Data Mapper (ODM) for Cassandra that could define, manage, and use Cassandra tables in our applications, with comprehensive validation. It needed to be simple and usable while still offering enough flexibility for the power user, including control over batching, compound partition keys, and using streams where appropriate. From these requirements, we built datastar to eliminate redundant work around data modeling and statement creation in Cassandra. We are happy to announce that datastar is now available via Open Source under the MIT license.

Mapping the Cassandra Data Model into JavaScript

Models in datastar are vanilla JavaScript constructors with prototypes as you might expect. One of these constructor objects that represents a model is returned from calling datastar.define() with the name, schema, and any other options needed to configure this model.

We support defining read/write consistency, automatically creating tables, and any additional modifications needed when the table is created.

How do we Define Schemas?

When working with Cassandra and most schema-based databases, there usually is some kind of dialect or syntax used to define the name of the table, the properties, and types in order to keep the data fetched predictable. We use this explicitly for table creation and then our table can start accepting data of that form. We wanted something flexible that could be extended without being completely declarative, like a json-schema. Thankfully there was already a feature-rich option available: joi.

joi is a well-known validation library used in the hapi http framework written originally by Eran Hammer. The goal of joi is to provide an expressive and clear way to define a schema that receives data to validate. It does so with the terseness and flexibility you would expect in JavaScript.

In order to make joi work with Cassandra, we needed to blend its functionality with the specific types and other concerns of Cassandra and Cassandra Query Language (CQL). This need became joi-of-cql thanks to the work of some GoDaddy engineers – Sam Shull and Fady Matar. Let’s look at a sample Artist Model below where we define our overall schema using schema.object() which returns our extended joi-of-cql instance and accepts properties just like any vanilla joi instance.

//
// This `schema` object that we are grabbing is an alias to joi-of-cql module.
// We extend joi and create functions that represent the cassandra types for
// proper validation. The cql object represents the individual types that use
// joi under the hood.
//
var cql = datastar.schema.cql;

var Artist = datastar.define('artist', {
  schema: datastar.schema.object({
    artist_id: cql.uuid(),
    name: cql.text(),
    create_date: cql.timestamp({ default: 'create' }),
    update_date: cql.timestamp({ default: 'update' }),
    members: cql.set(cql.text()),
    related_artists: cql.set(cql.uuid()).allow(null),
    traits: cql.set(cql.text()),
    metadata: cql.map(cql.text(), cql.text()).allow(null)
  }).partitionKey('artist_id'),
  readConsistency: 'one',
  writeConsistency: 'localQuorum',
  with: {
    compaction: {
      class: 'LeveledCompactionStrategy'
    }
  }
});

These properties are then defined using the cql variable that hangs off our joi-of-cql object that is attached to datastar. All functions on our cql object are what we use to define the very specific Cassandra types using joi’s dialect to do so. joi gave us the primitive types that we used to build higher-level types specific to Cassandra and its capabilities as a database. We also have very specific functions on our joi-of-cql schema instance that act as setters for our schema, e.g. partitionKey('artist_id'). Without the help of joi, we would not have such powerful expression in such a terse format.

Extending the Prototype, Smart Models

The power of datastar comes from how we utilize the JavaScript language itself. We have mentioned that datastar.define returns a constructor function that represents the configured model, and because it is a vanilla constructor function, we can define prototype methods on it. These application-specific prototype method extensions are then available for use on any object returned from the find methods of datastar. Let’s look at our Artist Model once again and see how we would extend its prototype and use it.

var Artist = datastar.define('artist', {
  schema: // … see above example
});

Artist.prototype.fetchAlbums = function (callback) {
  Album.findAll({ artistId: this.artistId }, callback);
};

Artist.findOne(artistId, (err, artist) => {
  if (err) /* handle me */ return;

  artist.validate(); // returns true
  artist.name = 'Pink Floyd'; // this operates as a setter
  artist.save((err) => { // Save calls `validate` implicitly
    if (err) /* handle me */ return;
    console.log('We have saved our model!');
    artist.fetchAlbums((err, albums) => {
      if (err) /* handle me */ return;
      console.log('we have all the albums!');
    });
  });
});

This ability to extend models with more functionality is often referred to as “fat models”. The simple cases above allow us to put more data-centric logic on the data itself and prevent the need to write redundant logic around the model constructor. By extending the prototype, we leveraged an already available Album Model and linked it directly to the Artist Model, which is most useful for embedding common-path queries onto the data model where they are most likely to be needed.

Modularity, Reusability, and Micro-Services

“Configuration as code, models as modules”

At GoDaddy, we care about modularity within our software. We want to make our software more testable and encourage more re-use. The same goes for interacting with a database. The abstraction needed to fit so that we could share the same models within multiple services that talk to the same tables. In addition, given that we are defining our tables in code and not executing straight CQL, we need a module that can be used both by a simple management tool and by our various services. What would this look like?

album.js

module.exports = function (datastar, models) {
  var cql = datastar.schema.cql;
  var Album = datastar.define('album', {
    schema: datastar.schema.object({
      album_id: cql.uuid(),
      artist_id: cql.uuid(),
      name: cql.text(),
      track_list: cql.list(cql.text()),
      song_list: cql.list(cql.uuid()),
      release_date: cql.timestamp(),
      create_date: cql.timestamp(),
      producer: cql.text()
    }).partitionKey('artist_id')
      .clusteringKey('album_id')
  });
  return Album;
}

We now have a simple function wrapper that accepts a datastar instance and then returns the model constructed. This allows models to be easily decoupled, which in turn allows that group of models to be decoupled from an application. For example, we could use the same pattern for our Artist Model so we can put them together in our models module.

model.js

var models = module.exports = function (datastar) {
  return new Models(datastar);
};

function Models(datastar) {
  this.Album = require('./album')(datastar, this);
  this.Artist = require('./artist')(datastar, this);
}

Decoupling models allows multiple decoupled micro-services backed by the same entities to share the same code. For example, consider a service for various types of data associated with music. There could be a service that acts as the main write pipeline for these entities and a second service that reads the data to be dumped into a machine-learning pipeline that integrates the number of listens per album, per song, etc. A very simple abstraction has opened up the possibilities for how we interact with our data.

Range Queries

Beyond promoting modularity, datastar has first-class support for more advanced Cassandra features like range queries, which is the most efficient way to access a series of data. Consider again our example of artists and albums. Each artist in this case has 1 to N number of albums that can be fetched out of the database with a single range query.

It is a best practice to store data in this way as it creates a very specific hierarchy based on the partitionKey. The partitionKey is similar to a primary key and is used to decide where in the Cassandra cluster a particular record is stored. When we have both a partitionKey and a clusteringKey in a model, we are expecting to have multiple rows per partitionKey. By having the Artist and Album Models implement this best practice, the range query below is exceptionally fast. This is because all the records live on a single partition and are ordered on disk by the clusteringKey.

var Album = datastar.define('album', {
    schema: datastar.schema.object({
      album_id: cql.uuid(),
      artist_id: cql.uuid(),
      name: cql.text(),
      track_list: cql.list(cql.text()),
      song_list: cql.list(cql.uuid()),
      release_date: cql.timestamp(),
      create_date: cql.timestamp(),
      producer: cql.text()
    }).partitionKey('artist_id')
      .clusteringKey('album_id')
});

Streams as First-Class Citizens

Given our model, if we assume we have inserted a handful of albums for a particular artist at artistId, let’s fetch them all and write it to disk as JSON. To do this we are going to use the streams API that is exposed by the find* functions of a datastar model.

var fs = require('fs');
var stringify = require('stringify-stream');
Album.findAll({ artistId: artistId })
  .on('error', function (err) {
    /* handle me */
  })
  //
  // Turn objects into a stringified array and write the json file to disk
  // without buffering it all in memory.
  //
  .pipe(stringify({ open: '[', close: ']' }))
  .pipe(fs.createWriteStream('albums.json'));

Here we were able to create a very sleek pipeline that fetches the data, stringifies it without loading it all into memory, and writes it to disk. If you are not familiar with streams, think of them as an array in time rather than in “space” or memory. They allow us to make our web services less memory-hungry when we do not need to process an entire collection of records at once.

If we needed to modify any properties or whitelist part of the object before serializing it to disk, we could have added a transform stream before we “stringified” the objects. Streams are first-class citizens in Node: an HTTP response is a stream, a TCP socket is a stream, and so is the file stream we are using here. You get a lot of power and composability when using streams, so it makes sense to make them first-class citizens in datastar as well.

To Batch or not to Batch

One Cassandra feature that was essential to a small subset of use cases at GoDaddy is batching. Cassandra supports the ability to batch any write operation on the database. The intention is for the entire batch of operations to act as a single atomic operation on the database. This is a very useful property when you need to make sure multiple records stay in sync with one another. You want all of the statements to either fail or succeed and not to leave you in an undetermined state. Batching is especially useful when there are associated properties that need to be updated on multiple models as part of a single operation. We take full advantage of this feature in our implementation of datastar and it plays a critical role in our Lookup Tables implementation.

While this is an extremely useful feature of Cassandra, it is only appropriate for very particular scenarios. Abuse of batching in Cassandra can lead to serious performance implications that will affect not only your application, but also the entire Cassandra cluster. This is due to the atomic nature of a batched operation on a distributed database, as it needs to go through a coordinator node. Too many batch operations or overly large batches create excess strain on this coordinator node and can degrade the overall performance and health of the system. For operations that do not require this batching functionality, we recommend passing a number as a strategy if you are using our statement-building feature.

Looking Forward as Open Source

This project has evolved considerably since its inception and this release marks the next stage: a stable and robust ODM for Cassandra. Features include:

  • Model validation with joi
  • Vanilla prototype Models to promote modularity
  • Range Queries with streams as first class citizens
  • Batching updates on multiple models

In addition to the above, there are a number of other ways that datastar takes advantage of Cassandra’s features, such as its Lookup Tables support, that are worth digging into if this topic interests you.

Now that datastar is available to the Open Source community, we have a simple ask for you – try it out! Dig into open issues, ask questions, and if you can spare some of your valuable time please contribute. We are exploring new and innovative ways to use datastar and value input and diverse points of view to make that happen.

Join us!

Interested in working on this or solving other hard technology problems to help small businesses be more successful? If so, we are growing our team. Check out the GoDaddy Jobs page to learn about all our current openings across the company.

Photo Credit: John Donges via Compfight cc

Accelerate JSON in Spark SQL


GoDaddy has heavily invested in the Hadoop ecosystem to manage much of our data needs. We have multi-petabyte clusters across the US, a modern data lake, and JSON events flowing everywhere.

Our production data warehousing jobs are written using Pig, Hive, and a handful of legacy Map/Reduce jobs. These are high-level languages that have traditionally compiled down to a set of Hadoop map/reduce jobs for execution. Battle tested for years, they’re stable and reliable, but can be slow for development and execution.

Alternatively, Spark was developed at the UC Berkeley AMPlab with a slightly different goal in mind: fast, iterative distributed computation over data that was stored primarily in memory. The API was easy to use and popularized first-class functions on a Hadoop cluster. While it is fairly similar to high-level languages like Pig or Hive, there is no new syntax to learn. Spark programs are written in popular languages including Scala, Python and R.

Spark SQL builds on Spark with a powerful optimization system and relational APIs (ala SQL), perfect for querying almost anything that resembles a tabular data source. Including our JSON event streams, if you squint just enough.

More capabilities

Spark SQL added support for JSON quite a while back with the release of Spark 1.1. While it wasn’t game changing, it did simplify things for those of us committed to JSON.

One of my first experiments using Spark SQL was a little tool that measured storage costs of compressed JSON vs. typed Parquet files. It was just a handful of lines of Scala, an excellent demonstration of the power of Spark SQL. The runtime was much slower than I had anticipated however, and profiling confirmed that there was room for improvement. But it was good enough at the time, and I left good enough alone.

Replacing the foundation: Spark 1.4

Fast forward another year or so and we have a massive amount of JSON events streaming through an equally massive Kafka bus, and an ever-increasing number of Spark and PySpark applications are running in production. While many jobs are well served by datasets that are “cooked” from the raw JSON (among other sources) periodically, others need to access the real-time event stream or information that is ignored by the scheduled batch conversions.

Around the release of Spark 1.3, I started working on a small research project to find optimization opportunities with our infrastructure. One of our large computing grids runs a mix of long running jobs that have very different resource utilization patterns. Some are I/O heavy, others are CPU heavy, and they all have variable but predictable changes in behavior throughout the day and week. These jobs are randomly assigned to machines across the grid, causing some resources to be locally oversubscribed even though the grid has excess capacity.

We’ve been collecting operational metrics about these jobs for quite a while. CPU time, network utilization, disk IOPS and bandwidth, memory utilization, among others. Very useful. Exactly what we’d like to aggregate and feed into a multidimensional bin packer to search for better static job placement.

But there was a rub: disk IOPS were missing from the cooked dataset. Since this was a research project, it didn’t seem appropriate to hack up the production job. Besides that, it processed a lot of fields that I didn’t need and at a resolution much greater than what my prototype optimizer would be able to handle. So I turned to Spark SQL for my JSON handling needs, primed with a few hundred terabytes’ worth of small event payloads. I found that even though it had received a handful of improvements to its JSON handling since Spark 1.1, one thing near to my heart (and quota) remained unchanged: performance.

Now seemed as good a time as any to finish what had been tempting many months ago.

Under the hood

The JSON parsing in Spark SQL had been based on Jackson’s very popular ObjectMapper. It’s easy to use and is fast for what it does. However, it assumes you have both a JSON document and a Java class that provides the schema. As Spark SQL schemas don’t have a corresponding class, it requested a generic representation: java.util.Map[String, AnyRef]. Alternatively it could have requested a more natural class: JsonNode. But neither is natively supported by Spark SQL, and a few transformations are needed before they can be used. Besides being a rather roundabout way to do the parsing, it had some other effects we’ll talk about later.

Dropping down a few layers of abstraction below the ObjectMapper is Jackson’s streaming API. It generates a stream of JsonToken values that represent (unsurprisingly) the tokenized JSON document. It is less convenient to work with than an object model, but it enables a very direct conversion to Spark’s internal schema and rows, and generates a lot less garbage along the way.

The main casualty of the stream representation is, well, pretty much the remainder of the code. Converting trees (an object model) to trees (like a schema) no longer works when the input is a stream. So, we’ll rely on the classic way of converting streams to a tree: a recursive descent parser.

Recursive descent parsers aren’t new or novel, but they are popular for a reason: they’re easy to implement, understand, and debug. However, they’re best for languages that require limited (or no) ‘lookahead’. Lucky for us, Spark SQL’s datatypes fall into this class. So what are they exactly? Wikipedia has a good overview, but the basic idea is that each production in the language is implemented by a function, and these functions can call each other (they are mutually recursive) to handle different types. We will end up writing a couple of these.

JSON schema inference

The first step along the way is to perform schema inference. For a single JSON document, we’d like to figure out what the types are of each field. And since we’re talking about Spark, we can safely assume there are going to be a whole lot of documents… we will need some way of inferring a single schema for an entire RDD[String] full of JSON.

Conceptually this process is identical to before. Schema inference will still take an RDD[String] and return a StructType. Similarly, parsing needs an RDD[String], a StructType, and to produce a DataFrame. However, the new implementation is nearly a complete rewrite that takes complete advantage of the JSON token stream.

Making it a little more concrete, let’s define inference as a map-reduce job:

  • The map function runs inference on a single document: String => StructType
  • A reduce function combines any two schemas into one: (StructType, StructType) => StructType
    • This function needs to be commutative in order to be safely used in a reduce or aggregate operation
    • Which forms a commutative monoid with an empty structure as the identity: ∅ = StructType(Seq())

Single document inference

Single document inference is very straightforward. We merely need to map Jackson’s token types to Spark SQL’s data types. The token value is never accessed; the type alone is usually sufficient for schema inference. This saves additional overhead.

But there is one big problem: JSON arrays are heterogeneous, while Spark SQL arrays are not. For example, {"array": [1, "test"]} is valid JSON but doesn’t have a clear mapping back to a Spark type. So how do we fix this? Read on.

Merging types

When we have an array containing a bunch of types, there isn’t always a great way of resolving the differences. In some cases it’s easy. Given an Integer and a BigInteger, it’s obvious that the smaller type can be converted to the larger type without losing information. But a String with a Long? One thing we know for sure is that all JSON nodes can be serialized. And their serialized form, at least in Spark SQL, is a String. This is our top type.

And we can extend this to all other types. Whenever we find a mismatch that cannot be resolved via a lossless widening we will convert both sides to a String. With a few exceptions for complex types.
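
To make the idea concrete, here is a toy Python model of per-document inference and the commutative merge. It is deliberately simplified and is not Spark’s actual implementation:

# Lossless widenings; anything else falls back to the top type, string.
WIDENING = {("int", "bigint"): "bigint", ("bigint", "int"): "bigint",
            ("int", "double"): "double", ("double", "int"): "double",
            ("bigint", "double"): "double", ("double", "bigint"): "double"}

def infer(value):
    """Infer a simplified type for one parsed JSON value."""
    if isinstance(value, bool):   return "boolean"
    if isinstance(value, int):    return "bigint" if abs(value) > 2**31 else "int"
    if isinstance(value, float):  return "double"
    if isinstance(value, str):    return "string"
    if value is None:             return "null"
    if isinstance(value, list):   # heterogeneous arrays get merged element-wise
        elem = "null"
        for item in value:
            elem = merge(elem, infer(item))
        return ("array", elem)
    if isinstance(value, dict):   # structs: field name -> field type
        return ("struct", {k: infer(v) for k, v in value.items()})
    return "string"

def merge(a, b):
    """Commutative merge of two inferred types."""
    if a == b:      return a
    if a == "null": return b
    if b == "null": return a
    if isinstance(a, str) and isinstance(b, str):
        return WIDENING.get((a, b), "string")    # widen if lossless, else top type
    if isinstance(a, tuple) and isinstance(b, tuple) and a[0] == b[0] == "struct":
        keys = set(a[1]) | set(b[1])
        return ("struct", {k: merge(a[1].get(k, "null"), b[1].get(k, "null"))
                           for k in keys})
    if isinstance(a, tuple) and isinstance(b, tuple) and a[0] == b[0] == "array":
        return ("array", merge(a[1], b[1]))
    return "string"                              # e.g. struct vs. int -> string

# The awkward array now resolves cleanly:
print(infer({"array": [1, "test"]}))   # ('struct', {'array': ('array', 'string')})
# Over a whole RDD of JSON strings, the same merge is the combine step, e.g.
#   rdd.map(lambda s: infer(json.loads(s))).treeAggregate("null", merge, merge)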

Multiple document inference

By fixing the array case we’ve really done most of the work for inferring a schema for an entire RDD worth of JSON.

Extending this to multiple documents ended up being boring (Spark is awesome, right?). We could fold (sequentially) the inference and merging functions over the RDD, but we’ll take advantage of the commutative laws and divide-and-conquer with a tree aggregation to improve performance on very large datasets.

Parsing

Now that we have a schema we can use to drive the conversion, let’s tackle the parser.

The new parser is also a recursive descent parser and is schema-directed. That is to say that only the fields and types defined in the schema are parsed; everything else is discarded. This paves the road for projection pushdown as well as being good for performance.

The hard work was almost entirely in the inference stage; the parser itself is much simpler. While recursively parsing the token stream, the fields in the stream just need to be paired up with the fields defined in the schema (if there is one) and written into the appropriate column in a row.

However, there is one little snag. String is our top type, so any field, including objects and arrays in the token stream, must be convertible to a String. Jackson natively supports converting a token stream back to a String so this is nearly free. Easy right? Parsing is complete.
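
Again as a toy model rather than Spark’s internals, schema-directed conversion looks roughly like this: keep only what the schema names, and fall back to the String top type on any mismatch:

import json

def convert(value, expected):
    """Convert one parsed JSON value to the shape demanded by the schema."""
    if value is None:
        return None
    if expected == "string":
        # anything, including objects and arrays, can be rendered as a string
        return value if isinstance(value, str) else json.dumps(value)
    if expected in ("int", "bigint") and isinstance(value, int):
        return value
    if expected == "double" and isinstance(value, (int, float)):
        return float(value)
    if isinstance(expected, tuple) and expected[0] == "array" and isinstance(value, list):
        return [convert(item, expected[1]) for item in value]
    if isinstance(expected, tuple) and expected[0] == "struct" and isinstance(value, dict):
        return {field: convert(value.get(field), ftype)
                for field, ftype in expected[1].items()}
    return json.dumps(value)          # mismatch: fall back to the top type

schema = ("struct", {"host": "string", "iops": "bigint", "tags": ("array", "string")})
row = convert(json.loads('{"host": "web1", "iops": 42, "tags": [1, "ssd"], "junk": {}}'),
              schema)
print(row)   # {'host': 'web1', 'iops': 42, 'tags': ['1', 'ssd']} -- "junk" is dropped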

Benchmarks

Now we can convert any set of JSON strings into a Spark StructType and its matching DataFrame without an object model in sight. How did the rewrite fare in the end?

On production jobs I regularly saw a 10x to 50x decrease in wall clock time for parsing tasks. The actual performance delta will depend on the schema and queries being executed, but it should be faster across the board.

Prior to merging upstream I ran some simple benchmarks using the publicly available last.fm Million Song dataset:

Command                                            v1.3.1    v1.4.0
import sqlContext.implicits._
val df = sqlContext.jsonFile("/tmp/lastfm.json")   70.0s     14.6s
df.count()                                         28.8s     6.2s
df.rdd.count()                                     35.3s     21.5s
df.where($"artist" === "Robert Hood").collect()    28.3s     16.9s

Resolution

Spark 1.4 has been out for nearly a year and this change lives on in the most recent releases as well. Whenever you use SQLContext.read.json (or the deprecated SQLContext.jsonRDD) this is quietly processing your data behind the scenes. No config or code changes required, JSON handling is just faster.

The Spark job that might have taken a week before now completes in a few hours. And I ended up with the IOPS metrics I needed rolled up in 15 minute quantiles, perfect for the bin packing optimizer. That’s what I call a win.

Hope you’re also enjoying the improved performance! And if you’re interested in working on these and other fun problems with us, check out our jobs page.

Mobile foot traffic analytics system using custom optical flow


Simple computer vision techniques can help us analyze real-time video streams automatically on computers with low processing power, provided the algorithms are implemented with memory and processor constraints in mind. Computer vision on a Raspberry Pi has become easier than ever, with rates as good as 10–12 frames per second while running several filters and dense optical flow algorithm variants on a real-time video stream. If we can achieve good results on a small computing platform like this, there are many potentially useful applications we can build.

How can we utilize the efficiency and mobility of a system designed to do computer vision at low processing power? In this case, however, we did not build the hammer first. There is an exciting frontier in using computer vision and other Internet of Things (IoT) based analytics in urban spaces, shops, and retail stores to inform policy makers, shop owners, and the general public about how they interact with the physical space. In one of our research efforts, we decided to explore these ideas a little deeper.

GoDaddy recently collaborated with the Social Computing group in the MIT Media Lab in one of our projects (Placelet) that aims to understand pedestrian and shop customer foot traffic dynamics. This will eventually shed some light on economic activities in a given area. We also expect that foot traffic analytics will fundamentally change the way small business owners understand and operate their businesses.

Technological tools to help researchers understand urban dynamics in cities have existed for decades. The inventions that help us understand pedestrian and traffic dynamics in streets range from computer vision and ultrasonic (or other wave-based) Time of Flight (ToF) tracking to simple manual counting of how many pedestrians and cars pass through an intersection. This data has been used to inform city planners about the efficiency of their designs and also to suggest possible interventions in the city.

At the same time, there has been a boom in small businesses that operate in the urban landscape. The Internet has given small business owners opportunities to have an online identity (through domain names) and to get rated by customers for their service (yelp.com, etc.). There has been a lot of research, and many businesses created, based on analytics of website visitors, popularity, and aggregated ratings from different rating sites. However, what small business owners still lack is an analytics platform for the physical space their business resides in.

Google Analytics provides us with data about website traffic, but in our project, we aim to design a system that can provide the equivalent of Google Analytics for the physical space. Just like website visitors leave their mark on different pages by interacting with different HTML elements, customers of many stores (from small grocery shops to retail chains) leave their physical footprint by interacting with the products on display. Before entering the shop, they may slow down and spend some time deciding to go in based on what’s displayed outside. After entering the shop, they move around different shelves, stop at certain shelves, and do not bother to stop for other ones. It would immensely help business owners to know how customers interact with different shelves in their store, whether there are particularly popular spots, and also the factors that make a shelf or a spot popular. This kind of data can also help them design their shops better and deploy interventions at the right places (placing advertisements and promotional offers at the popular sites). We can also possibly understand how appealing the store’s current design is to people of different ages.

The analytics part of this process can be accomplished using techniques that already exist in the computer vision community. We have used computer vision to detect customer flow and velocity profiles, while keeping the data anonymized by virtue of the system design. In this blog post, I will describe the system design, show some preliminary data analysis, and present a future direction for this kind of research.

Things computer vision can easily answer

Given a store’s position and some features in a scene shot at an outdoor location, we can measure foot traffic velocity and group dynamics outside that store. Given a store’s floor plan, we can also answer several questions about customers’ interaction dynamics. These include (but are not limited to): What are the popular spots in the store? How many customers visited the store at different times of the day? What were their velocity profiles? In other words, where were they slowing down, and which shelves did they pass by comparatively quickly?

Customer interaction dynamics questions aren’t limited to the above set. However, to keep this article short, we will only talk about tackling the questions above.

There are several constraints that need to be taken care of when designing a solution to the above problems. The anonymity of the customer needs to be maintained at all times to respect customers’ privacy. At the same time, the business owner should not miss out on detailed information about how customers interact with the store.

Simple algorithms for handling real-time streams

Our solution to the above questions is to build and place mobile video-processing units in the stores. The video is captured and processed in real time, without saving any video or image data. The specific algorithms used to process the video are as follows.

Optical flow in a scene. This measures the flow velocity of moving customers in general. Optical flow is a technique to measure the velocity of each pixel in a scene, based on the magnitude and direction of movement at each pixel. The first two images show the optical flow method applied to a traffic video using a simple MATLAB script. Pixels that correspond to moving objects are colored with yellow arrows, with the arrows pointing in the direction of movement.

[Image: optical flow arrows overlaid on a traffic video]

The other pixels in this particular scene remain stationary so they are left as is.

Blob detection, based on optical flow. This method will distinguish between moving objects and also find customer group sizes. Based on the optical flow data, we can find pixels that are currently in motion for each frame. After some standard noise reduction and morphological smoothing, we would get labelled ‘blobs’ that represent each customer in a given scene. The rest of the images show reconstruction of blobs from the above optical flow data.

Contour reconstruction to find the polygonal shape of moving blobs in a scene. Analyzing the polygons can tell us more about group size and proximity between customers. In some cases, this will allow us to correlate the size of the polygon with the age group of the customer. However, that depends on light, shadow, and occlusion conditions.

Among the three algorithms described above, the video processing unit only performs the first (optical flow calculation) in real time, and saves the optical flow data to disk. The data can be post-processed later with the other two algorithms. We do this to save processing time and capture information without any considerable lag. Also, note that customer privacy is maintained by using computer vision in real time – no images are saved and no face recognition is performed; only pixel velocity data is saved on the device.
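
To make the post-processing step concrete, here is a minimal sketch of how saved flow vectors could be turned into customer blobs with OpenCV. It assumes flow records of the form (x, y, dx, dy) and illustrative thresholds; it is not the project’s actual code.

import cv2
import numpy as np

def blobs_from_flow(flow_records, frame_shape, min_speed=1.0, min_area=50):
    """Turn per-pixel flow vectors (x, y, dx, dy) into candidate customer blobs."""
    h, w = frame_shape
    motion = np.zeros((h, w), dtype=np.uint8)

    # Mark pixels whose flow magnitude exceeds a speed threshold.
    for x, y, dx, dy in flow_records:
        if (dx * dx + dy * dy) ** 0.5 >= min_speed:
            motion[int(y), int(x)] = 255

    # Standard noise reduction and morphological smoothing.
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
    motion = cv2.morphologyEx(motion, cv2.MORPH_OPEN, kernel)
    motion = cv2.morphologyEx(motion, cv2.MORPH_CLOSE, kernel)

    # Contour reconstruction gives the polygonal shape of each moving blob
    # (two return values assumes OpenCV 4.x).
    contours, _ = cv2.findContours(motion, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    return [c for c in contours if cv2.contourArea(c) >= min_area]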

Once the units start collecting data, we will need to provide the user (small business owner) tools to understand the data. There should be a mobile phone app to visualize this data, and also a web interface for completeness.

Video processing unit

[Image: the video processing unit enclosure with Raspberry Pi and web camera]

The video processing unit we designed is an enclosure that contains a Raspberry Pi and a web camera. There is a USB Wi-Fi antenna to help access the Raspberry Pi through secure internet protocols (for batch transferring data).

[Diagram: components of the video processing unit]

The casing is designed so the unit can be attached to the ceiling to get a top view of the store. If a top view cannot be attained, or the unit is deployed outdoors, we need to project all the blobs acquired from the elevated view onto a top view. This can be done by transforming all blob coordinates with an appropriately calculated projection matrix.
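
As an illustration of that projection step, the following sketch maps blob centroids from the camera view onto a floor plan using a perspective (homography) transform in OpenCV; the reference points are made-up values.

import cv2
import numpy as np

# Four reference points picked in the camera image (pixels) and their known
# positions on the floor plan (e.g. in centimetres); the values are made up.
image_pts = np.float32([[102, 410], [518, 395], [630, 120], [60, 135]])
floor_pts = np.float32([[0, 0], [400, 0], [400, 600], [0, 600]])

# Projection matrix from the elevated camera view to the top view.
H = cv2.getPerspectiveTransform(image_pts, floor_pts)

def to_top_view(blob_centroids):
    """Map blob centroids from the camera view onto the floor plan."""
    pts = np.float32(blob_centroids).reshape(-1, 1, 2)
    return cv2.perspectiveTransform(pts, H).reshape(-1, 2)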

Software

[Figure: a snapshot of the stored optical flow data]

There are two components to the software part. Let me describe them briefly.

Computer vision: The Raspberry Pi has its own operating system called Raspbian, a variant of a standard Linux distribution. We installed OpenCV on the Raspberry Pis and wrote OpenCV code to capture the video stream from the attached webcam. The optical flow algorithm is provided by OpenCV; we enhanced it by adding some of our own filters to precondition the image, and the resulting data (flow velocity profiles of moving pixels) is stored in a set of local files periodically. The figure shows a snapshot of the data. The files contain rows of pixel information: the first two coordinates are the original pixel, and the next coordinates (separated by a dot) represent the direction and magnitude of velocity change at that pixel.

OS maintenance: We wrote bash scripts to execute the computer vision code at certain times of the day. OS scheduler jobs were written for Linux to execute the computer vision scripts on startup (in case of power issues in the store or accidental restart of the Raspberry Pi). Some scripts were also written to securely copy the flow velocity data to a remote server over Wi-Fi.
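
The capture loop of the computer vision component might look roughly like the sketch below, which uses OpenCV’s dense Farneback optical flow and writes one row per moving pixel in a format similar to the one described above; the camera index, threshold, and sampling step are illustrative.

import cv2
import time

cap = cv2.VideoCapture(0)                       # the attached webcam
ok, prev = cap.read()
prev_gray = cv2.cvtColor(prev, cv2.COLOR_BGR2GRAY)

with open("flow-%d.txt" % int(time.time()), "w") as out:
    while True:
        ok, frame = cap.read()
        if not ok:
            break
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # Dense optical flow between consecutive frames (Farneback method).
        flow = cv2.calcOpticalFlowFarneback(prev_gray, gray, None,
                                            0.5, 3, 15, 3, 5, 1.2, 0)

        # Persist one row per sampled moving pixel: "x y . dx dy".
        step = 8                                # sample a grid to keep files small
        for y in range(0, gray.shape[0], step):
            for x in range(0, gray.shape[1], step):
                dx, dy = flow[y, x]
                if dx * dx + dy * dy > 1.0:
                    out.write("%d %d . %.2f %.2f\n" % (x, y, dx, dy))
        prev_gray = gray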

Preliminary data collection and post-processing

We have collected some pilot data to understand the prospects of this system. In collaboration with GoDaddy, we deployed these units in several shops in the Downtown Crossing area of Boston. These stores are small business customers of GoDaddy (for example, Henry Herrara’s Mexican Grill). The video units were deployed at one outdoor location (to measure foot traffic outside some stores) and at three indoor locations (two restaurants and one crafts shop).

The units were deployed in late October, and we collected them back two weeks later, in mid-November. Each unit had approximately 30 GB of pixel velocity data. The first job was to clean, aggregate, and calculate velocity profiles from the data.

The following diagram shows a snapshot of the view from an outdoor location and the approximate projected top view of optical flow at the intersection.

[Figure: outdoor camera view and the projected top view of optical flow at the intersection]

The optical flow data can now be averaged and interpolated to create a smooth velocity map across many frames. This kind of map can show trends over longer time periods. In this case, the velocity trend plot is created from 5 minutes of optical flow data.

[Figure: velocity trend map built from 5 minutes of optical flow data]

The data can also be used to create maps of averaged velocity magnitudes, accumulated over time. This gives us heatmaps of regions where activities were occurring. The following are such activity maps, each picture representing 5 minutes of activity at the intersection of Summer Street and Winter Street beside Macy’s.

[Figure: activity heatmaps at the intersection of Summer Street and Winter Street, each covering 5 minutes]
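
A simple way to build such an activity map is to accumulate flow magnitudes into a coarse grid, as in the sketch below; the cell size and normalization are illustrative, not the project’s actual parameters.

import numpy as np

def activity_heatmap(flow_records, frame_shape, cell=16):
    """Accumulate flow magnitudes into a coarse grid to get an activity heatmap."""
    h, w = frame_shape
    grid = np.zeros((h // cell + 1, w // cell + 1))
    for x, y, dx, dy in flow_records:
        grid[int(y) // cell, int(x) // cell] += (dx * dx + dy * dy) ** 0.5
    return grid / grid.max() if grid.max() > 0 else grid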

This activity data can be visualized to inform store owners about outdoor and indoor activity hot spots. The same kind of data collected indoors can be used to inform store owners about customers’ interaction dynamics.

Future directions

We are currently working on the third computer vision task – understanding the shape and size of moving polygons in the scene. A detailed analysis using velocity and shape features may reveal more information about customer group sizes. Devising an unsupervised method that requires little to no labelled data is one of our main goals.

Secondly, based on customer behavior, AI-based software may suggest interventions in the shop. These interventions may take the form of advertisement or product placement recommendations. They will require a combination of advanced recommender systems and input from human interior designers.

The system can be used to track product and advertisement life cycle to some extent. An advertisement may initially draw attention but later customers may not be interested in the offer. By analyzing customer movement behavior around the advertisement area over a period of days, the owner may take down or replace the promotion/advertisement offer. The same goes for understanding how/if a new product attracts customer attention.

Finally, a cheap and workable technology for this kind of physical space analytics is Bluetooth scanning. Assuming a significant number of customers have Bluetooth discovery enabled on their smartphones, we can build cheap and energy-efficient Bluetooth sniffing devices that can be deployed in stores. Counting the number of Bluetooth devices in a designated space and understanding their movement may give us similar results at a lower price. Proximity tracking based on RSSI (received signal strength) is one way to understand movement patterns in a store if there are enough sniffers sampling at a high time resolution.

Join us!

Interested in joining GoDaddy’s Emerging Products group and working on hard technology problems to help small businesses be more successful? We are constantly growing our team – for example, have a look at the following current job openings for our San Francisco and Sunnyvale offices:

  • Principal Engineer, Emerging Products
  • Senior Mobile Engineer, Emerging Products

Visit GoDaddy careers to learn about these and all our current openings across the company.

Logging – The Easy Way


Logging is a funny thing. Everyone knows what logs are and everyone knows you should log, but there are no hard and fast rules on how to log or what to log. Your logs are your first line of defense against figuring out issues live. Sometimes logs are the only line of defense (especially in time sensitive systems).

That said, in any application good logging is critical. Debugging an issue can be made ten times easier with simple, consistent logging. Inconsistent or poor logging can actually make it impossible to figure out what went wrong in certain situations. Here at GoDaddy we want to make sure that we encourage logging that is consistent, informative, and easy to search.

Enter the GoDaddy Logger. This is a SLF4J wrapper library that encourages us to fall into the pit of success when dealing with our logging formats and styles in a few ways:

  • Frees you from having to think about what context fields need to be logged and removes any worries about forgetting to log a value,
  • Provides the ability to prevent personally identifiable information from being logged,
  • Abstracts out the actual format of the logs from the production of them. By decoupling the output of the framework from the log statements themselves, you can easily swap out the formatter when you want to change the structure and all of your logging statements will be consistently logged using the new format.

A lot of teams at GoDaddy use ELK (Elasticsearch, Logstash, Kibana) to search logs in a distributed system. By combining consistent logging with ELK (or Splunk or some other solution), it becomes relatively straightforward for developers to correlate and locate related events in their distributed systems.

The GoDaddy Logger

In an effort to make doing the right thing the easy thing, our team set out to build an extra layer on top of SLF4J – The GoDaddy Logger. While SLF4J is meant to abstract logging libraries and gives you a basic logging interface, our goal was to extend that interface to provide for consistent logging formats. One of the most important things for us was that we wanted to provide an easy way to log objects rather than having to use string formatting everywhere.

Capturing the Context

One of the first things we did was expose what we call the ‘with’ syntax. The ‘with’ syntax builds a formatted key value pair, which by default is “key=value;”, and allows logging statements to be more human readable. For example:

logger.with("first-name", "GoDaddy")
      .with("last-name", "Developers!")
      .info("Logging is fun");

Using the default logging formatter this log statement outputs:

Logging is fun; first-name="GoDaddy"; last-name="Developers!"

We can build on this to support deep object logging as well. A good example is to log the entire object from an incoming request. Instead of relying on the .toString() of the object to be its loggable representation, we can crawl the object using reflectasm and format it globally and consistently. Let’s look at an example of how a full object is logged.

Logger logger = LoggerFactory.getLogger(LoggerTest.class);
Car car = new Car("911", 2015, "Porsche", 70000.00, Country.GERMANY, new Engine("V12"));
logger.with(car).info("Logging Car");

Like the initial string ‘with’ example, the above log line produces:

14:31:03.943 [main] INFO com.godaddy.logger.LoggerTest – Logging Car; cost=70000.0; country=GERMANY; engine.name="V12"; make="Porsche"; model="911"; year=2015

All of the car object’s info is cleanly logged in a consistent way. We can easily search for a model property in our logs without being at the whim of spelling errors or forgetful developers. You can also see that our logger nests object properties in dot notation, like engine.name="V12". To accomplish the same behavior using SLF4J, we would need to do something akin to the following:

Use the Car’s toString functionality:

Implement the Car object’s toString function:

@Override
public String toString() {
    return "cost=" + cost + "; country=" + country
            + "; engine.name=" + (engine == null ? "null" : engine.getName())
            + ...; // etc.
}

Log the car via its toString() function:

logger.info("Logging Car; {}", car.toString());

Use String formatting

logger.info("Logging Car; cost={}; country={};e.name=\"{}\"; make=\"{}\"; model=\"{}\"; " + "year={}; test=\"{}\"", car.getCost(), car.getCountry(), car.getEngine() == null ? null : car.getEngine().getName(), car.getMake(), car.getModel(), car.getYear());

Our logger combats these unfortunate scenarios and many others by allowing you to set the recursive logging level, which defines the amount of levels deep into a nested object you want to have logged and takes into account object cycles so there isn’t infinite recursion.

Skipping Sensitive Information

The GoDaddy Logger provides annotation-based logging scope support, giving you the ability to prevent fields or methods from being logged by annotating them. If you don’t want to skip a field completely, but would rather log a hashed value, you can use an injectable hash processor to hash the values that are to be logged. Hashing can be useful when you want to log a piece of data consistently without logging the actual data value. For example:

import lombok.Data;

@Data
public class AnnotatedObject {
    private String notAnnotated;

    @LoggingScope(scope = Scope.SKIP)
    private String annotatedLogSkip;

    public String getNotAnnotatedMethod() {
        return "Not Annotated";
    }

    @LoggingScope(scope = Scope.SKIP)
    public String getAnnotatedLogSkipMethod() {
        return "Annotated";
    }

    @LoggingScope(scope = Scope.HASH)
    public String getCreditCardNumber() {
        return "1234-5678-9123-4567";
    }
}

If we were to log this object:

AnnotatedObject annotatedObject = new AnnotatedObject();
annotatedObject.setAnnotatedLogSkip("SKIP ME");
annotatedObject.setNotAnnotated("NOT ANNOTATED");

logger.with(annotatedObject).info("Annotation Logging");

The following would be output to the logs:

09:43:13.306 [main] INFO com.godaddy.logging.LoggerTest – Annotation Logging; creditCardNumber="5d4e923fe014cb34f4c7ed17b82d6c58"; notAnnotated="NOT ANNOTATED"; notAnnotatedMethod="Not Annotated"

Notice that the annotatedLogSkip value of “SKIP ME” is not logged. You can also see that the credit card number has been hashed. The GoDaddy Logger uses Guava’s MD5 hashing algorithm by default, which is not cryptographically secure but is fast, and you’re able to provide your own hashing algorithm when configuring the logger.

Logging Context

One of the more powerful features of the logger is that the ‘with’ syntax returns a new immutable captured logger. This means you can do something like this:

Logger contextLogger = logger.with("request-id", 123);
contextLogger.info("enter");

// ... do work ...

contextLogger.info("exit");

All logs generated from the captured logger will include the captured ‘with’ statements. This lets you factor out common logging statements, and it cleans up your logs so you see what you really care about (and make fewer mistakes).

Conclusion

With consistent logging we can easily search through our logs and debug complicated issues with confidence. As an added bonus, since our log formatting is centralized and abstracted, we can also make team-wide or company-wide formatting shifts without impacting developers or existing code bases.

Logging is hard. There is a fine line between logging too much and too little. Logging is also best done while you write code vs. as an afterthought. We’ve really enjoyed using the GoDaddy Logger and it’s really made logging into a simple and unobtrusive task. We hope you take a look and if you find it useful for yourself or your team let us know!

For more information about the GoDaddy Logger, check out the GitHub project, or if you’re interested in working on these and other fun problems with us, check out our jobs page.

Premium Results through Elasticsearch


The GoDaddy FIND Team is responsible for services that help suggest domain names. While this sounds reasonably straightforward, it is a critical part of taking a customer from initial interest to purchasing the perfect domain name – and when it comes to premium names, the higher price point means the suggestions must be right on target. Based on an initial starting suggestion, be it an initial domain name or even just search terms, we leverage ElasticSearch to quickly and insightfully suggest names to the customer.

We can even take into account customer information like previous purchases, other domains a customer owns, or any other hint. The inventory of premium names comes from a number of different sources including names available for sale and auctions from both internal as well as partner providers. We load this data continuously and put it into ElasticSearch. From this index, our engine can query for good candidates to present to customers.

Why Should You Care?

Like most things here at GoDaddy, this process needs to be fast, accurate, and reliable. Fast and reliable can be accomplished by any modern cache or key/value lookup system. Accurate can be achieved by quite a few robust search systems, many of which don’t win any races when it comes to speed and treat “reliable” as more of a punchline. ElasticSearch, when used properly, hits all three requirements. Learning how to use ElasticSearch gave us a solid resource for collecting, analyzing, and serving data that allows our customers to make solid purchasing decisions.

GoDaddy had to configure ElasticSearch by trial and error, test and measure, and, in some cases, outright guesses to see how everything played out. ElasticSearch, while more mature now, is still somewhat of an emerging technology, and learning how to use it for our specific needs was both a challenge and an opportunity.

[Figure: how we use ElasticSearch at GoDaddy]

What Is ElasticSearch?

Let’s take a closer look at how we use ElasticSearch to help customers find domain names at GoDaddy and examine some of the challenges we faced and the solutions we uncovered.

ElasticSearch is a scalable, distributed search engine that lives on top of Lucene, a Java-based indexing and search engine. ElasticSearch is fast and provides an elegant API-based query system. The first thing we need to understand is how ElasticSearch defines indexes that live in shards across nodes and how it replicates data for reliability. Our first challenge was ensuring that our index was always available and returned results fast enough to support real-time search.

An ElasticSearch index is the data set upon which our searches are performed, and shards are pieces of that data set distributed amongst nodes, which are individual installations of ElasticSearch, typically one per machine. For the GoDaddy Domain Find team, we rebuild our index daily while also taking in real-time feeds of auction domains that inform us when names are added and deleted as well as when prices change. We have set up a Jenkins job to bring in this data and add it to our current index throughout the day without impacting read operations. To do this, we have configured our ElasticSearch cluster to separate the nodes that hold data from the nodes that we make available via our API for doing searches. This way, as we’re taxing the data nodes with new data, the API nodes are not impacted. We even turn off replication during large loads so that each batch loaded does not start a brand new replication operation. These have now become somewhat standard practices with ElasticSearch, but when we were first starting out, it was the Wild West! This strategy reaped the best results out of much trial and error.

Scaling Challenges

We scale our system by adding new nodes, which reduces the number of shards per node (thus reducing load) because shards are re-allocated automatically. Indeed, most of the scaling and distribution of ElasticSearch is done automatically with hints based on your configuration. Any given node may have all shards or a subset, thereby distributing the load when a search is performed. This is a key benefit of ElasticSearch in that it does the hard work for you.

The default number of replicas is one. A replica is a replication of your index, so with a replica count of one you have two complete sets of data: the primary plus one replica. ElasticSearch will ensure that a shard and its replica are never stored on the same node (provided you have more than one node, of course). If a node should go down, it will take with it either a shard or its replica, but not both, so you’re covered. And if you have more than two nodes and two replicas, you could suffer a failure of two nodes and still have your data available.

For our system, we chose to have two data nodes and two client nodes that hold no data, but have the interfaces for performing queries. This was also trial and error. We tried two and we tried six.

The Black Magic

Deciding how many shards and replicas to use is somewhat of a black art, both here at GoDaddy and in the wild. Trial and error on the number of shards, if you have the time and patience, is an interesting exercise. We started with the default of five and tested performance. We then increased or decreased and remeasured. As ElasticSearch matures, this area will likely receive attention from developers. At the ElastiCon15 conference this year, none of the presenters had the same configuration in terms of size, which was rather telling: each had to determine their configuration individually based on their use cases. One thing to note is that once you create an index with a set number of shards, you cannot change it. You can create a new index, of course, but the one you’ve created has its shard count set. Replica count, however, can be changed at any time, and ElasticSearch will re-allocate shards across nodes appropriately.

For our purposes in the GoDaddy Domain FIND team, we stuck with the default of five shards because our data set does not contain a huge number of documents and five shards is a decent number. If your document count is high, you would want to consider more shards to split up the data and make queries (and indexing) faster. We also found that for our use, four nodes provided enough headroom and redundancy, so we configured three replicas.

Down the Rabbit Hole: Cross Data-Center Distribution

What about distributing across multiple data centers like we have? The official answer seems to be, “sure, but it’s not supported.” If the pipe between your data centers is reasonably wide and reliable, there’s no reason you can’t. We tried it with nodes in different data centers and the communication between them got bogged down and caused nodes to overload and time out – taking the whole cluster down! For now, it’s my recommendation that we avoid doing it. This, too, is something the ElasticSearch developers say they’re working on improving. Honestly, I’ll believe it when I see it.

Monitoring

For monitoring, we use a number of tools, but the one we like best is the very simple “head” plugin.

[Screenshot: the head plugin showing indexes, shards, and replicas across nodes]

In the image above, we see the output from the head plugin. Each index is spread across six nodes and each node has five shards. In this case, we have set the number of replicas to five, meaning we have a total of six copies (the original plus five replicas). Primary shards have a bold border and replicas do not. This tool gives us a great but simple visual representation of our cluster and also provides for quick ad-hoc queries and modifications to our indexes.

Once everything is reasonably balanced, queries are fast. Ours tend to be sub-10 milliseconds, which allows ElasticSearch to be used as a real-time, responsive system. While we had a number of challenges in crafting efficient queries, once we got over that hurdle things have been fast and stable ever since. Word to the wise: don’t put wildcards in your queries that result in huge intermediate results. It’s not pretty.

ElasticSearch also provides a more heavyweight monitoring solution called Marvel. Our first try at Marvel was less than impressive as it put too much of a load on each node and filled the indexes with lots of monitoring information that cluttered things up. I’m told that this has improved dramatically in the past year and we’re keen to give Marvel another try.

Challenge: Indexing a Live System

What about indexing? For our team, our biggest challenge is that we need to ensure 100% uptime and that includes ensuring ongoing indexing does not impact read operations. Failing to provide solid premium domain recommendations means money left on the table. So when we rebuild our entire index every night, we do it in a creative way by indexing to only one node which is marked as never being a master node and containing only data. This is, in essence, a “write” node. We turn off replication, create the new index, and load into it. This operation takes about two hours. Once done, we turn replication back on and let the index be copied to all nodes, including those from which we read. Once that is complete, we then tell ElasticSearch about the new index and make it primary using an alias. This gives us zero downtime for reads as well as keeping the heavy lifting of indexing constrained to one node until it is done. When we add records throughout the day, we do it to that one node and let it push the updates to the read-only nodes.
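
The sequence looks roughly like the sketch below, expressed as calls to ElasticSearch’s REST API; the endpoint, index names, and settings are illustrative rather than our actual configuration.

import json
import time
import requests

ES = "http://localhost:9200"                 # hypothetical cluster endpoint
HEADERS = {"Content-Type": "application/json"}

# 1. Create the new index on the write node with replication turned off.
new_index = "premium-domains-%s" % time.strftime("%Y%m%d")
old_index = "premium-domains-20150101"       # hypothetical previous index
requests.put("%s/%s" % (ES, new_index), headers=HEADERS, data=json.dumps({
    "settings": {"number_of_shards": 5, "number_of_replicas": 0}}))

# 2. ...bulk-load documents into new_index here (about two hours for us)...

# 3. Turn replication back on so the index is copied to the read nodes.
requests.put("%s/%s/_settings" % (ES, new_index), headers=HEADERS,
             data=json.dumps({"index": {"number_of_replicas": 3}}))

# 4. Atomically repoint the alias the API nodes query: zero read downtime.
requests.post("%s/_aliases" % ES, headers=HEADERS, data=json.dumps({
    "actions": [{"remove": {"index": old_index, "alias": "premium-domains"}},
                {"add": {"index": new_index, "alias": "premium-domains"}}]}))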

[Figure: ElasticSearch at GoDaddy]

For us at GoDaddy, such strategies make a lot of sense when considering how uptime, responsiveness, and indexing operations can potentially impact read performance.

Shameless Self-Promotion

The fine folk at Elastic created a video wherein they asked a number of people how they’re using ElasticSearch and why they like it. I got to ramble for a while and they used a couple of clips.

If you’re interested in working on these and other fun problems with us, check out our jobs page.

Crowdsourcing for Complex Tasks: How to Ensure Quality Output


Here at GoDaddy, we use crowdsourcing in our pipeline for extracting content from the web. This pipeline helps us convert arbitrary web pages into semantically structured data such as price lists (e.g. restaurant menus). There are many different ways to make crowdsourcing tasks, ranging from microtasks (seconds per task) to complex tasks (up to hours per task). The best task type to use depends on the problem you are trying to solve and how much context crowdworkers need. In a previous post, we discussed the trade-offs between task types, why complex tasks work better for our use case, as well as touched on how to ensure high quality output with task design. In this post, we will talk about how to achieve quality output in more depth.

A Hierarchy of Workers

[Figure: the three-level hierarchy of workers]

Our workers are organized in a 3-level hierarchy, with the least experienced workers at the bottom and most experienced at the top. Workers at the bottom process a task from scratch, while workers at higher levels, called reviewers, review and correct their output. As a task moves through the hierarchy, the quality should improve. This hierarchical structure is uncommon for crowd work, where the typical model is to treat all workers as interchangeable. Here are the main reasons we chose this design:

Complex tasks require more training. Our tasks require a few weeks for a new worker to onboard. This is the opposite of microtasks, where you can typically learn the task in a few minutes. A hierarchy of review allows experienced workers to provide feedback and instruction to new workers, serving as a key part of the training process. Our system facilitates communication by allowing workers to leave messages and annotate a task with comments.

Review takes less time than reprocessing. Reviewing a task for mistakes takes much less time than processing a new task. Sometimes tasks require a lot of manual data entry, in which case it is much easier to check the task for errors than to reproduce it.

Difficult to compare the output of complex tasks. In crowdsourcing, there is a chance that a spammer picks up your task. The common method of dealing with this is to send a task to multiple workers and take a majority vote on the output. This works well for tasks with simple outputs like true/false or multiple choice, but is much more difficult for tasks with complex outputs like a structured price list.

Saving Money & Maintaining Quality with Machine Learning

We have a hierarchy of workers doing complex work. Can we do better? One metric we care about is keeping costs low. Another is output quality. Is there a way for us to lower costs while keeping similar quality output? The simple answer is yes. The secret is that not all reviews are the same. Some reviews result in lots of fixes, while others make no changes at all. What if we could use machine learning to predict how many fixes a review will make? Then we could focus our efforts on only the tasks that need it the most.

We use machine learning to train a model to decide which tasks to review. In training this model, there are a couple of points to consider. First, what data do we use for training? We do not have “ground truth” data in the sense that we are 100% certain the task output is correct. Instead, we have tasks that have been corrected by trusted reviewers. For model training, we assume that the output of a task after review is “ground truth”.

The other question is, what exactly is the model predicting? There are many ways to measure how much a task has changed.  We need some quantitative measure of output quality that we can train against. In our case, the task output is a large blob of text with roughly one significant unit of data per line. A natural measure of quality is the percentage of lines different between the worker output and “ground truth”. We train our model to predict percentage line difference, and then pick the tasks with the highest predicted line difference for review. We can evaluate our model’s performance with the following graph, which shows the total errors caught if we review the tasks that the model picked, compared to randomly picked tasks, as we vary the review budget from 0% to 100%.

[Graph: total errors caught by model-selected reviews vs. randomly selected reviews, across review budgets]

In production, we dynamically change the threshold for review to match our desired review budget. Currently, our production review budget is set to around 40%. From the performance graph, you can see that at the desired budget our model catches 50% more errors than random.
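
As a rough illustration of this approach (not our production code), the sketch below computes the line-difference target with difflib, fits a regressor on hypothetical task features, and keeps the highest-predicted tasks within a 40% review budget.

import difflib
import numpy as np
from sklearn.ensemble import GradientBoostingRegressor

def line_diff_pct(worker_output, reviewed_output):
    """Fraction of lines changed between worker output and the post-review output."""
    a, b = worker_output.splitlines(), reviewed_output.splitlines()
    same = sum(m.size for m in difflib.SequenceMatcher(None, a, b).get_matching_blocks())
    return 1.0 - same / max(len(a), len(b), 1)

# Hypothetical task features (e.g. task length, worker tenure, page complexity) and
# line-diff targets that would come from historical reviewed tasks via line_diff_pct.
rng = np.random.default_rng(0)
X_train, y_train = rng.random((500, 3)), rng.random(500)
model = GradientBoostingRegressor().fit(X_train, y_train)

# Review only the tasks with the highest predicted line difference, up to the budget.
X_new = rng.random((100, 3))
budget = int(0.4 * len(X_new))
to_review = np.argsort(model.predict(X_new))[::-1][:budget]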

The Future of Crowdsourcing at GoDaddy

There’s so much more we want to do with human workers in our workflows. Right now, all of our tasks involve some kind of data extraction / entry, but we are curious to see how the crowd can be used for more subjective, creative tasks like designing a website! We also plan to use crowd workers to help train models for personalization to help small businesses better target their users. Stay tuned for more posts about that in the future.

For more details on the topics discussed in this blog post, you can read our paper that appeared in the 41st International Conference on Very Large Data Bases (VLDB 2015).  If you are interested in joining our team, you can apply at http://careers.godaddy.net.

Why GoDaddy Built an Actor System Library


GoDaddy had a number of challenges to solve when designing its Virtual Private Server (VPS) and Dedicated Hosting Platform. The system had to be highly scalable, capable of operating across multiple datacenters spread across the world, and able to support over 5,000 systems hosting tens of thousands of customers. It had to be capable of running simultaneous operations directly on those remote systems and interacting with external services, and it had to be fault tolerant, adaptable, extensible, and operationally agile.

There are many different approaches to these problems, but one simple yet very powerful approach that met all the requirements is the Actor Model. The Actor Model was conceived in MIT’s Artificial Intelligence labs back in the mid-1970s, and it has recently seen renewed interest because of the way it addresses concurrency and distribution.

GoDaddy uses the Python language for many of its internal systems and production applications. Evaluating the available Python actor libraries, we found that the existing ones were relatively simple, with a focus on academic exploration, and did not support key features that we needed:

  • Simple and direct actor implementation
  • Remote distributed execution
  • Flexible implementation with debugging support
  • Dynamic source code loading

GoDaddy therefore developed an Actor Library for Python called Thespian to support this highly-scalable architecture. GoDaddy has open-sourced the Thespian Library to share it with the community for others looking to use Python Actors to deploy real, distributed, enterprise-level services. This blog talks about the Thespian Library and some of the challenges GoDaddy has overcome using it.

Why we chose to use actors

GoDaddy’s Virtual and Dedicated Hosting distributed provisioning application creates Virtual Private Servers (VPS) or Dedicated Servers for customers and provides ongoing management of those servers. The provisioning application implements the business logic of the VPS and Dedicated Hosting Platform as well as coordinating operations that run either locally or on one of the thousands of remote systems that support customer servers.

Customer requests (creating a server, rebooting a server, changing passwords, etc.) are passed to our application server through a primary REST API (via our web-based user access control panel). The application server is responsible for handling all of these requests in parallel in order to support our thousands of customers, although the requests for a specific customer server are processed in the order they are received. The Django front-end, coupled with the MySQL database, is sufficient to provide the API interface and handle any requests that simply query static information and return their results synchronously. However, longer-running requests (like rebooting a Windows server) cannot be performed in a blocking manner by a Django gunicorn thread. In addition, many portions of these requests must be executed remotely on the host system where the customer’s server is actually running, or must interact with external services which may take varying amounts of time to complete their operation.

The distributed nature of this provisioning application brings with it two additional challenges: deployment synchronization and testing/debugging. The system must support deploying new software to 5,000+ systems without impacting ongoing customer requests and avoiding the drift problems of upgraded systems interacting with yet-to-be-updated systems. Due to its nature, a distributed system is very difficult to execute in a simple deterministic manner under external control, yet GoDaddy engineering must ensure that the system operates correctly by writing unit and functional tests, and using those tests to debug the system in our development environment.

The Actor Model provides the necessary asynchronous and distributed framework for processing distributed requests involving the customer’s servers or external services while Thespian also solves the deployment and testing problems described above.

How Actors process our requests

Each logical step that must be performed to complete a customer request is implemented as an Actor in order to gain the advantages of concurrency and fault-tolerance provided by an Actor-based system. Every step performs its function independently of all other steps, and any failures or issues related to that step are isolated to the corresponding Actor. In each step the Actor may enlist other Actors on remote systems to run commands on those systems, or it may access external services that provide auxiliary functionality (e.g. allocating a new IP address from a pool of addresses). Failures and delays in those operations similarly affect only those Actors, and only for a single request.

The Actor Model implements concurrency at the Actor level: each Actor runs simultaneously with other Actors, so the system can process multiple requests at the same time. The Actor Model also allows for automatic recovery from Actor failures while isolating the effects of a failure to a single Actor and a single request. This makes the system very resilient to failures.

[Figure: how actions flow through the Control Actor, Agency Actor, and Step Actors]

As shown in the figure above, requests are passed to the backend by creating an “action” object that corresponds to the request in the MySQL database (except for ephemeral actions, which have no database entry) and sending that action as a message to the top-level Control Actor. Each action object specifies the list of step names that should be executed in order to perform the action. The Control Actor validates the action, then passes the action to each corresponding Step Actor in sequence to perform the action. Successful completion of all of the steps results in the Control Actor marking the MySQL request entry corresponding to the action with a “completed” status.

The Agency Actor acts as a singleton factory of Step Actors. The Control Actor supplies the step name (from the action) and the agency returns the Actor Address of the dynamically loaded and started Step Actor. Multiple actions of the same type can be processed in parallel; for instance, the first action may be at the second step (i.e. being processed by the second Step Actor) while the second action is at the first step and being processed by the first Step Actor.

As the figure above shows, for the reboot request there is a step to validate that the customer’s disk is not 100% full (which would prevent a successful boot), followed by the reboot step itself, and then multiple steps which verify the successful restoration of network connectivity for the customer’s server.

Each Step Actor implements whatever functionality is needed to perform that step. Some steps may communicate with external services and some steps must execute commands on the remote host system. In addition to the fault tolerance described above, the Actor Model makes the overall application tolerant of delays: it does not matter to the other Actors how long one particular Actor takes to perform its task, and the other Actors can be busy processing other requests in the interim.

Remote execution

An Actor receives messages delivered to it by the Actor System within which it is running. The delivery argument specifies the message and the address of the Actor that sent the message. The messages can be anything, but the Actor Address is an opaque handle to the actor. Actors can create other Actors by making a call to the Actor System and receiving an Actor Address for the newly created Actor in return. In addition, messages can contain Actor Addresses.

When an Actor wishes to communicate with another Actor, it makes a call to send a message, specifying the message itself and the target Actor Address as the arguments. The Actor System is responsible for determining which Actor the Actor Address references and delivering that message to the recipient Actor. This makes the Actor Code itself very simple, as seen in the example below, which contains two Actors that work together to generate a response to an external query.

from thespian.actors import *

class Hello(Actor):
    def receiveMessage(self, message, sender):
        if not hasattr(self, 'world'):
            self.world = self.createActor(World)
        self.send(self.world, (sender, 'Hello, '))

class World(Actor):
    def receiveMessage(self, message, sender):
        orig_sender, greeting = message
        self.send(orig_sender, greeting + "world!")
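
As a quick illustration of driving the two Actors above from outside the Actor System (using Thespian’s ask/tell interface; this is a sketch rather than code from the provisioning application):

from thespian.actors import ActorSystem

# Start (or attach to) an Actor System; 'simpleSystemBase' runs everything
# in-process, which is handy for experimenting and for unit tests.
asys = ActorSystem('simpleSystemBase')
hello = asys.createActor(Hello)

# ask() sends a message and waits (up to a timeout, in seconds) for a reply
# sent back to the original sender.
print(asys.ask(hello, 'hi', 1.5))   # -> "Hello, world!"
asys.shutdown()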

Because Actors reference other Actors using opaque Actor Addresses each Actor has no real knowledge of the actual circumstances of the other Actor. The GoDaddy Thespian Library allows multiple Actor Systems running on different network nodes to communicate and cooperate in Actor management and message delivery. Thus, an Actor running on the application server can send a message to an Actor running as another process on the same system or to an Actor running on any of the other 5,000+ host systems in exactly the same manner and without being aware of where the target Actor actually exists.

As host systems are created to meet additional customer demand, our Operations Team simply installs Thespian on the newly built systems. When Thespian starts up, it connects to the Thespian Actor System running on the application server to join into the convention of Actor Systems. These systems are now immediately available to host customer servers without the Control Actor or Step Actors ever being aware that there is a new server in the system.

Thespian Actor capabilities

The Thespian Actor System is responsible for determining whether new Actors should be created on the local system or a remote system running Thespian. This is handled by specifying a set of “capabilities” for each Actor System, based on probing the local environment. The capabilities are simply a key/value pair and can be anything that the code starting the Actor System wishes to supply.

Example Actor System startup with capability specification:

from thespian.actors import *
from database import dbclass
import socket

lcladdr = socket.getaddrinfo(socket.getfqdn(), 0,
                             socket.AF_INET,
                             socket.SOCK_STREAM,
                             socket.IPPROTO_TCP, 0)[0][4]  # (address, port) of this host

capabilities = { 'Convention Address.IPv4': '10.5.1.1:1900',
                 'System Address.IPv4': lcladdr,
               }

try:
    dbconn = dbclass.connect(...)
    if dbconn and 1 == dbconn.runquery('select 1=1'):
        capabilities['Has DB access'] = True
except DBError:
    pass

ActorSystem('multiprocTCPBase', capabilities)

Actor definitions can specify which capabilities that Actor requires to run successfully, either via the actorSystemCapabilityCheck() static method on the actor class, via the @requireCapability class decorator, or both. When createActor() is called for the Actor, the current Actor System checks to see if its capabilities match the Actor’s requirements. If they do not, it will then attempt to locate another Actor System that satisfies the required capabilities and forwards the createActor operation to that remote Actor System. All of this is handled by the Actor System without requiring any code in the Actors themselves.

Example Actor specification of its required capabilities:

from thespian.actors import *
from database import dbclass

@requireCapability('Has DB access')
@requireCapability('LDAP installed')
class DBUpdateFromLDAP(Actor):

    @staticmethod
    def actorSystemCapabilityCheck(capabilities, requirements):
        return capabilities.get('LDAP zone', None) == 'Active'

    def receiveMessage(self, message, sender):
        rows = self.db.query('select * from ...')
        ...

This capability-based automatic configuration mechanism also allows the overall application to adapt to individual faults. For example, if one system loses access to the database perhaps due to a mis-applied firewall rule on a network switch, the Actors on that system will exit and be re-created on any other system in the environment that still has access to the database. All other Actors continue to operate as normal without any awareness of the reconfiguration other than receiving a new Actor Address for the Actor requiring database access. In conjunction with the fault tolerant behavior provided by automatically restarting Actors, this makes the system extremely resilient and essentially self-healing.

Dynamic source updates

As described above, our Hosting Provisioning system handles operational growth automatically, but as the overall environment grew the deployment process quickly became problematic using the conventional yum/spacewalk methodology:

  • The entire environment must stop taking requests and become quiescent. The RPM update stops the Actor System and all local Actors, which would drop any actions being processed at the time.
  • All 5,000+ systems must be updated, which can take some time and overload the yum servers if too many systems are updated at the same time.
  • Version mismatches can still occur if some remote hosts missed the upgrade, either because they were down or just going through the build process while the new deployment was being pushed out.

To resolve this, GoDaddy enhanced Thespian to support loadable code. The Thespian Actor System itself is still deployed using yum, but Thespian is relatively stable and new deployments of it are infrequent; the provisioning application itself is where most of the changes occur, and this code can be dynamically loaded into the currently running Thespian via an Actor System call.

The same abstraction that allows Actors to communicate without being aware of the location of other Actors also allows identical Actors from different versions to run side by side. A zipfile containing Python source code can be dynamically loaded into Thespian via the loadSource() call, which verifies the load as valid by having it authorized by a Source Authority Actor, and then assigns the loaded code a hash signature. When the Django front-end passes actions to the Control Actor, it specifies the source hash associated with that Control Actor. In this manner, the older sources can still run pre-existing actions through to completion while new actions are directed to the newly loaded source. Once the older source is no longer active, it can be unloaded from the Thespian environment.

When the current Actor System does not have the necessary set of capabilities to create an Actor, the create is deferred to a remote Actor System and the source hash is passed along with the create request. If the remote Actor System does not have the identified sources available, it will automatically retrieve them from the requesting Actor System. This allows new sources to be deployed just to the application server, whereupon they are propagated throughout the rest of the environment on demand, alleviating the need to upgrade all 5,000+ systems at the exact same time. This simplifies the deployment and ensures that all systems are using the same version of the software.
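
A rough sketch of that flow, using the loadable-source entry points exposed by current Thespian releases (loadActorSource() and the sourceHash argument to createActor()); the package name and actor path below are hypothetical:

from thespian.actors import ActorSystem

asys = ActorSystem('multiprocTCPBase')

# Load a new zipfile of provisioning code; Thespian has the Source Authority
# validate it and returns a hash identifying this version of the sources.
with open('provisioning-2.7.zip', 'rb') as src:            # hypothetical package
    source_hash = asys.loadActorSource(src)

# New actions are handled by a Control Actor created from the new sources, while
# Actors from the previous hash finish their in-flight actions side by side.
control = asys.createActor('provisioning.control.ControlActor',   # hypothetical path
                           sourceHash=source_hash)

# Once the old version has drained, its sources can be removed:
# asys.unloadActorSource(old_source_hash)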

Debugging

A multi-process distributed system is very difficult to debug as setting breakpoints and stepping through code is no longer a viable option when the code is spread across multiple systems or even multiple processes. In addition, unit and functional test frameworks are usually only capable of testing, analyzing, and providing coverage information for a single-threaded application.

However, an actor-based design can make debugging and testing considerably easier. The abstraction of the concurrency methodology and message transport system provided by the Actor System is again the key element enabling this simplicity.

The Thespian Actor System can be initialized with a parameter that specifies a “system base” that will be used to implement the Actor System. Using different system bases changes the concurrency and message transport implementations within the Actor System while requiring no code changes to the Actors themselves.

To simplify a large portion of the debugging process as well as enabling code coverage analysis during unit testing, a special system base is used that simply runs all Actors sequentially. Individual messages are delayed on a global queue until they can be sequentially delivered to the target Actor, and all Actors are simply run in the context of the primary test process. Actor code that requires external concurrency will still require fixtures to enable this mode of operation, but the overall system is quickly and simply reduced to a set of sequential operations that can be more easily debugged.

In addition, Thespian will soon be adding functionality similar to the “Goons” as described in the blog post by Sam Ng about Simulating Faults. Allowing a Goon to proxy for an Actor is another great and easy way to test an Actor-based application and help make Thespian-based applications more robust.

Conclusion

The Actor Model is a powerful yet simple tool for the modern software developer. It provides a higher level of abstraction than alternative approaches, which enables more focus on the application logic, and it doesn’t lock in a specific concurrency or transport mechanism, enabling future operational flexibility. GoDaddy’s Thespian was developed to support our Hosting Provisioning System, which requires an Actor Model implementation that is scalable, fault tolerant, concurrent, modular, and extensible.

GoDaddy is excited to share this technology by open sourcing the Thespian Library to Python developers. Please feel free to submit contributions, comments, feature requests, or join us in person…we’re hiring!