Solr Advent Calendar 2016
This post is day 20 of the Solr Advent Calendar 2016!
Lucene/Solr Revolution 2016
I recently attended Lucene/Solr Revolution in Boston (see the posts below for an overall report):
- Lucene/Solr Revolution 2016 – Welcome Reception | shinodogg.com
- Lucene/Solr Revolution 2016 – Day1 | shinodogg.com
- Lucene/Solr Revolution 2016 – Day2 | shinodogg.com
Among the sessions, "SearchHub or How to Spend Your Summer Keeping it Real" by Lucidworks CTO Grant Ingersoll left a particularly strong impression on me. It was a technical deep dive into SearchHub, a service built with Docker containers on Amazon EC2 (m4.2xlarge).
What especially caught my interest was the combination of Solr and Spark, as in the slide below. Spark SQL is hugely popular — talking with all kinds of customers in my role as a solutions architect, my impression is that it is pretty close to a de facto standard. Against that backdrop I had been thinking how nice it would be to get the best of both worlds by layering the capabilities only a search engine can offer on top of it, and this seemed to be exactly that kind of thing.
[SlideShare embed: the session slides, starting at slide 8]
Slides I put together a while back about DataFrames, Zeppelin, and so on:
[SlideShare embed: my earlier slides on DataFrames and Zeppelin]
The video of the "SearchHub or How to Spend Your Summer Keeping it Real" session is here:
[youtube https://www.youtube.com/watch?v=_bniFi8qV9g]
Solr, Spark, spark-solr
I haven't had much hands-on time with Solr or Spark lately, so this time I'm going to install them myself on EC2 and poke around. This sort of thing rarely goes smoothly on the first try, so partly as a memo to my future self I'll keep the log as detailed as I can.
# Re-reading this afterwards: it's awfully long, yet everything stays on the command line with no flashy UI to show for it... oh well (laughs)
Installing Solr on EC2
■ yum update
$ sudo yum update 読み込んだプラグイン:priorities, update-motd, upgrade-helper 依存性の解決をしています --> トランザクションの確認を実行しています。 ---> パッケージ aws-cfn-bootstrap.noarch 0:1.4-13.8.amzn1 を 更新 〜略〜 python27-botocore.noarch 0:1.4.74-1.60.amzn1 system-release.noarch 0:2016.09-0.8 tzdata.noarch 0:2016j-1.67.amzn1 tzdata-java.noarch 0:2016j-1.67.amzn1 完了しました!
■ Switch to Java 8
$ java -version
java version "1.7.0_121"
OpenJDK Runtime Environment (amzn-2.6.8.1.69.amzn1-x86_64 u121-b00)
OpenJDK 64-Bit Server VM (build 24.121-b00, mixed mode)

$ sudo yum -y install java-1.8.0-openjdk-devel
読み込んだプラグイン:priorities, update-motd, upgrade-helper
依存性の解決をしています
--> トランザクションの確認を実行しています。

$ sudo alternatives --config java

2 プログラムがあり 'java' を提供します。

  選択       コマンド
-----------------------------------------------
*+ 1           /usr/lib/jvm/jre-1.7.0-openjdk.x86_64/bin/java
   2           /usr/lib/jvm/jre-1.8.0-openjdk.x86_64/bin/java

Enter を押して現在の選択 [+] を保持するか、選択番号を入力します:2

$ java -version
openjdk version "1.8.0_111"
OpenJDK Runtime Environment (build 1.8.0_111-b15)
OpenJDK 64-Bit Server VM (build 25.111-b15, mixed mode)
■ Download Solr
$ wget http://ftp.tsukuba.wide.ad.jp/software/apache/lucene/solr/6.3.0/solr-6.3.0.tgz --2016-12-19 05:58:54-- http://ftp.tsukuba.wide.ad.jp/software/apache/lucene/solr/6.3.0/solr-6.3.0.tgz ftp.tsukuba.wide.ad.jp (ftp.tsukuba.wide.ad.jp) をDNSに問いあわせています... 203.178.132.80, 2001:200:0:7c06::9393 ftp.tsukuba.wide.ad.jp (ftp.tsukuba.wide.ad.jp)|203.178.132.80|:80 に接続しています... 接続しました。 HTTP による接続要求を送信しました、応答を待っています... 200 OK 長さ: 146004505 (139M) [application/x-gzip] `solr-6.3.0.tgz' に保存中 solr-6.3.0.tgz 100%[============================================================>] 139.24M 83.6MB/s in 1.7s 2016-12-19 05:58:56 (83.6 MB/s) - `solr-6.3.0.tgz' へ保存完了 [146004505/146004505] $ tar xvf solr-6.3.0.tgz solr-6.3.0/LUCENE_CHANGES.txt solr-6.3.0/contrib/analysis-extras/lib/ solr-6.3.0/contrib/clustering/lib/ solr-6.3.0/contrib/dataimporthandler-extras/lib/ solr-6.3.0/contrib/extraction/lib/ 〜略〜 solr-6.3.0/docs/solr-velocity/prettify.js solr-6.3.0/docs/solr-velocity/script.js solr-6.3.0/docs/solr-velocity/stylesheet.css
■ Set up SolrCloud
Wait, it's really this easy? I had braced myself a bit, so this was a pleasant surprise.
$ bin/solr -e cloud Welcome to the SolrCloud example! This interactive session will help you launch a SolrCloud cluster on your local workstation. To begin, how many Solr nodes would you like to run in your local cluster? (specify 1-4 nodes) [2]: 2 Ok, let's start up 2 Solr nodes for your example SolrCloud cluster. Please enter the port for node1 [8983]: 8983 Please enter the port for node2 [7574]: 7574 Creating Solr home directory /home/ec2-user/solr-6.3.0/example/cloud/node1/solr Cloning /home/ec2-user/solr-6.3.0/example/cloud/node1 into /home/ec2-user/solr-6.3.0/example/cloud/node2 Starting up Solr on port 8983 using command: bin/solr start -cloud -p 8983 -s "example/cloud/node1/solr" Waiting up to 180 seconds to see Solr running on port 8983 [|] Started Solr server on port 8983 (pid=8625). Happy searching! Starting up Solr on port 7574 using command: bin/solr start -cloud -p 7574 -s "example/cloud/node2/solr" -z localhost:9983 Waiting up to 180 seconds to see Solr running on port 7574 [] Started Solr server on port 7574 (pid=8840). Happy searching! Now let's create a new collection for indexing documents in your 2-node cluster. Please provide a name for your new collection: [gettingstarted] gettingstarted How many shards would you like to split gettingstarted into? [2] 2 How many replicas per shard would you like to create? [2] 2 Please choose a configuration for the gettingstarted collection, available options are: basic_configs, data_driven_schema_configs, or sample_techproducts_configs [data_driven_schema_configs] basic_configs Connecting to ZooKeeper at localhost:9983 ... Uploading /home/ec2-user/solr-6.3.0/server/solr/configsets/basic_configs/conf for config gettingstarted to ZooKeeper at localhost:9983 Creating new collection 'gettingstarted' using command:{ "responseHeader":{ "status":0, "QTime":5433}, "success":{ "172.31.14.210:8983_solr":{ "responseHeader":{ "status":0, "QTime":3863}, "core":"gettingstarted_shard1_replica1"}, "172.31.14.210:7574_solr":{ "responseHeader":{ "status":0, "QTime":4210}, "core":"gettingstarted_shard1_replica2"}}} Enabling auto soft-commits with maxTime 3 secs using the Config API POSTing request to Config API: http://localhost:8983/solr/gettingstarted/config {"set-property":{"updateHandler.autoSoftCommit.maxTime":"3000"}} Successfully set-property updateHandler.autoSoftCommit.maxTime to 3000 SolrCloud example running, please visit: http://localhost:8983/solrhttp://localhost:8983/solr/admin/collections?action=CREATE&name=gettingstarted&numShards=2&replicationFactor=2&maxShardsPerNode=2&collection.configName=gettingstarted
■ The Java processes
I see, I see.
$ ps -ef | grep java ec2-user 8625 1 2 06:03 pts/0 00:00:16 /usr/lib/jvm/jre/bin/java -server -Xms512m -Xmx512m -XX:NewRatio=3 -XX:SurvivorRatio=4 -XX:TargetSurvivorRatio=90 -XX:MaxTenuringThreshold=8 -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:ConcGCThreads=4 -XX:ParallelGCThreads=4 -XX:+CMSScavengeBeforeRemark -XX:PretenureSizeThreshold=64m -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=50 -XX:CMSMaxAbortablePrecleanTime=6000 -XX:+CMSParallelRemarkEnabled -XX:+ParallelRefProcEnabled -XX:-OmitStackTraceInFastThrow -verbose:gc -XX:+PrintHeapAtGC -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+PrintTenuringDistribution -XX:+PrintGCApplicationStoppedTime -Xloggc:/home/ec2-user/solr-6.3.0/example/cloud/node1/solr/../logs/solr_gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=9 -XX:GCLogFileSize=20M -DzkClientTimeout=15000 -DzkRun -Dsolr.log.dir=/home/ec2-user/solr-6.3.0/example/cloud/node1/solr/../logs -Djetty.port=8983 -DSTOP.PORT=7983 -DSTOP.KEY=solrrocks -Duser.timezone=UTC -Djetty.home=/home/ec2-user/solr-6.3.0/server -Dsolr.solr.home=/home/ec2-user/solr-6.3.0/example/cloud/node1/solr -Dsolr.install.dir=/home/ec2-user/solr-6.3.0 -Dlog4j.configuration=file:/home/ec2-user/solr-6.3.0/example/resources/log4j.properties -Xss256k -Dsolr.log.muteconsole -XX:OnOutOfMemoryError=/home/ec2-user/solr-6.3.0/bin/oom_solr.sh 8983 /home/ec2-user/solr-6.3.0/example/cloud/node1/solr/../logs -jar start.jar --module=http ec2-user 8840 1 2 06:03 pts/0 00:00:14 /usr/lib/jvm/jre/bin/java -server -Xms512m -Xmx512m -XX:NewRatio=3 -XX:SurvivorRatio=4 -XX:TargetSurvivorRatio=90 -XX:MaxTenuringThreshold=8 -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:ConcGCThreads=4 -XX:ParallelGCThreads=4 -XX:+CMSScavengeBeforeRemark -XX:PretenureSizeThreshold=64m -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=50 -XX:CMSMaxAbortablePrecleanTime=6000 -XX:+CMSParallelRemarkEnabled -XX:+ParallelRefProcEnabled -XX:-OmitStackTraceInFastThrow -verbose:gc -XX:+PrintHeapAtGC -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+PrintTenuringDistribution -XX:+PrintGCApplicationStoppedTime -Xloggc:/home/ec2-user/solr-6.3.0/example/cloud/node2/solr/../logs/solr_gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=9 -XX:GCLogFileSize=20M -DzkClientTimeout=15000 -DzkHost=localhost:9983 -Dsolr.log.dir=/home/ec2-user/solr-6.3.0/example/cloud/node2/solr/../logs -Djetty.port=7574 -DSTOP.PORT=6574 -DSTOP.KEY=solrrocks -Duser.timezone=UTC -Djetty.home=/home/ec2-user/solr-6.3.0/server -Dsolr.solr.home=/home/ec2-user/solr-6.3.0/example/cloud/node2/solr -Dsolr.install.dir=/home/ec2-user/solr-6.3.0 -Dlog4j.configuration=file:/home/ec2-user/solr-6.3.0/example/resources/log4j.properties -Xss256k -Dsolr.log.muteconsole -XX:OnOutOfMemoryError=/home/ec2-user/solr-6.3.0/bin/oom_solr.sh 7574 /home/ec2-user/solr-6.3.0/example/cloud/node2/solr/../logs -jar start.jar --module=http ec2-user 9064 2795 0 06:13 pts/0 00:00:00 grep --color=auto java
■ Checking on Solr
$ bin/solr status Found 2 Solr nodes: Solr process 8625 running on port 8983 { "solr_home":"/home/ec2-user/solr-6.3.0/example/cloud/node1/solr", "version":"6.3.0 a66a44513ee8191e25b477372094bfa846450316 - shalin - 2016-11-02 19:52:42", "startTime":"2016-12-19T06:03:16.565Z", "uptime":"0 days, 0 hours, 12 minutes, 1 seconds", "memory":"60.1 MB (%12.2) of 490.7 MB", "cloud":{ "ZooKeeper":"localhost:9983", "liveNodes":"2", "collections":"1"}} Solr process 8840 running on port 7574 { "solr_home":"/home/ec2-user/solr-6.3.0/example/cloud/node2/solr", "version":"6.3.0 a66a44513ee8191e25b477372094bfa846450316 - shalin - 2016-11-02 19:52:42", "startTime":"2016-12-19T06:03:20.397Z", "uptime":"0 days, 0 hours, 11 minutes, 57 seconds", "memory":"76.8 MB (%15.7) of 490.7 MB", "cloud":{ "ZooKeeper":"localhost:9983", "liveNodes":"2", "collections":"1"}} [ec2-user@ip-172-31-14-210 solr-6.3.0]$ bin/solr healthcheck -c gettingstarted { "collection":"gettingstarted", "status":"healthy", "numDocs":0, "numShards":2, "shards":[ { "shard":"shard1", "status":"healthy", "replicas":[ { "name":"core_node3", "url":"http://172.31.14.210:8983/solr/gettingstarted_shard1_replica1/", "numDocs":0, "status":"active", "uptime":"0 days, 0 hours, 12 minutes, 18 seconds", "memory":"63.5 MB (%12.9) of 490.7 MB", "leader":true}, { "name":"core_node4", "url":"http://172.31.14.210:7574/solr/gettingstarted_shard1_replica2/", "numDocs":0, "status":"active", "uptime":"0 days, 0 hours, 12 minutes, 14 seconds", "memory":"79 MB (%16.1) of 490.7 MB"}]}, { "shard":"shard2", "status":"healthy", "replicas":[ { "name":"core_node1", "url":"http://172.31.14.210:8983/solr/gettingstarted_shard2_replica1/", "numDocs":0, "status":"active", "uptime":"0 days, 0 hours, 12 minutes, 18 seconds", "memory":"63.9 MB (%13) of 490.7 MB", "leader":true}, { "name":"core_node2", "url":"http://172.31.14.210:7574/solr/gettingstarted_shard2_replica2/", "numDocs":0, "status":"active", "uptime":"0 days, 0 hours, 12 minutes, 14 seconds", "memory":"79.4 MB (%16.2) of 490.7 MB"}]}]}
Installing Spark on EC2
■ Download Spark
$ wget http://d3kbcqa49mib13.cloudfront.net/spark-2.0.2-bin-hadoop2.7.tgz --2016-12-19 06:21:44-- http://d3kbcqa49mib13.cloudfront.net/spark-2.0.2-bin-hadoop2.7.tgz d3kbcqa49mib13.cloudfront.net (d3kbcqa49mib13.cloudfront.net) をDNSに問いあわせています... 54.192.233.162, 54.192.233.191, 54.192.233.204, ... d3kbcqa49mib13.cloudfront.net (d3kbcqa49mib13.cloudfront.net)|54.192.233.162|:80 に接続しています... 接続しました。 HTTP による接続要求を送信しました、応答を待っています... 200 OK 長さ: 187426587 (179M) [application/x-tar] `spark-2.0.2-bin-hadoop2.7.tgz' に保存中 spark-2.0.2-bin-hadoop2.7.tgz 100%[============================================================>] 178.74M 113MB/s in 1.6s 2016-12-19 06:21:46 (113 MB/s) - `spark-2.0.2-bin-hadoop2.7.tgz' へ保存完了 [187426587/187426587]
■ Extract the Spark archive and cd into it
$ tar xvf spark-2.0.2-bin-hadoop2.7.tgz spark-2.0.2-bin-hadoop2.7/ spark-2.0.2-bin-hadoop2.7/NOTICE spark-2.0.2-bin-hadoop2.7/jars/ 〜略〜 park-2.0.2-bin-hadoop2.7/yarn/ spark-2.0.2-bin-hadoop2.7/yarn/spark-2.0.2-yarn-shuffle.jar spark-2.0.2-bin-hadoop2.7/README.md $ cd spark-2.0.2-bin-hadoop2.7
■ Messing around in Python
$ bin/pyspark Python 2.7.12 (default, Sep 1 2016, 22:14:00) [GCC 4.8.3 20140911 (Red Hat 4.8.3-9)] on linux2 Type "help", "copyright", "credits" or "license" for more information. Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). 16/12/19 06:24:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Welcome to ____ __ / __/__ ___ _____/ /__ _ / _ / _ `/ __/ '_/ /__ / .__/_,_/_/ /_/_ version 2.0.2 /_/ Using Python version 2.7.12 (default, Sep 1 2016 22:14:00) SparkSession available as 'spark'. >>> textFile = sc.textFile("README.md") >>> textFile.count() 99 >>> textFile.first() u'# Apache Spark'
Getting Started with spark-solr
■ Install the spark-solr package → ★Failed★
$ bin/spark-shell --packages "com.lucidworks.spark:spark-solr:3.0.0-alpha" Ivy Default Cache set to: /home/ec2-user/.ivy2/cache The jars for the packages stored in: /home/ec2-user/.ivy2/jars :: loading settings :: url = jar:file:/home/ec2-user/spark-2.0.2-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml com.lucidworks.spark#spark-solr added as a dependency :: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0 confs: [default] found com.lucidworks.spark#spark-solr;3.0.0-alpha in central 〜略〜 ==== spark-packages: tried http://dl.bintray.com/spark-packages/maven/org/restlet/jee/org.restlet.ext.servlet/2.3.0/org.restlet.ext.servlet-2.3.0.pom -- artifact org.restlet.jee#org.restlet.ext.servlet;2.3.0!org.restlet.ext.servlet.jar: http://dl.bintray.com/spark-packages/maven/org/restlet/jee/org.restlet.ext.servlet/2.3.0/org.restlet.ext.servlet-2.3.0.jar :::::::::::::::::::::::::::::::::::::::::::::: :: UNRESOLVED DEPENDENCIES :: :::::::::::::::::::::::::::::::::::::::::::::: :: org.restlet.jee#org.restlet;2.3.0: not found :: org.restlet.jee#org.restlet.ext.servlet;2.3.0: not found :::::::::::::::::::::::::::::::::::::::::::::: $ echo $? 1
■ So I guess I'll deal with the Restlet artifacts separately...
The Restlet artifacts aren't published to Maven Central but to Restlet's own repository, which seems to be why the resolution above failed. So I installed Maven myself, created a pom.xml like the one below, and pulled the dependency libraries into the local .m2 directory with mvn dependency:copy-dependencies.
(Not something I really want to be doing, honestly...)
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.shinodogg</groupId>
  <artifactId>sample</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>Maven Quick Start Archetype</name>
  <repositories>
    <repository>
      <id>maven-restlet</id>
      <name>Restlet repository</name>
      <url>https://maven.restlet.com</url>
    </repository>
  </repositories>
  <properties>
    <restlet-version>2.3.0</restlet-version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.restlet.jee</groupId>
      <artifactId>org.restlet.ext.servlet</artifactId>
      <version>${restlet-version}</version>
    </dependency>
    <dependency>
      <groupId>org.restlet.jee</groupId>
      <artifactId>org.restlet</artifactId>
      <version>${restlet-version}</version>
    </dependency>
    <dependency>
      <groupId>org.restlet.jse</groupId>
      <artifactId>org.restlet</artifactId>
      <version>${restlet-version}</version>
    </dependency>
    <dependency>
      <groupId>org.restlet.jse</groupId>
      <artifactId>org.restlet.ext.jackson</artifactId>
      <version>${restlet-version}</version>
    </dependency>
  </dependencies>
</project>
■ Retry spark-shell --packages "com.lucidworks.spark:spark-solr:3.0.0-alpha" ⇒ ★It starts up fine!★
$ bin/spark-shell --packages "com.lucidworks.spark:spark-solr:3.0.0-alpha" Ivy Default Cache set to: /home/ec2-user/.ivy2/cache The jars for the packages stored in: /home/ec2-user/.ivy2/jars :: loading settings :: url = jar:file:/home/ec2-user/spark-2.0.2-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml com.lucidworks.spark#spark-solr added as a dependency :: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0 confs: [default] found com.lucidworks.spark#spark-solr;3.0.0-alpha in central 〜略〜 --------------------------------------------------------------------- | | modules || artifacts | | conf | number| search|dwnlded|evicted|| number|dwnlded| --------------------------------------------------------------------- | default | 173 | 2 | 2 | 24 || 149 | 1 | --------------------------------------------------------------------- :: retrieving :: org.apache.spark#spark-submit-parent confs: [default] 149 artifacts copied, 0 already retrieved (79674kB/104ms) Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). 2016-12-19 07:36:40,971 [main] WARN NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 2016-12-19 07:36:41,955 [main] WARN SparkContext - Use an existing SparkContext, some configuration may not take effect. Spark context Web UI available at http://172.31.14.210:4040 Spark context available as 'sc' (master = local[*], app id = local-1482133001755). Spark session available as 'spark'. Welcome to ____ __ / __/__ ___ _____/ /__ _ / _ / _ `/ __/ '_/ /___/ .__/_,_/_/ /_/_ version 2.0.2 /_/ Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_111) Type in expressions to have them evaluated. Type :help for more information.
■ Accessing Solr from Spark
Set the collection name and the ZooKeeper address:
scala> val options = Map(
     |   "collection" -> "gettingstarted",
     |   "zkhost" -> "localhost:9983"
     | )
options: scala.collection.immutable.Map[String,String] = Map(collection -> gettingstarted, zkhost -> localhost:9983)
And now the main event... ⇒ Did it fail...??
scala> val df = spark.read.format("solr").options(options).load
2016-12-19 07:47:26,557 [main] WARN ObjectStore - Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
2016-12-19 07:47:26,651 [main] WARN ObjectStore - Failed to get database default, returning NoSuchObjectException
2016-12-19 07:47:27,479 [main] ERROR RetryingHMSHandler - AlreadyExistsException(message:Database default already exists)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_database(HiveMetaStore.java:891)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
〜略〜
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
df: org.apache.spark.sql.DataFrame = [*_ancestor_path: string, *_txt_hi: string ... 72 more fields]
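Despite the noisy output, the WARN/ERROR lines appear to come from Spark spinning up its embedded Hive metastore rather than from spark-solr itself, and the last line shows that a DataFrame with the collection's fields did come back. As a minimal sanity check — just a sketch, assuming the gettingstarted collection is still empty at this point:

// The schema was derived from the Solr collection, so the load itself worked
df.printSchema()
// Expected to return 0 here, since nothing has been indexed yet
df.count()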
※ For now, I'll just push ahead as-is!
With that, let's shut down the Solr instances that are currently running.
$ bin/solr stop -all Sending stop command to Solr running on port 8983 ... waiting up to 180 seconds to allow Jetty process 8625 to stop gracefully. Sending stop command to Solr running on port 7574 ... waiting up to 180 seconds to allow Jetty process 8840 to stop gracefully.
Indexing and Querying NYC yellow taxi csv data
I'll work through the steps here:
https://github.com/lucidworks/spark-solr/blob/master/docs/examples/csv.adoc
■ Create a new Solr collection
$ bin/solr -c && bin/solr create -c test-spark-solr -shards 2 Waiting up to 180 seconds to see Solr running on port 8983 [] Started Solr server on port 8983 (pid=32395). Happy searching! Connecting to ZooKeeper at localhost:9983 ... Uploading /home/ec2-user/solr-6.3.0/server/solr/configsets/data_driven_schema_configs/conf for config test-spark-solr to ZooKeeper at localhost:9983 Creating new collection 'test-spark-solr' using command:{ "responseHeader":{ "status":0, "QTime":3759}, "success":{"172.31.14.210:8983_solr":{ "responseHeader":{ "status":0, "QTime":2552}, "core":"test-spark-solr_shard1_replica1"}}}http://localhost:8983/solr/admin/collections?action=CREATE&name=test-spark-solr&numShards=2&replicationFactor=1&maxShardsPerNode=2&collection.configName=test-spark-solr
■ Load the downloaded CSV file into a DataFrame and reshape it
scala> val csvFileLocation = "/home/ec2-user/nyc_yellow_taxi_sample_1k.csv"
csvFileLocation: String = /home/ec2-user/nyc_yellow_taxi_sample_1k.csv

scala> var csvDF = spark.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load(csvFileLocation)
csvDF: org.apache.spark.sql.DataFrame = [vendor_id: int, pickup_datetime: timestamp ... 17 more fields]

scala> csvDF = csvDF.filter("pickup_latitude >= -90 AND pickup_latitude <= 90 AND pickup_longitude >= -180 AND pickup_longitude <= 180")
csvDF: org.apache.spark.sql.DataFrame = [vendor_id: int, pickup_datetime: timestamp ... 17 more fields]

scala> csvDF = csvDF.filter("dropoff_latitude >= -90 AND dropoff_latitude <= 90 AND dropoff_longitude >= -180 AND dropoff_longitude <= 180")
csvDF: org.apache.spark.sql.DataFrame = [vendor_id: int, pickup_datetime: timestamp ... 17 more fields]

scala> csvDF = csvDF.withColumn("pickup", concat_ws(",", col("pickup_latitude"),col("pickup_longitude"))).drop("pickup_latitude").drop("pickup_longitude")
csvDF: org.apache.spark.sql.DataFrame = [vendor_id: int, pickup_datetime: timestamp ... 16 more fields]

scala> csvDF = csvDF.withColumn("dropoff", concat_ws(",", col("dropoff_latitude"),col("dropoff_longitude"))).drop("dropoff_latitude").drop("dropoff_longitude")
csvDF: org.apache.spark.sql.DataFrame = [vendor_id: int, pickup_datetime: timestamp ... 15 more fields]
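Before writing anything to Solr, it doesn't hurt to eyeball the reshaped DataFrame. A quick check along these lines (using only the column names from the transcript above):

// Show a few of the combined "lat,lon" string columns without truncation
csvDF.select("pickup", "dropoff").show(5, false)
// How many rows survived the latitude/longitude range filters
csvDF.count()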
■ Write the DataFrame to Solr
scala> val options = Map(
     |   "zkhost" -> "localhost:9983",
     |   "collection" -> "test-spark-solr",
     |   "gen_uniq_key" -> "true"
     | )
options: scala.collection.immutable.Map[String,String] = Map(zkhost -> localhost:9983, collection -> test-spark-solr, gen_uniq_key -> true)

scala> csvDF.write.format("solr").options(options).mode(org.apache.spark.sql.SaveMode.Overwrite).save
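To confirm the documents actually landed in the collection, one option is to read it straight back with the same connector — a sketch, not part of the original walkthrough; if the count comes back as 0, the documents may simply not have been committed on the Solr side yet:

// Read the collection back via spark-solr and count what was indexed
val checkDF = spark.read.format("solr")
  .option("zkhost", "localhost:9983")
  .option("collection", "test-spark-solr")
  .load
checkDF.count()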
■ Read the data back from Solr as a table and run SQL
Register it as a temporary table (a warning shows up, but...)
scala> val options = Map( | "zkHost" -> "localhost:9983", | "collection" -> "test-spark-solr" | ) options: scala.collection.immutable.Map[String,String] = Map(zkHost -> localhost:9983, collection -> test-spark-solr) scala> val df = spark.read.format("solr").options(options).load df: org.apache.spark.sql.DataFrame = [*_ancestor_path: string, *_txt_hi: string ... 93 more fields] scala> df.printSchema() root |-- *_ancestor_path: string (nullable = true) |-- *_txt_hi: string (nullable = true) |-- *_ws: string (nullable = true) |-- *_txt_da: string (nullable = true) |-- improvement_surcharge: double (nullable = true) |-- *_fs: double (nullable = true) |-- *_bs: boolean (nullable = true) |-- random_*: string (nullable = true) |-- *_txt_cjk: string (nullable = true) |-- *_tis: long (nullable = true) |-- *_txt_en_split: string (nullable = true) |-- *_phon_en: string (nullable = true) |-- *_txt_tr: string (nullable = true) |-- *_txt_eu: string (nullable = true) |-- vendor_id: long (nullable = true) |-- *_srpt: string (nullable = true) |-- *_ls: long (nullable = true) |-- *_ss: string (nullable = true) |-- trip_distance: double (nullable = true) |-- *_txt_el: string (nullable = true) |-- dropoff_longitude: double (nullable = true) |-- *_txt_ro: string (nullable = true) |-- *_b: boolean (nullable = true) |-- *_tdts: timestamp (nullable = true) |-- pickup_latitude: double (nullable = true) |-- *_txt_hu: string (nullable = true) |-- *_txt_en_split_tight: string (nullable = true) |-- *_txt_hy: string (nullable = true) |-- *_s: string (nullable = true) |-- *_is: long (nullable = true) |-- *_txt_fi: string (nullable = true) |-- *_dt: timestamp (nullable = true) |-- *_txt_nl: string (nullable = true) |-- *_tf: double (nullable = true) |-- *_txt_bg: string (nullable = true) |-- *_txt_cz: string (nullable = true) |-- tolls_amount: double (nullable = true) |-- *_txt_rev: string (nullable = true) |-- *_txt_id: string (nullable = true) |-- *_txt_it: string (nullable = true) |-- *_txt_th: string (nullable = true) |-- *_txt_fa: string (nullable = true) |-- *_tfs: double (nullable = true) |-- tip_amount: double (nullable = true) |-- *_txt_en: string (nullable = true) |-- id: string (nullable = false) |-- *_c: string (nullable = true) |-- pickup: string (nullable = true) |-- *_tdt: timestamp (nullable = true) |-- payment_type: long (nullable = true) |-- *_txt_ga: string (nullable = true) |-- *_txt_no: string (nullable = true) |-- *_descendent_path: string (nullable = true) |-- fare_amount: double (nullable = true) |-- *_txt_es: string (nullable = true) |-- pickup_longitude: double (nullable = true) |-- *_txt_ja: string (nullable = true) |-- *_t: string (nullable = true) |-- attr_*: string (nullable = true) |-- passenger_count: long (nullable = true) |-- dropoff: string (nullable = true) |-- *_txt: string (nullable = true) |-- *_txt_gl: string (nullable = true) |-- *_ds: double (nullable = true) |-- *_f: double (nullable = true) |-- *_ti: long (nullable = true) |-- store_and_fwd_flag: string (nullable = true) |-- *_l: long (nullable = true) |-- extra: double (nullable = true) |-- *_tl: long (nullable = true) |-- *_p: string (nullable = true) |-- *_txt_ca: string (nullable = true) |-- *_tds: double (nullable = true) |-- *_s_lower: string (nullable = true) |-- *_tls: long (nullable = true) |-- dropoff_datetime: timestamp (nullable = true) |-- _version_: long (nullable = true) |-- *_point: string (nullable = true) |-- *_d: double (nullable = true) |-- *_dts: timestamp (nullable = true) |-- *_txt_lv: string (nullable = 
true) |-- rate_code_id: long (nullable = true) |-- *_txt_de: string (nullable = true) |-- total_amount: double (nullable = true) |-- *_coordinate: double (nullable = true) |-- pickup_datetime: timestamp (nullable = true) |-- *_txt_ar: string (nullable = true) |-- dropoff_latitude: double (nullable = true) |-- *_td: double (nullable = true) |-- *_txt_ru: string (nullable = true) |-- mta_tax: double (nullable = true) |-- *_i: long (nullable = true) |-- *_txt_fr: string (nullable = true) |-- *_txt_sv: string (nullable = true) |-- *_txt_pt: string (nullable = true) scala> df.registerTempTable("trips") warning: there was one deprecation warning; re-run with -deprecation for details
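The deprecation warning is just Spark 2.x nudging users away from registerTempTable; the non-deprecated equivalent does the same thing:

// Spark 2.x replacement for the deprecated registerTempTable
df.createOrReplaceTempView("trips")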
Do the various imports, create a SQLContext (this also spits out some messages...), cache the DataFrame for efficiency, and then run the SQL!!
scala> import org.apache.spark.SparkContext
import org.apache.spark.SparkContext

scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext

scala> val conf = new SparkConf().setAppName("Spark SQL from MySQL").setMaster("local[*]")
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@1eb207c3

scala> val sc = new SparkContext(conf)
org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
〜略〜
  at org.apache.spark.SparkContext.<init>(SparkContext.scala:86)
  ... 48 elided

scala> val sqlContext = new SQLContext(sc)
warning: there was one deprecation warning; re-run with -deprecation for details
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@1cd853ee

scala> sqlContext.sql("SELECT avg(tip_amount), avg(fare_amount) FROM trips").show()
+------------------+------------------+
|   avg(tip_amount)|  avg(fare_amount)|
+------------------+------------------+
|1.5674853801169588|11.802144249512668|
+------------------+------------------+

scala> sqlContext.sql("SELECT max(tip_amount), max(fare_amount) FROM trips WHERE trip_distance > 10").show()
+---------------+----------------+
|max(tip_amount)|max(fare_amount)|
+---------------+----------------+
|          16.44|            68.0|
+---------------+----------------+
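As the SparkException above hints, spark-shell already provides a SparkContext (sc) and a SparkSession (spark), so there is actually no need to build a new SparkConf/SparkContext/SQLContext at all. The same queries can go through the existing session, for example:

// Optionally cache the Solr-backed DataFrame before running repeated queries
df.cache()
// Run the same aggregations via the SparkSession that spark-shell provides
spark.sql("SELECT avg(tip_amount), avg(fare_amount) FROM trips").show()
spark.sql("SELECT max(tip_amount), max(fare_amount) FROM trips WHERE trip_distance > 10").show()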
The results are different from the ones in csv.adoc! Still, I've got a feel for how this works. With this, it looks like I could happily join data loaded into Spark as a sort of master dataset with data living in Solr (*´∀`*)
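For example, here is a minimal sketch of the kind of join hinted at above — the payment_type labels are hypothetical, purely for illustration:

// Hypothetical lookup table joined against the Solr-backed "trips" DataFrame
import spark.implicits._
val paymentTypes = Seq((1L, "credit card"), (2L, "cash")).toDF("payment_type", "payment_label")
df.join(paymentTypes, Seq("payment_type"))
  .groupBy("payment_label")
  .avg("tip_amount", "fare_amount")
  .show()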
Apparently a new Japanese book on Solr is coming out soon — look forward to that as well 🙂
The book from Gijutsu-Hyoron-sha (技術評論社) was a big help along the way (´▽`)ノ