
Learning Spark


Holden Karau, Andy Konwinski, Patrick Wendell, Matei Zaharia

Beijing • Cambridge • Farnham • Köln • Sebastopol • Tokyo

Table of Contents

Preface
    Audience
    How This Book is Organized
    Supporting Books
    Code Examples
    Early Release Status and Feedback
Chapter 1. Introduction to Data Analysis with Spark
    What is Apache Spark?
    A Unified Stack
    Who Uses Spark, and For What?
    A Brief History of Spark
    Spark Versions and Releases
    Spark and Hadoop
Chapter 2. Downloading and Getting Started
    Downloading Spark
    Introduction to Spark's Python and Scala Shells
    Introduction to Core Spark Concepts
    Standalone Applications
    Conclusion
Chapter 3. Programming with RDDs
    RDD Basics
    Creating RDDs
    RDD Operations
    Passing Functions to Spark
    Common Transformations and Actions
    Persistence (Caching)
    Conclusion
Chapter 4. Working with Key-Value Pairs
    Motivation
    Creating Pair RDDs
    Transformations on Pair RDDs
    Actions Available on Pair RDDs
    Data Partitioning
    Conclusion
Chapter 5. Loading and Saving Your Data
    Motivation
    Choosing a Format
    Formats
    File Systems
    Compression
    Databases
    Conclusion
About the Authors

Preface

As parallel data analysis has become increasingly common, practitioners in many fields have sought easier tools for this task. Apache Spark has quickly emerged as one of the most popular tools for this purpose, extending and generalizing MapReduce. Spark offers three main benefits. First, it is easy to use: you can develop applications on your laptop, using a high-level API that lets you focus on the content of your computation. Second, Spark is fast, enabling interactive use and complex algorithms. And third, Spark is a general engine, allowing you to combine multiple types of computations (e.g., SQL queries, text processing and machine learning) that might previously have required learning different engines. These features make Spark an excellent starting point to learn about big data in general.

This introductory book is meant to get you up and running with Spark quickly. You'll learn how to download and run Spark on your laptop and use it interactively to learn the API. Once there, we'll cover the details of available operations and distributed execution. Finally, you'll get a tour of the higher-level libraries built into Spark, including libraries for machine learning, stream processing, graph analytics and SQL. We hope that this book gives you the tools to quickly tackle data analysis problems, whether you do so on one machine or hundreds.

Audience

This book targets Data Scientists and Engineers. We chose these two groups because they have the most to gain from using Spark to expand the scope of problems they can solve. Spark's rich collection of data-focused libraries (like MLlib) makes it easy for data scientists to go beyond problems that fit on a single machine while making use of their statistical background. Engineers, meanwhile, will learn how to write general-purpose distributed programs in Spark and operate production applications. Engineers and data scientists will both learn different details from this book, but will both be able to apply Spark to solve large distributed problems in their respective fields.

Data scientists focus on answering questions or building models from data. They often have a statistical or math background and some familiarity with tools like Python, R and SQL. We have made sure to include Python, and wherever possible SQL, examples for all our material, as well as an overview of the machine learning and advanced analytics libraries in Spark. If you are a data scientist, we hope that after reading this book you will be able to use the same mathematical approaches to solving problems, except much faster and on a much larger scale.

The second group this book targets is software engineers who have some experience with Java, Python or another programming language. If you are an engineer, we hope that this book will show you how to set up a Spark cluster, use the Spark shell, and write Spark applications to solve parallel processing problems. If you are familiar with Hadoop, you have a bit of a head start on figuring out how to interact with HDFS and how to manage a cluster, but either way, we will cover basic distributed execution concepts.

Regardless of whether you are a data analyst or engineer, to get the most out of this book you should have some familiarity with one of Python, Java, Scala, or a similar language. We assume that you already have a solution for storing your data, and we cover how to load and save data from many common ones, but not how to set them up. If you don't have experience with one of those languages, don't worry, there are excellent resources available to learn these. We call out some of the books available in Supporting Books.

How This Book is Organized

The chapters of this book are laid out in such a way that you should be able to go through the material front to back. At the start of each chapter, we will mention which sections of the chapter we think are most relevant to data scientists and which sections we think are most relevant for engineers. That said, we hope that all the material is accessible to readers of either background.

The first two chapters will get you started with getting a basic Spark installation on your laptop and give you an idea of what you can accomplish with Apache Spark. Once we've got the motivation and setup out of the way, we will dive into the Spark Shell, a very useful tool for development and prototyping. Subsequent chapters then cover the Spark programming interface in detail, how applications execute on a cluster, and higher-level libraries available on Spark such as Spark SQL and MLlib.

Supporting Books

If you are a data scientist and don't have much experience with Python, the Learning Python book is an excellent introduction.

If you are an engineer and after reading this book you would like to expand your data analysis skills, Machine Learning for Hackers and Doing Data Science are excellent books from O'Reilly. This book is intended to be accessible to beginners. We do intend to release a deep dive follow-up for those looking to gain a more thorough understanding of Spark's internals.

Code Examples

All of the code examples found in this book are on GitHub. You can examine them and check them out from https://github.com/databricks/learning-spark. Code examples are provided in Java, Scala, and Python.

Tip

Our Java examples are written to work with Java version 6 and higher. Java 8 introduces a new syntax called "lambdas" that makes writing inline functions much easier, which can simplify Spark code. We have chosen not to take advantage of this syntax in most of our examples, as most organizations are not yet using Java 8. If you would like to try Java 8 syntax, you can see the Databricks blog post on this topic.

Early Release Status and Feedback

This is an early release copy of Learning Spark, and as such we are still working on the text, adding code examples, and writing some of the later chapters. Although we hope that the book is useful in its current form, we would greatly appreciate your feedback so we can improve it and make the best possible finished product. The authors and editors can be reached at book-feedback@databricks.com.

The authors would like to thank the reviewers who offered feedback so far: Juliet Hougland, Andrew Gal, Michael Gregson, Stephan Jou, Josh Mahonin, and Mike Patterson.

Chapter 1. Introduction to Data Analysis with Spark

This chapter provides a high-level overview of what Apache Spark is. If you are already familiar with Apache Spark and its components, feel free to jump ahead to Chapter 2.

What is Apache Spark?

Apache Spark is a cluster computing platform designed to be fast and general-purpose.

On the speed side, Spark extends the popular MapReduce model to efficiently support more types of computations, including interactive queries and stream processing. Speed is important in processing large datasets as it means the difference between exploring data interactively and waiting minutes between queries, or waiting hours to run your program versus minutes. One of the main features Spark offers for speed is the ability to run computations in memory, but the system is also faster than MapReduce for complex applications running on disk.

On the generality side, Spark is designed to cover a wide range of workloads that previously required separate distributed systems, including batch applications, iterative algorithms, interactive queries and streaming. By supporting these workloads in the same engine, Spark makes it easy and inexpensive to combine different processing types, which is often necessary in production data analysis pipelines. In addition, it reduces the management burden of maintaining separate tools.

Spark is designed to be highly accessible, offering simple APIs in Python, Java, Scala and SQL, and rich built-in libraries. It also integrates closely with other big data tools. In particular, Spark can run in Hadoop clusters and access any Hadoop data source.

A Unified Stack

The Spark project contains multiple closely-integrated components. At its core, Spark is a "computational engine" that is responsible for scheduling, distributing, and monitoring applications consisting of many computational tasks across many worker machines, or a computing cluster. Because the core engine of Spark is both fast and general-purpose, it powers multiple higher-level components specialized for various workloads, such as SQL or machine learning. These components are designed to interoperate closely, letting you combine them like libraries in a software project.

A philosophy of tight integration has several benefits. First, all libraries and higher-level components in the stack benefit from improvements at the lower layers. For example, when Spark's core engine adds an optimization, SQL and machine learning libraries automatically speed up as well. Second, the costs associated with running the stack are minimized, because instead of running 5-10 independent software systems, an organization only needs to run one. These costs include deployment, maintenance, testing, support, and more. This also means that each time a new component is added to the Spark stack, every organization that uses Spark will immediately be able to try this new component. This changes the cost of trying out a new type of data analysis from downloading, deploying, and learning a new software project to upgrading Spark.

Finally, one of the largest advantages of tight integration is the ability to build applications that seamlessly combine different processing models. For example, in Spark you can write one application that uses machine learning to classify data in real time as it is ingested from streaming sources. Simultaneously, analysts can query the resulting data, also in real time, via SQL, e.g. to join the data with unstructured log files. In addition, more sophisticated data engineers can access the same data via the Python shell for ad-hoc analysis. Others might access the data in standalone batch applications. All the while, the IT team only has to maintain one software stack.

Figure 1-1. The Spark Stack

Here we will briefly introduce each of the components shown in Figure 1-1.

Spark Core

Spark Core contains the basic functionality of Spark, including components for task scheduling, memory management, fault recovery, interacting with storage systems, and more. Spark Core is also home to the API that defines Resilient Distributed Datasets (RDDs), which are Spark's main programming abstraction. RDDs represent a collection of items distributed across many compute nodes that can be manipulated in parallel. Spark Core provides many APIs for building and manipulating these collections.

Spark SQL

Spark SQL provides support for interacting with Spark via SQL as well as the Apache Hive variant of SQL, called the Hive Query Language (HiveQL). Spark SQL represents database tables as Spark RDDs and translates SQL queries into Spark operations. Beyond providing the SQL interface to Spark, Spark SQL allows developers to intermix SQL queries with the programmatic data manipulations supported by RDDs in Python, Java and Scala, all within a single application. This tight integration with the rich and sophisticated computing environment provided by the rest of the Spark stack makes Spark SQL unlike any other open source data warehouse tool. Spark SQL was added to Spark in version 1.0.

Shark is a project out of UC Berkeley that pre-dates Spark SQL and is being ported to work on top of Spark SQL. Shark provides additional functionality so that Spark can act as a drop-in replacement for Apache Hive. This includes a HiveQL shell, as well as a JDBC server that makes it easy to connect external graphing and data exploration tools.

Spark Streaming

Spark Streaming is a Spark component that enables processing live streams of data. Examples of data streams include log files generated by production web servers, or queues of messages containing status updates posted by users of a web service. Spark Streaming provides an API for manipulating data streams that closely matches the Spark Core's RDD API, making it easy for programmers to learn the project and move between applications that manipulate data stored in memory, on disk, or arriving in real time. Underneath its API, Spark Streaming was designed to provide the same degree of fault tolerance, throughput, and scalability that the Spark Core provides.

MLlib

Spark comes with a library containing common machine learning (ML) functionality called MLlib. MLlib provides multiple types of machine learning algorithms, including binary classification, regression, clustering and collaborative filtering, as well as supporting functionality such as model evaluation and data import. It also provides some lower-level ML primitives, including a generic gradient descent optimization algorithm. All of these methods are designed to scale out across a cluster.
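As a small, hedged illustration (this sketch is not from the original text), training a binary classifier with MLlib's Python API in a Spark 1.0-era shell might look roughly like the following; the two-point dataset is made up purely for the example:

from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint

# Hypothetical, tiny training set: each LabeledPoint is a label plus a feature vector
trainingData = sc.parallelize([
    LabeledPoint(1.0, [1.0, 0.0]),
    LabeledPoint(0.0, [0.0, 1.0]),
])
model = LogisticRegressionWithSGD.train(trainingData)  # training runs in parallel across the cluster
print model.predict([1.0, 0.0])  # classify a new point with the fitted model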

GraphX

GraphX is a library added in Spark 0.9 that provides an API for manipulating graphs (e.g., a social network's friend graph) and performing graph-parallel computations. Like Spark Streaming and Spark SQL, GraphX extends the Spark RDD API, allowing us to create a directed graph with arbitrary properties attached to each vertex and edge. GraphX also provides a set of operators for manipulating graphs (e.g., subgraph and mapVertices) and a library of common graph algorithms (e.g., PageRank and triangle counting).

Cluster Managers

Under the hood, Spark is designed to efficiently scale up from one to many thousands of compute nodes. To achieve this while maximizing flexibility, Spark can run over a variety of cluster managers, including Hadoop YARN, Apache Mesos, and a simple cluster manager included in Spark itself called the Standalone Scheduler. If you are just installing Spark on an empty set of machines, the Standalone Scheduler provides an easy way to get started; while if you already have a Hadoop YARN or Mesos cluster, Spark's support for these allows your applications to also run on them.
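As an illustrative sketch (using the SparkConf API introduced in Chapter 2; the host names below are placeholders), the cluster manager an application runs on is selected through the master URL it is configured with:

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("Example App")
# Pick one master URL depending on where the application should run:
conf.setMaster("local[4]")                    # local mode, 4 worker threads
# conf.setMaster("spark://masterhost:7077")   # Standalone Scheduler cluster
# conf.setMaster("mesos://masterhost:5050")   # Apache Mesos cluster
# conf.setMaster("yarn-client")               # Hadoop YARN (Spark 1.0-era client mode)
sc = SparkContext(conf=conf)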

Who Uses Spark, and For What?

Because Spark is a general-purpose framework for cluster computing, it is used for a diverse range of applications. In the Preface we outlined two personas that this book targets as readers: Data Scientists and Engineers. Let's take a closer look at each of these personas and how they use Spark. Unsurprisingly, the typical use cases differ across the two personas, but we can roughly classify them into two categories, data science and data applications.

Of course, these are imprecise personas and usage patterns, and many folks have skills from both, sometimes playing the role of the investigating Data Scientist, and then "changing hats" and writing a hardened data processing system. Nonetheless, it can be illuminating to consider the two personas and their respective use cases separately.

Data Science Tasks

Data Science is the name of a discipline that has been emerging over the past few years centered around analyzing data. While there is no standard definition, for our purposes a Data Scientist is somebody whose main task is to analyze and model data. Data scientists may have experience using SQL, statistics, predictive modeling (machine learning), and some programming, usually in Python, Matlab or R. Data scientists also have experience with techniques necessary to transform data into formats that can be analyzed for insights (sometimes referred to as data wrangling).

Data Scientists use their skills to analyze data with the goal of answering a question or discovering insights. Oftentimes, their workflow involves ad-hoc analysis, and so they use interactive shells (vs. building complex applications) that let them see results of queries and snippets of code in the least amount of time. Spark's speed and simple APIs shine for this purpose, and its built-in libraries mean that many algorithms are available out of the box. Sometimes, after the initial exploration phase, the work of a Data Scientist will be "productionized", or extended, hardened (i.e. made fault tolerant), and tuned to become a production data processing application, which itself is a component of a business application. For example, the initial investigation of a Data Scientist might lead to the creation of a production recommender system that is integrated into a web application and used to generate customized product suggestions to users. Often it is a different person or team that leads the process of productizing the work of the Data Scientists, and that person is often an Engineer.

Data Processing Applications

The other main use case of Spark can be described in the context of the Engineer persona. For our purposes here, we think of Engineers as a large class of software developers who use Spark to build production data processing applications. These developers usually have an understanding of the principles of software engineering, such as encapsulation, interface design, and Object Oriented Programming. They frequently have a degree in Computer Science. They use their engineering skills to design and build software systems that implement a business use case. For Engineers, Spark provides a simple way to parallelize these applications across clusters, and hides the complexity of distributed systems programming, network communication and fault tolerance. The system gives enough control to monitor, inspect and tune applications while allowing common tasks to be implemented quickly. The modular nature of the API (based on passing distributed collections of objects) makes it easy to factor work into reusable libraries and test it locally.

Spark’suserschoosetouseitfortheirdataprocessingapplicationsbecauseitprovidesawidevarietyoffunctionality,iseasytolearnanduse,andismatureandreliable.

A Brief History of Spark

Spark is an open source project that has been built and is maintained by a thriving and diverse community of developers from many different organizations. If you or your organization are trying Spark for the first time, you might be interested in the history of the project. Spark started in 2009 as a research project in the UC Berkeley RAD Lab, later to become the AMPLab. The researchers in the lab had previously been working on Hadoop MapReduce, and observed that MapReduce was inefficient for iterative and interactive computing jobs. Thus, from the beginning, Spark was designed to be fast for interactive queries and iterative algorithms, bringing in ideas like support for in-memory storage and efficient fault recovery.

Research papers were published about Spark at academic conferences and soon after its creation in 2009, it was already 10-20x faster than MapReduce for certain jobs.

SomeofSpark’sfirstuserswereothergroupsinsideofUCBerkeley,includingmachinelearningresearcherssuchasthetheMobileMillenniumproject,whichusedSparktomonitorandpredicttrafficcongestionintheSanFranciscobayArea.Inaveryshorttime,however,manyexternalorganizationsbeganusingSpark,andtoday,over50organizationslistthemselvesontheSparkPoweredBypageSparkMeetups[1],anddozensspeakabouttheirusecasesatSparkcommunityeventssuchas

[3].

[2]andtheSparkSummitApartfromUCBerkeley,majorcontributorstothe

projectcurrentlyincludeYahoo!,IntelandDatabricks.

In 2011, the AMPLab started to develop higher-level components on Spark, such as Shark (Hive on Spark) and Spark Streaming. These and other components are often referred to as the Berkeley Data Analytics Stack (BDAS) [4]. BDAS includes both components of Spark and other software projects that complement it, such as the Tachyon memory manager.

Spark was first open sourced in March 2010, and was transferred to the Apache Software Foundation in June 2013, where it is now a top-level project.

Spark Versions and Releases

Since its creation Spark has been a very active project and community, with the number of contributors growing with each release. Spark 1.0 had over 100 individual contributors. Though the level of activity has rapidly grown, the community continues to release updated versions of Spark on a regular schedule. Spark 1.0 was released in May 2014. This book focuses primarily on Spark 1.0 and beyond, though most of the concepts and examples also work in earlier versions.

Spark and Hadoop

Spark can create distributed datasets from any file stored in the Hadoop distributed file system (HDFS) or other storage systems supported by Hadoop (including your local file system, Amazon S3, Cassandra, Hive, HBase, etc). Spark supports text files, SequenceFiles, Avro, Parquet, and any other Hadoop InputFormat. We will look at interacting with these data sources in the loading and saving chapter.

[1] https://cwiki.apache.org/confluence/display/SPARK/Powered+By+Spark
[2] http://www.meetup.com/spark-users/
[3] http://spark-summit.org
[4] https://amplab.cs.berkeley.edu/software

Chapter 2. Downloading and Getting Started

In this chapter we will walk through the process of downloading and running Spark in local mode on a single computer. This chapter was written for anybody that is new to Spark, including both Data Scientists and Engineers.

Spark can be used from Python, Java or Scala. To benefit from this book, you don't need to be an expert programmer, but we do assume that you are comfortable with the basic syntax of at least one of these languages. We will include examples in all languages wherever possible.

Spark itself is written in Scala, and runs on the Java Virtual Machine (JVM). To run Spark on either your laptop or a cluster, all you need is an installation of Java 6 (or newer). If you wish to use the Python API you will also need a Python interpreter (version 2.6 or newer). Spark does not yet work with Python 3.

Downloading Spark

The first step to using Spark is to download and unpack it into a usable form. Let's start by downloading a recent precompiled released version of Spark. Visit http://spark.apache.org/downloads.html, then under "Pre-built packages", next to "For Hadoop 1 (HDP1, CDH3)", click "direct file download". This will download a compressed tar file, or "tarball," called spark-1.0.0-bin-hadoop1.tgz.

If you want to use Spark with another Hadoop version, those are also available from http://spark.apache.org/downloads.html but will have slightly different file names. Building from source is also possible, and you can find the latest source code on GitHub at http://github.com/apache/spark.

Note

Most Unix and Linux variants, including Mac OS X, come with a command-line tool called tar that can be used to unpack tar files. If your operating system does not have the tar command installed, try searching the Internet for a free tar extractor; for example, on Windows, you may wish to try 7-Zip.

Now that we have downloaded Spark, let's unpack it and take a look at what comes with the default Spark distribution. To do that, open a terminal, change to the directory where you downloaded Spark, and untar the file. This will create a new directory with the same name but without the final .tgz suffix. Change into the directory, and see what's inside. You can use the following commands to accomplish all of that.

cd ~
tar -xf spark-1.0.0-bin-hadoop1.tgz
cd spark-1.0.0-bin-hadoop1
ls

In the line containing the tar command above, the x flag tells tar we are extracting files, and the f flag specifies the name of the tarball. The ls command lists the contents of the Spark directory. Let's briefly consider the names and purpose of some of the more important files and directories you see here that come with Spark.

README.md - Contains short instructions for getting started with Spark.
bin - Contains executable files that can be used to interact with Spark in various ways, e.g. the spark-shell, which we will cover later in this chapter, is in here.
core, streaming, python - source code of major components of the Spark project.
examples - contains some helpful Spark standalone jobs that you can look at and run to learn about the Spark API.

Don't worry about the large number of directories and files the Spark project comes with; we will cover most of these in the rest of this book. For now, let's dive in right away and try out Spark's Python and Scala shells. We will start by running some of the examples that come with Spark. Then we will write, compile and run a simple Spark Job of our own.

All of the work we will do in this chapter will be with Spark running in "local mode", i.e. non-distributed mode, which only uses a single machine. Spark can run in a variety of different modes, or environments. Beyond local mode, Spark can also be run on Mesos, YARN, or on top of the Standalone Scheduler that is included in the Spark distribution. We will cover the various deployment modes in detail in a chapter to come.

Introduction to Spark's Python and Scala Shells

Spark comes with interactive shells that make ad-hoc data analysis easy. Spark's shells will feel familiar if you have used other shells such as those in R, Python, and Scala, or operating system shells like Bash or the Windows command prompt.

Unlike most other shells, however, which let you manipulate data using the disk and memory on a single machine, Spark's shells allow you to interact with data that is distributed on disk or in memory across many machines, and Spark takes care of automatically distributing this processing.

Because Spark can load data into memory, many distributed computations, even ones that process terabytes of data across dozens of machines, can finish running in a few seconds. This makes the sort of iterative, ad-hoc, and exploratory analysis commonly done in shells a good fit for Spark. Spark provides both Python and Scala shells that have been augmented to support connecting to a cluster.

Note

Most of this book includes code in all of Spark's languages, but interactive shells are only available in Python and Scala. Because a shell is very useful for learning the API, we recommend using one of these languages for these examples even if you are a Java developer. The API is the same in every language.

The easiest way to demonstrate the power of Spark's shells is to start using one of them for some simple data analysis. Let's walk through the example from the Quick Start Guide in the official Spark documentation [5].

The first step is to open up one of Spark's shells. To open the Python version of the Spark Shell, which we also refer to as the PySpark Shell, go into your Spark directory and type:

bin/pyspark

(Or bin\pyspark in Windows.) To open the Scala version of the shell, type:

bin/spark-shell

The shell prompt should appear within a few seconds. When the shell starts, you will notice a lot of log messages. You may need to hit [Enter] once to clear the log output and get to a shell prompt. Figure 2-1 shows what the PySpark shell looks like when you open it.

Figure 2-1. The PySpark Shell With Default Logging Output

You may find the logging statements that get printed in the shell distracting. You can control the verbosity of the logging. To do this, you can create a file in the conf directory called log4j.properties. The Spark developers already include a template for this file called log4j.properties.template.

To make the logging less verbose, make a copy of conf/log4j.properties.template called conf/log4j.properties and find the following line:

log4j.rootCategory=INFO,console

Then lower the log level so that we only show WARN messages and above by changing it to the following:

log4j.rootCategory=WARN,console

When you re-open the shell, you should see less output.

Figure 2-2. The PySpark Shell With Less Logging Output

Using IPython

IPython is an enhanced Python shell that many Python users prefer, offering features such as tab completion. You can find instructions for installing it at http://ipython.org. You can use IPython with Spark by setting the IPYTHON environment variable to 1:

IPYTHON=1 ./bin/pyspark

To use the IPython Notebook, which is a web-browser-based version of IPython, use:

IPYTHON_OPTS="notebook" ./bin/pyspark

On Windows, set the environment variable and run the shell as follows:

set IPYTHON=1
bin\pyspark

In Spark we express our computation through operations on distributed collections that are automatically parallelized across the cluster. These collections are called Resilient Distributed Datasets, or RDDs. RDDs are Spark's fundamental abstraction for distributed data and computation.

Before we say more about RDDs, let's create one in the shell from a local text file and do some very simple ad-hoc analysis by following the example below.

Example 2-1. Python line count
>>> lines = sc.textFile("README.md") # Create an RDD called lines
>>> lines.count() # Count the number of items in this RDD
127

>>> lines.first() # First item in this RDD, i.e. first line of README.md
u'# Apache Spark'

Example 2-2. Scala line count
scala> val lines = sc.textFile("README.md") // Create an RDD called lines
lines: spark.RDD[String] = MappedRDD[...]

scala> lines.count() // Count the number of items in this RDD
res0: Long = 127

scala> lines.first() // First item in this RDD, i.e. first line of README.md
res1: String = # Apache Spark

To exit the shell, you can press Control+D.

In the example above, the variables called lines are RDDs, created here from a text file on our local machine. We can run various parallel operations on the RDDs, such as counting the number of elements in the dataset (here lines of text in the file) or printing the first one. We will discuss RDDs in great depth in later chapters, but before we go any further, let's take a moment now to introduce basic Spark concepts.

Introduction to Core Spark Concepts

Now that you have run your first Spark code using the shell, it's time to learn about programming in it in more detail.

At a high level, every Spark application consists of a driver program that launches various parallel operations on a cluster. The driver program contains your application's main function and defines distributed datasets on the cluster, then applies operations to them. In the examples above, the driver program was the Spark shell itself, and you could just type in the operations you wanted to run.

Driver programs access Spark through a SparkContext object, which represents a connection to a computing cluster. In the shell, a SparkContext is automatically created for you, as the variable called sc. Try printing out sc to see its type:

>>> sc

Once you have a SparkContext, you can use it to build resilient distributed datasets, or RDDs. In the example above, we called SparkContext.textFile to create an RDD representing the lines of text in a file. We can then run various operations on these lines, such as count().

To run these operations, driver programs typically manage a number of nodes called executors. For example, if we were running the count() above on a cluster, different machines might count lines in different ranges of the file. Because we just ran the Spark shell locally, it executed all its work on a single machine, but you can connect the same shell to a cluster to analyze data in parallel. Figure 2-3 shows how Spark executes on a cluster.

Figure 2-3. Components for distributed execution in Spark

Finally, a lot of Spark's API revolves around passing functions to its operators to run them on the cluster. For example, we could extend our README example by filtering the lines in the file that contain a word, such as "Python":

Example 2-3. Python filtering example
>>> lines = sc.textFile("README.md")
>>> pythonLines = lines.filter(lambda line: "Python" in line)
>>> pythonLines.first()
u'## Interactive Python Shell'

Example 2-4. Scala filtering example
scala> val lines = sc.textFile("README.md") // Create an RDD called lines
lines: spark.RDD[String] = MappedRDD[...]
scala> val pythonLines = lines.filter(line => line.contains("Python"))
pythonLines: spark.RDD[String] = FilteredRDD[...]
scala> pythonLines.first()
res0: String = ## Interactive Python Shell

Note

If you are unfamiliar with the lambda or => syntax above, it is a shorthand way to define functions inline in Python and Scala. When using Spark in these languages, you can also define a function separately and then pass its name to Spark. For example, in Python:

def hasPython(line):
    return "Python" in line

pythonLines = lines.filter(hasPython)

Passing functions to Spark is also possible in Java, but in this case they are defined as classes, implementing an interface called Function. For example:

JavaRDD<String> pythonLines = lines.filter(
    new Function<String, Boolean>() {
        Boolean call(String line) { return line.contains("Python"); }
    });

Java 8 introduces shorthand syntax called "lambdas" that looks similar to Python and Scala. Here is how the code would look with this syntax:

JavaRDD<String> pythonLines = lines.filter(line -> line.contains("Python"));

We discuss passing functions further in Passing Functions to Spark.

While we will cover the Spark API in more detail later, a lot of its magic is that function-based operations like filter also parallelize across the cluster. That is, Spark automatically takes your function (e.g. line.contains("Python")) and ships it to executor nodes. Thus, you can write code in a single driver program and automatically have parts of it run on multiple nodes. Chapter 3 covers the RDD API in more detail.

Standalone Applications

The final piece missing in this quick tour of Spark is how to use it in standalone programs. Apart from running interactively, Spark can be linked into standalone applications in either Java, Scala or Python. The main difference from using it in the shell is that you need to initialize your own SparkContext. After that, the API is the same.

The process of linking to Spark varies by language. In Java and Scala, you give your application a Maven dependency on the spark-core artifact published by Apache. As of the time of writing, the latest Spark version is 1.0.0, and the Maven coordinates for that are:

groupId = org.apache.spark
artifactId = spark-core_2.10
version = 1.0.0

If you are unfamiliar with Maven, it is a popular package management tool for Java-based languages that lets you link to libraries in public repositories. You can use Maven itself to build your project, or use other tools that can talk to the Maven repositories, including Scala's SBT tool or Gradle. Popular integrated development environments like Eclipse also allow you to directly add a Maven dependency to a project.

In Python, you simply write applications as Python scripts, but you must instead run them using a special bin/spark-submit script included in Spark. This script sets up the environment for Spark's Python API to function. Simply run your script with:

bin/spark-submit my_script.py

(Note that you will have to use backslashes instead of forward slashes on Windows.)

Note

In Spark versions before 1.0, use bin/pyspark my_script.py to run Python applications instead. For detailed examples of linking applications to Spark, refer to the Quick Start Guide [6] in the official Spark documentation. In a final version of the book, we will also include full examples in an appendix.

Initializing a SparkContext

Once you have linked an application to Spark, you need to import the Spark packages in your program and create a SparkContext. This is done by first creating a SparkConf object to configure your application, and then building a SparkContext for it. Here is a short example in each supported language:

Example 2-5. Initializing Spark in Python
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)

Example 2-6. Initializing Spark in Java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf().setMaster("local").setAppName("My App");
JavaSparkContext sc = new JavaSparkContext(conf);

Example 2-7. Initializing Spark in Scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)

These examples show the minimal way to initialize a SparkContext, where you pass two parameters:

A cluster URL, namely "local" in these examples, which tells Spark how to connect to a cluster. "local" is a special value that runs Spark on one thread on the local machine, without connecting to a cluster.

An application name, namely "My App" in these examples. This will identify your application on the cluster manager's UI if you connect to a cluster.

Additional parameters exist for configuring how your application executes or adding code to be shipped to the cluster, but we will cover these in later chapters of the book.

After you have initialized a SparkContext, you can use all the methods we showed before to create RDDs (e.g. from a text file) and manipulate them.

Finally, to shut down Spark, you can either call the stop() method on your SparkContext, or simply exit the application (e.g. with System.exit(0) or sys.exit()).
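Putting the pieces of this section together, a complete (if trivial) standalone Python application might look like the sketch below; the file name my_script.py and the README.md path are placeholders, and you would launch it with bin/spark-submit as shown earlier:

# my_script.py (hypothetical file name)
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)

# Load a text file and count the lines that mention Python
lines = sc.textFile("README.md")
pythonLines = lines.filter(lambda line: "Python" in line)
print "Lines mentioning Python: %d" % pythonLines.count()

sc.stop()  # shut down Spark when the application is done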

This quick overview should be enough to let you run a standalone Spark application on your laptop. For more advanced configuration, a later chapter in the book will cover how to connect your application to a cluster, including packaging your application so that its code is automatically shipped to worker nodes. For now, please refer to the Quick Start Guide [7] in the official Spark documentation.

Conclusion

In this chapter, we have covered downloading Spark, running it locally on your laptop, and using it either interactively or from a standalone application. We gave a quick overview of the core concepts involved in programming with Spark: a driver program creates a SparkContext and RDDs, and then runs parallel operations on them. In the next chapter, we will dive more deeply into how RDDs operate.

[5] http://spark.apache.org/docs/latest/quick-start.html
[6] http://spark.apache.org/docs/latest/quick-start.html
[7] http://spark.apache.org/docs/latest/quick-start.html

Chapter 3. Programming with RDDs

This chapter introduces Spark's core abstraction for working with data, the Resilient Distributed Dataset (RDD). An RDD is simply a distributed collection of elements. In Spark all work is expressed as either creating new RDDs, transforming existing RDDs, or calling operations on RDDs to compute a result. Under the hood, Spark automatically distributes the data contained in RDDs across your cluster and parallelizes the operations you perform on them.

Both Data Scientists and Engineers should read this chapter, as RDDs are the core concept in Spark. We highly recommend that you try some of these examples in an interactive shell (see Introduction to Spark's Python and Scala Shells). In addition, all code in this chapter is available in the book's GitHub repository.

RDD Basics

An RDD in Spark is simply a distributed collection of objects. Each RDD is split into multiple partitions, which may be computed on different nodes of the cluster. RDDs can contain any type of Python, Java or Scala objects, including user-defined classes.

Users create RDDs in two ways: by loading an external dataset, or by distributing a collection of objects in their driver program. We have already seen loading a text file as an RDD of strings using SparkContext.textFile():

Example 3-1. Creating an RDD of strings with textFile() in Python
>>> lines = sc.textFile("README.md")

Once created, RDDs offer two types of operations: transformations and actions. Transformations construct a new RDD from a previous one. For example, one transformation we saw before is filtering data that matches a predicate. In our text file example, we can use this to create a new RDD holding just the strings that contain "Python":

>>> pythonLines = lines.filter(lambda line: "Python" in line)

Actions, on the other hand, compute a result based on an RDD, and either return it to the driver program or save it to an external storage system (e.g., HDFS). One example of an action we called earlier is first(), which returns the first element in an RDD:

>>> pythonLines.first()
u'## Interactive Python Shell'

The difference between transformations and actions is due to the way Spark computes RDDs. Although you can define new RDDs any time, Spark only computes them in a lazy fashion, the first time they are used in an action. This approach might seem unusual at first, but makes a lot of sense when working with big data. For instance, consider the example above, where we defined a text file and then filtered the lines with "Python". If Spark were to load and store all the lines in the file as soon as we wrote lines = sc.textFile(...), it would waste a lot of storage space, given that we then immediately filter out many lines. Instead, once Spark sees the whole chain of transformations, it can compute just the data needed for its result. In fact, for the first() action, Spark only scans the file until it finds the first matching line; it doesn't even read the whole file.

Finally,Spark’sRDDsarebydefaultrecomputedeachtimeyourunanactiononthem.IfyouwouldliketoreuseanRDDinmultipleactions,youcanaskSparktopersistitusingRDD.persist().Aftercomputingitthefirsttime,SparkwillstoretheRDDcontentsinmemory(partitionedacrossthemachinesinyourcluster),andreusetheminfutureactions.PersistingRDDsondiskinsteadofmemoryisalsopossible.Thebehaviorofnotpersistingbydefaultmayagainseemunusual,butitmakesalotofsenseforbigdatasets:ifyouwillnotreusetheRDD,there’snoreasontowastestoragespacewhenSparkcouldinsteadstreamthroughthedataonceandjustcomputetheresult.[8]Inpractice,youwilloftenusepersisttoloadasubsetofyourdataintomemoryandqueryitrepeatedly.Forexample,ifweknewthatwewantedtocomputemultipleresultsabouttheREADMElinesthatcontain“Python”,wecouldwrite:>>>pythonLines.persist()>>>pythonLines.count()2

>>> pythonLines.first()
u'## Interactive Python Shell'

To summarize, every Spark program and shell session will work as follows:

1. Create some input RDDs from external data.
2. Transform them to define new RDDs using transformations like filter().
3. Ask Spark to persist() any intermediate RDDs that will need to be reused.
4. Launch actions such as count() and first() to kick off a parallel computation, which is then optimized and executed by Spark.

In the rest of this chapter, we'll go through each of these steps in detail, and cover some of the most common RDD operations in Spark.
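As a quick illustration of that workflow before diving in (simply reusing the README example from above rather than introducing anything new), a complete shell session following the four steps might look like this:

>>> lines = sc.textFile("README.md")                           # 1. create an input RDD
>>> pythonLines = lines.filter(lambda line: "Python" in line)  # 2. transform it into a new RDD
>>> pythonLines.persist()                                      # 3. persist an RDD we will reuse
>>> pythonLines.count()                                        # 4. actions kick off the computation
>>> pythonLines.first()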

Creating RDDs

Spark provides two ways to create RDDs: loading an external dataset and parallelizing a collection in your driver program.

The simplest way to create RDDs is to take an existing in-memory collection and pass it to SparkContext's parallelize method. This approach is very useful when learning Spark, since you can quickly create your own RDDs in the shell and perform operations on them. Keep in mind, however, that outside of prototyping and testing, this is not widely used since it requires that you have your entire dataset in memory on one machine.

Example 3-2. Python parallelize example
lines = sc.parallelize(["pandas", "i like pandas"])

Example 3-3. Scala parallelize example
val lines = sc.parallelize(List("pandas", "i like pandas"))

Example 3-4. Java parallelize example
JavaRDD<String> lines = sc.parallelize(Arrays.asList("pandas", "i like pandas"));

A more common way to create RDDs is to load data from external storage. Loading external datasets is covered in detail in Chapter 5. However, we already saw one method that loads a text file as an RDD of strings, SparkContext.textFile:

Example 3-5. Python textFile example
lines = sc.textFile("/path/to/README.md")

Example 3-6. Scala textFile example
val lines = sc.textFile("/path/to/README.md")

Example 3-7. Java textFile example
JavaRDD<String> lines = sc.textFile("/path/to/README.md");

RDD Operations

RDDs support two types of operations, transformations and actions. Transformations are operations on RDDs that return a new RDD, such as map and filter. Actions are operations that return a result back to the driver program or write it to storage, and kick off a computation, such as count and first. Spark treats transformations and actions very differently, so understanding which type of operation you are performing will be important. If you are ever confused whether a given function is a transformation or an action, you can look at its return type: transformations return RDDs whereas actions return some other data type.

Transformations

Transformations are operations on RDDs that return a new RDD. As discussed in the lazy evaluation section, transformed RDDs are computed lazily, only when you use them in an action. Many transformations are element-wise, that is, they work on one element at a time, but this is not true for all transformations.

As an example, suppose that we have a logfile, log.txt, with a number of messages, and we want to select only the error messages. We can use the filter transformation seen before. This time though, we'll show a filter in all three of Spark's language APIs:

Example 3-8. Python filter example
inputRDD = sc.textFile("log.txt")
errorsRDD = inputRDD.filter(lambda x: "error" in x)

Example 3-9. Scala filter example
val inputRDD = sc.textFile("log.txt")
val errorsRDD = inputRDD.filter(line => line.contains("error"))

Example 3-10. Java filter example
JavaRDD<String> inputRDD = sc.textFile("log.txt");
JavaRDD<String> errorsRDD = inputRDD.filter(
    new Function<String, Boolean>() {
        public Boolean call(String x) { return x.contains("error"); }
    });

Note that the filter operation does not mutate the existing inputRDD. Instead, it returns a pointer to an entirely new RDD. inputRDD can still be re-used later in the program, for instance, to search for other words. In fact, let's use inputRDD again to search for lines with the word "warning" in them. Then, we'll use another transformation, union, to print out the number of lines that contained either "error" or "warning". We show Python here, but the union() function is identical in all three languages:

Example 3-11. Python union example
errorsRDD = inputRDD.filter(lambda x: "error" in x)
warningsRDD = inputRDD.filter(lambda x: "warning" in x)
badLinesRDD = errorsRDD.union(warningsRDD)

union is a bit different than filter, in that it operates on two RDDs instead of one. Transformations can actually operate on any number of input RDDs.

Finally, as you derive new RDDs from each other using transformations, Spark keeps track of the set of dependencies between different RDDs, called the lineage graph. It uses this information to compute each RDD on demand and to recover lost data if part of a persistent RDD is lost. We will show a lineage graph for this example in Figure 3-1.

Actions

We’veseenhowtocreateRDDsfromeachotherwithtransformations,butatsomepoint,we’llwanttoactuallydosomethingwithourdataset.ActionsarethesecondtypeofRDDoperation.Theyaretheoperationsthatreturnafinalvaluetothedriverprogramorwritedatatoanexternalstoragesystem.ActionsforcetheevaluationofthetransformationsrequiredfortheRDDtheyarecalledon,sincetheyarerequiredtoactuallyproduceoutput.

Continuing the log example from the previous section, we might want to print out some information about the badLinesRDD. To do that, we'll use two actions, count(), which returns the count as a number, and take(), which collects a number of elements from the RDD.

Example 3-12. Python error count example using actions
print "Input had " + str(badLinesRDD.count()) + " concerning lines"
print "Here are 10 examples:"
for line in badLinesRDD.take(10):
    print line

Example 3-13. Scala error count example using actions
println("Input had " + badLinesRDD.count() + " concerning lines")
println("Here are 10 examples:")
badLinesRDD.take(10).foreach(println)

Example 3-14. Java error count example using actions
System.out.println("Input had " + badLinesRDD.count() + " concerning lines");
System.out.println("Here are 10 examples:");
for (String line: badLinesRDD.take(10)) {
    System.out.println(line);
}

In this example, we used take() to retrieve a small number of elements in the RDD at the driver program. We then iterate over them locally to print out information at the driver. RDDs also have a collect() function to retrieve the entire RDD. This can be useful if your program filters RDDs down to a very small size and you'd like to deal with it locally. Keep in mind that your entire dataset must fit in memory on a single machine to use collect() on it, so collect() shouldn't be used on large datasets.

In most cases RDDs can't just be collect()ed to the driver because they are too large. In these cases, it's common to write data out to a distributed storage system such as HDFS or Amazon S3. The contents of an RDD can be saved using the saveAsTextFile action, saveAsSequenceFile, or any of a number of actions for various built-in formats. We will cover the different options for exporting data later on in Chapter 5.
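For example (a minimal sketch; the output path is just a placeholder, and on a cluster it would typically point to HDFS or S3), saving the bad lines as a directory of text files looks like this in Python:

# Writes the RDD as a set of part-* text files under the given directory
badLinesRDD.saveAsTextFile("bad-lines-output")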

The image below presents the lineage graph for this entire example, starting with our inputRDD and ending with the two actions. It is important to note that each time we call a new action, the entire RDD must be computed "from scratch". To avoid this inefficiency, users can persist intermediate results, as we will cover in Persistence (Caching).

Figure 3-1. RDD lineage graph created during log analysis.

Lazy Evaluation

Transformations on RDDs are lazily evaluated, meaning that Spark will not begin to execute until it sees an action. This can be somewhat counter-intuitive for new users, but may be familiar for those who have used functional languages such as Haskell or LINQ-like data processing frameworks.

Lazy evaluation means that when we call a transformation on an RDD (for instance calling map), the operation is not immediately performed. Instead, Spark internally records meta-data to indicate this operation has been requested. Rather than thinking of an RDD as containing specific data, it is best to think of each RDD as consisting of instructions on how to compute the data that we build up through transformations. Loading data into an RDD is lazily evaluated in the same way transformations are. So when we call sc.textFile the data is not loaded until it is necessary. Like with transformations, the operation (in this case reading the data) can occur multiple times.

Tip

Although transformations are lazy, you can force Spark to execute them at any time by running an action, such as count(). This is an easy way to test out just part of your program.

Spark uses lazy evaluation to reduce the number of passes it has to take over our data by grouping operations together. In MapReduce systems like Hadoop, developers often have to spend a lot of time considering how to group together operations to minimize the number of MapReduce passes. In Spark, there is no substantial benefit to writing a single complex map instead of chaining together many simple operations. Thus, users are free to organize their program into smaller, more manageable operations.

Passing Functions to Spark

Most of Spark's transformations, and some of its actions, depend on passing in functions that are used by Spark to compute data. Each of the core languages has a slightly different mechanism for passing functions to Spark.

Python

In Python, we have three options for passing functions into Spark. For shorter functions, we can pass in lambda expressions, as we have done in the example at the start of this chapter. We can also pass in top-level functions, or locally defined functions.

Example 3-15. Passing a lambda in Python
word = rdd.filter(lambda s: "error" in s)

Passing a top-level Python function:
def containsError(s):
    return "error" in s

word = rdd.filter(containsError)

One issue to watch out for when passing functions is that if you pass functions that are members of an object, or references to fields in an object (e.g., self.field), this results in sending in the entire object, which can be much larger than just the bit of information you need. Sometimes this can also cause your program to fail, if your class contains objects that Python can't figure out how to pickle.

Example 3-16. Passing a function with field references (don't do this!)
class SearchFunctions(object):
    def __init__(self, query):
        self.query = query

    def isMatch(self, s):
        return self.query in s

    def getMatchesFunctionReference(self, rdd):
        # Problem: references all of "self" in "self.isMatch"
        return rdd.filter(self.isMatch)

    def getMatchesMemberReference(self, rdd):
        # Problem: references all of "self" in "self.query"
        return rdd.filter(lambda x: self.query in x)

Instead, just extract the fields you need from your object into a local variable and pass that in, like we do below:

Example 3-17. Python function passing without field references
class WordFunctions(object):
    ...
    def getMatchesNoReference(self, rdd):
        # Safe: extract only the field we need into a local variable
        query = self.query
        return rdd.filter(lambda x: query in x)

Scala

In Scala, we can pass in functions defined inline or references to methods or static functions as we do for Scala's other functional APIs. Some other considerations come into play though, namely that the function we pass and the data referenced in it needs to be Serializable (implementing Java's Serializable interface). Furthermore, like in Python, passing a method or field of an object includes a reference to that whole object, though this is less obvious because we are not forced to write these references with self. Like how we did with Python, we can instead extract out the fields we need as local variables and avoid needing to pass the whole object containing them.

Example 3-18. Scala function passing
class SearchFunctions(val query: String) {
  def isMatch(s: String): Boolean = {
    s.contains(query)
  }

  def getMatchesFunctionReference(rdd: RDD[String]): RDD[Boolean] = {
    // Problem: "isMatch" means "this.isMatch", so we pass all of "this"
    rdd.map(isMatch)
  }

  def getMatchesFieldReference(rdd: RDD[String]): RDD[Array[String]] = {
    // Problem: "query" means "this.query", so we pass all of "this"
    rdd.map(x => x.split(query))
  }

  def getMatchesNoReference(rdd: RDD[String]): RDD[Array[String]] = {
    // Safe: extract just the field we need into a local variable
    val query_ = this.query
    rdd.map(x => x.split(query_))
  }
}

If you see "NotSerializableException" errors in Scala, a reference to a method or field in a non-serializable class is usually the problem. Note that passing in local variables or functions that are members of a top-level object is always safe.

Java

In Java, functions are specified as objects that implement one of Spark's function interfaces from the org.apache.spark.api.java.function package. There are a number of different interfaces based on the return type of the function. We show the most basic function interfaces below, and cover a number of other function interfaces for when we need to return special types of data in the section on converting between RDD types.

Table 3-1. Standard Java function interfaces

Function<T, R> - method to implement: R call(T). Take in one input and return one output, for use with things like map and filter.
Function2<T1, T2, R> - method to implement: R call(T1, T2). Take in two inputs and return one output, for use with things like aggregate or fold.
FlatMapFunction<T, R> - method to implement: Iterable<R> call(T). Take in one input and return zero or more outputs, for use with things like flatMap.

We can either define our function classes inline as anonymous inner classes, or make a named class:

Example 3-19. Java function passing with anonymous inner class
JavaRDD<String> errors = lines.filter(new Function<String, Boolean>() {
    public Boolean call(String x) { return x.contains("error"); }
});

Example 3-20. Java function passing with named class
class ContainsError implements Function<String, Boolean> {
    public Boolean call(String x) { return x.contains("error"); }
}

JavaRDD<String> errors = lines.filter(new ContainsError());

The style to choose is a personal preference, but we find that top-level named functions are often cleaner for organizing large programs. One other benefit of top-level functions is that you can give them constructor parameters:

Example 3-21. Java function class with parameters
class Contains implements Function<String, Boolean> {
    private String query;

    public Contains(String query) { this.query = query; }
    public Boolean call(String x) { return x.contains(query); }
}

JavaRDD<String> errors = lines.filter(new Contains("error"));

In Java 8, you can also use lambda expressions to concisely implement the Function interfaces. Since Java 8 is still relatively new as of the writing of this book, our examples use the more verbose syntax for defining classes in previous versions of Java. However, with lambda expressions, our search example would look like this:

Example 3-22. Java function passing with lambda expression in Java 8
JavaRDD<String> errors = lines.filter(s -> s.contains("error"));

If you are interested in using Java 8's lambda expressions, refer to Oracle's documentation and the Databricks blog post on how to use lambdas with Spark.

Tip

Both anonymous inner classes and lambda expressions can reference any final variables in the method enclosing them, so you can pass these variables to Spark just like in Python and Scala.

Common Transformations and Actions

In this chapter, we tour the most common transformations and actions in Spark. Additional operations are available on RDDs containing certain types of data: for example, statistical functions on RDDs of numbers, and key-value operations such as aggregating data by key on RDDs of key-value pairs. We cover converting between RDD types and these special operations in later sections.

Basic RDDs

We will begin by evaluating what operations we can do on all RDDs regardless of the data. These transformations and actions are available on all RDD classes.

Transformations

Element-wise transformations

The two most common transformations you will likely be performing on basic RDDs are map and filter. The map transformation takes in a function and applies it to each element in the RDD, with the result of the function being the new value of each element in the resulting RDD. The filter transformation takes in a function and returns an RDD which only has elements that pass the filter function.

Figure 3-2. Map and filter on an RDD

We can use map to do any number of things, from fetching the website associated with each URL in our collection to just squaring the numbers. With Scala and Python you can use the standard anonymous function notation or pass in a function, and with Java you should use Spark's Function class from org.apache.spark.api.java.function or Java 8 functions.

It is useful to note that the return type of the map does not have to be the same as the input type, so if we had an RDD of customer IDs and our map function were to fetch the corresponding customer records, the type of our input RDD would be RDD[CustomerID] and the type of the resulting RDD would be RDD[CustomerRecord].
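For instance (a small illustrative sketch, separate from the customer-records example above), mapping an RDD of strings to their lengths produces an RDD with a different element type:

lines = sc.parallelize(["hello world", "hi"])
lineLengths = lines.map(lambda line: len(line))  # RDD of strings in, RDD of integers out
print lineLengths.collect()  # [11, 2]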

Let's look at a basic example of map which squares all of the numbers in an RDD:

Example 3-23. Python squaring the values in an RDD
nums = sc.parallelize([1, 2, 3, 4])
squared = nums.map(lambda x: x * x).collect()
for num in squared:
    print "%i" % (num)

Example 3-24. Scala squaring the values in an RDD
val input = sc.parallelize(List(1, 2, 3, 4))
val result = input.map(x => x * x)
println(result.collect().mkString(","))

Example 3-25. Java squaring the values in an RDD
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
JavaRDD<Integer> result = rdd.map(new Function<Integer, Integer>() {
  public Integer call(Integer x) { return x * x; }
});

System.out.println(StringUtils.join(result.collect(), ","));

Sometimes we want to produce multiple output elements for each input element. The operation to do this is called flatMap. Like with map, the function we provide to flatMap is called individually for each element in our input RDD. Instead of returning a single element, we return an iterator with our return values. Rather than producing an RDD of iterators, we get back an RDD which consists of the elements from all of the iterators. A simple example of flatMap is splitting up an input string into words, as shown below.

Example 3-26. Python flatMap example, splitting lines into words
lines = sc.parallelize(["hello world", "hi"])
words = lines.flatMap(lambda line: line.split(" "))
words.first()  # returns "hello"

Example 3-27. Scala flatMap example, splitting lines into multiple words
val lines = sc.parallelize(List("hello world", "hi"))
val words = lines.flatMap(line => line.split(" "))
words.first()  // returns "hello"

Example 3-28. Java flatMap example, splitting lines into multiple words
JavaRDD<String> lines = sc.parallelize(Arrays.asList("hello world", "hi"));
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
  public Iterable<String> call(String line) {
    return Arrays.asList(line.split(" "));
  }
});

words.first();  // returns "hello"

Pseudo Set Operations

Figure 3-3. Some simple set operations (image to be redone)

RDDs support many of the operations of mathematical sets, such as union and intersection, even when the RDDs themselves are not properly sets.

The set property most frequently missing from our RDDs is the uniqueness of elements. If we only want unique elements we can use the RDD.distinct() transformation to produce a new RDD with only distinct items. Note that distinct() is expensive, however, as it requires shuffling all the data over the network to ensure that we only receive one copy of each element.

The simplest set operation is union(other), which gives back an RDD consisting of the data from both sources. This can be useful in a number of use cases, such as processing log files from many sources. Unlike the mathematical union(), if there are duplicates in the input RDDs, the result of Spark's union() will contain duplicates (which we can fix if desired with distinct()).

Spark also provides an intersection(other) method, which returns only elements in both RDDs. intersection() also removes all duplicates (including duplicates from a single RDD) while running. While intersection and union are two very similar concepts, the performance of intersection is much worse since it requires a shuffle over the network to identify common elements.

Sometimes we need to remove some data from consideration. The subtract(other) function takes in another RDD and returns an RDD that only has values present in the first RDD and not the second RDD.

We can also compute a Cartesian product between two RDDs. The cartesian(other) transformation results in all possible pairs of (a, b) where a is in the source RDD and b is in the other RDD. The Cartesian product can be useful when we wish to consider the similarity between all possible pairs, such as computing every user's expected interest in each offer. We can also take the Cartesian product of an RDD with itself, which can be useful for tasks like computing user similarity.
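As a small sketch of this (the user and offer values are made up for illustration), cartesian pairs every element of one RDD with every element of another, so the output size is the product of the two input sizes:

val users = sc.parallelize(List("alice", "bob"))
val offers = sc.parallelize(List("offer1", "offer2", "offer3"))
val userOfferPairs = users.cartesian(offers)  // 2 * 3 = 6 pairs such as ("alice", "offer1")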

Figure 3-4. Cartesian product between two RDDs

The tables below summarize common single-RDD and multi-RDD transformations.

Table 3-2. Basic RDD transformations on an RDD containing {1, 2, 3, 3}

map: Apply a function to each element in the RDD and return an RDD of the result.
  Example: rdd.map(x => x + 1)    Result: {2, 3, 4, 4}
flatMap: Apply a function to each element in the RDD and return an RDD of the contents of the iterators returned. Often used to extract words.
  Example: rdd.flatMap(x => x.to(3))    Result: {1, 2, 3, 2, 3, 3, 3}
filter: Return an RDD consisting of only elements which pass the condition passed to filter.
  Example: rdd.filter(x => x != 1)    Result: {2, 3, 3}
distinct: Remove duplicates.
  Example: rdd.distinct()    Result: {1, 2, 3}
sample(withReplacement, fraction, [seed]): Sample an RDD.
  Example: rdd.sample(false, 0.5)    Result: non-deterministic

Table 3-3. Two-RDD transformations on RDDs containing {1, 2, 3} and {3, 4, 5}

union: Produce an RDD containing elements from both RDDs.
  Example: rdd.union(other)    Result: {1, 2, 3, 3, 4, 5}
intersection: RDD containing only elements found in both RDDs.
  Example: rdd.intersection(other)    Result: {3}
subtract: Remove the contents of one RDD (e.g., remove training data).
  Example: rdd.subtract(other)    Result: {1, 2}
cartesian: Cartesian product with the other RDD.
  Example: rdd.cartesian(other)    Result: {(1, 3), (1, 4), … (3, 5)}

As you can see, there is a wide variety of transformations available on all RDDs regardless of our specific underlying data. We can transform our data element-wise, obtain distinct elements, and do a variety of set operations.

Actions

The most common action on basic RDDs you will likely use is reduce. Reduce takes in a function which operates on two elements of the same type as your RDD's elements and returns a new element of the same type. A simple example of such a function is +, which we can use to sum our RDD. With reduce we can easily sum the elements of our RDD, count the number of elements, and perform other types of aggregations.

Example 3-29. Python reduce example
sum = rdd.reduce(lambda x, y: x + y)

Example 3-30. Scala reduce example
val sum = rdd.reduce((x, y) => x + y)

Example 3-31. Java reduce example
Integer sum = rdd.reduce(new Function2<Integer, Integer, Integer>() {
  public Integer call(Integer x, Integer y) { return x + y; }
});

Similar to reduce is fold, which also takes a function with the same signature as needed for reduce, but additionally takes a "zero value" to be used for the initial call on each partition. The zero value you provide should be the identity element for your operation; that is, applying it multiple times with your function should not change the value (e.g., 0 for +, 1 for *, or an empty list for concatenation).

Tip

You can minimize object creation in fold by modifying and returning the first of the two parameters in place. However, you should not modify the second parameter.
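For example, a quick sketch of fold (not one of the book's numbered examples) that sums an RDD, using 0 as the identity element for +:

val rdd = sc.parallelize(List(1, 2, 3, 3))
val sum = rdd.fold(0)((x, y) => x + y)  // 9; the zero value 0 leaves the sum unchanged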

Fold and reduce both require that the return type of our result be the same type as that of the RDD we are operating over. This works well for operations like sum, but sometimes we want to return a different type. For example, when computing a running average we need to have a different return type. We could implement this using a map first, where we transform every element into the element and the number 1 so that the reduce function can work on pairs. The aggregate function frees us from the constraint of having the return be the same type as the RDD we are working on. With aggregate, like fold, we supply an initial zero value of the type we want to return. We then supply a function to combine the elements from our RDD with the accumulator. Finally, we need to supply a second function to merge two accumulators, given that each node accumulates its own results locally.

We can use aggregate to compute the average of an RDD, avoiding a map before the fold.

Example 3-32. Python aggregate example
sumCount = nums.aggregate((0, 0),
    (lambda x, y: (x[0] + y, x[1] + 1)),
    (lambda x, y: (x[0] + y[0], x[1] + y[1])))
return sumCount[0] / float(sumCount[1])

Example 3-33. Scala aggregate example
val result = input.aggregate((0, 0))(
    (x, y) => (x._1 + y, x._2 + 1),
    (x, y) => (x._1 + y._1, x._2 + y._2))
val avg = result._1 / result._2.toDouble

Example 3-34. Java aggregate example
class AvgCount {
  public AvgCount(int total, int num) {
    this.total = total;
    this.num = num;
  }
  public int total;
  public int num;
  public double avg() {
    return total / (double) num;
  }
}

Function2<AvgCount, Integer, AvgCount> addAndCount =
  new Function2<AvgCount, Integer, AvgCount>() {
    public AvgCount call(AvgCount a, Integer x) {
      a.total += x;
      a.num += 1;
      return a;
    }
};

Function2<AvgCount, AvgCount, AvgCount> combine =
  new Function2<AvgCount, AvgCount, AvgCount>() {
    public AvgCount call(AvgCount a, AvgCount b) {
      a.total += b.total;
      a.num += b.num;
      return a;
    }
};

AvgCount initial = new AvgCount(0, 0);
AvgCount result = rdd.aggregate(initial, addAndCount, combine);
System.out.println(result.avg());

Some actions on RDDs return some or all of the data to our driver program in the form of a regular collection or value.

The simplest and most common operation that returns data to our driver program is collect(), which returns the entire RDD's contents. collect suffers from the restriction that all of your data must fit on a single machine, as it all needs to be copied to the driver.

take(n) returns n elements from the RDD and attempts to minimize the number of partitions it accesses, so it may represent a biased collection. It's important to note that these operations do not return the elements in the order you might expect.

These operations are useful for unit tests and quick debugging, but may introduce bottlenecks when dealing with large amounts of data.

If there is an ordering defined on our data, we can also extract the top elements from an RDD using top. top will use the default ordering on the data, but we can supply our own comparison function to extract the top elements.
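As a small sketch of this (not a numbered example from the text), we can pull the largest elements with the default ordering, or supply our own ordering to takeOrdered:

val rdd = sc.parallelize(List(5, 1, 3, 2, 4))
rdd.top(2)                                 // Array(5, 4), using the default ordering on Int
rdd.takeOrdered(2)(Ordering[Int].reverse)  // Array(5, 4), using an explicit reversed ordering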

Sometimes we need a sample of our data in our driver program. The takeSample(withReplacement, num, seed) function allows us to take a sample of our data either with or without replacement. For more control, we can create a SampledRDD and collect, which we will talk about in the Sampling Your Data section in the Simple Optimizations chapter. The further standard operations on a basic RDD all behave pretty much exactly as one would imagine from their name: count() returns a count of the elements, and countByValue() returns a map of each unique value to its count. See the basic RDD actions table for more actions.

Table 3-4. Basic actions on an RDD containing {1, 2, 3, 3}

collect(): Return all elements from the RDD.
  Example: rdd.collect()    Result: {1, 2, 3, 3}
count(): Number of elements in the RDD.
  Example: rdd.count()    Result: 4
take(num): Return num elements from the RDD.
  Example: rdd.take(2)    Result: {1, 2}
top(num): Return the top num elements of the RDD.
  Example: rdd.top(2)    Result: {3, 3}
takeOrdered(num)(ordering): Return num elements based on the provided ordering.
  Example: rdd.takeOrdered(2)(myOrdering)    Result: {3, 3}
takeSample(withReplacement, num, [seed]): Return num elements at random.
  Example: rdd.takeSample(false, 1)    Result: non-deterministic
reduce(func): Combine the elements of the RDD together in parallel (e.g., sum).
  Example: rdd.reduce((x, y) => x + y)    Result: 9
fold(zero)(func): Same as reduce but with the provided zero value.
  Example: rdd.fold(0)((x, y) => x + y)    Result: 9
aggregate(zeroValue)(seqOp, combOp): Similar to reduce but used to return a different type.
  Example: rdd.aggregate((0, 0))((x, y) => (x._1 + y, x._2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2))    Result: (9, 4)
foreach(func): Apply the provided function to each element of the RDD.
  Example: rdd.foreach(func)    Result: nothing

Converting Between RDD Types

We don't have to do anything special to get back the correct templated/generic type of RDD (that is, our RDD of Strings can become an RDD of Integers just by calling map with the correct function). Some functions are only available on certain types of RDDs, such as average on numeric RDDs and join on key-value pair RDDs. We will cover these special functions for numeric data in (to come) and pair RDDs in Chapter 4.

In Scala and Java, these methods aren't defined on the standard RDD class, so to access this additional functionality we have to make sure we get the correct specialized class.

Scala

In Scala the conversion between such RDDs (like from RDD[Double] and RDD[Numeric] to DoubleRDD) is handled automatically using implicit conversions. As mentioned in the standard imports, we need to add import org.apache.spark.SparkContext._ for these conversions to work. You can see the implicit conversions listed in the SparkContext object in the Spark source code at ./core/src/main/scala/org/apache/spark/SparkContext.scala. These implicits also allow RDDs of Scala types to be written out to HDFS and similar.

Implicits, while quite powerful, can sometimes be confusing. If you call a function like, say, stats() on an RDD, you might look at the Scaladocs for the RDD class and notice there is no stats() function. The call manages to succeed because of implicit conversions between RDD[Numeric] and DoubleRDDFunctions. When looking for functions on your RDD in Scaladoc, make sure to look at functions that are available in the other RDD classes.

Java

In Java the conversion between the specialized types of RDDs is a bit more explicit. This has the benefit of giving you a greater understanding of what exactly is going on, but can be a bit more cumbersome.

To allow Spark to determine the correct return type, instead of always using the Function class we will need to use specialized versions. If we want to create a DoubleRDD from an RDD of type T, rather than using Function<T, Double> we use DoubleFunction<T>. The special Java functions table shows the specialized functions and their uses.

We also need to call different functions on our RDD (so we can't just create a DoubleFunction and pass it to map). When we want a DoubleRDD back, instead of calling map we need to call mapToDouble, with the same pattern followed for all other functions.

Table 3-5. Java interfaces for type-specific functions

DoubleFlatMapFunction<T>: equivalent to Function<T, Iterable<Double>>; used for a DoubleRDD from flatMapToDouble.
DoubleFunction<T>: equivalent to Function<T, Double>; used for a DoubleRDD from mapToDouble.
PairFlatMapFunction<T, K, V>: equivalent to Function<T, Iterable<Tuple2<K, V>>>; used for a PairRDD<K, V> from flatMapToPair.
PairFunction<T, K, V>: equivalent to Function<T, Tuple2<K, V>>; used for a PairRDD<K, V> from mapToPair.

We can modify our previous example where we squared an RDD of numbers to produce a JavaDoubleRDD. This gives us access to the additional DoubleRDD-specific functions like mean and stats.

Example 3-35. Java create DoubleRDD example
JavaDoubleRDD result = rdd.mapToDouble(new DoubleFunction<Integer>() {
  public double call(Integer x) { return (double) x * x; }
});

System.out.println(result.mean());

Python

The Python API is structured a bit differently than both the Java and Scala APIs. Like the Scala API, we don't need to be explicit to access the functions which are only available on Double or Pair RDDs. In Python all of the functions are implemented on the base RDD class and will simply fail at runtime if the type doesn't work.

Persistence (Caching)

As discussed earlier, Spark RDDs are lazily evaluated, and sometimes we may wish to use the same RDD multiple times. If we do this naively, Spark will recompute the RDD and all of its dependencies each time we call an action on the RDD. This can be especially expensive for iterative algorithms, which look at the data many times. Another trivial example would be doing a count and then writing out the same RDD.

Example 3-36. Scala double execute example
val result = input.map(x => x * x)
println(result.count())
println(result.collect().mkString(","))

To avoid computing an RDD multiple times, we can ask Spark to persist the data. When we ask Spark to persist an RDD, the nodes that compute the RDD store their partitions. If a node that has data persisted on it fails, Spark will recompute the lost partitions of the data when needed. We can also replicate our data on multiple nodes if we want to be able to handle node failure without slowdown.

Spark has many levels of persistence to choose from based on what our goals are. In Scala and Java, the default persist() will store the data in the JVM heap as unserialized objects. In Python, we always serialize the data that persist stores, so the default is instead stored in the JVM heap as pickled objects. When we write data out to disk or off-heap storage, that data is also always serialized.

Tip

Off-heap caching is experimental and uses Tachyon. If you are interested in off-heap caching with Spark, take a look at the Running Spark on Tachyon guide.

Table 3-6. Persistence levels

MEMORY_ONLY: Space used: High. CPU time: Low. In memory: Y. On disk: N. Nodes with data: 1.
MEMORY_ONLY_2: Space used: High. CPU time: Low. In memory: Y. On disk: N. Nodes with data: 2.
MEMORY_ONLY_SER: Space used: Low. CPU time: High. In memory: Y. On disk: N. Nodes with data: 1.
MEMORY_ONLY_SER_2: Space used: Low. CPU time: High. In memory: Y. On disk: N. Nodes with data: 2.
MEMORY_AND_DISK: Space used: High. CPU time: Medium. In memory: Some. On disk: Some. Nodes with data: 1. Spills to disk if there is too much data to fit in memory.
MEMORY_AND_DISK_2: Space used: High. CPU time: Medium. In memory: Some. On disk: Some. Nodes with data: 2. Spills to disk if there is too much data to fit in memory.
MEMORY_AND_DISK_SER: Space used: Low. CPU time: High. In memory: Some. On disk: Some. Nodes with data: 1. Spills to disk if there is too much data to fit in memory.
MEMORY_AND_DISK_SER_2: Space used: Low. CPU time: High. In memory: Some. On disk: Some. Nodes with data: 2. Spills to disk if there is too much data to fit in memory.
DISK_ONLY: Space used: Low. CPU time: High. In memory: N. On disk: Y. Nodes with data: 1.
DISK_ONLY_2: Space used: Low. CPU time: High. In memory: N. On disk: Y. Nodes with data: 2.

Example 3-37. Scala persist example
import org.apache.spark.storage.StorageLevel

val result = input.map(x => x * x)
result.persist(StorageLevel.MEMORY_ONLY)
println(result.count())
println(result.collect().mkString(","))

Tip

You will note that we called persist on the RDD before the first action. The persist call on its own doesn't force evaluation.

If you attempt to cache too much data to fit in memory, Spark will automatically evict old partitions using a Least Recently Used (LRU) cache policy. For the memory-only storage levels, it will recompute these partitions the next time they are accessed, while for the memory-and-disk ones, it will write them out to disk. In either case, this means that you don't have to worry about your job breaking if you ask Spark to cache too much data. However, caching unnecessary data can lead to eviction of useful data and more recomputation time. Finally, RDDs come with a method called unpersist() that lets you manually remove them from the cache.
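A minimal sketch of the full cycle, assuming the input RDD from the examples above:

import org.apache.spark.storage.StorageLevel

val cached = input.map(x => x * x)
cached.persist(StorageLevel.MEMORY_AND_DISK)
println(cached.count())  // the first action materializes and caches the partitions
cached.unpersist()       // manually remove the partitions from the cache when done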

Conclusion

In this chapter, we have covered the RDD execution model and a large number of common operations on RDDs. If you have gotten here, congratulations: you've learned all the core concepts of working in Spark. In the next chapter, we'll cover a special set of operations available on RDDs of key-value pairs, which are the most common way to aggregate or group together data in parallel. After that, we discuss input and output from a variety of data sources, and more advanced topics in working with SparkContext.

[8] The ability to always recompute an RDD is actually why RDDs are called "resilient". When a machine holding RDD data fails, Spark uses this ability to recompute the missing partitions, transparent to the user.

Chapter 4. Working with Key-Value Pairs

This chapter covers how to work with RDDs of key-value pairs, which are a common data type required for many operations in Spark. Key-value RDDs expose new operations such as aggregating data items by key (e.g., counting up reviews for each product), grouping together data with the same key, and grouping together two different RDDs. Oftentimes, to work with data records in Spark, you will need to turn them into key-value pairs and apply one of these operations.

We also discuss an advanced feature that lets users control the layout of pair RDDs across nodes: partitioning. Using controllable partitioning, applications can sometimes greatly reduce communication costs by ensuring that data that will be accessed together will be on the same node. This can provide significant speedups. We illustrate partitioning using the PageRank algorithm as an example. Choosing the right partitioning for a distributed dataset is similar to choosing the right data structure for a local one; in both cases, data layout can greatly affect performance.

Motivation

Spark provides special operations on RDDs containing key-value pairs. These RDDs are called pair RDDs. Pair RDDs are a useful building block in many programs, as they expose operations that allow you to act on each key in parallel or regroup data across the network. For example, pair RDDs have a reduceByKey method that can aggregate data separately for each key, and a join method that can merge two RDDs together by grouping elements with the same key. It is common to extract fields from an RDD (representing, for instance, an event time, customer ID, or other identifier) and use those fields as keys in pair RDD operations.

Creating Pair RDDs

There are a number of ways to get pair RDDs in Spark. Many formats we explore in Chapter 5 will directly return pair RDDs for their key-value data. In other cases we have a regular RDD that we want to turn into a pair RDD. To illustrate creating a pair RDD, we will key our data by the first word in each line of the input.

In Python, for the functions on keyed data to work we need to make sure our RDD consists of tuples.

Example 4-1. Python create pair RDD using the first word as the key
input.map(lambda x: (x.split(" ")[0], x))

In Scala, to create pair RDDs from a regular RDD, we simply need to return a tuple from our function.

Example 4-2. Scala create pair RDD using the first word as the key
input.map(x => (x.split(" ")(0), x))

Java doesn't have a built-in tuple type, so Spark's Java API has users create tuples using the scala.Tuple2 class. This class is very simple: Java users can construct a new tuple by writing new Tuple2(elem1, elem2) and can then access the elements with the ._1() and ._2() methods. Java users also need to call special versions of Spark's functions when creating pair RDDs. For instance, the mapToPair function should be used in place of the basic map function. This is discussed in more detail in Converting Between RDD Types, but let's look at a simple example below.

Example 4-3. Java create pair RDD using the first word as the key
PairFunction<String, String, String> keyData =
  new PairFunction<String, String, String>() {
    public Tuple2<String, String> call(String x) {
      return new Tuple2(x.split(" ")[0], x);
    }
};

JavaPairRDD<String, String> rdd = input.mapToPair(keyData);

When creating a pair RDD from an in-memory collection in Scala and Python, we only need to make sure the types of our data are correct and call parallelize. To create a pair RDD in Java from an in-memory collection, we need to make sure our collection consists of tuples and also call SparkContext.parallelizePairs instead of SparkContext.parallelize.
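For example (a small sketch, not one of the book's numbered examples), in Scala a pair RDD can be created directly from an in-memory collection of tuples:

val pairs = sc.parallelize(List(("pandas", 3), ("coffee", 2)))
// pairs is an RDD[(String, Int)], so the keyed operations in this chapter are available on it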

Transformations on Pair RDDs

Pair RDDs are allowed to use all the transformations available to standard RDDs. The same rules from Passing Functions to Spark apply. Since pair RDDs contain tuples, we need to pass functions that operate on tuples rather than on individual elements.

In Java and Scala, when we run a map or filter or similar over a pair RDD, our function will be passed an instance of scala.Tuple2. In Scala, pattern matching is a common way of extracting the individual values, whereas in Java we use the ._1() and ._2() methods to access the elements. In Python our pair RDDs consist of standard Python tuple objects that we interact with as normal. For instance, we can take our pair RDD from the previous section and filter out lines longer than 20 characters.

Example 4-4. Python simple filter on second element
result = pairs.filter(lambda x: len(x[1]) < 20)

Example 4-5. Scala simple filter on second element
pairs.filter{case (x, y) => y.length < 20}

Example 4-6. Java simple filter on second element
Function<Tuple2<String, String>, Boolean> longWordFilter =
  new Function<Tuple2<String, String>, Boolean>() {
    public Boolean call(Tuple2<String, String> input) {
      return (input._2().length() < 20);
    }
};

JavaPairRDD<String, String> result = rdd.filter(longWordFilter);

Sometimes working with these pairs can be awkward if we only want to access the value part of our pair RDD. Since this is a common pattern, Spark provides the mapValues(func) function, which is the same as map{case (x, y) => (x, func(y))}, and we will use this function in many of our examples.
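As a quick sketch (assuming a pair RDD of the kind created in Example 4-2, where the value is the full line), mapValues touches only the value and leaves the key alone:

val pairs = input.map(x => (x.split(" ")(0), x))
val lineLengths = pairs.mapValues(line => line.length)  // keys stay the same; values become line lengths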

Aggregations

When datasets are described in terms of key-value pairs, it is common to want to aggregate statistics across all elements with the same key. We have looked at the fold, combine, and reduce actions on basic RDDs, and similar per-key transformations exist on pair RDDs. Spark has a similar set of operations that combine together the values which have the same key. Instead of being actions, these operations return RDDs and as such are transformations.

reduceByKey is quite similar to reduce; both take a function and use it to combine values. reduceByKey runs several parallel reduce operations, one for each key in the dataset, where each operation combines values together which have the same key. Because datasets can have very large numbers of keys, reduceByKey is not implemented as an action that returns a value back to the user program. Instead, it returns a new RDD consisting of each key and the reduced value for that key.

foldByKey is quite similar to fold; both use a zero value of the same type as the data in our RDD and a combination function. Like with fold, the provided zero value for foldByKey should have no impact when added with your combination function to another element.
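For instance (a sketch, not one of the book's numbered examples), foldByKey with 0 as the zero value sums the values for each key:

val purchases = sc.parallelize(List(("panda", 1), ("panda", 2), ("pink", 3)))
val totals = purchases.foldByKey(0)((x, y) => x + y)  // ("panda", 3), ("pink", 3)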

We can use reduceByKey along with mapValues to compute the per-key average in a very similar manner to how we used fold and map to compute the average of the entire RDD. As with averaging, we can achieve the same result using a more specialized function, which we will cover next.

Example 4-7. Python per-key average with reduceByKey and mapValues
rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))

Example 4-8. Scala per-key average with reduceByKey and mapValues
rdd.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))

Note

Those familiar with the combiner concept from MapReduce should note that calling reduceByKey and foldByKey will automatically perform combining locally on each machine before computing global totals for each key. The user does not need to specify a combiner. The more general combineByKey interface allows you to customize combining behavior.

We can use a similar approach to also implement the classic distributed word count problem. We will use flatMap from the previous chapter so that we can produce a pair RDD of words and the number 1, and then sum together all of the words using reduceByKey like in our previous example.

Example 4-9. Python word count example
rdd = sc.textFile("s3://...")
words = rdd.flatMap(lambda x: x.split(" "))
result = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

Example 4-10. Scala word count example
val input = sc.textFile("s3://...")
val words = input.flatMap(x => x.split(" "))
val result = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)

Example 4-11. Java word count example
JavaRDD<String> input = sc.textFile("s3://...");
JavaRDD<String> words = input.flatMap(new FlatMapFunction<String, String>() {
  public Iterable<String> call(String x) { return Arrays.asList(x.split(" ")); }
});
JavaPairRDD<String, Integer> result = words.mapToPair(
  new PairFunction<String, String, Integer>() {
    public Tuple2<String, Integer> call(String x) { return new Tuple2(x, 1); }
}).reduceByKey(
  new Function2<Integer, Integer, Integer>() {
    public Integer call(Integer a, Integer b) { return a + b; }
});

Tip

We can actually implement word count even faster by using the countByValue() function on the first RDD: lines.flatMap(x => x.split(" ")).countByValue().

combineByKey is the most general of the per-key aggregation functions and provides flexibility in how the values associated with each key are combined. Most of the other per-key combiners are implemented using it. Like aggregate, combineByKey allows the user to return values which are not the same type as our input data. To use combineByKey we need to provide a number of different functions.

The first required function is called createCombiner, which should take a single element in the source RDD and return an element of the desired type in the resulting RDD. A good example of this would be if we were building a list of all of the values for each key: createCombiner could return a list containing the element it was passed in. In implementing foldByKey, createCombiner creates a new instance of the provided zero value and combines it with the input value, and in reduceByKey it is the identity operator (namely, it just returns the input). The second required function is mergeValue, which takes the current accumulated value for the key and the new value and returns a new accumulated value for the key. If we wanted to make a list of elements we might have mergeValue simply append the new element to the current list. With reduceByKey and foldByKey the mergeValue function used is simply the user-provided merge function. mergeValue is used to update the accumulated value for each key when processing a new element.

The final required function you need to provide to combineByKey is mergeCombiners. Since we don't run through the elements linearly, we can have multiple accumulators for each key. mergeCombiners must take two accumulators (of the type returned by createCombiner) and return a merged result. If we were using combineByKey to implement groupByKey, our mergeCombiners function could just return the lists appended to each other. In the case of foldByKey and reduceByKey, since our accumulator is the same type as our data, the mergeCombiners function they use is the same as the mergeValue function.

Since combineByKey has a lot of different parameters it is a great candidate for an explanatory example. To better illustrate how combineByKey works, we will look at computing the average value for each key, since our accumulator will be of a different type than the elements of the input RDD.

Example 4-12. Python per-key average using combineByKey
sumCount = nums.combineByKey((lambda x: (x, 1)),
                             (lambda x, y: (x[0] + y, x[1] + 1)),
                             (lambda x, y: (x[0] + y[0], x[1] + y[1])))
sumCount.collectAsMap()

Example 4-13. Scala per-key average using combineByKey
val input = sc.parallelize(List(("coffee", 1), ("coffee", 2), ("panda", 4)))
val result = input.combineByKey(
  (v) => (v, 1),
  (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
).map{ case (key, value) => (key, value._1 / value._2.toFloat) }
result.collectAsMap().map(println(_))

Example 4-14. Java per-key average using combineByKey
Function<Integer, AvgCount> createAcc = new Function<Integer, AvgCount>() {
  @Override
  public AvgCount call(Integer x) {
    return new AvgCount(x, 1);
  }
};

Function2<AvgCount, Integer, AvgCount> addAndCount =
  new Function2<AvgCount, Integer, AvgCount>() {
  @Override
  public AvgCount call(AvgCount a, Integer x) {
    a.total_ += x;
    a.num_ += 1;
    return a;
  }
};

Function2<AvgCount, AvgCount, AvgCount> combine =
  new Function2<AvgCount, AvgCount, AvgCount>() {
  @Override
  public AvgCount call(AvgCount a, AvgCount b) {
    a.total_ += b.total_;
    a.num_ += b.num_;
    return a;
  }
};

AvgCount initial = new AvgCount(0, 0);
JavaPairRDD<String, AvgCount> avgCounts =
  rdd.combineByKey(createAcc, addAndCount, combine);
Map<String, AvgCount> countMap = avgCounts.collectAsMap();
for (Entry<String, AvgCount> entry : countMap.entrySet()) {
  System.out.println(entry.getKey() + ":" + entry.getValue().avg());
}

There are many options for combining our data together by key. Most of them are implemented on top of combineByKey but provide a simpler interface. Using one of the specialized per-key combiners in Spark can be much faster than the naive approach of grouping our data and then reducing it.

Tuning the Level of Parallelism

So far we have talked about how all of our transformations are distributed, but we have not really looked at how Spark decides how to split up the work. Every RDD has a fixed number of partitions which determine the degree of parallelism to use when executing operations on the RDD.

When performing aggregations or grouping operations, we can ask Spark to use a specific number of partitions. Spark will always try to infer a sensible default value based on the size of your cluster, but in some cases users will want to tune the level of parallelism for better performance.

Most of the operators discussed in this chapter accept a second parameter giving the number of partitions to use when creating the grouped or aggregated RDD:

Example 4-15. Python reduceByKey with custom parallelism
data = [("a", 3), ("b", 4), ("a", 1)]
sc.parallelize(data).reduceByKey(lambda x, y: x + y)      # Default parallelism
sc.parallelize(data).reduceByKey(lambda x, y: x + y, 10)  # Custom parallelism

Example 4-16. Scala reduceByKey with custom parallelism
val data = Seq(("a", 3), ("b", 4), ("a", 1))
sc.parallelize(data).reduceByKey(_ + _)      // Default parallelism
sc.parallelize(data).reduceByKey(_ + _, 10)  // Custom parallelism

Sometimes, we want to change the partitioning of an RDD outside of the context of grouping and aggregation operations. For those cases, Spark provides the repartition function. Keep in mind that repartitioning your data is a fairly expensive operation. Spark also has an optimized version of repartition called coalesce that allows avoiding data movement, but only if you are decreasing the number of RDD partitions. To know whether you can safely call coalesce, you can check the size of the RDD using rdd.partitions.size() in Java/Scala and rdd.getNumPartitions() in Python, and make sure that you are coalescing it to fewer partitions than it currently has.
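As a small sketch of that check (assuming an existing pair RDD named rdd), in Scala:

println(rdd.partitions.size)  // e.g., prints 20
val fewer = rdd.coalesce(5)   // safe as long as 5 is fewer than the current partition count
// To increase the number of partitions we would use repartition instead, which shuffles the data.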

Grouping Data

With keyed data a common use case is grouping our data together by key, say, joining all of a customer's orders together.

If our data is already keyed in the way we want, groupByKey will group our data together using the key in our RDD. On an RDD consisting of keys of type K and values of type V, we get back an RDD of type [K, Iterable[V]].

groupBy works on unpaired data or data where we want to use a different condition besides equality on the current key. groupBy takes a function which it applies to every element in the source RDD and uses the result to determine the key.

Tip

If you find yourself writing code where you groupByKey and then do a reduce or fold on the values, you can probably achieve the same result more efficiently by using one of the per-key combiners. Rather than reducing the RDD down to an in-memory value, the data is reduced per-key and we get back an RDD with the reduced values corresponding to each key, e.g., rdd.reduceByKey(func) produces the same RDD as rdd.groupByKey().mapValues(value => value.reduce(func)) but is more efficient as it avoids the step of creating a list of values for each key.

In addition to grouping together data from a single RDD, we can group together data sharing the same key from multiple RDDs using a function called cogroup. cogroup over two RDDs sharing the same key type K, with the respective value types V and W, gives us back RDD[(K, (Iterable[V], Iterable[W]))]. If one of the RDDs doesn't have elements for a given key that is present in the other RDD, the corresponding Iterable is simply empty. cogroup gives us the power to group together data from multiple RDDs.

cogroup is the basic transformation on which the joins we discuss in the next section are implemented. cogroup returns a pair RDD with an entry for every key found in any of the RDDs it is called on, along with a tuple of iterators, with each iterator containing all of the elements in the corresponding RDD for that key.

Tip

cogroup can be used for much more than just implementing joins. We can also use it to implement intersect by key. Additionally, cogroup can work on three RDDs at once.
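For example (a sketch, not a numbered example from the text), cogrouping two small pair RDDs keeps every key seen in either RDD, with an empty Iterable where one side has no values:

val addresses = sc.parallelize(List(("Ritual", "1026 Valencia St"), ("Philz", "748 Van Ness Ave")))
val ratings = sc.parallelize(List(("Ritual", 4.9)))
val grouped = addresses.cogroup(ratings)
// ("Ritual", (Iterable("1026 Valencia St"), Iterable(4.9))), ("Philz", (Iterable("748 Van Ness Ave"), Iterable()))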

Joins

Some of the most useful operations we get with keyed data come from using it together with other keyed data. Joining data together is probably one of the most common operations on a pair RDD, and we have a full range of options including right and left outer joins, cross joins, and inner joins.

The simple join operator is an inner join. Only keys which are present in both pair RDDs are output. When there are multiple values for the same key in one of the inputs, the resulting pair RDD will also have multiple entries for the same key, with the values being the Cartesian product of the values for that key in each of the input RDDs. A simple way to understand this is by looking at an example of a join.

Example 4-17. Scala shell inner join example
storeAddress = {
  (Store("Ritual"), "1026 Valencia St"), (Store("Philz"), "748 Van Ness Ave"),
  (Store("Philz"), "3101 24th St"), (Store("Starbucks"), "Seattle")}

storeRating = {
  (Store("Ritual"), 4.9), (Store("Philz"), 4.8)}

storeAddress.join(storeRating) = {
  (Store("Ritual"), ("1026 Valencia St", 4.9)),
  (Store("Philz"), ("748 Van Ness Ave", 4.8)),
  (Store("Philz"), ("3101 24th St", 4.8))}

Sometimeswedon’tneedthekeytobepresentinbothRDDstowantitinourresult.Forexampleifwewerejoiningcustomerinformationwithrecommendationswemightnotwanttodropcustomersiftherewerenotanyrecommendationsyet.leftOuterJoin(other)andrightOuterJoin(other)bothjoinPairRDDstogetherbykeywhereoneofthePairRDDscanbemissingthekey.

57WithleftOuterJointheresultingPairRDDhasentriesforeachkeyinthesourceRDD.ThevalueassociatedwitheachkeyintheresultisatupleofthevaluefromthesourceRDDandanOption(orOptionalinJava)forthevaluefromtheotherPairRDD.InPythonifanvalueisn’tpresentNoneisusedandifthevalueispresenttheregularvalue,withoutanywrapper,isused.Likewithjoinwecanhavemultipleentriesforeachkeyandwhenthisoccurswegetthecartesianproductbetweenthetwolistofvalues.

Tip

OptionalispartofGoogle’sGuavalibraryandissimilartonullable.WecancheckisPresent()toseeifitsset,andget()willreturnthecontainedinstanceprovidedithasdatapresent.

rightOuterJoinisalmostidenticaltoleftOuterJoinexceptthekeymustbepresentintheotherRDDandthetuplehasanoptionforthesourceratherthanotherRDD.

WecanlookatourexamplefromearlieranddoaleftOuterJoinandarightOuterJoinbetweenthetwoPairRDDsweusedtoillustratejoin.

Example 4-18. Scala shell leftOuterJoin/rightOuterJoin examples
storeAddress.leftOuterJoin(storeRating) = {
  (Store("Ritual"), ("1026 Valencia St", Some(4.9))),
  (Store("Starbucks"), ("Seattle", None)),
  (Store("Philz"), ("748 Van Ness Ave", Some(4.8))),
  (Store("Philz"), ("3101 24th St", Some(4.8)))}

storeAddress.rightOuterJoin(storeRating) = {
  (Store("Ritual"), (Some("1026 Valencia St"), 4.9)),
  (Store("Philz"), (Some("748 Van Ness Ave"), 4.8)),
  (Store("Philz"), (Some("3101 24th St"), 4.8))}

Sorting Data

Having sorted data is quite useful in many cases, especially when producing downstream output. We can sort an RDD with key-value pairs provided that there is an ordering defined on the key. Once we have sorted our data, any subsequent call on the sorted data to collect or save will result in ordered data.

Since we often want our RDDs in the reverse order, the sortByKey function takes a parameter called ascending indicating if we want it in ascending order (it defaults to true). Sometimes we want a different sort order entirely, and to support this we can provide our own comparison function. Below we will sort our RDD by converting the integers to strings and using the string comparison functions.

Example 4-19. Custom sort order in Python, sorting integers as if strings
rdd.sortByKey(ascending=True, numPartitions=None, keyfunc=lambda x: str(x))

Example 4-20. Custom sort order in Scala, sorting integers as if strings
val input: RDD[(Int, Venue)] = ...
implicit val sortIntegersByString = new Ordering[Int] {
  override def compare(a: Int, b: Int) = a.toString.compare(b.toString)
}
rdd.sortByKey()

Example 4-21. Custom sort order in Java, sorting integers as if strings
class IntegerComparator implements Comparator<Integer> {
  public int compare(Integer a, Integer b) {
    return String.valueOf(a).compareTo(String.valueOf(b));
  }
}

rdd.sortByKey(new IntegerComparator());

The following tables summarize transformations on pair RDDs.

Table 4-1. Transformations on one pair RDD (example: {(1, 2), (3, 4), (3, 6)})

reduceByKey(func): Combine values with the same key together.
  Example: rdd.reduceByKey((x, y) => x + y)    Result: {(1, 2), (3, 10)}
groupByKey(): Group together values with the same key.
  Example: rdd.groupByKey()    Result: {(1, [2]), (3, [4, 6])}
combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner): Combine values with the same key together, using a different result type.
  Example: see the combineByKey examples above.
mapValues(func): Apply a function to each value of a pair RDD without changing the key.
  Example: rdd.mapValues(x => x + 1)    Result: {(1, 3), (3, 5), (3, 7)}
flatMapValues(func): Apply a function which returns an iterator to each value of a pair RDD, and for each element returned produce a key-value entry with the old key. Often used for tokenization.
  Example: rdd.flatMapValues(x => x.to(5))    Result: {(1, 2), (1, 3), (1, 4), (1, 5), (3, 4), (3, 5)}
keys(): Return an RDD of just the keys.
  Example: rdd.keys()    Result: {1, 3, 3}
values(): Return an RDD of just the values.
  Example: rdd.values()    Result: {2, 4, 6}
sortByKey(): Return an RDD sorted by the key.
  Example: rdd.sortByKey()    Result: {(1, 2), (3, 4), (3, 6)}

Table 4-2. Transformations on two pair RDDs (rdd = {(1, 2), (3, 4), (3, 6)}, other = {(3, 9)})

subtractByKey: Remove elements with a key present in the other RDD.
  Example: rdd.subtractByKey(other)    Result: {(1, 2)}
join: Perform an inner join between two RDDs.
  Example: rdd.join(other)    Result: {(3, (4, 9)), (3, (6, 9))}
rightOuterJoin: Perform a join between two RDDs where the key must be present in the first RDD.
  Example: rdd.rightOuterJoin(other)    Result: {(3, (Some(4), 9)), (3, (Some(6), 9))}
leftOuterJoin: Perform a join between two RDDs where the key must be present in the other RDD.
  Example: rdd.leftOuterJoin(other)    Result: {(1, (2, None)), (3, (4, Some(9))), (3, (6, Some(9)))}
cogroup: Group together data from both RDDs sharing the same key.
  Example: rdd.cogroup(other)    Result: {(1, ([2], [])), (3, ([4, 6], [9]))}

Actions Available on Pair RDDs

Like with the transformations, all of the traditional actions available on the base RDD are also available on pair RDDs. Some additional actions are available on pair RDDs which take advantage of the key-value nature of the data.

Table 4-3. Actions on pair RDDs (example: {(1, 2), (3, 4), (3, 6)})

countByKey(): Count the number of elements for each key.
  Example: rdd.countByKey()    Result: {(1, 1), (3, 2)}
collectAsMap(): Collect the result as a map to provide easy lookup.
  Example: rdd.collectAsMap()    Result: Map{(1, 2), (3, 4), (3, 6)}
lookup(key): Return all values associated with the provided key.
  Example: rdd.lookup(3)    Result: [4, 6]

There are also multiple other actions on pair RDDs that save the RDD, which we will examine in the next chapter.

Data Partitioning

The final Spark feature we will discuss in this chapter is how to control datasets' partitioning across nodes. In a distributed program, communication is very expensive, so laying out data to minimize network traffic can greatly improve performance. Much like how a single-node program needs to choose the right data structure for a collection of records, Spark programs can choose to control their RDDs' partitioning to reduce communication. Partitioning will not be helpful in all applications; for example, if a given RDD is only scanned once, there is no point in partitioning it in advance. It is only useful when a dataset is reused multiple times in key-oriented operations such as joins. We will give some examples below.

Spark's partitioning is available on all RDDs of key-value pairs, and causes the system to group together elements based on a function of each key. Although Spark does not give explicit control of which worker node each key goes to (partly because the system is designed to work even if specific nodes fail), it lets the program ensure that a set of keys will appear together on some node. For example, one might choose to hash-partition an RDD into 100 partitions so that keys that have the same hash value modulo 100 appear on the same node. Or one might range-partition the RDD into sorted ranges of keys so that elements with keys in the same range appear on the same node.

As a simple example, consider an application that keeps a large table of user information in memory: say, an RDD of (UserID, UserInfo) pairs, where UserInfo contains a list of topics the user is subscribed to. The application periodically combines this table with a smaller file representing events that happened in the past five minutes: say, a table of (UserID, LinkInfo) pairs for users who have clicked a link on a website in those five minutes. For example, we may wish to count how many users visited a link that was not to one of their subscribed topics. We can perform this combining with Spark's join operation, which can be used to group the UserInfo and LinkInfo pairs for each UserID by key. Our application would look like this:

// Initialization code; we load the user info from a Hadoop SequenceFile on HDFS.
// This distributes elements of userData by the HDFS block where they are found,
// and doesn't provide Spark with any way of knowing in which partition a
// particular UserID is located.
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...").persist()

// Function called periodically to process a logfile of events in the past 5 minutes;
// we assume that this is a SequenceFile containing (UserID, LinkInfo) pairs.
def processNewLogs(logFileName: String) {
  val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
  val joined = userData.join(events)  // RDD of (UserID, (UserInfo, LinkInfo)) pairs
  val offTopicVisits = joined.filter {
    case (userId, (userInfo, linkInfo)) =>  // Expand the tuple into its components
      !userInfo.topics.contains(linkInfo.topic)
  }.count()
  println("Number of visits to non-subscribed topics: " + offTopicVisits)
}

This code will run fine as is, but it will be inefficient. This is because the join operation, called each time that processNewLogs is invoked, does not know anything about how the keys are partitioned in the datasets. By default, this operation will hash all the keys of both datasets, sending elements with the same key hash across the network to the same machine, and then join on that machine the elements with the same key (to come). Because we expect the userData table to be much larger than the small log of events seen every five minutes, this wastes a lot of work: the userData table is hashed and shuffled across the network on every call, even though it doesn't change.

Fixing this is simple: just use the partitionBy transformation on userData to hash-partition it at the start of the program. We do this by passing a spark.HashPartitioner object to partitionBy:

val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")
                 .partitionBy(new HashPartitioner(100))  // Create 100 partitions
                 .persist()

The processNewLogs method can remain unchanged: the events RDD is local to processNewLogs, and is only used once within this method, so there is no advantage in specifying a partitioner for events. Because we called partitionBy when building userData, Spark will now know that it is hash-partitioned, and calls to join on it will take advantage of this information. In particular, when we call userData.join(events), Spark will only shuffle the events RDD, sending events with each particular UserID to the machine that contains the corresponding hash partition of userData (to come). The result is that a lot less data is communicated over the network, and the program runs significantly faster.

Note that partitionBy is a transformation, so it always returns a new RDD; it does not change the original RDD in place. RDDs can never be modified once created. Therefore it is important to persist and save as userData the result of partitionBy, not the original sequenceFile. Also, the 100 passed to partitionBy represents the number of partitions, which will control how many parallel tasks perform further operations on the RDD (e.g., joins); in general, make this at least as large as the number of cores in your cluster.

Warning

Failure to persist an RDD after it has been transformed with partitionBy will cause subsequent uses of the RDD to repeat the partitioning of the data. Without persistence, use of the partitioned RDD will cause re-evaluation of the RDD's complete lineage. That would negate the advantage of partitionBy, resulting in repeated partitioning and shuffling of data across the network, similar to what occurs without any specified partitioner.

Note

When using a HashPartitioner, specify a number of hash buckets at least as large as the number of cores in your cluster in order to achieve appropriate parallelism.

In fact, many other Spark operations automatically result in an RDD with known partitioning information, and many operations other than join will take advantage of this information. For example, sortByKey and groupByKey will result in range-partitioned and hash-partitioned RDDs, respectively. On the other hand, operations like map cause the new RDD to forget the parent's partitioning information, because such operations could theoretically modify the key of each record. The next few sections describe how to determine how an RDD is partitioned, and exactly how partitioning affects the various Spark operations.

Partitioning in Java and Python

Spark's Java and Python APIs benefit from partitioning in the same way as the Scala API. However, in Python, you cannot pass a HashPartitioner object to partitionBy; instead, you just pass the number of partitions desired (e.g., rdd.partitionBy(100)).

Determining an RDD's Partitioner

In Scala and Java, you can determine how an RDD is partitioned using its partitioner property (or partitioner() method in Java).[9] This returns a scala.Option object, which is a Scala class for a container that may or may not contain one item. (It is considered good practice in Scala to use Option for fields that may not be present, instead of setting a field to null if a value is not present, which runs the risk of a null-pointer exception if the program subsequently tries to use the null as if it were an actual, present value.) You can call isDefined() on the Option to check whether it has a value, and get() to get this value. If present, the value will be a spark.Partitioner object. This is essentially a function telling the RDD which partition each key goes into; more about this later. The partitioner property is a great way to test in the Spark shell how different Spark operations affect partitioning, and to check that the operations you want to do in your program will yield the right result. For example:

scala> val pairs = sc.parallelize(List((1, 1), (2, 2), (3, 3)))
pairs: spark.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:12

scala> pairs.partitioner
res0: Option[spark.Partitioner] = None

scala> val partitioned = pairs.partitionBy(new spark.HashPartitioner(2))
partitioned: spark.RDD[(Int, Int)] = ShuffledRDD[1] at partitionBy at <console>:14

scala> partitioned.partitioner
res1: Option[spark.Partitioner] = Some(spark.HashPartitioner@5147788d)

In this short session, we created an RDD of (Int, Int) pairs, which initially have no partitioning information (an Option with value None). We then created a second RDD by hash-partitioning the first. If we actually wanted to use partitioned in further operations, then we should have appended .cache() to the third line of input, in which partitioned is defined. This is for the same reason that we needed cache for userData in the previous example: without cache, subsequent RDD actions will evaluate the entire lineage of partitioned, which will cause pairs to be hash-partitioned over and over.

Operations that Benefit from Partitioning

Many of Spark's operations involve shuffling data by key across the network. All of these will benefit from partitioning. As of Spark 1.0, the operations that benefit from partitioning are: cogroup, groupWith, join, leftOuterJoin, rightOuterJoin, groupByKey, reduceByKey, combineByKey, and lookup.

For operations that act on a single RDD, such as reduceByKey, running on a pre-partitioned RDD will cause all the values for each key to be computed locally on a single machine, requiring only the final, locally-reduced value to be sent from each worker node back to the master. For binary operations, such as cogroup and join, pre-partitioning will cause at least one of the RDDs (the one with the known partitioner) to not be shuffled. If both RDDs have the same partitioner, and if they are cached on the same machines (e.g., one was created using mapValues on the other, which preserves keys and partitioning), or if one of them has not yet been computed, then no shuffling across the network will occur.

Operations that Affect Partitioning

Spark knows internally how each of its operations affects partitioning, and automatically sets the partitioner on RDDs created by operations that partition the data. For example, suppose you called join to join two RDDs; because the elements with the same key have been hashed to the same machine, Spark knows that the result is hash-partitioned, and operations like reduceByKey on the join result are going to be significantly faster.

The flip side, however, is that for transformations that cannot be guaranteed to produce a known partitioning, the output RDD will not have a partitioner set. For example, if you call map on a hash-partitioned RDD of key-value pairs, the function passed to map can in theory change the key of each element, so the result will not have a partitioner. Spark does not analyze your functions to check whether they retain the key. Instead, it provides two other operations, mapValues and flatMapValues, which guarantee that each tuple's key remains the same.

All that said, here are all the operations that result in a partitioner being set on the output RDD: cogroup, groupWith, join, leftOuterJoin, rightOuterJoin, groupByKey, reduceByKey, combineByKey, partitionBy, sort, mapValues (if the parent RDD has a partitioner), flatMapValues (if the parent has a partitioner), and filter (if the parent has a partitioner). All other operations will produce a result with no partitioner.

Knowing that there is a partitioner does not answer the question of what the output partitioner is for binary operations such as join. By default, it is a hash partitioner, with the number of partitions set to the level of parallelism of the operation. However, if one of the parents has a partitioner object, it will be that partitioner; and if both parents have a partitioner set, it will be the partitioner of the first parent (the one that join was called on, for example).

Example: PageRank

As an example of a more involved algorithm that can benefit from RDD partitioning, we consider PageRank. The PageRank algorithm, named after Google's Larry Page, aims to assign a measure of importance (a "rank") to each document in a set based on how many documents have links to it. It can be used to rank web pages, of course, but also scientific articles, or influential users in a social network (by treating each user as a "document" and each friend relationship as a "link").

PageRank is an iterative algorithm that performs many joins, so it is a good use case for RDD partitioning. The algorithm maintains two datasets: one of (pageID, linkList) elements containing the list of neighbors of each page, and one of (pageID, rank) elements containing the current rank for each page. It proceeds as follows:

1. Initialize each page's rank to 1.0.
2. On each iteration, have page p send a contribution of rank(p)/numNeighbors(p) to its neighbors (the pages it has links to).
3. Set each page's rank to 0.15 + 0.85 * contributionsReceived.

The last two steps repeat for several iterations, during which the algorithm will converge to the correct PageRank value for each page. As a simple solution, it's typically enough to run about ten iterations and declare the resulting ranks to be the PageRank values. Here is the code to implement PageRank in Spark:

val sc = new SparkContext(...)

// Assume that our neighbor list was saved as a Spark objectFile
val links = sc.objectFile[(String, Seq[String])]("links")
              .partitionBy(new HashPartitioner(100))
              .persist()

// Initialize each page's rank to 1.0; since we use mapValues, the resulting RDD
// will have the same partitioner as links
var ranks = links.mapValues(_ => 1.0)

// Run 10 iterations of PageRank
for (i <- 0 until 10) {
  val contributions = links.join(ranks).flatMap {
    case (pageId, (links, rank)) =>
      links.map(dest => (dest, rank / links.size))
  }
  ranks = contributions.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
}

// Write out the final ranks
ranks.saveAsTextFile("ranks")

That’sit!ThealgorithmstartswitharanksRDDinitializedat1.0foreachelement,andkeepsupdatingtheranksvariableoneachiteration.ThebodyofPageRankisprettysimpletoexpressinSpark:itfirstdoesajoinbetweenthecurrentranksRDDandthestaticlinksone,inordertoobtainthelinklistandrankforeachpageIDtogether,thenusesthisinaflatMaptocreate“contribution”valuestosendtoeachofthepage’sneighbors.WethenaddupthesevaluesbypageID(i.e.bythepagereceivingthecontribution)andsetthatpage’srankto0.15+0.85*contributionsReceived.

Althoughthecodeitselfissimple,theexampledoesseveralthingstoensurethattheRDDsarepartitionedinanefficientway,andtominimizecommunication:

1.NoticethatthelinksRDDisjoinedagainstranksoneachiteration.Sincelinksisastaticdataset,wepartitionitatthestartwithpartitionBy,sothatitdoesnotneedtobeshuffledacrossthenetwork.Inpractice,thelinksRDDisalsolikelytobemuchlargerintermsofbytesthanranks,sinceitcontainsalistofneighborsforeachpageIDinsteadofjustaDouble,sothisoptimizationsavesconsiderablenetworktrafficoverasimpleimplementationofPageRank(e.g.inplainMapReduce).2.Forthesamereason,wecallpersistonlinkstokeepitinRAMacrossiterations.3.Whenwefirstcreateranks,weusemapValuesinsteadofmaptopreservethepartitioningoftheparentRDD(links),sothatourfirstjoinagainstitisverycheap.4.Intheloopbody,wefollowourreduceByKeywithmapValues;becausetheresultofreduceByKeyisalreadyhash-partitioned,thiswillmakeitmoreefficienttojointhemappedresultagainstlinksonthenextiteration.Finally,notealsothattheextrasyntaxfromusingpartitioningissmall,andmapValuesinparticularismoreconciseintheplacesit’susedherethanamap.

Note

To maximize the potential for partitioning-related optimizations, you should use mapValues or flatMapValues whenever you are not changing an element's key.

Custom Partitioners

While Spark's HashPartitioner and RangePartitioner are well suited to many use cases, Spark also allows you to tune how an RDD is partitioned by providing a custom Partitioner object. This can be used to further reduce communication by taking advantage of domain-specific knowledge.

For example, suppose we wanted to run the PageRank algorithm in the previous section on a set of web pages. Here each page's ID (the key in our RDD) will be its URL. Using a simple hash function to do the partitioning, pages with similar URLs (e.g., http://www.cnn.com/WORLD and http://www.cnn.com/US) might be hashed to completely different nodes. However, we know that web pages within the same domain tend to link to each other a lot. Because PageRank needs to send a message from each page to each of its neighbors on each iteration, it helps to group these pages into the same partition. We can do this with a custom Partitioner that looks at just the domain name instead of the whole URL.

To implement a custom partitioner, you need to subclass the spark.Partitioner class and implement three methods:

numPartitions: Int, which returns the number of partitions you will create.
getPartition(key: Any): Int, which returns the partition ID (0 to numPartitions - 1) for a given key.
equals, the standard Java equality method. This is important to implement because Spark will need to test your Partitioner object against other instances of itself when it decides whether two of your RDDs are partitioned in the same way!

One gotcha is that, if you rely on Java's hashCode method in your algorithm, it can return negative numbers. You need to be careful that getPartition always returns a non-negative result. For example, here is how we would write the domain-name-based partitioner sketched above, which hashes only the domain name of each URL:

class DomainNamePartitioner(numParts: Int) extends Partitioner {
  override def numPartitions: Int = numParts

  override def getPartition(key: Any): Int = {
    val domain = new java.net.URL(key.toString).getHost()
    val code = (domain.hashCode % numPartitions)
    if (code < 0) {
      code + numPartitions  // Make it non-negative
    } else {
      code
    }
  }

  // Java equals method to let Spark compare our Partitioner objects
  override def equals(other: Any): Boolean = other match {
    case dnp: DomainNamePartitioner =>
      dnp.numPartitions == numPartitions
    case _ =>
      false
  }
}

Note that in the equals method, we used Scala's pattern matching operator (match) to test whether other is a DomainNamePartitioner, and cast it if so; this is the same as using instanceof in Java.

Using a custom Partitioner is easy: just pass it to the partitionBy method. Many of the shuffle-based methods in Spark, such as join and groupByKey, can also take an optional Partitioner object to control the partitioning of the output.

Creating a custom Partitioner in Java is very similar to Scala: just extend the spark.Partitioner class and implement the required methods.

In Python, you do not extend a Partitioner class, but instead pass a hash function as an additional argument to RDD.partitionBy(). For example:

import urlparse

def hash_domain(url):
    return hash(urlparse.urlparse(url).netloc)

rdd.partitionBy(20, hash_domain)  # Create 20 partitions

Note that the hash function you pass will be compared by identity to that of other RDDs. If you want to partition multiple RDDs with the same partitioner, pass the same function object (e.g., a global function) instead of creating a new lambda for each one!

Conclusion

In this chapter, we have seen how to work with keyed data using the specialized functions available in Spark. The techniques from the previous chapter on programming with RDDs also still work on our pair RDDs. In the next chapter, we will look at how to load and save data.

[9] The Python API does not yet offer a way to query partitioners, though it still uses them internally.

Chapter 5. Loading and Saving Your Data

Both engineers and data scientists will find parts of this chapter useful. Engineers may wish to explore more output formats to see if there is something well suited to their intended downstream consumer, and should consider looking online for different Hadoop formats. Data scientists can likely focus on the input format that their data is already in. Spark supports reading from classes that implement Hadoop's InputFormat and writing to Hadoop's OutputFormat interfaces.

Motivation

We've looked at a number of operations we can perform on our data once we have it distributed in Spark. So far our examples have loaded and saved all of their data from a native collection and regular files, but odds are that your data doesn't fit on a single machine, so it's time to explore our options.

In addition to different formats, we also have different compression options, which can decrease the amount of space and network overhead required but can introduce restrictions on how we read the data. There are a myriad of different data sources and formats we can use to create a new RDD, and we will only cover the most common here.

Note

In Spark 1.0, the standard data APIs for Python only support text data. However, you can work around this by using Spark SQL, which allows loading data from a number of formats supported by Hive. We will cover Spark SQL in a later chapter, but briefly illustrate how to load data from Hive in this chapter too.

Choosing a Format

Often when we need data for our code we don't get to choose the format. Sometimes we are lucky enough to be able to choose from multiple formats or work with our upstream data providers. We will start with some common file formats which we can write to a number of different filesystems. In addition to standard file formats, we will also examine using different database systems for IO. Spark works with all the formats implementing Hadoop's InputFormat and OutputFormat interfaces.

Table 5-1. Common supported file formats

text files: splittable: yes; structured: no. Plain old text files. Splittable provided records are one per line.
JSON: splittable: yes; structured: semi. Common text-based format; semi-structured; splittable if one record per line.
CSV: splittable: yes; structured: yes. Very common text-based format, often used with spreadsheet applications.
SequenceFiles: splittable: yes; structured: yes. A common Hadoop file format used for key-value data.
Protocol Buffers: splittable: yes; structured: yes. A fast, space-efficient, multi-language format.
Object Files: splittable: yes; structured: yes. Useful for saving data from a Spark job to be consumed by shared code. Breaks if you change your classes, as it relies on Java Serialization.

In addition to these file formats, Spark can also work directly with a number of database and search systems. We will look at Mongo, Hive, Cassandra, and Elasticsearch, but many more are supported through the standard Hadoop connectors with Spark. This is convenient if your data happens to already be in one of these systems, as nightly dumps of data can be difficult to maintain. When about to run a new Spark job against an on-line system, you should verify that you have sufficient capacity for a potentially very high volume of queries.

Formats

Spark makes it very simple to load and save data in a large number of formats. Formats range from unstructured, like text, to semi-structured, like JSON, to structured, like SequenceFiles. The input formats that Spark wraps all transparently handle compressed formats based on the file extension.

In addition to the output mechanisms supported directly in Spark, we can use both Hadoop's new and old file APIs for keyed (or paired) data. This restriction exists because the Hadoop interfaces require key-value data, although some formats ignore the key.

Text Files

Text files are very simple to load from and save to with Spark. When we load a single text file as an RDD, each input line becomes an element in the RDD. We can also load multiple whole text files at the same time into a pair RDD, with the key being the name and the value being the contents of each file.

Loading a single text file is as simple as calling the textFile function on our SparkContext with the path to the file. If we want to control the number of partitions, we can also specify minPartitions.

Example 5-1. Python load text file example
input = sc.textFile("file:///home/holden/repos/spark/README.md")

Example 5-2. Scala load text file example
val input = sc.textFile("file:///home/holden/repos/spark/README.md")

Example 5-3. Java load text file example
JavaRDD<String> input = sc.textFile("file:///home/holden/repos/spark/README.md");

Multi-part inputs in the form of a directory containing all of the parts can be handled in two ways. We can just use the same textFile method and pass it a directory, and it will load all of the parts into our RDD. Sometimes it's important to know which file each piece of input came from (such as time data with the key in the file), or we need to process an entire file at a time. If our files are small enough then we can use the wholeTextFiles method and get back a pair RDD where the key is the name of the input file.

wholeTextFiles can be very useful when each file represents a certain time period's data. If we had files representing sales data from different periods, we could easily compute the average for each period.

Example 5-4. Scala average value per file
val input = sc.wholeTextFiles("file:///home/holden/happypanda")
val result = input.mapValues { y =>
  val nums = y.split(" ").map(x => x.toDouble)
  nums.sum / nums.size.toDouble
}

Tip

Spark supports reading all the files in a given directory and doing wildcard expansion on the input (e.g. part-*.txt). This is useful since large datasets are often spread across multiple files.

Outputting text files is also quite simple. The method saveAsTextFile takes a path and will output the contents of the RDD to that file. The path is treated as a directory and Spark will output multiple files underneath that directory. This allows Spark to write the output from multiple nodes. With this method we don't get to control which files end up with which segments of our data, but there are other output formats which do allow this.

Example 5-5. Scala save as text file example
result.saveAsTextFile(outputFile)

Loading and saving text files is implemented through wrappers around Hadoop's file APIs. If you want further examples of how to work with the Hadoop file APIs you can see how this is implemented in SparkContext and RDD respectively.

JSON

JSON is a popular semi-structured data format. The simplest way to load JSON data is by loading the data as a text file and then mapping over the values with a JSON parser. Likewise, we can use our preferred JSON serialization library to write out the values to strings, which we can then write out. In Java and Scala we can also work with JSON data using a custom Hadoop format.

Loading the data as a text file and then parsing the JSON data is an approach that we can use in all of the supported languages. This works assuming that you have one JSON record per row; if you have multi-line JSON files you will instead have to load the whole file and then parse each file. If constructing a JSON parser is expensive in your language, you can use mapPartitions to re-use the parser.
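As a rough sketch of the mapPartitions approach, here is one way it could look in Scala using Jackson (an assumption for illustration; the Scala examples later in this section use a different JSON library, and the Person case class mirrors the one used below):

// Sketch only: construct one Jackson ObjectMapper per partition and re-use it
// (assumes the jackson-module-scala dependency is on the classpath)
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

case class Person(name: String, lovesPandas: Boolean)

val parsed = input.mapPartitions { lines =>
  val mapper = new ObjectMapper()           // constructed once per partition
  mapper.registerModule(DefaultScalaModule)
  lines.flatMap { line =>
    try {
      Some(mapper.readValue(line, classOf[Person]))
    } catch {
      case _: Exception => None             // skip malformed records
    }
  }
}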

There are a wide variety of JSON libraries available for the three languages we are looking at; for simplicity's sake we are only considering one library per language. In Java we will use Jackson, in Scala we are using Play's JSON library (play.api.libs.json, as imported in the examples below), and in Python we use the built-in library. These libraries have been chosen as they perform reasonably well and are also relatively simple; if you spend a lot of time in the parsing stage you may wish to look at other JSON libraries for Scala or for Java.

Example 5-6. Python load JSON example
data = input.map(lambda x: json.loads(x))

Example 5-7. Scala load JSON example
import play.api.libs.json._
import play.api.libs.functional.syntax._
...
val parsed = input.map(Json.parse(_))

If your JSON data happens to follow a predictable schema (lucky you!), we can parse it into a more structured format. This is often where we will find invalid records, so we may wish to skip them.

Example 5-8. Scala load JSON example
case class Person(name: String, lovesPandas: Boolean)
implicit val personReads = Json.format[Person]

// We use asOpt combined with flatMap so that if it fails to parse we
// get back a None and the flatMap essentially skips the result.
val parsed = input.flatMap(record =>
  personReads.reads(Json.parse(record)).asOpt)

Example 5-9. Java load JSON example
public static class ParseJson implements FlatMapFunction<Iterator<String>, Person> {
  public Iterable<Person> call(Iterator<String> lines) throws Exception {
    ArrayList<Person> people = new ArrayList<Person>();
    ObjectMapper mapper = new ObjectMapper();
    while (lines.hasNext()) {
      String line = lines.next();
      try {
        people.add(mapper.readValue(line, Person.class));
      } catch (Exception e) {
        // skip records on failure
      }
    }
    return people;
  }
}

Tip

Handling incorrectly formatted records can be a big problem, especially with semi-structured data like JSON. With small datasets it can be acceptable to stop the world (i.e. fail the program) on malformed input, but often with large datasets malformed input is simply a part of life. If you do choose to skip incorrectly formatted data (or attempt to recover it), you may wish to look at using accumulators to keep track of the number of errors.

Writing out JSON files is much simpler compared to loading them, as we don't have to worry about incorrectly formatted data and we know the type of the data that we are writing out. We can use the same libraries we used to convert our RDD of strings into parsed JSON data, and instead take our RDD of structured data and convert it into an RDD of strings, which we can then write out using Spark's text file API.

Let's say we were running a promotion for people that love pandas, so we can take our input from the first step and filter it for the people that love pandas.

Example 5-10. Python save JSON example
(data.filter(lambda x: x['lovesPandas']).map(lambda x: json.dumps(x))
  .saveAsTextFile(outputFile))

Example 5-11. Scala save JSON example
result.filter(x => x.lovesPandas).map(x => Json.toJson(x)).saveAsTextFile(outputFile)

Example 5-12. Java save JSON example
public static class WriteJson implements FlatMapFunction<Iterator<Person>, String> {
  public Iterable<String> call(Iterator<Person> people) throws Exception {
    ArrayList<String> text = new ArrayList<String>();
    ObjectMapper mapper = new ObjectMapper();
    while (people.hasNext()) {
      Person person = people.next();
      text.add(mapper.writeValueAsString(person));
    }
    return text;
  }
}

JavaRDD<Person> result = input.mapPartitions(new ParseJson()).filter(new LikesPandas());
JavaRDD<String> formatted = result.mapPartitions(new WriteJson());
formatted.saveAsTextFile(outfile);

We can easily load and save JSON data with Spark by using the existing mechanism for working with text and adding JSON libraries.

CSV (Comma Separated Values) / TSV (Tab Separated Values)

CSV files are supposed to contain a fixed number of fields per line, and the fields are most commonly separated by a comma or a tab. Records are often stored one per line, but this is not always the case, as records can sometimes span lines. CSV/TSV files can sometimes be inconsistent, most frequently with respect to handling newlines, escaping, non-ASCII characters, and non-integer numbers. CSVs cannot handle nested field types natively, so we have to unpack and pack to specific fields manually.

Unlike with JSON, fields in each CSV record don't have names associated with them; instead we get back row numbers. It is common practice in single CSV files to have the first row's column values be the names of each field.
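If the first line is such a header, one hedged way to drop it before parsing (a sketch; input is a placeholder for the RDD of CSV lines, and we assume the header sits at the start of the first partition):

// Drop the header row, which lives at the beginning of partition 0
val withoutHeader = input.mapPartitionsWithIndex { (idx, lines) =>
  if (idx == 0) lines.drop(1) else lines
}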

Loading CSV/TSV data is similar to loading JSON data in that we can first load it as text and then process it. The lack of standardization leads to different versions of the same library sometimes handling input in different ways.

Like with JSON, there are many different CSV libraries and we will only use one for each language. In both Scala and Java we use opencsv. Once again in Python we use the included csv library.

Tip

As with JSON there is a Hadoop CSVInputFormat that we can use to load CSV data in Scala and Java (although it does not support records containing newlines).

If your CSV data happens to not contain newlines in any of the fields, you can load your data with textFile and parse it.

Example 5-13. Python load CSV example
import csv
import StringIO
...

def loadRecord(line):
    """Parse a CSV line"""
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, fieldnames=["name", "favouriteAnimal"])
    return reader.next()

input = sc.textFile(inputFile).map(loadRecord)

Example 5-14. Java load CSV example
import au.com.bytecode.opencsv.CSVReader;
...

public static class ParseLine implements Function<String, String[]> {
  public String[] call(String line) throws Exception {
    CSVReader reader = new CSVReader(new StringReader(line));
    return reader.readNext();
  }
}

JavaRDD<String> csvFile1 = sc.textFile(csv1);
JavaRDD<String[]> csvData = csvFile1.map(new ParseLine());

Example 5-15. Scala load CSV example
import au.com.bytecode.opencsv.CSVReader
...

val input = sc.textFile(inputFile)
val result = input.map { line =>
  val reader = new CSVReader(new StringReader(line))
  reader.readNext()
}

If there are embedded newlines in fields we will need to load each file in full and parse the entire segment. This is unfortunate because, if each file is large, it can easily introduce bottlenecks in loading and parsing.

Example 5-16. Python load CSV example
def loadRecords(fileNameContents):
    """Load all the records in a given file"""
    input = StringIO.StringIO(fileNameContents[1])
    reader = csv.DictReader(input, fieldnames=["name", "favoriteAnimal"])
    return reader

fullFileData = sc.wholeTextFiles(inputFile).flatMap(loadRecords)

Example 5-17. Java load CSV example
public static class ParseLine implements FlatMapFunction<Tuple2<String, String>, String[]> {
  public Iterable<String[]> call(Tuple2<String, String> file) throws Exception {
    CSVReader reader = new CSVReader(new StringReader(file._2()));
    return reader.readAll();
  }
}

JavaPairRDD<String, String> csvData = sc.wholeTextFiles(csvInput);
JavaRDD<String[]> keyedRDD = csvData.flatMap(new ParseLine());

Example 5-18. Scala load CSV example
case class Person(name: String, favoriteAnimal: String)

val input = sc.wholeTextFiles(inputFile)
val result = input.flatMap { case (_, txt) =>
  val reader = new CSVReader(new StringReader(txt))
  reader.readAll().map(x => Person(x(0), x(1)))
}

Tip

If there are only a few input files, you may want to repartition your input to allow Spark to effectively parallelize your future operations.

As with JSON data, writing out CSV/TSV data is quite simple and we can benefit from reusing the output encoding object. Since in CSV we don't output the field name with each record, to have a consistent output we need to create a mapping. One of the easy ways to do this is to just write a function which converts the fields to given positions in an array. In Python, if we are outputting dictionaries, the csv writer can do this for us based on the order in which we provide the field names when constructing the writer.

The CSV libraries we are using output to files/writers, so we can use StringWriter/StringIO to allow us to put the result in our RDD.

Example 5-19. Python write CSV example
def writeRecords(records):
    """Write out CSV lines"""
    output = StringIO.StringIO()
    writer = csv.DictWriter(output, fieldnames=["name", "favoriteAnimal"])
    for record in records:
        writer.writerow(record)
    return [output.getvalue()]

pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile)

Example 5-20. Scala write CSV example
pandaLovers.map(person => List(person.name, person.favoriteAnimal).toArray)
  .mapPartitions { people =>
    val stringWriter = new StringWriter()
    val csvWriter = new CSVWriter(stringWriter)
    csvWriter.writeAll(people.toList)
    Iterator(stringWriter.toString)
  }.saveAsTextFile(outFile)

As you may have noticed, the above only works provided that we know all of the fields that we will be outputting. However, if some of the field names are determined at runtime from user input we need to take a different approach. The simplest approach is going over all of our data and extracting the distinct keys, as in the sketch below.
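For instance, if our records are maps from field name to value, a rough sketch of collecting every field name that appears (records is a placeholder name, not from the examples above):

// Gather the distinct field names used anywhere in an RDD of Map[String, String] records
val fieldNames = records.flatMap(record => record.keys).distinct().collect()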

SequenceFiles

Sequence files are a popular Hadoop format comprised of flat files with key-value pairs and are supported in Spark's Java and Scala APIs. Sequence files have sync markers that allow Spark to seek to a point in the file and then resynchronize with the record boundaries. This allows Spark to efficiently read sequence files in from multiple nodes and into many partitions. Sequence files are a common input/output format for Hadoop MapReduce jobs as well, so if you are working with an existing Hadoop system there is a good chance your data will be available as a sequence file.

Sequence files consist of elements which implement Hadoop's Writable interface, as Hadoop uses a custom serialization framework. We have a conversion table of some common types and their corresponding Writable class. The standard rule of thumb is to try adding the word Writable to the end of your class name and see if it is a known subclass of org.apache.hadoop.io.Writable. If you can't find a Writable for the data you are trying to write out (for example a custom case class), you can go ahead and implement your own Writable class by overriding readFields and write from org.apache.hadoop.io.Writable.
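As a hedged sketch of what a hand-rolled Writable might look like (the class name and its fields are made up for illustration and are not part of this chapter's examples):

import java.io.{DataInput, DataOutput}
import org.apache.hadoop.io.Writable

// Hypothetical record type; Hadoop instantiates Writables reflectively,
// so we also provide a no-argument constructor
class PandaSightingWritable(var name: String, var pandas: Int) extends Writable {
  def this() = this("", 0)
  override def write(out: DataOutput): Unit = {
    out.writeUTF(name)
    out.writeInt(pandas)
  }
  override def readFields(in: DataInput): Unit = {
    name = in.readUTF()
    pandas = in.readInt()
  }
}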

Warning

Hadoop’sRecordReaderre-usesthesameobjectforeachrecord,sodirectlycallingcache,onanRDDyoureadinlikethiscanfail,insteadaddasimplemapoperationandcachetheresultofthemap.Furthermore,manyHadoopWritableclassesdonotimplementjava.io.SerializablesoforthemtoworkinRDDsweneedtoconvertthemwithamapanyways.

Table 5-2. Corresponding Hadoop Writable Types

Scala Type  | Java Type | Hadoop Writable
Int         | Integer   | IntWritable or VIntWritable[a]
Long        | Long      | LongWritable or VLongWritable[a]
Float       | Float     | FloatWritable
Double      | Double    | DoubleWritable
Boolean     | Boolean   | BooleanWritable
Array[Byte] | Byte[]    | BytesWritable
String      | String    | Text
Array[T]    | T[]       | ArrayWritable[b]
List[T]     | List<T>   | ArrayWritable[b]
Map[A, B]   | Map<A, B> | MapWritable[b]

[a] ints and longs are often stored as a fixed size. Storing the number 12 takes the same amount of space as storing the number 2**30. If you might have a large number of small numbers, use the variable-sized types instead, which will use fewer bits to store smaller numbers.
[b] The templated type must also be a Writable type.

Spark has a specialized API for reading in sequence files. On the SparkContext we can call sequenceFile(path, keyClass, valueClass, minPartitions). As mentioned earlier, sequence files work with Writable classes, so our keyClass and valueClass will both have to be the correct Writable class. Let's consider loading people and the number of pandas they have seen from a sequence file. In this case our keyClass would be Text and our valueClass would be IntWritable or VIntWritable; for simplicity let's work with IntWritable.

Example 5-21. Scala load sequence file example
val data = sc.sequenceFile(inFile, classOf[Text], classOf[IntWritable]).
  map { case (x, y) => (x.toString, y.get()) }

Example 5-22. Java load sequence file example
public static class ConvertToNativeTypes implements
    PairFunction<Tuple2<Text, IntWritable>, String, Integer> {
  public Tuple2<String, Integer> call(Tuple2<Text, IntWritable> record) {
    return new Tuple2(record._1.toString(), record._2.get());
  }
}

JavaPairRDD<Text, IntWritable> input = sc.sequenceFile(fileName, Text.class, IntWritable.class);
JavaPairRDD<String, Integer> result = input.mapToPair(new ConvertToNativeTypes());

Tip

In Scala there is a convenience function which can automatically convert Writables to their corresponding Scala type. Instead of specifying the keyClass and valueClass we can call sequenceFile[Key, Value](path, minPartitions) and get back an RDD of native Scala types.

Writing the data out to a sequence file is fairly similar in Scala. First, since sequence files are key-value pairs, we need a PairRDD with types that our sequence file can write out. Implicit conversions between Scala types and Hadoop Writables exist for many native types, so if you are writing out a native type you can just save your PairRDD by calling saveAsSequenceFile(path) and it will write out the data for us. If there isn't an automatic conversion from our key and value to Writable, or we want to use VarInts, we can just map over the data and convert it before saving. Let's consider writing out the data that we loaded in the previous example (people and how many pandas they have seen).

Example 5-23. Scala save sequence file example
val data = sc.parallelize(List(("Panda", 3), ("Kay", 6), ("Snail", 2)))
data.saveAsSequenceFile(outputFile)

In Java, saving a sequence file is slightly more involved, due to the lack of a saveAsSequenceFile method on JavaPairRDD. Instead we use Spark's ability to save to custom Hadoop formats, and we will show how to save to a sequence file in Java in the custom Hadoop formats section.

Object Files

Object files are a deceptively simple wrapper around sequence files which allows us to save our RDDs containing just values. Unlike with sequence files, the values are written out using Java Serialization.

Warning

If you change your classes, e.g., to add or remove fields, old object files may no longer be readable. Object files use Java Serialization, which has some support for managing compatibility across class versions but requires programmer effort to do so.

Using Java Serialization for object files has a number of implications. Unlike with normal sequence files, the output will be different than if Hadoop had written out the same objects. Unlike the other formats, object files are mostly intended to be used for Spark jobs communicating with other Spark jobs. Java Serialization can also be quite slow.

Saving an object file is as simple as calling saveAsObjectFile on an RDD. Reading an object file back is also quite simple: the function objectFile on the SparkContext takes in a path and returns an RDD.
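A minimal sketch of the round trip (the Person class, data, and path are placeholders borrowed from the earlier JSON examples):

// Save an RDD as an object file and read it back with the element type we expect
val people = sc.parallelize(List(Person("Panda", true), Person("Kay", false)))
people.saveAsObjectFile(outputFile)
val loaded = sc.objectFile[Person](outputFile)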

With all of these warnings about object files you might wonder why anyone would use them. The primary reason to use object files is that they require almost no work to save almost arbitrary objects.

Hadoop Input and Output Formats

In addition to the formats Spark has wrappers for, we can also interact with other Hadoop-supported formats. Spark supports both the old and new Hadoop file APIs, providing a great amount of flexibility.

To read in a file using the new Hadoop API we need to tell Spark a few things. The newAPIHadoopFile method takes a path and three classes. The first class is the "format" class, the class representing our input format. The next class is the class for our key, and the final class is the class of our value. If we need to specify additional Hadoop configuration properties we can also pass in a conf object.

One of the simplest Hadoop input formats is the KeyValueTextInputFormat, which can be used for reading in key-value data from text files. Each line is processed individually, with the key and value separated by a tab character. This format ships with Hadoop so we don't have to add any extra dependencies to our project to use it.

Example 5-24. Scala load KeyValueTextInputFormat
val input = sc.hadoopFile[Text, Text, KeyValueTextInputFormat](inputFile).map {
  case (x, y) => (x.toString, y.toString)
}

We looked at loading JSON data by loading the data as a text file and then parsing it, but we can also load JSON data using a custom Hadoop input format. This example requires setting up some extra bits for compression, so feel free to skip it. Twitter's Elephant Bird package supports a large number of data formats, including JSON, Lucene, Protocol Buffer related formats, and so on. The package also works with both the new and old Hadoop file APIs. To illustrate how to work with the new style Hadoop APIs from Spark, let's look at loading LZO compressed JSON data with LzoJsonInputFormat:

Example 5-25. Scala load LZO compressed JSON with Elephant Bird
val input = sc.newAPIHadoopFile(inputFile, classOf[LzoJsonInputFormat],
  classOf[LongWritable], classOf[MapWritable], conf)
// Each MapWritable in "input" represents a JSON object

Warning

LZO support requires installing the hadoop-lzo package and pointing Spark to its native libraries. If you install the Debian package, adding --driver-library-path /usr/lib/hadoop/lib/native/ --driver-class-path /usr/lib/hadoop/lib/ to your spark-submit invocation should do the trick.

Reading a file using the old Hadoop API is pretty much the same from a usage point of view, except we provide an old style InputFormat class. Many of Spark's built-in convenience functions (like sequenceFile) are implemented using the old style Hadoop API.

We already examined sequence files to some extent, but in Java we don't have the same convenience function for saving from a JavaPairRDD. We will use this as a way to illustrate how to use the old Hadoop format APIs, as this is how Spark implements its helper function for PairRDDs in Scala, and we've already shown the new APIs with the JSON example.

Example 5-26. Java save sequence file example
public static class ConvertToWritableTypes implements
    PairFunction<Tuple2<String, Integer>, Text, IntWritable> {
  public Tuple2<Text, IntWritable> call(Tuple2<String, Integer> record) {
    return new Tuple2(new Text(record._1), new IntWritable(record._2));
  }
}

JavaPairRDD<String, Integer> rdd = sc.parallelizePairs(input);
JavaPairRDD<Text, IntWritable> result = rdd.mapToPair(new ConvertToWritableTypes());
result.saveAsHadoopFile(fileName, Text.class, IntWritable.class, SequenceFileOutputFormat.class);

In addition to the saveAsHadoopFile and saveAsNewAPIHadoopFile functions, if you want more control over writing out a Hadoop format you can use saveAsHadoopDataset/saveAsNewAPIHadoopDataset. Both functions just take a configuration object on which you need to set all of the Hadoop properties. The configuration is done the same way one would configure the output of a Hadoop MapReduce job.

Protocol Buffers

Protocol buffers[10] were first developed at Google for internal RPCs and have since been open sourced. Protocol Buffers (PB) are structured data, with the fields and types of fields being clearly defined. Protocol Buffers are optimized to be fast for encoding and decoding and also take up the minimum amount of space. Compared to XML, protocol buffers are 3x to 10x smaller and can be 20x to 100x faster to encode and decode. While a PB has a consistent encoding, there are multiple ways to create a file consisting of many PB messages.

Protocol Buffers are defined using a domain specific language, and then the protocol buffer compiler can be used to generate accessor methods in a variety of languages (including all those supported by Spark). Since protocol buffers aim to take up a minimal amount of space they are not "self-describing", as encoding the description of the data would take up additional space. This means that to parse data which is formatted as PB we need the protocol buffer definition to make sense of the data.

Protocol buffers consist of fields which can be either optional, required, or repeated. When parsing data, a missing optional field does not result in a failure, but a missing required field results in failing to parse the data. As such, when adding new fields to existing protocol buffers it is good practice to make the new fields optional, as not everyone will upgrade at the same time (and even if they do, you might want to read your old data).

Protocol buffer fields can be many pre-defined types, or another protocol buffer message. These types include string, int32, enums, and more. This is by no means a complete introduction to protocol buffers; if you are interested you should consult the protobuf website. For our example we will look at loading many VenueResponse objects from our sample proto.

Example 5-27. Sample protocol buffer definition
message Venue {
  required int32 id = 1;
  required string name = 2;
  required VenueType type = 3;
  optional string address = 4;

  enum VenueType {
    COFFEESHOP = 0;
    WORKPLACE = 1;
    CLUB = 2;
    OMNOMNOM = 3;
    OTHER = 4;
  }
}

message VenueResponse {
  repeated Venue results = 1;
}

Twitter’sElephantBirdlibrary,thatweusedintheprevioussectiontoloadJSONdata,alsosupportsloadingandsavingdatafromprotocolbuffers.LetslookatwritingoutsomeVenues.

Example 5-28. Scala Elephant Bird protocol buffer write out example
val job = new Job()
val conf = job.getConfiguration
LzoProtobufBlockOutputFormat.setClassConf(classOf[Places.Venue], conf)

val dnaLounge = Places.Venue.newBuilder()
dnaLounge.setId(1)
dnaLounge.setName("DNA Lounge")
dnaLounge.setType(Places.Venue.VenueType.CLUB)

val data = sc.parallelize(List(dnaLounge.build()))
val outputData = data.map { pb =>
  val protoWritable = ProtobufWritable.newInstance(classOf[Places.Venue])
  protoWritable.set(pb)
  (null, protoWritable)
}
outputData.saveAsNewAPIHadoopFile(outputFile, classOf[Text],
  classOf[ProtobufWritable[Places.Venue]],
  classOf[LzoProtobufBlockOutputFormat[ProtobufWritable[Places.Venue]]], conf)

Tip

When building your project, make sure to use the same protocol buffer library version as Spark; as of this writing that is version 2.5.

Hive and Parquet

A great way to create RDDs from Hive and Parquet is through using Spark SQL. This allows us to get back structured data from Hive and also write expressive queries. This approach also supports all three of our languages. We will cover this in detail in the Spark SQL chapter, but for now let's look at just loading data.

To connect Spark SQL to an existing Hive installation, you need to provide a Hive configuration. This is done by copying your hive-site.xml to Spark's conf/ directory. If you just want to explore, a local Hive metastore will be used if no hive-site.xml is set, and we can easily load data into a Hive table to query later on. Spark has an example file we can load into a Hive table. When loading data with Spark SQL, the resulting RDDs consist of Row objects. In Python you can treat the Row object much like a hash map.

Example 5-29. Python Hive load example
from pyspark.sql import HiveContext

hiveCtx = HiveContext(sc)
rows = hiveCtx.hql("SELECT key, value FROM src")
keys = rows.map(lambda row: row["key"])

In Java and Scala, the Row objects we get back allow access based on the column number. Each Row object has a get method that gives back a general type we can cast, and specific get methods for common basic types (e.g. getFloat, getInt, getLong, getString, getShort, getBoolean).

Example 5-30. Scala Hive load example
import org.apache.spark.sql.hive.HiveContext

val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc)
val rows = hiveCtx.hql("SELECT key, value FROM src")
val keys = rows.map(row => row.getInt(0))

Example 5-31. Java Hive load example
import org.apache.spark.sql.hive.api.java.JavaHiveContext;
import org.apache.spark.sql.api.java.Row;
import org.apache.spark.sql.api.java.JavaSchemaRDD;

JavaHiveContext hiveCtx = new JavaHiveContext(sc);
JavaSchemaRDD rows = hiveCtx.hql("SELECT key, value FROM src");
JavaRDD<Integer> keys = rows.map(new Function<Row, Integer>() {
  public Integer call(Row row) { return row.getInt(0); }
});

When loading data from Hive, Spark SQL supports any Hive-supported storage format, including text files, RCFiles, ORC, Parquet, Avro, and Protocol Buffers.

Without a Hive installation, Spark SQL can also directly load data from Parquet files:

Example 5-32. Python Parquet load example
from pyspark.sql import SQLContext

sqlCtx = SQLContext(sc)
rows = sqlCtx.parquetFile("people.parquet")
names = rows.map(lambda row: row["name"])

File Systems

Spark supports a large number of file systems for reading and writing, which we can use with any of the file formats we want.

Local/"Regular" FS

While Spark supports loading files from the local filesystem, it can be somewhat less convenient to work with compared to the other options. While it doesn't require any setup, it requires that the files are available on all the nodes in your cluster.

Some network filesystems, like NFS, AFS, and MapR's NFS layer, are exposed to the user as a regular filesystem. If your data is already in one of these forms, then you can use it as an input by just specifying the path to the file and Spark will handle it.

Example 5-33. Scala load compressed text file from local FS
val rdd = sc.textFile("file:///home/holden/happypandas.gz")

Tip

Ifthefileisn’talreadyonallnodesinthecluster,youcanloaditlocallyandthencallparallelize.WecanalsouseaddFile(path)todistributethecontentsandthenuseSparkFiles.get(path)inplacewherewewouldnormallyspecifythelocation(e.g.sc.textFile(SparkFiles.get(…))).Bothoftheseapproachescanbeslow,soconsiderifyoucanputyourfilesinHDFS,onS3orsimilar.

Amazon S3

S3 is an increasingly popular option for storing large amounts of data. S3 is especially fast when our compute nodes are located inside of EC2, but can easily have much worse performance if we have to go over the public internet.

Spark will check the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables for your S3 credentials.
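Once the credentials are exported, an S3 path works like any other input path; a sketch with a placeholder bucket and key:

// s3n:// paths work with the normal input methods once the AWS credentials are set
val logs = sc.textFile("s3n://my-bucket/logs/2014-*.log")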

HDFS

The Hadoop Distributed File System (HDFS) is a popular distributed filesystem that Spark works well with. HDFS is designed to work on commodity hardware and be resilient to node failure while providing high data throughput. Spark and HDFS can be collocated on the same machines, and Spark can take advantage of this data locality to avoid network overhead.

Using Spark with HDFS is as simple as specifying hdfs://master:port/path for your input and output. The Deploying Spark chapter covers how to set up Spark for HDFS systems requiring authentication.
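For example (the namenode host, port, and path are placeholders for your cluster):

// Read a directory of files stored in HDFS
val events = sc.textFile("hdfs://namenode:8020/data/events")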

Warning

The HDFS protocol has changed over time; if you run a version of Spark which is compiled for a different version it will fail. By default Spark is built against Hadoop 1.0.4. If you build from source you can specify SPARK_HADOOP_VERSION as an environment variable to build against a different version, or you can download a different precompiled version of Spark.

Compression

Frequently when working with big data, we find ourselves needing to use compressed data to save storage space and network overhead. With most Hadoop output formats we can specify a compression codec which will compress the data. As we have already seen, Spark's native input formats (textFile and sequenceFile) can automatically handle some types of compression for us. When reading in compressed data, there are some compression codecs which can be used to automatically guess the compression type.
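As a hedged sketch of asking a Hadoop output format to gzip what it writes, assuming a pair RDD of (Text, IntWritable) like the one built in Example 5-26 below:

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.mapred.SequenceFileOutputFormat

// Pass the codec class to saveAsHadoopFile so the output format compresses the data
result.saveAsHadoopFile(outputFile, classOf[Text], classOf[IntWritable],
  classOf[SequenceFileOutputFormat[Text, IntWritable]], classOf[GzipCodec])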

These compression options only apply to the Hadoop formats which support compression, namely those which are written out to a filesystem. The database Hadoop formats generally do not implement support for compression, or if they have compressed records it is configured in the database itself rather than with the Hadoop connector.

Choosing an output compression codec can have a big impact on future users of the data. With distributed systems such as Spark, we normally try to read our data in from multiple different machines. To make this possible, each worker needs to be able to find the start of a new record. Some compression formats make this impossible, which requires a single node to read in all of the data and can easily lead to a bottleneck. Formats which can be easily read from multiple machines are called "splittable".

Table 5-3. Compression Options

Format | Splittable | Average Compression Speed | Effectiveness on Text | Hadoop Compression Codec | Pure Java | Native | Comments
gzip   | N    | Fast      | High      | org.apache.hadoop.io.compress.GzipCodec    | Y | Y |
lzo    | Y[a] | Very fast | Medium    | com.hadoop.compression.lzo.LzoCodec        | Y | Y | LZO requires installation on every worker node
bzip2  | Y    | Slow      | Very high | org.apache.hadoop.io.compress.BZip2Codec   | Y | Y | Uses pure Java for splittable version
zlib   | N    | Slow      | Medium    | org.apache.hadoop.io.compress.DefaultCodec | Y | Y | Default compression codec for Hadoop
Snappy | N    | Very fast | Low       | org.apache.hadoop.io.compress.SnappyCodec  | N | Y | There is a pure Java port of Snappy but it is not currently available in Spark/Hadoop

[a] Depends on the library used

Warning

WhileSpark’stextFilemethodcanhandlecompressedinput,theyautomaticallydisablesplittableeveniftheinputiscompressedinsuchawaythatitcouldbereadinasplittableway.Ifyoufindyourselfneedingtoreadinalargesinglefilecompressedinput,youshouldconsiderskippingSpark’swrapperandinsteaduseeithernewAPIHadoopFileorhadoopFileandspecifythecorrectcompressioncodec.

Some data formats (like sequence files) allow us to compress only the values of our key-value data, which can be useful for doing lookups. Other data formats have their own compression control; for example, many of the formats in Twitter's Elephant Bird package work with LZO compressed data.

Spark wraps both the old and new style APIs for specifying the compression codec.

Ifwedon’tknowwhatthecompressionformatiswhilewritingourcode,wecaninsteadusetheCompressionCodecFactorytodeterminethecodecbasedonthefilename.

Databases

Spark loads and writes data to databases, and database-like systems, in two primary ways. Spark SQL provides a query language and row interface for some databases. Additional databases, and database-like systems, can be accessed through Hadoop's connectors.

Elasticsearch

Spark can both read and write data from Elasticsearch using Elasticsearch-Hadoop. Elasticsearch is a new open source Lucene-based search system. Most of the connectors we have looked at so far have written out to files; this connector instead wraps RPCs to the Elasticsearch cluster.

The Elasticsearch connector is a bit different than the other connectors we have examined, since it ignores the path information we provide and instead depends on setting up configuration on our SparkContext. The Elasticsearch OutputFormat connector also doesn't quite have the types to use Spark's wrappers, so we instead use saveAsHadoopDataset, which means we need to set more properties by hand. Let's look at how to read and write some simple data to Elasticsearch.

Example 5-34. Scala Elasticsearch output example
val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set("mapred.output.format.class", "org.elasticsearch.hadoop.mr.EsOutputFormat")
jobConf.setOutputCommitter(classOf[FileOutputCommitter])
jobConf.set(ConfigurationOptions.ES_RESOURCE_WRITE, "/tweets")
jobConf.set(ConfigurationOptions.ES_NODES, "localhost")
FileOutputFormat.setOutputPath(jobConf, new Path("-"))
output.saveAsHadoopDataset(jobConf)

Example 5-35. Scala Elasticsearch input example
def mapWritableToInput(in: MapWritable): Map[String, String] = {
  in.map { case (k, v) => (k.toString, v.toString) }.toMap
}

val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set("mapred.output.format.class", "org.elasticsearch.hadoop.mr.EsOutputFormat")
jobConf.set(ConfigurationOptions.ES_RESOURCE_READ, args(1))
jobConf.set(ConfigurationOptions.ES_NODES, args(2))
val currentTweets = sc.hadoopRDD(jobConf,
  classOf[EsInputFormat[Object, MapWritable]], classOf[Object], classOf[MapWritable])
// Extract only the map
// Convert the MapWritable[Text, Text] to Map[String, String]
val tweets = currentTweets.map { case (key, value) => mapWritableToInput(value) }

Compared to some of our other connectors this is a bit convoluted, but it serves as a useful reference for how to work with these types of connectors.

Warning

On the write side, Elasticsearch can do mapping inference, but this can occasionally infer the types incorrectly, so it can be a good idea to explicitly set a mapping if you are storing things besides strings.

Mongo

We will cover loading and saving data with Mongo and Spark in the next update of this book.

Cassandra

Spark ships with examples of how to work with Cassandra, and we will expand on this section in the future. For now, if you want to use Spark with Cassandra, take a look at the Cassandra examples in Spark.

HBase

Spark ships with examples of how to work with HBase, and we will expand on this section in the future. For now, if you want to use Spark with HBase, take a look at the HBase examples in Spark.

Java Database Connectivity (JDBC)

In addition to using Hadoop input formats, you can create RDDs from JDBC queries. Unlike the other methods of loading data, rather than calling a method on the SparkContext we instead create an instance of org.apache.spark.rdd.JdbcRDD and provide it with our SparkContext and the other input data it requires.

We will create a simple JdbcRDD using MySQL as the backing database.

Example 5-36. Scala JdbcRDD example
def createConnection() = {
  Class.forName("com.mysql.jdbc.Driver").newInstance()
  DriverManager.getConnection("jdbc:mysql://localhost/test?user=holden")
}

def extractValues(r: ResultSet) = {
  (r.getInt(1), r.getString(2))
}

val data = new JdbcRDD(sc,
  createConnection, "SELECT * FROM panda WHERE ? <= id AND ID <= ?",
  lowerBound = 1, upperBound = 3, numPartitions = 2, mapRow = extractValues)
println(data.collect().toList)

The min and max we provide to the JdbcRDD class allow Spark to query different ranges of the data on different machines, so we don't get bottlenecked trying to load all the data on a single node. If you don't know how many records there are, then you can just do a count query manually first and use the result.
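For example, a sketch of finding an upper bound with a one-off JDBC query before constructing the JdbcRDD (the table and column follow the example above):

// Run a manual query to discover the largest id, then use it as upperBound
val conn = createConnection()
val rs = conn.createStatement().executeQuery("SELECT MAX(id) FROM panda")
rs.next()
val upperBound = rs.getLong(1)
conn.close()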

Along similar lines, we provide a function to establish the connection to our database. This lets each node create its own connection to load data over.

The last parameter converts the result from java.sql.ResultSet to a format that is useful for manipulating our data. If left out, Spark will automatically convert everything to arrays of objects.

The JdbcRDD is currently accessed a bit differently from most of the other methods for loading data into Spark, and provides another option for interfacing with database systems.

Conclusion

With the end of this chapter you should be able to get your data into Spark to work with and store the result of your computation in a format that is useful for you. We have examined a number of different formats we can use for our data, as well as compression options and their implications on how data can be consumed. Subsequent chapters will examine ways to write more effective and powerful Spark programs now that we can load and save large datasets.

[10] Sometimes called pbs or protobufs.

About the Authors

Holden Karau is a software development engineer at Databricks and is active in open source. She is the author of an earlier Spark book. Prior to Databricks she worked on a variety of search and classification problems at Google, Foursquare, and Amazon. She graduated from the University of Waterloo with a Bachelors of Mathematics in Computer Science. Outside of software she enjoys playing with fire, welding, and hula hooping.

Most recently, Andy Konwinski co-founded Databricks. Before that he was a PhD student and then postdoc in the AMPLab at UC Berkeley, focused on large scale distributed computing and cluster scheduling. He co-created and is a committer on the Apache Mesos project. He also worked with systems engineers and researchers at Google on the design of Omega, their next generation cluster scheduling system. More recently, he developed and led the AMP Camp Big Data Bootcamps and first Spark Summit, and has been contributing to the Spark project.

Patrick Wendell is an engineer at Databricks as well as a Spark Committer and PMC member. In the Spark project, Patrick has acted as release manager for several Spark releases, including Spark 1.0. Patrick also maintains several subsystems of Spark's core engine. Before helping start Databricks, Patrick obtained an M.S. in Computer Science at UC Berkeley. His research focused on low latency scheduling for large scale analytics workloads. He holds a B.S.E in Computer Science from Princeton University.

Matei Zaharia is a PhD student in the AMPLab at UC Berkeley, working on topics in computer systems, cloud computing, and big data. He is also a committer on Apache Hadoop and Apache Mesos. At Berkeley, he leads the development of the Spark cluster computing framework, and has also worked on projects including Mesos, the Hadoop Fair Scheduler, Hadoop's straggler detection algorithm, Shark, and multi-resource sharing. Matei got his undergraduate degree at the University of Waterloo in Canada.

