Making RDS Database Versioning & Instrumentation Easy with Liquibase & CloudFormation

Originally published at: Making RDS Database Versioning & Instrumentation Easy with Liquibase & CloudFormation | Liquibase.com

Reposted with permission from Martin Whittington, Head of Platform @oddschecker.


Database schema changes are risky – learn how Liquibase can help make these changes in a controlled and stepped manner.

Database schema changes need to be managed carefully. Rolling out updates without fully understanding the ramifications can be catastrophic especially if, for example, a table has been nominated for “the drop”.

The team over at Liquibase has provided a wonderful database versioning tool that can be used to help make these sorts of changes in a controlled and stepped manner.

Let’s say we have three tables; Orders, Products, and Sales Reps. An order can contain one or more products. An order is also usually linked to a single sales rep so that the commission can be calculated correctly. Now, let’s imagine that a decision has come down from the top that commission is no longer being paid and the database needs updating to reflect this. The change would be removing the link from the orders table to the sales rep table and then eventually removing the sales rep table altogether. Each change would be a version-controlled update using changelogs:

<?xml version="1.0" encoding="UTF-8"?> 
<databaseChangeLog
  xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
                      http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.1.xsd">

<include file=“db.changelog-1.0.xml”/>
</databaseChangeLog>

Our master changelog contains all the files to include (run in order top to bottom). Let’s assume that db.changelog-1.0.xml was used to create the initial DB schema. We now want to add a new changelog that will:

  1. Drop the FK constraint between the order and sales_rep table.
  2. Drop the sales_rep_id column from the order table.
  3. Drop the sales_rep table.
<?xml version="1.0" encoding="UTF-8"?> 
<databaseChangeLog 
  xmlns="http://www.liquibase.org/xml/ns/dbchangelog/1.9" 
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
  xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog/1.9
                      http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-1.9.xsd"> 
  <changeSet author="mwhittington" id="changelog-1.0">
  	<dropForeignKeyConstraint baseTableName="order" constraintName="fk_order_sales_rep"/>
  	<dropColumn tableName="order" columnName="sales_rep"/>
  	<dropTable tableName="sales_rep"/>
  </changeSet>
</databaseChangeLog> 

Now we need to add this to our master log to roll out the update:

<?xml version="1.0" encoding="UTF-8"?> 
<databaseChangeLog
  xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
                      http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.1.xsd">

<include file=“db.changelog-1.0.xml”/>
<include file=“db.changelog-1.1.xml”/>
</databaseChangeLog>

Once this is tested on a local non-critical DB, we can package this as part of our AWS Lambda Jar. The Lambda code looks like this:

