I have created a Beam pipeline to read data from a Kafka topic and insert it into Hive tables on a Dataproc cluster.

I consume the data and convert it to HCatRecord as shown below:

// Register a coder so Beam can serialize HCatRecord values between transforms.
p.getCoderRegistry()
        .registerCoderForClass(HCatRecord.class, WritableCoder.of(DefaultHCatRecord.class));


PCollection<KafkaRecord<byte[], byte[]>> collection = p.apply(KafkaIO.<byte[], byte[]>read()
        .withBootstrapServers("host:9092")
        .withTopic("topic")
        .withKeyDeserializer(ByteArrayDeserializer.class)
        .withValueDeserializer(ByteArrayDeserializer.class)
        .withStartReadTime(Instant.now().minus(org.joda.time.Duration.standardDays(10)))
        .withConsumerConfigUpdates(ImmutableMap.of(
                "specific.avro.reader", "true",
                "enable.auto.commit", "true",
                "group.id", "group"))
        .commitOffsetsInFinalize());

PCollection<String> deserializedRecords = collection.apply("DeserializeRecords", ParDo.of(new DeserializeDoFn("http://schema-registry:port")));
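
DeserializeDoFn is not shown above; a minimal sketch of what it does, assuming the values are in the Confluent Avro wire format and using KafkaAvroDeserializer (the structure is illustrative, not my exact implementation), would be:

static class DeserializeDoFn extends DoFn<KafkaRecord<byte[], byte[]>, String> {
    private final String schemaRegistryUrl;
    private transient KafkaAvroDeserializer deserializer;

    DeserializeDoFn(String schemaRegistryUrl) {
        this.schemaRegistryUrl = schemaRegistryUrl;
    }

    @Setup
    public void setup() {
        // KafkaAvroDeserializer resolves the writer schema from the registry
        // using the schema id embedded in each record's byte payload.
        deserializer = new KafkaAvroDeserializer();
        deserializer.configure(
                ImmutableMap.of("schema.registry.url", schemaRegistryUrl), /* isKey= */ false);
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        GenericRecord record = (GenericRecord) deserializer.deserialize(
                c.element().getTopic(), c.element().getKV().getValue());
        // GenericRecord#toString() renders the record as JSON.
        c.output(record.toString());
    }
}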

PCollection<HCatRecord> hCatRecords = deserializedRecords.apply(ParDo.of(new JsonToHCatRecordFn()));
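
JsonToHCatRecordFn maps the JSON fields onto table columns by position; a minimal sketch, assuming a hypothetical two-column table (id STRING, payload STRING), is:

static class JsonToHCatRecordFn extends DoFn<String, HCatRecord> {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
        JsonNode node = MAPPER.readTree(c.element());
        // DefaultHCatRecord is positional; indices must match the Hive
        // table's column order.
        DefaultHCatRecord record = new DefaultHCatRecord(2);
        record.set(0, node.get("id").asText());
        record.set(1, node.get("payload").asText());
        c.output(record);
    }
}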

Then I use HCatalogIO to insert the data into the table:

hCatRecords.apply(
        HCatalogIO.write()
                .withConfigProperties(configProperties)
                .withDatabase("database")   // optional; assumes "default" if none specified
                .withTable("table")
                .withPartition(partition)   // optional; may be specified if the table is partitioned
                .withBatchSize(1024L));
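
For reference, configProperties here is just the Hive metastore connection info; a minimal sketch (the metastore host and port are placeholders for my Dataproc cluster's endpoint):

Map<String, String> configProperties = new HashMap<>();
// Thrift endpoint of the Dataproc cluster's Hive metastore.
configProperties.put("hive.metastore.uris", "thrift://metastore-host:9083");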

When it runs on my local machine, it throws the exception below:

Exception in thread "main" org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.beam.sdk.util.UserCodeException: org.apache.hive.hcatalog.common.HCatException : 2004 : HCatOutputFormat not initialized, setOutput has to be called. Cause : org.apache.hive.hcatalog.common.HCatException : 2001 : Error setting output information. Cause : org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "gs"
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2086)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get(LocalCache.java:4012)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
    at org.apache.beam.runners.direct.DoFnLifecycleManager.get(DoFnLifecycleManager.java:64)
    at org.apache.beam.runners.direct.ParDoEvaluatorFactory.createEvaluator(ParDoEvaluatorFactory.java:131)
    at org.apache.beam.runners.direct.ParDoEvaluatorFactory.forApplication(ParDoEvaluatorFactory.java:81)
    at org.apache.beam.runners.direct.TransformEvaluatorRegistry.forApplication(TransformEvaluatorRegistry.java:158)
    at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:122)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.beam.sdk.util.UserCodeException: org.apache.hive.hcatalog.common.HCatException : 2004 : HCatOutputFormat not initialized, setOutput has to be called. Cause : org.apache.hive.hcatalog.common.HCatException : 2001 : Error setting output information. Cause : org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "gs"
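
From the stack trace, the local run seems to fail because the table's warehouse location is on gs:// and my local Hadoop configuration has no handler for that scheme. I believe this means the GCS connector (com.google.cloud.bigdataoss:gcs-connector) has to be on the classpath and registered, roughly like this (class names taken from the connector's documentation), but I'm not sure this is the right approach:

// Register the GCS connector so Hadoop can resolve the "gs" scheme locally.
configProperties.put("fs.gs.impl",
        "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem");
configProperties.put("fs.AbstractFileSystem.gs.impl",
        "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS");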

Then I tried to deploy it to Dataflow on GCP, and during the deployment process it throws the exception below:

java.lang.RuntimeException: Error while staging packages
    at org.apache.beam.runners.dataflow.util.PackageUtil.stageClasspathElements (PackageUtil.java:372)
    at org.apache.beam.runners.dataflow.util.PackageUtil.stageClasspathElements (PackageUtil.java:238)
    at org.apache.beam.runners.dataflow.util.GcsStager.stageFiles (GcsStager.java:53)
    at org.apache.beam.runners.dataflow.DataflowRunner.stageArtifacts (DataflowRunner.java:1068)
    at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:1217)
    at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:203)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:325)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:310)
    at org.pickme.rt.App.run (App.java:158)
    at org.pickme.rt.App.main (App.java:165)
    at jdk.internal.reflect.DirectMethodHandleAccessor.invoke (DirectMethodHandleAccessor.java:103)
    at java.lang.reflect.Method.invoke (Method.java:580)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
    at java.lang.Thread.run (Thread.java:1583)
Caused by: java.lang.NoClassDefFoundError: com/google/cloud/hadoop/util/TraceOperation
    at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getObject (GoogleCloudStorageImpl.java:2327)
    at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getItemInfo (GoogleCloudStorageImpl.java:2225)
    at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getWriteGeneration (GoogleCloudStorageImpl.java:2295)
    at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.create (GoogleCloudStorageImpl.java:553)
    at org.apache.beam.sdk.extensions.gcp.util.GcsUtil.create (GcsUtil.java:619)
    at org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.create (GcsFileSystem.java:154)
    at org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.create (GcsFileSystem.java:71)
    at org.apache.beam.sdk.io.FileSystems.create (FileSystems.java:245)
    at org.apache.beam.runners.dataflow.util.PackageUtil.tryStagePackage (PackageUtil.java:215)
    at org.apache.beam.runners.dataflow.util.PackageUtil.tryStagePackageWithRetry (PackageUtil.java:174)
    at org.apache.beam.runners.dataflow.util.PackageUtil.stagePackageSynchronously (PackageUtil.java:153)
    at org.apache.beam.runners.dataflow.util.PackageUtil.lambda$stagePackage$1 (PackageUtil.java:142)
    at org.apache.beam.sdk.util.MoreFutures.lambda$supplyAsync$0 (MoreFutures.java:108)
    at java.util.concurrent.CompletableFuture$AsyncRun.run (CompletableFuture.java:1804)
    at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1144)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:642)
    at java.lang.Thread.run (Thread.java:1583)

Can you help me figure out the issue here? Please also share any relevant documentation or guidance.
