Having some giddy fun with spark-solr

Solr Advent Calendar 2016

This article is day 20 of the Solr Advent Calendar 2016!

Lucene/Solr Revolution 2016

The other day I attended Lucene/Solr Revolution in Boston (see the blog below for an overall report),

and among the sessions, "SearchHub or How to Spend Your Summer Keeping it Real" by Lucidworks CTO Grant Ingersoll left a strong impression on me. It was a deep technical dive into SearchHub, a service built with Docker containers on Amazon EC2 (m4.2xlarge).
What caught my interest in particular was the part about combining Solr and Spark, as in the slides below. SparkSQL is hugely popular; talking with all kinds of customers as a solutions architect, my impression is that it is close to a de facto standard. I had been thinking how nice it would be to get the best of both worlds by layering search-engine-specific capabilities on top of it, and this seemed to be exactly that.
[slideshare id=qhPKhwPKpQx72z?startSlide=8&w=595&h=485&fb=0&mw=0&mh=0&style=border: 1px solid #CCC; border-width: 1px; margin-bottom: 5px; max-width: 100%;&sc=no]

Here are some slides I wrote a while back about DataFrames, Zeppelin, and friends:
[slideshare id=ubRz97mnaJzLeN&w=595&h=485&fb=0&mw=0&mh=0&style=border: 1px solid #CCC; border-width: 1px; margin-bottom: 5px; max-width: 100%;&sc=no]

The video of the "SearchHub or How to Spend Your Summer Keeping it Real" session is here:
[youtube https://www.youtube.com/watch?v=_bniFi8qV9g]

Solr, Spark, spark-solr

I haven't had much hands-on time with either Solr or Spark lately, so this time I'll install both myself on EC2 and tinker away. These things rarely go smoothly on the first try, so partly as a memo to my future self, I'll keep the log as detailed as I can.
# Rereading this afterwards: it's awfully long, it all happens on the command line, and there's no shiny UI to show for it... oh well. :)

Installing Solr on EC2

■ yum update

$ sudo yum update
Loaded plugins: priorities, update-motd, upgrade-helper
Resolving Dependencies
--> Running transaction check
---> Package aws-cfn-bootstrap.noarch 0:1.4-13.8.amzn1 will be updated
(snip)
python27-botocore.noarch 0:1.4.74-1.60.amzn1 system-release.noarch 0:2016.09-0.8
tzdata.noarch 0:2016j-1.67.amzn1 tzdata-java.noarch 0:2016j-1.67.amzn1
Complete!

■ Move to Java 8

$ java -version
java version "1.7.0_121"
OpenJDK Runtime Environment (amzn-2.6.8.1.69.amzn1-x86_64 u121-b00)
OpenJDK 64-Bit Server VM (build 24.121-b00, mixed mode)
$ sudo yum -y install java-1.8.0-openjdk-devel
Loaded plugins: priorities, update-motd, upgrade-helper
Resolving Dependencies
--> Running transaction check
$ sudo alternatives --config java
There are 2 programs which provide 'java'.
  Selection    Command
-----------------------------------------------
*+ 1           /usr/lib/jvm/jre-1.7.0-openjdk.x86_64/bin/java
   2           /usr/lib/jvm/jre-1.8.0-openjdk.x86_64/bin/java
Press Enter to keep the current selection[+], or type selection number: 2
$ java -version
openjdk version "1.8.0_111"
OpenJDK Runtime Environment (build 1.8.0_111-b15)
OpenJDK 64-Bit Server VM (build 25.111-b15, mixed mode)

■ Download Solr

$ wget http://ftp.tsukuba.wide.ad.jp/software/apache/lucene/solr/6.3.0/solr-6.3.0.tgz
--2016-12-19 05:58:54-- http://ftp.tsukuba.wide.ad.jp/software/apache/lucene/solr/6.3.0/solr-6.3.0.tgz
Resolving ftp.tsukuba.wide.ad.jp (ftp.tsukuba.wide.ad.jp)... 203.178.132.80, 2001:200:0:7c06::9393
Connecting to ftp.tsukuba.wide.ad.jp (ftp.tsukuba.wide.ad.jp)|203.178.132.80|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 146004505 (139M) [application/x-gzip]
Saving to: `solr-6.3.0.tgz'
solr-6.3.0.tgz 100%[============================================================>] 139.24M 83.6MB/s in 1.7s
2016-12-19 05:58:56 (83.6 MB/s) - `solr-6.3.0.tgz' saved [146004505/146004505]
$ tar xvf solr-6.3.0.tgz
solr-6.3.0/LUCENE_CHANGES.txt
solr-6.3.0/contrib/analysis-extras/lib/
solr-6.3.0/contrib/clustering/lib/
solr-6.3.0/contrib/dataimporthandler-extras/lib/
solr-6.3.0/contrib/extraction/lib/
(snip)
solr-6.3.0/docs/solr-velocity/prettify.js
solr-6.3.0/docs/solr-velocity/script.js
solr-6.3.0/docs/solr-velocity/stylesheet.css

■ Set up SolrCloud
Wait, it's really this easy? I had braced myself for a slog, so this was a pleasant surprise.

$ bin/solr -e cloud
Welcome to the SolrCloud example!
This interactive session will help you launch a SolrCloud cluster on your local workstation.
To begin, how many Solr nodes would you like to run in your local cluster? (specify 1-4 nodes) [2]:
2
Ok, let's start up 2 Solr nodes for your example SolrCloud cluster.
Please enter the port for node1 [8983]:
8983
Please enter the port for node2 [7574]:
7574
Creating Solr home directory /home/ec2-user/solr-6.3.0/example/cloud/node1/solr
Cloning /home/ec2-user/solr-6.3.0/example/cloud/node1 into
   /home/ec2-user/solr-6.3.0/example/cloud/node2
Starting up Solr on port 8983 using command:
bin/solr start -cloud -p 8983 -s "example/cloud/node1/solr"
Waiting up to 180 seconds to see Solr running on port 8983 [|]
Started Solr server on port 8983 (pid=8625). Happy searching!
Starting up Solr on port 7574 using command:
bin/solr start -cloud -p 7574 -s "example/cloud/node2/solr" -z localhost:9983
Waiting up to 180 seconds to see Solr running on port 7574 []
Started Solr server on port 7574 (pid=8840). Happy searching!
Now let's create a new collection for indexing documents in your 2-node cluster.
Please provide a name for your new collection: [gettingstarted]
gettingstarted
How many shards would you like to split gettingstarted into? [2]
2
How many replicas per shard would you like to create? [2]
2
Please choose a configuration for the gettingstarted collection, available options are:
basic_configs, data_driven_schema_configs, or sample_techproducts_configs [data_driven_schema_configs]
basic_configs
Connecting to ZooKeeper at localhost:9983 ...
Uploading /home/ec2-user/solr-6.3.0/server/solr/configsets/basic_configs/conf for config gettingstarted to ZooKeeper at localhost:9983
Creating new collection 'gettingstarted' using command:
http://localhost:8983/solr/admin/collections?action=CREATE&name=gettingstarted&numShards=2&replicationFactor=2&maxShardsPerNode=2&collection.configName=gettingstarted
{ "responseHeader":{ "status":0, "QTime":5433}, "success":{ "172.31.14.210:8983_solr":{ "responseHeader":{ "status":0, "QTime":3863}, "core":"gettingstarted_shard1_replica1"}, "172.31.14.210:7574_solr":{ "responseHeader":{ "status":0, "QTime":4210}, "core":"gettingstarted_shard1_replica2"}}} Enabling auto soft-commits with maxTime 3 secs using the Config API POSTing request to Config API: http://localhost:8983/solr/gettingstarted/config {"set-property":{"updateHandler.autoSoftCommit.maxTime":"3000"}} Successfully set-property updateHandler.autoSoftCommit.maxTime to 3000 SolrCloud example running, please visit: http://localhost:8983/solr

■ The Java processes
Mm-hm, I see.

$ ps -ef | grep java
ec2-user  8625     1  2 06:03 pts/0    00:00:16 /usr/lib/jvm/jre/bin/java -server -Xms512m -Xmx512m -XX:NewRatio=3 -XX:SurvivorRatio=4 -XX:TargetSurvivorRatio=90 -XX:MaxTenuringThreshold=8 -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:ConcGCThreads=4 -XX:ParallelGCThreads=4 -XX:+CMSScavengeBeforeRemark -XX:PretenureSizeThreshold=64m -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=50 -XX:CMSMaxAbortablePrecleanTime=6000 -XX:+CMSParallelRemarkEnabled -XX:+ParallelRefProcEnabled -XX:-OmitStackTraceInFastThrow -verbose:gc -XX:+PrintHeapAtGC -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+PrintTenuringDistribution -XX:+PrintGCApplicationStoppedTime -Xloggc:/home/ec2-user/solr-6.3.0/example/cloud/node1/solr/../logs/solr_gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=9 -XX:GCLogFileSize=20M -DzkClientTimeout=15000 -DzkRun -Dsolr.log.dir=/home/ec2-user/solr-6.3.0/example/cloud/node1/solr/../logs -Djetty.port=8983 -DSTOP.PORT=7983 -DSTOP.KEY=solrrocks -Duser.timezone=UTC -Djetty.home=/home/ec2-user/solr-6.3.0/server -Dsolr.solr.home=/home/ec2-user/solr-6.3.0/example/cloud/node1/solr -Dsolr.install.dir=/home/ec2-user/solr-6.3.0 -Dlog4j.configuration=file:/home/ec2-user/solr-6.3.0/example/resources/log4j.properties -Xss256k -Dsolr.log.muteconsole -XX:OnOutOfMemoryError=/home/ec2-user/solr-6.3.0/bin/oom_solr.sh 8983 /home/ec2-user/solr-6.3.0/example/cloud/node1/solr/../logs -jar start.jar --module=http
ec2-user  8840     1  2 06:03 pts/0    00:00:14 /usr/lib/jvm/jre/bin/java -server -Xms512m -Xmx512m -XX:NewRatio=3 -XX:SurvivorRatio=4 -XX:TargetSurvivorRatio=90 -XX:MaxTenuringThreshold=8 -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:ConcGCThreads=4 -XX:ParallelGCThreads=4 -XX:+CMSScavengeBeforeRemark -XX:PretenureSizeThreshold=64m -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=50 -XX:CMSMaxAbortablePrecleanTime=6000 -XX:+CMSParallelRemarkEnabled -XX:+ParallelRefProcEnabled -XX:-OmitStackTraceInFastThrow -verbose:gc -XX:+PrintHeapAtGC -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+PrintTenuringDistribution -XX:+PrintGCApplicationStoppedTime -Xloggc:/home/ec2-user/solr-6.3.0/example/cloud/node2/solr/../logs/solr_gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=9 -XX:GCLogFileSize=20M -DzkClientTimeout=15000 -DzkHost=localhost:9983 -Dsolr.log.dir=/home/ec2-user/solr-6.3.0/example/cloud/node2/solr/../logs -Djetty.port=7574 -DSTOP.PORT=6574 -DSTOP.KEY=solrrocks -Duser.timezone=UTC -Djetty.home=/home/ec2-user/solr-6.3.0/server -Dsolr.solr.home=/home/ec2-user/solr-6.3.0/example/cloud/node2/solr -Dsolr.install.dir=/home/ec2-user/solr-6.3.0 -Dlog4j.configuration=file:/home/ec2-user/solr-6.3.0/example/resources/log4j.properties -Xss256k -Dsolr.log.muteconsole -XX:OnOutOfMemoryError=/home/ec2-user/solr-6.3.0/bin/oom_solr.sh 7574 /home/ec2-user/solr-6.3.0/example/cloud/node2/solr/../logs -jar start.jar --module=http
ec2-user  9064  2795  0 06:13 pts/0    00:00:00 grep --color=auto java

■ Checking on Solr

$ bin/solr status
Found 2 Solr nodes:
Solr process 8625 running on port 8983
{
  "solr_home":"/home/ec2-user/solr-6.3.0/example/cloud/node1/solr",
  "version":"6.3.0 a66a44513ee8191e25b477372094bfa846450316 - shalin - 2016-11-02 19:52:42",
  "startTime":"2016-12-19T06:03:16.565Z",
  "uptime":"0 days, 0 hours, 12 minutes, 1 seconds",
  "memory":"60.1 MB (%12.2) of 490.7 MB",
  "cloud":{
    "ZooKeeper":"localhost:9983",
    "liveNodes":"2",
    "collections":"1"}}
Solr process 8840 running on port 7574
{
  "solr_home":"/home/ec2-user/solr-6.3.0/example/cloud/node2/solr",
  "version":"6.3.0 a66a44513ee8191e25b477372094bfa846450316 - shalin - 2016-11-02 19:52:42",
  "startTime":"2016-12-19T06:03:20.397Z",
  "uptime":"0 days, 0 hours, 11 minutes, 57 seconds",
  "memory":"76.8 MB (%15.7) of 490.7 MB",
  "cloud":{
    "ZooKeeper":"localhost:9983",
    "liveNodes":"2",
    "collections":"1"}}
[ec2-user@ip-172-31-14-210 solr-6.3.0]$ bin/solr healthcheck -c gettingstarted
{
  "collection":"gettingstarted",
  "status":"healthy",
  "numDocs":0,
  "numShards":2,
  "shards":[
    {
      "shard":"shard1",
      "status":"healthy",
      "replicas":[
        {
          "name":"core_node3",
          "url":"http://172.31.14.210:8983/solr/gettingstarted_shard1_replica1/",
          "numDocs":0,
          "status":"active",
          "uptime":"0 days, 0 hours, 12 minutes, 18 seconds",
          "memory":"63.5 MB (%12.9) of 490.7 MB",
          "leader":true},
        {
          "name":"core_node4",
          "url":"http://172.31.14.210:7574/solr/gettingstarted_shard1_replica2/",
          "numDocs":0,
          "status":"active",
          "uptime":"0 days, 0 hours, 12 minutes, 14 seconds",
          "memory":"79 MB (%16.1) of 490.7 MB"}]},
    {
      "shard":"shard2",
      "status":"healthy",
      "replicas":[
        {
          "name":"core_node1",
          "url":"http://172.31.14.210:8983/solr/gettingstarted_shard2_replica1/",
          "numDocs":0,
          "status":"active",
          "uptime":"0 days, 0 hours, 12 minutes, 18 seconds",
          "memory":"63.9 MB (%13) of 490.7 MB",
          "leader":true},
        {
          "name":"core_node2",
          "url":"http://172.31.14.210:7574/solr/gettingstarted_shard2_replica2/",
          "numDocs":0,
          "status":"active",
          "uptime":"0 days, 0 hours, 12 minutes, 14 seconds",
          "memory":"79.4 MB (%16.2) of 490.7 MB"}]}]}

Installing Spark on EC2

■ Download Spark

$ wget http://d3kbcqa49mib13.cloudfront.net/spark-2.0.2-bin-hadoop2.7.tgz
--2016-12-19 06:21:44--  http://d3kbcqa49mib13.cloudfront.net/spark-2.0.2-bin-hadoop2.7.tgz
Resolving d3kbcqa49mib13.cloudfront.net (d3kbcqa49mib13.cloudfront.net)... 54.192.233.162, 54.192.233.191, 54.192.233.204, ...
Connecting to d3kbcqa49mib13.cloudfront.net (d3kbcqa49mib13.cloudfront.net)|54.192.233.162|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 187426587 (179M) [application/x-tar]
Saving to: `spark-2.0.2-bin-hadoop2.7.tgz'
spark-2.0.2-bin-hadoop2.7.tgz     100%[============================================================>] 178.74M   113MB/s    in 1.6s
2016-12-19 06:21:46 (113 MB/s) - `spark-2.0.2-bin-hadoop2.7.tgz' saved [187426587/187426587]

■ Extract the Spark archive and move into it

$ tar xvf spark-2.0.2-bin-hadoop2.7.tgz
spark-2.0.2-bin-hadoop2.7/
spark-2.0.2-bin-hadoop2.7/NOTICE
spark-2.0.2-bin-hadoop2.7/jars/
(snip)
spark-2.0.2-bin-hadoop2.7/yarn/
spark-2.0.2-bin-hadoop2.7/yarn/spark-2.0.2-yarn-shuffle.jar
spark-2.0.2-bin-hadoop2.7/README.md
$ cd spark-2.0.2-bin-hadoop2.7

■ Poking around in Python

$ bin/pyspark
Python 2.7.12 (default, Sep  1 2016, 22:14:00)
[GCC 4.8.3 20140911 (Red Hat 4.8.3-9)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
16/12/19 06:24:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.0.2
      /_/
Using Python version 2.7.12 (default, Sep  1 2016 22:14:00)
SparkSession available as 'spark'.
>>> textFile = sc.textFile("README.md")
>>> textFile.count()
99
>>> textFile.first()
u'# Apache Spark'
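
The same smoke test also works in the Scala shell (spark-shell), which is what the rest of this post uses; a trivial equivalent just for reference:

scala> val textFile = sc.textFile("README.md")
scala> textFile.count()   // should print 99, matching the pyspark run above
scala> textFile.first()   // res: String = # Apache Spark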

Getting Started with spark-solr

■ Install the spark-solr package → ★FAILED★

$ bin/spark-shell --packages "com.lucidworks.spark:spark-solr:3.0.0-alpha"
Ivy Default Cache set to: /home/ec2-user/.ivy2/cache
The jars for the packages stored in: /home/ec2-user/.ivy2/jars
:: loading settings :: url = jar:file:/home/ec2-user/spark-2.0.2-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
com.lucidworks.spark#spark-solr added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
	confs: [default]
	found com.lucidworks.spark#spark-solr;3.0.0-alpha in central
	(snip)
	==== spark-packages: tried
	  http://dl.bintray.com/spark-packages/maven/org/restlet/jee/org.restlet.ext.servlet/2.3.0/org.restlet.ext.servlet-2.3.0.pom
	  -- artifact org.restlet.jee#org.restlet.ext.servlet;2.3.0!org.restlet.ext.servlet.jar:
	  http://dl.bintray.com/spark-packages/maven/org/restlet/jee/org.restlet.ext.servlet/2.3.0/org.restlet.ext.servlet-2.3.0.jar
		::::::::::::::::::::::::::::::::::::::::::::::
		::          UNRESOLVED DEPENDENCIES         ::
		::::::::::::::::::::::::::::::::::::::::::::::
		:: org.restlet.jee#org.restlet;2.3.0: not found
		:: org.restlet.jee#org.restlet.ext.servlet;2.3.0: not found
		::::::::::::::::::::::::::::::::::::::::::::::
$ echo $?
1

■ Guess I'll deal with the restlet artifacts separately, then.
I installed Maven by hand, wrote the pom.xml below, and ran mvn dependency:copy-dependencies to pull the dependency jars down into the local .m2 directory, which Spark's --packages resolution also checks.
(Honestly, I'd rather not have to do this sort of thing... In hindsight, spark-shell's --repositories flag, which adds extra Maven repositories for --packages resolution, pointed at https://maven.restlet.com might have been a cleaner fix, though I haven't tried it.)

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.shinodogg</groupId>
  <artifactId>sample</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>Maven Quick Start Archetype</name>
  <repositories>
    <repository>
      <id>maven-restlet</id>
      <name>Restlet repository</name>
      <url>https://maven.restlet.com</url>
    </repository>
  </repositories>
  <properties>
    <restlet-version>2.3.0</restlet-version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.restlet.jee</groupId>
      <artifactId>org.restlet.ext.servlet</artifactId>
      <version>${restlet-version}</version>
    </dependency>
    <dependency>
      <groupId>org.restlet.jee</groupId>
      <artifactId>org.restlet</artifactId>
      <version>${restlet-version}</version>
    </dependency>
    <dependency>
      <groupId>org.restlet.jse</groupId>
      <artifactId>org.restlet</artifactId>
      <version>${restlet-version}</version>
    </dependency>
    <dependency>
      <groupId>org.restlet.jse</groupId>
      <artifactId>org.restlet.ext.jackson</artifactId>
      <version>${restlet-version}</version>
    </dependency>
  </dependencies>
</project>

■ Retry spark-shell --packages "com.lucidworks.spark:spark-solr:3.0.0-alpha" ⇒ ★Up and running!★

$ bin/spark-shell --packages "com.lucidworks.spark:spark-solr:3.0.0-alpha"
Ivy Default Cache set to: /home/ec2-user/.ivy2/cache
The jars for the packages stored in: /home/ec2-user/.ivy2/jars
:: loading settings :: url = jar:file:/home/ec2-user/spark-2.0.2-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
com.lucidworks.spark#spark-solr added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
	confs: [default]
	found com.lucidworks.spark#spark-solr;3.0.0-alpha in central
	(snip)
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |  173  |   2   |   2   |   24  ||  149  |   1   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
	confs: [default]
	149 artifacts copied, 0 already retrieved (79674kB/104ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
2016-12-19 07:36:40,971 [main] WARN  NativeCodeLoader  - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2016-12-19 07:36:41,955 [main] WARN  SparkContext  - Use an existing SparkContext, some configuration may not take effect.
Spark context Web UI available at http://172.31.14.210:4040
Spark context available as 'sc' (master = local[*], app id = local-1482133001755).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.2
      /_/
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_111)
Type in expressions to have them evaluated.
Type :help for more information.

■ Accessing Solr from Spark
Set the collection name and the ZooKeeper connect string:

scala> val options = Map(
     |   "collection" -> "gettingstarted",
     |   "zkhost" -> "localhost:9983"
     | )
options: scala.collection.immutable.Map[String,String] = Map(collection -> gettingstarted, zkhost -> localhost:9983)

And now, the main event... ⇒ Did it fail...??

scala> val df = spark.read.format("solr").options(options).load
2016-12-19 07:47:26,557 [main] WARN  ObjectStore  - Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
2016-12-19 07:47:26,651 [main] WARN  ObjectStore  - Failed to get database default, returning NoSuchObjectException
2016-12-19 07:47:27,479 [main] ERROR RetryingHMSHandler  - AlreadyExistsException(message:Database default already exists)
	at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_database(HiveMetaStore.java:891)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	(snip)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
df: org.apache.spark.sql.DataFrame = [*_ancestor_path: string, *_txt_hi: string ... 72 more fields]

※ The df actually came back despite the stack trace, so for now let's just push on!
Which means it's time to shut down the Solr instances currently running.

$ bin/solr stop -all
Sending stop command to Solr running on port 8983 ... waiting up to 180 seconds to allow Jetty process 8625 to stop gracefully.
Sending stop command to Solr running on port 7574 ... waiting up to 180 seconds to allow Jetty process 8840 to stop gracefully.

Indexing and Querying NYC yellow taxi csv data

We'll work through the steps in this guide:
https://github.com/lucidworks/spark-solr/blob/master/docs/examples/csv.adoc
■ Create a new Solr collection.

$ bin/solr start -c && bin/solr create -c test-spark-solr -shards 2
Waiting up to 180 seconds to see Solr running on port 8983 []
Started Solr server on port 8983 (pid=32395). Happy searching!
Connecting to ZooKeeper at localhost:9983 ...
Uploading /home/ec2-user/solr-6.3.0/server/solr/configsets/data_driven_schema_configs/conf for config test-spark-solr to ZooKeeper at localhost:9983
Creating new collection 'test-spark-solr' using command:
http://localhost:8983/solr/admin/collections?action=CREATE&name=test-spark-solr&numShards=2&replicationFactor=1&maxShardsPerNode=2&collection.configName=test-spark-solr
{ "responseHeader":{ "status":0, "QTime":3759}, "success":{"172.31.14.210:8983_solr":{ "responseHeader":{ "status":0, "QTime":2552}, "core":"test-spark-solr_shard1_replica1"}}}

■ Load the downloaded CSV file as a DataFrame and reshape it

scala> val csvFileLocation = "/home/ec2-user/nyc_yellow_taxi_sample_1k.csv"
csvFileLocation: String = /home/ec2-user/nyc_yellow_taxi_sample_1k.csv
scala> var csvDF = spark.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load(csvFileLocation)
csvDF: org.apache.spark.sql.DataFrame = [vendor_id: int, pickup_datetime: timestamp ... 17 more fields]
scala> csvDF = csvDF.filter("pickup_latitude >= -90 AND pickup_latitude <= 90 AND pickup_longitude >= -180 AND pickup_longitude <= 180")
csvDF: org.apache.spark.sql.DataFrame = [vendor_id: int, pickup_datetime: timestamp ... 17 more fields]
scala> csvDF = csvDF.filter("dropoff_latitude >= -90 AND dropoff_latitude <= 90 AND dropoff_longitude >= -180 AND dropoff_longitude <= 180")
csvDF: org.apache.spark.sql.DataFrame = [vendor_id: int, pickup_datetime: timestamp ... 17 more fields]
scala> csvDF = csvDF.withColumn("pickup", concat_ws(",", col("pickup_latitude"),col("pickup_longitude"))).drop("pickup_latitude").drop("pickup_longitude")
csvDF: org.apache.spark.sql.DataFrame = [vendor_id: int, pickup_datetime: timestamp ... 16 more fields]
scala> csvDF = csvDF.withColumn("dropoff", concat_ws(",", col("dropoff_latitude"),col("dropoff_longitude"))).drop("dropoff_latitude").drop("dropoff_longitude")
csvDF: org.apache.spark.sql.DataFrame = [vendor_id: int, pickup_datetime: timestamp ... 15 more fields]
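
Before indexing, it doesn't hurt to peek at the reshaped lat/lon columns; a quick sanity check (output omitted here):

scala> csvDF.select("pickup", "dropoff").show(3, false)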

■ Write the DataFrame to Solr

scala> val options = Map(
     |   "zkhost" -> "localhost:9983",
     |   "collection" -> "test-spark-solr",
     |   "gen_uniq_key" -> "true"
     | )
options: scala.collection.immutable.Map[String,String] = Map(zkhost -> localhost:9983, collection -> test-spark-solr, gen_uniq_key -> true)
scala> csvDF.write.format("solr").options(options).mode(org.apache.spark.sql.SaveMode.Overwrite).save
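
To double-check the write, one option is to read the collection straight back and count it; a minimal sketch (note: depending on the configset's commit settings, documents may only become visible after a (soft) commit):

scala> val checkOpts = Map("zkhost" -> "localhost:9983", "collection" -> "test-spark-solr")
scala> spark.read.format("solr").options(checkOpts).load.count()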

■ Read the data back from Solr as a table and run SQL
Register it as a temporary table (a warning shows up here; more on that right below):

scala> val options = Map(
     |   "zkHost" -> "localhost:9983",
     |   "collection" -> "test-spark-solr"
     | )
options: scala.collection.immutable.Map[String,String] = Map(zkHost -> localhost:9983, collection -> test-spark-solr)
scala> val df = spark.read.format("solr").options(options).load
df: org.apache.spark.sql.DataFrame = [*_ancestor_path: string, *_txt_hi: string ... 93 more fields]
scala> df.printSchema()
root
 |-- *_ancestor_path: string (nullable = true)
 |-- *_txt_hi: string (nullable = true)
 |-- *_ws: string (nullable = true)
 |-- *_txt_da: string (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- *_fs: double (nullable = true)
 |-- *_bs: boolean (nullable = true)
 |-- random_*: string (nullable = true)
 |-- *_txt_cjk: string (nullable = true)
 |-- *_tis: long (nullable = true)
 |-- *_txt_en_split: string (nullable = true)
 |-- *_phon_en: string (nullable = true)
 |-- *_txt_tr: string (nullable = true)
 |-- *_txt_eu: string (nullable = true)
 |-- vendor_id: long (nullable = true)
 |-- *_srpt: string (nullable = true)
 |-- *_ls: long (nullable = true)
 |-- *_ss: string (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- *_txt_el: string (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- *_txt_ro: string (nullable = true)
 |-- *_b: boolean (nullable = true)
 |-- *_tdts: timestamp (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- *_txt_hu: string (nullable = true)
 |-- *_txt_en_split_tight: string (nullable = true)
 |-- *_txt_hy: string (nullable = true)
 |-- *_s: string (nullable = true)
 |-- *_is: long (nullable = true)
 |-- *_txt_fi: string (nullable = true)
 |-- *_dt: timestamp (nullable = true)
 |-- *_txt_nl: string (nullable = true)
 |-- *_tf: double (nullable = true)
 |-- *_txt_bg: string (nullable = true)
 |-- *_txt_cz: string (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- *_txt_rev: string (nullable = true)
 |-- *_txt_id: string (nullable = true)
 |-- *_txt_it: string (nullable = true)
 |-- *_txt_th: string (nullable = true)
 |-- *_txt_fa: string (nullable = true)
 |-- *_tfs: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- *_txt_en: string (nullable = true)
 |-- id: string (nullable = false)
 |-- *_c: string (nullable = true)
 |-- pickup: string (nullable = true)
 |-- *_tdt: timestamp (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- *_txt_ga: string (nullable = true)
 |-- *_txt_no: string (nullable = true)
 |-- *_descendent_path: string (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- *_txt_es: string (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- *_txt_ja: string (nullable = true)
 |-- *_t: string (nullable = true)
 |-- attr_*: string (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- dropoff: string (nullable = true)
 |-- *_txt: string (nullable = true)
 |-- *_txt_gl: string (nullable = true)
 |-- *_ds: double (nullable = true)
 |-- *_f: double (nullable = true)
 |-- *_ti: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- *_l: long (nullable = true)
 |-- extra: double (nullable = true)
 |-- *_tl: long (nullable = true)
 |-- *_p: string (nullable = true)
 |-- *_txt_ca: string (nullable = true)
 |-- *_tds: double (nullable = true)
 |-- *_s_lower: string (nullable = true)
 |-- *_tls: long (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- _version_: long (nullable = true)
 |-- *_point: string (nullable = true)
 |-- *_d: double (nullable = true)
 |-- *_dts: timestamp (nullable = true)
 |-- *_txt_lv: string (nullable = true)
 |-- rate_code_id: long (nullable = true)
 |-- *_txt_de: string (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- *_coordinate: double (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- *_txt_ar: string (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- *_td: double (nullable = true)
 |-- *_txt_ru: string (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- *_i: long (nullable = true)
 |-- *_txt_fr: string (nullable = true)
 |-- *_txt_sv: string (nullable = true)
 |-- *_txt_pt: string (nullable = true)
scala> df.registerTempTable("trips")
warning: there was one deprecation warning; re-run with -deprecation for details
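
The deprecation warning is because registerTempTable was superseded in Spark 2.0; the drop-in replacement, if you want a warning-free run, is:

scala> df.createOrReplaceTempView("trips")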

Next we import a few things and create a SQLContext (this also spits out some messages; note that the attempt to create a brand-new SparkContext below fails because the shell already has one, so the existing sc gets used instead), and then run the SQL!!

scala> import org.apache.spark.SparkContext
import org.apache.spark.SparkContext
scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext
scala> val conf = new SparkConf().setAppName("Spark SQL from MySQL").setMaster("local[*]")
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@1eb207c3
scala> val sc = new SparkContext(conf)
org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
(snip)
  at org.apache.spark.SparkContext.<init>(SparkContext.scala:86)
  ... 48 elided
scala> val sqlContext = new SQLContext(sc)
warning: there was one deprecation warning; re-run with -deprecation for details
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@1cd853ee
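(As an aside, in Spark 2.x the shell's SparkSession can run SQL directly, so the whole SQLContext detour above isn't strictly necessary; something like the following should work against the temp table just as well:)

scala> spark.sql("SELECT count(*) FROM trips").show()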
scala> sqlContext.sql("SELECT avg(tip_amount), avg(fare_amount) FROM trips").show()
+------------------+------------------+
|   avg(tip_amount)|  avg(fare_amount)|
+------------------+------------------+
|1.5674853801169588|11.802144249512668|
+------------------+------------------+
scala> sqlContext.sql("SELECT max(tip_amount), max(fare_amount) FROM trips WHERE trip_distance > 10").show()
+---------------+----------------+
|max(tip_amount)|max(fare_amount)|
+---------------+----------------+
|          16.44|            68.0|
+---------------+----------------+

Hmm, my numbers don't match the ones in csv.adoc! But at least I've got a good feel for the workflow now. With this, joining master-like data loaded into Spark against data living in Solr looks entirely doable (*´∀`*) (see the sketch below).
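
A minimal sketch of what such a join could look like, assuming a hypothetical in-memory vendor lookup table (the vendor names here are made up for illustration; toDF comes from spark.implicits._, which spark-shell imports automatically):

scala> val vendors = Seq((1L, "Vendor A"), (2L, "Vendor B")).toDF("vendor_id", "vendor_name")
scala> df.join(vendors, "vendor_id").groupBy("vendor_name").avg("tip_amount", "fare_amount").show()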
I hear a new Japanese book on Solr is coming out soon; look forward to that as well 🙂
The book below served me well (´▽`)ノ

[改訂新版] Apache Solr入門 ~オープンソース全文検索エンジン (Software Design plus)
大谷 純, 阿部 慎一朗, 大須賀 稔, 北野 太郎, 鈴木 教嗣, 平賀 一昭
Gijutsu-Hyoronsha (技術評論社)
