Holden Karau, Andy Konwinski, Patrick Wendell, Matei Zaharia

Beijing • Cambridge • Farnham • Köln • Sebastopol • Tokyo

Table of Contents

Preface
    Audience
    How This Book is Organized
    Supporting Books
    Code Examples
    Early Release Status and Feedback
Chapter 1. Introduction to Data Analysis with Spark
    What is Apache Spark?
    A Unified Stack
    Who Uses Spark, and For What?
    A Brief History of Spark
    Spark Versions and Releases
    Spark and Hadoop
Chapter 2. Downloading and Getting Started
    Downloading Spark
    Introduction to Spark's Python and Scala Shells
    Introduction to Core Spark Concepts
    Standalone Applications
    Conclusion
Chapter 3. Programming with RDDs
    RDD Basics
    Creating RDDs
    RDD Operations
    Passing Functions to Spark
    Common Transformations and Actions
    Persistence (Caching)
    Conclusion
Chapter 4. Working with Key-Value Pairs
    Motivation
    Creating Pair RDDs
    Transformations on Pair RDDs
    Actions Available on Pair RDDs
    Data Partitioning
    Conclusion
Chapter 5. Loading and Saving Your Data
    Motivation
    Choosing a Format
    Formats
    File Systems
    Compression
    Databases
    Conclusion
About the Authors
Preface
As parallel data analysis has become increasingly common, practitioners in many fields have sought easier tools for this task. Apache Spark has quickly emerged as one of the most popular tools for this purpose, extending and generalizing MapReduce. Spark offers three main benefits. First, it is easy to use: you can develop applications on your laptop, using a high-level API that lets you focus on the content of your computation. Second, Spark is fast, enabling interactive use and complex algorithms. And third, Spark is a general engine, allowing you to combine multiple types of computations (e.g., SQL queries, text processing and machine learning) that might previously have required learning different engines. These features make Spark an excellent starting point to learn about big data in general.
This introductory book is meant to get you up and running with Spark quickly. You'll learn how to download and run Spark on your laptop and use it interactively to learn the API. Once there, we'll cover the details of available operations and distributed execution. Finally, you'll get a tour of the higher-level libraries built into Spark, including libraries for machine learning, stream processing, graph analytics and SQL. We hope that this book gives you the tools to quickly tackle data analysis problems, whether you do so on one machine or hundreds.
Audience
This book targets Data Scientists and Engineers. We chose these two groups because they have the most to gain from using Spark to expand the scope of problems they can solve. Spark's rich collection of data focused libraries (like MLlib) makes it easy for data scientists to go beyond problems that fit on a single machine while making use of their statistical background. Engineers, meanwhile, will learn how to write general-purpose distributed programs in Spark and operate production applications. Engineers and data scientists will both learn different details from this book, but will both be able to apply Spark to solve large distributed problems in their respective fields.
Data scientists focus on answering questions or building models from data. They often have a statistical or math background and some familiarity with tools like Python, R and SQL. We have made sure to include Python, and wherever possible SQL, examples for all our material, as well as an overview of the machine learning and advanced analytics libraries in Spark. If you are a data scientist, we hope that after reading this book you will be able to use the same mathematical approaches to solving problems, except much faster and on a much larger scale.
The second group this book targets is software engineers who have some experience with Java, Python or another programming language. If you are an engineer, we hope that this book will show you how to set up a Spark cluster, use the Spark shell, and write Spark applications to solve parallel processing problems. If you are familiar with Hadoop, you have a bit of a head start on figuring out how to interact with HDFS and how to manage a cluster, but either way, we will cover basic distributed execution concepts.
Regardless of whether you are a data analyst or engineer, to get the most out of this book you should have some familiarity with one of Python, Java, Scala, or a similar language. We assume that you already have a solution for storing your data, and we cover how to load and save data from many common storage systems, but not how to set them up. If you don't have experience with one of those languages, don't worry: there are excellent resources available to learn these. We call out some of the books available in Supporting Books.
How This Book is Organized
The chapters of this book are laid out in such a way that you should be able to go through the material front to back. At the start of each chapter, we will mention which sections of the chapter we think are most relevant to data scientists and which sections we think are most relevant for engineers. That said, we hope that all the material is accessible to readers of either background.
The first two chapters will get you started with getting a basic Spark installation on your laptop and give you an idea of what you can accomplish with Apache Spark. Once we've got the motivation and setup out of the way, we will dive into the Spark Shell, a very useful tool for development and prototyping. Subsequent chapters then cover the Spark programming interface in detail, how applications execute on a cluster, and higher-level libraries available on Spark such as Spark SQL and MLlib.
Supporting Books
If you are a data scientist and don't have much experience with Python, the Learning Python book is an excellent introduction.
If you are an engineer and after reading this book you would like to expand your data analysis skills, Machine Learning for Hackers and Doing Data Science are excellent books from O'Reilly. This book is intended to be accessible to beginners. We do intend to release a deep dive follow-up for those looking to gain a more thorough understanding of Spark's internals.
Code Examples
All of the code examples found in this book are on GitHub. You can examine them and check them out from https://github.com/databricks/learning-spark. Code examples are provided in Java, Scala, and Python.
Tip
Our Java examples are written to work with Java version 6 and higher. Java 8 introduces a new syntax called "lambdas" that makes writing inline functions much easier, which can simplify Spark code. We have chosen not to take advantage of this syntax in most of our examples, as most organizations are not yet using Java 8. If you would like to try Java 8 syntax, you can see the Databricks blog post on this topic.
Early Release Status and Feedback
This is an early release copy of Learning Spark, and as such we are still working on the text, adding code examples, and writing some of the later chapters. Although we hope that the book is useful in its current form, we would greatly appreciate your feedback so we can improve it and make the best possible finished product. The authors and editors can be reached at book-feedback@databricks.com.
The authors would like to thank the reviewers who offered feedback so far: Juliet Hougland, Andrew Gal, Michael Gregson, Stephan Jou, Josh Mahonin, and Mike Patterson.
Chapter 1. Introduction to Data Analysis with Spark
This chapter provides a high level overview of what Apache Spark is. If you are already familiar with Apache Spark and its components, feel free to jump ahead to Chapter 2.
What is Apache Spark?
Apache Spark is a cluster computing platform designed to be fast and general-purpose.
On the speed side, Spark extends the popular MapReduce model to efficiently support more types of computations, including interactive queries and stream processing. Speed is important in processing large datasets, as it means the difference between exploring data interactively and waiting minutes between queries, or waiting hours to run your program versus minutes. One of the main features Spark offers for speed is the ability to run computations in memory, but the system is also faster than MapReduce for complex applications running on disk.
On the generality side, Spark is designed to cover a wide range of workloads that previously required separate distributed systems, including batch applications, iterative algorithms, interactive queries and streaming. By supporting these workloads in the same engine, Spark makes it easy and inexpensive to combine different processing types, which is often necessary in production data analysis pipelines. In addition, it reduces the management burden of maintaining separate tools.
Spark is designed to be highly accessible, offering simple APIs in Python, Java, Scala and SQL, and rich built-in libraries. It also integrates closely with other big data tools. In particular, Spark can run in Hadoop clusters and access any Hadoop data source.
A Unified Stack
The Spark project contains multiple closely-integrated components. At its core, Spark is a "computational engine" that is responsible for scheduling, distributing, and monitoring applications consisting of many computational tasks across many worker machines, or a computing cluster. Because the core engine of Spark is both fast and general-purpose, it powers multiple higher-level components specialized for various workloads, such as SQL or machine learning. These components are designed to interoperate closely, letting you combine them like libraries in a software project.
A philosophy of tight integration has several benefits. First, all libraries and higher level components in the stack benefit from improvements at the lower layers. For example, when Spark's core engine adds an optimization, SQL and machine learning libraries automatically speed up as well. Second, the costs associated with running the stack are minimized, because instead of running 5-10 independent software systems, an organization only needs to run one. These costs include deployment, maintenance, testing, support, and more. This also means that each time a new component is added to the Spark stack, every organization that uses Spark will immediately be able to try this new component. This changes the cost of trying out a new type of data analysis from downloading, deploying, and learning a new software project to upgrading Spark.
Finally, one of the largest advantages of tight integration is the ability to build applications that seamlessly combine different processing models. For example, in Spark you can write one application that uses machine learning to classify data in real time as it is ingested from streaming sources. Simultaneously, analysts can query the resulting data, also in real-time, via SQL, e.g. to join the data with unstructured log files. In addition, more sophisticated data engineers can access the same data via the Python shell for ad-hoc analysis. Others might access the data in standalone batch applications. All the while, the IT team only has to maintain one software stack.
Figure 1-1. The Spark Stack

Here we will briefly introduce each of the components shown in Figure 1-1.
Spark Core
Spark Core contains the basic functionality of Spark, including components for task scheduling, memory management, fault recovery, interacting with storage systems, and more. Spark Core is also home to the API that defines Resilient Distributed Datasets (RDDs), which are Spark's main programming abstraction. RDDs represent a collection of items distributed across many compute nodes that can be manipulated in parallel. Spark Core provides many APIs for building and manipulating these collections.
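To give a flavor of this API before we cover it properly in Chapters 2 and 3, here is a minimal Scala sketch, assuming the SparkContext variable sc that the Spark shell provides:

// Distribute a local collection as an RDD and operate on it in parallel
val nums = sc.parallelize(1 to 1000)
val sumOfSquares = nums.map(x => x * x).reduce((a, b) => a + b)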
Spark SQL
Spark SQL provides support for interacting with Spark via SQL as well as the Apache Hive variant of SQL, called the Hive Query Language (HiveQL). Spark SQL represents database tables as Spark RDDs and translates SQL queries into Spark operations. Beyond providing the SQL interface to Spark, Spark SQL allows developers to intermix SQL queries with the programmatic data manipulations supported by RDDs in Python, Java and Scala, all within a single application. This tight integration with the rich and sophisticated computing environment provided by the rest of the Spark stack makes Spark SQL unlike any other open source data warehouse tool. Spark SQL was added to Spark in version 1.0.
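As a taste of this intermixing, here is a minimal Scala sketch against Spark 1.0's SQLContext, using a made-up users dataset:

import org.apache.spark.sql.SQLContext

case class User(name: String, age: Int)

val sqlContext = new SQLContext(sc)
import sqlContext.createSchemaRDD // implicitly converts RDDs of case classes

// Build a dataset programmatically, then query it with SQL
val users = sc.parallelize(List(User("Holden", 25), User("Andy", 30)))
users.registerAsTable("users")
val names = sqlContext.sql("SELECT name FROM users WHERE age >= 21")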
Shark is a project out of UC Berkeley that pre-dates Spark SQL and is being ported to work on top of Spark SQL. Shark provides additional functionality so that Spark can act as a drop-in replacement for Apache Hive. This includes a HiveQL shell, as well as a JDBC server that makes it easy to connect external graphing and data exploration tools.
Spark Streaming
Spark Streaming is a Spark component that enables processing live streams of data. Examples of data streams include log files generated by production web servers, or queues of messages containing status updates posted by users of a web service. Spark Streaming provides an API for manipulating data streams that closely matches the Spark Core's RDD API, making it easy for programmers to learn the project and move between applications that manipulate data stored in memory, on disk, or arriving in real-time. Underneath its API, Spark Streaming was designed to provide the same degree of fault tolerance, throughput, and scalability that the Spark Core provides.
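As a sketch of how closely the APIs match, here is a hypothetical Scala snippet that filters a live stream of log lines, assuming a socket source on port 7777:

import org.apache.spark.streaming.{Seconds, StreamingContext}

// Process the stream in one-second batches
val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream("localhost", 7777)
val errors = lines.filter(line => line.contains("error")) // same style as the RDD API
errors.print()
ssc.start()
ssc.awaitTermination()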
MLlib
Spark comes with a library containing common machine learning (ML) functionality called MLlib. MLlib provides multiple types of machine learning algorithms, including binary classification, regression, clustering and collaborative filtering, as well as supporting functionality such as model evaluation and data import. It also provides some lower level ML primitives, including a generic gradient descent optimization algorithm. All of these methods are designed to scale out across a cluster.
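To give a feel for the API, here is a minimal Scala sketch that trains a classifier on two hand-written data points (the classes shown are in MLlib as of Spark 1.0):

import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// A label of 1.0 or 0.0 plus a feature vector for each training example
val points = sc.parallelize(List(
  LabeledPoint(1.0, Vectors.dense(2.0, 3.0)),
  LabeledPoint(0.0, Vectors.dense(-1.0, 1.5))))
val model = LogisticRegressionWithSGD.train(points, 100) // 100 iterations of SGD
model.predict(Vectors.dense(1.0, 1.0)) // classify a new point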
GraphX
GraphX is a library added in Spark 0.9 that provides an API for manipulating graphs (e.g., a social network's friend graph) and performing graph-parallel computations. Like Spark Streaming and Spark SQL, GraphX extends the Spark RDD API, allowing us to create a directed graph with arbitrary properties attached to each vertex and edge. GraphX also provides a set of operators for manipulating graphs (e.g., subgraph and mapVertices) and a library of common graph algorithms (e.g., PageRank and triangle counting).
Cluster Managers
Under the hood, Spark is designed to efficiently scale up from one to many thousands of compute nodes. To achieve this while maximizing flexibility, Spark can run over a variety of cluster managers, including Hadoop YARN, Apache Mesos, and a simple cluster manager included in Spark itself called the Standalone Scheduler. If you are just installing Spark on an empty set of machines, the Standalone Scheduler provides an easy way to get started; while if you already have a Hadoop YARN or Mesos cluster, Spark's support for these allows your applications to also run on them.
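For instance, once you have an application packaged, pointing it at one of these cluster managers is largely a matter of the master argument passed to Spark's submission script (covered in Chapter 2); a rough sketch with placeholder host names:

bin/spark-submit --master spark://masterhost:7077 my_script.py   (Standalone Scheduler)
bin/spark-submit --master mesos://masterhost:5050 my_script.py   (Apache Mesos)
bin/spark-submit --master yarn-client my_script.py               (Hadoop YARN)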
Who Uses Spark, and For What?
Because Spark is a general purpose framework for cluster computing, it is used for a diverse range of applications. In the Preface we outlined two personas that this book targets as readers: Data Scientists and Engineers. Let's take a closer look at each of these personas and how they use Spark. Unsurprisingly, the typical use cases differ across the two personas, but we can roughly classify them into two categories, data science and data applications.
Of course, these are imprecise personas and usage patterns, and many folks have skills from both, sometimes playing the role of the investigating Data Scientist, and then "changing hats" and writing a hardened data processing system. Nonetheless, it can be illuminating to consider the two personas and their respective use cases separately.
Data Science Tasks
Data Science is the name of a discipline that has been emerging over the past few years centered around analyzing data. While there is no standard definition, for our purposes a Data Scientist is somebody whose main task is to analyze and model data. Data scientists may have experience using SQL, statistics, predictive modeling (machine learning), and some programming, usually in Python, Matlab or R. Data scientists also have experience with techniques necessary to transform data into formats that can be analyzed for insights (sometimes referred to as data wrangling).
Data Scientists use their skills to analyze data with the goal of answering a question or discovering insights. Oftentimes, their workflow involves ad-hoc analysis, and so they use interactive shells (vs. building complex applications) that let them see results of queries and snippets of code in the least amount of time. Spark's speed and simple APIs shine for this purpose, and its built-in libraries mean that many algorithms are available out of the box. Sometimes, after the initial exploration phase, the work of a Data Scientist will be "productionized", or extended, hardened (i.e. made fault tolerant), and tuned to become a production data processing application, which itself is a component of a business application. For example, the initial investigation of a Data Scientist might lead to the creation of a production recommender system that is integrated into a web application and used to generate customized product suggestions to users. Often it is a different person or team that leads the process of productizing the work of the Data Scientists, and that person is often an Engineer.
Data Processing Applications
The other main use case of Spark can be described in the context of the Engineer persona. For our purposes here, we think of Engineers as a large class of software developers who use Spark to build production data processing applications. These developers usually have an understanding of the principles of software engineering, such as encapsulation, interface design, and Object Oriented Programming. They frequently have a degree in Computer Science. They use their engineering skills to design and build software systems that implement a business use case. For Engineers, Spark provides a simple way to parallelize these applications across clusters, and hides the complexity of distributed systems programming, network communication and fault tolerance. The system gives enough control to monitor, inspect and tune applications while allowing common tasks to be implemented quickly. The modular nature of the API (based on passing distributed collections of objects) makes it easy to factor work into reusable libraries and test it locally.
Spark’suserschoosetouseitfortheirdataprocessingapplicationsbecauseitprovidesawidevarietyoffunctionality,iseasytolearnanduse,andismatureandreliable.
A Brief History of Spark
Spark is an open source project that has been built and is maintained by a thriving and diverse community of developers from many different organizations. If you or your organization are trying Spark for the first time, you might be interested in the history of the project. Spark started in 2009 as a research project in the UC Berkeley RAD Lab, later to become the AMPLab. The researchers in the lab had previously been working on Hadoop MapReduce, and observed that MapReduce was inefficient for iterative and interactive computing jobs. Thus, from the beginning, Spark was designed to be fast for interactive queries and iterative algorithms, bringing in ideas like support for in-memory storage and efficient fault recovery.
Research papers were published about Spark at academic conferences and soon after its creation in 2009, it was already 10-20x faster than MapReduce for certain jobs.
SomeofSpark’sfirstuserswereothergroupsinsideofUCBerkeley,includingmachinelearningresearcherssuchasthetheMobileMillenniumproject,whichusedSparktomonitorandpredicttrafficcongestionintheSanFranciscobayArea.Inaveryshorttime,however,manyexternalorganizationsbeganusingSpark,andtoday,over50organizationslistthemselvesontheSparkPoweredBypageSparkMeetups[1],anddozensspeakabouttheirusecasesatSparkcommunityeventssuchas
[3].
[2]andtheSparkSummitApartfromUCBerkeley,majorcontributorstothe
projectcurrentlyincludeYahoo!,IntelandDatabricks.
In2011,theAMPLabstartedtodevelophigher-levelcomponentsonSpark,suchasShark(HiveonSpark)andSparkStreaming.TheseandothercomponentsareoftenreferredtoastheBerkeleyDataAnalyticsStack(BDAS)[4].
BDASincludesbothcomponentsofSparkandother
softwareprojectsthatcomplementit,suchastheTachyonmemorymanager.
Spark was first open sourced in March 2010, and was transferred to the Apache Software Foundation in June 2013, where it is now a top-level project.
Spark Versions and Releases
Since its creation Spark has been a very active project and community, with the number of contributors growing with each release. Spark 1.0 had over 100 individual contributors. Though the level of activity has rapidly grown, the community continues to release updated versions of Spark on a regular schedule. Spark 1.0 was released in May 2014. This book focuses primarily on Spark 1.0 and beyond, though most of the concepts and examples also work in earlier versions.
Spark and Hadoop
Spark can create distributed datasets from any file stored in the Hadoop distributed file system (HDFS) or other storage systems supported by Hadoop (including your local filesystem, Amazon S3, Cassandra, Hive, HBase, etc). Spark supports text files, SequenceFiles, Avro, Parquet, and any other Hadoop InputFormat. We will look at interacting with these data sources in the loading and saving chapter.
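For example, all of these look the same from the API's point of view; the storage system is selected by the URI scheme passed to textFile (paths here are placeholders):

val localFile = sc.textFile("file:///home/user/README.md")   // local filesystem
val hdfsFile  = sc.textFile("hdfs://namenode/data/logs.txt") // HDFS
val s3File    = sc.textFile("s3n://my-bucket/data.txt")      // Amazon S3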
[1] https://cwiki.apache.org/confluence/display/SPARK/Powered+By+Spark
[2] http://www.meetup.com/spark-users/
[3] http://spark-summit.org
[4] https://amplab.cs.berkeley.edu/software

Chapter 2. Downloading and Getting Started
In this chapter we will walk through the process of downloading and running Spark in local mode on a single computer. This chapter was written for anybody that is new to Spark, including both Data Scientists and Engineers.
Spark can be used from Python, Java or Scala. To benefit from this book, you don't need to be an expert programmer, but we do assume that you are comfortable with the basic syntax of at least one of these languages. We will include examples in all languages wherever possible.
Spark itself is written in Scala, and runs on the Java Virtual Machine (JVM). To run Spark on either your laptop or a cluster, all you need is an installation of Java 6 (or newer). If you wish to use the Python API you will also need a Python interpreter (version 2.6 or newer). Spark does not yet work with Python 3.
Downloading Spark
The first step to using Spark is to download and unpack it into a usable form. Let's start by downloading a recent precompiled released version of Spark. Visit http://spark.apache.org/downloads.html, then under "Pre-built packages", next to "For Hadoop 1 (HDP1, CDH3)", click "direct file download". This will download a compressed tar file, or "tarball," called spark-1.0.0-bin-hadoop1.tgz.
If you want to use Spark with another Hadoop version, those are also available from http://spark.apache.org/downloads.html but will have slightly different file names. Building from source is also possible, and you can find the latest source code on GitHub at http://github.com/apache/spark.
Note
Most Unix and Linux variants, including Mac OS X, come with a command-line tool called tar that can be used to unpack tar files. If your operating system does not have the tar command installed, try searching the Internet for a free tar extractor. For example, on Windows, you may wish to try 7-Zip.
Now that we have downloaded Spark, let's unpack it and take a look at what comes with the default Spark distribution. To do that, open a terminal, change to the directory where you downloaded Spark, and untar the file. This will create a new directory with the same name but without the final .tgz suffix. Change into the directory, and see what's inside. You can use the following commands to accomplish all of that.

cd ~
tar -xf spark-1.0.0-bin-hadoop1.tgz
cd spark-1.0.0-bin-hadoop1
ls

In the line containing the tar command above, the x flag tells tar we are extracting files, and the f flag specifies the name of the tarball. The ls command lists the contents of the Spark directory. Let's briefly consider the names and purpose of some of the more important files and directories you see here that come with Spark.
README.md - Contains short instructions for getting started with Spark.
bin - Contains executable files that can be used to interact with Spark in various ways, e.g. the spark-shell, which we will cover later in this chapter, is in here.
core, streaming, python - source code of major components of the Spark project.
examples - contains some helpful Spark standalone jobs that you can look at and run to learn about the Spark API.

Don't worry about the large number of directories and files the Spark project comes with; we will cover most of these in the rest of this book. For now, let's dive in right away and try out Spark's Python and Scala shells. We will start by running some of the examples that come with Spark. Then we will write, compile and run a simple Spark job of our own.
All of the work we will do in this chapter will be with Spark running in "local mode", i.e. non-distributed mode, which only uses a single machine. Spark can run in a variety of different modes, or environments. Beyond local mode, Spark can also be run on Mesos, YARN, or on top of a Standalone Scheduler that is included in the Spark distribution. We will cover the various deployment modes in detail in a later chapter (to come).
Introduction to Spark's Python and Scala Shells
Spark comes with interactive shells that make ad-hoc data analysis easy. Spark's shells will feel familiar if you have used other shells such as those in R, Python, and Scala, or operating system shells like Bash or the Windows command prompt.
Unlike most other shells, however, which let you manipulate data using the disk and memory on a single machine, Spark's shells allow you to interact with data that is distributed on disk or in memory across many machines, and Spark takes care of automatically distributing this processing.
Because Spark can load data into memory, many distributed computations, even ones that process terabytes of data across dozens of machines, can finish running in a few seconds. This makes the sort of iterative, ad-hoc, and exploratory analysis commonly done in shells a good fit for Spark. Spark provides both Python and Scala shells that have been augmented to support connecting to a cluster.
Note
Most of this book includes code in all of Spark's languages, but interactive shells are only available in Python and Scala. Because a shell is very useful for learning the API, we recommend using one of these languages for these examples even if you are a Java developer. The API is the same in every language.
The easiest way to demonstrate the power of Spark's shells is to start using one of them for some simple data analysis. Let's walk through the example from the Quick Start Guide in the official Spark documentation[5].
The first step is to open up one of Spark's shells. To open the Python version of the Spark Shell, which we also refer to as the PySpark Shell, go into your Spark directory and type:

bin/pyspark
(Or bin\pyspark in Windows.) To open the Scala version of the shell, type:

bin/spark-shell
The shell prompt should appear within a few seconds. When the shell starts, you will notice a lot of log messages. You may need to hit [Enter] once to clear the log output and get to a shell prompt. Figure 2-1 shows what the PySpark shell looks like when you open it.
Figure 2-1. The PySpark Shell With Default Logging Output

You may find the logging statements that get printed in the shell distracting. You can control the verbosity of the logging. To do this, you can create a file in the conf directory called log4j.properties. The Spark developers already include a template for this file called log4j.properties.template. To make the logging less verbose, make a copy of conf/log4j.properties.template called conf/log4j.properties and find the following line:

log4j.rootCategory=INFO, console

Then lower the log level so that we only show WARN messages and above by changing it to the following:

log4j.rootCategory=WARN, console

When you re-open the shell, you should see less output.
Figure 2-2. The PySpark Shell With Less Logging Output

Using IPython
IPython is an enhanced Python shell that many Python users prefer, offering features such as tab completion. You can find instructions for installing it at http://ipython.org. You can use IPython with Spark by setting the IPYTHON environment variable to 1:

IPYTHON=1 ./bin/pyspark
To use the IPython Notebook, which is a web browser based version of IPython, use:

IPYTHON_OPTS="notebook" ./bin/pyspark
On Windows, set the environment variable and run the shell as follows:

set IPYTHON=1
bin\pyspark
In Spark, we express our computation through operations on distributed collections that are automatically parallelized across the cluster. These collections are called Resilient Distributed Datasets, or RDDs. RDDs are Spark's fundamental abstraction for distributed data and computation.
Before we say more about RDDs, let's create one in the shell from a local text file and do some very simple ad-hoc analysis by following the example below.
Example 2-1. Python line count
>>> lines = sc.textFile("README.md") # Create an RDD called lines
>>> lines.count() # Count the number of items in this RDD
127
>>> lines.first() # First item in this RDD, i.e. first line of README.md
u'# Apache Spark'
Example 2-2. Scala line count
scala> val lines = sc.textFile("README.md") // Create an RDD called lines
lines: spark.RDD[String] = MappedRDD[...]
scala> lines.count() // Count the number of items in this RDD
res0: Long = 127
scala> lines.first() // First item in this RDD, i.e. first line of README.md
res1: String = # Apache Spark
To exit the shell, you can press Control+D.
In the example above, the variables called lines are RDDs, created here from a text file on our local machine. We can run various parallel operations on the RDDs, such as counting the number of elements in the dataset (here, lines of text in the file) or printing the first one. We will discuss RDDs in great depth in later chapters, but before we go any further, let's take a moment now to introduce basic Spark concepts.
Introduction to Core Spark Concepts
Now that you have run your first Spark code using the shell, it's time to learn about programming in it in more detail.
At a high level, every Spark application consists of a driver program that launches various parallel operations on a cluster. The driver program contains your application's main function and defines distributed datasets on the cluster, then applies operations to them. In the examples above, the driver program was the Spark shell itself, and you could just type in the operations you wanted to run.
Driver programs access Spark through a SparkContext object, which represents a connection to a computing cluster. In the shell, a SparkContext is automatically created for you, as the variable called sc. Try printing out sc to see its type:

>>> sc
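The exact output depends on your environment, but in the PySpark shell it will look something like the default Python object printout:

<pyspark.context.SparkContext object at 0x...>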
Once you have a SparkContext, you can use it to build resilient distributed datasets, or RDDs. In the example above, we called SparkContext.textFile to create an RDD representing the lines of text in a file. We can then run various operations on these lines, such as count().

To run these operations, driver programs typically manage a number of nodes called executors. For example, if we were running the count() above on a cluster, different machines might count lines in different ranges of the file. Because we just ran the Spark shell locally, it executed all its work on a single machine, but you can connect the same shell to a cluster to analyze data in parallel. Figure 2-3 shows how Spark executes on a cluster.

Figure 2-3. Components for distributed execution in Spark

Finally, a lot of Spark's API revolves around passing functions to its operators to run them on the cluster. For example, we could extend our README example by filtering the lines in the file that contain a word, such as "Python":

Example 2-3. Python filtering example
>>> lines = sc.textFile("README.md")
>>> pythonLines = lines.filter(lambda line: "Python" in line)
>>> pythonLines.first()
u'## Interactive Python Shell'

Example 2-4. Scala filtering example
scala> val lines = sc.textFile("README.md") // Create an RDD called lines
lines: spark.RDD[String] = MappedRDD[...]
scala> val pythonLines = lines.filter(line => line.contains("Python"))
pythonLines: spark.RDD[String] = FilteredRDD[...]
scala> pythonLines.first()
res0: String = ## Interactive Python Shell

Note

If you are unfamiliar with the lambda or => syntax above, it is a shorthand way to define functions inline in Python and Scala. When using Spark in these languages, you can also define a function separately and then pass its name to Spark. For example, in Python:

def hasPython(line):
    return "Python" in line

pythonLines = lines.filter(hasPython)

Passing functions to Spark is also possible in Java, but in this case they are defined as classes, implementing an interface called Function. For example:

JavaRDD<String> pythonLines = lines.filter(
    new Function<String, Boolean>() {
        Boolean call(String line) { return line.contains("Python"); }
    });

Java 8 introduces shorthand syntax called "lambdas" that looks similar to Python and Scala. Here is how the code would look with this syntax:

JavaRDD<String> pythonLines = lines.filter(line -> line.contains("Python"));

While we will cover the Spark API in more detail later, a lot of its magic is that function-based operations like filter also parallelize across the cluster. That is, Spark automatically takes your function (e.g. line.contains("Python")) and ships it to executor nodes. Thus, you can write code in a single driver program and automatically have parts of it run on multiple nodes. Chapter 3 covers the RDD API in more detail.

Standalone Applications

The final piece missing in this quick tour of Spark is how to use it in standalone programs. Apart from running interactively, Spark can be linked into standalone applications in either Java, Scala or Python. The main difference from using it in the shell is that you need to initialize your own SparkContext. After that, the API is the same.

The process of linking to Spark varies by language. In Java and Scala, you give your application a Maven dependency on the spark-core artifact published by Apache. As of the time of writing, the latest Spark version is 1.0.0, and the Maven coordinates for that are:

groupId = org.apache.spark
artifactId = spark-core_2.10
version = 1.0.0

If you are unfamiliar with Maven, it is a popular package management tool for Java-based languages that lets you link to libraries in public repositories. You can use Maven itself to build your project, or use other tools that can talk to the Maven repositories, including Scala's SBT tool or Gradle. Popular integrated development environments like Eclipse also allow you to directly add a Maven dependency to a project.
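For example, in a Maven pom.xml these coordinates correspond to the following dependency entry:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.0.0</version>
</dependency>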
In Python, you simply write applications as Python scripts, but you must instead run them using a special bin/spark-submit script included in Spark. This script sets up the environment for Spark's Python API to function. Simply run your script with:

bin/spark-submit my_script.py

(Note that you will have to use backslashes instead of forward slashes on Windows.)

Note

In Spark versions before 1.0, use bin/pyspark my_script.py to run Python applications instead.

For detailed examples of linking applications to Spark, refer to the Quick Start Guide[6] in the official Spark documentation. In a final version of the book, we will also include full examples in an appendix.

Initializing a SparkContext

Once you have linked an application to Spark, you need to import the Spark packages in your program and create a SparkContext. This is done by first creating a SparkConf object to configure your application, and then building a SparkContext for it. Here is a short example in each supported language:

Example 2-5. Initializing Spark in Python
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)

Example 2-6. Initializing Spark in Java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf().setMaster("local").setAppName("My App");
JavaSparkContext sc = new JavaSparkContext(conf);

Example 2-7. Initializing Spark in Scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)

These examples show the minimal way to initialize a SparkContext, where you pass two parameters:

A cluster URL, namely "local" in these examples, which tells Spark how to connect to a cluster. "local" is a special value that runs Spark on one thread on the local machine, without connecting to a cluster.

An application name, namely "My App" in these examples. This will identify your application on the cluster manager's UI if you connect to a cluster.

Additional parameters exist for configuring how your application executes or adding code to be shipped to the cluster, but we will cover these in later chapters of the book.

After you have initialized a SparkContext, you can use all the methods we showed before to create RDDs (e.g. from a text file) and manipulate them.

Finally, to shut down Spark, you can either call the stop() method on your SparkContext, or simply exit the application (e.g. with System.exit(0) or sys.exit()).

This quick overview should be enough to let you run a standalone Spark application on your laptop. For more advanced configuration, a later chapter in the book will cover how to connect your application to a cluster, including packaging your application so that its code is automatically shipped to worker nodes. For now, please refer to the Quick Start Guide[7] in the official Spark documentation.

Conclusion

In this chapter, we have covered downloading Spark, running it locally on your laptop, and using it either interactively or from a standalone application. We gave a quick overview of the core concepts involved in programming with Spark: a driver program creates a SparkContext and RDDs, and then runs parallel operations on them. In the next chapter, we will dive more deeply into how RDDs operate.

[5] http://spark.apache.org/docs/latest/quick-start.html
[6] http://spark.apache.org/docs/latest/quick-start.html
[7] http://spark.apache.org/docs/latest/quick-start.html

Chapter 3. Programming with RDDs

This chapter introduces Spark's core abstraction for working with data, the Resilient Distributed Dataset (RDD). An RDD is simply a distributed collection of elements. In Spark all work is expressed as either creating new RDDs, transforming existing RDDs, or calling operations on RDDs to compute a result. Under the hood, Spark automatically distributes the data contained in RDDs across your cluster and parallelizes the operations you perform on them.
Both Data Scientists and Engineers should read this chapter, as RDDs are the core concept in Spark. We highly recommend that you try some of these examples in an interactive shell (see Introduction to Spark's Python and Scala Shells). In addition, all code in this chapter is available in the book's GitHub repository.

RDD Basics

An RDD in Spark is simply a distributed collection of objects. Each RDD is split into multiple partitions, which may be computed on different nodes of the cluster. RDDs can contain any type of Python, Java or Scala objects, including user-defined classes.

Users create RDDs in two ways: by loading an external dataset, or by distributing a collection of objects in their driver program. We have already seen loading a text file as an RDD of strings using SparkContext.textFile():

Example 3-1. Creating an RDD of strings with textFile() in Python
>>> lines = sc.textFile("README.md")

Once created, RDDs offer two types of operations: transformations and actions. Transformations construct a new RDD from a previous one. For example, one transformation we saw before is filtering data that matches a predicate. In our text file example, we can use this to create a new RDD holding just the strings that contain "Python":

>>> pythonLines = lines.filter(lambda line: "Python" in line)

Actions, on the other hand, compute a result based on an RDD, and either return it to the driver program or save it to an external storage system (e.g., HDFS). One example of an action we called earlier is first(), which returns the first element in an RDD:

>>> pythonLines.first()
u'## Interactive Python Shell'

The difference between transformations and actions is due to the way Spark computes RDDs. Although you can define new RDDs any time, Spark only computes them in a lazy fashion, the first time they are used in an action. This approach might seem unusual at first, but makes a lot of sense when working with big data. For instance, consider the example above, where we defined a text file and then filtered the lines with "Python". If Spark were to load and store all the lines in the file as soon as we wrote lines = sc.textFile(...), it would waste a lot of storage space, given that we then immediately filter out many lines. Instead, once Spark sees the whole chain of transformations, it can compute just the data needed for its result. In fact, for the first() action, Spark only scans the file until it finds the first matching line; it doesn't even read the whole file.

Finally, Spark's RDDs are by default recomputed each time you run an action on them. If you would like to reuse an RDD in multiple actions, you can ask Spark to persist it using RDD.persist(). After computing it the first time, Spark will store the RDD contents in memory (partitioned across the machines in your cluster), and reuse them in future actions. Persisting RDDs on disk instead of memory is also possible. The behavior of not persisting by default may again seem unusual, but it makes a lot of sense for big datasets: if you will not reuse the RDD, there's no reason to waste storage space when Spark could instead stream through the data once and just compute the result.[8] In practice, you will often use persist to load a subset of your data into memory and query it repeatedly. For example, if we knew that we wanted to compute multiple results about the README lines that contain "Python", we could write:

>>> pythonLines.persist()
>>> pythonLines.count()
2
>>> pythonLines.first()
u'## Interactive Python Shell'

To summarize, every Spark program and shell session will work as follows:

1. Create some input RDDs from external data.
2. Transform them to define new RDDs using transformations like filter().
3. Ask Spark to persist() any intermediate RDDs that will need to be reused.
4. Launch actions such as count() and first() to kick off a parallel computation, which is then optimized and executed by Spark.

In the rest of this chapter, we'll go through each of these steps in detail, and cover some of the most common RDD operations in Spark.

Creating RDDs

Spark provides two ways to create RDDs: loading an external dataset and parallelizing a collection in your driver program.
The simplest way to create RDDs is to take an existing in-memory collection and pass it to SparkContext's parallelize method. This approach is very useful when learning Spark, since you can quickly create your own RDDs in the shell and perform operations on them. Keep in mind, however, that outside of prototyping and testing, this is not widely used since it requires you have your entire dataset in memory on one machine.

Example 3-2. Python parallelize example
lines = sc.parallelize(["pandas", "i like pandas"])

Example 3-3. Scala parallelize example
val lines = sc.parallelize(List("pandas", "i like pandas"))

Example 3-4. Java parallelize example
JavaRDD<String> lines = sc.parallelize(Arrays.asList("pandas", "i like pandas"));

A more common way to create RDDs is to load data in external storage. Loading external datasets is covered in detail in Chapter 5. However, we already saw one method that loads a text file as an RDD of strings, SparkContext.textFile:

Example 3-5. Python textFile example
lines = sc.textFile("/path/to/README.md")

Example 3-6. Scala textFile example
val lines = sc.textFile("/path/to/README.md")

Example 3-7. Java textFile example
JavaRDD<String> lines = sc.textFile("/path/to/README.md");

RDD Operations

RDDs support two types of operations, transformations and actions. Transformations are operations on RDDs that return a new RDD, such as map and filter. Actions are operations that return a result back to the driver program or write it to storage, and kick off a computation, such as count and first. Spark treats transformations and actions very differently, so understanding which type of operation you are performing will be important. If you are ever confused whether a given function is a transformation or an action, you can look at its return type: transformations return RDDs whereas actions return some other data type.

Transformations

Transformations are operations on RDDs that return a new RDD. As discussed in the lazy evaluation section, transformed RDDs are computed lazily, only when you use them in an action. Many transformations are element-wise, that is they work on one element at a time, but this is not true for all transformations.

As an example, suppose that we have a log file, log.txt, with a number of messages, and we want to select only the error messages. We can use the filter transformation seen before. This time though, we'll show a filter in all three of Spark's language APIs:

Example 3-8. Python filter example
inputRDD = sc.textFile("log.txt")
errorsRDD = inputRDD.filter(lambda x: "error" in x)

Example 3-9. Scala filter example
val inputRDD = sc.textFile("log.txt")
val errorsRDD = inputRDD.filter(line => line.contains("error"))

Example 3-10. Java filter example
JavaRDD<String> inputRDD = sc.textFile("log.txt");
JavaRDD<String> errorsRDD = inputRDD.filter(
    new Function<String, Boolean>() {
        public Boolean call(String x) { return x.contains("error"); }
    });

Note that the filter operation does not mutate the existing inputRDD. Instead, it returns a pointer to an entirely new RDD. inputRDD can still be re-used later in the program, for instance, to search for other words. In fact, let's use inputRDD again to search for lines with the word "warning" in them. Then, we'll use another transformation, union, to print out the number of lines that contained either "error" or "warning". We show Python here, but the union() function is identical in all three languages:

Example 3-11. Python union example
errorsRDD = inputRDD.filter(lambda x: "error" in x)
warningsRDD = inputRDD.filter(lambda x: "warning" in x)
badLinesRDD = errorsRDD.union(warningsRDD)

union is a bit different than filter, in that it operates on two RDDs instead of one. Transformations can actually operate on any number of input RDDs.

Finally, as you derive new RDDs from each other using transformations, Spark keeps track of the set of dependencies between different RDDs, called the lineage graph. It uses this information to compute each RDD on demand and to recover lost data if part of a persistent RDD is lost. We will show a lineage graph for this example in Figure 3-1.
Actions We’veseenhowtocreateRDDsfromeachotherwithtransformations,butatsomepoint,we’llwanttoactuallydosomethingwithourdataset.ActionsarethesecondtypeofRDDoperation.Theyaretheoperationsthatreturnafinalvaluetothedriverprogramorwritedatatoanexternalstoragesystem.ActionsforcetheevaluationofthetransformationsrequiredfortheRDDtheyarecalledon,sincetheyarerequiredtoactuallyproduceoutput. Continuingthelogexamplefromtheprevioussection,wemightwanttoprintoutsomeinformationaboutthebadLinesRDD.Todothat,we’llusetwoactions,count(),whichreturnsthecountasanumber,andtake(),whichcollectsanumberofelementsfromtheRDD. Example3-12.Pythonerrorcountexampleusingactionsprint\"Inputhad\"+badLinesRDD.count()+\"concerninglines\"print\"Hereare10examples:\"forlineinbadLinesRDD.take(10): printline Example3-13.Scalaerrorcountexampleusingactionsprintln(\"Inputhad\"+badLinesRDD.count()+\"concerninglines\")println(\"Hereare10examples:\")badLinesRDD.take(10).foreach(println) Example3-14.JavaerrorcountexampleusingactionsSystem.out.println(\"Inputhad\"+badLinesRDD.count()+\"concerninglines\")System.out.println(\"Hereare10examples:\")for(Stringline:badLinesRDD.take(10)){System.out.println(line);} Inthisexample,weusedtake()toretrieveasmallnumberofelementsintheRDDatthedriverprogram.Wetheniterateoverthemlocallytoprintoutinformationatthedriver.RDDsalso 30haveacollect()functiontoretrievetheentireRDD.ThiscanbeusefulifyourprogramfiltersRDDsdowntoaverysmallsizeandyou’dliketodealwithitlocally.Keepinmindthatyourentiredatasetmustfitinmemoryonasinglemachinetousecollect()onit,socollect()shouldn’tbeusedonlargedatasets. InmostcasesRDDscan’tjustbecollect()‘edtothedriverbecausetheyaretoolarge.Inthesecases,it’scommontowritedataouttoadistributedstoragesystemssuchasHDFSorAmazonS3.ThecontentsofanRDDcanbesavedusingthesaveAsTextFileaction,saveAsSequenceFileoranyofanumberactionsforvariousbuilt-informats.WewillcoverthedifferentoptionsforexportingdatalateroninChapter5. Theimagebelowpresentsthelineagegraphforthisentireexample,startingwithourinputRDDandendingwiththetwoactions.Itisimportanttonotethateachtimewecallanewaction,theentireRDDmustbecomputed“fromscratch”.Toavoidthisinefficiency,userscanpersistintermediateresults,aswewillcoverinPersistence(Caching). Figure3-1.RDDlineagegraphcreatedduringloganalysis.LazyEvaluation TransformationsonRDDsarelazilyevaluated,meaningthatSparkwillnotbegintoexecuteuntilitseesanaction.Thiscanbesomewhatcounter-intuitivefornewusers,butmaybefamiliarforthosewhohaveusedfunctionallanguagessuchasHaskellorLINQ-likedataprocessingframeworks. 31LazyevaluationmeansthatwhenwecallatransformationonanRDD(forinstancecallingmap),theoperationisnotimmediatelyperformed.Instead,Sparkinternallyrecordsmeta-datatoindicatethisoperationhasbeenrequested.RatherthanthinkingofanRDDascontainingspecificdata,itisbesttothinkofeachRDDasconsistingofinstructionsonhowtocomputethedatathatwebuildupthroughtransformations.LoadingdataintoanRDDislazilyevaluatedinthesamewaytransformationsare.Sowhenwecallsc.textFilethedataisnotloadeduntilitisnecessary.Likewithtransformations,theoperation(inthiscasereadingthedata)canoccurmultipletimes. Tip Althoughtransformationsarelazy,forceSparktoexecutethematanytimebyrunninganaction,suchascount().Thisisaneasywaytotestoutjustpartofyourprogram. 
Spark uses lazy evaluation to reduce the number of passes it has to take over our data by grouping operations together. In MapReduce systems like Hadoop, developers often have to spend a lot of time considering how to group together operations to minimize the number of MapReduce passes. In Spark, there is no substantial benefit to writing a single complex map instead of chaining together many simple operations. Thus, users are free to organize their program into smaller, more manageable operations.

Passing Functions to Spark

Most of Spark's transformations, and some of its actions, depend on passing in functions that are used by Spark to compute data. Each of the core languages has a slightly different mechanism for passing functions to Spark.

Python

In Python, we have three options for passing functions into Spark. For shorter functions we can pass in lambda expressions, as we have done in the example at the start of this chapter. We can also pass in top-level functions, or locally defined functions.

Example 3-15. Passing a lambda in Python
word = rdd.filter(lambda s: "error" in s)

Passing a top-level Python function:

def containsError(s):
    return "error" in s

word = rdd.filter(containsError)

One issue to watch out for when passing functions is that if you pass functions that are members of an object, or references to fields in an object (e.g., self.field), this results in sending in the entire object, which can be much larger than just the bit of information you need. Sometimes this can also cause your program to fail, if your class contains objects that Python can't figure out how to pickle.

Example 3-16. Passing a function with field references (don't do this!)
class SearchFunctions(object):
    def __init__(self, query):
        self.query = query
    def isMatch(self, s):
        return self.query in s
    def getMatchesFunctionReference(self, rdd):
        # Problem: references all of "self" in "self.isMatch"
        return rdd.filter(self.isMatch)
    def getMatchesMemberReference(self, rdd):
        # Problem: references all of "self" in "self.query"
        return rdd.filter(lambda x: self.query in x)

Instead, just extract the fields you need from your object into a local variable and pass that in, like we do below:

Example 3-17. Python function passing without field references
class WordFunctions(object):
    ...
    def getMatchesNoReference(self, rdd):
        # Safe: extract only the field we need into a local variable
        query = self.query
        return rdd.filter(lambda x: query in x)

Scala

In Scala, we can pass in functions defined inline or references to methods or static functions, as we do for Scala's other functional APIs. Some other considerations come into play, though, namely that the function we pass and the data referenced in it need to be serializable (implementing Java's Serializable interface). Furthermore, like in Python, passing a method or field of an object includes a reference to that whole object, though this is less obvious because we are not forced to write these references with self. Like how we did with Python, we can instead extract out the fields we need as local variables and avoid needing to pass the whole object containing them.

Example 3-18. Scala function passing
class SearchFunctions(val query: String) {
  def isMatch(s: String): Boolean = {
    s.contains(query)
  }
  def getMatchesFunctionReference(rdd: RDD[String]): RDD[Boolean] = {
    // Problem: "isMatch" means "this.isMatch", so we pass all of "this"
    rdd.map(isMatch)
  }
  def getMatchesFieldReference(rdd: RDD[String]): RDD[Array[String]] = {
    // Problem: "query" means "this.query", so we pass all of "this"
    rdd.map(x => x.split(query))
  }
  def getMatchesNoReference(rdd: RDD[String]): RDD[Array[String]] = {
    // Safe: extract just the field we need into a local variable
    val query_ = this.query
    rdd.map(x => x.split(query_))
  }
}

If you get "NotSerializableException" errors in Scala, a reference to a method or field in a non-serializable class is usually the problem. Note that passing in local variables or functions that are members of a top-level object is always safe.
Java

In Java, functions are specified as objects that implement one of Spark's function interfaces from the org.apache.spark.api.java.function package. There are a number of different interfaces based on the return type of the function. We show the most basic function interfaces below, and cover a number of other function interfaces for when we need to return special types of data in the section on converting between RDD types.

Table 3-1. Standard Java function interfaces
Function name | Method to implement | Usage
Function<T, R> | R call(T) | Take in one input and return one output, for use with things like map and filter.
Function2<T1, T2, R> | R call(T1, T2) | Take in two inputs and return one output, for use with things like aggregate or fold.
FlatMapFunction<T, R> | Iterable<R> call(T) | Take in one input and return zero or more outputs, for use with things like flatMap.

We can either define our function classes in-line as anonymous inner classes, or make a named class:

Example 3-19. Java function passing with anonymous inner class
RDD<String> errors = lines.filter(new Function<String, Boolean>() {
    public Boolean call(String x) { return x.contains("error"); }
});

Example 3-20. Java function passing with named class
class ContainsError implements Function<String, Boolean> {
    public Boolean call(String x) { return x.contains("error"); }
}

RDD<String> errors = lines.filter(new ContainsError());

The style to choose is a personal preference, but we find that top-level named functions are often cleaner for organizing large programs. One other benefit of top-level functions is that you can give them constructor parameters:

Example 3-21. Java function class with parameters
class Contains implements Function<String, Boolean> {
    private String query;
    public Contains(String query) { this.query = query; }
    public Boolean call(String x) { return x.contains(query); }
}

RDD<String> errors = lines.filter(new Contains("error"));

In Java 8, you can also use lambda expressions to concisely implement the Function interfaces. Since Java 8 is still relatively new as of the writing of this book, our examples use the more verbose syntax for defining classes in previous versions of Java. However, with lambda expressions, our search example would look like this:

Example 3-22. Java function passing with lambda expression in Java 8
RDD<String> errors = lines.filter(s -> s.contains("error"));

If you are interested in using Java 8's lambda expressions, refer to Oracle's documentation and the Databricks blog post on how to use lambdas with Spark.

Tip

Both anonymous inner classes and lambda expressions can reference any final variables in the method enclosing them, so you can pass these variables to Spark just like in Python and Scala.

Common Transformations and Actions

In this chapter, we tour the most common transformations and actions in Spark. Additional operations are available on RDDs containing certain types of data: for example, statistical functions on RDDs of numbers, and key-value operations such as aggregating data by key on RDDs of key-value pairs. We cover converting between RDD types and these special operations in later sections.

Basic RDDs

We will begin by evaluating what operations we can do on all RDDs regardless of the data. These transformations and actions are available on all RDD classes.

Transformations

Element-wise transformations

The two most common transformations you will likely be performing on basic RDDs are map and filter. The map transformation takes in a function and applies it to each element in the RDD, with the result of the function being the new value of each element in the resulting RDD. The filter transformation takes in a function and returns an RDD which only has elements that pass the filter function.

Figure 3-2. Map and filter on an RDD

We can use map to do any number of things, from fetching the website associated with each URL in our collection to just squaring the numbers. With Scala and Python you can use the standard anonymous function notation or pass in a function, and with Java you should use Spark's Function class from org.apache.spark.api.java.function or Java 8 functions.

It is useful to note that the return type of the map does not have to be the same as the input type, so if we had an RDD of customer IDs and our map function were to fetch the corresponding customer records, the type of our input RDD would be RDD[CustomerID] and the type of the resulting RDD would be RDD[CustomerRecord].
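A small Scala sketch makes this concrete; the record lookup here is hypothetical, but it shows the element type changing across a map:

import org.apache.spark.rdd.RDD

// fetchRecord stands in for a real lookup of a customer record by ID
def fetchRecord(id: Int): String = "record-for-" + id

val customerIDs: RDD[Int] = sc.parallelize(List(1, 2, 3))
val records: RDD[String] = customerIDs.map(fetchRecord) // RDD[Int] in, RDD[String] out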
Let's look at a basic example of map, which squares all of the numbers in an RDD:

Example 3-23. Python squaring the values in an RDD
nums = sc.parallelize([1, 2, 3, 4])
squared = nums.map(lambda x: x * x).collect()
for num in squared:
    print "%i" % (num)

Example 3-24. Scala squaring the values in an RDD
val input = sc.parallelize(List(1, 2, 3, 4))
val result = input.map(x => x * x)
println(result.collect().mkString(","))

Example 3-25. Java squaring the values in an RDD
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
JavaRDD<Integer> result = rdd.map(new Function<Integer, Integer>() {
    public Integer call(Integer x) { return x * x; }
});
System.out.println(StringUtils.join(result.collect(), ","));

Sometimes we want to produce multiple output elements for each input element. The operation to do this is called flatMap. Like with map, the function we provide to flatMap is called individually for each element in our input RDD. Instead of returning a single element, we return an iterator with our return values. Rather than producing an RDD of iterators, we get back an RDD which consists of the elements from all of the iterators. A simple example of flatMap is splitting up an input string into words, as shown below.

Example 3-26. Python flatMap example, splitting lines into words
lines = sc.parallelize(["hello world", "hi"])
words = lines.flatMap(lambda line: line.split(" "))
words.first() # returns "hello"

Example 3-27. Scala flatMap example, splitting lines into multiple words
val lines = sc.parallelize(List("hello world", "hi"))
val words = lines.flatMap(line => line.split(" "))
words.first() // returns "hello"

Example 3-28. Java flatMap example, splitting lines into multiple words
JavaRDD<String> lines = sc.parallelize(Arrays.asList("hello world", "hi"));
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    public Iterable<String> call(String line) {
        return Arrays.asList(line.split(" "));
    }
});
words.first(); // returns "hello"

Pseudo Set Operations

Figure 3-3. Some simple set operations (image to be redone)

RDDs support many of the operations of mathematical sets, such as union and intersection, even when the RDDs themselves are not properly sets.

The set property most frequently missing from our RDDs is the uniqueness of elements. If we only want unique elements we can use the RDD.distinct() transformation to produce a new RDD with only distinct items. Note that distinct() is expensive, however, as it requires shuffling all the data over the network to ensure that we only receive one copy of each element.

The simplest set operation is union(other), which gives back an RDD consisting of the data from both sources. This can be useful in a number of use cases, such as processing log files from many sources. Unlike the mathematical union(), if there are duplicates in the input RDDs, the result of Spark's union() will contain duplicates (which we can fix if desired with distinct()).

Spark also provides an intersection(other) method, which returns only elements in both RDDs. intersection() also removes all duplicates (including duplicates from a single RDD) while running. While intersection and union are two very similar concepts, the performance of intersection is much worse since it requires a shuffle over the network to identify common elements.

Sometimes we need to remove some data from consideration. The subtract(other) function takes in another RDD and returns an RDD that only has values present in the first RDD and not the second RDD.

We can also compute a Cartesian product between two RDDs. The cartesian(other) transformation results in possible pairs of (a, b) where a is in the source RDD and b is in the other RDD. The Cartesian product can be useful when we wish to consider the similarity between all possible pairs, such as computing every user's expected interest in each offer. We can also take the Cartesian product of an RDD with itself, which can be useful for tasks like computing user similarity.

Figure 3-4. Cartesian product between two RDDs

The tables below summarize common single-RDD and multi-RDD transformations.
Table 3-2. Basic RDD transformations on an RDD containing {1, 2, 3, 3}
Function Name | Purpose | Example | Result
map | Apply a function to each element in the RDD and return an RDD of the result | rdd.map(x => x + 1) | {2, 3, 4, 4}
flatMap | Apply a function to each element in the RDD and return an RDD of the contents of the iterators returned. Often used to extract words. | rdd.flatMap(x => x.to(3)) | {1, 2, 3, 2, 3, 3, 3}
filter | Return an RDD consisting of only elements which pass the condition passed to filter | rdd.filter(x => x != 1) | {2, 3, 3}
distinct | Remove duplicates | rdd.distinct() | {1, 2, 3}
sample(withReplacement, fraction, [seed]) | Sample an RDD | rdd.sample(false, 0.5) | non-deterministic

Table 3-3. Two-RDD transformations on RDDs containing {1, 2, 3} and {3, 4, 5}
Function Name | Purpose | Example | Result
union | Produce an RDD containing elements from both RDDs | rdd.union(other) | {1, 2, 3, 3, 4, 5}
intersection | RDD containing only elements found in both RDDs | rdd.intersection(other) | {3}
subtract | Remove the contents of one RDD (e.g. remove training data) | rdd.subtract(other) | {1, 2}
cartesian | Cartesian product with the other RDD | rdd.cartesian(other) | {(1, 3), (1, 4), … (3, 5)}

As you can see, there is a wide variety of transformations available on all RDDs regardless of our specific underlying data. We can transform our data element-wise, obtain distinct elements, and do a variety of set operations.

Actions

The most common action on basic RDDs you will likely use is reduce. Reduce takes in a function which operates on two elements of the same type as your RDD and returns a new element of the same type. A simple example of such a function is +, which we can use to sum our RDD. With reduce we can easily sum the elements of our RDD, count the number of elements, and perform other types of aggregations.

Example 3-29. Python reduce example
sum = rdd.reduce(lambda x, y: x + y)

Example 3-30. Scala reduce example
val sum = rdd.reduce((x, y) => x + y)

Example 3-31. Java reduce example
Integer sum = rdd.reduce(new Function2<Integer, Integer, Integer>() {
    public Integer call(Integer x, Integer y) { return x + y; }
});

Similar to reduce is fold, which also takes a function with the same signature as needed for reduce, but also takes a "zero value" to be used for the initial call on each partition. The zero value you provide should be the identity element for your operation, that is, applying it multiple times with your function should not change the value (e.g. 0 for +, 1 for *, or an empty list for concatenation).

Tip

You can minimize object creation in fold by modifying and returning the first of the two parameters in-place. However, you should not modify the second parameter.

Fold and reduce both require that the return type of our result be the same type as that of the RDD we are operating over. This works well for doing things like sum, but sometimes we want to return a different type. For example, when computing a running average we need to have a different return type. We could implement this using a map first, where we transform every element into the element and the number 1, so that the reduce function can work on pairs. The aggregate function frees us from the constraint of having the return be the same type as the RDD which we are working on. With aggregate, like fold, we supply an initial zero value of the type we want to return. We then supply a function to combine the elements from our RDD with the accumulator. Finally, we need to supply a second function to merge two accumulators, given that each node accumulates its own results locally.

We can use aggregate to compute the average of an RDD, avoiding a map before the fold.

Example 3-32. Python aggregate example
sumCount = nums.aggregate((0, 0),
    (lambda x, y: (x[0] + y, x[1] + 1)),
    (lambda x, y: (x[0] + y[0], x[1] + y[1])))
return sumCount[0] / float(sumCount[1])

Example 3-33. Scala aggregate example
val result = input.aggregate((0, 0))(
    (x, y) => (x._1 + y, x._2 + 1),
    (x, y) => (x._1 + y._1, x._2 + y._2))
val avg = result._1 / result._2.toDouble

Example 3-34. Java aggregate example
class AvgCount {
    public AvgCount(int total, int num) {
        this.total = total;
        this.num = num;
    }
    public int total;
    public int num;
    public double avg() {
        return total / (double) num;
    }
}
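Mirroring the Python and Scala versions above, here is a sketch of how this AvgCount class would plug into aggregate: one Function2 folds each value into an accumulator, and a second merges the per-node accumulators:

Function2<AvgCount, Integer, AvgCount> addAndCount =
    new Function2<AvgCount, Integer, AvgCount>() {
        public AvgCount call(AvgCount a, Integer x) {
            a.total += x;
            a.num += 1;
            return a;
        }
    };
Function2<AvgCount, AvgCount, AvgCount> combine =
    new Function2<AvgCount, AvgCount, AvgCount>() {
        public AvgCount call(AvgCount a, AvgCount b) {
            a.total += b.total;
            a.num += b.num;
            return a;
        }
    };
AvgCount initial = new AvgCount(0, 0);
AvgCount result = rdd.aggregate(initial, addAndCount, combine);
System.out.println(result.avg());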