public class RDSManager implements RequestHandler<Map<String, Object>, Object> {
private static enum Status {
	SUCCESS, FAILED
}

private AWSSimpleSystemsManagement ssm = AWSSimpleSystemsManagementClientBuilder.defaultClient();
private String dbName = "whatever";

@Override
public Object handleRequest(Map&lt;String, Object> input, Context context) {
	LambdaLogger logger = context.getLogger();
	String requestType = input.get("RequestType").toString();
	logger.log(String.format("[INFO] RequestType is %s\n", requestType));
	
	// If the event is equal to delete we don't need to do anything
	if (requestType.equalsIgnoreCase("Delete")) {
		logger.log(String.format("[INFO] RequestType %s -> Exiting\n", requestType));
		return sendCFNResponse(input, Status.SUCCESS, context);
	}
	
	// Get Values from the event
	@SuppressWarnings("unchecked")
	Map&lt;String,Object> resourceProps = (Map&lt;String,Object>)input.get("ResourceProperties");
	String dbHost = resourceProps.get("DBHost").toString();
	
	// Get Parameter values from the env vars
	logger.log("[INFO] Fetching SSM Values\n");
	String userName = null;
	String userPass = null;
	try {
		userName = this.getSSMParameter(System.getenv("DB_USER"));
		userPass = this.getSSMParameter(System.getenv("DB_PASS"));
	} catch (SdkClientException e) {
		return this.handleError(e, input, context);
	}
	
	if (userName != null &amp;&amp; userPass != null) {
		logger.log("[INFO] Values fetched OK!\n");
		// Start the DB Connection
		logger.log(String.format("[INFO] Connecting to %s\n", dbHost));
		String url = String.format("jdbc:mysql://%s:3306/%s", dbHost, dbName);
		Connection conn = null;
		try {
			conn = DriverManager.getConnection(url, userName, userPass);
			logger.log("[INFO] Connected to DB!\n");
			Database dataBase = DatabaseFactory.getInstance().findCorrectDatabaseImplementation(new JdbcConnection(conn));
			Liquibase liquiBase = new liquibase.Liquibase("db.changelog-master.xml", new ClassLoaderResourceAccessor(), dataBase);
			liquiBase.update(new Contexts(), new LabelExpression());
		} catch (SQLException | LiquibaseException e) {
			return this.handleError(e, input, context);
		} finally {
			try {
				if (!conn.isClosed() || conn != null) {
					conn.close();
				}
			} catch (SQLException e) {
				e.printStackTrace();
				logger.log(e.getMessage());
			}
		}
		logger.log(String.format("[INFO] %s provisioned\n", dbHost));
		return sendCFNResponse(input, Status.SUCCESS, context);
	}
	logger.log("[ERROR] Unable to get AWS Parameters\n");
	return sendCFNResponse(input, Status.FAILED, context);
}

/**
 * Displays the error in the CloudWatch logs and then returns out of the Lambda
 * @param e the current exception
 * @param input The event object
 * @param context The context Object
 * @return The Status, FAILED
 */
private String handleError(Exception e, Map&lt;String, Object> input, Context context) {
	context.getLogger().log(String.format("[ERROR] Exception - %s", e.getMessage()));
	e.printStackTrace();
	return sendCFNResponse(input, Status.FAILED, context);
}

/**
 * Gets the SSM Parameter Value from AWS using the paramName
 * @param paramName the name of the param to fetch
 * @return the string value of the param
 */
private String getSSMParameter(String paramName) {
	if (paramName != null &amp;&amp; !paramName.isEmpty()) {
		GetParameterRequest request = new GetParameterRequest()
				.withName(paramName)
				.withWithDecryption(true);
		GetParameterResult result = this.ssm.getParameter(request);
		if (result != null) {
			return result.getParameter().getValue();
		}
	}
	return null;
}

/**
 * Send a response to the CFN build process
 */
private String sendCFNResponse(Map&lt;String, Object> input, Status status, Context context) {
	LambdaLogger logger = context.getLogger();
	String responseURL = input.get("ResponseURL").toString();
	try {
		URL url = new URL(responseURL);
		HttpURLConnection connection=(HttpURLConnection)url.openConnection();
		connection.setDoOutput(true);
		connection.setRequestMethod("PUT");

		OutputStreamWriter out = new OutputStreamWriter(connection.getOutputStream());
		ObjectMapper mapper = new ObjectMapper();
		
		ObjectNode cloudFormationJsonResponse = mapper.createObjectNode();
		cloudFormationJsonResponse.put("Status", status.toString());
		cloudFormationJsonResponse.put("PhysicalResourceId", context.getLogStreamName());
		cloudFormationJsonResponse.put("StackId", input.get("StackId").toString());
		cloudFormationJsonResponse.put("RequestId", input.get("RequestId").toString());
		cloudFormationJsonResponse.put("LogicalResourceId", input.get("LogicalResourceId").toString());
		cloudFormationJsonResponse.put("Reason", "See details in CloudWatch Log StreamName " + context.getLogStreamName() +" ** GroupName: "+context.getLogGroupName());
		String cfnResp = cloudFormationJsonResponse.toString();
		logger.log("[DEBUG] CF Json repsonse "+cfnResp);
		out.write(cfnResp);
		out.close();
		int responseCode = connection.getResponseCode();
		logger.log("[INFO] Response Code "+responseCode);
	} catch (IOException e) {
		logger.log("[ERROR] Not able to send message to CF Template \nStackTrace "+e.getMessage());
	}
	return status.toString();
}

}

In logical order, the Lambda does the following:

  1. Checks the request type. If it is a ‘Delete’ request, we just return a success to CloudFormation.
  2. Next, the code fetches AWS SSM Parameters to securely connect to the RDS instance.
  3. Once the parameters have been fetched, a connection to the RDS instance is made.
  4. Once the connection is established, Liquibase runs the changelog file to update the database schema.
  5. Once Liquibase has completed, a success signal is sent back to CloudFormation
  6. The connection is closed.

I’ll let you all figure out the CloudFormation code to run this, that’s the easy part!