Kafka topics dump and import

From time to time you may find yourself needing to export all the topic definitions from a Kafka server so you can recreate them on a new server. The Kafka command-line tools are pretty handy for that, but I could not find a ready-made script that could both dump the topic definitions to a CSV file and then read it back to create the topics, so I made my own two scripts.

They are pretty basic and rough, and only tested in standard usage, but they work. If you find any bug or have any suggestion for improvement, please feel free to leave a comment.

The first one is the script that dumps all the topic definitions to a CSV file. The CSV basically contains the topic name, the number of partitions, the replication factor and any additional configuration parameters related to the topic. The CSV is intentionally formatted with a keyword before each value so the import script can do some basic validation. Here's the topicsList.sh script:

#!/bin/sh

./kafka-topics.sh --describe --zookeeper localhost:2181 | grep -v "Partition: " | while read Topic Partitions Replica Configs; do
	topicName=`echo $Topic | cut -d ':' -f 2`
	partitionsCount=`echo $Partitions | cut -d ':' -f 2`
	replicaCount=`echo $Replica | cut -d ':' -f 2`
	configDetails=`echo $Configs | cut -d ':' -f 2`
	echo Topic,$topicName,Partitions,$partitionsCount,Replica,$replicaCount,Configs,$configDetails
done
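
For reference, the raw output of kafka-topics.sh --describe that the script parses looks roughly like this (the exact layout depends on the Kafka version; the per-partition detail lines are the ones filtered out by the grep):

Topic:bss.order.event   PartitionCount:12   ReplicationFactor:2   Configs:retention.ms=345600000
    Topic: bss.order.event   Partition: 0   Leader: 1   Replicas: 1,2   Isr: 1,2
    Topic: bss.order.event   Partition: 1   Leader: 2   Replicas: 2,1   Isr: 2,1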

Here’s a sample CSV you will get with this script:

Topic,bss.order.event,Partitions,12,Replica,2,Configs,retention.ms=345600000
Topic,bss.order.event-dlt,Partitions,3,Replica,3,Configs,
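
To capture the dump, run the script from the Kafka bin directory (it calls kafka-topics.sh with a relative path) and redirect its output to a file, for example:

./topicsList.sh > topics.csv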

Then, finally, here's the script to import the CSV list of topics into a new Kafka server. You invoke it by passing the CSV file name on the command line, and in the first lines there are a couple of toggles: one to cap the maximum replication factor you want to apply (e.g. when you're recreating topics on a smaller cluster) and one to apply some default configurations to all the topics you're creating. Here's topicsCreate.sh:

#!/bin/sh

## Maximum replication factor to apply (replica values from the CSV are capped to this)
MAXREPLICA=1

## Default configurations to apply to every topic being created
DEFAULTCONFIGS="segment.jitter.ms=5000,segment.ms=28800000,retention.bytes=524288000,segment.bytes=524288000"

if [ "$1" = "" ] || [ ! -f "$1" ]; then
	echo "Please specify kafka topic file to import."
	echo
	echo "Format:"
	echo " - one topic per line"
	echo " - line format: Topic,<topic>,Partitions,<partitions>,Replica,<replica>,Configs,<configs>"
	exit 1
fi

while IFS="," read -r TopicKey TopicValue PartitionKey PartitionValue ReplicaKey ReplicaValue ConfigsKey ConfigsValues; do

	if [ "$TopicKey" != "Topic" ] || [ "$PartitionKey" != "Partitions" ] || [ "$ReplicaKey" != "Replica" ] || [ "$ConfigsKey" != "Configs" ]; then
		echo ""
		echo "Corrupt CSV file!"
		echo
		exit 1
	fi

	if [ $ReplicaValue -gt $MAXREPLICA ]; then
		ReplicaValue=$MAXREPLICA;
	fi

	echo ""
	echo "Creating topic $TopicValue with $PartitionValue partitions and $ReplicaValue replicas:"

	./kafka-topics.sh --create --bootstrap-server localhost:9092 --topic="$TopicValue" --partitions="$PartitionValue" --replication-factor="$ReplicaValue"

	echo "==> Applying default configurations for topic $TopicValue:"
	./kafka-configs.sh --alter --bootstrap-server localhost:9092  --topic="$TopicValue" --add-config="$DEFAULTCONFIGS"

	if [ "$ConfigsValues" != "" ]; then
		echo "==> Applying custom configurations $ConfigsValues for topic $TopicValue"
		./kafka-configs.sh --alter --bootstrap-server localhost:9092 --topic="$TopicValue" --add-config="$ConfigsValues"
	fi

done < "$1"
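
As with the dump script, run it from the Kafka bin directory of the destination cluster, after adjusting MAXREPLICA and DEFAULTCONFIGS to your needs, for example:

./topicsCreate.sh topics.csv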

Have fun!

Verify an SSL certificate and its related chain

How to verify an HTTPS certificate plus its related chain

Since Google is the best post-it note one can have, I'm pasting here the command to check the SSL certificate and the related chain of a given HTTPS server with SNI enabled.

echo | openssl s_client -showcerts -connect www.google.com:443 -servername www.google.com | more
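
If you also want openssl to validate the presented chain against a local CA bundle, you can add -CAfile and look at the verify return code at the end of the output. The bundle path below is just an example and differs between systems:

echo | openssl s_client -showcerts -connect www.google.com:443 -servername www.google.com -CAfile /etc/ssl/cert.pem 2>/dev/null | grep "Verify return code"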

PHP Proc_Open and STDIN – STDOUT – STDERR

Some years ago, in gCloud Storage, our storage-as-a-service system, we developed some chaining technology that allows us to dynamically extend the features of the storage subsystem, letting it transform incoming or outgoing files.

A while ago we developed a chain that allows our users to store a file securely by encrypting it when it enters the system and decrypting it when it's fetched, without our side ever saving the password.

After some thinking we decided to embrace existing technology and rely on openssl for the job.
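
Just to give an idea of the kind of commands being wrapped, encrypting and decrypting a stream with a user-supplied password boils down to something like this (a minimal sketch: the cipher choice and the FILEPASS environment variable are illustrative, not necessarily what gCloud Storage actually uses):

openssl enc -aes-256-cbc -salt -pass env:FILEPASS < clear.bin > ciphered.bin
openssl enc -d -aes-256-cbc -pass env:FILEPASS < ciphered.bin > clear.bin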

So we had to write some code able to interact with a spawned openssl process. We did some trial and error, and of course we did our research on Google. After various attempts we found this code, which proved to be pretty reliable:

stdin, stdout, stderr with proc_open in PHP

We tried it first on our Mac OS machines, then on our FreeBSD servers, and it worked flawlessly for a couple of years. Recently one of our customers asked for an on-premises installation of a stripped-down clone of gCloud Storage that had to run on Linux (CentOS, if that's relevant). We were pretty confident everything would go smoothly, but that wasn't the case: when the system went live we found out that, when decrypting files, it would lose some trailing blocks.

Long story short, we found that on Linux a child process can terminate while data is still sitting in the stdout buffer, while apparently it can't on FreeBSD.

The code we adopted had a specific check to make sure it wasn't trying to interact with a dead process. Specifically:

if (!is_resource($process)) break;

was the guilty part of the code. What was happening was that openssl was exiting, the code was detecting it and bailing out before reading the whole stdout/stderr.

So in the end we came up with this:

public function procOpenHandler($command = '', $stdin = '', $maxExecutionTime = 30) {

    $timeLimit = (time() + $maxExecutionTime);

    $descriptorSpec = array(
        0 => array("pipe", "r"),
        1 => array('pipe', 'w'),
        2 => array('pipe', 'w')
    );

    $pipes = array();

    $response = new stdClass();
    $response->status = TRUE;
    $response->stdOut = '';
    $response->stdErr = '';
    $response->exitCode = '';

    $process = proc_open($command, $descriptorSpec, $pipes);
    if (!$process) {
        // could not exec command
        $response->status = FALSE;
        return $response;
    }

    $txOff = 0;
    $txLen = strlen($stdin);
    $stdoutDone = FALSE;
    $stderrDone = FALSE;

    // Make stdin/stdout/stderr non-blocking
    stream_set_blocking($pipes[0], 0);
    stream_set_blocking($pipes[1], 0);
    stream_set_blocking($pipes[2], 0);

    if ($txLen == 0) {
        fclose($pipes[0]);
    }

    while (TRUE) {

        if (time() > $timeLimit) {
            // max execution time reached
            // echo 'MAX EXECUTION TIME REACHED'; die;
            @proc_close($process);
            $response->status = FALSE;
            break;
        }

        $rx = array(); // The program's stdout/stderr

        if (!$stdoutDone) {
            $rx[] = $pipes[1];
        }

        if (!$stderrDone) {
            $rx[] = $pipes[2];
        }

        $tx = array(); // The program's stdin

        if ($txOff < $txLen) {
            $tx[] = $pipes[0];
        }

        $ex = NULL;
        stream_select($rx, $tx, $ex, NULL, NULL); // Block until read/write is possible

        if (!empty($tx)) {
            $txRet = fwrite($pipes[0], substr($stdin, $txOff, 8192));
            if ($txRet !== FALSE) {
                $txOff += $txRet;
            }
            if ($txOff >= $txLen) {
                fclose($pipes[0]);
            }
        }

        foreach ($rx as $r) {

            if ($r == $pipes[1]) {

                $response->stdOut .= fread($pipes[1], 8192);

                if (feof($pipes[1])) {

                    fclose($pipes[1]);
                    $stdoutDone = TRUE;
                }
            } else if ($r == $pipes[2]) {

                $response->stdErr .= fread($pipes[2], 8192);

                if (feof($pipes[2])) {

                    fclose($pipes[2]);
                    $stderrDone = TRUE;
                }
            }
        }
        // Key fix: if the child process has already gone away, stop feeding stdin
        // but keep draining stdout/stderr until EOF instead of bailing out.
        if (!is_resource($process)) {
            $txOff = $txLen;
        }

        $processStatus = proc_get_status($process);
        if (array_key_exists('running', $processStatus) && !$processStatus['running']) {
            $txOff = $txLen;
        }

        if ($txOff >= $txLen && $stdoutDone && $stderrDone) {
            break;
        }
    }

    // Ok - close process (if still running)
    $response->exitCode = @proc_close($process);

    return $response;
}

Have Fun! 😉

MySQLfs 0.4.1

Hello everybody.

I'm pleased to announce release 0.4.1 of MySQLfs. This is the first new official version after a long period of inactivity, so please handle it with care. Furthermore, this is my first release, so although I have double-checked everything, I may still have made some tremendous mistake.

These are the main improvements in this version:

  • InnoDB usage instead of MyISAM
  • Basic transaction support
  • Upgrade to FUSE API 2.6
  • Enabled support for “big_writes” to speed up FS operations
  • New datablock size
  • FreeBSD (FUSE4BSD) support “out-of-the-box”
  • Support for new FUSE 2.6 API functions:
    • fuse::create – create a file
    • fuse::statfs – returns stats about file system usage (needed for df and such)
    • corrected used block count (needed for du and such)
  • Fixed command-line issues: now you can use -obig_writes, -oallow_other (to allow other users to read the mounted filesystem) and -odefault_permissions (which, as of this version, is mandatory when using mysqlfs under FreeBSD)

Please note that this is not a production-ready version (yet), but I ask you to test it thoroughly and report any issues you may find. I'll try to fix them.

You can download the package here: mysqlfs-0.4.1.tar (232kb).

PLEASE NOTE THAT THE DATABASE SCHEMA HAS BEEN CHANGED FROM 0.4.0 TO 0.4.1!

If you plan to upgrade from a previous installation, my suggestion is to compile the new version alongside the old one, create a new, separate filesystem, mount it, and then copy the data from the old filesystem to the new one.

If you really need to do a live upgrade of a 0.4.0 database, please take a look at the (unrecommended and incomplete!) upgrade script in the sql subdir.

Installation

To install mysqlfs, make sure you have FUSE and all its libraries installed, plus MySQL and all its development libraries, then unpack the tar.gz and run

./configure
make
make install (as root)

Then create a database with proper permissions and use the schema.sql file in the sql dir to create the database definitions.
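
For example, something along these lines should do (the database name, user and password are just placeholders):

mysql -u root -p -e "CREATE DATABASE mysqlfs; GRANT ALL ON mysqlfs.* TO 'mysqlfs'@'localhost' IDENTIFIED BY 'secret';"
mysql -u root -p mysqlfs < sql/schema.sql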

Run mysqlfs --help to see all the available options.
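
A typical mount then looks something like this (the option names here are indicative; check the --help output for the exact ones available in your build):

mysqlfs -ohost=localhost -ouser=mysqlfs -opassword=secret -odatabase=mysqlfs -obig_writes /mnt/mysqlfs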

You’re done. Have fun.

MySQLfs updates

I already talked about this previously, but I wrote in Italian, so let's do a little recap for English readers.

Just recently I became involved in a project where a cluster of machines had to replicate their data constantly, in an active-active fashion and with geographical distribution.

We evaluated different kinds of solutions based on drbd, zfs, hast, coda and so on… Well, there's a lengthy post on this topic just a few posts before this one, so I suggest checking that out.

At the end of our comparison, the solution that suited us best turned out to be mysqlfs. So I started investigating it and quickly found some areas that could be improved.

The main points were:

  • performance, as mysqlfs turned out to be pretty slow under certain circumstances
  • transaction awareness
  • better integration with mysql replication

As I dug into the code, I found a pretty good general infrastructure, but quite frankly I don't think mysqlfs was ever really used in a production environment.
Apart from that, the project was quite young (the latest official version was 0.4.0) and also pretty “static”, with its latest release dating back to 2009.

So, without any knowledge of C or FUSE whatsoever, I took the sources of the latest “stable” release and began experimenting with it.

A few weeks have passed and I think I've reached a very interesting point. These are the goals I have reached so far:

  • mysqlfs now uses InnoDB instead of MyISAM
  • all write operations are now enclosed in a transaction that gets rolled back if something bad happens halfway through
  • using transactions also means better replication interoperability, since InnoDB and the binary log don't fight over the drives: InnoDB first, binary log after
  • mysqlfs now uses FUSE API version 2.6 instead of the old… mmh… 2.5, I think
  • moving to the new FUSE API allowed switching on “big_writes”, the option that lets the kernel and the filesystem exchange data blocks bigger than the standard 4k
  • the mysqlfs internal data block size was changed from 4k to 128k to reduce block fragmentation, reduce the rows in the data-block table, reduce the number of inserts and, finally, to match the big_writes setting. Receiving 128k of data and then pushing it into the DB 4k at a time wouldn't really make any sense
  • moving to the 2.6 API allowed mysqlfs to become FreeBSD compatible. I don't know (yet) about Mac OS or anything else, but having the devil at the party is a nice plus for me
  • today I started working on the new filesystem functions introduced in the latest versions of FUSE, which should do even better for speed and such

Next steps are…

  • implement file and inode locking
  • implement some kind of write-through cache mechanism. Many basic system commands (cp, tar, gzip) use very small write buffers (8/16k), so the benefit of the big_writes switch gets lost and performance degrades a lot. I'm a bit afraid this task is above my (non-existent) C knowledge, but I have some interesting ideas in mind…
  • implement the missing FUSE functions
  • introduce some kind of referential integrity in the DB, although I have to understand the performance downside
  • introduce some kind of internal data checksum, although everything comes at a price in terms of CPU time
  • introduce some kind of external API, let's say in PHP for example, to allow PHP applications to access the filesystem directly while it's mounted, maybe from different machines at the same time, without having to go through the filesystem functions… Wouldn't it be cool to have a web app, a Linux server and a FreeBSD server all working on the same filesystem at the same time? Yes I know, I'm insane 🙂
  • improve replication interoperability by introducing server signatures in the stored data
  • introduce some command-line tools to interact with the DB and check total space usage and such
  • in the longer term, introduce some kind of versioning algorithm for the stored data

Right now the modifications I made to mysqlfs aren't public yet, as I couldn't really figure out the status of the project on SourceForge. Furthermore, its code base on SourceForge is not aligned with the version that was in the installer (the one I started working on), and it's stored in SVN, which I absolutely don't know how to use.

I know these aren't big issues, but my spare time is very limited, and I definitely can't waste it learning SVN or manually merging the “new” code that's in SVN and differs from the tgz I started from. If any of you is willing to do it, I'll be glad to help.

My internal git repository isn't public yet, partly out of laziness and partly because I'd like to publish something that's at least usable, and I still have to fix a couple of problems before it can be called “idiot proof”.

In the meantime, if any of you is willing to try it out I'd be very glad to share the modified code, and if any of you is willing to contribute I'd be twice as glad, as long as we try to keep development a bit aligned.

Cheers